From 480eca42f55adfc539026a8e1dd60b50b1d558c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 1 May 2026 14:53:53 -0700 Subject: [PATCH 1/9] Split user-turn-stop into inference-triggered and finalized events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes a real bug: with `filter_incomplete_user_turns` enabled, the smart-turn detector's tentative stop was firing `on_user_turn_stopped` before the LLM had a chance to veto it. Observers, transcript appenders and UI indicators received an early — and sometimes duplicated — signal. Decomposes the single stop concern into two events: - `on_user_turn_inference_triggered` fires when a stop strategy has enough signal to start LLM inference. The aggregator pushes the context here, kicking off the LLM call. - `on_user_turn_stopped` fires only when the user turn is semantically final. Built-in strategies fire both events at the same call site, preserving today's behavior for the common case. Adds `LLMTurnCompletionUserTurnStopStrategy`, which gates finalization on a `UserTurnCompletedFrame` (a fieldless system frame emitted by any component judging turn completeness — currently the `UserTurnCompletionLLMServiceMixin` on `✓`). Adds `deferred(strategy)` / `DeferredUserTurnStopStrategy`, a thin wrapper that forwards an inner strategy's events except `on_user_turn_stopped`. Use this to install a stop strategy as an inference trigger only, leaving finalization to a peer (e.g. the LLM completion strategy). Adds `llm_completion_user_turn_stop_strategies()` for the common case: UserTurnStrategies( stop=llm_completion_user_turn_stop_strategies(), ) Deprecates `LLMUserAggregatorParams.filter_incomplete_user_turns`. The aggregator emits a `DeprecationWarning`, wraps existing stop strategies with `deferred(...)`, and appends `LLMTurnCompletionUserTurnStopStrategy` automatically. --- ...turn-management-filter-incomplete-turns.py | 38 +++-- src/pipecat/frames/frames.py | 18 +++ .../aggregators/llm_response_universal.py | 144 +++++++++++++----- src/pipecat/turns/user_stop/__init__.py | 5 + .../user_stop/base_user_turn_stop_strategy.py | 30 +++- .../deferred_user_turn_stop_strategy.py | 104 +++++++++++++ ...turn_completion_user_turn_stop_strategy.py | 92 +++++++++++ .../turns/user_turn_completion_mixin.py | 14 +- src/pipecat/turns/user_turn_controller.py | 31 +++- src/pipecat/turns/user_turn_processor.py | 22 ++- src/pipecat/turns/user_turn_strategies.py | 39 +++++ tests/test_context_aggregators_universal.py | 138 ++++++++++++++++- tests/test_user_turn_completion_mixin.py | 61 +++++--- tests/test_user_turn_controller.py | 62 +++++++- 14 files changed, 712 insertions(+), 86 deletions(-) create mode 100644 src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py create mode 100644 src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py diff --git a/examples/turn-management/turn-management-filter-incomplete-turns.py b/examples/turn-management/turn-management-filter-incomplete-turns.py index 81273aaaa..72037943b 100644 --- a/examples/turn-management/turn-management-filter-incomplete-turns.py +++ b/examples/turn-management/turn-management-filter-incomplete-turns.py @@ -41,6 +41,10 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_turn_strategies import ( + UserTurnStrategies, + llm_completion_user_turn_stop_strategies, +) load_dotenv(override=True) @@ -83,23 +87,31 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) context = LLMContext() + # `llm_completion_user_turn_stop_strategies()` pairs the default + # stop strategies with `LLMTurnCompletionUserTurnStopStrategy`: + # those strategies trigger LLM inference but the public + # `on_user_turn_stopped` event fires only when the LLM confirms ✓. + # The LLM marks each response with one of: + # ✓ = complete (respond normally) + # ○ = incomplete short (wait 5s, then prompt) + # ◐ = incomplete long (wait 15s, then prompt) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( vad_analyzer=SileroVADAnalyzer(), - # Enable turn completion filtering - the LLM will output: - # ✓ = complete (respond normally) - # ○ = incomplete short (wait 5s, then prompt) - # ◐ = incomplete long (wait 15s, then prompt) - filter_incomplete_user_turns=True, - # Optional: customize turn completion behavior - # turn_completion_config=TurnCompletionConfig( - # incomplete_short_timeout=5.0, - # incomplete_long_timeout=15.0, - # incomplete_short_prompt="Custom prompt...", - # incomplete_long_prompt="Custom prompt...", - # instructions="Custom turn completion instructions...", - # ), + user_turn_strategies=UserTurnStrategies( + stop=llm_completion_user_turn_stop_strategies(), + # Optional: customize turn completion behavior + # stop=llm_completion_user_turn_stop_strategies( + # config=UserTurnCompletionConfig( + # incomplete_short_timeout=5.0, + # incomplete_long_timeout=15.0, + # incomplete_short_prompt="Custom prompt...", + # incomplete_long_prompt="Custom prompt...", + # instructions="Custom turn completion instructions...", + # ), + # ), + ), ), ) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 24006428d..505c9ca1d 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -970,6 +970,24 @@ class UserSpeakingFrame(SystemFrame): pass +@dataclass +class UserTurnCompletedFrame(SystemFrame): + """Frame indicating that the user turn is semantically complete. + + Emitted by any component that can judge conversational turn + completeness — for example an LLM with turn-completion markers, an + STT service with built-in turn detection, or a dedicated + end-of-turn classifier. Stop strategies that gate the + user-turn-stop event on an external completeness signal (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to + finalize the turn. Producers should emit this frame only when they + judge the turn complete; an absence of this frame means the turn is + not yet considered complete. + """ + + pass + + @dataclass class VADUserStartedSpeakingFrame(SystemFrame): """Frame emitted when VAD definitively detects user started speaking. diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index d1bc110de..1f61f33bb 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -53,7 +53,6 @@ from pipecat.frames.frames import ( LLMThoughtEndFrame, LLMThoughtStartFrame, LLMThoughtTextFrame, - LLMUpdateSettingsFrame, StartFrame, TextFrame, TranscriptionFrame, @@ -79,7 +78,6 @@ from pipecat.processors.aggregators.llm_context_summarizer import ( SummaryAppliedEvent, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.services.settings import LLMSettings from pipecat.turns.user_idle_controller import UserIdleController from pipecat.turns.user_mute import BaseUserMuteStrategy from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams @@ -100,25 +98,6 @@ class LLMUserAggregatorParams: """Parameters for configuring LLM user aggregation behavior. Parameters: - user_turn_strategies: User turn start and stop strategies. - user_mute_strategies: List of user mute strategies. - user_turn_stop_timeout: Time in seconds to wait before considering the - user's turn finished. - user_idle_timeout: Timeout in seconds for detecting user idle state. - The aggregator will emit an `on_user_turn_idle` event when the user - has been idle (not speaking) for this duration. Set to 0 to disable - idle detection. - vad_analyzer: Voice Activity Detection analyzer instance. - audio_idle_timeout: Timeout in seconds to force speech stop when - no audio frames are received while in SPEAKING state (e.g. user mutes - mic mid-speech). Set to 0 to disable. Defaults to 1.0. - filter_incomplete_user_turns: Whether to filter out incomplete user turns. - When enabled, the LLM outputs a turn completion marker at the start of - each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). - Incomplete responses are suppressed and timeouts trigger re-prompting. - user_turn_completion_config: Configuration for turn completion behavior including - custom instructions, timeouts, and prompts. Only used when - filter_incomplete_user_turns is True. add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the aggregator computes the diff against the currently advertised tools and appends a developer-role message to the context describing @@ -131,17 +110,59 @@ class LLMUserAggregatorParams: (LLM-specific) tools are ignored. When using ``LLMContextAggregatorPair``, prefer setting this via its ``add_tool_change_messages`` argument instead. Defaults to False. + audio_idle_timeout: Timeout in seconds to force speech stop when + no audio frames are received while in SPEAKING state (e.g. user mutes + mic mid-speech). Set to 0 to disable. Defaults to 1.0. + user_turn_strategies: User turn start and stop strategies. + user_mute_strategies: List of user mute strategies. + user_turn_stop_timeout: Time in seconds to wait before considering the + user's turn finished. + user_idle_timeout: Timeout in seconds for detecting user idle state. + The aggregator will emit an `on_user_turn_idle` event when the user + has been idle (not speaking) for this duration. Set to 0 to disable + idle detection. + vad_analyzer: Voice Activity Detection analyzer instance. + filter_incomplete_user_turns: [DEPRECATED] Add + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + to ``user_turn_strategies.stop`` instead. When enabled, the LLM + outputs a turn-completion marker at the start of each response: + ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). + Incomplete responses are suppressed and timeouts trigger + re-prompting. + user_turn_completion_config: [DEPRECATED] Configuration for turn + completion behavior including custom instructions, timeouts, and + prompts. Only used when filter_incomplete_user_turns is True + (deprecated path) — for the new strategy-based API, pass the config + directly to ``LLMTurnCompletionUserTurnStopStrategy(config=...)``. """ + add_tool_change_messages: bool = False + audio_idle_timeout: float = 1.0 user_turn_strategies: UserTurnStrategies | None = None user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list) user_turn_stop_timeout: float = 5.0 user_idle_timeout: float = 0 vad_analyzer: VADAnalyzer | None = None - audio_idle_timeout: float = 1.0 filter_incomplete_user_turns: bool = False user_turn_completion_config: UserTurnCompletionConfig | None = None - add_tool_change_messages: bool = False + + def __post_init__(self): + if self.filter_incomplete_user_turns: + warnings.warn( + "LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. " + "Add LLMTurnCompletionUserTurnStopStrategy to " + "user_turn_strategies.stop instead.", + DeprecationWarning, + stacklevel=2, + ) + if self.user_turn_completion_config: + warnings.warn( + "LLMUserAggregatorParams.user_turn_completion_config is deprecated. " + "Add LLMTurnCompletionUserTurnStopStrategy to " + "user_turn_strategies.stop instead.", + DeprecationWarning, + stacklevel=2, + ) @dataclass @@ -541,13 +562,35 @@ class LLMUserAggregator(LLMContextAggregator): self._register_event_handler("on_user_turn_stopped") self._register_event_handler("on_user_turn_stop_timeout") self._register_event_handler("on_user_turn_idle") + self._register_event_handler("on_user_turn_inference_triggered") self._register_event_handler("on_user_mute_started") self._register_event_handler("on_user_mute_stopped") user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies() + # Deprecated path: translate filter_incomplete_user_turns into + # wrapping pre-existing stop strategies with deferred() and + # appending LLMTurnCompletionUserTurnStopStrategy. The + # DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__. + if self._params.filter_incomplete_user_turns: + from pipecat.turns.user_stop import ( + LLMTurnCompletionUserTurnStopStrategy, + deferred, + ) + + existing_stop = list(user_turn_strategies.stop or []) + user_turn_strategies.stop = [deferred(s) for s in existing_stop] + [ + LLMTurnCompletionUserTurnStopStrategy( + config=self._params.user_turn_completion_config + ) + ] + self._user_is_muted = False self._user_turn_start_timestamp = "" + # Aggregation captured at inference-trigger time and surfaced again + # in `on_user_turn_stopped`. Set to None when no inference-triggered + # event has fired since the last finalization. + self._pending_user_turn_aggregation: str | None = None self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies, @@ -558,6 +601,9 @@ class LLMUserAggregator(LLMContextAggregator): self._user_turn_controller.add_event_handler( "on_user_turn_started", self._on_user_turn_started ) + self._user_turn_controller.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) self._user_turn_controller.add_event_handler( "on_user_turn_stopped", self._on_user_turn_stopped ) @@ -676,21 +722,6 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._params.user_mute_strategies: await s.setup(self.task_manager) - # Enable incomplete turn filtering on the LLM if configured - if self._params.filter_incomplete_user_turns: - # Get config or use defaults - config = self._params.user_turn_completion_config or UserTurnCompletionConfig() - - # Enable the feature on the LLM with config - await self.push_frame( - LLMUpdateSettingsFrame( - delta=LLMSettings( - filter_incomplete_user_turns=True, - user_turn_completion_config=config, - ) - ) - ) - async def _stop(self, frame: EndFrame): await self._maybe_emit_user_turn_stopped(on_session_end=True) await self._cleanup() @@ -830,6 +861,7 @@ class LLMUserAggregator(LLMContextAggregator): logger.debug(f"{self}: User started speaking (strategy: {strategy})") self._user_turn_start_timestamp = time_now_iso8601() + self._pending_user_turn_aggregation = None if params.enable_user_speaking_frames: await self.broadcast_frame(UserStartedSpeakingFrame) @@ -841,6 +873,22 @@ class LLMUserAggregator(LLMContextAggregator): await self._call_event_handler("on_user_turn_started", strategy) + async def _on_user_turn_inference_triggered( + self, + controller: UserTurnController, + strategy: BaseUserTurnStopStrategy, + ): + logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") + + # Push aggregation now: this writes the user message to the context + # and pushes LLMContextFrame, which is what kicks LLM inference. + # `on_user_turn_stopped` later fires when the turn is semantically + # final and surfaces the aggregated message via the public event. + aggregation = await self.push_aggregation() + self._pending_user_turn_aggregation = aggregation + + await self._call_event_handler("on_user_turn_inference_triggered", strategy) + async def _on_user_turn_stopped( self, controller: UserTurnController, @@ -875,15 +923,31 @@ class LLMUserAggregator(LLMContextAggregator): ): """Maybe emit user turn stopped event. + The aggregation has typically already been pushed at + inference-trigger time and is cached in + ``self._pending_user_turn_aggregation``. Any aggregation that has + accumulated since the last inference-trigger (e.g. transcriptions + that arrived between inference trigger and finalization) is flushed + here so end-of-turn content is never lost. + Args: strategy: The strategy that triggered the turn stop. on_session_end: If True, only emit if there's unemitted content (avoids duplicate events when session ends). """ aggregation = await self.push_aggregation() - if not on_session_end or aggregation: + previous_aggregation = self._pending_user_turn_aggregation + self._pending_user_turn_aggregation = None + + content = None + if aggregation and previous_aggregation: + content = f"{previous_aggregation} {aggregation}".strip() + else: + content = previous_aggregation or aggregation + + if not on_session_end or content: message = UserTurnStoppedMessage( - content=aggregation, timestamp=self._user_turn_start_timestamp + content=content, timestamp=self._user_turn_start_timestamp ) await self._call_event_handler("on_user_turn_stopped", strategy, message) self._user_turn_start_timestamp = "" diff --git a/src/pipecat/turns/user_stop/__init__.py b/src/pipecat/turns/user_stop/__init__.py index 7ff676744..4effd6d5c 100644 --- a/src/pipecat/turns/user_stop/__init__.py +++ b/src/pipecat/turns/user_stop/__init__.py @@ -5,14 +5,19 @@ # from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams +from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy +from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy __all__ = [ "BaseUserTurnStopStrategy", + "DeferredUserTurnStopStrategy", "ExternalUserTurnStopStrategy", + "LLMTurnCompletionUserTurnStopStrategy", "SpeechTimeoutUserTurnStopStrategy", "UserTurnStoppedParams", "TurnAnalyzerUserTurnStopStrategy", + "deferred", ] diff --git a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py index e5dc1bd66..6f303282c 100644 --- a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py @@ -45,7 +45,16 @@ class BaseUserTurnStopStrategy(BaseObject): Events triggered by strategies: - `on_push_frame`: Indicates the strategy wants to push a frame. - - `on_user_turn_stopped`: Signals that the user stopped speaking. + - `on_user_turn_inference_triggered`: Signals that enough evidence + exists to start LLM inference for the current user turn. In most + cases this fires together with `on_user_turn_stopped`. Strategies + that gate finalization on the LLM (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) fire only this event + upstream and a separate strategy fires `on_user_turn_stopped` once + the LLM confirms the turn is complete. + - `on_user_turn_stopped`: Signals that the user turn is semantically + final. Observers, transcript appenders, and UI indicators should + bind this event. """ @@ -64,6 +73,7 @@ class BaseUserTurnStopStrategy(BaseObject): self._task_manager: BaseTaskManager | None = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) + self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) @property @@ -123,7 +133,23 @@ class BaseUserTurnStopStrategy(BaseObject): await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) async def trigger_user_turn_stopped(self): - """Trigger the `on_user_turn_stopped` event.""" + """Fire both ``on_user_turn_inference_triggered`` and ``on_user_turn_stopped``. + + Most strategies call this when they decide a turn has ended. To + defer finalization to another strategy (so this strategy fires + only the inference-triggered event), wrap this strategy with + :func:`~pipecat.turns.user_stop.deferred` instead of changing + the trigger call. + """ + await self.trigger_user_turn_inference_triggered() + await self.trigger_user_turn_finalized() + + async def trigger_user_turn_inference_triggered(self): + """Trigger only the `on_user_turn_inference_triggered` event.""" + await self._call_event_handler("on_user_turn_inference_triggered") + + async def trigger_user_turn_finalized(self): + """Trigger only the `on_user_turn_stopped` event.""" await self._call_event_handler( "on_user_turn_stopped", UserTurnStoppedParams(enable_user_speaking_frames=self._enable_user_speaking_frames), diff --git a/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py new file mode 100644 index 000000000..7f3296af3 --- /dev/null +++ b/src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py @@ -0,0 +1,104 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Wrapper that defers a stop strategy's finalization to another strategy.""" + +from pipecat.frames.frames import Frame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.utils.asyncio.task_manager import BaseTaskManager + + +class DeferredUserTurnStopStrategy(BaseUserTurnStopStrategy): + """Wraps a stop strategy and suppresses its ``on_user_turn_stopped`` event. + + Event subscriptions added to the wrapper are forwarded directly to + the inner strategy, except for ``on_user_turn_stopped``, which is + dropped. The inner strategy's frame-side and inference-triggered + events therefore reach external listeners (the controller, etc.) + unchanged; finalization is left to another strategy in the chain + such as ``LLMTurnCompletionUserTurnStopStrategy``. + + Use the :func:`deferred` helper for ergonomic construction:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + LLMTurnCompletionUserTurnStopStrategy(), + ] + """ + + def __init__(self, inner: BaseUserTurnStopStrategy, **kwargs): + """Initialize the deferred wrapper. + + Args: + inner: The strategy whose finalization should be deferred. + **kwargs: Additional keyword arguments forwarded to the base + class. + """ + super().__init__(**kwargs) + self._inner = inner + + @property + def inner(self) -> BaseUserTurnStopStrategy: + """Return the wrapped strategy.""" + return self._inner + + def add_event_handler(self, event_name: str, handler): + """Forward event subscriptions to the inner strategy. + + ``on_user_turn_stopped`` is silently dropped — that's the whole + point of the wrapper. Every other event handler is attached to + the inner strategy directly, so the inner's events reach the + listener without any per-event proxy method on the wrapper. + """ + if event_name == "on_user_turn_stopped": + return + self._inner.add_event_handler(event_name, handler) + + async def setup(self, task_manager: BaseTaskManager): + """Set up the inner strategy.""" + await super().setup(task_manager) + await self._inner.setup(task_manager) + + async def cleanup(self): + """Clean up the inner strategy.""" + await super().cleanup() + await self._inner.cleanup() + + async def reset(self): + """Reset the inner strategy for a new user turn.""" + await super().reset() + await self._inner.reset() + + async def process_frame(self, frame: Frame) -> ProcessFrameResult | None: + """Forward frame processing to the inner strategy.""" + return await self._inner.process_frame(frame) + + +def deferred(strategy: BaseUserTurnStopStrategy) -> DeferredUserTurnStopStrategy: + """Defer this stop strategy's finalization to another strategy. + + Wraps ``strategy`` in a :class:`DeferredUserTurnStopStrategy`: the + inner strategy continues to drive inference-triggered events, but + its ``on_user_turn_stopped`` event is suppressed. Use when another + strategy in the chain (e.g. + ``LLMTurnCompletionUserTurnStopStrategy``) owns finalization. + + Example:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + LLMTurnCompletionUserTurnStopStrategy(), + ] + + Args: + strategy: The stop strategy to defer. + + Returns: + A wrapper that exposes the inner strategy's behavior with + finalization suppressed. + """ + return DeferredUserTurnStopStrategy(strategy) diff --git a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py new file mode 100644 index 000000000..e75a6d3da --- /dev/null +++ b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy gated on the LLM's turn-completion verdict.""" + +from pipecat.frames.frames import ( + Frame, + LLMUpdateSettingsFrame, + StartFrame, + UserTurnCompletedFrame, +) +from pipecat.services.settings import LLMSettings +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig + + +class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy): + """User turn stop strategy that finalizes only when the LLM agrees. + + This strategy lets another stop strategy (e.g. smart-turn analyzer) + trigger LLM inference, then defers the public ``on_user_turn_stopped`` + event until the LLM emits a ``UserTurnCompletedFrame``. On + ``incomplete_short`` / ``incomplete_long`` markers the + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` + re-prompts the LLM internally and no completion frame is emitted. + + To use this strategy, install it alongside one or more upstream stop + strategies in ``UserTurnStrategies.stop`` and wrap those upstream + strategies with :func:`~pipecat.turns.user_stop.deferred` so they + fire only ``on_user_turn_inference_triggered`` and leave + finalization to this strategy. The aggregator's deprecation path + for ``filter_incomplete_user_turns`` does this rewiring + automatically. + + If the LLM never returns a completion frame (malformed output, + unreachable service, etc.), the controller's + ``user_turn_stop_timeout`` watchdog is the safety net — it fires + ``on_user_turn_stopped`` after no activity for that many seconds. + Tune ``user_turn_stop_timeout`` higher if your LLM regularly takes + longer than the default to respond. + + On ``StartFrame`` the strategy pushes an ``LLMUpdateSettingsFrame`` + upstream that enables ``filter_incomplete_user_turns`` on the LLM + service and seeds the + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`. + """ + + def __init__( + self, + *, + config: UserTurnCompletionConfig | None = None, + **kwargs, + ): + """Initialize the LLM turn-completion stop strategy. + + Args: + config: Configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting on + ``StartFrame``. Defaults to ``UserTurnCompletionConfig()``. + **kwargs: Additional keyword arguments forwarded to the base + class. + """ + super().__init__(**kwargs) + self._config = config or UserTurnCompletionConfig() + + @property + def config(self) -> UserTurnCompletionConfig: + """Return the configured ``UserTurnCompletionConfig``.""" + return self._config + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Observe frames to drive the finalization decision.""" + if isinstance(frame, StartFrame): + await self._configure_llm() + elif isinstance(frame, UserTurnCompletedFrame): + await self.trigger_user_turn_finalized() + + return ProcessFrameResult.CONTINUE + + async def _configure_llm(self): + await self.push_frame( + LLMUpdateSettingsFrame( + delta=LLMSettings( + filter_incomplete_user_turns=True, + user_turn_completion_config=self._config, + ) + ) + ) diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index 51d6c7828..606ba9902 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -25,6 +25,7 @@ from pipecat.frames.frames import ( LLMMessagesAppendFrame, LLMRunFrame, LLMTextFrame, + UserTurnCompletedFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -279,7 +280,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): await asyncio.sleep(timeout) # Timeout expired - reset state before prompting LLM - logger.info(f"Incomplete {incomplete_type} timeout expired, prompting LLM") + logger.debug(f"Incomplete {incomplete_type} timeout expired, prompting LLM") await self._turn_reset() self._incomplete_timeout_task = None self._incomplete_type = None @@ -402,6 +403,10 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): ) self._turn_suppressed = True + # No UserTurnCompletedFrame is broadcast here: the turn is + # explicitly not complete. The re-prompt path is driven by + # this mixin's own timeout. + # Push the marker with skip_tts=True so it's added to context (maintains # conversation continuity per prompt instructions) but not spoken by TTS frame = LLMTextFrame(self._turn_text_buffer) @@ -416,6 +421,13 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): if USER_TURN_COMPLETE_MARKER in self._turn_text_buffer: logger.debug(f"COMPLETE ({USER_TURN_COMPLETE_MARKER}) detected, pushing buffered text") + # Broadcast that the user turn is complete so a stop strategy + # gating finalization on this signal (e.g. + # LLMTurnCompletionUserTurnStopStrategy) can fire + # `on_user_turn_stopped`. Must fire before the LLMTextFrame so + # downstream consumers see the signal before the response. + await self.broadcast_frame(UserTurnCompletedFrame) + # Split buffer at the marker to handle cases where marker and text # arrive in the same chunk (e.g., "✓ Hello!" from some LLMs) marker_pos = self._turn_text_buffer.index(USER_TURN_COMPLETE_MARKER) diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index 2ba4e304b..974a8ff0f 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -38,7 +38,11 @@ class UserTurnController(BaseObject): Event handlers available: - on_user_turn_started: Emitted when a user turn starts. - - on_user_turn_stopped: Emitted when a user turn stops. + - on_user_turn_inference_triggered: Emitted when enough signal exists to + start LLM inference. Fires together with `on_user_turn_stopped` for + most strategies; fires alone when a downstream strategy gates + finalization on the LLM's verdict. + - on_user_turn_stopped: Emitted when a user turn is semantically final. - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout. - on_push_frame: Emitted when a strategy wants to push a frame. - on_broadcast_frame: Emitted when a strategy wants to broadcast a frame. @@ -49,6 +53,10 @@ class UserTurnController(BaseObject): async def on_user_turn_started(controller, strategy: BaseUserTurnStartStrategy, params: UserTurnStartedParams): ... + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy: BaseUserTurnStopStrategy): + ... + @controller.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(controller, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams): ... @@ -95,6 +103,7 @@ class UserTurnController(BaseObject): self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) self._register_event_handler("on_user_turn_started", sync=True) + self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) self._register_event_handler("on_user_turn_stop_timeout", sync=True) self._register_event_handler("on_reset_aggregation", sync=True) @@ -186,6 +195,9 @@ class UserTurnController(BaseObject): await s.setup(self.task_manager) s.add_event_handler("on_push_frame", self._on_push_frame) s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame) + s.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) s.add_event_handler("on_user_turn_stopped", self._on_user_turn_stopped) async def _cleanup_strategies(self): @@ -246,6 +258,9 @@ class UserTurnController(BaseObject): ): await self._trigger_user_turn_start(strategy, params) + async def _on_user_turn_inference_triggered(self, strategy: BaseUserTurnStopStrategy): + await self._trigger_user_turn_inference_triggered(strategy) + async def _on_user_turn_stopped( self, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams ): @@ -274,6 +289,20 @@ class UserTurnController(BaseObject): await self._call_event_handler("on_user_turn_started", strategy, params) + async def _trigger_user_turn_inference_triggered( + self, strategy: BaseUserTurnStopStrategy | None + ): + # Inference-triggered fires only while a turn is active. The turn + # remains active afterward — only `on_user_turn_stopped` flips state. + if not self._user_turn: + return + + # Re-arm the stop watchdog so a stuck turn (inference fired but + # finalization never arrives) still times out and finalizes. + self._user_turn_stop_timeout_event.set() + + await self._call_event_handler("on_user_turn_inference_triggered", strategy) + async def _trigger_user_turn_stop( self, strategy: BaseUserTurnStopStrategy | None, params: UserTurnStoppedParams ): diff --git a/src/pipecat/turns/user_turn_processor.py b/src/pipecat/turns/user_turn_processor.py index 536d297b5..54fe533e0 100644 --- a/src/pipecat/turns/user_turn_processor.py +++ b/src/pipecat/turns/user_turn_processor.py @@ -35,7 +35,11 @@ class UserTurnProcessor(FrameProcessor): Event handlers available: - on_user_turn_started: Emitted when a user turn starts. - - on_user_turn_stopped: Emitted when a user turn stops. + - on_user_turn_inference_triggered: Emitted when enough signal exists to + start LLM inference. Fires together with `on_user_turn_stopped` for + most strategies; fires alone when a downstream strategy gates + finalization on the LLM's verdict. + - on_user_turn_stopped: Emitted when a user turn is semantically final. - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout. - on_user_turn_idle: Emitted when the user has been idle for the configured timeout. @@ -45,6 +49,10 @@ class UserTurnProcessor(FrameProcessor): async def on_user_turn_started(processor, strategy: BaseUserTurnStartStrategy): ... + @processor.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(processor, strategy: BaseUserTurnStopStrategy): + ... + @processor.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(processor, strategy: BaseUserTurnStopStrategy): ... @@ -85,6 +93,7 @@ class UserTurnProcessor(FrameProcessor): self._register_event_handler("on_user_turn_stopped") self._register_event_handler("on_user_turn_stop_timeout") self._register_event_handler("on_user_turn_idle") + self._register_event_handler("on_user_turn_inference_triggered") self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies or UserTurnStrategies(), @@ -101,6 +110,9 @@ class UserTurnProcessor(FrameProcessor): self._user_turn_controller.add_event_handler( "on_user_turn_stop_timeout", self._on_user_turn_stop_timeout ) + self._user_turn_controller.add_event_handler( + "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered + ) self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout) self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle) @@ -204,3 +216,11 @@ class UserTurnProcessor(FrameProcessor): async def _on_user_turn_idle(self, controller): await self._call_event_handler("on_user_turn_idle") + + async def _on_user_turn_inference_triggered( + self, + controller: UserTurnController, + strategy: BaseUserTurnStopStrategy, + ): + logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") + await self._call_event_handler("on_user_turn_inference_triggered", strategy) diff --git a/src/pipecat/turns/user_turn_strategies.py b/src/pipecat/turns/user_turn_strategies.py index 8bbe87a7a..56c7d9e9f 100644 --- a/src/pipecat/turns/user_turn_strategies.py +++ b/src/pipecat/turns/user_turn_strategies.py @@ -17,8 +17,11 @@ from pipecat.turns.user_start import ( from pipecat.turns.user_stop import ( BaseUserTurnStopStrategy, ExternalUserTurnStopStrategy, + LLMTurnCompletionUserTurnStopStrategy, TurnAnalyzerUserTurnStopStrategy, + deferred, ) +from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig def default_user_turn_start_strategies() -> list[BaseUserTurnStartStrategy]: @@ -48,6 +51,42 @@ def default_user_turn_stop_strategies() -> list[BaseUserTurnStopStrategy]: return [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())] +def llm_completion_user_turn_stop_strategies( + strategies: list[BaseUserTurnStopStrategy] | None = None, + *, + config: UserTurnCompletionConfig | None = None, +) -> list[BaseUserTurnStopStrategy]: + """Build a stop-strategy list gated on the LLM's turn-completion verdict. + + Wraps ``strategies`` with :func:`deferred` so they trigger inference + but don't fire ``on_user_turn_stopped`` themselves, then appends + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + as the finalizer. Use as the ``stop`` field of a + :class:`UserTurnStrategies`:: + + UserTurnStrategies( + stop=llm_completion_user_turn_stop_strategies(), + ) + + Args: + strategies: Stop strategies that should drive inference. If + None, uses :func:`default_user_turn_stop_strategies`. + config: Optional configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting. Customizes the + turn-completion instructions, incomplete-turn timeouts, and + re-prompts. + + Returns: + ``[deferred(s) for s in strategies] + + [LLMTurnCompletionUserTurnStopStrategy(config=config)]``. + """ + strategies = strategies if strategies is not None else default_user_turn_stop_strategies() + return [ + *(deferred(s) for s in strategies), + LLMTurnCompletionUserTurnStopStrategy(config=config), + ] + + @dataclass class UserTurnStrategies: """Container for user turn start and stop strategies. diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index cefc1b200..01ca70c80 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -179,8 +179,16 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): assert context.messages[0]["content"] == "Hi there!" async def test_llm_messages_update_does_not_inject_turn_completion_into_context(self): + from pipecat.turns.user_turn_strategies import ( + llm_completion_user_turn_stop_strategies, + ) + context = LLMContext() - params = LLMUserAggregatorParams(filter_incomplete_user_turns=True) + params = LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=llm_completion_user_turn_stop_strategies(), + ), + ) pipeline = Pipeline([LLMUserAggregator(context, params=params)]) new_messages = [ @@ -291,8 +299,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): UserStartedSpeakingFrame, InterruptionFrame, VADUserStoppedSpeakingFrame, - UserStoppedSpeakingFrame, LLMContextFrame, + UserStoppedSpeakingFrame, ] await run_test( pipeline, @@ -563,6 +571,132 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames=[SpeechControlParamsFrame], ) + async def test_inference_triggered_event_fires_on_default_strategies(self): + """Default flow fires inference-triggered before stopped, both with the same strategy.""" + from pipecat.frames.frames import UserTurnCompletedFrame # noqa: F401 + + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[ + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ] + ), + ), + ) + + events: list[str] = [] + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + events.append("inference_triggered") + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + events.append(f"stopped:{message.content}") + + pipeline = Pipeline([user_aggregator]) + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi!", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(events, ["inference_triggered", "stopped:Hi!"]) + + async def test_filter_incomplete_user_turns_emits_deprecation_warning(self): + """Setting the legacy flag emits a DeprecationWarning.""" + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + LLMUserAggregatorParams(filter_incomplete_user_turns=True) + matched = [ + x + for x in w + if issubclass(x.category, DeprecationWarning) + and "filter_incomplete_user_turns" in str(x.message) + ] + self.assertTrue(matched, "expected a DeprecationWarning") + + async def test_filter_incomplete_user_turns_installs_strategy(self): + """Legacy flag wraps existing stops with deferred() and appends the LLM strategy.""" + import warnings + + from pipecat.turns.user_stop import ( + DeferredUserTurnStopStrategy, + LLMTurnCompletionUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + ) + + existing = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + + context = LLMContext() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + params = LLMUserAggregatorParams( + filter_incomplete_user_turns=True, + user_turn_strategies=UserTurnStrategies(stop=[existing]), + ) + aggregator = LLMUserAggregator(context, params=params) + + stop_strategies = aggregator._params.user_turn_strategies.stop + self.assertEqual(len(stop_strategies), 2) + self.assertIsInstance(stop_strategies[0], DeferredUserTurnStopStrategy) + self.assertIs(stop_strategies[0].inner, existing) + self.assertIsInstance(stop_strategies[1], LLMTurnCompletionUserTurnStopStrategy) + + async def test_llm_completion_strategy_finalizes_on_complete_marker(self): + """LLMTurnCompletionUserTurnStopStrategy finalizes only on UserTurnCompletedFrame(complete).""" + from pipecat.frames.frames import UserTurnCompletedFrame + from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred + + gating = LLMTurnCompletionUserTurnStopStrategy() + upstream = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]), + ), + ) + + events: list[str] = [] + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + events.append("inference_triggered") + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + events.append("stopped") + + pipeline = Pipeline([user_aggregator]) + + # Drive the pipeline. Inference fires after the upstream + # strategy's timeout. Stop fires only when UserTurnCompletedFrame + # arrives (producer absence == "not yet complete"). + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # At this point inference_triggered should have fired but NOT stopped. + UserTurnCompletedFrame(), + SleepFrame(), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(events, ["inference_triggered", "stopped"]) + class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase): async def test_empty(self): diff --git a/tests/test_user_turn_completion_mixin.py b/tests/test_user_turn_completion_mixin.py index e503ecbf2..66a48b51f 100644 --- a/tests/test_user_turn_completion_mixin.py +++ b/tests/test_user_turn_completion_mixin.py @@ -8,7 +8,7 @@ import unittest import unittest.mock from unittest.mock import AsyncMock -from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame +from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame, UserTurnCompletedFrame from pipecat.processors.frame_processor import FrameProcessor from pipecat.services.llm_service import LLMService from pipecat.services.settings import LLMSettings @@ -44,21 +44,25 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Simulate LLM generating: "✓ Hello there!" await processor._push_turn_text(f"{USER_TURN_COMPLETE_MARKER} Hello there!") - # Should have 2 text frames: marker (skip_tts) and content (normal) - self.assertEqual(len(pushed_frames), 2) + # Two LLMTextFrames: marker (skip_tts) and content (normal). The + # broadcast also pushes UserTurnCompletedFrame upstream + downstream. + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 2) - # First frame should be the marker with skip_tts=True - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_COMPLETE_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + # First text frame should be the marker with skip_tts=True + self.assertEqual(text_frames[0].text, USER_TURN_COMPLETE_MARKER) + self.assertTrue(text_frames[0].skip_tts) - # Second frame should be the actual text without skip_tts - self.assertIsInstance(pushed_frames[1], LLMTextFrame) - self.assertEqual(pushed_frames[1].text, "Hello there!") - self.assertFalse(pushed_frames[1].skip_tts) + # Second text frame should be the actual text without skip_tts + self.assertEqual(text_frames[1].text, "Hello there!") + self.assertFalse(text_frames[1].skip_tts) + + # UserTurnCompletedFrame broadcast in both directions. + completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + self.assertEqual(len(completed), 2) async def test_incomplete_short_marker_suppresses_text(self): - """Test that ○ marker suppresses text with skip_tts.""" + """Test that ○ marker suppresses text with skip_tts and emits no completed frame.""" processor = MockProcessor() pushed_frames = [] @@ -70,14 +74,17 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_SHORT_MARKER) - # Should have 1 text frame with skip_tts=True - self.assertEqual(len(pushed_frames), 1) - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 1) + self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER) + self.assertTrue(text_frames[0].skip_tts) + + # Incomplete markers do not emit UserTurnCompletedFrame. + completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + self.assertEqual(len(completed), 0) async def test_incomplete_long_marker_suppresses_text(self): - """Test that ◐ marker suppresses text with skip_tts.""" + """Test that ◐ marker suppresses text with skip_tts and emits no completed frame.""" processor = MockProcessor() pushed_frames = [] @@ -89,11 +96,13 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_LONG_MARKER) - # Should have 1 text frame with skip_tts=True - self.assertEqual(len(pushed_frames), 1) - self.assertIsInstance(pushed_frames[0], LLMTextFrame) - self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER) - self.assertTrue(pushed_frames[0].skip_tts) + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 1) + self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER) + self.assertTrue(text_frames[0].skip_tts) + + completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + self.assertEqual(len(completed), 0) async def test_text_buffered_until_marker_found(self): """Test that text is buffered until a marker is detected.""" @@ -114,8 +123,10 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Now send the complete marker await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?") - # Now frames should be pushed - self.assertEqual(len(pushed_frames), 2) + # Two LLMTextFrames pushed (marker + content) plus the + # UserTurnCompletedFrame broadcast. + text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] + self.assertEqual(len(text_frames), 2) async def test_turn_state_reset_after_llm_full_response_end_frame(self): """Test that _turn_complete_found is reset when LLMFullResponseEndFrame is pushed.""" diff --git a/tests/test_user_turn_controller.py b/tests/test_user_turn_controller.py index 2883d39bd..8365d48df 100644 --- a/tests/test_user_turn_controller.py +++ b/tests/test_user_turn_controller.py @@ -19,7 +19,7 @@ from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_start.min_words_user_turn_start_strategy import ( MinWordsUserTurnStartStrategy, ) -from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy +from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy, deferred from pipecat.turns.user_turn_controller import UserTurnController from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams @@ -71,6 +71,66 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) self.assertTrue(should_stop) + async def test_inference_triggered_fires_alongside_stopped(self): + """Default strategies fire both inference-triggered and stopped, in order.""" + controller = UserTurnController( + user_turn_strategies=UserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)], + ) + ) + + await controller.setup(self.task_manager) + + events: list[str] = [] + + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy): + events.append("inference_triggered") + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + events.append("stopped") + + await controller.process_frame(VADUserStartedSpeakingFrame()) + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + + self.assertEqual(events, ["inference_triggered", "stopped"]) + + async def test_deferred_wrapper_skips_stopped(self): + """A deferred() wrapper drops the inner strategy's on_user_turn_stopped event.""" + wrapped = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + controller = UserTurnController(user_turn_strategies=UserTurnStrategies(stop=[wrapped])) + + await controller.setup(self.task_manager) + + events: list[str] = [] + + @controller.event_handler("on_user_turn_inference_triggered") + async def on_user_turn_inference_triggered(controller, strategy): + events.append("inference_triggered") + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + events.append("stopped") + + await controller.process_frame(VADUserStartedSpeakingFrame()) + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + + # The inner strategy fires inference-triggered (forwarded by the + # wrapper). Finalization is suppressed, but the controller's + # stop watchdog eventually fires `stopped`. + self.assertEqual(events[0], "inference_triggered") + async def test_user_turn_start_reset(self): controller = UserTurnController( user_turn_strategies=UserTurnStrategies( From 2281cd83591fcd5b0a76d28966fcee4feec4784e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 1 May 2026 15:09:30 -0700 Subject: [PATCH 2/9] Extract ExternalUserTurnCompletionStopStrategy as a reusable base MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `LLMTurnCompletionUserTurnStopStrategy` previously bundled two concerns: pushing `LLMUpdateSettingsFrame` on `StartFrame`, and finalizing the turn on `UserTurnCompletedFrame`. The latter is producer-agnostic — any component that emits `UserTurnCompletedFrame` (STT with built-in turn detection, dedicated end-of-turn classifiers, custom code) can drive finalization the same way. Move the frame-handling half into a new `ExternalUserTurnCompletionStopStrategy`. The LLM-specific subclass now only adds the settings-frame push and inherits finalization. Mirrors the existing `ExternalUserTurnStopStrategy` naming pattern. --- src/pipecat/turns/user_stop/__init__.py | 2 + ...rnal_user_turn_completion_stop_strategy.py | 47 +++++++++++++ ...turn_completion_user_turn_stop_strategy.py | 66 ++++++++----------- 3 files changed, 77 insertions(+), 38 deletions(-) create mode 100644 src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py diff --git a/src/pipecat/turns/user_stop/__init__.py b/src/pipecat/turns/user_stop/__init__.py index 4effd6d5c..14eea8407 100644 --- a/src/pipecat/turns/user_stop/__init__.py +++ b/src/pipecat/turns/user_stop/__init__.py @@ -6,6 +6,7 @@ from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred +from .external_user_turn_completion_stop_strategy import ExternalUserTurnCompletionStopStrategy from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy @@ -14,6 +15,7 @@ from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrat __all__ = [ "BaseUserTurnStopStrategy", "DeferredUserTurnStopStrategy", + "ExternalUserTurnCompletionStopStrategy", "ExternalUserTurnStopStrategy", "LLMTurnCompletionUserTurnStopStrategy", "SpeechTimeoutUserTurnStopStrategy", diff --git a/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py new file mode 100644 index 000000000..c132a5db8 --- /dev/null +++ b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy that finalizes on ``UserTurnCompletedFrame``.""" + +from pipecat.frames.frames import Frame, UserTurnCompletedFrame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy + + +class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy): + """Finalize the user turn whenever a ``UserTurnCompletedFrame`` arrives. + + Generic stop strategy for pipelines where some external component + (LLM with completion markers, STT with built-in turn detection, a + dedicated end-of-turn classifier, custom user code, etc.) judges + when a turn is semantically complete and emits + :class:`~pipecat.frames.frames.UserTurnCompletedFrame`. + + Pair this with one or more ``deferred(...)``-wrapped detector + strategies that drive ``on_user_turn_inference_triggered`` but + leave finalization to this strategy:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + ExternalUserTurnCompletionStopStrategy(), + ] + + For LLM-completion-marker gating specifically, use the subclass + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + instead, which additionally pushes the ``LLMUpdateSettingsFrame`` + that enables the marker protocol on the LLM. + + If the producer never emits ``UserTurnCompletedFrame``, the + controller's ``user_turn_stop_timeout`` watchdog finalizes the + turn after no activity. Tune that timeout if your producer can + take longer than the default to respond. + """ + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Fire ``on_user_turn_stopped`` whenever ``UserTurnCompletedFrame`` is seen.""" + if isinstance(frame, UserTurnCompletedFrame): + await self.trigger_user_turn_finalized() + return ProcessFrameResult.CONTINUE diff --git a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py index e75a6d3da..08dea6df1 100644 --- a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py @@ -6,47 +6,40 @@ """User turn stop strategy gated on the LLM's turn-completion verdict.""" -from pipecat.frames.frames import ( - Frame, - LLMUpdateSettingsFrame, - StartFrame, - UserTurnCompletedFrame, -) +from pipecat.frames.frames import Frame, LLMUpdateSettingsFrame, StartFrame from pipecat.services.settings import LLMSettings from pipecat.turns.types import ProcessFrameResult -from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.turns.user_stop.external_user_turn_completion_stop_strategy import ( + ExternalUserTurnCompletionStopStrategy, +) from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig -class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy): - """User turn stop strategy that finalizes only when the LLM agrees. +class LLMTurnCompletionUserTurnStopStrategy(ExternalUserTurnCompletionStopStrategy): + """LLM-gated stop strategy. - This strategy lets another stop strategy (e.g. smart-turn analyzer) - trigger LLM inference, then defers the public ``on_user_turn_stopped`` - event until the LLM emits a ``UserTurnCompletedFrame``. On - ``incomplete_short`` / ``incomplete_long`` markers the - :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` - re-prompts the LLM internally and no completion frame is emitted. - - To use this strategy, install it alongside one or more upstream stop - strategies in ``UserTurnStrategies.stop`` and wrap those upstream - strategies with :func:`~pipecat.turns.user_stop.deferred` so they - fire only ``on_user_turn_inference_triggered`` and leave - finalization to this strategy. The aggregator's deprecation path - for ``filter_incomplete_user_turns`` does this rewiring - automatically. - - If the LLM never returns a completion frame (malformed output, - unreachable service, etc.), the controller's - ``user_turn_stop_timeout`` watchdog is the safety net — it fires - ``on_user_turn_stopped`` after no activity for that many seconds. - Tune ``user_turn_stop_timeout`` higher if your LLM regularly takes - longer than the default to respond. - - On ``StartFrame`` the strategy pushes an ``LLMUpdateSettingsFrame`` + Extends + :class:`~pipecat.turns.user_stop.ExternalUserTurnCompletionStopStrategy` + with the LLM-specific setup needed for the marker-based completion + protocol: on ``StartFrame``, pushes an ``LLMUpdateSettingsFrame`` upstream that enables ``filter_incomplete_user_turns`` on the LLM - service and seeds the + and seeds the :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`. + + Finalization itself is inherited: when the LLM service's + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` + detects a ``✓`` marker, it broadcasts a + :class:`~pipecat.frames.frames.UserTurnCompletedFrame` and the + base class fires ``on_user_turn_stopped``. On + ``incomplete_short`` / ``incomplete_long`` markers the mixin + re-prompts internally and no completion frame is emitted, so the + public stop event stays deferred. + + Install alongside one or more ``deferred(...)``-wrapped detector + strategies that drive ``on_user_turn_inference_triggered`` but + leave finalization to this strategy. The aggregator's deprecation + path for ``filter_incomplete_user_turns`` does this rewiring + automatically. """ def __init__( @@ -73,13 +66,10 @@ class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy): return self._config async def process_frame(self, frame: Frame) -> ProcessFrameResult: - """Observe frames to drive the finalization decision.""" + """Configure the LLM on start and delegate completion handling to the base.""" if isinstance(frame, StartFrame): await self._configure_llm() - elif isinstance(frame, UserTurnCompletedFrame): - await self.trigger_user_turn_finalized() - - return ProcessFrameResult.CONTINUE + return await super().process_frame(frame) async def _configure_llm(self): await self.push_frame( From 1fa0310ea87b8db42086401af1d1903115d7765f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 1 May 2026 15:12:11 -0700 Subject: [PATCH 3/9] Add changelog for #4405 --- changelog/4405.added.2.md | 1 + changelog/4405.added.3.md | 1 + changelog/4405.added.4.md | 1 + changelog/4405.added.5.md | 1 + changelog/4405.added.md | 1 + changelog/4405.deprecated.md | 1 + changelog/4405.fixed.md | 1 + 7 files changed, 7 insertions(+) create mode 100644 changelog/4405.added.2.md create mode 100644 changelog/4405.added.3.md create mode 100644 changelog/4405.added.4.md create mode 100644 changelog/4405.added.5.md create mode 100644 changelog/4405.added.md create mode 100644 changelog/4405.deprecated.md create mode 100644 changelog/4405.fixed.md diff --git a/changelog/4405.added.2.md b/changelog/4405.added.2.md new file mode 100644 index 000000000..30f49e576 --- /dev/null +++ b/changelog/4405.added.2.md @@ -0,0 +1 @@ +- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives. diff --git a/changelog/4405.added.3.md b/changelog/4405.added.3.md new file mode 100644 index 000000000..8b048a219 --- /dev/null +++ b/changelog/4405.added.3.md @@ -0,0 +1 @@ +- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`. diff --git a/changelog/4405.added.4.md b/changelog/4405.added.4.md new file mode 100644 index 000000000..9cc5246ed --- /dev/null +++ b/changelog/4405.added.4.md @@ -0,0 +1 @@ +- Added `llm_completion_user_turn_stop_strategies()` in `pipecat.turns.user_turn_strategies` — a one-call helper that pairs default (or user-supplied) stop strategies, each wrapped with `deferred(...)`, with `LLMTurnCompletionUserTurnStopStrategy`. Common case: `stop=llm_completion_user_turn_stop_strategies()`. diff --git a/changelog/4405.added.5.md b/changelog/4405.added.5.md new file mode 100644 index 000000000..1b8eadea2 --- /dev/null +++ b/changelog/4405.added.5.md @@ -0,0 +1 @@ +- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup. diff --git a/changelog/4405.added.md b/changelog/4405.added.md new file mode 100644 index 000000000..0cd8859b0 --- /dev/null +++ b/changelog/4405.added.md @@ -0,0 +1 @@ +- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer. diff --git a/changelog/4405.deprecated.md b/changelog/4405.deprecated.md new file mode 100644 index 000000000..eb7aede5e --- /dev/null +++ b/changelog/4405.deprecated.md @@ -0,0 +1 @@ +- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Add `LLMTurnCompletionUserTurnStopStrategy` to `user_turn_strategies.stop` instead (typically via `llm_completion_user_turn_stop_strategies()`). Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning`, wraps existing stop strategies with `deferred(...)`, and appends the new finalizer strategy. diff --git a/changelog/4405.fixed.md b/changelog/4405.fixed.md new file mode 100644 index 000000000..65a9d42bf --- /dev/null +++ b/changelog/4405.fixed.md @@ -0,0 +1 @@ +- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal. From d1c8162b0ce490042a66bb2152cf02eb05982410 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 5 May 2026 16:30:37 -0700 Subject: [PATCH 4/9] Route turn-completion markers through LLMMarkerFrame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an `LLMMarkerFrame(DataFrame)` for sideband LLM markers that need to be persisted to context but should not flow through the standard text path (TTS, transcript). The frame carries an `append_to_context_immediately` flag so the assistant aggregator can either commit the marker as a stand-alone message (○ / ◐) or merge it with the upcoming aggregation as a prefix on the response (✓). `UserTurnCompletionLLMServiceMixin` now emits `LLMMarkerFrame` instead of pushing the marker as `LLMTextFrame(skip_tts=True)`, which fixes the case where an incomplete-turn marker (○ / ◐) was aggregated by the assistant aggregator but never committed to the context because the assistant turn lifecycle didn't run to completion (no spoken response, no `LLMFullResponseEndFrame`-driven `push_aggregation`). The frame is intentionally generic so other components — STT services with built-in turn signals, end-of-turn classifiers, custom annotations — can use the same mechanism to inject sideband signals into the assistant context. --- src/pipecat/frames/frames.py | 34 +++++++++++ .../aggregators/llm_response_universal.py | 28 +++++++++ .../turns/user_turn_completion_mixin.py | 26 +++++---- tests/test_user_turn_completion_mixin.py | 57 ++++++++++++------- 4 files changed, 111 insertions(+), 34 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 505c9ca1d..569574842 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -339,6 +339,40 @@ class LLMTextFrame(TextFrame): self.includes_inter_frame_spaces = True +@dataclass +class LLMMarkerFrame(DataFrame): + """Sideband marker emitted by an LLM service. + + A marker is short, structured assistant output that should be + persisted in the conversation context but should not flow through + the standard text path (TTS, transcript). The assistant aggregator + writes the marker to the context so the LLM can self-condition on + prior markers on subsequent turns. + + The primary use today is the ``filter_incomplete_user_turns`` + protocol, where ``UserTurnCompletionLLMServiceMixin`` emits the + turn-completion markers ✓ / ○ / ◐ on every response. The frame is + intentionally generic so other components — STT services with + built-in turn signals, end-of-turn classifiers, custom annotations, + etc. — can use the same mechanism to inject sideband signals into + the assistant context. + + Parameters: + marker: The marker payload (typically a short string such as a + single character). + append_to_context_immediately: If True, the marker is written + to the context as its own standalone assistant message as + soon as it's received. If False, the marker is appended to + the running assistant aggregation and flushed to the + context together with the following text as a single + message (e.g. for the ✓ case the context message ends up + as "✓ "). + """ + + marker: str + append_to_context_immediately: bool = True + + @dataclass class AggregatedTextFrame(TextFrame): """Text frame representing an aggregation of TextFrames. diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 1f61f33bb..019490929 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -44,6 +44,7 @@ from pipecat.frames.frames import ( LLMContextSummaryRequestFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + LLMMarkerFrame, LLMMessagesAppendFrame, LLMMessagesTransformFrame, LLMMessagesUpdateFrame, @@ -1105,6 +1106,8 @@ class LLMAssistantAggregator(LLMContextAggregator): await self._handle_llm_end(frame) elif isinstance(frame, TextFrame): await self._handle_text(frame) + elif isinstance(frame, LLMMarkerFrame): + await self._handle_marker_frame(frame) elif isinstance(frame, LLMThoughtStartFrame): await self._handle_thought_start(frame) elif isinstance(frame, LLMThoughtTextFrame): @@ -1510,6 +1513,31 @@ class LLMAssistantAggregator(LLMContextAggregator): ) ) + async def _handle_marker_frame(self, frame: LLMMarkerFrame): + if frame.append_to_context_immediately: + # Stand-alone marker: write it to the context now as its + # own assistant message. Used when the marker is the entire + # assistant turn — e.g. the ○ / ◐ incomplete-turn signals, + # where the spoken response is suppressed and the marker + # is the only artifact. + self._context.add_message({"role": "assistant", "content": frame.marker}) + await self.push_context_frame() + timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601()) + await self.push_frame(timestamp_frame) + return + + # Marker is part of an in-progress assistant response. Append + # it to the running aggregation so `push_aggregation` writes + # marker + text as a single context message — e.g. the ✓ + # complete-turn signal that prefixes the spoken response, + # producing "✓ " in context. Markers are stripped + # from the transcript via + # `_maybe_strip_turn_completion_markers` so consumers see + # clean text. + self._aggregation.append( + TextPartForConcatenation(frame.marker, includes_inter_part_spaces=False) + ) + async def _handle_thought_start(self, frame: LLMThoughtStartFrame): await self._reset_thought_aggregation() self._thought_append_to_context = frame.append_to_context diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index 606ba9902..50ead0b82 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -22,6 +22,7 @@ from pipecat.frames.frames import ( Frame, InterruptionFrame, LLMFullResponseEndFrame, + LLMMarkerFrame, LLMMessagesAppendFrame, LLMRunFrame, LLMTextFrame, @@ -407,11 +408,11 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): # explicitly not complete. The re-prompt path is driven by # this mixin's own timeout. - # Push the marker with skip_tts=True so it's added to context (maintains - # conversation continuity per prompt instructions) but not spoken by TTS - frame = LLMTextFrame(self._turn_text_buffer) - frame.skip_tts = True - await self.push_frame(frame) + # Persist the marker to context as a stand-alone assistant + # message via LLMMarkerFrame: the bot produces no spoken + # output for incomplete turns, so the marker is the entire + # context entry. + await self.push_frame(LLMMarkerFrame(marker)) self._turn_text_buffer = "" await self._start_incomplete_timeout(incomplete_type) @@ -424,21 +425,22 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): # Broadcast that the user turn is complete so a stop strategy # gating finalization on this signal (e.g. # LLMTurnCompletionUserTurnStopStrategy) can fire - # `on_user_turn_stopped`. Must fire before the LLMTextFrame so + # `on_user_turn_stopped`. Must fire before the marker so # downstream consumers see the signal before the response. await self.broadcast_frame(UserTurnCompletedFrame) + # Push the marker as a sideband signal that the assistant + # aggregator will prepend to the upcoming aggregated text, + # so the context message ends up as "✓ ". + await self.push_frame( + LLMMarkerFrame(USER_TURN_COMPLETE_MARKER, append_to_context_immediately=False) + ) + # Split buffer at the marker to handle cases where marker and text # arrive in the same chunk (e.g., "✓ Hello!" from some LLMs) marker_pos = self._turn_text_buffer.index(USER_TURN_COMPLETE_MARKER) marker_end = marker_pos + len(USER_TURN_COMPLETE_MARKER) - # Push the marker with skip_tts=True - adds to context but not spoken - marker_text = self._turn_text_buffer[:marker_end] - frame = LLMTextFrame(marker_text) - frame.skip_tts = True - await self.push_frame(frame) - # Push remaining text after marker as normal speech remaining_text = self._turn_text_buffer[marker_end:] if remaining_text: diff --git a/tests/test_user_turn_completion_mixin.py b/tests/test_user_turn_completion_mixin.py index 66a48b51f..0947d0a3c 100644 --- a/tests/test_user_turn_completion_mixin.py +++ b/tests/test_user_turn_completion_mixin.py @@ -8,7 +8,12 @@ import unittest import unittest.mock from unittest.mock import AsyncMock -from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame, UserTurnCompletedFrame +from pipecat.frames.frames import ( + LLMFullResponseEndFrame, + LLMMarkerFrame, + LLMTextFrame, + UserTurnCompletedFrame, +) from pipecat.processors.frame_processor import FrameProcessor from pipecat.services.llm_service import LLMService from pipecat.services.settings import LLMSettings @@ -44,25 +49,24 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Simulate LLM generating: "✓ Hello there!" await processor._push_turn_text(f"{USER_TURN_COMPLETE_MARKER} Hello there!") - # Two LLMTextFrames: marker (skip_tts) and content (normal). The - # broadcast also pushes UserTurnCompletedFrame upstream + downstream. + # The marker rides as LLMMarkerFrame(append_to_context_immediately=False); + # only the spoken text is pushed as an LLMTextFrame. text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] - self.assertEqual(len(text_frames), 2) + self.assertEqual(len(text_frames), 1) + self.assertEqual(text_frames[0].text, "Hello there!") + self.assertFalse(text_frames[0].skip_tts) - # First text frame should be the marker with skip_tts=True - self.assertEqual(text_frames[0].text, USER_TURN_COMPLETE_MARKER) - self.assertTrue(text_frames[0].skip_tts) - - # Second text frame should be the actual text without skip_tts - self.assertEqual(text_frames[1].text, "Hello there!") - self.assertFalse(text_frames[1].skip_tts) + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) + self.assertEqual(marker_frames[0].marker, USER_TURN_COMPLETE_MARKER) + self.assertFalse(marker_frames[0].append_to_context_immediately) # UserTurnCompletedFrame broadcast in both directions. completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] self.assertEqual(len(completed), 2) async def test_incomplete_short_marker_suppresses_text(self): - """Test that ○ marker suppresses text with skip_tts and emits no completed frame.""" + """Test that ○ marker suppresses text and is emitted as a stand-alone marker frame.""" processor = MockProcessor() pushed_frames = [] @@ -74,17 +78,21 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_SHORT_MARKER) + # No LLMTextFrame: response is suppressed. text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] - self.assertEqual(len(text_frames), 1) - self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER) - self.assertTrue(text_frames[0].skip_tts) + self.assertEqual(len(text_frames), 0) + + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) + self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_SHORT_MARKER) + self.assertTrue(marker_frames[0].append_to_context_immediately) # Incomplete markers do not emit UserTurnCompletedFrame. completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] self.assertEqual(len(completed), 0) async def test_incomplete_long_marker_suppresses_text(self): - """Test that ◐ marker suppresses text with skip_tts and emits no completed frame.""" + """Test that ◐ marker suppresses text and is emitted as a stand-alone marker frame.""" processor = MockProcessor() pushed_frames = [] @@ -97,9 +105,12 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(USER_TURN_INCOMPLETE_LONG_MARKER) text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] - self.assertEqual(len(text_frames), 1) - self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER) - self.assertTrue(text_frames[0].skip_tts) + self.assertEqual(len(text_frames), 0) + + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) + self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_LONG_MARKER) + self.assertTrue(marker_frames[0].append_to_context_immediately) completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] self.assertEqual(len(completed), 0) @@ -123,10 +134,12 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase # Now send the complete marker await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?") - # Two LLMTextFrames pushed (marker + content) plus the - # UserTurnCompletedFrame broadcast. + # One LLMTextFrame for the spoken portion; one LLMMarkerFrame for + # the marker; UserTurnCompletedFrame broadcast in both directions. text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] - self.assertEqual(len(text_frames), 2) + self.assertEqual(len(text_frames), 1) + marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] + self.assertEqual(len(marker_frames), 1) async def test_turn_state_reset_after_llm_full_response_end_frame(self): """Test that _turn_complete_found is reset when LLMFullResponseEndFrame is pushed.""" From e3e90d38aa995c054c54fd23bd30d1c4df81fddc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 5 May 2026 16:31:56 -0700 Subject: [PATCH 5/9] Preserve full user transcript across multiple inferences in one turn MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a stop-strategy chain splits inference-triggered from finalization (e.g. `LLMTurnCompletionUserTurnStopStrategy` gating a deferred detector), more than one inference can fire inside a single user turn — each adds the new transcription segment to the context. Previously each inference overwrote `_pending_user_turn_aggregation`, so the eventual `on_user_turn_stopped` event surfaced only the segment from the last inference, dropping anything the user said before it. Concatenate each segment into `_full_user_turn_aggregation` instead of overwriting, and combine that running buffer with any post-final- inference segment when emitting the public event. --- .../aggregators/llm_response_universal.py | 58 +++++++++------- tests/test_context_aggregators_universal.py | 69 +++++++++++++++++++ 2 files changed, 103 insertions(+), 24 deletions(-) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 019490929..f136b848d 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -588,10 +588,14 @@ class LLMUserAggregator(LLMContextAggregator): self._user_is_muted = False self._user_turn_start_timestamp = "" - # Aggregation captured at inference-trigger time and surfaced again - # in `on_user_turn_stopped`. Set to None when no inference-triggered - # event has fired since the last finalization. - self._pending_user_turn_aggregation: str | None = None + # Full transcript across the user turn. Each + # `_on_user_turn_inference_triggered` push captures only the + # new segment since the previous push (push_aggregation resets + # `_aggregation` after writing to context); we accumulate those + # segments here so the eventual `on_user_turn_stopped` event + # surfaces the full turn transcript even when several + # inferences fire before finalization. + self._full_user_turn_aggregation: str | None = None self._user_turn_controller = UserTurnController( user_turn_strategies=user_turn_strategies, @@ -862,7 +866,7 @@ class LLMUserAggregator(LLMContextAggregator): logger.debug(f"{self}: User started speaking (strategy: {strategy})") self._user_turn_start_timestamp = time_now_iso8601() - self._pending_user_turn_aggregation = None + self._full_user_turn_aggregation = None if params.enable_user_speaking_frames: await self.broadcast_frame(UserStartedSpeakingFrame) @@ -881,12 +885,20 @@ class LLMUserAggregator(LLMContextAggregator): ): logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})") - # Push aggregation now: this writes the user message to the context - # and pushes LLMContextFrame, which is what kicks LLM inference. - # `on_user_turn_stopped` later fires when the turn is semantically - # final and surfaces the aggregated message via the public event. - aggregation = await self.push_aggregation() - self._pending_user_turn_aggregation = aggregation + # Push aggregation now: this writes the user message segment to + # the context and emits LLMContextFrame, which kicks LLM + # inference. Concatenate the segment into + # `_full_user_turn_aggregation` so multiple inferences in the + # same turn don't lose earlier segments from the eventual + # `on_user_turn_stopped` event. + segment = await self.push_aggregation() + if segment: + if self._full_user_turn_aggregation: + self._full_user_turn_aggregation = ( + f"{self._full_user_turn_aggregation} {segment}".strip() + ) + else: + self._full_user_turn_aggregation = segment await self._call_event_handler("on_user_turn_inference_triggered", strategy) @@ -924,27 +936,25 @@ class LLMUserAggregator(LLMContextAggregator): ): """Maybe emit user turn stopped event. - The aggregation has typically already been pushed at - inference-trigger time and is cached in - ``self._pending_user_turn_aggregation``. Any aggregation that has - accumulated since the last inference-trigger (e.g. transcriptions - that arrived between inference trigger and finalization) is flushed - here so end-of-turn content is never lost. + Earlier inference triggers in the same turn have already pushed + their segments to the context and accumulated them into + ``self._full_user_turn_aggregation``. Any aggregation that + arrived after the last inference trigger is flushed here so + end-of-turn content is never lost from the public event. Args: strategy: The strategy that triggered the turn stop. on_session_end: If True, only emit if there's unemitted content (avoids duplicate events when session ends). """ - aggregation = await self.push_aggregation() - previous_aggregation = self._pending_user_turn_aggregation - self._pending_user_turn_aggregation = None + segment = await self.push_aggregation() + full_aggregation = self._full_user_turn_aggregation + self._full_user_turn_aggregation = None - content = None - if aggregation and previous_aggregation: - content = f"{previous_aggregation} {aggregation}".strip() + if segment and full_aggregation: + content = f"{full_aggregation} {segment}".strip() else: - content = previous_aggregation or aggregation + content = full_aggregation or segment if not on_session_end or content: message = UserTurnStoppedMessage( diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index 01ca70c80..d16a34c43 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -697,6 +697,75 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): self.assertEqual(events, ["inference_triggered", "stopped"]) + async def test_multiple_inferences_in_one_turn_preserve_aggregation(self): + """Two inference triggers before finalization should preserve the full user transcript. + + When the LLM marks the first inference incomplete (○ / ◐) and the + user keeps speaking, the deferred upstream strategy fires a + second inference. Both the public ``on_user_turn_stopped`` event + and the conversation context should reflect the full user + utterance, not just the segment from the last inference. + """ + from pipecat.frames.frames import UserTurnCompletedFrame + from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred + + gating = LLMTurnCompletionUserTurnStopStrategy() + upstream = deferred( + SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT) + ) + context = LLMContext() + user_aggregator = LLMUserAggregator( + context, + params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]), + ), + ) + + inference_count = 0 + stop_message = None + + @user_aggregator.event_handler("on_user_turn_inference_triggered") + async def on_inference_triggered(aggregator, strategy): + nonlocal inference_count + inference_count += 1 + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_stopped(aggregator, strategy, message): + nonlocal stop_message + stop_message = message + + pipeline = Pipeline([user_aggregator]) + + frames_to_send = [ + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="I'm thinking", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # First inference fired here. Imagine the LLM returned ○; + # the turn is not yet finalized, so the user keeps talking. + VADUserStartedSpeakingFrame(), + TranscriptionFrame(text="about pizza", user_id="", timestamp="now"), + SleepFrame(), + VADUserStoppedSpeakingFrame(), + SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), + # Second inference fired here. Now the LLM returns ✓ and the + # turn finalizes via UserTurnCompletedFrame. + UserTurnCompletedFrame(), + SleepFrame(), + ] + await run_test(pipeline, frames_to_send=frames_to_send) + + self.assertEqual(inference_count, 2) + self.assertIsNotNone(stop_message) + # The public event should report the full transcript, even + # though each inference push only writes its own segment to + # the context. + self.assertEqual(stop_message.content, "I'm thinking about pizza") + + user_messages = [m for m in context.get_messages() if m.get("role") == "user"] + self.assertEqual([m["content"] for m in user_messages], ["I'm thinking", "about pizza"]) + class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase): async def test_empty(self): From 952dddca8b2b796a7ba4d6db9a02d2a3e975f921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 6 May 2026 11:12:32 -0700 Subject: [PATCH 6/9] Replace llm_completion_user_turn_stop_strategies() with FilterIncompleteUserTurnStrategies Wrap the detector chain with `deferred(...)` and append the LLM completion gate via a `UserTurnStrategies` specialization rather than a free-standing helper, mirroring the existing `ExternalUserTurnStrategies` pattern. The class lives next to other strategy containers in `pipecat.turns.user_turn_strategies`, so users discover it where they're already configuring `user_turn_strategies`. The deprecated `filter_incomplete_user_turns` flag now rewires through `FilterIncompleteUserTurnStrategies` under the hood, keeping the migration path identical to before. `deferred(...)` stays public as the explicit escape hatch for non-default compositions. --- changelog/4405.added.4.md | 2 +- changelog/4405.deprecated.md | 2 +- ...turn-management-filter-incomplete-turns.py | 32 +++---- .../aggregators/llm_response_universal.py | 45 +++++----- src/pipecat/turns/user_turn_strategies.py | 90 +++++++++++-------- tests/test_context_aggregators_universal.py | 13 ++- 6 files changed, 94 insertions(+), 90 deletions(-) diff --git a/changelog/4405.added.4.md b/changelog/4405.added.4.md index 9cc5246ed..5077f0416 100644 --- a/changelog/4405.added.4.md +++ b/changelog/4405.added.4.md @@ -1 +1 @@ -- Added `llm_completion_user_turn_stop_strategies()` in `pipecat.turns.user_turn_strategies` — a one-call helper that pairs default (or user-supplied) stop strategies, each wrapped with `deferred(...)`, with `LLMTurnCompletionUserTurnStopStrategy`. Common case: `stop=llm_completion_user_turn_stop_strategies()`. +- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts. diff --git a/changelog/4405.deprecated.md b/changelog/4405.deprecated.md index eb7aede5e..3040f7bd8 100644 --- a/changelog/4405.deprecated.md +++ b/changelog/4405.deprecated.md @@ -1 +1 @@ -- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Add `LLMTurnCompletionUserTurnStopStrategy` to `user_turn_strategies.stop` instead (typically via `llm_completion_user_turn_stop_strategies()`). Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning`, wraps existing stop strategies with `deferred(...)`, and appends the new finalizer strategy. +- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly. diff --git a/examples/turn-management/turn-management-filter-incomplete-turns.py b/examples/turn-management/turn-management-filter-incomplete-turns.py index 72037943b..00066b6c8 100644 --- a/examples/turn-management/turn-management-filter-incomplete-turns.py +++ b/examples/turn-management/turn-management-filter-incomplete-turns.py @@ -41,10 +41,7 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams -from pipecat.turns.user_turn_strategies import ( - UserTurnStrategies, - llm_completion_user_turn_stop_strategies, -) +from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies load_dotenv(override=True) @@ -87,11 +84,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) context = LLMContext() - # `llm_completion_user_turn_stop_strategies()` pairs the default - # stop strategies with `LLMTurnCompletionUserTurnStopStrategy`: - # those strategies trigger LLM inference but the public - # `on_user_turn_stopped` event fires only when the LLM confirms ✓. - # The LLM marks each response with one of: + # `FilterIncompleteUserTurnStrategies` pairs the default detector + # chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors + # trigger LLM inference but the public `on_user_turn_stopped` event + # fires only when the LLM confirms ✓. The LLM marks each response + # with one of: # ✓ = complete (respond normally) # ○ = incomplete short (wait 5s, then prompt) # ◐ = incomplete long (wait 15s, then prompt) @@ -99,17 +96,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context, user_params=LLMUserAggregatorParams( vad_analyzer=SileroVADAnalyzer(), - user_turn_strategies=UserTurnStrategies( - stop=llm_completion_user_turn_stop_strategies(), + user_turn_strategies=FilterIncompleteUserTurnStrategies( # Optional: customize turn completion behavior - # stop=llm_completion_user_turn_stop_strategies( - # config=UserTurnCompletionConfig( - # incomplete_short_timeout=5.0, - # incomplete_long_timeout=15.0, - # incomplete_short_prompt="Custom prompt...", - # incomplete_long_prompt="Custom prompt...", - # instructions="Custom turn completion instructions...", - # ), + # config=UserTurnCompletionConfig( + # incomplete_short_timeout=5.0, + # incomplete_long_timeout=15.0, + # incomplete_short_prompt="Custom prompt...", + # incomplete_long_prompt="Custom prompt...", + # instructions="Custom turn completion instructions...", # ), ), ), diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index f136b848d..0e5b9cef4 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -85,7 +85,10 @@ from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedP from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig from pipecat.turns.user_turn_controller import UserTurnController -from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.turns.user_turn_strategies import ( + FilterIncompleteUserTurnStrategies, + UserTurnStrategies, +) from pipecat.utils.context.llm_context_summarization import ( LLMAutoContextSummarizationConfig, LLMContextSummarizationConfig, @@ -123,18 +126,18 @@ class LLMUserAggregatorParams: has been idle (not speaking) for this duration. Set to 0 to disable idle detection. vad_analyzer: Voice Activity Detection analyzer instance. - filter_incomplete_user_turns: [DEPRECATED] Add - :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` - to ``user_turn_strategies.stop`` instead. When enabled, the LLM - outputs a turn-completion marker at the start of each response: - ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). - Incomplete responses are suppressed and timeouts trigger - re-prompting. + filter_incomplete_user_turns: [DEPRECATED] Use + ``user_turn_strategies=FilterIncompleteUserTurnStrategies()`` + instead. When enabled, the LLM outputs a turn-completion + marker at the start of each response: ✓ (complete), ○ + (incomplete short), or ◐ (incomplete long). Incomplete + responses are suppressed and timeouts trigger re-prompting. user_turn_completion_config: [DEPRECATED] Configuration for turn completion behavior including custom instructions, timeouts, and prompts. Only used when filter_incomplete_user_turns is True (deprecated path) — for the new strategy-based API, pass the config - directly to ``LLMTurnCompletionUserTurnStopStrategy(config=...)``. + directly to ``FilterIncompleteUserTurnStrategies(config=...)``. + """ add_tool_change_messages: bool = False @@ -151,16 +154,14 @@ class LLMUserAggregatorParams: if self.filter_incomplete_user_turns: warnings.warn( "LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. " - "Add LLMTurnCompletionUserTurnStopStrategy to " - "user_turn_strategies.stop instead.", + "Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.", DeprecationWarning, stacklevel=2, ) if self.user_turn_completion_config: warnings.warn( "LLMUserAggregatorParams.user_turn_completion_config is deprecated. " - "Add LLMTurnCompletionUserTurnStopStrategy to " - "user_turn_strategies.stop instead.", + "Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.", DeprecationWarning, stacklevel=2, ) @@ -570,21 +571,15 @@ class LLMUserAggregator(LLMContextAggregator): user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies() # Deprecated path: translate filter_incomplete_user_turns into - # wrapping pre-existing stop strategies with deferred() and - # appending LLMTurnCompletionUserTurnStopStrategy. The + # the equivalent FilterIncompleteUserTurnStrategies wiring. The # DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__. if self._params.filter_incomplete_user_turns: - from pipecat.turns.user_stop import ( - LLMTurnCompletionUserTurnStopStrategy, - deferred, + user_turn_strategies = FilterIncompleteUserTurnStrategies( + start=user_turn_strategies.start, + stop=user_turn_strategies.stop, + config=self._params.user_turn_completion_config, ) - - existing_stop = list(user_turn_strategies.stop or []) - user_turn_strategies.stop = [deferred(s) for s in existing_stop] + [ - LLMTurnCompletionUserTurnStopStrategy( - config=self._params.user_turn_completion_config - ) - ] + self._params.user_turn_strategies = user_turn_strategies self._user_is_muted = False self._user_turn_start_timestamp = "" diff --git a/src/pipecat/turns/user_turn_strategies.py b/src/pipecat/turns/user_turn_strategies.py index 56c7d9e9f..eee0b6ff6 100644 --- a/src/pipecat/turns/user_turn_strategies.py +++ b/src/pipecat/turns/user_turn_strategies.py @@ -51,42 +51,6 @@ def default_user_turn_stop_strategies() -> list[BaseUserTurnStopStrategy]: return [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())] -def llm_completion_user_turn_stop_strategies( - strategies: list[BaseUserTurnStopStrategy] | None = None, - *, - config: UserTurnCompletionConfig | None = None, -) -> list[BaseUserTurnStopStrategy]: - """Build a stop-strategy list gated on the LLM's turn-completion verdict. - - Wraps ``strategies`` with :func:`deferred` so they trigger inference - but don't fire ``on_user_turn_stopped`` themselves, then appends - :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` - as the finalizer. Use as the ``stop`` field of a - :class:`UserTurnStrategies`:: - - UserTurnStrategies( - stop=llm_completion_user_turn_stop_strategies(), - ) - - Args: - strategies: Stop strategies that should drive inference. If - None, uses :func:`default_user_turn_stop_strategies`. - config: Optional configuration applied to the LLM via the - ``filter_incomplete_user_turns`` setting. Customizes the - turn-completion instructions, incomplete-turn timeouts, and - re-prompts. - - Returns: - ``[deferred(s) for s in strategies] + - [LLMTurnCompletionUserTurnStopStrategy(config=config)]``. - """ - strategies = strategies if strategies is not None else default_user_turn_stop_strategies() - return [ - *(deferred(s) for s in strategies), - LLMTurnCompletionUserTurnStopStrategy(config=config), - ] - - @dataclass class UserTurnStrategies: """Container for user turn start and stop strategies. @@ -134,3 +98,57 @@ class ExternalUserTurnStrategies(UserTurnStrategies): def __post_init__(self): self.start = [ExternalUserTurnStartStrategy()] self.stop = [ExternalUserTurnStopStrategy()] + + +@dataclass +class FilterIncompleteUserTurnStrategies(UserTurnStrategies): + """Stop strategies gated on the LLM's turn-completion verdict. + + The LLM is asked to begin every response with one of three markers: + ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). Only ✓ + finalizes the user turn; ○ / ◐ keep the turn open so the user can + continue speaking and the LLM can re-evaluate later. + + Configuring strategies this way preserves the existing detector + chain (defaults or user-supplied) for inference triggering and + appends :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + as the finalizer. The detector strategies are wrapped with + :func:`~pipecat.turns.user_stop.deferred` automatically so they fire + only ``on_user_turn_inference_triggered`` and leave finalization to + the LLM gate. + + Parameters: + config: Optional configuration applied to the LLM via the + ``filter_incomplete_user_turns`` setting. Customizes the + turn-completion instructions, incomplete-turn timeouts, and + re-prompts. If None, defaults from + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig` + are used. + + Example:: + + user_turn_strategies=FilterIncompleteUserTurnStrategies() + + # Custom detector chain: + user_turn_strategies=FilterIncompleteUserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy(...)], + ) + + # Custom completion config: + user_turn_strategies=FilterIncompleteUserTurnStrategies( + config=UserTurnCompletionConfig( + incomplete_short_timeout=5.0, + incomplete_long_timeout=15.0, + ), + ) + """ + + config: UserTurnCompletionConfig | None = None + + def __post_init__(self): + super().__post_init__() + # Defer the detector chain so it only fires inference-triggered, + # then append the LLM gate as the sole finalizer. + gated: list[BaseUserTurnStopStrategy] = [deferred(s) for s in self.stop or []] + gated.append(LLMTurnCompletionUserTurnStopStrategy(config=self.config)) + self.stop = gated diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index d16a34c43..da3023b9b 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -64,7 +64,10 @@ from pipecat.turns.user_mute import ( MuteUntilFirstBotCompleteUserMuteStrategy, ) from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.turns.user_turn_strategies import ( + FilterIncompleteUserTurnStrategies, + UserTurnStrategies, +) from pipecat.utils.text.base_text_aggregator import AggregationType USER_TURN_STOP_TIMEOUT = 0.2 @@ -179,15 +182,9 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): assert context.messages[0]["content"] == "Hi there!" async def test_llm_messages_update_does_not_inject_turn_completion_into_context(self): - from pipecat.turns.user_turn_strategies import ( - llm_completion_user_turn_stop_strategies, - ) - context = LLMContext() params = LLMUserAggregatorParams( - user_turn_strategies=UserTurnStrategies( - stop=llm_completion_user_turn_stop_strategies(), - ), + user_turn_strategies=FilterIncompleteUserTurnStrategies(), ) pipeline = Pipeline([LLMUserAggregator(context, params=params)]) From b78cecf7b2aba72b0fe7231257803e3dd9bf500c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 6 May 2026 11:17:56 -0700 Subject: [PATCH 7/9] Rename UserTurnCompletedFrame to UserTurnInferenceCompletedFrame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old name overlapped semantically with `UserStoppedSpeakingFrame`: both could be read as "the user's turn is done." They're at different layers — `UserStoppedSpeakingFrame` is the acoustic stop signal, while this frame is the post-judgment "inference about the turn is now complete (turn is semantically final)" signal emitted by the LLM mixin (on ✓), an end-of-turn classifier, or a custom producer. The new name pairs naturally with the existing `on_user_turn_inference_triggered` event vocabulary and removes the ambiguity with `UserStoppedSpeakingFrame`. --- changelog/4405.added.2.md | 2 +- changelog/4405.added.5.md | 2 +- src/pipecat/frames/frames.py | 2 +- ...xternal_user_turn_completion_stop_strategy.py | 14 +++++++------- ...lm_turn_completion_user_turn_stop_strategy.py | 2 +- src/pipecat/turns/user_turn_completion_mixin.py | 6 +++--- tests/test_context_aggregators_universal.py | 16 ++++++++-------- tests/test_user_turn_completion_mixin.py | 14 +++++++------- 8 files changed, 29 insertions(+), 29 deletions(-) diff --git a/changelog/4405.added.2.md b/changelog/4405.added.2.md index 30f49e576..30e86f904 100644 --- a/changelog/4405.added.2.md +++ b/changelog/4405.added.2.md @@ -1 +1 @@ -- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives. +- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives. diff --git a/changelog/4405.added.5.md b/changelog/4405.added.5.md index 1b8eadea2..508084e30 100644 --- a/changelog/4405.added.5.md +++ b/changelog/4405.added.5.md @@ -1 +1 @@ -- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup. +- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 569574842..863da578e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1005,7 +1005,7 @@ class UserSpeakingFrame(SystemFrame): @dataclass -class UserTurnCompletedFrame(SystemFrame): +class UserTurnInferenceCompletedFrame(SystemFrame): """Frame indicating that the user turn is semantically complete. Emitted by any component that can judge conversational turn diff --git a/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py index c132a5db8..3850c234c 100644 --- a/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py +++ b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py @@ -4,21 +4,21 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""User turn stop strategy that finalizes on ``UserTurnCompletedFrame``.""" +"""User turn stop strategy that finalizes on ``UserTurnInferenceCompletedFrame``.""" -from pipecat.frames.frames import Frame, UserTurnCompletedFrame +from pipecat.frames.frames import Frame, UserTurnInferenceCompletedFrame from pipecat.turns.types import ProcessFrameResult from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy): - """Finalize the user turn whenever a ``UserTurnCompletedFrame`` arrives. + """Finalize the user turn whenever a ``UserTurnInferenceCompletedFrame`` arrives. Generic stop strategy for pipelines where some external component (LLM with completion markers, STT with built-in turn detection, a dedicated end-of-turn classifier, custom user code, etc.) judges when a turn is semantically complete and emits - :class:`~pipecat.frames.frames.UserTurnCompletedFrame`. + :class:`~pipecat.frames.frames.UserTurnInferenceCompletedFrame`. Pair this with one or more ``deferred(...)``-wrapped detector strategies that drive ``on_user_turn_inference_triggered`` but @@ -34,14 +34,14 @@ class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy): instead, which additionally pushes the ``LLMUpdateSettingsFrame`` that enables the marker protocol on the LLM. - If the producer never emits ``UserTurnCompletedFrame``, the + If the producer never emits ``UserTurnInferenceCompletedFrame``, the controller's ``user_turn_stop_timeout`` watchdog finalizes the turn after no activity. Tune that timeout if your producer can take longer than the default to respond. """ async def process_frame(self, frame: Frame) -> ProcessFrameResult: - """Fire ``on_user_turn_stopped`` whenever ``UserTurnCompletedFrame`` is seen.""" - if isinstance(frame, UserTurnCompletedFrame): + """Fire ``on_user_turn_stopped`` whenever ``UserTurnInferenceCompletedFrame`` is seen.""" + if isinstance(frame, UserTurnInferenceCompletedFrame): await self.trigger_user_turn_finalized() return ProcessFrameResult.CONTINUE diff --git a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py index 08dea6df1..0f1a97f20 100644 --- a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py @@ -29,7 +29,7 @@ class LLMTurnCompletionUserTurnStopStrategy(ExternalUserTurnCompletionStopStrate Finalization itself is inherited: when the LLM service's :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` detects a ``✓`` marker, it broadcasts a - :class:`~pipecat.frames.frames.UserTurnCompletedFrame` and the + :class:`~pipecat.frames.frames.UserTurnInferenceCompletedFrame` and the base class fires ``on_user_turn_stopped``. On ``incomplete_short`` / ``incomplete_long`` markers the mixin re-prompts internally and no completion frame is emitted, so the diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index 50ead0b82..cf600466d 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -26,7 +26,7 @@ from pipecat.frames.frames import ( LLMMessagesAppendFrame, LLMRunFrame, LLMTextFrame, - UserTurnCompletedFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -404,7 +404,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): ) self._turn_suppressed = True - # No UserTurnCompletedFrame is broadcast here: the turn is + # No UserTurnInferenceCompletedFrame is broadcast here: the turn is # explicitly not complete. The re-prompt path is driven by # this mixin's own timeout. @@ -427,7 +427,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): # LLMTurnCompletionUserTurnStopStrategy) can fire # `on_user_turn_stopped`. Must fire before the marker so # downstream consumers see the signal before the response. - await self.broadcast_frame(UserTurnCompletedFrame) + await self.broadcast_frame(UserTurnInferenceCompletedFrame) # Push the marker as a sideband signal that the assistant # aggregator will prepend to the upcoming aggregated text, diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py index da3023b9b..a8ecace9d 100644 --- a/tests/test_context_aggregators_universal.py +++ b/tests/test_context_aggregators_universal.py @@ -570,7 +570,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): async def test_inference_triggered_event_fires_on_default_strategies(self): """Default flow fires inference-triggered before stopped, both with the same strategy.""" - from pipecat.frames.frames import UserTurnCompletedFrame # noqa: F401 + from pipecat.frames.frames import UserTurnInferenceCompletedFrame # noqa: F401 context = LLMContext() user_aggregator = LLMUserAggregator( @@ -649,8 +649,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): self.assertIsInstance(stop_strategies[1], LLMTurnCompletionUserTurnStopStrategy) async def test_llm_completion_strategy_finalizes_on_complete_marker(self): - """LLMTurnCompletionUserTurnStopStrategy finalizes only on UserTurnCompletedFrame(complete).""" - from pipecat.frames.frames import UserTurnCompletedFrame + """LLMTurnCompletionUserTurnStopStrategy finalizes only on UserTurnInferenceCompletedFrame(complete).""" + from pipecat.frames.frames import UserTurnInferenceCompletedFrame from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred gating = LLMTurnCompletionUserTurnStopStrategy() @@ -678,7 +678,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): pipeline = Pipeline([user_aggregator]) # Drive the pipeline. Inference fires after the upstream - # strategy's timeout. Stop fires only when UserTurnCompletedFrame + # strategy's timeout. Stop fires only when UserTurnInferenceCompletedFrame # arrives (producer absence == "not yet complete"). frames_to_send = [ VADUserStartedSpeakingFrame(), @@ -687,7 +687,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): VADUserStoppedSpeakingFrame(), SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), # At this point inference_triggered should have fired but NOT stopped. - UserTurnCompletedFrame(), + UserTurnInferenceCompletedFrame(), SleepFrame(), ] await run_test(pipeline, frames_to_send=frames_to_send) @@ -703,7 +703,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): and the conversation context should reflect the full user utterance, not just the segment from the last inference. """ - from pipecat.frames.frames import UserTurnCompletedFrame + from pipecat.frames.frames import UserTurnInferenceCompletedFrame from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred gating = LLMTurnCompletionUserTurnStopStrategy() @@ -747,8 +747,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase): VADUserStoppedSpeakingFrame(), SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1), # Second inference fired here. Now the LLM returns ✓ and the - # turn finalizes via UserTurnCompletedFrame. - UserTurnCompletedFrame(), + # turn finalizes via UserTurnInferenceCompletedFrame. + UserTurnInferenceCompletedFrame(), SleepFrame(), ] await run_test(pipeline, frames_to_send=frames_to_send) diff --git a/tests/test_user_turn_completion_mixin.py b/tests/test_user_turn_completion_mixin.py index 0947d0a3c..5a4f0c640 100644 --- a/tests/test_user_turn_completion_mixin.py +++ b/tests/test_user_turn_completion_mixin.py @@ -12,7 +12,7 @@ from pipecat.frames.frames import ( LLMFullResponseEndFrame, LLMMarkerFrame, LLMTextFrame, - UserTurnCompletedFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.processors.frame_processor import FrameProcessor from pipecat.services.llm_service import LLMService @@ -61,8 +61,8 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase self.assertEqual(marker_frames[0].marker, USER_TURN_COMPLETE_MARKER) self.assertFalse(marker_frames[0].append_to_context_immediately) - # UserTurnCompletedFrame broadcast in both directions. - completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + # UserTurnInferenceCompletedFrame broadcast in both directions. + completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)] self.assertEqual(len(completed), 2) async def test_incomplete_short_marker_suppresses_text(self): @@ -87,8 +87,8 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_SHORT_MARKER) self.assertTrue(marker_frames[0].append_to_context_immediately) - # Incomplete markers do not emit UserTurnCompletedFrame. - completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + # Incomplete markers do not emit UserTurnInferenceCompletedFrame. + completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)] self.assertEqual(len(completed), 0) async def test_incomplete_long_marker_suppresses_text(self): @@ -112,7 +112,7 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_LONG_MARKER) self.assertTrue(marker_frames[0].append_to_context_immediately) - completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)] + completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)] self.assertEqual(len(completed), 0) async def test_text_buffered_until_marker_found(self): @@ -135,7 +135,7 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?") # One LLMTextFrame for the spoken portion; one LLMMarkerFrame for - # the marker; UserTurnCompletedFrame broadcast in both directions. + # the marker; UserTurnInferenceCompletedFrame broadcast in both directions. text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)] self.assertEqual(len(text_frames), 1) marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)] From 457a68ce648d86c70976307f1eef48f5122d567f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 7 May 2026 09:14:53 -0400 Subject: [PATCH 8/9] Correct docstrings and comments regarding incomplete_long_timeout duration, 10 sec --- .../turn-management-filter-incomplete-turns.py | 6 +++--- src/pipecat/turns/user_turn_completion_mixin.py | 4 ++-- src/pipecat/turns/user_turn_strategies.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/turn-management/turn-management-filter-incomplete-turns.py b/examples/turn-management/turn-management-filter-incomplete-turns.py index 00066b6c8..99c0394fb 100644 --- a/examples/turn-management/turn-management-filter-incomplete-turns.py +++ b/examples/turn-management/turn-management-filter-incomplete-turns.py @@ -10,7 +10,7 @@ Demonstrates LLM-based turn completion detection to suppress bot responses when the user was cut off mid-thought. The LLM outputs one of three markers: - ✓ (complete): User finished their thought, respond normally - ○ (incomplete short): User was cut off, wait ~5s then prompt -- ◐ (incomplete long): User needs time to think, wait ~15s then prompt +- ◐ (incomplete long): User needs time to think, wait ~10s then prompt When incomplete is detected, the bot's response is suppressed. After the timeout expires, the LLM is automatically prompted to re-engage the user. @@ -91,7 +91,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # with one of: # ✓ = complete (respond normally) # ○ = incomplete short (wait 5s, then prompt) - # ◐ = incomplete long (wait 15s, then prompt) + # ◐ = incomplete long (wait 10s, then prompt) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( @@ -100,7 +100,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Optional: customize turn completion behavior # config=UserTurnCompletionConfig( # incomplete_short_timeout=5.0, - # incomplete_long_timeout=15.0, + # incomplete_long_timeout=10.0, # incomplete_short_prompt="Custom prompt...", # incomplete_long_prompt="Custom prompt...", # instructions="Custom turn completion instructions...", diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index cf600466d..c423e16bf 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -187,8 +187,8 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): It processes turn completion markers to enable smarter conversation flow: - ✓ (COMPLETE): Push response normally - - ○ (INCOMPLETE SHORT): Suppress response, wait ~5s, then prompt - - ◐ (INCOMPLETE LONG): Suppress response, wait ~15s, then prompt + - ○ (INCOMPLETE SHORT): Suppress response, wait 5s, then prompt + - ◐ (INCOMPLETE LONG): Suppress response, wait 10s, then prompt When incomplete timeouts expire, the mixin automatically prompts the LLM with a contextual follow-up message to re-engage the user. diff --git a/src/pipecat/turns/user_turn_strategies.py b/src/pipecat/turns/user_turn_strategies.py index eee0b6ff6..613722804 100644 --- a/src/pipecat/turns/user_turn_strategies.py +++ b/src/pipecat/turns/user_turn_strategies.py @@ -138,7 +138,7 @@ class FilterIncompleteUserTurnStrategies(UserTurnStrategies): user_turn_strategies=FilterIncompleteUserTurnStrategies( config=UserTurnCompletionConfig( incomplete_short_timeout=5.0, - incomplete_long_timeout=15.0, + incomplete_long_timeout=10.0, ), ) """ From c46ede833565dc349aedac78f8cca6e87a6de575 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 7 May 2026 09:42:11 -0400 Subject: [PATCH 9/9] Use Sphinx .. deprecated:: directive for deprecated aggregator params Aligns deprecation docstrings on LLMUserAggregatorParams and LLMAssistantAggregatorParams with CONTRIBUTING.md conventions: present-tense parameter descriptions plus a `.. deprecated:: 1.2.0` directive noting replacement and 2.0.0 removal. Also adds a runtime DeprecationWarning for `user_turn_completion_config`, which previously had no warning despite being deprecated. --- .../aggregators/llm_response_universal.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 0e5b9cef4..475d470e3 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -132,12 +132,21 @@ class LLMUserAggregatorParams: marker at the start of each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long). Incomplete responses are suppressed and timeouts trigger re-prompting. + + .. deprecated:: 1.2.0 + Use ``user_turn_strategies=FilterIncompleteUserTurnStrategies()`` + instead. Will be removed in version 2.0.0. + user_turn_completion_config: [DEPRECATED] Configuration for turn completion behavior including custom instructions, timeouts, and prompts. Only used when filter_incomplete_user_turns is True (deprecated path) — for the new strategy-based API, pass the config directly to ``FilterIncompleteUserTurnStrategies(config=...)``. + .. deprecated:: 1.2.0 + Pass the config directly to + ``FilterIncompleteUserTurnStrategies(config=...)`` instead. + Will be removed in version 2.0.0. """ add_tool_change_messages: bool = False @@ -166,6 +175,15 @@ class LLMUserAggregatorParams: stacklevel=2, ) + if self.user_turn_completion_config is not None: + warnings.warn( + "LLMUserAggregatorParams.user_turn_completion_config is deprecated. " + "Pass the config directly to " + "FilterIncompleteUserTurnStrategies(config=...) instead.", + DeprecationWarning, + stacklevel=2, + ) + @dataclass class LLMAssistantAggregatorParams: @@ -201,6 +219,11 @@ class LLMAssistantAggregatorParams: # --------------------------------------------------------------------------- # Deprecated field names — kept for backward compatibility. # Use enable_auto_context_summarization and auto_context_summarization_config instead. + # + # .. deprecated:: 1.2.0 + # Use ``enable_auto_context_summarization`` and + # ``auto_context_summarization_config`` instead. Will be removed in + # version 2.0.0. # --------------------------------------------------------------------------- enable_context_summarization: bool | None = None context_summarization_config: LLMContextSummarizationConfig | None = None