diff --git a/examples/turn-management/turn-management-filter-incomplete-turns.py b/examples/turn-management/turn-management-filter-incomplete-turns.py index 81273aaaa..72037943b 100644 --- a/examples/turn-management/turn-management-filter-incomplete-turns.py +++ b/examples/turn-management/turn-management-filter-incomplete-turns.py @@ -41,6 +41,10 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_turn_strategies import ( + UserTurnStrategies, + llm_completion_user_turn_stop_strategies, +) load_dotenv(override=True) @@ -83,23 +87,31 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) context = LLMContext() + # `llm_completion_user_turn_stop_strategies()` pairs the default + # stop strategies with `LLMTurnCompletionUserTurnStopStrategy`: + # those strategies trigger LLM inference but the public + # `on_user_turn_stopped` event fires only when the LLM confirms ✓. 
+ # The LLM marks each response with one of: + # ✓ = complete (respond normally) + # ○ = incomplete short (wait 5s, then prompt) + # ◐ = incomplete long (wait 15s, then prompt) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( vad_analyzer=SileroVADAnalyzer(), - # Enable turn completion filtering - the LLM will output: - # ✓ = complete (respond normally) - # ○ = incomplete short (wait 5s, then prompt) - # ◐ = incomplete long (wait 15s, then prompt) - filter_incomplete_user_turns=True, - # Optional: customize turn completion behavior - # turn_completion_config=TurnCompletionConfig( - # incomplete_short_timeout=5.0, - # incomplete_long_timeout=15.0, - # incomplete_short_prompt="Custom prompt...", - # incomplete_long_prompt="Custom prompt...", - # instructions="Custom turn completion instructions...", - # ), + user_turn_strategies=UserTurnStrategies( + stop=llm_completion_user_turn_stop_strategies(), + # Optional: customize turn completion behavior + # stop=llm_completion_user_turn_stop_strategies( + # config=UserTurnCompletionConfig( + # incomplete_short_timeout=5.0, + # incomplete_long_timeout=15.0, + # incomplete_short_prompt="Custom prompt...", + # incomplete_long_prompt="Custom prompt...", + # instructions="Custom turn completion instructions...", + # ), + # ), + ), ), ) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 24006428d..505c9ca1d 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -970,6 +970,24 @@ class UserSpeakingFrame(SystemFrame): pass +@dataclass +class UserTurnCompletedFrame(SystemFrame): + """Frame indicating that the user turn is semantically complete. + + Emitted by any component that can judge conversational turn + completeness — for example an LLM with turn-completion markers, an + STT service with built-in turn detection, or a dedicated + end-of-turn classifier. 
Stop strategies that gate the + user-turn-stop event on an external completeness signal (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to + finalize the turn. Producers should emit this frame only when they + judge the turn complete; an absence of this frame means the turn is + not yet considered complete. + """ + + pass + + @dataclass class VADUserStartedSpeakingFrame(SystemFrame): """Frame emitted when VAD definitively detects user started speaking. diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index d1bc110de..1f61f33bb 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -53,7 +53,6 @@ from pipecat.frames.frames import ( LLMThoughtEndFrame, LLMThoughtStartFrame, LLMThoughtTextFrame, - LLMUpdateSettingsFrame, StartFrame, TextFrame, TranscriptionFrame, @@ -79,7 +78,6 @@ from pipecat.processors.aggregators.llm_context_summarizer import ( SummaryAppliedEvent, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.services.settings import LLMSettings from pipecat.turns.user_idle_controller import UserIdleController from pipecat.turns.user_mute import BaseUserMuteStrategy from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams @@ -100,25 +98,6 @@ class LLMUserAggregatorParams: """Parameters for configuring LLM user aggregation behavior. Parameters: - user_turn_strategies: User turn start and stop strategies. - user_mute_strategies: List of user mute strategies. - user_turn_stop_timeout: Time in seconds to wait before considering the - user's turn finished. - user_idle_timeout: Timeout in seconds for detecting user idle state. - The aggregator will emit an `on_user_turn_idle` event when the user - has been idle (not speaking) for this duration. Set to 0 to disable - idle detection. 
- vad_analyzer: Voice Activity Detection analyzer instance. - audio_idle_timeout: Timeout in seconds to force speech stop when - no audio frames are received while in SPEAKING state (e.g. user mutes - mic mid-speech). Set to 0 to disable. Defaults to 1.0. - filter_incomplete_user_turns: Whether to filter out incomplete user turns. - When enabled, the LLM outputs a turn completion marker at the start of - each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). - Incomplete responses are suppressed and timeouts trigger re-prompting. - user_turn_completion_config: Configuration for turn completion behavior including - custom instructions, timeouts, and prompts. Only used when - filter_incomplete_user_turns is True. add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the aggregator computes the diff against the currently advertised tools and appends a developer-role message to the context describing @@ -131,17 +110,59 @@ class LLMUserAggregatorParams: (LLM-specific) tools are ignored. When using ``LLMContextAggregatorPair``, prefer setting this via its ``add_tool_change_messages`` argument instead. Defaults to False. + audio_idle_timeout: Timeout in seconds to force speech stop when + no audio frames are received while in SPEAKING state (e.g. user mutes + mic mid-speech). Set to 0 to disable. Defaults to 1.0. + user_turn_strategies: User turn start and stop strategies. + user_mute_strategies: List of user mute strategies. + user_turn_stop_timeout: Time in seconds to wait before considering the + user's turn finished. + user_idle_timeout: Timeout in seconds for detecting user idle state. + The aggregator will emit an `on_user_turn_idle` event when the user + has been idle (not speaking) for this duration. Set to 0 to disable + idle detection. + vad_analyzer: Voice Activity Detection analyzer instance. 
+ filter_incomplete_user_turns: [DEPRECATED] Add + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + to ``user_turn_strategies.stop`` instead. When enabled, the LLM + outputs a turn-completion marker at the start of each response: + ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). + Incomplete responses are suppressed and timeouts trigger + re-prompting. + user_turn_completion_config: [DEPRECATED] Configuration for turn + completion behavior including custom instructions, timeouts, and + prompts. Only used when filter_incomplete_user_turns is True + (deprecated path) — for the new strategy-based API, pass the config + directly to ``LLMTurnCompletionUserTurnStopStrategy(config=...)``. """ + add_tool_change_messages: bool = False + audio_idle_timeout: float = 1.0 user_turn_strategies: UserTurnStrategies | None = None user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list) user_turn_stop_timeout: float = 5.0 user_idle_timeout: float = 0 vad_analyzer: VADAnalyzer | None = None - audio_idle_timeout: float = 1.0 filter_incomplete_user_turns: bool = False user_turn_completion_config: UserTurnCompletionConfig | None = None - add_tool_change_messages: bool = False + + def __post_init__(self): + if self.filter_incomplete_user_turns: + warnings.warn( + "LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. " + "Add LLMTurnCompletionUserTurnStopStrategy to " + "user_turn_strategies.stop instead.", + DeprecationWarning, + stacklevel=2, + ) + if self.user_turn_completion_config: + warnings.warn( + "LLMUserAggregatorParams.user_turn_completion_config is deprecated. 
" + "Add LLMTurnCompletionUserTurnStopStrategy to " + "user_turn_strategies.stop instead.", + DeprecationWarning, + stacklevel=2, + ) @dataclass @@ -541,13 +562,35 @@ class LLMUserAggregator(LLMContextAggregator): self._register_event_handler("on_user_turn_stopped") self._register_event_handler("on_user_turn_stop_timeout") self._register_event_handler("on_user_turn_idle") + self._register_event_handler("on_user_turn_inference_triggered") self._register_event_handler("on_user_mute_started") self._register_event_handler("on_user_mute_stopped") user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies() + # Deprecated path: translate filter_incomplete_user_turns into + # wrapping pre-existing stop strategies with deferred() and + # appending LLMTurnCompletionUserTurnStopStrategy. The + # DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__. + if self._params.filter_incomplete_user_turns: + from pipecat.turns.user_stop import ( + LLMTurnCompletionUserTurnStopStrategy, + deferred, + ) + + existing_stop = list(user_turn_strategies.stop or []) + user_turn_strategies.stop = [deferred(s) for s in existing_stop] + [ + LLMTurnCompletionUserTurnStopStrategy( + config=self._params.user_turn_completion_config + ) + ] + self._user_is_muted = False self._user_turn_start_timestamp = "" + # Aggregation captured at inference-trigger time and surfaced again + # in `on_user_turn_stopped`. Set to None when no inference-triggered + # event has fired since the last finalization. 
+ self._pending_user_turn_aggregation: str | None = None self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies, @@ -558,6 +601,9 @@ class LLMUserAggregator(LLMContextAggregator): self._user_turn_controller.add_event_handler( "on_user_turn_started", self._on_user_turn_started ) + self._user_turn_controller.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) self._user_turn_controller.add_event_handler( "on_user_turn_stopped", self._on_user_turn_stopped ) @@ -676,21 +722,6 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._params.user_mute_strategies: await s.setup(self.task_manager) - # Enable incomplete turn filtering on the LLM if configured - if self._params.filter_incomplete_user_turns: - # Get config or use defaults - config = self._params.user_turn_completion_config or UserTurnCompletionConfig() - - # Enable the feature on the LLM with config - await self.push_frame( - LLMUpdateSettingsFrame( - delta=LLMSettings( - filter_incomplete_user_turns=True, - user_turn_completion_config=config, - ) - ) - ) - async def _stop(self, frame: EndFrame): await self._maybe_emit_user_turn_stopped(on_session_end=True) await self._cleanup() @@ -830,6 +861,7 @@ class LLMUserAggregator(LLMContextAggregator): logger.debug(f"{self}: User started speaking (strategy: {strategy})") self._user_turn_start_timestamp = time_now_iso8601() + self._pending_user_turn_aggregation = None if params.enable_user_speaking_frames: await self.broadcast_frame(UserStartedSpeakingFrame) @@ -841,6 +873,22 @@ class LLMUserAggregator(LLMContextAggregator): await self._call_event_handler("on_user_turn_started", strategy) + async def _on_user_turn_inference_triggered( + self, + controller: UserTurnController, + strategy: BaseUserTurnStopStrategy, + ): + logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") + + # Push aggregation now: this writes the user message to the context + # and 
pushes LLMContextFrame, which is what kicks LLM inference. + # `on_user_turn_stopped` later fires when the turn is semantically + # final and surfaces the aggregated message via the public event. + aggregation = await self.push_aggregation() + self._pending_user_turn_aggregation = aggregation + + await self._call_event_handler("on_user_turn_inference_triggered", strategy) + async def _on_user_turn_stopped( self, controller: UserTurnController, @@ -875,15 +923,31 @@ class LLMUserAggregator(LLMContextAggregator): ): """Maybe emit user turn stopped event. + The aggregation has typically already been pushed at + inference-trigger time and is cached in + ``self._pending_user_turn_aggregation``. Any aggregation that has + accumulated since the last inference-trigger (e.g. transcriptions + that arrived between inference trigger and finalization) is flushed + here so end-of-turn content is never lost. + Args: strategy: The strategy that triggered the turn stop. on_session_end: If True, only emit if there's unemitted content (avoids duplicate events when session ends). 
""" aggregation = await self.push_aggregation() - if not on_session_end or aggregation: + previous_aggregation = self._pending_user_turn_aggregation + self._pending_user_turn_aggregation = None + + content = None + if aggregation and previous_aggregation: + content = f"{previous_aggregation} {aggregation}".strip() + else: + content = previous_aggregation or aggregation + + if not on_session_end or content: message = UserTurnStoppedMessage( - content=aggregation, timestamp=self._user_turn_start_timestamp + content=content, timestamp=self._user_turn_start_timestamp ) await self._call_event_handler("on_user_turn_stopped", strategy, message) self._user_turn_start_timestamp = "" diff --git a/src/pipecat/turns/user_stop/__init__.py b/src/pipecat/turns/user_stop/__init__.py index 7ff676744..4effd6d5c 100644 --- a/src/pipecat/turns/user_stop/__init__.py +++ b/src/pipecat/turns/user_stop/__init__.py @@ -5,14 +5,19 @@ # from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams +from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy +from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy __all__ = [ "BaseUserTurnStopStrategy", + "DeferredUserTurnStopStrategy", "ExternalUserTurnStopStrategy", + "LLMTurnCompletionUserTurnStopStrategy", "SpeechTimeoutUserTurnStopStrategy", "UserTurnStoppedParams", "TurnAnalyzerUserTurnStopStrategy", + "deferred", ] diff --git a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py index e5dc1bd66..6f303282c 100644 --- a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py +++ 
b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py @@ -45,7 +45,16 @@ class BaseUserTurnStopStrategy(BaseObject): Events triggered by strategies: - `on_push_frame`: Indicates the strategy wants to push a frame. - - `on_user_turn_stopped`: Signals that the user stopped speaking. + - `on_user_turn_inference_triggered`: Signals that enough evidence + exists to start LLM inference for the current user turn. In most + cases this fires together with `on_user_turn_stopped`. When + finalization is gated on the LLM, the upstream strategies (wrapped + with ``deferred``) fire only this event, and a separate strategy (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) fires `on_user_turn_stopped` once + the LLM confirms the turn is complete. + - `on_user_turn_stopped`: Signals that the user turn is semantically + final. Observers, transcript appenders, and UI indicators should + bind this event. """ @@ -64,6 +73,7 @@ class BaseUserTurnStopStrategy(BaseObject): self._task_manager: BaseTaskManager | None = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) + self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) @property @@ -123,7 +133,23 @@ class BaseUserTurnStopStrategy(BaseObject): await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) async def trigger_user_turn_stopped(self): - """Trigger the `on_user_turn_stopped` event.""" + """Fire both ``on_user_turn_inference_triggered`` and ``on_user_turn_stopped``. + + Most strategies call this when they decide a turn has ended. To + defer finalization to another strategy (so this strategy fires + only the inference-triggered event), wrap this strategy with + :func:`~pipecat.turns.user_stop.deferred` instead of changing + the trigger call. 
+ """ + await self.trigger_user_turn_inference_triggered() + await self.trigger_user_turn_finalized() + + async def trigger_user_turn_inference_triggered(self): + """Trigger only the `on_user_turn_inference_triggered` event.""" + await self._call_event_handler("on_user_turn_inference_triggered") + + async def trigger_user_turn_finalized(self): + """Trigger only the `on_user_turn_stopped` event.""" await self._call_event_handler( "on_user_turn_stopped", UserTurnStoppedParams(enable_user_speaking_frames=self._enable_user_speaking_frames), diff --git a/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py new file mode 100644 index 000000000..7f3296af3 --- /dev/null +++ b/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py @@ -0,0 +1,104 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Wrapper that defers a stop strategy's finalization to another strategy.""" + +from pipecat.frames.frames import Frame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.utils.asyncio.task_manager import BaseTaskManager + + +class DeferredUserTurnStopStrategy(BaseUserTurnStopStrategy): + """Wraps a stop strategy and suppresses its ``on_user_turn_stopped`` event. + + Event subscriptions added to the wrapper are forwarded directly to + the inner strategy, except for ``on_user_turn_stopped``, which is + dropped. The inner strategy's frame-side and inference-triggered + events therefore reach external listeners (the controller, etc.) + unchanged; finalization is left to another strategy in the chain + such as ``LLMTurnCompletionUserTurnStopStrategy``. 
+ + Use the :func:`deferred` helper for ergonomic construction:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + LLMTurnCompletionUserTurnStopStrategy(), + ] + """ + + def __init__(self, inner: BaseUserTurnStopStrategy, **kwargs): + """Initialize the deferred wrapper. + + Args: + inner: The strategy whose finalization should be deferred. + **kwargs: Additional keyword arguments forwarded to the base + class. + """ + super().__init__(**kwargs) + self._inner = inner + + @property + def inner(self) -> BaseUserTurnStopStrategy: + """Return the wrapped strategy.""" + return self._inner + + def add_event_handler(self, event_name: str, handler): + """Forward event subscriptions to the inner strategy. + + ``on_user_turn_stopped`` is silently dropped — that's the whole + point of the wrapper. Every other event handler is attached to + the inner strategy directly, so the inner's events reach the + listener without any per-event proxy method on the wrapper. + """ + if event_name == "on_user_turn_stopped": + return + self._inner.add_event_handler(event_name, handler) + + async def setup(self, task_manager: BaseTaskManager): + """Set up the inner strategy.""" + await super().setup(task_manager) + await self._inner.setup(task_manager) + + async def cleanup(self): + """Clean up the inner strategy.""" + await super().cleanup() + await self._inner.cleanup() + + async def reset(self): + """Reset the inner strategy for a new user turn.""" + await super().reset() + await self._inner.reset() + + async def process_frame(self, frame: Frame) -> ProcessFrameResult | None: + """Forward frame processing to the inner strategy.""" + return await self._inner.process_frame(frame) + + +def deferred(strategy: BaseUserTurnStopStrategy) -> DeferredUserTurnStopStrategy: + """Defer this stop strategy's finalization to another strategy. 
+ + Wraps ``strategy`` in a :class:`DeferredUserTurnStopStrategy`: the + inner strategy continues to drive inference-triggered events, but + its ``on_user_turn_stopped`` event is suppressed. Use when another + strategy in the chain (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) owns finalization. + + Example:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + LLMTurnCompletionUserTurnStopStrategy(), + ] + + Args: + strategy: The stop strategy to defer. + + Returns: + A wrapper that exposes the inner strategy's behavior with + finalization suppressed. + """ + return DeferredUserTurnStopStrategy(strategy) diff --git a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py new file mode 100644 index 000000000..e75a6d3da --- /dev/null +++ b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy gated on the LLM's turn-completion verdict.""" + +from pipecat.frames.frames import ( + Frame, + LLMUpdateSettingsFrame, + StartFrame, + UserTurnCompletedFrame, +) +from pipecat.services.settings import LLMSettings +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig + + +class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy): + """User turn stop strategy that finalizes only when the LLM agrees. + + This strategy lets another stop strategy (e.g. smart-turn analyzer) + trigger LLM inference, then defers the public ``on_user_turn_stopped`` + event until the LLM emits a ``UserTurnCompletedFrame``. 
On + ``incomplete_short`` / ``incomplete_long`` markers the + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` + re-prompts the LLM internally and no completion frame is emitted. + + To use this strategy, install it alongside one or more upstream stop + strategies in ``UserTurnStrategies.stop`` and wrap those upstream + strategies with :func:`~pipecat.turns.user_stop.deferred` so they + fire only ``on_user_turn_inference_triggered`` and leave + finalization to this strategy. The aggregator's deprecation path + for ``filter_incomplete_user_turns`` does this rewiring + automatically. + + If the LLM never returns a completion frame (malformed output, + unreachable service, etc.), the controller's + ``user_turn_stop_timeout`` watchdog is the safety net — it fires + ``on_user_turn_stopped`` after no activity for that many seconds. + Tune ``user_turn_stop_timeout`` higher if your LLM regularly takes + longer than the default to respond. + + On ``StartFrame`` the strategy pushes an ``LLMUpdateSettingsFrame`` + (via ``push_frame``, with no explicit direction) that enables + ``filter_incomplete_user_turns`` on the LLM service and seeds the + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`. + """ + + def __init__( + self, + *, + config: UserTurnCompletionConfig | None = None, + **kwargs, + ): + """Initialize the LLM turn-completion stop strategy. + + Args: + config: Configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting on + ``StartFrame``. Defaults to ``UserTurnCompletionConfig()``. + **kwargs: Additional keyword arguments forwarded to the base + class. 
+ """ + super().__init__(**kwargs) + self._config = config or UserTurnCompletionConfig() + + @property + def config(self) -> UserTurnCompletionConfig: + """Return the configured ``UserTurnCompletionConfig``.""" + return self._config + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Observe frames to drive the finalization decision.""" + if isinstance(frame, StartFrame): + await self._configure_llm() + elif isinstance(frame, UserTurnCompletedFrame): + await self.trigger_user_turn_finalized() + + return ProcessFrameResult.CONTINUE + + async def _configure_llm(self): + await self.push_frame( + LLMUpdateSettingsFrame( + delta=LLMSettings( + filter_incomplete_user_turns=True, + user_turn_completion_config=self._config, + ) + ) + ) diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index 51d6c7828..606ba9902 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -25,6 +25,7 @@ from pipecat.frames.frames import ( LLMMessagesAppendFrame, LLMRunFrame, LLMTextFrame, + UserTurnCompletedFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -279,7 +280,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): await asyncio.sleep(timeout) # Timeout expired - reset state before prompting LLM - logger.info(f"Incomplete {incomplete_type} timeout expired, prompting LLM") + logger.debug(f"Incomplete {incomplete_type} timeout expired, prompting LLM") await self._turn_reset() self._incomplete_timeout_task = None self._incomplete_type = None @@ -402,6 +403,10 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): ) self._turn_suppressed = True + # No UserTurnCompletedFrame is broadcast here: the turn is + # explicitly not complete. The re-prompt path is driven by + # this mixin's own timeout. 
+ # Push the marker with skip_tts=True so it's added to context (maintains # conversation continuity per prompt instructions) but not spoken by TTS frame = LLMTextFrame(self._turn_text_buffer) @@ -416,6 +421,13 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): if USER_TURN_COMPLETE_MARKER in self._turn_text_buffer: logger.debug(f"COMPLETE ({USER_TURN_COMPLETE_MARKER}) detected, pushing buffered text") + # Broadcast that the user turn is complete so a stop strategy + # gating finalization on this signal (e.g. + # LLMTurnCompletionUserTurnStopStrategy) can fire + # `on_user_turn_stopped`. Must fire before the LLMTextFrame so + # downstream consumers see the signal before the response. + await self.broadcast_frame(UserTurnCompletedFrame) + # Split buffer at the marker to handle cases where marker and text # arrive in the same chunk (e.g., "✓ Hello!" from some LLMs) marker_pos = self._turn_text_buffer.index(USER_TURN_COMPLETE_MARKER) diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index 2ba4e304b..974a8ff0f 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -38,7 +38,11 @@ class UserTurnController(BaseObject): Event handlers available: - on_user_turn_started: Emitted when a user turn starts. - - on_user_turn_stopped: Emitted when a user turn stops. + - on_user_turn_inference_triggered: Emitted when enough signal exists to + start LLM inference. Fires together with `on_user_turn_stopped` for + most strategies; fires alone when a downstream strategy gates + finalization on the LLM's verdict. + - on_user_turn_stopped: Emitted when a user turn is semantically final. - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout. - on_push_frame: Emitted when a strategy wants to push a frame. - on_broadcast_frame: Emitted when a strategy wants to broadcast a frame. 
@@ -49,6 +53,10 @@ class UserTurnController(BaseObject): async def on_user_turn_started(controller, strategy: BaseUserTurnStartStrategy, params: UserTurnStartedParams): ... + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy: BaseUserTurnStopStrategy): + ... + @controller.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(controller, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams): ... @@ -95,6 +103,7 @@ class UserTurnController(BaseObject): self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) self._register_event_handler("on_user_turn_started", sync=True) + self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) self._register_event_handler("on_user_turn_stop_timeout", sync=True) self._register_event_handler("on_reset_aggregation", sync=True) @@ -186,6 +195,9 @@ class UserTurnController(BaseObject): await s.setup(self.task_manager) s.add_event_handler("on_push_frame", self._on_push_frame) s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame) + s.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) s.add_event_handler("on_user_turn_stopped", self._on_user_turn_stopped) async def _cleanup_strategies(self): @@ -246,6 +258,9 @@ class UserTurnController(BaseObject): ): await self._trigger_user_turn_start(strategy, params) + async def _on_user_turn_inference_triggered(self, strategy: BaseUserTurnStopStrategy): + await self._trigger_user_turn_inference_triggered(strategy) + async def _on_user_turn_stopped( self, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams ): @@ -274,6 +289,20 @@ class UserTurnController(BaseObject): await self._call_event_handler("on_user_turn_started", strategy, params) + async def 
_trigger_user_turn_inference_triggered( + self, strategy: BaseUserTurnStopStrategy | None + ): + # Inference-triggered fires only while a turn is active. The turn + # remains active afterward — only `on_user_turn_stopped` flips state. + if not self._user_turn: + return + + # Re-arm the stop watchdog so a stuck turn (inference fired but + # finalization never arrives) still times out and finalizes. + self._user_turn_stop_timeout_event.set() + + await self._call_event_handler("on_user_turn_inference_triggered", strategy) + async def _trigger_user_turn_stop( self, strategy: BaseUserTurnStopStrategy | None, params: UserTurnStoppedParams ): diff --git a/src/pipecat/turns/user_turn_processor.py b/src/pipecat/turns/user_turn_processor.py index 536d297b5..54fe533e0 100644 --- a/src/pipecat/turns/user_turn_processor.py +++ b/src/pipecat/turns/user_turn_processor.py @@ -35,7 +35,11 @@ class UserTurnProcessor(FrameProcessor): Event handlers available: - on_user_turn_started: Emitted when a user turn starts. - - on_user_turn_stopped: Emitted when a user turn stops. + - on_user_turn_inference_triggered: Emitted when enough signal exists to + start LLM inference. Fires together with `on_user_turn_stopped` for + most strategies; fires alone when a downstream strategy gates + finalization on the LLM's verdict. + - on_user_turn_stopped: Emitted when a user turn is semantically final. - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout. - on_user_turn_idle: Emitted when the user has been idle for the configured timeout. @@ -45,6 +49,10 @@ class UserTurnProcessor(FrameProcessor): async def on_user_turn_started(processor, strategy: BaseUserTurnStartStrategy): ... + @processor.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(processor, strategy: BaseUserTurnStopStrategy): + ... + @processor.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(processor, strategy: BaseUserTurnStopStrategy): ... 
@@ -85,6 +93,7 @@ class UserTurnProcessor(FrameProcessor): self._register_event_handler("on_user_turn_stopped") self._register_event_handler("on_user_turn_stop_timeout") self._register_event_handler("on_user_turn_idle") + self._register_event_handler("on_user_turn_inference_triggered") self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies or UserTurnStrategies(), @@ -101,6 +110,9 @@ class UserTurnProcessor(FrameProcessor): self._user_turn_controller.add_event_handler( "on_user_turn_stop_timeout", self._on_user_turn_stop_timeout ) + self._user_turn_controller.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout) self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle) @@ -204,3 +216,11 @@ class UserTurnProcessor(FrameProcessor): async def _on_user_turn_idle(self, controller): await self._call_event_handler("on_user_turn_idle") + + async def _on_user_turn_inference_triggered( + self, + controller: UserTurnController, + strategy: BaseUserTurnStopStrategy, + ): + logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") + await self._call_event_handler("on_user_turn_inference_triggered", strategy) diff --git a/src/pipecat/turns/user_turn_strategies.py b/src/pipecat/turns/user_turn_strategies.py index 8bbe87a7a..56c7d9e9f 100644 --- a/src/pipecat/turns/user_turn_strategies.py +++ b/src/pipecat/turns/user_turn_strategies.py @@ -17,8 +17,11 @@ from pipecat.turns.user_start import ( from pipecat.turns.user_stop import ( BaseUserTurnStopStrategy, ExternalUserTurnStopStrategy, + LLMTurnCompletionUserTurnStopStrategy, TurnAnalyzerUserTurnStopStrategy, + deferred, ) +from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig def default_user_turn_start_strategies() -> list[BaseUserTurnStartStrategy]: @@ -48,6 +51,42 @@ def 
default_user_turn_stop_strategies() -> list[BaseUserTurnStopStrategy]: return [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())] +def llm_completion_user_turn_stop_strategies( + strategies: list[BaseUserTurnStopStrategy] | None = None, + *, + config: UserTurnCompletionConfig | None = None, +) -> list[BaseUserTurnStopStrategy]: + """Build a stop-strategy list gated on the LLM's turn-completion verdict. + + Wraps ``strategies`` with :func:`deferred` so they trigger inference + but don't fire ``on_user_turn_stopped`` themselves, then appends + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + as the finalizer. Use as the ``stop`` field of a + :class:`UserTurnStrategies`:: + + UserTurnStrategies( + stop=llm_completion_user_turn_stop_strategies(), + ) + + Args: + strategies: Stop strategies that should drive inference. If + None, uses :func:`default_user_turn_stop_strategies`. + config: Optional configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting. Customizes the + turn-completion instructions, incomplete-turn timeouts, and + re-prompts. + + Returns: + ``[deferred(s) for s in strategies] + + [LLMTurnCompletionUserTurnStopStrategy(config=config)]``. + """ + strategies = strategies if strategies is not None else default_user_turn_stop_strategies() + return [ + *(deferred(s) for s in strategies), + LLMTurnCompletionUserTurnStopStrategy(config=config), + ] + + @dataclass class UserTurnStrategies: """Container for user turn start and stop strategies. diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index cefc1b200..01ca70c80 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -179,8 +179,16 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): assert context.messages[0]["content"] == "Hi there!" 
async def test_llm_messages_update_does_not_inject_turn_completion_into_context(self): + from pipecat.turns.user_turn_strategies import ( + llm_completion_user_turn_stop_strategies, + ) + context = LLMContext() - params = LLMUserAggregatorParams(filter_incomplete_user_turns=True) + params = LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=llm_completion_user_turn_stop_strategies(), + ), + ) pipeline = Pipeline([LLMUserAggregator(context, params=params)]) new_messages = [ @@ -291,8 +299,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): UserStartedSpeakingFrame, InterruptionFrame, VADUserStoppedSpeakingFrame, - UserStoppedSpeakingFrame, LLMContextFrame, + UserStoppedSpeakingFrame, ] await run_test( pipeline, @@ -563,6 +571,132 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames=[SpeechControlParamsFrame], ) + async def test_inference_triggered_event_fires_on_default_strategies(self): + """Default flow fires inference-triggered before stopped, both with the same strategy.""" + from pipecat.frames.frames import UserTurnCompletedFrame # noqa: F401 + + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[ + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ] + ), + ), + ) + + events: list[str] = [] + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + events.append("inference_triggered") + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + events.append(f"stopped:{message.content}") + + pipeline = Pipeline([user_aggregator]) + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi!", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + 
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(events, ["inference_triggered", "stopped:Hi!"]) + + async def test_filter_incomplete_user_turns_emits_deprecation_warning(self): + """Setting the legacy flag emits a DeprecationWarning.""" + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + LLMUserAggregatorParams(filter_incomplete_user_turns=True) + matched = [ + x + for x in w + if issubclass(x.category, DeprecationWarning) + and "filter_incomplete_user_turns" in str(x.message) + ] + self.assertTrue(matched, "expected a DeprecationWarning") + + async def test_filter_incomplete_user_turns_installs_strategy(self): + """Legacy flag wraps existing stops with deferred() and appends the LLM strategy.""" + import warnings + + from pipecat.turns.user_stop import ( + DeferredUserTurnStopStrategy, + LLMTurnCompletionUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + ) + + existing = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + + context = LLMContext() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + params = LLMUserAggregatorParams( + filter_incomplete_user_turns=True, + user_turn_strategies=UserTurnStrategies(stop=[existing]), + ) + aggregator = LLMUserAggregator(context, params=params) + + stop_strategies = aggregator._params.user_turn_strategies.stop + self.assertEqual(len(stop_strategies), 2) + self.assertIsInstance(stop_strategies[0], DeferredUserTurnStopStrategy) + self.assertIs(stop_strategies[0].inner, existing) + self.assertIsInstance(stop_strategies[1], LLMTurnCompletionUserTurnStopStrategy) + + async def test_llm_completion_strategy_finalizes_on_complete_marker(self): + """LLMTurnCompletionUserTurnStopStrategy finalizes only on UserTurnCompletedFrame(complete).""" + from pipecat.frames.frames import UserTurnCompletedFrame + from 
pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred + + gating = LLMTurnCompletionUserTurnStopStrategy() + upstream = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]), + ), + ) + + events: list[str] = [] + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + events.append("inference_triggered") + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + events.append("stopped") + + pipeline = Pipeline([user_aggregator]) + + # Drive the pipeline. Inference fires after the upstream + # strategy's timeout. Stop fires only when UserTurnCompletedFrame + # arrives (producer absence == "not yet complete"). + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # At this point inference_triggered should have fired but NOT stopped. 
+ UserTurnCompletedFrame(), + SleepFrame(), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(events, ["inference_triggered", "stopped"]) + class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase): async def test_empty(self): diff --git a/tests/test_user_turn_completion_mixin.py b/tests/test_user_turn_completion_mixin.py index e503ecbf2..66a48b51f 100644 --- a/tests/test_user_turn_completion_mixin.py +++ b/tests/test_user_turn_completion_mixin.py @@ -8,7 +8,7 @@ import unittest import unittest.mock from unittest.mock import AsyncMock -from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame +from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame, UserTurnCompletedFrame from pipecat.processors.frame_processor import FrameProcessor from pipecat.services.llm_service import LLMService from pipecat.services.settings import LLMSettings @@ -44,21 +44,25 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Simulate LLM generating: "✓ Hello there!" await processor._push_turn_text(f"{USER_TURN_COMPLETE_MARKER} Hello there!") - # Should have 2 text frames: marker (skip_tts) and content (normal) - self.assertEqual(len(pushed_frames), 2) + # Two LLMTextFrames: marker (skip_tts) and content (normal). The + # broadcast also pushes UserTurnCompletedFrame upstream + downstream. 
+ text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 2) - # First frame should be the marker with skip_tts=True - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_COMPLETE_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + # First text frame should be the marker with skip_tts=True + self.assertEqual(text_frames[0].text, USER_TURN_COMPLETE_MARKER) + self.assertTrue(text_frames[0].skip_tts) - # Second frame should be the actual text without skip_tts - self.assertIsInstance(pushed_frames[1], LLMTextFrame) - self.assertEqual(pushed_frames[1].text, "Hello there!") - self.assertFalse(pushed_frames[1].skip_tts) + # Second text frame should be the actual text without skip_tts + self.assertEqual(text_frames[1].text, "Hello there!") + self.assertFalse(text_frames[1].skip_tts) + + # UserTurnCompletedFrame broadcast in both directions. + completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + self.assertEqual(len(completed), 2) async def test_incomplete_short_marker_suppresses_text(self): - """Test that ○ marker suppresses text with skip_tts.""" + """Test that ○ marker suppresses text with skip_tts and emits no completed frame.""" processor = MockProcessor() pushed_frames = [] @@ -70,14 +74,17 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_SHORT_MARKER) - # Should have 1 text frame with skip_tts=True - self.assertEqual(len(pushed_frames), 1) - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 1) + self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER) + self.assertTrue(text_frames[0].skip_tts) + 
+ # Incomplete markers do not emit UserTurnCompletedFrame. + completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + self.assertEqual(len(completed), 0) async def test_incomplete_long_marker_suppresses_text(self): - """Test that ◐ marker suppresses text with skip_tts.""" + """Test that ◐ marker suppresses text with skip_tts and emits no completed frame.""" processor = MockProcessor() pushed_frames = [] @@ -89,11 +96,13 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_LONG_MARKER) - # Should have 1 text frame with skip_tts=True - self.assertEqual(len(pushed_frames), 1) - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 1) + self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER) + self.assertTrue(text_frames[0].skip_tts) + + completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + self.assertEqual(len(completed), 0) async def test_text_buffered_until_marker_found(self): """Test that text is buffered until a marker is detected.""" @@ -114,8 +123,10 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Now send the complete marker await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?") - # Now frames should be pushed - self.assertEqual(len(pushed_frames), 2) + # Two LLMTextFrames pushed (marker + content) plus the + # UserTurnCompletedFrame broadcast. 
+ text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 2) async def test_turn_state_reset_after_llm_full_response_end_frame(self): """Test that _turn_complete_found is reset when LLMFullResponseEndFrame is pushed.""" diff --git a/tests/test_user_turn_controller.py b/tests/test_user_turn_controller.py index 2883d39bd..8365d48df 100644 --- a/tests/test_user_turn_controller.py +++ b/tests/test_user_turn_controller.py @@ -19,7 +19,7 @@ from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_start.min_words_user_turn_start_strategy import ( MinWordsUserTurnStartStrategy, ) -from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy +from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy, deferred from pipecat.turns.user_turn_controller import UserTurnController from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams @@ -71,6 +71,66 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) self.assertTrue(should_stop) + async def test_inference_triggered_fires_alongside_stopped(self): + """Default strategies fire both inference-triggered and stopped, in order.""" + controller = UserTurnController( + user_turn_strategies=UserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)], + ) + ) + + await controller.setup(self.task_manager) + + events: list[str] = [] + + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy): + events.append("inference_triggered") + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + events.append("stopped") + + await controller.process_frame(VADUserStartedSpeakingFrame()) + await 
controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + + self.assertEqual(events, ["inference_triggered", "stopped"]) + + async def test_deferred_wrapper_skips_stopped(self): + """A deferred() wrapper drops the inner strategy's on_user_turn_stopped event.""" + wrapped = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + controller = UserTurnController(user_turn_strategies=UserTurnStrategies(stop=[wrapped])) + + await controller.setup(self.task_manager) + + events: list[str] = [] + + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy): + events.append("inference_triggered") + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + events.append("stopped") + + await controller.process_frame(VADUserStartedSpeakingFrame()) + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + + # The inner strategy fires inference-triggered (forwarded by the + # wrapper). Finalization is suppressed, but the controller's + # stop watchdog eventually fires `stopped`. + self.assertEqual(events[0], "inference_triggered") + async def test_user_turn_start_reset(self): controller = UserTurnController( user_turn_strategies=UserTurnStrategies(