Compare commits
4 Commits
aleix/para
...
filipi/syn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
edca44a913 | ||
|
|
22b3a24548 | ||
|
|
de1fd67b2d | ||
|
|
265540b8ce |
@@ -1 +0,0 @@
|
||||
- Fixed RTVI events not being delivered to clients when using WebSocket transports. `ProtobufFrameSerializer` now sets `ignore_rtvi_messages=False` by default.
|
||||
1
changelog/4178.fixed.md
Normal file
1
changelog/4178.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed timed frames (e.g. word-boundary events) arriving out of order or too late relative to TTS audio playback. When a `TTSStoppedFrame` carries a presentation timestamp, the clock queue now flushes all pending timed frames immediately once the audio task finishes sending the preceding audio, ensuring timed events always reach downstream processors before the stop signal.
|
||||
@@ -51,12 +51,6 @@ async def end_conversation(params: FunctionCallParams):
|
||||
await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
|
||||
# NOTE: we can ask the model to say something *after* the call to
|
||||
# end_conversation because GeminiLiveLLMService defers processing EndFrames
|
||||
# until after the bot finishes its current turn. With Gemini 3.1 Flash Live,
|
||||
# the model won't reliably report ending its turn until after it says something
|
||||
# following the tool call, which is why the system instruction is structured
|
||||
# the way it is.
|
||||
system_instruction = """
|
||||
You are a helpful assistant who can answer questions and use tools.
|
||||
|
||||
@@ -65,10 +59,9 @@ You have three tools available to you:
|
||||
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
|
||||
3. end_conversation: Use this tool to gracefully end the conversation.
|
||||
|
||||
After you've responded to the user three times, do the following:
|
||||
1. Politely let them know that that's all the time you have today (but don't say "goodbye" yet).
|
||||
2. Then immediately call the end_conversation function. *DO NOT FORGET TO DO THIS STEP.*
|
||||
3. After the tool reports success, say goodbye.
|
||||
After you've responded to the user three times, do two things, in order:
|
||||
1. Politely let them know that that's all the time you have today and say goodbye.
|
||||
2. *WITHOUT WAITING FOR THE USER TO RESPOND*, call the end_conversation tool to gracefully end the conversation.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -16,7 +16,6 @@ from pathlib import Path
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from PIL.ImageFile import ImageFile
|
||||
from utils import (
|
||||
@@ -54,7 +53,6 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.daily.transport import DailyParams, DailyTransport
|
||||
from pipecat.transports.daily.utils import DailyRESTHelper, DailyRoomObject, DailyRoomParams
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
|
||||
@@ -89,6 +87,7 @@ class EvalRunner:
|
||||
self._log_level = log_level
|
||||
self._total_success = 0
|
||||
self._tests: List[EvalResult] = []
|
||||
self._result_future: Optional[asyncio.Future[bool]] = None
|
||||
|
||||
# We to save runner files.
|
||||
name = name or f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
@@ -98,6 +97,17 @@ class EvalRunner:
|
||||
os.makedirs(self._logs_dir, exist_ok=True)
|
||||
os.makedirs(self._recordings_dir, exist_ok=True)
|
||||
|
||||
async def function_assert_eval(self, params: FunctionCallParams):
|
||||
result = params.arguments["result"]
|
||||
reasoning = params.arguments["reasoning"]
|
||||
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
|
||||
await params.result_callback(None)
|
||||
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
|
||||
|
||||
async def assert_eval(self, result: bool):
|
||||
if self._result_future:
|
||||
self._result_future.set_result(result)
|
||||
|
||||
async def run_eval(
|
||||
self,
|
||||
example_file: str,
|
||||
@@ -106,44 +116,23 @@ class EvalRunner:
|
||||
if not re.match(self._pattern, example_file):
|
||||
return
|
||||
|
||||
print(f"\n🚀 Launching {example_file} ...\n")
|
||||
|
||||
# Store logs
|
||||
filename = self._log_file_name(example_file)
|
||||
log_file_id = logger.add(filename, level=self._log_level)
|
||||
|
||||
print_begin_test(example_file)
|
||||
|
||||
# Create Daily Room
|
||||
logger.info("Creating Daily room...")
|
||||
async with aiohttp.ClientSession() as aiohttp_session:
|
||||
helper = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
|
||||
)
|
||||
room = await helper.create_room(params=DailyRoomParams())
|
||||
logger.info(f"Created Daily room {room.url}")
|
||||
|
||||
script_path = self._examples_dir / example_file
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# Create per-eval future and callbacks so multiple evals can run concurrently.
|
||||
result_future = asyncio.get_running_loop().create_future()
|
||||
function_assert_eval, assert_eval = self._create_eval_callbacks(result_future)
|
||||
# Create a future to store the eval result.
|
||||
self._result_future = asyncio.get_running_loop().create_future()
|
||||
|
||||
try:
|
||||
tasks = [
|
||||
asyncio.create_task(run_example_pipeline(room.url, script_path, eval_config)),
|
||||
asyncio.create_task(
|
||||
run_eval_pipeline(
|
||||
self,
|
||||
example_file,
|
||||
eval_config,
|
||||
room_url=room.url,
|
||||
function_assert_eval=function_assert_eval,
|
||||
assert_eval=assert_eval,
|
||||
)
|
||||
),
|
||||
asyncio.create_task(run_example_pipeline(script_path, eval_config)),
|
||||
asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)),
|
||||
]
|
||||
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
|
||||
if pending:
|
||||
@@ -160,7 +149,7 @@ class EvalRunner:
|
||||
|
||||
try:
|
||||
# Wait for the future to resolve.
|
||||
result = await asyncio.wait_for(result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
|
||||
result = await asyncio.wait_for(self._result_future, timeout=EVAL_RESULT_TIMEOUT_SECS)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"ERROR: Timeout waiting for eval result.")
|
||||
result = False
|
||||
@@ -174,15 +163,6 @@ class EvalRunner:
|
||||
|
||||
print_end_test(example_file, result, eval_time)
|
||||
|
||||
# Delete Daily Room
|
||||
logger.info(f"Deleting Daily room {room.url} ...")
|
||||
async with aiohttp.ClientSession() as aiohttp_session:
|
||||
helper = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY"), aiohttp_session=aiohttp_session
|
||||
)
|
||||
await helper.delete_room_by_name(room.name)
|
||||
logger.info(f"Deleted Daily room {room.url}")
|
||||
|
||||
logger.remove(log_file_id)
|
||||
|
||||
def print_results(self):
|
||||
@@ -203,25 +183,6 @@ class EvalRunner:
|
||||
else:
|
||||
logger.warning(f"There's no audio to save for {name}")
|
||||
|
||||
def _create_eval_callbacks(self, result_future: asyncio.Future):
|
||||
"""Create per-eval callback functions that capture a local result_future.
|
||||
|
||||
This allows multiple evals to run concurrently without sharing state.
|
||||
"""
|
||||
|
||||
async def function_assert_eval(params: FunctionCallParams):
|
||||
result = params.arguments["result"]
|
||||
reasoning = params.arguments["reasoning"]
|
||||
logger.debug(f"🧠 EVAL REASONING(result: {result}): {reasoning}")
|
||||
await params.result_callback(None)
|
||||
await params.llm.push_frame(EndTaskFrame(reason=result), FrameDirection.UPSTREAM)
|
||||
|
||||
async def assert_eval(result: bool):
|
||||
if not result_future.done():
|
||||
result_future.set_result(result)
|
||||
|
||||
return function_assert_eval, assert_eval
|
||||
|
||||
def _base_file_name(self, example_file: str):
|
||||
base_name = os.path.splitext(example_file)[0]
|
||||
return f"{base_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
@@ -235,7 +196,9 @@ class EvalRunner:
|
||||
return os.path.join(self._recordings_dir, f"{base_name}.wav")
|
||||
|
||||
|
||||
async def run_example_pipeline(room_url: str, script_path: Path, eval_config: EvalConfig):
|
||||
async def run_example_pipeline(script_path: Path, eval_config: EvalConfig):
|
||||
room_url = os.getenv("DAILY_ROOM_URL")
|
||||
|
||||
module = load_module_from_path(script_path)
|
||||
|
||||
transport = DailyTransport(
|
||||
@@ -260,13 +223,11 @@ async def run_eval_pipeline(
|
||||
eval_runner: EvalRunner,
|
||||
example_file: str,
|
||||
eval_config: EvalConfig,
|
||||
*,
|
||||
room_url: str,
|
||||
function_assert_eval,
|
||||
assert_eval,
|
||||
):
|
||||
logger.info(f"Starting eval bot")
|
||||
|
||||
room_url = os.getenv("DAILY_ROOM_URL")
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
@@ -345,7 +306,7 @@ async def run_eval_pipeline(
|
||||
),
|
||||
)
|
||||
|
||||
llm.register_function("eval_function", function_assert_eval)
|
||||
llm.register_function("eval_function", eval_runner.function_assert_eval)
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
@@ -413,9 +374,9 @@ async def run_eval_pipeline(
|
||||
@task.event_handler("on_pipeline_finished")
|
||||
async def on_pipeline_finished(task, frame):
|
||||
if isinstance(frame, EndFrame):
|
||||
await assert_eval(frame.reason)
|
||||
await eval_runner.assert_eval(frame.reason)
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await assert_eval(False)
|
||||
await eval_runner.assert_eval(False)
|
||||
|
||||
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
|
||||
# eval and the example.
|
||||
|
||||
@@ -321,18 +321,11 @@ async def main(args: argparse.Namespace):
|
||||
log_level=log_level,
|
||||
)
|
||||
|
||||
concurrency = args.concurrency
|
||||
semaphore = asyncio.Semaphore(concurrency)
|
||||
# Parse test config: (test, prompt, eval, user_speaks_first)
|
||||
for test_config in TESTS:
|
||||
test, eval_config = test_config
|
||||
|
||||
async def run_with_semaphore(test: str, eval_config: EvalConfig):
|
||||
async with semaphore:
|
||||
await runner.run_eval(test, eval_config)
|
||||
|
||||
tasks = []
|
||||
for test, eval_config in TESTS:
|
||||
tasks.append(asyncio.create_task(run_with_semaphore(test, eval_config)))
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
await runner.run_eval(test, eval_config)
|
||||
|
||||
runner.print_results()
|
||||
|
||||
@@ -340,13 +333,6 @@ async def main(args: argparse.Namespace):
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Eval Runner")
|
||||
parser.add_argument("--audio", "-a", action="store_true", help="Record audio for each test")
|
||||
parser.add_argument(
|
||||
"--concurrency",
|
||||
"-c",
|
||||
type=int,
|
||||
default=3,
|
||||
help="Max number of evals to run concurrently (default: 3)",
|
||||
)
|
||||
parser.add_argument("--name", "-n", help="Name for the current runner (e.g. 'v.0.0.68')")
|
||||
parser.add_argument("--pattern", "-p", help="Only run tests that match the pattern")
|
||||
parser.add_argument("--verbose", "-v", action="count", default=0)
|
||||
|
||||
@@ -68,11 +68,6 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
params: Configuration parameters.
|
||||
"""
|
||||
super().__init__(params)
|
||||
# The base serializer defaults to filtering out RTVI protocol messages
|
||||
# to avoid sending them over telephony media streams. ProtobufFrameSerializer
|
||||
# is used by WebSocket transports, which are the delivery channel for
|
||||
# these messages, so we disable the filter.
|
||||
self._params.ignore_rtvi_messages = False
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
"""Serialize a frame to Protocol Buffer binary format.
|
||||
|
||||
@@ -1238,14 +1238,6 @@ class GeminiLiveLLMService(LLMService):
|
||||
self._end_frame_deferral_timeout_task.cancel()
|
||||
self._end_frame_deferral_timeout_task = None
|
||||
|
||||
def _get_history_config(self) -> Optional[HistoryConfig]:
|
||||
"""Return the history config for the Live API connection.
|
||||
|
||||
Subclasses can override this to disable history config (e.g. Vertex AI
|
||||
does not support it).
|
||||
"""
|
||||
return HistoryConfig(initial_history_in_client_content=True)
|
||||
|
||||
async def _connect(self, session_resumption_handle: Optional[str] = None):
|
||||
"""Establish client connection to Gemini Live API."""
|
||||
if self._session:
|
||||
@@ -1281,13 +1273,9 @@ class GeminiLiveLLMService(LLMService):
|
||||
input_audio_transcription=AudioTranscriptionConfig(),
|
||||
output_audio_transcription=AudioTranscriptionConfig(),
|
||||
session_resumption=SessionResumptionConfig(handle=session_resumption_handle),
|
||||
history_config=HistoryConfig(initial_history_in_client_content=True),
|
||||
)
|
||||
|
||||
# Add history config, if supported (not supported by Vertex)
|
||||
history_config = self._get_history_config()
|
||||
if history_config:
|
||||
config.history_config = history_config
|
||||
|
||||
# Add context window compression to configuration, if enabled
|
||||
cwc = self._settings.context_window_compression or {}
|
||||
if cwc.get("enabled", False):
|
||||
|
||||
@@ -212,10 +212,6 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService):
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def _get_history_config(self):
|
||||
"""Vertex AI does not support history_config."""
|
||||
return None
|
||||
|
||||
def create_client(self):
|
||||
"""Create the Gemini client instance."""
|
||||
self._client = Client(
|
||||
|
||||
@@ -359,6 +359,16 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await sender.handle_mixer_control_frame(frame)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await sender.handle_sync_frame(frame)
|
||||
if frame.pts:
|
||||
# The frame goes into both queues. Create fresh events to keep
|
||||
# the queues in sync: the audio task signals _clock_flush_event
|
||||
# to trigger an immediate drain; the clock task signals
|
||||
# _clock_drained_event once the drain is complete so the audio
|
||||
# task knows it can safely push TTSStoppedFrame downstream.
|
||||
# This way we can keep audio and words in sync.
|
||||
sender._clock_flush_event = asyncio.Event()
|
||||
sender._clock_drained_event = asyncio.Event()
|
||||
await sender.handle_timed_frame(frame)
|
||||
elif frame.pts:
|
||||
await sender.handle_timed_frame(frame)
|
||||
else:
|
||||
@@ -431,6 +441,22 @@ class BaseOutputTransport(FrameProcessor):
|
||||
self._video_task: Optional[asyncio.Task] = None
|
||||
self._clock_task: Optional[asyncio.Task] = None
|
||||
|
||||
# When a TTSStoppedFrame has a pts it is enqueued in both the
|
||||
# audio_queue and the clock_queue. These two events keep the queues
|
||||
# in sync around that frame:
|
||||
#
|
||||
# _clock_flush_event – set by the audio task once all preceding
|
||||
# audio has been sent, telling the clock task to stop waiting on
|
||||
# timestamps and drain every pending frame immediately.
|
||||
#
|
||||
# _clock_drained_event – set by the clock task once it has drained
|
||||
# all frames up to and including the TTSStoppedFrame, telling the
|
||||
# audio task it is safe to push the TTSStoppedFrame downstream.
|
||||
# This guarantees that all timed frames arrive before the stop
|
||||
# signal regardless of which queue wins the race.
|
||||
self._clock_flush_event: Optional[asyncio.Event] = None
|
||||
self._clock_drained_event: Optional[asyncio.Event] = None
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""Get the audio sample rate.
|
||||
@@ -800,6 +826,26 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._send_silence(self._params.audio_out_end_silence_secs)
|
||||
break
|
||||
|
||||
# If this TTSStoppedFrame is also in the clock queue, signal
|
||||
# the clock task to drain immediately and then wait for it to
|
||||
# confirm all timed frames have been pushed downstream before
|
||||
# pushing TTSStoppedFrame here. This keeps the audio queue as
|
||||
# the single owner of the downstream push for this frame.
|
||||
if isinstance(frame, TTSStoppedFrame) and self._clock_flush_event is not None:
|
||||
logger.debug(f"{self._transport} audio queue signalling clock queue flush")
|
||||
self._clock_flush_event.set()
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._clock_drained_event.wait(), timeout=BOT_VAD_STOP_FALLBACK_SECS
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f"{self._transport} timed out waiting for clock queue to drain, "
|
||||
"pushing TTSStoppedFrame downstream anyway"
|
||||
)
|
||||
self._clock_flush_event = None
|
||||
self._clock_drained_event = None
|
||||
|
||||
# Handle frame.
|
||||
await self._handle_frame(frame)
|
||||
|
||||
@@ -951,13 +997,46 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# If we have a frame we check it's presentation timestamp. If it
|
||||
# has already passed we process it, otherwise we wait until it's
|
||||
# time to process it.
|
||||
#
|
||||
# When a TTSStoppedFrame with a pts is in flight, this queue and
|
||||
# the audio_queue are kept in sync: the audio task signals
|
||||
# _clock_flush_event as soon as all preceding audio has been
|
||||
# sent, which wakes up the wait below early so every pending
|
||||
# clock-queue frame is delivered immediately instead of stalling
|
||||
# until its timestamp arrives.
|
||||
if running:
|
||||
current_time = self._transport.get_clock().get_time()
|
||||
if timestamp > current_time:
|
||||
wait_time = nanoseconds_to_seconds(timestamp - current_time)
|
||||
await asyncio.sleep(wait_time)
|
||||
if self._clock_flush_event:
|
||||
# Race between the natural timestamp and a drain
|
||||
# signal from the audio task. If the audio task sets
|
||||
# the event first, we fall through immediately so
|
||||
# this frame (and all subsequent ones up to the
|
||||
# TTSStoppedFrame) are processed without delay.
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.shield(self._clock_flush_event.wait()),
|
||||
timeout=wait_time,
|
||||
)
|
||||
logger.debug(
|
||||
f"{self._transport} clock queue flushed: delivering {frame} immediately"
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
else:
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
# Push frame downstream.
|
||||
await self._transport.push_frame(frame)
|
||||
# If this is the TTSStoppedFrame, signal the audio task that
|
||||
# the drain is complete. The audio task owns the downstream
|
||||
# push for this frame, so skip it here.
|
||||
if isinstance(frame, TTSStoppedFrame) and self._clock_drained_event is not None:
|
||||
logger.debug(
|
||||
f"{self._transport} clock queue drained, handing off TTSStoppedFrame to audio queue"
|
||||
)
|
||||
self._clock_drained_event.set()
|
||||
else:
|
||||
# Push frame downstream.
|
||||
await self._transport.push_frame(frame)
|
||||
|
||||
self._clock_queue.task_done()
|
||||
|
||||
Reference in New Issue
Block a user