diff --git a/examples/realtime/realtime-gemini-live-local-vad.py b/examples/realtime/realtime-gemini-live-local-vad.py index 404aaf512..650fe3b05 100644 --- a/examples/realtime/realtime-gemini-live-local-vad.py +++ b/examples/realtime/realtime-gemini-live-local-vad.py @@ -80,10 +80,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # # - Turn strategies do not consider user transcripts, so the user # turn ends sooner. - # - User transcripts are handled by the aggregator: a simple timer - # gives it time to gather them after the user turn ends, then - # the aggregator emits `on_user_turn_message_finalized` with the - # new user context message. + # - User transcripts are handled by the aggregator: a simple + # post-turn transcript wait gives them time to arrive after the + # user turn ends, then the aggregator emits + # `on_user_turn_message_finalized` with the new user context + # message. user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( @@ -123,8 +124,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): await task.cancel() # `on_user_turn_stopped` fires at the end of the user turn. With - # `wait_for_transcript_to_end_user_turn=False`, the aggregator - # hasn't gathered user transcripts yet at this point, so + # `wait_for_transcript_to_end_user_turn=False`, no user + # transcripts have arrived yet at this point, so # `message.content` is empty. Logged here to make the end-of-turn # signal visible alongside the later finalization event. @user_aggregator.event_handler("on_user_turn_stopped") @@ -133,8 +134,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # `on_user_turn_message_finalized` fires when the user message has # been finalized into the context. Here it fires later than - # `on_user_turn_stopped`, after the aggregator has gathered the - # realtime service's user transcripts. 
+ # `on_user_turn_stopped`, after the aggregator's post-turn + # transcript wait completes. @user_aggregator.event_handler("on_user_turn_message_finalized") async def on_user_turn_message_finalized( aggregator, strategy, message: UserMessageFinalizedMessage diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index f72cfcf4c..0d3bebbe4 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -138,16 +138,16 @@ class LLMUserAggregatorParams: - Turn strategies do not consider user transcripts, so the user turn ends sooner. ``on_user_turn_stopped`` fires at - the end of turn with empty content (transcripts haven't - been gathered yet). To achieve this, the aggregator - drops ``TranscriptionUserTurnStartStrategy`` from start - strategies and flips ``wait_for_transcript=False`` on - any stop strategy that supports it. + the end of turn with empty content. To achieve this, + the aggregator drops ``TranscriptionUserTurnStartStrategy`` + from start strategies and flips + ``wait_for_transcript=False`` on any stop strategy that + supports it. - User transcripts are handled by the aggregator: a simple - timer gives it time to gather them after the user turn - ends, then the aggregator emits a new - ``on_user_turn_message_finalized`` event with the new - user context message. + post-turn transcript wait gives it time to receive them + after the user turn ends, then the aggregator emits a + new ``on_user_turn_message_finalized`` event with the + new user context message. filter_incomplete_user_turns: [DEPRECATED] Use ``user_turn_strategies=FilterIncompleteUserTurnStrategies()`` instead. 
When enabled, the LLM outputs a turn-completion @@ -286,14 +286,15 @@ class UserTurnStoppedMessage: With ``wait_for_transcript_to_end_user_turn=True`` (the default), the user message is finalized at the end of the turn, so ``content`` carries the aggregated transcript. With it set to - False, the aggregator is still gathering user transcripts at this - point, so ``content`` is ``None`` — subscribe to + False, the aggregator is still in its post-turn transcript wait + at this point, so ``content`` is ``None`` — subscribe to ``on_user_turn_message_finalized`` for the assembled message. Parameters: content: The aggregated user transcript, or ``None`` when ``wait_for_transcript_to_end_user_turn=False`` (the - aggregator is still gathering transcripts at this point). + aggregator is still in its post-turn transcript wait at + this point). timestamp: When the user turn started. user_id: Optional identifier for the user. @@ -311,9 +312,9 @@ class UserMessageFinalizedMessage: Fired when the user message has been finalized into the context. With ``wait_for_transcript_to_end_user_turn=True`` (the default) this coincides with ``on_user_turn_stopped``. With it set to - False, the aggregator first gathers user transcripts after the - end of turn, so this event fires later than - ``on_user_turn_stopped``. ``content`` is always populated. + False, the aggregator first runs a post-turn transcript wait, so + this event fires later than ``on_user_turn_stopped``. + ``content`` is always populated. Parameters: content: The aggregated user transcript. @@ -582,15 +583,15 @@ class LLMUserAggregator(LLMContextAggregator): ``UserTurnStoppedMessage``. With ``wait_for_transcript_to_end_user_turn=True`` (the default), ``message.content`` carries the aggregated transcript. With it - set to False, the aggregator is still gathering user transcripts - at this point, so ``message.content`` is ``None``; subscribe to - ``on_user_turn_message_finalized`` for the assembled message. 
+ set to False, the aggregator is still in its post-turn transcript + wait at this point, so ``message.content`` is ``None``; subscribe + to ``on_user_turn_message_finalized`` for the assembled message. - on_user_turn_message_finalized: Called when the user message has been finalized into the context, with a ``UserMessageFinalizedMessage``. With ``wait_for_transcript_to_end_user_turn=True`` this coincides with ``on_user_turn_stopped``; with it set to False it fires - later, after the aggregator's transcript-gather window + later, after the aggregator's post-turn transcript wait window completes. ``message.content`` is always populated. - on_user_turn_stop_timeout: Called when no user turn stop strategy triggers - on_user_turn_idle: Called when the user has been idle for the configured timeout @@ -697,18 +698,18 @@ class LLMUserAggregator(LLMContextAggregator): # inferences fire before finalization. self._full_user_turn_aggregation: str | None = None - # Transcript-gather state, used when the aggregator gathers - # user transcripts after the user turn ends - # (`_aggregator_gathers_transcripts == True`): + # Post-turn transcript wait state, used when the aggregator + # waits for transcripts after the user turn ends + # (`_wait_for_post_turn_transcripts == True`): # `on_user_turn_stopped` has fired with empty content, and the - # aggregator is waiting on `_transcript_gather_task` before - # finalizing the user message into context. The gather window - # duration is taken from the last `STTMetadataFrame` seen - # (`STTMetadataFrame.ttfs_p99_latency`), falling back to + # aggregator is waiting on `_post_turn_transcript_wait_task` + # before finalizing the user message into context. The wait + # window duration is taken from the last `STTMetadataFrame` + # seen (`STTMetadataFrame.ttfs_p99_latency`), falling back to # `DEFAULT_TTFS_P99` if no STT service has reported one. 
- self._gathering_for_strategy: BaseUserTurnStopStrategy | None = None - self._inference_during_gather: bool = False - self._transcript_gather_task: asyncio.Task | None = None + self._post_turn_transcript_wait_strategy: BaseUserTurnStopStrategy | None = None + self._inference_during_post_turn_transcript_wait: bool = False + self._post_turn_transcript_wait_task: asyncio.Task | None = None self._stt_ttfs_p99_latency: float | None = None self._user_turn_controller = UserTurnController( @@ -754,16 +755,17 @@ class LLMUserAggregator(LLMContextAggregator): self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame) @property - def _aggregator_gathers_transcripts(self) -> bool: - """True when the aggregator gathers user transcripts after the turn ends. + def _wait_for_post_turn_transcripts(self) -> bool: + """True when the aggregator runs a post-turn transcript wait. - Internal alias for ``wait_for_transcript_to_end_user_turn=False``. - In this mode, turn strategies don't consider user transcripts - (so the user turn ends sooner), and the aggregator runs a - simple timer after the end of turn to gather any transcripts - that arrive, then emits ``on_user_turn_message_finalized`` - with the assembled user context message. Always travels with - the strategy-mutation bundle applied at init. + Inverse of the public ``wait_for_transcript_to_end_user_turn`` + param: when that's False, this is True. In this mode, turn + strategies don't consider user transcripts (so the user turn + ends sooner), and the aggregator runs a simple timer after the + end of turn to receive any transcripts that arrive, then emits + ``on_user_turn_message_finalized`` with the assembled user + context message. Always travels with the strategy-mutation + bundle applied at init. 
""" return not self._params.wait_for_transcript_to_end_user_turn @@ -868,7 +870,7 @@ class LLMUserAggregator(LLMContextAggregator): pass elif isinstance(frame, STTMetadataFrame): # Record the STT service's reported P99 TTFS so the - # transcript-gather timer can size itself to the real + # post-turn transcript wait timer can size itself to the real # latency. Frame is also pushed downstream so other # processors keep seeing it. self._stt_ttfs_p99_latency = frame.ttfs_p99_latency @@ -933,15 +935,17 @@ class LLMUserAggregator(LLMContextAggregator): async def _finalize_on_session_end(self): """Flush any pending user message on session end. - If a transcript-gather is in flight (the aggregator hasn't - finished gathering transcripts yet), complete it now so the - user message is captured before the session shuts down. + If a post-turn transcript wait is in flight, complete it now so + the user message is captured before the session shuts down. Otherwise, run the mode-appropriate finalize path on whatever is currently in the buffer. """ - if self._gathering_for_strategy is not None or self._inference_during_gather: - await self._complete_transcript_gather(on_session_end=True) - elif self._aggregator_gathers_transcripts: + if ( + self._post_turn_transcript_wait_strategy is not None + or self._inference_during_post_turn_transcript_wait + ): + await self._complete_post_turn_transcript_wait(on_session_end=True) + elif self._wait_for_post_turn_transcripts: await self._finalize_user_message(on_session_end=True) else: await self._finalize_user_turn(on_session_end=True) @@ -1076,17 +1080,20 @@ class LLMUserAggregator(LLMContextAggregator): ): logger.debug(f"{self}: User started speaking (strategy: {strategy})") - # Precondition guard: if the previous turn's transcript-gather - # window is still active when the next turn starts, the - # assumption that transcripts arrive before the next turn - # has been violated. 
Complete the previous turn's gather now - # so its user message is finalized before this turn proceeds. - if self._gathering_for_strategy is not None or self._inference_during_gather: + # Precondition guard: if the previous turn's post-turn + # transcript wait is still active when the next turn starts, + # the assumption that transcripts arrive before the next turn + # has been violated. Complete the previous turn's wait now so + # its user message is finalized before this turn proceeds. + if ( + self._post_turn_transcript_wait_strategy is not None + or self._inference_during_post_turn_transcript_wait + ): logger.warning( f"{self}: user turn started before previous turn's transcripts " - f"were gathered; flushing previous turn now" + f"arrived; flushing previous turn now" ) - await self._complete_transcript_gather() + await self._complete_post_turn_transcript_wait() self._user_turn_start_timestamp = time_now_iso8601() self._full_user_turn_aggregation = None @@ -1108,12 +1115,12 @@ class LLMUserAggregator(LLMContextAggregator): ): logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") - if self._aggregator_gathers_transcripts: - # The aggregator is gathering transcripts after the user - # turn end. Defer push_aggregation and event emission; - # they'll run alongside user message finalization when the - # transcript-gather window completes. - self._inference_during_gather = True + if self._wait_for_post_turn_transcripts: + # The aggregator is configured to run a post-turn transcript + # wait. Defer push_aggregation and event emission; they'll run + # alongside user message finalization when the wait window + # completes. 
+ self._inference_during_post_turn_transcript_wait = True return # Push aggregation now: this writes the user message segment to @@ -1143,51 +1150,50 @@ class LLMUserAggregator(LLMContextAggregator): # End-of-turn side effects always fire on the strategy event, # regardless of whether user message finalization is deferred - # to a transcript-gather window. + # to a post-turn transcript wait window. if params.enable_user_speaking_frames: await self.broadcast_frame(UserStoppedSpeakingFrame) await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame()) - if self._aggregator_gathers_transcripts: + if self._wait_for_post_turn_transcripts: # Fire `on_user_turn_stopped` now for the end of turn — - # content is `None` because the aggregator hasn't gathered - # transcripts yet. Start the transcript-gather timer; when - # it completes, the aggregator finalizes the user message - # and emits `on_user_turn_message_finalized`. Consumers - # wanting the assembled message subscribe to + # content is `None` because the user message isn't finalized + # yet. Start the post-turn transcript wait timer; when it + # completes, the aggregator finalizes the user message and + # emits `on_user_turn_message_finalized`. Consumers wanting + # the assembled message subscribe to # `on_user_turn_message_finalized`. 
end_of_turn_message = UserTurnStoppedMessage( content=None, timestamp=self._user_turn_start_timestamp ) await self._call_event_handler("on_user_turn_stopped", strategy, end_of_turn_message) - self._gathering_for_strategy = strategy - gather_timeout = ( + self._post_turn_transcript_wait_strategy = strategy + wait_timeout = ( self._stt_ttfs_p99_latency if self._stt_ttfs_p99_latency is not None else DEFAULT_TTFS_P99 ) - self._transcript_gather_task = self.create_task( - self._transcript_gather_handler(gather_timeout), - f"{self}::transcript_gather", + self._post_turn_transcript_wait_task = self.create_task( + self._post_turn_transcript_wait_handler(wait_timeout), + f"{self}::post_turn_transcript_wait", ) return await self._finalize_user_turn(strategy) - async def _transcript_gather_handler(self, timeout: float): - """Transcript-gather timer. + async def _post_turn_transcript_wait_handler(self, timeout: float): + """Post-turn transcript wait timer. Waits ``timeout`` seconds — giving transcripts time to arrive - after the end of turn — then completes the gather and - finalizes the user message into context, with whatever - transcripts the aggregator has captured by then (possibly - nothing). + after the end of turn — then completes the wait and finalizes + the user message into context, with whatever transcripts the + aggregator has received by then (possibly none). The simple-timer approach relies on the assumptions that - transcripts don't arrive too late and that the bot response - won't finish before this timer. + transcripts don't arrive too late and that the assistant + response won't finish before this timer. Cancelled by reset, the next-turn precondition guard, or session end. 
@@ -1197,30 +1203,30 @@ class LLMUserAggregator(LLMContextAggregator): except asyncio.CancelledError: return finally: - self._transcript_gather_task = None + self._post_turn_transcript_wait_task = None - await self._complete_transcript_gather() + await self._complete_post_turn_transcript_wait() - async def _complete_transcript_gather(self, *, on_session_end: bool = False): - """Complete the active transcript-gather window. + async def _complete_post_turn_transcript_wait(self, *, on_session_end: bool = False): + """Complete the active post-turn transcript wait window. ``on_user_turn_stopped`` already fired at the end of turn (with - empty content) and the aggregator has been gathering + empty content) and the aggregator has been receiving transcripts since. This finalizes that work: flushes any inference-triggered segment whose push was deferred during the - gather, then emits ``on_user_turn_message_finalized`` with the - assembled user context message. Called from the - transcript-gather timer (the normal path), the precondition - guard in ``_on_user_turn_started``, and the session-end paths. + wait, then emits ``on_user_turn_message_finalized`` with the + assembled user context message. Called from the post-turn + transcript wait timer (the normal path), the precondition guard + in ``_on_user_turn_started``, and the session-end paths. 
""" - if self._transcript_gather_task: - await self.cancel_task(self._transcript_gather_task) - self._transcript_gather_task = None + if self._post_turn_transcript_wait_task: + await self.cancel_task(self._post_turn_transcript_wait_task) + self._post_turn_transcript_wait_task = None - gather_strategy = self._gathering_for_strategy - had_pending_inference = self._inference_during_gather - self._gathering_for_strategy = None - self._inference_during_gather = False + wait_strategy = self._post_turn_transcript_wait_strategy + had_pending_inference = self._inference_during_post_turn_transcript_wait + self._post_turn_transcript_wait_strategy = None + self._inference_during_post_turn_transcript_wait = False if had_pending_inference: segment = await self.push_aggregation() @@ -1231,32 +1237,32 @@ class LLMUserAggregator(LLMContextAggregator): ) else: self._full_user_turn_aggregation = segment - await self._call_event_handler("on_user_turn_inference_triggered", gather_strategy) + await self._call_event_handler("on_user_turn_inference_triggered", wait_strategy) - if gather_strategy is not None or on_session_end: + if wait_strategy is not None or on_session_end: # `on_user_turn_stopped` already fired at the end of turn; # this is the deferred user message finalization. - await self._finalize_user_message(gather_strategy, on_session_end=on_session_end) + await self._finalize_user_message(wait_strategy, on_session_end=on_session_end) async def _on_reset_aggregation( self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy ): logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})") - await self._cancel_transcript_gather() + await self._cancel_post_turn_transcript_wait() await self.reset() - async def _cancel_transcript_gather(self): - """Cancel any active transcript-gather window without finalizing. + async def _cancel_post_turn_transcript_wait(self): + """Cancel any active post-turn transcript wait window without finalizing. 
Called from reset paths (interruption, explicit reset). "Reset" means "throw it away" — we don't flush a partial transcript that was about to be invalidated anyway. """ - if self._transcript_gather_task: - await self.cancel_task(self._transcript_gather_task) - self._transcript_gather_task = None - self._gathering_for_strategy = None - self._inference_during_gather = False + if self._post_turn_transcript_wait_task: + await self.cancel_task(self._post_turn_transcript_wait_task) + self._post_turn_transcript_wait_task = None + self._post_turn_transcript_wait_strategy = None + self._inference_during_post_turn_transcript_wait = False async def _on_user_turn_stop_timeout(self, controller): await self._call_event_handler("on_user_turn_stop_timeout") @@ -1304,7 +1310,7 @@ class LLMUserAggregator(LLMContextAggregator): ): """Finalize the user turn: flush the message, emit both events. - Used in the default mode (``_aggregator_gathers_transcripts == + Used in the default mode (``_wait_for_post_turn_transcripts == False``), where end of turn and user message finalization coincide. Emits both ``on_user_turn_stopped`` and ``on_user_turn_message_finalized``. @@ -1325,9 +1331,9 @@ class LLMUserAggregator(LLMContextAggregator): ): """Finalize the user message: flush to context, emit one event. - Used when the aggregator gathers transcripts after the user - turn ends (``_aggregator_gathers_transcripts == True``), where - user message finalization fires after the end of turn. Emits + Used when the aggregator runs a post-turn transcript wait + (``_wait_for_post_turn_transcripts == True``), where user + message finalization fires after the end of turn. Emits ``on_user_turn_message_finalized`` only; ``on_user_turn_stopped`` was already emitted at the end of turn. 
""" diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index 81c29b5f4..8cc33cf9f 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -767,10 +767,10 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): """``wait_for_transcript_to_end_user_turn=False`` splits the lifecycle: - ``on_user_turn_stopped`` fires at the end of turn with empty - content (the aggregator hasn't gathered transcripts yet). + content (no transcripts have arrived yet). - Transcripts arriving after the end of turn are captured into ``_aggregation``. - - When the transcript-gather timer fires, + - When the post-turn transcript wait timer fires, ``on_user_turn_message_finalized`` fires with the populated user context message. """ @@ -817,7 +817,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): # Transcripts arrive after the end of turn (just one # here for the basic case). TranscriptionFrame(text="Hello!", user_id="", timestamp="now"), - # Wait for the transcript-gather timer to fire. + # Wait for the post-turn transcript wait timer to fire. SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05), ] await run_test(pipeline, frames_to_send=frames_to_send) @@ -830,10 +830,10 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): user_messages = [m for m in context.get_messages() if m.get("role") == "user"] self.assertEqual([m["content"] for m in user_messages], ["Hello!"]) - async def test_no_wait_for_transcript_uses_stt_metadata_for_gather_timer(self): - """The transcript-gather timer prefers the STT-reported P99 TTFS + async def test_no_wait_for_transcript_uses_stt_metadata_for_wait_timer(self): + """The post-turn transcript wait timer prefers the STT-reported P99 TTFS over ``DEFAULT_TTFS_P99``. 
With a long ``DEFAULT_TTFS_P99`` and - a short STT-reported value, the gather completes by the shorter + a short STT-reported value, the wait completes by the shorter time — if the timer fell back to ``DEFAULT_TTFS_P99``, this test would hang. """ @@ -880,7 +880,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): # fires turn-stopped. SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05), TranscriptionFrame(text="Hello!", user_id="", timestamp="now"), - # Wait for the transcript-gather timer to fire (sized + # Wait for the post-turn transcript wait timer to fire (sized # to the STT-reported TTFS, not DEFAULT_TTFS_P99). SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05), ] @@ -889,7 +889,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): self.assertEqual(events, [("stopped", None), ("finalized", "Hello!")]) async def test_no_wait_for_transcript_no_transcripts_arrive(self): - """When no transcripts arrive, the transcript-gather timer still + """When no transcripts arrive, the post-turn transcript wait timer still runs — ``on_user_turn_message_finalized`` fires with empty content and nothing is written to context. """ @@ -999,7 +999,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): # simplicity). TranscriptionFrame(text="Hello!", user_id="", timestamp="now"), SleepFrame(), - # Turn 2 starts before turn 1's transcript-gather timer + # Turn 2 starts before turn 1's post-turn transcript wait timer # fires — precondition violation. The aggregator should # force-flush turn 1 first. VADUserStartedSpeakingFrame(), @@ -1030,7 +1030,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): Correct ordering requires the user aggregator's deferred ``push_aggregation`` to run before the assistant aggregator's ``push_aggregation`` (which fires on ``LLMFullResponseEndFrame``). 
- The patched-short transcript-gather timer plus the sleep + The patched-short post-turn transcript wait timer plus the sleep between LLM start and end make that constraint hold here. """ from unittest.mock import patch @@ -1067,11 +1067,11 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): # service has finally emitted them — just one here). TranscriptionFrame(text="What's the weather?", user_id="", timestamp="now"), # Bot starts responding. Ordering correctness depends on - # the user's transcript-gather timer firing before + # the user's post-turn transcript wait timer firing before # LLMFullResponseEndFrame below. LLMFullResponseStartFrame(), LLMTextFrame("It's sunny."), - # Allow time for the user's transcript-gather timer to + # Allow time for the user's post-turn transcript wait timer to # fire (flushing the user message to context) before # the assistant turn ends. SleepFrame(sleep=0.1), @@ -1180,7 +1180,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): At session end the aggregated text is flushed and ``on_user_turn_message_finalized`` fires with the content. ``on_user_turn_stopped`` doesn't fire — when the aggregator - gathers transcripts after the turn ends, it's reserved for + runs a post-turn transcript wait, that event is reserved for the end-of-turn path. """ from unittest.mock import patch