Compare commits
7 Commits
filipi/syn
...
aleix/para
| Author | SHA1 | Date | |
|---|---|---|---|
| | 644babe241 | | |
| | b33df03724 | | |
| | 28fbe1db08 | | |
| | 9240e92d9f | | |
| | 5caf53f086 | | |
| | ac2716811c | | |
| | d313d56776 | | |
1
changelog/4176.fixed.md
Normal file
1
changelog/4176.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed RTVI events not being delivered to clients when using WebSocket transports. `ProtobufFrameSerializer` now sets `ignore_rtvi_messages=False` by default.
|
||||
@@ -51,6 +51,12 @@ async def end_conversation(params: FunctionCallParams):
|
||||
await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
|
||||
# NOTE: we can ask the model to say something *after* the call to
|
||||
# end_conversation because GeminiLiveLLMService defers processing EndFrames
|
||||
# until after the bot finishes its current turn. With Gemini 3.1 Flash Live,
|
||||
# the model won't reliably report ending its turn until after it says something
|
||||
# following the tool call, which is why the system instruction is structured
|
||||
# the way it is.
|
||||
system_instruction = """
|
||||
You are a helpful assistant who can answer questions and use tools.
|
||||
|
||||
@@ -59,9 +65,10 @@ You have three tools available to you:
|
||||
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
|
||||
3. end_conversation: Use this tool to gracefully end the conversation.
|
||||
|
||||
After you've responded to the user three times, do two things, in order:
|
||||
1. Politely let them know that that's all the time you have today and say goodbye.
|
||||
2. *WITHOUT WAITING FOR THE USER TO RESPOND*, call the end_conversation tool to gracefully end the conversation.
|
||||
After you've responded to the user three times, do the following:
|
||||
1. Politely let them know that that's all the time you have today (but don't say "goodbye" yet).
|
||||
2. Then immediately call the end_conversation function. *DO NOT FORGET TO DO THIS STEP.*
|
||||
3. After the tool reports success, say goodbye.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ from pathlib import Path
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from PIL.ImageFile import ImageFile
|
||||
from utils import (
|
||||
@@ -53,6 +54,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.daily.transport import DailyParams, DailyTransport
|
||||
from pipecat.transports.daily.utils import DailyRESTHelper, DailyRoomObject, DailyRoomParams
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
|
||||
@@ -87,7 +89,6 @@ class EvalRunner:
|
||||
self._log_level = log_level
|
||||
self._total_success = 0
|
||||
self._tests: List[EvalResult] = []
|
||||
self._result_future: Optional[asyncio.Future[bool]] = None
|
||||
|
||||
# Where to save runner files.
|
||||
name = name or f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
@@ -97,17 +98,6 @@ class EvalRunner:
|
||||
os.makedirs(self._logs_dir, exist_ok=True)
|
||||
os.makedirs(self._recordings_dir, exist_ok=True)
|
||||
|
||||
async def function_assert_eval(self, params: FunctionCallParams):
|
||||
result = params.arguments["result"]
|
||||
reasoning = params.arguments["reasoning"]
|
||||
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
|
||||
await params.result_callback(None)
|
||||
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
|
||||
|
||||
async def assert_eval(self, result: bool):
|
||||
if self._result_future:
|
||||
self._result_future.set_result(result)
|
||||
|
||||
async def run_eval(
|
||||
self,
|
||||
example_file: str,
|
||||
@@ -116,23 +106,44 @@ class EvalRunner:
|
||||
if not re.match(self._pattern, example_file):
|
||||
return
|
||||
|
||||
print(f"\n🚀 Launching {example_file} ...\n")
|
||||
|
||||
# Store logs
|
||||
filename = self._log_file_name(example_file)
|
||||
log_file_id = logger.add(filename, level=self._log_level)
|
||||
|
||||
print_begin_test(example_file)
|
||||
|
||||
# Create Daily Room
|
||||
logger.info("Creating Daily room...")
|
||||
async with aiohttp.ClientSession() as aiohttp_session:
|
||||
helper = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
|
||||
)
|
||||
room = await helper.create_room(params=DailyRoomParams())
|
||||
logger.info(f"Created Daily room {room.url}")
|
||||
|
||||
script_path = self._examples_dir / example_file
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# Create a future to store the eval result.
|
||||
self._result_future = asyncio.get_running_loop().create_future()
|
||||
# Create per-eval future and callbacks so multiple evals can run concurrently.
|
||||
result_future = asyncio.get_running_loop().create_future()
|
||||
function_assert_eval, assert_eval = self._create_eval_callbacks(result_future)
|
||||
|
||||
try:
|
||||
tasks = [
|
||||
asyncio.create_task(run_example_pipeline(script_path, eval_config)),
|
||||
asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)),
|
||||
asyncio.create_task(run_example_pipeline(room.url, script_path, eval_config)),
|
||||
asyncio.create_task(
|
||||
run_eval_pipeline(
|
||||
self,
|
||||
example_file,
|
||||
eval_config,
|
||||
room_url=room.url,
|
||||
function_assert_eval=function_assert_eval,
|
||||
assert_eval=assert_eval,
|
||||
)
|
||||
),
|
||||
]
|
||||
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
|
||||
if pending:
|
||||
@@ -149,7 +160,7 @@ class EvalRunner:
|
||||
|
||||
try:
|
||||
# Wait for the future to resolve.
|
||||
result = await asyncio.wait_for(self._result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
|
||||
result = await asyncio.wait_for(result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"ERROR: Timeout waiting for eval result.")
|
||||
result = False
|
||||
@@ -163,6 +174,15 @@ class EvalRunner:
|
||||
|
||||
print_end_test(example_file, result, eval_time)
|
||||
|
||||
# Delete Daily Room
|
||||
logger.info(f"Deleting Daily room {room.url} ...")
|
||||
async with aiohttp.ClientSession() as aiohttp_session:
|
||||
helper = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
|
||||
)
|
||||
await helper.delete_room_by_name(room.name)
|
||||
logger.info(f"Deleted Daily room {room.url}")
|
||||
|
||||
logger.remove(log_file_id)
|
||||
|
||||
def print_results(self):
|
||||
@@ -183,6 +203,25 @@ class EvalRunner:
|
||||
else:
|
||||
logger.warning(f"There's no audio to save for {name}")
|
||||
|
||||
def _create_eval_callbacks(self, result_future: asyncio.Future):
|
||||
"""Create per-eval callback functions that capture a local result_future.
|
||||
|
||||
This allows multiple evals to run concurrently without sharing state.
|
||||
"""
|
||||
|
||||
async def function_assert_eval(params: FunctionCallParams):
|
||||
result = params.arguments["result"]
|
||||
reasoning = params.arguments["reasoning"]
|
||||
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
|
||||
await params.result_callback(None)
|
||||
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
|
||||
|
||||
async def assert_eval(result: bool):
|
||||
if not result_future.done():
|
||||
result_future.set_result(result)
|
||||
|
||||
return function_assert_eval, assert_eval
|
||||
|
||||
def _base_file_name(self, example_file: str):
|
||||
base_name = os.path.splitext(example_file)[0]
|
||||
return f"{base_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
@@ -196,9 +235,7 @@ class EvalRunner:
|
||||
return os.path.join(self._recordings_dir, f"{base_name}.wav")
|
||||
|
||||
|
||||
async def run_example_pipeline(script_path: Path, eval_config: EvalConfig):
|
||||
room_url = os.getenv("DAILY_ROOM_URL")
|
||||
|
||||
async def run_example_pipeline(room_url: str, script_path: Path, eval_config: EvalConfig):
|
||||
module = load_module_from_path(script_path)
|
||||
|
||||
transport = DailyTransport(
|
||||
@@ -223,11 +260,13 @@ async def run_eval_pipeline(
|
||||
eval_runner: EvalRunner,
|
||||
example_file: str,
|
||||
eval_config: EvalConfig,
|
||||
*,
|
||||
room_url: str,
|
||||
function_assert_eval,
|
||||
assert_eval,
|
||||
):
|
||||
logger.info(f"Starting eval bot")
|
||||
|
||||
room_url = os.getenv("DAILY_ROOM_URL")
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
@@ -306,7 +345,7 @@ async def run_eval_pipeline(
|
||||
),
|
||||
)
|
||||
|
||||
llm.register_function("eval_function", eval_runner.function_assert_eval)
|
||||
llm.register_function("eval_function", function_assert_eval)
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
@@ -374,9 +413,9 @@ async def run_eval_pipeline(
|
||||
@task.event_handler("on_pipeline_finished")
|
||||
async def on_pipeline_finished(task, frame):
|
||||
if isinstance(frame, EndFrame):
|
||||
await eval_runner.assert_eval(frame.reason)
|
||||
await assert_eval(frame.reason)
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await eval_runner.assert_eval(False)
|
||||
await assert_eval(False)
|
||||
|
||||
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
|
||||
# eval and the example.
|
||||
|
||||
@@ -321,11 +321,18 @@ async def main(args: argparse.Namespace):
|
||||
log_level=log_level,
|
||||
)
|
||||
|
||||
# Parse test config: (test, prompt, eval, user_speaks_first)
|
||||
for test_config in TESTS:
|
||||
test, eval_config = test_config
|
||||
concurrency = args.concurrency
|
||||
semaphore = asyncio.Semaphore(concurrency)
|
||||
|
||||
await runner.run_eval(test, eval_config)
|
||||
async def run_with_semaphore(test: str, eval_config: EvalConfig):
|
||||
async with semaphore:
|
||||
await runner.run_eval(test, eval_config)
|
||||
|
||||
tasks = []
|
||||
for test, eval_config in TESTS:
|
||||
tasks.append(asyncio.create_task(run_with_semaphore(test, eval_config)))
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
runner.print_results()
|
||||
|
||||
@@ -333,6 +340,13 @@ async def main(args: argparse.Namespace):
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Eval Runner")
|
||||
parser.add_argument("--audio", "-a", action="store_true", help="Record audio for each test")
|
||||
parser.add_argument(
|
||||
"--concurrency",
|
||||
"-c",
|
||||
type=int,
|
||||
default=3,
|
||||
help="Max number of evals to run concurrently (default: 3)",
|
||||
)
|
||||
parser.add_argument("--name", "-n", help="Name for the current runner (e.g. 'v.0.0.68')")
|
||||
parser.add_argument("--pattern", "-p", help="Only run tests that match the pattern")
|
||||
parser.add_argument("--verbose", "-v", action="count", default=0)
|
||||
|
||||
@@ -68,6 +68,11 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
params: Configuration parameters.
|
||||
"""
|
||||
super().__init__(params)
|
||||
# The base serializer defaults to filtering out RTVI protocol messages
|
||||
# to avoid sending them over telephony media streams. ProtobufFrameSerializer
|
||||
# is used by WebSocket transports, which are the delivery channel for
|
||||
# these messages, so we disable the filter.
|
||||
self._params.ignore_rtvi_messages = False
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
"""Serialize a frame to Protocol Buffer binary format.
|
||||
|
||||
@@ -1238,6 +1238,14 @@ class GeminiLiveLLMService(LLMService):
|
||||
self._end_frame_deferral_timeout_task.cancel()
|
||||
self._end_frame_deferral_timeout_task = None
|
||||
|
||||
def _get_history_config(self) -> Optional[HistoryConfig]:
|
||||
"""Return the history config for the Live API connection.
|
||||
|
||||
Subclasses can override this to disable history config (e.g. Vertex AI
|
||||
does not support it).
|
||||
"""
|
||||
return HistoryConfig(initial_history_in_client_content=True)
|
||||
|
||||
async def _connect(self, session_resumption_handle: Optional[str] = None):
|
||||
"""Establish client connection to Gemini Live API."""
|
||||
if self._session:
|
||||
@@ -1273,9 +1281,13 @@ class GeminiLiveLLMService(LLMService):
|
||||
input_audio_transcription=AudioTranscriptionConfig(),
|
||||
output_audio_transcription=AudioTranscriptionConfig(),
|
||||
session_resumption=SessionResumptionConfig(handle=session_resumption_handle),
|
||||
history_config=HistoryConfig(initial_history_in_client_content=True),
|
||||
)
|
||||
|
||||
# Add history config, if supported (not supported by Vertex)
|
||||
history_config = self._get_history_config()
|
||||
if history_config:
|
||||
config.history_config = history_config
|
||||
|
||||
# Add context window compression to configuration, if enabled
|
||||
cwc = self._settings.context_window_compression or {}
|
||||
if cwc.get("enabled", False):
|
||||
|
||||
@@ -212,6 +212,10 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService):
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def _get_history_config(self):
|
||||
"""Vertex AI does not support history_config."""
|
||||
return None
|
||||
|
||||
def create_client(self):
|
||||
"""Create the Gemini client instance."""
|
||||
self._client = Client(
|
||||
|
||||
Reference in New Issue
Block a user