From 2281cd83591fcd5b0a76d28966fcee4feec4784e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 1 May 2026 15:09:30 -0700 Subject: [PATCH] Extract ExternalUserTurnCompletionStopStrategy as a reusable base MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `LLMTurnCompletionUserTurnStopStrategy` previously bundled two concerns: pushing `LLMUpdateSettingsFrame` on `StartFrame`, and finalizing the turn on `UserTurnCompletedFrame`. The latter is producer-agnostic — any component that emits `UserTurnCompletedFrame` (STT with built-in turn detection, dedicated end-of-turn classifiers, custom code) can drive finalization the same way. Move the frame-handling half into a new `ExternalUserTurnCompletionStopStrategy`. The LLM-specific subclass now only adds the settings-frame push and inherits finalization. Mirrors the existing `ExternalUserTurnStopStrategy` naming pattern. --- src/pipecat/turns/user_stop/__init__.py | 2 + ...rnal_user_turn_completion_stop_strategy.py | 47 +++++++++++++ ...turn_completion_user_turn_stop_strategy.py | 66 ++++++++----------- 3 files changed, 77 insertions(+), 38 deletions(-) create mode 100644 src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py diff --git a/src/pipecat/turns/user_stop/__init__.py b/src/pipecat/turns/user_stop/__init__.py index 4effd6d5c..14eea8407 100644 --- a/src/pipecat/turns/user_stop/__init__.py +++ b/src/pipecat/turns/user_stop/__init__.py @@ -6,6 +6,7 @@ from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred +from .external_user_turn_completion_stop_strategy import ExternalUserTurnCompletionStopStrategy from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy @@ -14,6 +15,7 @@ from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrat __all__ = [ "BaseUserTurnStopStrategy", "DeferredUserTurnStopStrategy", + "ExternalUserTurnCompletionStopStrategy", "ExternalUserTurnStopStrategy", "LLMTurnCompletionUserTurnStopStrategy", "SpeechTimeoutUserTurnStopStrategy", diff --git a/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py new file mode 100644 index 000000000..c132a5db8 --- /dev/null +++ b/src/pipecat/turns/user_stop/external_user_turn_completion_stop_strategy.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy that finalizes on ``UserTurnCompletedFrame``.""" + +from pipecat.frames.frames import Frame, UserTurnCompletedFrame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy + + +class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy): + """Finalize the user turn whenever a ``UserTurnCompletedFrame`` arrives. + + Generic stop strategy for pipelines where some external component + (LLM with completion markers, STT with built-in turn detection, a + dedicated end-of-turn classifier, custom user code, etc.) judges + when a turn is semantically complete and emits + :class:`~pipecat.frames.frames.UserTurnCompletedFrame`. + + Pair this with one or more ``deferred(...)``-wrapped detector + strategies that drive ``on_user_turn_inference_triggered`` but + leave finalization to this strategy:: + + stop=[ + deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)), + ExternalUserTurnCompletionStopStrategy(), + ] + + For LLM-completion-marker gating specifically, use the subclass + :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy` + instead, which additionally pushes the ``LLMUpdateSettingsFrame`` + that enables the marker protocol on the LLM. + + If the producer never emits ``UserTurnCompletedFrame``, the + controller's ``user_turn_stop_timeout`` watchdog finalizes the + turn after no activity. Tune that timeout if your producer can + take longer than the default to respond. + """ + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Fire ``on_user_turn_stopped`` whenever ``UserTurnCompletedFrame`` is seen.""" + if isinstance(frame, UserTurnCompletedFrame): + await self.trigger_user_turn_finalized() + return ProcessFrameResult.CONTINUE diff --git a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py index e75a6d3da..08dea6df1 100644 --- a/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/llm_turn_completion_user_turn_stop_strategy.py @@ -6,47 +6,40 @@ """User turn stop strategy gated on the LLM's turn-completion verdict.""" -from pipecat.frames.frames import ( - Frame, - LLMUpdateSettingsFrame, - StartFrame, - UserTurnCompletedFrame, -) +from pipecat.frames.frames import Frame, LLMUpdateSettingsFrame, StartFrame from pipecat.services.settings import LLMSettings from pipecat.turns.types import ProcessFrameResult -from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy +from pipecat.turns.user_stop.external_user_turn_completion_stop_strategy import ( + ExternalUserTurnCompletionStopStrategy, +) from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig -class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy): - """User turn stop strategy that finalizes only when the LLM agrees. +class LLMTurnCompletionUserTurnStopStrategy(ExternalUserTurnCompletionStopStrategy): + """LLM-gated stop strategy. - This strategy lets another stop strategy (e.g. smart-turn analyzer) - trigger LLM inference, then defers the public ``on_user_turn_stopped`` - event until the LLM emits a ``UserTurnCompletedFrame``. On - ``incomplete_short`` / ``incomplete_long`` markers the - :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` - re-prompts the LLM internally and no completion frame is emitted. - - To use this strategy, install it alongside one or more upstream stop - strategies in ``UserTurnStrategies.stop`` and wrap those upstream - strategies with :func:`~pipecat.turns.user_stop.deferred` so they - fire only ``on_user_turn_inference_triggered`` and leave - finalization to this strategy. The aggregator's deprecation path - for ``filter_incomplete_user_turns`` does this rewiring - automatically. - - If the LLM never returns a completion frame (malformed output, - unreachable service, etc.), the controller's - ``user_turn_stop_timeout`` watchdog is the safety net — it fires - ``on_user_turn_stopped`` after no activity for that many seconds. - Tune ``user_turn_stop_timeout`` higher if your LLM regularly takes - longer than the default to respond. - - On ``StartFrame`` the strategy pushes an ``LLMUpdateSettingsFrame`` + Extends + :class:`~pipecat.turns.user_stop.ExternalUserTurnCompletionStopStrategy` + with the LLM-specific setup needed for the marker-based completion + protocol: on ``StartFrame``, pushes an ``LLMUpdateSettingsFrame`` upstream that enables ``filter_incomplete_user_turns`` on the LLM - service and seeds the + and seeds the :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`. + + Finalization itself is inherited: when the LLM service's + :class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin` + detects a ``✓`` marker, it broadcasts a + :class:`~pipecat.frames.frames.UserTurnCompletedFrame` and the + base class fires ``on_user_turn_stopped``. On + ``incomplete_short`` / ``incomplete_long`` markers the mixin + re-prompts internally and no completion frame is emitted, so the + public stop event stays deferred. + + Install alongside one or more ``deferred(...)``-wrapped detector + strategies that drive ``on_user_turn_inference_triggered`` but + leave finalization to this strategy. The aggregator's deprecation + path for ``filter_incomplete_user_turns`` does this rewiring + automatically. """ def __init__( @@ -73,13 +66,10 @@ class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy): return self._config async def process_frame(self, frame: Frame) -> ProcessFrameResult: - """Observe frames to drive the finalization decision.""" + """Configure the LLM on start and delegate completion handling to the base.""" if isinstance(frame, StartFrame): await self._configure_llm() - elif isinstance(frame, UserTurnCompletedFrame): - await self.trigger_user_turn_finalized() - - return ProcessFrameResult.CONTINUE + return await super().process_frame(frame) async def _configure_llm(self): await self.push_frame(