Compare commits

...

7 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
644babe241 scripts(evals): run evals in parallel 2026-03-27 15:37:01 -07:00
kompfner
b33df03724 Merge pull request #4179 from pipecat-ai/pk/fix-gemini-live-vertex
Don't send history_config for Gemini Live Vertex (unsupported)
2026-03-27 17:34:29 -04:00
Paul Kompfner
28fbe1db08 Don't send history_config for Gemini Live Vertex (unsupported) 2026-03-27 17:30:47 -04:00
kompfner
9240e92d9f Merge pull request #4177 from pipecat-ai/pk/tweak-26i-for-gemini-3.1-flash-live-support
Tweak 26i example system instruction for Gemini 3.1 Flash Live compat…
2026-03-27 17:20:06 -04:00
Paul Kompfner
5caf53f086 Tweak 26i example system instruction for Gemini 3.1 Flash Live compatibility
Gemini 3.1 Flash Live won't reliably report ending its turn until
after it says something following a tool call. Restructure the system
instruction so the model says goodbye *after* calling
end_conversation, and add a comment explaining the deferred EndFrame
behavior that makes this work.
2026-03-27 17:13:17 -04:00
Mark Backman
ac2716811c Merge pull request #4176 from pipecat-ai/mb/fix-websocket-rtvi-messages
Fix RTVI events not delivered over WebSocket transports
2026-03-27 16:50:37 -04:00
Mark Backman
d313d56776 Fix RTVI events not delivered over WebSocket transports
The base serializer filters out RTVI protocol messages by default
(ignore_rtvi_messages=True) to prevent them from being sent over
telephony media streams. ProtobufFrameSerializer is used by WebSocket
transports, which are the delivery channel for these messages, so
disable the filter there.
2026-03-27 16:47:11 -04:00
7 changed files with 115 additions and 33 deletions

1
changelog/4176.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed RTVI events not being delivered to clients when using WebSocket transports. `ProtobufFrameSerializer` now sets `ignore_rtvi_messages=False` by default.

View File

