diff --git a/changelog/4405.added.2.md b/changelog/4405.added.2.md new file mode 100644 index 000000000..30e86f904 --- /dev/null +++ b/changelog/4405.added.2.md @@ -0,0 +1 @@ +- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives. diff --git a/changelog/4405.added.3.md b/changelog/4405.added.3.md new file mode 100644 index 000000000..8b048a219 --- /dev/null +++ b/changelog/4405.added.3.md @@ -0,0 +1 @@ +- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`. diff --git a/changelog/4405.added.4.md b/changelog/4405.added.4.md new file mode 100644 index 000000000..5077f0416 --- /dev/null +++ b/changelog/4405.added.4.md @@ -0,0 +1 @@ +- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts. diff --git a/changelog/4405.added.5.md b/changelog/4405.added.5.md new file mode 100644 index 000000000..508084e30 --- /dev/null +++ b/changelog/4405.added.5.md @@ -0,0 +1 @@ +- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup. diff --git a/changelog/4405.added.md b/changelog/4405.added.md new file mode 100644 index 000000000..0cd8859b0 --- /dev/null +++ b/changelog/4405.added.md @@ -0,0 +1 @@ +- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer. diff --git a/changelog/4405.deprecated.md b/changelog/4405.deprecated.md new file mode 100644 index 000000000..3040f7bd8 --- /dev/null +++ b/changelog/4405.deprecated.md @@ -0,0 +1 @@ +- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly. diff --git a/changelog/4405.fixed.md b/changelog/4405.fixed.md new file mode 100644 index 000000000..65a9d42bf --- /dev/null +++ b/changelog/4405.fixed.md @@ -0,0 +1 @@ +- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal. diff --git a/examples/turn-management/turn-management-filter-incomplete-turns.py b/examples/turn-management/turn-management-filter-incomplete-turns.py index 81273aaaa..99c0394fb 100644 --- a/examples/turn-management/turn-management-filter-incomplete-turns.py +++ b/examples/turn-management/turn-management-filter-incomplete-turns.py @@ -10,7 +10,7 @@ Demonstrates LLM-based turn completion detection to suppress bot responses when the user was cut off mid-thought. The LLM outputs one of three markers: - ✓ (complete): User finished their thought, respond normally - ○ (incomplete short): User was cut off, wait ~5s then prompt -- ◐ (incomplete long): User needs time to think, wait ~15s then prompt +- ◐ (incomplete long): User needs time to think, wait ~10s then prompt When incomplete is detected, the bot's response is suppressed. After the timeout expires, the LLM is automatically prompted to re-engage the user. @@ -41,6 +41,7 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies load_dotenv(override=True) @@ -83,23 +84,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) context = LLMContext() + # `FilterIncompleteUserTurnStrategies` pairs the default detector + # chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors + # trigger LLM inference but the public `on_user_turn_stopped` event + # fires only when the LLM confirms ✓. The LLM marks each response + # with one of: + # ✓ = complete (respond normally) + # ○ = incomplete short (wait 5s, then prompt) + # ◐ = incomplete long (wait 10s, then prompt) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( vad_analyzer=SileroVADAnalyzer(), - # Enable turn completion filtering - the LLM will output: - # ✓ = complete (respond normally) - # ○ = incomplete short (wait 5s, then prompt) - # ◐ = incomplete long (wait 15s, then prompt) - filter_incomplete_user_turns=True, - # Optional: customize turn completion behavior - # turn_completion_config=TurnCompletionConfig( - # incomplete_short_timeout=5.0, - # incomplete_long_timeout=15.0, - # incomplete_short_prompt="Custom prompt...", - # incomplete_long_prompt="Custom prompt...", - # instructions="Custom turn completion instructions...", - # ), + user_turn_strategies=FilterIncompleteUserTurnStrategies( + # Optional: customize turn completion behavior + # config=UserTurnCompletionConfig( + # incomplete_short_timeout=5.0, + # incomplete_long_timeout=10.0, + # incomplete_short_prompt="Custom prompt...", + # incomplete_long_prompt="Custom prompt...", + # instructions="Custom turn completion instructions...", + # ), + ), ), ) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 24006428d..863da578e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -339,6 +339,40 @@ class LLMTextFrame(TextFrame): self.includes_inter_frame_spaces = True +@dataclass +class LLMMarkerFrame(DataFrame): + """Sideband marker emitted by an LLM service. + + A marker is short, structured assistant output that should be + persisted in the conversation context but should not flow through + the standard text path (TTS, transcript). The assistant aggregator + writes the marker to the context so the LLM can self-condition on + prior markers on subsequent turns. + + The primary use today is the ``filter_incomplete_user_turns`` + protocol, where ``UserTurnCompletionLLMServiceMixin`` emits the + turn-completion markers ✓ / ○ / ◐ on every response. The frame is + intentionally generic so other components — STT services with + built-in turn signals, end-of-turn classifiers, custom annotations, + etc. — can use the same mechanism to inject sideband signals into + the assistant context. + + Parameters: + marker: The marker payload (typically a short string such as a + single character). + append_to_context_immediately: If True, the marker is written + to the context as its own standalone assistant message as + soon as it's received. If False, the marker is appended to + the running assistant aggregation and flushed to the + context together with the following text as a single + message (e.g. for the ✓ case the context message ends up + as "✓ "). + """ + + marker: str + append_to_context_immediately: bool = True + + @dataclass class AggregatedTextFrame(TextFrame): """Text frame representing an aggregation of TextFrames. @@ -970,6 +1004,24 @@ class UserSpeakingFrame(SystemFrame): pass +@dataclass +class UserTurnInferenceCompletedFrame(SystemFrame): + """Frame indicating that the user turn is semantically complete. + + Emitted by any component that can judge conversational turn + completeness — for example an LLM with turn-completion markers, an + STT service with built-in turn detection, or a dedicated + end-of-turn classifier. Stop strategies that gate the + user-turn-stop event on an external completeness signal (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to + finalize the turn. Producers should emit this frame only when they + judge the turn complete; an absence of this frame means the turn is + not yet considered complete. + """ + + pass + + @dataclass class VADUserStartedSpeakingFrame(SystemFrame): """Frame emitted when VAD definitively detects user started speaking. diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index d1bc110de..475d470e3 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -44,6 +44,7 @@ from pipecat.frames.frames import ( LLMContextSummaryRequestFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + LLMMarkerFrame, LLMMessagesAppendFrame, LLMMessagesTransformFrame, LLMMessagesUpdateFrame, @@ -53,7 +54,6 @@ from pipecat.frames.frames import ( LLMThoughtEndFrame, LLMThoughtStartFrame, LLMThoughtTextFrame, - LLMUpdateSettingsFrame, StartFrame, TextFrame, TranscriptionFrame, @@ -79,14 +79,16 @@ from pipecat.processors.aggregators.llm_context_summarizer import ( SummaryAppliedEvent, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.services.settings import LLMSettings from pipecat.turns.user_idle_controller import UserIdleController from pipecat.turns.user_mute import BaseUserMuteStrategy from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig from pipecat.turns.user_turn_controller import UserTurnController -from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.turns.user_turn_strategies import ( + FilterIncompleteUserTurnStrategies, + UserTurnStrategies, +) from pipecat.utils.context.llm_context_summarization import ( LLMAutoContextSummarizationConfig, LLMContextSummarizationConfig, @@ -100,25 +102,6 @@ class LLMUserAggregatorParams: """Parameters for configuring LLM user aggregation behavior. Parameters: - user_turn_strategies: User turn start and stop strategies. - user_mute_strategies: List of user mute strategies. - user_turn_stop_timeout: Time in seconds to wait before considering the - user's turn finished. - user_idle_timeout: Timeout in seconds for detecting user idle state. - The aggregator will emit an `on_user_turn_idle` event when the user - has been idle (not speaking) for this duration. Set to 0 to disable - idle detection. - vad_analyzer: Voice Activity Detection analyzer instance. - audio_idle_timeout: Timeout in seconds to force speech stop when - no audio frames are received while in SPEAKING state (e.g. user mutes - mic mid-speech). Set to 0 to disable. Defaults to 1.0. - filter_incomplete_user_turns: Whether to filter out incomplete user turns. - When enabled, the LLM outputs a turn completion marker at the start of - each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). - Incomplete responses are suppressed and timeouts trigger re-prompting. - user_turn_completion_config: Configuration for turn completion behavior including - custom instructions, timeouts, and prompts. Only used when - filter_incomplete_user_turns is True. add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the aggregator computes the diff against the currently advertised tools and appends a developer-role message to the context describing @@ -131,17 +114,75 @@ class LLMUserAggregatorParams: (LLM-specific) tools are ignored. When using ``LLMContextAggregatorPair``, prefer setting this via its ``add_tool_change_messages`` argument instead. Defaults to False. + audio_idle_timeout: Timeout in seconds to force speech stop when + no audio frames are received while in SPEAKING state (e.g. user mutes + mic mid-speech). Set to 0 to disable. Defaults to 1.0. + user_turn_strategies: User turn start and stop strategies. + user_mute_strategies: List of user mute strategies. + user_turn_stop_timeout: Time in seconds to wait before considering the + user's turn finished. + user_idle_timeout: Timeout in seconds for detecting user idle state. + The aggregator will emit an `on_user_turn_idle` event when the user + has been idle (not speaking) for this duration. Set to 0 to disable + idle detection. + vad_analyzer: Voice Activity Detection analyzer instance. + filter_incomplete_user_turns: [DEPRECATED] Use + ``user_turn_strategies=FilterIncompleteUserTurnStrategies()`` + instead. When enabled, the LLM outputs a turn-completion + marker at the start of each response: ✓ (complete), ○ + (incomplete short), or ◐ (incomplete long). Incomplete + responses are suppressed and timeouts trigger re-prompting. + + .. deprecated:: 1.2.0 + Use ``user_turn_strategies=FilterIncompleteUserTurnStrategies()`` + instead. Will be removed in version 2.0.0. + + user_turn_completion_config: [DEPRECATED] Configuration for turn + completion behavior including custom instructions, timeouts, and + prompts. Only used when filter_incomplete_user_turns is True + (deprecated path) — for the new strategy-based API, pass the config + directly to ``FilterIncompleteUserTurnStrategies(config=...)``. + + .. deprecated:: 1.2.0 + Pass the config directly to + ``FilterIncompleteUserTurnStrategies(config=...)`` instead. + Will be removed in version 2.0.0. """ + add_tool_change_messages: bool = False + audio_idle_timeout: float = 1.0 user_turn_strategies: UserTurnStrategies | None = None user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list) user_turn_stop_timeout: float = 5.0 user_idle_timeout: float = 0 vad_analyzer: VADAnalyzer | None = None - audio_idle_timeout: float = 1.0 filter_incomplete_user_turns: bool = False user_turn_completion_config: UserTurnCompletionConfig | None = None - add_tool_change_messages: bool = False + + def __post_init__(self): + if self.filter_incomplete_user_turns: + warnings.warn( + "LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. " + "Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.", + DeprecationWarning, + stacklevel=2, + ) + if self.user_turn_completion_config: + warnings.warn( + "LLMUserAggregatorParams.user_turn_completion_config is deprecated. " + "Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.", + DeprecationWarning, + stacklevel=2, + ) + + if self.user_turn_completion_config is not None: + warnings.warn( + "LLMUserAggregatorParams.user_turn_completion_config is deprecated. " + "Pass the config directly to " + "FilterIncompleteUserTurnStrategies(config=...) instead.", + DeprecationWarning, + stacklevel=2, + ) @dataclass @@ -178,6 +219,11 @@ class LLMAssistantAggregatorParams: # --------------------------------------------------------------------------- # Deprecated field names — kept for backward compatibility. # Use enable_auto_context_summarization and auto_context_summarization_config instead. + # + # .. deprecated:: 1.2.0 + # Use ``enable_auto_context_summarization`` and + # ``auto_context_summarization_config`` instead. Will be removed in + # version 2.0.0. # --------------------------------------------------------------------------- enable_context_summarization: bool | None = None context_summarization_config: LLMContextSummarizationConfig | None = None @@ -541,13 +587,33 @@ class LLMUserAggregator(LLMContextAggregator): self._register_event_handler("on_user_turn_stopped") self._register_event_handler("on_user_turn_stop_timeout") self._register_event_handler("on_user_turn_idle") + self._register_event_handler("on_user_turn_inference_triggered") self._register_event_handler("on_user_mute_started") self._register_event_handler("on_user_mute_stopped") user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies() + # Deprecated path: translate filter_incomplete_user_turns into + # the equivalent FilterIncompleteUserTurnStrategies wiring. The + # DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__. + if self._params.filter_incomplete_user_turns: + user_turn_strategies = FilterIncompleteUserTurnStrategies( + start=user_turn_strategies.start, + stop=user_turn_strategies.stop, + config=self._params.user_turn_completion_config, + ) + self._params.user_turn_strategies = user_turn_strategies + self._user_is_muted = False self._user_turn_start_timestamp = "" + # Full transcript across the user turn. Each + # `_on_user_turn_inference_triggered` push captures only the + # new segment since the previous push (push_aggregation resets + # `_aggregation` after writing to context); we accumulate those + # segments here so the eventual `on_user_turn_stopped` event + # surfaces the full turn transcript even when several + # inferences fire before finalization. + self._full_user_turn_aggregation: str | None = None self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies, @@ -558,6 +624,9 @@ class LLMUserAggregator(LLMContextAggregator): self._user_turn_controller.add_event_handler( "on_user_turn_started", self._on_user_turn_started ) + self._user_turn_controller.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) self._user_turn_controller.add_event_handler( "on_user_turn_stopped", self._on_user_turn_stopped ) @@ -676,21 +745,6 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._params.user_mute_strategies: await s.setup(self.task_manager) - # Enable incomplete turn filtering on the LLM if configured - if self._params.filter_incomplete_user_turns: - # Get config or use defaults - config = self._params.user_turn_completion_config or UserTurnCompletionConfig() - - # Enable the feature on the LLM with config - await self.push_frame( - LLMUpdateSettingsFrame( - delta=LLMSettings( - filter_incomplete_user_turns=True, - user_turn_completion_config=config, - ) - ) - ) - async def _stop(self, frame: EndFrame): await self._maybe_emit_user_turn_stopped(on_session_end=True) await self._cleanup() @@ -830,6 +884,7 @@ class LLMUserAggregator(LLMContextAggregator): logger.debug(f"{self}: User started speaking (strategy: {strategy})") self._user_turn_start_timestamp = time_now_iso8601() + self._full_user_turn_aggregation = None if params.enable_user_speaking_frames: await self.broadcast_frame(UserStartedSpeakingFrame) @@ -841,6 +896,30 @@ class LLMUserAggregator(LLMContextAggregator): await self._call_event_handler("on_user_turn_started", strategy) + async def _on_user_turn_inference_triggered( + self, + controller: UserTurnController, + strategy: BaseUserTurnStopStrategy, + ): + logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") + + # Push aggregation now: this writes the user message segment to + # the context and emits LLMContextFrame, which kicks LLM + # inference. Concatenate the segment into + # `_full_user_turn_aggregation` so multiple inferences in the + # same turn don't lose earlier segments from the eventual + # `on_user_turn_stopped` event. + segment = await self.push_aggregation() + if segment: + if self._full_user_turn_aggregation: + self._full_user_turn_aggregation = ( + f"{self._full_user_turn_aggregation} {segment}".strip() + ) + else: + self._full_user_turn_aggregation = segment + + await self._call_event_handler("on_user_turn_inference_triggered", strategy) + async def _on_user_turn_stopped( self, controller: UserTurnController, @@ -875,15 +954,29 @@ class LLMUserAggregator(LLMContextAggregator): ): """Maybe emit user turn stopped event. + Earlier inference triggers in the same turn have already pushed + their segments to the context and accumulated them into + ``self._full_user_turn_aggregation``. Any aggregation that + arrived after the last inference trigger is flushed here so + end-of-turn content is never lost from the public event. + Args: strategy: The strategy that triggered the turn stop. on_session_end: If True, only emit if there's unemitted content (avoids duplicate events when session ends). """ - aggregation = await self.push_aggregation() - if not on_session_end or aggregation: + segment = await self.push_aggregation() + full_aggregation = self._full_user_turn_aggregation + self._full_user_turn_aggregation = None + + if segment and full_aggregation: + content = f"{full_aggregation} {segment}".strip() + else: + content = full_aggregation or segment + + if not on_session_end or content: message = UserTurnStoppedMessage( - content=aggregation, timestamp=self._user_turn_start_timestamp + content=content, timestamp=self._user_turn_start_timestamp ) await self._call_event_handler("on_user_turn_stopped", strategy, message) self._user_turn_start_timestamp = "" @@ -1041,6 +1134,8 @@ class LLMAssistantAggregator(LLMContextAggregator): await self._handle_llm_end(frame) elif isinstance(frame, TextFrame): await self._handle_text(frame) + elif isinstance(frame, LLMMarkerFrame): + await self._handle_marker_frame(frame) elif isinstance(frame, LLMThoughtStartFrame): await self._handle_thought_start(frame) elif isinstance(frame, LLMThoughtTextFrame): @@ -1446,6 +1541,31 @@ class LLMAssistantAggregator(LLMContextAggregator): ) ) + async def _handle_marker_frame(self, frame: LLMMarkerFrame): + if frame.append_to_context_immediately: + # Stand-alone marker: write it to the context now as its + # own assistant message. Used when the marker is the entire + # assistant turn — e.g. the ○ / ◐ incomplete-turn signals, + # where the spoken response is suppressed and the marker + # is the only artifact. + self._context.add_message({"role": "assistant", "content": frame.marker}) + await self.push_context_frame() + timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + return + + # Marker is part of an in-progress assistant response. Append + # it to the running aggregation so `push_aggregation` writes + # marker + text as a single context message — e.g. the ✓ + # complete-turn signal that prefixes the spoken response, + # producing "✓ " in context. Markers are stripped + # from the transcript via + # `_maybe_strip_turn_completion_markers` so consumers see + # clean text. + self._aggregation.append( + TextPartForConcatenation(frame.marker, includes_inter_part_spaces=False) + ) + async def _handle_thought_start(self, frame: LLMThoughtStartFrame): await self._reset_thought_aggregation() self._thought_append_to_context = frame.append_to_context diff --git a/src/pipecat/turns/user_stop/__init__.py b/src/pipecat/turns/user_stop/__init__.py index 7ff676744..14eea8407 100644 --- a/src/pipecat/turns/user_stop/__init__.py +++ b/src/pipecat/turns/user_stop/__init__.py @@ -5,14 +5,21 @@ # from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams +from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred +from .external_user_turn_completion_stop_strategy import ExternalUserTurnCompletionStopStrategy from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy +from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy __all__ = [ "BaseUserTurnStopStrategy", + "DeferredUserTurnStopStrategy", + "ExternalUserTurnCompletionStopStrategy", "ExternalUserTurnStopStrategy", + "LLMTurnCompletionUserTurnStopStrategy", "SpeechTimeoutUserTurnStopStrategy", "UserTurnStoppedParams", "TurnAnalyzerUserTurnStopStrategy", + "deferred", ] diff --git a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py index e5dc1bd66..6f303282c 100644 --- a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py @@ -45,7 +45,16 @@ class BaseUserTurnStopStrategy(BaseObject): Events triggered by strategies: - `on_push_frame`: Indicates the strategy wants to push a frame. - - `on_user_turn_stopped`: Signals that the user stopped speaking. + - `on_user_turn_inference_triggered`: Signals that enough evidence + exists to start LLM inference for the current user turn. In most + cases this fires together with `on_user_turn_stopped`. Strategies + that gate finalization on the LLM (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) fire only this event + upstream and a separate strategy fires `on_user_turn_stopped` once + the LLM confirms the turn is complete. + - `on_user_turn_stopped`: Signals that the user turn is semantically + final. Observers, transcript appenders, and UI indicators should + bind this event. """ @@ -64,6 +73,7 @@ class BaseUserTurnStopStrategy(BaseObject): self._task_manager: BaseTaskManager | None = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) + self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) @property @@ -123,7 +133,23 @@ class BaseUserTurnStopStrategy(BaseObject): await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) async def trigger_user_turn_stopped(self): - """Trigger the `on_user_turn_stopped` event.""" + """Fire both ``on_user_turn_inference_triggered`` and ``on_user_turn_stopped``. + + Most strategies call this when they decide a turn has ended. To + defer finalization to another strategy (so this strategy fires + only the inference-triggered event), wrap this strategy with + :func:`~pipecat.turns.user_stop.deferred` instead of changing + the trigger call. + """ + await self.trigger_user_turn_inference_triggered() + await self.trigger_user_turn_finalized() + + async def trigger_user_turn_inference_triggered(self): + """Trigger only the `on_user_turn_inference_triggered` event.""" + await self._call_event_handler("on_user_turn_inference_triggered") + + async def trigger_user_turn_finalized(self): + """Trigger only the `on_user_turn_stopped` event.""" await self._call_event_handler( "on_user_turn_stopped", UserTurnStoppedParams(enable_user_speaking_frames=self._enable_user_speaking_frames), diff --git a/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py new file mode 100644 index 000000000..7f3296af3 --- /dev/null +++ b/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py @@ -0,0 +1,104 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Wrapper that defers a stop strategy's finalization to another strategy.""" + +from pipecat.frames.frames import Frame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.utils.asyncio.task_manager import BaseTaskManager + + +class DeferredUserTurnStopStrategy(BaseUserTurnStopStrategy): + """Wraps a stop strategy and suppresses its ``on_user_turn_stopped`` event. + + Event subscriptions added to the wrapper are forwarded directly to + the inner strategy, except for ``on_user_turn_stopped``, which is + dropped. The inner strategy's frame-side and inference-triggered + events therefore reach external listeners (the controller, etc.) + unchanged; finalization is left to another strategy in the chain + such as ``LLMTurnCompletionUserTurnStopStrategy``. + + Use the :func:`deferred` helper for ergonomic construction:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + LLMTurnCompletionUserTurnStopStrategy(), + ] + """ + + def __init__(self, inner: BaseUserTurnStopStrategy, **kwargs): + """Initialize the deferred wrapper. + + Args: + inner: The strategy whose finalization should be deferred. + **kwargs: Additional keyword arguments forwarded to the base + class. + """ + super().__init__(**kwargs) + self._inner = inner + + @property + def inner(self) -> BaseUserTurnStopStrategy: + """Return the wrapped strategy.""" + return self._inner + + def add_event_handler(self, event_name: str, handler): + """Forward event subscriptions to the inner strategy. + + ``on_user_turn_stopped`` is silently dropped — that's the whole + point of the wrapper. Every other event handler is attached to + the inner strategy directly, so the inner's events reach the + listener without any per-event proxy method on the wrapper. + """ + if event_name == "on_user_turn_stopped": + return + self._inner.add_event_handler(event_name, handler) + + async def setup(self, task_manager: BaseTaskManager): + """Set up the inner strategy.""" + await super().setup(task_manager) + await self._inner.setup(task_manager) + + async def cleanup(self): + """Clean up the inner strategy.""" + await super().cleanup() + await self._inner.cleanup() + + async def reset(self): + """Reset the inner strategy for a new user turn.""" + await super().reset() + await self._inner.reset() + + async def process_frame(self, frame: Frame) -> ProcessFrameResult | None: + """Forward frame processing to the inner strategy.""" + return await self._inner.process_frame(frame) + + +def deferred(strategy: BaseUserTurnStopStrategy) -> DeferredUserTurnStopStrategy: + """Defer this stop strategy's finalization to another strategy. + + Wraps ``strategy`` in a :class:`DeferredUserTurnStopStrategy`: the + inner strategy continues to drive inference-triggered events, but + its ``on_user_turn_stopped`` event is suppressed. Use when another + strategy in the chain (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) owns finalization. + + Example:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + LLMTurnCompletionUserTurnStopStrategy(), + ] + + Args: + strategy: The stop strategy to defer. + + Returns: + A wrapper that exposes the inner strategy's behavior with + finalization suppressed. + """ + return DeferredUserTurnStopStrategy(strategy) diff --git a/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py new file mode 100644 index 000000000..3850c234c --- /dev/null +++ b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy that finalizes on ``UserTurnInferenceCompletedFrame``.""" + +from pipecat.frames.frames import Frame, UserTurnInferenceCompletedFrame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy + + +class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy): + """Finalize the user turn whenever a ``UserTurnInferenceCompletedFrame`` arrives. + + Generic stop strategy for pipelines where some external component + (LLM with completion markers, STT with built-in turn detection, a + dedicated end-of-turn classifier, custom user code, etc.) judges + when a turn is semantically complete and emits + :class:`~pipecat.frames.frames.UserTurnInferenceCompletedFrame`. + + Pair this with one or more ``deferred(...)``-wrapped detector + strategies that drive ``on_user_turn_inference_triggered`` but + leave finalization to this strategy:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + ExternalUserTurnCompletionStopStrategy(), + ] + + For LLM-completion-marker gating specifically, use the subclass + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + instead, which additionally pushes the ``LLMUpdateSettingsFrame`` + that enables the marker protocol on the LLM. + + If the producer never emits ``UserTurnInferenceCompletedFrame``, the + controller's ``user_turn_stop_timeout`` watchdog finalizes the + turn after no activity. Tune that timeout if your producer can + take longer than the default to respond. + """ + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Fire ``on_user_turn_stopped`` whenever ``UserTurnInferenceCompletedFrame`` is seen.""" + if isinstance(frame, UserTurnInferenceCompletedFrame): + await self.trigger_user_turn_finalized() + return ProcessFrameResult.CONTINUE diff --git a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py new file mode 100644 index 000000000..0f1a97f20 --- /dev/null +++ b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py @@ -0,0 +1,82 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy gated on the LLM's turn-completion verdict.""" + +from pipecat.frames.frames import Frame, LLMUpdateSettingsFrame, StartFrame +from pipecat.services.settings import LLMSettings +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.external_user_turn_completion_stop_strategy import ( + ExternalUserTurnCompletionStopStrategy, +) +from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig + + +class LLMTurnCompletionUserTurnStopStrategy(ExternalUserTurnCompletionStopStrategy): + """LLM-gated stop strategy. + + Extends + :class:`~pipecat.turns.user_stop.ExternalUserTurnCompletionStopStrategy` + with the LLM-specific setup needed for the marker-based completion + protocol: on ``StartFrame``, pushes an ``LLMUpdateSettingsFrame`` + upstream that enables ``filter_incomplete_user_turns`` on the LLM + and seeds the + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`. + + Finalization itself is inherited: when the LLM service's + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` + detects a ``✓`` marker, it broadcasts a + :class:`~pipecat.frames.frames.UserTurnInferenceCompletedFrame` and the + base class fires ``on_user_turn_stopped``. On + ``incomplete_short`` / ``incomplete_long`` markers the mixin + re-prompts internally and no completion frame is emitted, so the + public stop event stays deferred. + + Install alongside one or more ``deferred(...)``-wrapped detector + strategies that drive ``on_user_turn_inference_triggered`` but + leave finalization to this strategy. The aggregator's deprecation + path for ``filter_incomplete_user_turns`` does this rewiring + automatically. + """ + + def __init__( + self, + *, + config: UserTurnCompletionConfig | None = None, + **kwargs, + ): + """Initialize the LLM turn-completion stop strategy. + + Args: + config: Configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting on + ``StartFrame``. Defaults to ``UserTurnCompletionConfig()``. + **kwargs: Additional keyword arguments forwarded to the base + class. + """ + super().__init__(**kwargs) + self._config = config or UserTurnCompletionConfig() + + @property + def config(self) -> UserTurnCompletionConfig: + """Return the configured ``UserTurnCompletionConfig``.""" + return self._config + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Configure the LLM on start and delegate completion handling to the base.""" + if isinstance(frame, StartFrame): + await self._configure_llm() + return await super().process_frame(frame) + + async def _configure_llm(self): + await self.push_frame( + LLMUpdateSettingsFrame( + delta=LLMSettings( + filter_incomplete_user_turns=True, + user_turn_completion_config=self._config, + ) + ) + ) diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index 51d6c7828..c423e16bf 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -22,9 +22,11 @@ from pipecat.frames.frames import ( Frame, InterruptionFrame, LLMFullResponseEndFrame, + LLMMarkerFrame, LLMMessagesAppendFrame, LLMRunFrame, LLMTextFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -185,8 +187,8 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): It processes turn completion markers to enable smarter conversation flow: - ✓ (COMPLETE): Push response normally - - ○ (INCOMPLETE SHORT): Suppress response, wait ~5s, then prompt - - ◐ (INCOMPLETE LONG): Suppress response, wait ~15s, then prompt + - ○ (INCOMPLETE SHORT): Suppress response, wait 5s, then prompt + - ◐ (INCOMPLETE LONG): Suppress response, wait 10s, then prompt When incomplete timeouts expire, the mixin automatically prompts the LLM with a contextual follow-up message to re-engage the user. @@ -279,7 +281,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): await asyncio.sleep(timeout) # Timeout expired - reset state before prompting LLM - logger.info(f"Incomplete {incomplete_type} timeout expired, prompting LLM") + logger.debug(f"Incomplete {incomplete_type} timeout expired, prompting LLM") await self._turn_reset() self._incomplete_timeout_task = None self._incomplete_type = None @@ -402,11 +404,15 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): ) self._turn_suppressed = True - # Push the marker with skip_tts=True so it's added to context (maintains - # conversation continuity per prompt instructions) but not spoken by TTS - frame = LLMTextFrame(self._turn_text_buffer) - frame.skip_tts = True - await self.push_frame(frame) + # No UserTurnInferenceCompletedFrame is broadcast here: the turn is + # explicitly not complete. The re-prompt path is driven by + # this mixin's own timeout. + + # Persist the marker to context as a stand-alone assistant + # message via LLMMarkerFrame: the bot produces no spoken + # output for incomplete turns, so the marker is the entire + # context entry. + await self.push_frame(LLMMarkerFrame(marker)) self._turn_text_buffer = "" await self._start_incomplete_timeout(incomplete_type) @@ -416,17 +422,25 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): if USER_TURN_COMPLETE_MARKER in self._turn_text_buffer: logger.debug(f"COMPLETE ({USER_TURN_COMPLETE_MARKER}) detected, pushing buffered text") + # Broadcast that the user turn is complete so a stop strategy + # gating finalization on this signal (e.g. + # LLMTurnCompletionUserTurnStopStrategy) can fire + # `on_user_turn_stopped`. Must fire before the marker so + # downstream consumers see the signal before the response. + await self.broadcast_frame(UserTurnInferenceCompletedFrame) + + # Push the marker as a sideband signal that the assistant + # aggregator will prepend to the upcoming aggregated text, + # so the context message ends up as "✓ ". + await self.push_frame( + LLMMarkerFrame(USER_TURN_COMPLETE_MARKER, append_to_context_immediately=False) + ) + # Split buffer at the marker to handle cases where marker and text # arrive in the same chunk (e.g., "✓ Hello!" from some LLMs) marker_pos = self._turn_text_buffer.index(USER_TURN_COMPLETE_MARKER) marker_end = marker_pos + len(USER_TURN_COMPLETE_MARKER) - # Push the marker with skip_tts=True - adds to context but not spoken - marker_text = self._turn_text_buffer[:marker_end] - frame = LLMTextFrame(marker_text) - frame.skip_tts = True - await self.push_frame(frame) - # Push remaining text after marker as normal speech remaining_text = self._turn_text_buffer[marker_end:] if remaining_text: diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index 2ba4e304b..974a8ff0f 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -38,7 +38,11 @@ class UserTurnController(BaseObject): Event handlers available: - on_user_turn_started: Emitted when a user turn starts. - - on_user_turn_stopped: Emitted when a user turn stops. + - on_user_turn_inference_triggered: Emitted when enough signal exists to + start LLM inference. Fires together with `on_user_turn_stopped` for + most strategies; fires alone when a downstream strategy gates + finalization on the LLM's verdict. + - on_user_turn_stopped: Emitted when a user turn is semantically final. - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout. - on_push_frame: Emitted when a strategy wants to push a frame. - on_broadcast_frame: Emitted when a strategy wants to broadcast a frame. @@ -49,6 +53,10 @@ class UserTurnController(BaseObject): async def on_user_turn_started(controller, strategy: BaseUserTurnStartStrategy, params: UserTurnStartedParams): ... + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy: BaseUserTurnStopStrategy): + ... + @controller.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(controller, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams): ... @@ -95,6 +103,7 @@ class UserTurnController(BaseObject): self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) self._register_event_handler("on_user_turn_started", sync=True) + self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) self._register_event_handler("on_user_turn_stop_timeout", sync=True) self._register_event_handler("on_reset_aggregation", sync=True) @@ -186,6 +195,9 @@ class UserTurnController(BaseObject): await s.setup(self.task_manager) s.add_event_handler("on_push_frame", self._on_push_frame) s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame) + s.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) s.add_event_handler("on_user_turn_stopped", self._on_user_turn_stopped) async def _cleanup_strategies(self): @@ -246,6 +258,9 @@ class UserTurnController(BaseObject): ): await self._trigger_user_turn_start(strategy, params) + async def _on_user_turn_inference_triggered(self, strategy: BaseUserTurnStopStrategy): + await self._trigger_user_turn_inference_triggered(strategy) + async def _on_user_turn_stopped( self, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams ): @@ -274,6 +289,20 @@ class UserTurnController(BaseObject): await self._call_event_handler("on_user_turn_started", strategy, params) + async def _trigger_user_turn_inference_triggered( + self, strategy: BaseUserTurnStopStrategy | None + ): + # Inference-triggered fires only while a turn is active. The turn + # remains active afterward — only `on_user_turn_stopped` flips state. + if not self._user_turn: + return + + # Re-arm the stop watchdog so a stuck turn (inference fired but + # finalization never arrives) still times out and finalizes. + self._user_turn_stop_timeout_event.set() + + await self._call_event_handler("on_user_turn_inference_triggered", strategy) + async def _trigger_user_turn_stop( self, strategy: BaseUserTurnStopStrategy | None, params: UserTurnStoppedParams ): diff --git a/src/pipecat/turns/user_turn_processor.py b/src/pipecat/turns/user_turn_processor.py index 536d297b5..54fe533e0 100644 --- a/src/pipecat/turns/user_turn_processor.py +++ b/src/pipecat/turns/user_turn_processor.py @@ -35,7 +35,11 @@ class UserTurnProcessor(FrameProcessor): Event handlers available: - on_user_turn_started: Emitted when a user turn starts. - - on_user_turn_stopped: Emitted when a user turn stops. + - on_user_turn_inference_triggered: Emitted when enough signal exists to + start LLM inference. Fires together with `on_user_turn_stopped` for + most strategies; fires alone when a downstream strategy gates + finalization on the LLM's verdict. + - on_user_turn_stopped: Emitted when a user turn is semantically final. - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout. - on_user_turn_idle: Emitted when the user has been idle for the configured timeout. @@ -45,6 +49,10 @@ class UserTurnProcessor(FrameProcessor): async def on_user_turn_started(processor, strategy: BaseUserTurnStartStrategy): ... + @processor.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(processor, strategy: BaseUserTurnStopStrategy): + ... + @processor.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(processor, strategy: BaseUserTurnStopStrategy): ... @@ -85,6 +93,7 @@ class UserTurnProcessor(FrameProcessor): self._register_event_handler("on_user_turn_stopped") self._register_event_handler("on_user_turn_stop_timeout") self._register_event_handler("on_user_turn_idle") + self._register_event_handler("on_user_turn_inference_triggered") self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies or UserTurnStrategies(), @@ -101,6 +110,9 @@ class UserTurnProcessor(FrameProcessor): self._user_turn_controller.add_event_handler( "on_user_turn_stop_timeout", self._on_user_turn_stop_timeout ) + self._user_turn_controller.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout) self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle) @@ -204,3 +216,11 @@ class UserTurnProcessor(FrameProcessor): async def _on_user_turn_idle(self, controller): await self._call_event_handler("on_user_turn_idle") + + async def _on_user_turn_inference_triggered( + self, + controller: UserTurnController, + strategy: BaseUserTurnStopStrategy, + ): + logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") + await self._call_event_handler("on_user_turn_inference_triggered", strategy) diff --git a/src/pipecat/turns/user_turn_strategies.py b/src/pipecat/turns/user_turn_strategies.py index 8bbe87a7a..613722804 100644 --- a/src/pipecat/turns/user_turn_strategies.py +++ b/src/pipecat/turns/user_turn_strategies.py @@ -17,8 +17,11 @@ from pipecat.turns.user_start import ( from pipecat.turns.user_stop import ( BaseUserTurnStopStrategy, ExternalUserTurnStopStrategy, + LLMTurnCompletionUserTurnStopStrategy, TurnAnalyzerUserTurnStopStrategy, + deferred, ) +from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig def default_user_turn_start_strategies() -> list[BaseUserTurnStartStrategy]: @@ -95,3 +98,57 @@ class ExternalUserTurnStrategies(UserTurnStrategies): def __post_init__(self): self.start = [ExternalUserTurnStartStrategy()] self.stop = [ExternalUserTurnStopStrategy()] + + +@dataclass +class FilterIncompleteUserTurnStrategies(UserTurnStrategies): + """Stop strategies gated on the LLM's turn-completion verdict. + + The LLM is asked to begin every response with one of three markers: + ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). Only ✓ + finalizes the user turn; ○ / ◐ keep the turn open so the user can + continue speaking and the LLM can re-evaluate later. + + Configuring strategies this way preserves the existing detector + chain (defaults or user-supplied) for inference triggering and + appends :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + as the finalizer. The detector strategies are wrapped with + :func:`~pipecat.turns.user_stop.deferred` automatically so they fire + only ``on_user_turn_inference_triggered`` and leave finalization to + the LLM gate. + + Parameters: + config: Optional configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting. Customizes the + turn-completion instructions, incomplete-turn timeouts, and + re-prompts. If None, defaults from + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig` + are used. + + Example:: + + user_turn_strategies=FilterIncompleteUserTurnStrategies() + + # Custom detector chain: + user_turn_strategies=FilterIncompleteUserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy(...)], + ) + + # Custom completion config: + user_turn_strategies=FilterIncompleteUserTurnStrategies( + config=UserTurnCompletionConfig( + incomplete_short_timeout=5.0, + incomplete_long_timeout=10.0, + ), + ) + """ + + config: UserTurnCompletionConfig | None = None + + def __post_init__(self): + super().__post_init__() + # Defer the detector chain so it only fires inference-triggered, + # then append the LLM gate as the sole finalizer. + gated: list[BaseUserTurnStopStrategy] = [deferred(s) for s in self.stop or []] + gated.append(LLMTurnCompletionUserTurnStopStrategy(config=self.config)) + self.stop = gated diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index cefc1b200..a8ecace9d 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -64,7 +64,10 @@ from pipecat.turns.user_mute import ( MuteUntilFirstBotCompleteUserMuteStrategy, ) from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.turns.user_turn_strategies import ( + FilterIncompleteUserTurnStrategies, + UserTurnStrategies, +) from pipecat.utils.text.base_text_aggregator import AggregationType USER_TURN_STOP_TIMEOUT = 0.2 @@ -180,7 +183,9 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): async def test_llm_messages_update_does_not_inject_turn_completion_into_context(self): context = LLMContext() - params = LLMUserAggregatorParams(filter_incomplete_user_turns=True) + params = LLMUserAggregatorParams( + user_turn_strategies=FilterIncompleteUserTurnStrategies(), + ) pipeline = Pipeline([LLMUserAggregator(context, params=params)]) new_messages = [ @@ -291,8 +296,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): UserStartedSpeakingFrame, InterruptionFrame, VADUserStoppedSpeakingFrame, - UserStoppedSpeakingFrame, LLMContextFrame, + UserStoppedSpeakingFrame, ] await run_test( pipeline, @@ -563,6 +568,201 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames=[SpeechControlParamsFrame], ) + async def test_inference_triggered_event_fires_on_default_strategies(self): + """Default flow fires inference-triggered before stopped, both with the same strategy.""" + from pipecat.frames.frames import UserTurnInferenceCompletedFrame # noqa: F401 + + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[ + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ] + ), + ), + ) + + events: list[str] = [] + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + events.append("inference_triggered") + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + events.append(f"stopped:{message.content}") + + pipeline = Pipeline([user_aggregator]) + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi!", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(events, ["inference_triggered", "stopped:Hi!"]) + + async def test_filter_incomplete_user_turns_emits_deprecation_warning(self): + """Setting the legacy flag emits a DeprecationWarning.""" + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + LLMUserAggregatorParams(filter_incomplete_user_turns=True) + matched = [ + x + for x in w + if issubclass(x.category, DeprecationWarning) + and "filter_incomplete_user_turns" in str(x.message) + ] + self.assertTrue(matched, "expected a DeprecationWarning") + + async def test_filter_incomplete_user_turns_installs_strategy(self): + """Legacy flag wraps existing stops with deferred() and appends the LLM strategy.""" + import warnings + + from pipecat.turns.user_stop import ( + DeferredUserTurnStopStrategy, + LLMTurnCompletionUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + ) + + existing = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + + context = LLMContext() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + params = LLMUserAggregatorParams( + filter_incomplete_user_turns=True, + user_turn_strategies=UserTurnStrategies(stop=[existing]), + ) + aggregator = LLMUserAggregator(context, params=params) + + stop_strategies = aggregator._params.user_turn_strategies.stop + self.assertEqual(len(stop_strategies), 2) + self.assertIsInstance(stop_strategies[0], DeferredUserTurnStopStrategy) + self.assertIs(stop_strategies[0].inner, existing) + self.assertIsInstance(stop_strategies[1], LLMTurnCompletionUserTurnStopStrategy) + + async def test_llm_completion_strategy_finalizes_on_complete_marker(self): + """LLMTurnCompletionUserTurnStopStrategy finalizes only on UserTurnInferenceCompletedFrame(complete).""" + from pipecat.frames.frames import UserTurnInferenceCompletedFrame + from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred + + gating = LLMTurnCompletionUserTurnStopStrategy() + upstream = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]), + ), + ) + + events: list[str] = [] + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + events.append("inference_triggered") + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + events.append("stopped") + + pipeline = Pipeline([user_aggregator]) + + # Drive the pipeline. Inference fires after the upstream + # strategy's timeout. Stop fires only when UserTurnInferenceCompletedFrame + # arrives (producer absence == "not yet complete"). + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # At this point inference_triggered should have fired but NOT stopped. + UserTurnInferenceCompletedFrame(), + SleepFrame(), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(events, ["inference_triggered", "stopped"]) + + async def test_multiple_inferences_in_one_turn_preserve_aggregation(self): + """Two inference triggers before finalization should preserve the full user transcript. + + When the LLM marks the first inference incomplete (○ / ◐) and the + user keeps speaking, the deferred upstream strategy fires a + second inference. Both the public ``on_user_turn_stopped`` event + and the conversation context should reflect the full user + utterance, not just the segment from the last inference. + """ + from pipecat.frames.frames import UserTurnInferenceCompletedFrame + from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred + + gating = LLMTurnCompletionUserTurnStopStrategy() + upstream = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]), + ), + ) + + inference_count = 0 + stop_message = None + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + nonlocal inference_count + inference_count += 1 + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + nonlocal stop_message + stop_message = message + + pipeline = Pipeline([user_aggregator]) + + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="I'm thinking", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # First inference fired here. Imagine the LLM returned ○; + # the turn is not yet finalized, so the user keeps talking. + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="about pizza", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # Second inference fired here. Now the LLM returns ✓ and the + # turn finalizes via UserTurnInferenceCompletedFrame. + UserTurnInferenceCompletedFrame(), + SleepFrame(), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(inference_count, 2) + self.assertIsNotNone(stop_message) + # The public event should report the full transcript, even + # though each inference push only writes its own segment to + # the context. + self.assertEqual(stop_message.content, "I'm thinking about pizza") + + user_messages = [m for m in context.get_messages() if m.get("role") == "user"] + self.assertEqual([m["content"] for m in user_messages], ["I'm thinking", "about pizza"]) + class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase): async def test_empty(self): diff --git a/tests/test_user_turn_completion_mixin.py b/tests/test_user_turn_completion_mixin.py index e503ecbf2..5a4f0c640 100644 --- a/tests/test_user_turn_completion_mixin.py +++ b/tests/test_user_turn_completion_mixin.py @@ -8,7 +8,12 @@ import unittest import unittest.mock from unittest.mock import AsyncMock -from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame +from pipecat.frames.frames import ( + LLMFullResponseEndFrame, + LLMMarkerFrame, + LLMTextFrame, + UserTurnInferenceCompletedFrame, +) from pipecat.processors.frame_processor import FrameProcessor from pipecat.services.llm_service import LLMService from pipecat.services.settings import LLMSettings @@ -44,21 +49,24 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Simulate LLM generating: "✓ Hello there!" await processor._push_turn_text(f"{USER_TURN_COMPLETE_MARKER} Hello there!") - # Should have 2 text frames: marker (skip_tts) and content (normal) - self.assertEqual(len(pushed_frames), 2) + # The marker rides as LLMMarkerFrame(append_to_context_immediately=False); + # only the spoken text is pushed as an LLMTextFrame. + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 1) + self.assertEqual(text_frames[0].text, "Hello there!") + self.assertFalse(text_frames[0].skip_tts) - # First frame should be the marker with skip_tts=True - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_COMPLETE_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) + self.assertEqual(marker_frames[0].marker, USER_TURN_COMPLETE_MARKER) + self.assertFalse(marker_frames[0].append_to_context_immediately) - # Second frame should be the actual text without skip_tts - self.assertIsInstance(pushed_frames[1], LLMTextFrame) - self.assertEqual(pushed_frames[1].text, "Hello there!") - self.assertFalse(pushed_frames[1].skip_tts) + # UserTurnInferenceCompletedFrame broadcast in both directions. + completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)] + self.assertEqual(len(completed), 2) async def test_incomplete_short_marker_suppresses_text(self): - """Test that ○ marker suppresses text with skip_tts.""" + """Test that ○ marker suppresses text and is emitted as a stand-alone marker frame.""" processor = MockProcessor() pushed_frames = [] @@ -70,14 +78,21 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_SHORT_MARKER) - # Should have 1 text frame with skip_tts=True - self.assertEqual(len(pushed_frames), 1) - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + # No LLMTextFrame: response is suppressed. + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 0) + + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) + self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_SHORT_MARKER) + self.assertTrue(marker_frames[0].append_to_context_immediately) + + # Incomplete markers do not emit UserTurnInferenceCompletedFrame. + completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)] + self.assertEqual(len(completed), 0) async def test_incomplete_long_marker_suppresses_text(self): - """Test that ◐ marker suppresses text with skip_tts.""" + """Test that ◐ marker suppresses text and is emitted as a stand-alone marker frame.""" processor = MockProcessor() pushed_frames = [] @@ -89,11 +104,16 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_LONG_MARKER) - # Should have 1 text frame with skip_tts=True - self.assertEqual(len(pushed_frames), 1) - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 0) + + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) + self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_LONG_MARKER) + self.assertTrue(marker_frames[0].append_to_context_immediately) + + completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)] + self.assertEqual(len(completed), 0) async def test_text_buffered_until_marker_found(self): """Test that text is buffered until a marker is detected.""" @@ -114,8 +134,12 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Now send the complete marker await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?") - # Now frames should be pushed - self.assertEqual(len(pushed_frames), 2) + # One LLMTextFrame for the spoken portion; one LLMMarkerFrame for + # the marker; UserTurnInferenceCompletedFrame broadcast in both directions. + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 1) + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) async def test_turn_state_reset_after_llm_full_response_end_frame(self): """Test that _turn_complete_found is reset when LLMFullResponseEndFrame is pushed.""" diff --git a/tests/test_user_turn_controller.py b/tests/test_user_turn_controller.py index 2883d39bd..8365d48df 100644 --- a/tests/test_user_turn_controller.py +++ b/tests/test_user_turn_controller.py @@ -19,7 +19,7 @@ from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_start.min_words_user_turn_start_strategy import ( MinWordsUserTurnStartStrategy, ) -from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy +from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy, deferred from pipecat.turns.user_turn_controller import UserTurnController from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams @@ -71,6 +71,66 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) self.assertTrue(should_stop) + async def test_inference_triggered_fires_alongside_stopped(self): + """Default strategies fire both inference-triggered and stopped, in order.""" + controller = UserTurnController( + user_turn_strategies=UserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)], + ) + ) + + await controller.setup(self.task_manager) + + events: list[str] = [] + + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy): + events.append("inference_triggered") + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + events.append("stopped") + + await controller.process_frame(VADUserStartedSpeakingFrame()) + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + + self.assertEqual(events, ["inference_triggered", "stopped"]) + + async def test_deferred_wrapper_skips_stopped(self): + """A deferred() wrapper drops the inner strategy's on_user_turn_stopped event.""" + wrapped = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + controller = UserTurnController(user_turn_strategies=UserTurnStrategies(stop=[wrapped])) + + await controller.setup(self.task_manager) + + events: list[str] = [] + + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy): + events.append("inference_triggered") + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + events.append("stopped") + + await controller.process_frame(VADUserStartedSpeakingFrame()) + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + + # The inner strategy fires inference-triggered (forwarded by the + # wrapper). Finalization is suppressed, but the controller's + # stop watchdog eventually fires `stopped`. + self.assertEqual(events[0], "inference_triggered") + async def test_user_turn_start_reset(self): controller = UserTurnController( user_turn_strategies=UserTurnStrategies(