Extract ExternalUserTurnCompletionStopStrategy as a reusable base
`LLMTurnCompletionUserTurnStopStrategy` previously bundled two concerns: pushing `LLMUpdateSettingsFrame` on `StartFrame`, and finalizing the turn on `UserTurnCompletedFrame`. The latter is producer-agnostic — any component that emits `UserTurnCompletedFrame` (STT with built-in turn detection, dedicated end-of-turn classifiers, custom code) can drive finalization the same way. Move the frame-handling half into a new `ExternalUserTurnCompletionStopStrategy`. The LLM-specific subclass now only adds the settings-frame push and inherits finalization. Mirrors the existing `ExternalUserTurnStopStrategy` naming pattern.
This commit is contained in:
@@ -6,6 +6,7 @@
|
||||
|
||||
from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams
|
||||
from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred
|
||||
from .external_user_turn_completion_stop_strategy import ExternalUserTurnCompletionStopStrategy
|
||||
from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
|
||||
from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy
|
||||
from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy
|
||||
@@ -14,6 +15,7 @@ from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrat
|
||||
__all__ = [
|
||||
"BaseUserTurnStopStrategy",
|
||||
"DeferredUserTurnStopStrategy",
|
||||
"ExternalUserTurnCompletionStopStrategy",
|
||||
"ExternalUserTurnStopStrategy",
|
||||
"LLMTurnCompletionUserTurnStopStrategy",
|
||||
"SpeechTimeoutUserTurnStopStrategy",
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""User turn stop strategy that finalizes on ``UserTurnCompletedFrame``."""
|
||||
|
||||
from pipecat.frames.frames import Frame, UserTurnCompletedFrame
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
|
||||
|
||||
class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy):
|
||||
"""Finalize the user turn whenever a ``UserTurnCompletedFrame`` arrives.
|
||||
|
||||
Generic stop strategy for pipelines where some external component
|
||||
(LLM with completion markers, STT with built-in turn detection, a
|
||||
dedicated end-of-turn classifier, custom user code, etc.) judges
|
||||
when a turn is semantically complete and emits
|
||||
:class:`~pipecat.frames.frames.UserTurnCompletedFrame`.
|
||||
|
||||
Pair this with one or more ``deferred(...)``-wrapped detector
|
||||
strategies that drive ``on_user_turn_inference_triggered`` but
|
||||
leave finalization to this strategy::
|
||||
|
||||
stop=[
|
||||
deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)),
|
||||
ExternalUserTurnCompletionStopStrategy(),
|
||||
]
|
||||
|
||||
For LLM-completion-marker gating specifically, use the subclass
|
||||
:class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy`
|
||||
instead, which additionally pushes the ``LLMUpdateSettingsFrame``
|
||||
that enables the marker protocol on the LLM.
|
||||
|
||||
If the producer never emits ``UserTurnCompletedFrame``, the
|
||||
controller's ``user_turn_stop_timeout`` watchdog finalizes the
|
||||
turn after no activity. Tune that timeout if your producer can
|
||||
take longer than the default to respond.
|
||||
"""
|
||||
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Fire ``on_user_turn_stopped`` whenever ``UserTurnCompletedFrame`` is seen."""
|
||||
if isinstance(frame, UserTurnCompletedFrame):
|
||||
await self.trigger_user_turn_finalized()
|
||||
return ProcessFrameResult.CONTINUE
|
||||
@@ -6,47 +6,40 @@
|
||||
|
||||
"""User turn stop strategy gated on the LLM's turn-completion verdict."""
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartFrame,
|
||||
UserTurnCompletedFrame,
|
||||
)
|
||||
from pipecat.frames.frames import Frame, LLMUpdateSettingsFrame, StartFrame
|
||||
from pipecat.services.settings import LLMSettings
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
from pipecat.turns.user_stop.external_user_turn_completion_stop_strategy import (
|
||||
ExternalUserTurnCompletionStopStrategy,
|
||||
)
|
||||
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
|
||||
|
||||
|
||||
class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
"""User turn stop strategy that finalizes only when the LLM agrees.
|
||||
class LLMTurnCompletionUserTurnStopStrategy(ExternalUserTurnCompletionStopStrategy):
|
||||
"""LLM-gated stop strategy.
|
||||
|
||||
This strategy lets another stop strategy (e.g. smart-turn analyzer)
|
||||
trigger LLM inference, then defers the public ``on_user_turn_stopped``
|
||||
event until the LLM emits a ``UserTurnCompletedFrame``. On
|
||||
``incomplete_short`` / ``incomplete_long`` markers the
|
||||
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin`
|
||||
re-prompts the LLM internally and no completion frame is emitted.
|
||||
|
||||
To use this strategy, install it alongside one or more upstream stop
|
||||
strategies in ``UserTurnStrategies.stop`` and wrap those upstream
|
||||
strategies with :func:`~pipecat.turns.user_stop.deferred` so they
|
||||
fire only ``on_user_turn_inference_triggered`` and leave
|
||||
finalization to this strategy. The aggregator's deprecation path
|
||||
for ``filter_incomplete_user_turns`` does this rewiring
|
||||
automatically.
|
||||
|
||||
If the LLM never returns a completion frame (malformed output,
|
||||
unreachable service, etc.), the controller's
|
||||
``user_turn_stop_timeout`` watchdog is the safety net — it fires
|
||||
``on_user_turn_stopped`` after no activity for that many seconds.
|
||||
Tune ``user_turn_stop_timeout`` higher if your LLM regularly takes
|
||||
longer than the default to respond.
|
||||
|
||||
On ``StartFrame`` the strategy pushes an ``LLMUpdateSettingsFrame``
|
||||
Extends
|
||||
:class:`~pipecat.turns.user_stop.ExternalUserTurnCompletionStopStrategy`
|
||||
with the LLM-specific setup needed for the marker-based completion
|
||||
protocol: on ``StartFrame``, pushes an ``LLMUpdateSettingsFrame``
|
||||
upstream that enables ``filter_incomplete_user_turns`` on the LLM
|
||||
service and seeds the
|
||||
and seeds the
|
||||
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`.
|
||||
|
||||
Finalization itself is inherited: when the LLM service's
|
||||
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin`
|
||||
detects a ``✓`` marker, it broadcasts a
|
||||
:class:`~pipecat.frames.frames.UserTurnCompletedFrame` and the
|
||||
base class fires ``on_user_turn_stopped``. On
|
||||
``incomplete_short`` / ``incomplete_long`` markers the mixin
|
||||
re-prompts internally and no completion frame is emitted, so the
|
||||
public stop event stays deferred.
|
||||
|
||||
Install alongside one or more ``deferred(...)``-wrapped detector
|
||||
strategies that drive ``on_user_turn_inference_triggered`` but
|
||||
leave finalization to this strategy. The aggregator's deprecation
|
||||
path for ``filter_incomplete_user_turns`` does this rewiring
|
||||
automatically.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -73,13 +66,10 @@ class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
return self._config
|
||||
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Observe frames to drive the finalization decision."""
|
||||
"""Configure the LLM on start and delegate completion handling to the base."""
|
||||
if isinstance(frame, StartFrame):
|
||||
await self._configure_llm()
|
||||
elif isinstance(frame, UserTurnCompletedFrame):
|
||||
await self.trigger_user_turn_finalized()
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
return await super().process_frame(frame)
|
||||
|
||||
async def _configure_llm(self):
|
||||
await self.push_frame(
|
||||
|
||||
Reference in New Issue
Block a user