@@ -51,6 +51,12 @@ async def end_conversation(params: FunctionCallParams):
await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
# NOTE: we can ask the model to say something *after* the call to
# end_conversation because GeminiLiveLLMService defers processing EndFrames
# until after the bot finishes its current turn. With Gemini 3.1 Flash Live,
# the model won't reliably report ending its turn until after it says something
# following the tool call, which is why the system instruction is structured
# the way it is.
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
@@ -59,9 +65,10 @@ You have three tools available to you:
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. end_conversation: Use this tool to gracefully end the conversation.
After you've responded to the user three times, do two things, in order:
1. Politely let them know that that's all the time you have today and say goodbye.
2. *WITHOUT WAITING FOR THE USER TO RESPOND*, call the end_conversation tool to gracefully end the conversation.
After you've responded to the user three times, do the following:
1. Politely let them know that that's all the time you have today (but don't say "goodbye" yet).
2. Then immediately call the end_conversation function. *DO NOT FORGET TO DO THIS STEP.*
3. After the tool reports success, say goodbye.
"""

View File

@@ -16,6 +16,7 @@ from pathlib import Path
from typing import Any, List, Optional, Tuple
import aiofiles
import aiohttp
from loguru import logger
from PIL.ImageFile import ImageFile
from utils import (
@@ -53,6 +54,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyParams, DailyTransport
from pipecat.transports.daily.utils import DailyRESTHelper, DailyRoomObject, DailyRoomParams
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -87,7 +89,6 @@ class EvalRunner:
self._log_level = log_level
self._total_success = 0
self._tests: List[EvalResult] = []
self._result_future: Optional[asyncio.Future[bool]] = None
# We to save runner files.
name = name or f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
@@ -97,17 +98,6 @@ class EvalRunner:
os.makedirs(self._logs_dir, exist_ok=True)
os.makedirs(self._recordings_dir, exist_ok=True)
async def function_assert_eval(self, params: FunctionCallParams):
result = params.arguments["result"]
reasoning = params.arguments["reasoning"]
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
await params.result_callback(None)
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
async def assert_eval(self, result: bool):
if self._result_future:
self._result_future.set_result(result)
async def run_eval(
self,
example_file: str,
@@ -116,23 +106,44 @@ class EvalRunner:
if not re.match(self._pattern, example_file):
return
print(f"\n🚀 Launching {example_file} ...\n")
# Store logs
filename = self._log_file_name(example_file)
log_file_id = logger.add(filename, level=self._log_level)
print_begin_test(example_file)
# Create Daily Room
logger.info("Creating Daily room...")
async with aiohttp.ClientSession() as aiohttp_session:
helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
)
room = await helper.create_room(params=DailyRoomParams())
logger.info(f"Created Daily room {room.url}")
script_path = self._examples_dir / example_file
start_time = time.time()
# Create a future to store the eval result.
self._result_future = asyncio.get_running_loop().create_future()
# Create per-eval future and callbacks so multiple evals can run concurrently.
result_future = asyncio.get_running_loop().create_future()
function_assert_eval, assert_eval = self._create_eval_callbacks(result_future)
try:
tasks = [
asyncio.create_task(run_example_pipeline(script_path, eval_config)),
asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)),
asyncio.create_task(run_example_pipeline(room.url, script_path, eval_config)),
asyncio.create_task(
run_eval_pipeline(
self,
example_file,
eval_config,
room_url=room.url,
function_assert_eval=function_assert_eval,
assert_eval=assert_eval,
)
),
]
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
if pending:
@@ -149,7 +160,7 @@ class EvalRunner:
try:
# Wait for the future to resolve.
result = await asyncio.wait_for(self._result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
result = await asyncio.wait_for(result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"ERROR: Timeout waiting for eval result.")
result = False
@@ -163,6 +174,15 @@ class EvalRunner:
print_end_test(example_file, result, eval_time)
# Delete Daily Room
logger.info(f"Deleting Daily room {room.url} ...")
async with aiohttp.ClientSession() as aiohttp_session:
helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
)
await helper.delete_room_by_name(room.name)
logger.info(f"Deleted Daily room {room.url}")
logger.remove(log_file_id)
def print_results(self):
@@ -183,6 +203,25 @@ class EvalRunner:
else:
logger.warning(f"There's no audio to save for {name}")
def _create_eval_callbacks(self, result_future: asyncio.Future):
"""Create per-eval callback functions that capture a local result_future.
This allows multiple evals to run concurrently without sharing state.
"""
async def function_assert_eval(params: FunctionCallParams):
result = params.arguments["result"]
reasoning = params.arguments["reasoning"]
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
await params.result_callback(None)
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
async def assert_eval(result: bool):
if not result_future.done():
result_future.set_result(result)
return function_assert_eval, assert_eval
def _base_file_name(self, example_file: str):
base_name = os.path.splitext(example_file)[0]
return f"{base_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
@@ -196,9 +235,7 @@ class EvalRunner:
return os.path.join(self._recordings_dir, f"{base_name}.wav")
async def run_example_pipeline(script_path: Path, eval_config: EvalConfig):
room_url = os.getenv("DAILY_ROOM_URL")
async def run_example_pipeline(room_url: str, script_path: Path, eval_config: EvalConfig):
module = load_module_from_path(script_path)
transport = DailyTransport(
@@ -223,11 +260,13 @@ async def run_eval_pipeline(
eval_runner: EvalRunner,
example_file: str,
eval_config: EvalConfig,
*,
room_url: str,
function_assert_eval,
assert_eval,
):
logger.info(f"Starting eval bot")
room_url = os.getenv("DAILY_ROOM_URL")
transport = DailyTransport(
room_url,
None,
@@ -306,7 +345,7 @@ async def run_eval_pipeline(
),
)
llm.register_function("eval_function", eval_runner.function_assert_eval)
llm.register_function("eval_function", function_assert_eval)
context = LLMContext(tools=tools)
context_aggregator = LLMContextAggregatorPair(
@@ -374,9 +413,9 @@ async def run_eval_pipeline(
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(task, frame):
if isinstance(frame, EndFrame):
await eval_runner.assert_eval(frame.reason)
await assert_eval(frame.reason)
elif isinstance(frame, CancelFrame):
await eval_runner.assert_eval(False)
await assert_eval(False)
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
# eval and the example.

View File

@@ -321,11 +321,18 @@ async def main(args: argparse.Namespace):
log_level=log_level,
)
# Parse test config: (test, prompt, eval, user_speaks_first)
for test_config in TESTS:
test, eval_config = test_config
concurrency = args.concurrency
semaphore = asyncio.Semaphore(concurrency)
await runner.run_eval(test, eval_config)
async def run_with_semaphore(test: str, eval_config: EvalConfig):
async with semaphore:
await runner.run_eval(test, eval_config)
tasks = []
for test, eval_config in TESTS:
tasks.append(asyncio.create_task(run_with_semaphore(test, eval_config)))
await asyncio.gather(*tasks)
runner.print_results()
@@ -333,6 +340,13 @@ async def main(args: argparse.Namespace):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Eval Runner")
parser.add_argument("--audio", "-a", action="store_true", help="Record audio for each test")
parser.add_argument(
"--concurrency",
"-c",
type=int,
default=3,
help="Max number of evals to run concurrently (default: 3)",
)
parser.add_argument("--name", "-n", help="Name for the current runner (e.g. 'v.0.0.68')")
parser.add_argument("--pattern", "-p", help="Only run tests that match the pattern")
parser.add_argument("--verbose", "-v", action="count", default=0)

View File

@@ -68,6 +68,11 @@ class ProtobufFrameSerializer(FrameSerializer):
params: Configuration parameters.
"""
super().__init__(params)
# The base serializer defaults to filtering out RTVI protocol messages
# to avoid sending them over telephony media streams. ProtobufFrameSerializer
# is used by WebSocket transports, which are the delivery channel for
# these messages, so we disable the filter.
self._params.ignore_rtvi_messages = False
async def serialize(self, frame: Frame) -> str | bytes | None:
"""Serialize a frame to Protocol Buffer binary format.

View File

@@ -1238,6 +1238,14 @@ class GeminiLiveLLMService(LLMService):
self._end_frame_deferral_timeout_task.cancel()
self._end_frame_deferral_timeout_task = None
def _get_history_config(self) -> Optional[HistoryConfig]:
"""Return the history config for the Live API connection.
Subclasses can override this to disable history config (e.g. Vertex AI
does not support it).
"""
return HistoryConfig(initial_history_in_client_content=True)
async def _connect(self, session_resumption_handle: Optional[str] = None):
"""Establish client connection to Gemini Live API."""
if self._session:
@@ -1273,9 +1281,13 @@ class GeminiLiveLLMService(LLMService):
input_audio_transcription=AudioTranscriptionConfig(),
output_audio_transcription=AudioTranscriptionConfig(),
session_resumption=SessionResumptionConfig(handle=session_resumption_handle),
history_config=HistoryConfig(initial_history_in_client_content=True),
)
# Add history config, if supported (not supported by Vertex)
history_config = self._get_history_config()
if history_config:
config.history_config = history_config
# Add context window compression to configuration, if enabled
cwc = self._settings.context_window_compression or {}
if cwc.get("enabled", False):

View File

@@ -212,6 +212,10 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService):
**kwargs,
)
def _get_history_config(self):
"""Vertex AI does not support history_config."""
return None
def create_client(self):
"""Create the Gemini client instance."""
self._client = Client(