Split user-turn-stop into inference-triggered and finalized events
Fixes a real bug: with `filter_incomplete_user_turns` enabled, the
smart-turn detector's tentative stop was firing `on_user_turn_stopped`
before the LLM had a chance to veto it. Observers, transcript
appenders and UI indicators received an early — and sometimes
duplicated — signal.
Decomposes the single stop concern into two events:
- `on_user_turn_inference_triggered` fires when a stop strategy has
enough signal to start LLM inference. The aggregator pushes the
context here, kicking off the LLM call.
- `on_user_turn_stopped` fires only when the user turn is semantically
final. Built-in strategies fire both events at the same call site,
preserving today's behavior for the common case.
Adds `LLMTurnCompletionUserTurnStopStrategy`, which gates
finalization on a `UserTurnCompletedFrame` (a fieldless system frame
emitted by any component judging turn completeness — currently the
`UserTurnCompletionLLMServiceMixin` on `✓`).
Adds `deferred(strategy)` / `DeferredUserTurnStopStrategy`, a thin
wrapper that forwards an inner strategy's events except
`on_user_turn_stopped`. Use this to install a stop strategy as an
inference trigger only, leaving finalization to a peer (e.g. the LLM
completion strategy).
Adds `llm_completion_user_turn_stop_strategies()` for the common
case:
UserTurnStrategies(
stop=llm_completion_user_turn_stop_strategies(),
)
Deprecates `LLMUserAggregatorParams.filter_incomplete_user_turns`.
The aggregator emits a `DeprecationWarning`, wraps existing stop
strategies with `deferred(...)`, and appends
`LLMTurnCompletionUserTurnStopStrategy` automatically.
This commit is contained in:
@@ -41,6 +41,10 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.user_turn_strategies import (
|
||||
UserTurnStrategies,
|
||||
llm_completion_user_turn_stop_strategies,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -83,23 +87,31 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
)
|
||||
|
||||
context = LLMContext()
|
||||
# `llm_completion_user_turn_stop_strategies()` pairs the default
|
||||
# stop strategies with `LLMTurnCompletionUserTurnStopStrategy`:
|
||||
# those strategies trigger LLM inference but the public
|
||||
# `on_user_turn_stopped` event fires only when the LLM confirms ✓.
|
||||
# The LLM marks each response with one of:
|
||||
# ✓ = complete (respond normally)
|
||||
# ○ = incomplete short (wait 5s, then prompt)
|
||||
# ◐ = incomplete long (wait 15s, then prompt)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
# Enable turn completion filtering - the LLM will output:
|
||||
# ✓ = complete (respond normally)
|
||||
# ○ = incomplete short (wait 5s, then prompt)
|
||||
# ◐ = incomplete long (wait 15s, then prompt)
|
||||
filter_incomplete_user_turns=True,
|
||||
# Optional: customize turn completion behavior
|
||||
# turn_completion_config=TurnCompletionConfig(
|
||||
# incomplete_short_timeout=5.0,
|
||||
# incomplete_long_timeout=15.0,
|
||||
# incomplete_short_prompt="Custom prompt...",
|
||||
# incomplete_long_prompt="Custom prompt...",
|
||||
# instructions="Custom turn completion instructions...",
|
||||
# ),
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=llm_completion_user_turn_stop_strategies(),
|
||||
# Optional: customize turn completion behavior
|
||||
# stop=llm_completion_user_turn_stop_strategies(
|
||||
# config=UserTurnCompletionConfig(
|
||||
# incomplete_short_timeout=5.0,
|
||||
# incomplete_long_timeout=15.0,
|
||||
# incomplete_short_prompt="Custom prompt...",
|
||||
# incomplete_long_prompt="Custom prompt...",
|
||||
# instructions="Custom turn completion instructions...",
|
||||
# ),
|
||||
# ),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -970,6 +970,24 @@ class UserSpeakingFrame(SystemFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserTurnCompletedFrame(SystemFrame):
|
||||
"""Frame indicating that the user turn is semantically complete.
|
||||
|
||||
Emitted by any component that can judge conversational turn
|
||||
completeness — for example an LLM with turn-completion markers, an
|
||||
STT service with built-in turn detection, or a dedicated
|
||||
end-of-turn classifier. Stop strategies that gate the
|
||||
user-turn-stop event on an external completeness signal (e.g.
|
||||
``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to
|
||||
finalize the turn. Producers should emit this frame only when they
|
||||
judge the turn complete; an absence of this frame means the turn is
|
||||
not yet considered complete.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class VADUserStartedSpeakingFrame(SystemFrame):
|
||||
"""Frame emitted when VAD definitively detects user started speaking.
|
||||
|
||||
@@ -53,7 +53,6 @@ from pipecat.frames.frames import (
|
||||
LLMThoughtEndFrame,
|
||||
LLMThoughtStartFrame,
|
||||
LLMThoughtTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
@@ -79,7 +78,6 @@ from pipecat.processors.aggregators.llm_context_summarizer import (
|
||||
SummaryAppliedEvent,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.settings import LLMSettings
|
||||
from pipecat.turns.user_idle_controller import UserIdleController
|
||||
from pipecat.turns.user_mute import BaseUserMuteStrategy
|
||||
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
@@ -100,25 +98,6 @@ class LLMUserAggregatorParams:
|
||||
"""Parameters for configuring LLM user aggregation behavior.
|
||||
|
||||
Parameters:
|
||||
user_turn_strategies: User turn start and stop strategies.
|
||||
user_mute_strategies: List of user mute strategies.
|
||||
user_turn_stop_timeout: Time in seconds to wait before considering the
|
||||
user's turn finished.
|
||||
user_idle_timeout: Timeout in seconds for detecting user idle state.
|
||||
The aggregator will emit an `on_user_turn_idle` event when the user
|
||||
has been idle (not speaking) for this duration. Set to 0 to disable
|
||||
idle detection.
|
||||
vad_analyzer: Voice Activity Detection analyzer instance.
|
||||
audio_idle_timeout: Timeout in seconds to force speech stop when
|
||||
no audio frames are received while in SPEAKING state (e.g. user mutes
|
||||
mic mid-speech). Set to 0 to disable. Defaults to 1.0.
|
||||
filter_incomplete_user_turns: Whether to filter out incomplete user turns.
|
||||
When enabled, the LLM outputs a turn completion marker at the start of
|
||||
each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long).
|
||||
Incomplete responses are suppressed and timeouts trigger re-prompting.
|
||||
user_turn_completion_config: Configuration for turn completion behavior including
|
||||
custom instructions, timeouts, and prompts. Only used when
|
||||
filter_incomplete_user_turns is True.
|
||||
add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the
|
||||
aggregator computes the diff against the currently advertised tools
|
||||
and appends a developer-role message to the context describing
|
||||
@@ -131,17 +110,59 @@ class LLMUserAggregatorParams:
|
||||
(LLM-specific) tools are ignored. When using
|
||||
``LLMContextAggregatorPair``, prefer setting this via its
|
||||
``add_tool_change_messages`` argument instead. Defaults to False.
|
||||
audio_idle_timeout: Timeout in seconds to force speech stop when
|
||||
no audio frames are received while in SPEAKING state (e.g. user mutes
|
||||
mic mid-speech). Set to 0 to disable. Defaults to 1.0.
|
||||
user_turn_strategies: User turn start and stop strategies.
|
||||
user_mute_strategies: List of user mute strategies.
|
||||
user_turn_stop_timeout: Time in seconds to wait before considering the
|
||||
user's turn finished.
|
||||
user_idle_timeout: Timeout in seconds for detecting user idle state.
|
||||
The aggregator will emit an `on_user_turn_idle` event when the user
|
||||
has been idle (not speaking) for this duration. Set to 0 to disable
|
||||
idle detection.
|
||||
vad_analyzer: Voice Activity Detection analyzer instance.
|
||||
filter_incomplete_user_turns: [DEPRECATED] Add
|
||||
:class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy`
|
||||
to ``user_turn_strategies.stop`` instead. When enabled, the LLM
|
||||
outputs a turn-completion marker at the start of each response:
|
||||
✓ (complete), ○ (incomplete short), or ◐ (incomplete long).
|
||||
Incomplete responses are suppressed and timeouts trigger
|
||||
re-prompting.
|
||||
user_turn_completion_config: [DEPRECATED] Configuration for turn
|
||||
completion behavior including custom instructions, timeouts, and
|
||||
prompts. Only used when filter_incomplete_user_turns is True
|
||||
(deprecated path) — for the new strategy-based API, pass the config
|
||||
directly to ``LLMTurnCompletionUserTurnStopStrategy(config=...)``.
|
||||
"""
|
||||
|
||||
add_tool_change_messages: bool = False
|
||||
audio_idle_timeout: float = 1.0
|
||||
user_turn_strategies: UserTurnStrategies | None = None
|
||||
user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list)
|
||||
user_turn_stop_timeout: float = 5.0
|
||||
user_idle_timeout: float = 0
|
||||
vad_analyzer: VADAnalyzer | None = None
|
||||
audio_idle_timeout: float = 1.0
|
||||
filter_incomplete_user_turns: bool = False
|
||||
user_turn_completion_config: UserTurnCompletionConfig | None = None
|
||||
add_tool_change_messages: bool = False
|
||||
|
||||
def __post_init__(self):
|
||||
if self.filter_incomplete_user_turns:
|
||||
warnings.warn(
|
||||
"LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. "
|
||||
"Add LLMTurnCompletionUserTurnStopStrategy to "
|
||||
"user_turn_strategies.stop instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
if self.user_turn_completion_config:
|
||||
warnings.warn(
|
||||
"LLMUserAggregatorParams.user_turn_completion_config is deprecated. "
|
||||
"Add LLMTurnCompletionUserTurnStopStrategy to "
|
||||
"user_turn_strategies.stop instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -541,13 +562,35 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._register_event_handler("on_user_turn_stopped")
|
||||
self._register_event_handler("on_user_turn_stop_timeout")
|
||||
self._register_event_handler("on_user_turn_idle")
|
||||
self._register_event_handler("on_user_turn_inference_triggered")
|
||||
self._register_event_handler("on_user_mute_started")
|
||||
self._register_event_handler("on_user_mute_stopped")
|
||||
|
||||
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
|
||||
|
||||
# Deprecated path: translate filter_incomplete_user_turns into
|
||||
# wrapping pre-existing stop strategies with deferred() and
|
||||
# appending LLMTurnCompletionUserTurnStopStrategy. The
|
||||
# DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__.
|
||||
if self._params.filter_incomplete_user_turns:
|
||||
from pipecat.turns.user_stop import (
|
||||
LLMTurnCompletionUserTurnStopStrategy,
|
||||
deferred,
|
||||
)
|
||||
|
||||
existing_stop = list(user_turn_strategies.stop or [])
|
||||
user_turn_strategies.stop = [deferred(s) for s in existing_stop] + [
|
||||
LLMTurnCompletionUserTurnStopStrategy(
|
||||
config=self._params.user_turn_completion_config
|
||||
)
|
||||
]
|
||||
|
||||
self._user_is_muted = False
|
||||
self._user_turn_start_timestamp = ""
|
||||
# Aggregation captured at inference-trigger time and surfaced again
|
||||
# in `on_user_turn_stopped`. Set to None when no inference-triggered
|
||||
# event has fired since the last finalization.
|
||||
self._pending_user_turn_aggregation: str | None = None
|
||||
|
||||
self._user_turn_controller = UserTurnController(
|
||||
user_turn_strategies=user_turn_strategies,
|
||||
@@ -558,6 +601,9 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_user_turn_started", self._on_user_turn_started
|
||||
)
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
|
||||
)
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_user_turn_stopped", self._on_user_turn_stopped
|
||||
)
|
||||
@@ -676,21 +722,6 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
for s in self._params.user_mute_strategies:
|
||||
await s.setup(self.task_manager)
|
||||
|
||||
# Enable incomplete turn filtering on the LLM if configured
|
||||
if self._params.filter_incomplete_user_turns:
|
||||
# Get config or use defaults
|
||||
config = self._params.user_turn_completion_config or UserTurnCompletionConfig()
|
||||
|
||||
# Enable the feature on the LLM with config
|
||||
await self.push_frame(
|
||||
LLMUpdateSettingsFrame(
|
||||
delta=LLMSettings(
|
||||
filter_incomplete_user_turns=True,
|
||||
user_turn_completion_config=config,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
await self._maybe_emit_user_turn_stopped(on_session_end=True)
|
||||
await self._cleanup()
|
||||
@@ -830,6 +861,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
|
||||
|
||||
self._user_turn_start_timestamp = time_now_iso8601()
|
||||
self._pending_user_turn_aggregation = None
|
||||
|
||||
if params.enable_user_speaking_frames:
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
@@ -841,6 +873,22 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
await self._call_event_handler("on_user_turn_started", strategy)
|
||||
|
||||
async def _on_user_turn_inference_triggered(
|
||||
self,
|
||||
controller: UserTurnController,
|
||||
strategy: BaseUserTurnStopStrategy,
|
||||
):
|
||||
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
|
||||
|
||||
# Push aggregation now: this writes the user message to the context
|
||||
# and pushes LLMContextFrame, which is what kicks LLM inference.
|
||||
# `on_user_turn_stopped` later fires when the turn is semantically
|
||||
# final and surfaces the aggregated message via the public event.
|
||||
aggregation = await self.push_aggregation()
|
||||
self._pending_user_turn_aggregation = aggregation
|
||||
|
||||
await self._call_event_handler("on_user_turn_inference_triggered", strategy)
|
||||
|
||||
async def _on_user_turn_stopped(
|
||||
self,
|
||||
controller: UserTurnController,
|
||||
@@ -875,15 +923,31 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
"""Maybe emit user turn stopped event.
|
||||
|
||||
The aggregation has typically already been pushed at
|
||||
inference-trigger time and is cached in
|
||||
``self._pending_user_turn_aggregation``. Any aggregation that has
|
||||
accumulated since the last inference-trigger (e.g. transcriptions
|
||||
that arrived between inference trigger and finalization) is flushed
|
||||
here so end-of-turn content is never lost.
|
||||
|
||||
Args:
|
||||
strategy: The strategy that triggered the turn stop.
|
||||
on_session_end: If True, only emit if there's unemitted content
|
||||
(avoids duplicate events when session ends).
|
||||
"""
|
||||
aggregation = await self.push_aggregation()
|
||||
if not on_session_end or aggregation:
|
||||
previous_aggregation = self._pending_user_turn_aggregation
|
||||
self._pending_user_turn_aggregation = None
|
||||
|
||||
content = None
|
||||
if aggregation and previous_aggregation:
|
||||
content = f"{previous_aggregation} {aggregation}".strip()
|
||||
else:
|
||||
content = previous_aggregation or aggregation
|
||||
|
||||
if not on_session_end or content:
|
||||
message = UserTurnStoppedMessage(
|
||||
content=aggregation, timestamp=self._user_turn_start_timestamp
|
||||
content=content, timestamp=self._user_turn_start_timestamp
|
||||
)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, message)
|
||||
self._user_turn_start_timestamp = ""
|
||||
|
||||
@@ -5,14 +5,19 @@
|
||||
#
|
||||
|
||||
from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams
|
||||
from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred
|
||||
from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
|
||||
from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy
|
||||
from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy
|
||||
from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy
|
||||
|
||||
__all__ = [
|
||||
"BaseUserTurnStopStrategy",
|
||||
"DeferredUserTurnStopStrategy",
|
||||
"ExternalUserTurnStopStrategy",
|
||||
"LLMTurnCompletionUserTurnStopStrategy",
|
||||
"SpeechTimeoutUserTurnStopStrategy",
|
||||
"UserTurnStoppedParams",
|
||||
"TurnAnalyzerUserTurnStopStrategy",
|
||||
"deferred",
|
||||
]
|
||||
|
||||
@@ -45,7 +45,16 @@ class BaseUserTurnStopStrategy(BaseObject):
|
||||
Events triggered by strategies:
|
||||
|
||||
- `on_push_frame`: Indicates the strategy wants to push a frame.
|
||||
- `on_user_turn_stopped`: Signals that the user stopped speaking.
|
||||
- `on_user_turn_inference_triggered`: Signals that enough evidence
|
||||
exists to start LLM inference for the current user turn. In most
|
||||
cases this fires together with `on_user_turn_stopped`. Strategies
|
||||
that gate finalization on the LLM (e.g.
|
||||
``LLMTurnCompletionUserTurnStopStrategy``) fire only this event
|
||||
upstream and a separate strategy fires `on_user_turn_stopped` once
|
||||
the LLM confirms the turn is complete.
|
||||
- `on_user_turn_stopped`: Signals that the user turn is semantically
|
||||
final. Observers, transcript appenders, and UI indicators should
|
||||
bind this event.
|
||||
|
||||
"""
|
||||
|
||||
@@ -64,6 +73,7 @@ class BaseUserTurnStopStrategy(BaseObject):
|
||||
self._task_manager: BaseTaskManager | None = None
|
||||
self._register_event_handler("on_push_frame", sync=True)
|
||||
self._register_event_handler("on_broadcast_frame", sync=True)
|
||||
self._register_event_handler("on_user_turn_inference_triggered", sync=True)
|
||||
self._register_event_handler("on_user_turn_stopped", sync=True)
|
||||
|
||||
@property
|
||||
@@ -123,7 +133,23 @@ class BaseUserTurnStopStrategy(BaseObject):
|
||||
await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)
|
||||
|
||||
async def trigger_user_turn_stopped(self):
|
||||
"""Trigger the `on_user_turn_stopped` event."""
|
||||
"""Fire both ``on_user_turn_inference_triggered`` and ``on_user_turn_stopped``.
|
||||
|
||||
Most strategies call this when they decide a turn has ended. To
|
||||
defer finalization to another strategy (so this strategy fires
|
||||
only the inference-triggered event), wrap this strategy with
|
||||
:func:`~pipecat.turns.user_stop.deferred` instead of changing
|
||||
the trigger call.
|
||||
"""
|
||||
await self.trigger_user_turn_inference_triggered()
|
||||
await self.trigger_user_turn_finalized()
|
||||
|
||||
async def trigger_user_turn_inference_triggered(self):
|
||||
"""Trigger only the `on_user_turn_inference_triggered` event."""
|
||||
await self._call_event_handler("on_user_turn_inference_triggered")
|
||||
|
||||
async def trigger_user_turn_finalized(self):
|
||||
"""Trigger only the `on_user_turn_stopped` event."""
|
||||
await self._call_event_handler(
|
||||
"on_user_turn_stopped",
|
||||
UserTurnStoppedParams(enable_user_speaking_frames=self._enable_user_speaking_frames),
|
||||
|
||||
104
src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py
Normal file
104
src/pipecat/turns/user_stop/deferred_user_turn_stop_strategy.py
Normal file
@@ -0,0 +1,104 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Wrapper that defers a stop strategy's finalization to another strategy."""
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
|
||||
|
||||
class DeferredUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
"""Wraps a stop strategy and suppresses its ``on_user_turn_stopped`` event.
|
||||
|
||||
Event subscriptions added to the wrapper are forwarded directly to
|
||||
the inner strategy, except for ``on_user_turn_stopped``, which is
|
||||
dropped. The inner strategy's frame-side and inference-triggered
|
||||
events therefore reach external listeners (the controller, etc.)
|
||||
unchanged; finalization is left to another strategy in the chain
|
||||
such as ``LLMTurnCompletionUserTurnStopStrategy``.
|
||||
|
||||
Use the :func:`deferred` helper for ergonomic construction::
|
||||
|
||||
stop=[
|
||||
deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)),
|
||||
LLMTurnCompletionUserTurnStopStrategy(),
|
||||
]
|
||||
"""
|
||||
|
||||
def __init__(self, inner: BaseUserTurnStopStrategy, **kwargs):
|
||||
"""Initialize the deferred wrapper.
|
||||
|
||||
Args:
|
||||
inner: The strategy whose finalization should be deferred.
|
||||
**kwargs: Additional keyword arguments forwarded to the base
|
||||
class.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._inner = inner
|
||||
|
||||
@property
|
||||
def inner(self) -> BaseUserTurnStopStrategy:
|
||||
"""Return the wrapped strategy."""
|
||||
return self._inner
|
||||
|
||||
def add_event_handler(self, event_name: str, handler):
|
||||
"""Forward event subscriptions to the inner strategy.
|
||||
|
||||
``on_user_turn_stopped`` is silently dropped — that's the whole
|
||||
point of the wrapper. Every other event handler is attached to
|
||||
the inner strategy directly, so the inner's events reach the
|
||||
listener without any per-event proxy method on the wrapper.
|
||||
"""
|
||||
if event_name == "on_user_turn_stopped":
|
||||
return
|
||||
self._inner.add_event_handler(event_name, handler)
|
||||
|
||||
async def setup(self, task_manager: BaseTaskManager):
|
||||
"""Set up the inner strategy."""
|
||||
await super().setup(task_manager)
|
||||
await self._inner.setup(task_manager)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up the inner strategy."""
|
||||
await super().cleanup()
|
||||
await self._inner.cleanup()
|
||||
|
||||
async def reset(self):
|
||||
"""Reset the inner strategy for a new user turn."""
|
||||
await super().reset()
|
||||
await self._inner.reset()
|
||||
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult | None:
|
||||
"""Forward frame processing to the inner strategy."""
|
||||
return await self._inner.process_frame(frame)
|
||||
|
||||
|
||||
def deferred(strategy: BaseUserTurnStopStrategy) -> DeferredUserTurnStopStrategy:
|
||||
"""Defer this stop strategy's finalization to another strategy.
|
||||
|
||||
Wraps ``strategy`` in a :class:`DeferredUserTurnStopStrategy`: the
|
||||
inner strategy continues to drive inference-triggered events, but
|
||||
its ``on_user_turn_stopped`` event is suppressed. Use when another
|
||||
strategy in the chain (e.g.
|
||||
``LLMTurnCompletionUserTurnStopStrategy``) owns finalization.
|
||||
|
||||
Example::
|
||||
|
||||
stop=[
|
||||
deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)),
|
||||
LLMTurnCompletionUserTurnStopStrategy(),
|
||||
]
|
||||
|
||||
Args:
|
||||
strategy: The stop strategy to defer.
|
||||
|
||||
Returns:
|
||||
A wrapper that exposes the inner strategy's behavior with
|
||||
finalization suppressed.
|
||||
"""
|
||||
return DeferredUserTurnStopStrategy(strategy)
|
||||
@@ -0,0 +1,92 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""User turn stop strategy gated on the LLM's turn-completion verdict."""
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMUpdateSettingsFrame,
|
||||
StartFrame,
|
||||
UserTurnCompletedFrame,
|
||||
)
|
||||
from pipecat.services.settings import LLMSettings
|
||||
from pipecat.turns.types import ProcessFrameResult
|
||||
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
|
||||
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
|
||||
|
||||
|
||||
class LLMTurnCompletionUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
"""User turn stop strategy that finalizes only when the LLM agrees.
|
||||
|
||||
This strategy lets another stop strategy (e.g. smart-turn analyzer)
|
||||
trigger LLM inference, then defers the public ``on_user_turn_stopped``
|
||||
event until the LLM emits a ``UserTurnCompletedFrame``. On
|
||||
``incomplete_short`` / ``incomplete_long`` markers the
|
||||
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin`
|
||||
re-prompts the LLM internally and no completion frame is emitted.
|
||||
|
||||
To use this strategy, install it alongside one or more upstream stop
|
||||
strategies in ``UserTurnStrategies.stop`` and wrap those upstream
|
||||
strategies with :func:`~pipecat.turns.user_stop.deferred` so they
|
||||
fire only ``on_user_turn_inference_triggered`` and leave
|
||||
finalization to this strategy. The aggregator's deprecation path
|
||||
for ``filter_incomplete_user_turns`` does this rewiring
|
||||
automatically.
|
||||
|
||||
If the LLM never returns a completion frame (malformed output,
|
||||
unreachable service, etc.), the controller's
|
||||
``user_turn_stop_timeout`` watchdog is the safety net — it fires
|
||||
``on_user_turn_stopped`` after no activity for that many seconds.
|
||||
Tune ``user_turn_stop_timeout`` higher if your LLM regularly takes
|
||||
longer than the default to respond.
|
||||
|
||||
On ``StartFrame`` the strategy pushes an ``LLMUpdateSettingsFrame``
|
||||
upstream that enables ``filter_incomplete_user_turns`` on the LLM
|
||||
service and seeds the
|
||||
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
config: UserTurnCompletionConfig | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the LLM turn-completion stop strategy.
|
||||
|
||||
Args:
|
||||
config: Configuration applied to the LLM via the
|
||||
``filter_incomplete_user_turns`` setting on
|
||||
``StartFrame``. Defaults to ``UserTurnCompletionConfig()``.
|
||||
**kwargs: Additional keyword arguments forwarded to the base
|
||||
class.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._config = config or UserTurnCompletionConfig()
|
||||
|
||||
@property
|
||||
def config(self) -> UserTurnCompletionConfig:
|
||||
"""Return the configured ``UserTurnCompletionConfig``."""
|
||||
return self._config
|
||||
|
||||
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
|
||||
"""Observe frames to drive the finalization decision."""
|
||||
if isinstance(frame, StartFrame):
|
||||
await self._configure_llm()
|
||||
elif isinstance(frame, UserTurnCompletedFrame):
|
||||
await self.trigger_user_turn_finalized()
|
||||
|
||||
return ProcessFrameResult.CONTINUE
|
||||
|
||||
async def _configure_llm(self):
|
||||
await self.push_frame(
|
||||
LLMUpdateSettingsFrame(
|
||||
delta=LLMSettings(
|
||||
filter_incomplete_user_turns=True,
|
||||
user_turn_completion_config=self._config,
|
||||
)
|
||||
)
|
||||
)
|
||||
@@ -25,6 +25,7 @@ from pipecat.frames.frames import (
|
||||
LLMMessagesAppendFrame,
|
||||
LLMRunFrame,
|
||||
LLMTextFrame,
|
||||
UserTurnCompletedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
@@ -279,7 +280,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
|
||||
await asyncio.sleep(timeout)
|
||||
|
||||
# Timeout expired - reset state before prompting LLM
|
||||
logger.info(f"Incomplete {incomplete_type} timeout expired, prompting LLM")
|
||||
logger.debug(f"Incomplete {incomplete_type} timeout expired, prompting LLM")
|
||||
await self._turn_reset()
|
||||
self._incomplete_timeout_task = None
|
||||
self._incomplete_type = None
|
||||
@@ -402,6 +403,10 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
|
||||
)
|
||||
self._turn_suppressed = True
|
||||
|
||||
# No UserTurnCompletedFrame is broadcast here: the turn is
|
||||
# explicitly not complete. The re-prompt path is driven by
|
||||
# this mixin's own timeout.
|
||||
|
||||
# Push the marker with skip_tts=True so it's added to context (maintains
|
||||
# conversation continuity per prompt instructions) but not spoken by TTS
|
||||
frame = LLMTextFrame(self._turn_text_buffer)
|
||||
@@ -416,6 +421,13 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
|
||||
if USER_TURN_COMPLETE_MARKER in self._turn_text_buffer:
|
||||
logger.debug(f"COMPLETE ({USER_TURN_COMPLETE_MARKER}) detected, pushing buffered text")
|
||||
|
||||
# Broadcast that the user turn is complete so a stop strategy
|
||||
# gating finalization on this signal (e.g.
|
||||
# LLMTurnCompletionUserTurnStopStrategy) can fire
|
||||
# `on_user_turn_stopped`. Must fire before the LLMTextFrame so
|
||||
# downstream consumers see the signal before the response.
|
||||
await self.broadcast_frame(UserTurnCompletedFrame)
|
||||
|
||||
# Split buffer at the marker to handle cases where marker and text
|
||||
# arrive in the same chunk (e.g., "✓ Hello!" from some LLMs)
|
||||
marker_pos = self._turn_text_buffer.index(USER_TURN_COMPLETE_MARKER)
|
||||
|
||||
@@ -38,7 +38,11 @@ class UserTurnController(BaseObject):
|
||||
Event handlers available:
|
||||
|
||||
- on_user_turn_started: Emitted when a user turn starts.
|
||||
- on_user_turn_stopped: Emitted when a user turn stops.
|
||||
- on_user_turn_inference_triggered: Emitted when enough signal exists to
|
||||
start LLM inference. Fires together with `on_user_turn_stopped` for
|
||||
most strategies; fires alone when a downstream strategy gates
|
||||
finalization on the LLM's verdict.
|
||||
- on_user_turn_stopped: Emitted when a user turn is semantically final.
|
||||
- on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout.
|
||||
- on_push_frame: Emitted when a strategy wants to push a frame.
|
||||
- on_broadcast_frame: Emitted when a strategy wants to broadcast a frame.
|
||||
@@ -49,6 +53,10 @@ class UserTurnController(BaseObject):
|
||||
async def on_user_turn_started(controller, strategy: BaseUserTurnStartStrategy, params: UserTurnStartedParams):
|
||||
...
|
||||
|
||||
@controller.event_handler("on_user_turn_inference_triggered")
|
||||
async def on_user_turn_inference_triggered(controller, strategy: BaseUserTurnStopStrategy):
|
||||
...
|
||||
|
||||
@controller.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(controller, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams):
|
||||
...
|
||||
@@ -95,6 +103,7 @@ class UserTurnController(BaseObject):
|
||||
self._register_event_handler("on_push_frame", sync=True)
|
||||
self._register_event_handler("on_broadcast_frame", sync=True)
|
||||
self._register_event_handler("on_user_turn_started", sync=True)
|
||||
self._register_event_handler("on_user_turn_inference_triggered", sync=True)
|
||||
self._register_event_handler("on_user_turn_stopped", sync=True)
|
||||
self._register_event_handler("on_user_turn_stop_timeout", sync=True)
|
||||
self._register_event_handler("on_reset_aggregation", sync=True)
|
||||
@@ -186,6 +195,9 @@ class UserTurnController(BaseObject):
|
||||
await s.setup(self.task_manager)
|
||||
s.add_event_handler("on_push_frame", self._on_push_frame)
|
||||
s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
|
||||
s.add_event_handler(
|
||||
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
|
||||
)
|
||||
s.add_event_handler("on_user_turn_stopped", self._on_user_turn_stopped)
|
||||
|
||||
async def _cleanup_strategies(self):
|
||||
@@ -246,6 +258,9 @@ class UserTurnController(BaseObject):
|
||||
):
|
||||
await self._trigger_user_turn_start(strategy, params)
|
||||
|
||||
async def _on_user_turn_inference_triggered(self, strategy: BaseUserTurnStopStrategy):
|
||||
await self._trigger_user_turn_inference_triggered(strategy)
|
||||
|
||||
async def _on_user_turn_stopped(
|
||||
self, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams
|
||||
):
|
||||
@@ -274,6 +289,20 @@ class UserTurnController(BaseObject):
|
||||
|
||||
await self._call_event_handler("on_user_turn_started", strategy, params)
|
||||
|
||||
async def _trigger_user_turn_inference_triggered(
|
||||
self, strategy: BaseUserTurnStopStrategy | None
|
||||
):
|
||||
# Inference-triggered fires only while a turn is active. The turn
|
||||
# remains active afterward — only `on_user_turn_stopped` flips state.
|
||||
if not self._user_turn:
|
||||
return
|
||||
|
||||
# Re-arm the stop watchdog so a stuck turn (inference fired but
|
||||
# finalization never arrives) still times out and finalizes.
|
||||
self._user_turn_stop_timeout_event.set()
|
||||
|
||||
await self._call_event_handler("on_user_turn_inference_triggered", strategy)
|
||||
|
||||
async def _trigger_user_turn_stop(
|
||||
self, strategy: BaseUserTurnStopStrategy | None, params: UserTurnStoppedParams
|
||||
):
|
||||
|
||||
@@ -35,7 +35,11 @@ class UserTurnProcessor(FrameProcessor):
|
||||
Event handlers available:
|
||||
|
||||
- on_user_turn_started: Emitted when a user turn starts.
|
||||
- on_user_turn_stopped: Emitted when a user turn stops.
|
||||
- on_user_turn_inference_triggered: Emitted when enough signal exists to
|
||||
start LLM inference. Fires together with `on_user_turn_stopped` for
|
||||
most strategies; fires alone when a downstream strategy gates
|
||||
finalization on the LLM's verdict.
|
||||
- on_user_turn_stopped: Emitted when a user turn is semantically final.
|
||||
- on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout.
|
||||
- on_user_turn_idle: Emitted when the user has been idle for the configured timeout.
|
||||
|
||||
@@ -45,6 +49,10 @@ class UserTurnProcessor(FrameProcessor):
|
||||
async def on_user_turn_started(processor, strategy: BaseUserTurnStartStrategy):
|
||||
...
|
||||
|
||||
@processor.event_handler("on_user_turn_inference_triggered")
|
||||
async def on_user_turn_inference_triggered(processor, strategy: BaseUserTurnStopStrategy):
|
||||
...
|
||||
|
||||
@processor.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(processor, strategy: BaseUserTurnStopStrategy):
|
||||
...
|
||||
@@ -85,6 +93,7 @@ class UserTurnProcessor(FrameProcessor):
|
||||
self._register_event_handler("on_user_turn_stopped")
|
||||
self._register_event_handler("on_user_turn_stop_timeout")
|
||||
self._register_event_handler("on_user_turn_idle")
|
||||
self._register_event_handler("on_user_turn_inference_triggered")
|
||||
|
||||
self._user_turn_controller = UserTurnController(
|
||||
user_turn_strategies=user_turn_strategies or UserTurnStrategies(),
|
||||
@@ -101,6 +110,9 @@ class UserTurnProcessor(FrameProcessor):
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
|
||||
)
|
||||
self._user_turn_controller.add_event_handler(
|
||||
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
|
||||
)
|
||||
|
||||
self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout)
|
||||
self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle)
|
||||
@@ -204,3 +216,11 @@ class UserTurnProcessor(FrameProcessor):
|
||||
|
||||
async def _on_user_turn_idle(self, controller):
|
||||
await self._call_event_handler("on_user_turn_idle")
|
||||
|
||||
async def _on_user_turn_inference_triggered(
|
||||
self,
|
||||
controller: UserTurnController,
|
||||
strategy: BaseUserTurnStopStrategy,
|
||||
):
|
||||
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
|
||||
await self._call_event_handler("on_user_turn_inference_triggered", strategy)
|
||||
|
||||
@@ -17,8 +17,11 @@ from pipecat.turns.user_start import (
|
||||
from pipecat.turns.user_stop import (
|
||||
BaseUserTurnStopStrategy,
|
||||
ExternalUserTurnStopStrategy,
|
||||
LLMTurnCompletionUserTurnStopStrategy,
|
||||
TurnAnalyzerUserTurnStopStrategy,
|
||||
deferred,
|
||||
)
|
||||
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
|
||||
|
||||
|
||||
def default_user_turn_start_strategies() -> list[BaseUserTurnStartStrategy]:
|
||||
@@ -48,6 +51,42 @@ def default_user_turn_stop_strategies() -> list[BaseUserTurnStopStrategy]:
|
||||
return [TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
|
||||
|
||||
|
||||
def llm_completion_user_turn_stop_strategies(
|
||||
strategies: list[BaseUserTurnStopStrategy] | None = None,
|
||||
*,
|
||||
config: UserTurnCompletionConfig | None = None,
|
||||
) -> list[BaseUserTurnStopStrategy]:
|
||||
"""Build a stop-strategy list gated on the LLM's turn-completion verdict.
|
||||
|
||||
Wraps ``strategies`` with :func:`deferred` so they trigger inference
|
||||
but don't fire ``on_user_turn_stopped`` themselves, then appends
|
||||
:class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy`
|
||||
as the finalizer. Use as the ``stop`` field of a
|
||||
:class:`UserTurnStrategies`::
|
||||
|
||||
UserTurnStrategies(
|
||||
stop=llm_completion_user_turn_stop_strategies(),
|
||||
)
|
||||
|
||||
Args:
|
||||
strategies: Stop strategies that should drive inference. If
|
||||
None, uses :func:`default_user_turn_stop_strategies`.
|
||||
config: Optional configuration applied to the LLM via the
|
||||
``filter_incomplete_user_turns`` setting. Customizes the
|
||||
turn-completion instructions, incomplete-turn timeouts, and
|
||||
re-prompts.
|
||||
|
||||
Returns:
|
||||
``[deferred(s) for s in strategies] +
|
||||
[LLMTurnCompletionUserTurnStopStrategy(config=config)]``.
|
||||
"""
|
||||
strategies = strategies if strategies is not None else default_user_turn_stop_strategies()
|
||||
return [
|
||||
*(deferred(s) for s in strategies),
|
||||
LLMTurnCompletionUserTurnStopStrategy(config=config),
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserTurnStrategies:
|
||||
"""Container for user turn start and stop strategies.
|
||||
|
||||
@@ -179,8 +179,16 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
assert context.messages[0]["content"] == "Hi there!"
|
||||
|
||||
async def test_llm_messages_update_does_not_inject_turn_completion_into_context(self):
|
||||
from pipecat.turns.user_turn_strategies import (
|
||||
llm_completion_user_turn_stop_strategies,
|
||||
)
|
||||
|
||||
context = LLMContext()
|
||||
params = LLMUserAggregatorParams(filter_incomplete_user_turns=True)
|
||||
params = LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=llm_completion_user_turn_stop_strategies(),
|
||||
),
|
||||
)
|
||||
pipeline = Pipeline([LLMUserAggregator(context, params=params)])
|
||||
|
||||
new_messages = [
|
||||
@@ -291,8 +299,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
UserStartedSpeakingFrame,
|
||||
InterruptionFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
LLMContextFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
]
|
||||
await run_test(
|
||||
pipeline,
|
||||
@@ -563,6 +571,132 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
expected_down_frames=[SpeechControlParamsFrame],
|
||||
)
|
||||
|
||||
async def test_inference_triggered_event_fires_on_default_strategies(self):
|
||||
"""Default flow fires inference-triggered before stopped, both with the same strategy."""
|
||||
from pipecat.frames.frames import UserTurnCompletedFrame # noqa: F401
|
||||
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
|
||||
]
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
events: list[str] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_inference_triggered")
|
||||
async def on_inference_triggered(aggregator, strategy):
|
||||
events.append("inference_triggered")
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(f"stopped:{message.content}")
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
frames_to_send = [
|
||||
VADUserStartedSpeakingFrame(),
|
||||
TranscriptionFrame(text="Hi!", user_id="", timestamp="now"),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
self.assertEqual(events, ["inference_triggered", "stopped:Hi!"])
|
||||
|
||||
async def test_filter_incomplete_user_turns_emits_deprecation_warning(self):
|
||||
"""Setting the legacy flag emits a DeprecationWarning."""
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
LLMUserAggregatorParams(filter_incomplete_user_turns=True)
|
||||
matched = [
|
||||
x
|
||||
for x in w
|
||||
if issubclass(x.category, DeprecationWarning)
|
||||
and "filter_incomplete_user_turns" in str(x.message)
|
||||
]
|
||||
self.assertTrue(matched, "expected a DeprecationWarning")
|
||||
|
||||
async def test_filter_incomplete_user_turns_installs_strategy(self):
|
||||
"""Legacy flag wraps existing stops with deferred() and appends the LLM strategy."""
|
||||
import warnings
|
||||
|
||||
from pipecat.turns.user_stop import (
|
||||
DeferredUserTurnStopStrategy,
|
||||
LLMTurnCompletionUserTurnStopStrategy,
|
||||
SpeechTimeoutUserTurnStopStrategy,
|
||||
)
|
||||
|
||||
existing = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
|
||||
|
||||
context = LLMContext()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
params = LLMUserAggregatorParams(
|
||||
filter_incomplete_user_turns=True,
|
||||
user_turn_strategies=UserTurnStrategies(stop=[existing]),
|
||||
)
|
||||
aggregator = LLMUserAggregator(context, params=params)
|
||||
|
||||
stop_strategies = aggregator._params.user_turn_strategies.stop
|
||||
self.assertEqual(len(stop_strategies), 2)
|
||||
self.assertIsInstance(stop_strategies[0], DeferredUserTurnStopStrategy)
|
||||
self.assertIs(stop_strategies[0].inner, existing)
|
||||
self.assertIsInstance(stop_strategies[1], LLMTurnCompletionUserTurnStopStrategy)
|
||||
|
||||
async def test_llm_completion_strategy_finalizes_on_complete_marker(self):
|
||||
"""LLMTurnCompletionUserTurnStopStrategy finalizes only on UserTurnCompletedFrame(complete)."""
|
||||
from pipecat.frames.frames import UserTurnCompletedFrame
|
||||
from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred
|
||||
|
||||
gating = LLMTurnCompletionUserTurnStopStrategy()
|
||||
upstream = deferred(
|
||||
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
|
||||
)
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]),
|
||||
),
|
||||
)
|
||||
|
||||
events: list[str] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_inference_triggered")
|
||||
async def on_inference_triggered(aggregator, strategy):
|
||||
events.append("inference_triggered")
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append("stopped")
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
# Drive the pipeline. Inference fires after the upstream
|
||||
# strategy's timeout. Stop fires only when UserTurnCompletedFrame
|
||||
# arrives (producer absence == "not yet complete").
|
||||
frames_to_send = [
|
||||
VADUserStartedSpeakingFrame(),
|
||||
TranscriptionFrame(text="Hi", user_id="", timestamp="now"),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
|
||||
# At this point inference_triggered should have fired but NOT stopped.
|
||||
UserTurnCompletedFrame(),
|
||||
SleepFrame(),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
self.assertEqual(events, ["inference_triggered", "stopped"])
|
||||
|
||||
|
||||
class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_empty(self):
|
||||
|
||||
@@ -8,7 +8,7 @@ import unittest
|
||||
import unittest.mock
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame
|
||||
from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame, UserTurnCompletedFrame
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.services.llm_service import LLMService
|
||||
from pipecat.services.settings import LLMSettings
|
||||
@@ -44,21 +44,25 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
|
||||
# Simulate LLM generating: "✓ Hello there!"
|
||||
await processor._push_turn_text(f"{USER_TURN_COMPLETE_MARKER} Hello there!")
|
||||
|
||||
# Should have 2 text frames: marker (skip_tts) and content (normal)
|
||||
self.assertEqual(len(pushed_frames), 2)
|
||||
# Two LLMTextFrames: marker (skip_tts) and content (normal). The
|
||||
# broadcast also pushes UserTurnCompletedFrame upstream + downstream.
|
||||
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
|
||||
self.assertEqual(len(text_frames), 2)
|
||||
|
||||
# First frame should be the marker with skip_tts=True
|
||||
self.assertIsInstance(pushed_frames[0], LLMTextFrame)
|
||||
self.assertEqual(pushed_frames[0].text, USER_TURN_COMPLETE_MARKER)
|
||||
self.assertTrue(pushed_frames[0].skip_tts)
|
||||
# First text frame should be the marker with skip_tts=True
|
||||
self.assertEqual(text_frames[0].text, USER_TURN_COMPLETE_MARKER)
|
||||
self.assertTrue(text_frames[0].skip_tts)
|
||||
|
||||
# Second frame should be the actual text without skip_tts
|
||||
self.assertIsInstance(pushed_frames[1], LLMTextFrame)
|
||||
self.assertEqual(pushed_frames[1].text, "Hello there!")
|
||||
self.assertFalse(pushed_frames[1].skip_tts)
|
||||
# Second text frame should be the actual text without skip_tts
|
||||
self.assertEqual(text_frames[1].text, "Hello there!")
|
||||
self.assertFalse(text_frames[1].skip_tts)
|
||||
|
||||
# UserTurnCompletedFrame broadcast in both directions.
|
||||
completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)]
|
||||
self.assertEqual(len(completed), 2)
|
||||
|
||||
async def test_incomplete_short_marker_suppresses_text(self):
|
||||
"""Test that ○ marker suppresses text with skip_tts."""
|
||||
"""Test that ○ marker suppresses text with skip_tts and emits no completed frame."""
|
||||
processor = MockProcessor()
|
||||
|
||||
pushed_frames = []
|
||||
@@ -70,14 +74,17 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
|
||||
|
||||
await processor._push_turn_text(USER_TURN_INCOMPLETE_SHORT_MARKER)
|
||||
|
||||
# Should have 1 text frame with skip_tts=True
|
||||
self.assertEqual(len(pushed_frames), 1)
|
||||
self.assertIsInstance(pushed_frames[0], LLMTextFrame)
|
||||
self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER)
|
||||
self.assertTrue(pushed_frames[0].skip_tts)
|
||||
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
|
||||
self.assertEqual(len(text_frames), 1)
|
||||
self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER)
|
||||
self.assertTrue(text_frames[0].skip_tts)
|
||||
|
||||
# Incomplete markers do not emit UserTurnCompletedFrame.
|
||||
completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)]
|
||||
self.assertEqual(len(completed), 0)
|
||||
|
||||
async def test_incomplete_long_marker_suppresses_text(self):
|
||||
"""Test that ◐ marker suppresses text with skip_tts."""
|
||||
"""Test that ◐ marker suppresses text with skip_tts and emits no completed frame."""
|
||||
processor = MockProcessor()
|
||||
|
||||
pushed_frames = []
|
||||
@@ -89,11 +96,13 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
|
||||
|
||||
await processor._push_turn_text(USER_TURN_INCOMPLETE_LONG_MARKER)
|
||||
|
||||
# Should have 1 text frame with skip_tts=True
|
||||
self.assertEqual(len(pushed_frames), 1)
|
||||
self.assertIsInstance(pushed_frames[0], LLMTextFrame)
|
||||
self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER)
|
||||
self.assertTrue(pushed_frames[0].skip_tts)
|
||||
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
|
||||
self.assertEqual(len(text_frames), 1)
|
||||
self.assertEqual(text_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER)
|
||||
self.assertTrue(text_frames[0].skip_tts)
|
||||
|
||||
completed = [f for f in pushed_frames if isinstance(f, UserTurnCompletedFrame)]
|
||||
self.assertEqual(len(completed), 0)
|
||||
|
||||
async def test_text_buffered_until_marker_found(self):
|
||||
"""Test that text is buffered until a marker is detected."""
|
||||
@@ -114,8 +123,10 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
|
||||
# Now send the complete marker
|
||||
await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?")
|
||||
|
||||
# Now frames should be pushed
|
||||
self.assertEqual(len(pushed_frames), 2)
|
||||
# Two LLMTextFrames pushed (marker + content) plus the
|
||||
# UserTurnCompletedFrame broadcast.
|
||||
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
|
||||
self.assertEqual(len(text_frames), 2)
|
||||
|
||||
async def test_turn_state_reset_after_llm_full_response_end_frame(self):
|
||||
"""Test that _turn_complete_found is reset when LLMFullResponseEndFrame is pushed."""
|
||||
|
||||
@@ -19,7 +19,7 @@ from pipecat.turns.user_start import VADUserTurnStartStrategy
|
||||
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
|
||||
MinWordsUserTurnStartStrategy,
|
||||
)
|
||||
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
|
||||
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy, deferred
|
||||
from pipecat.turns.user_turn_controller import UserTurnController
|
||||
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies
|
||||
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
|
||||
@@ -71,6 +71,66 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
|
||||
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
|
||||
self.assertTrue(should_stop)
|
||||
|
||||
async def test_inference_triggered_fires_alongside_stopped(self):
|
||||
"""Default strategies fire both inference-triggered and stopped, in order."""
|
||||
controller = UserTurnController(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
|
||||
)
|
||||
)
|
||||
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
events: list[str] = []
|
||||
|
||||
@controller.event_handler("on_user_turn_inference_triggered")
|
||||
async def on_user_turn_inference_triggered(controller, strategy):
|
||||
events.append("inference_triggered")
|
||||
|
||||
@controller.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(controller, strategy, params):
|
||||
events.append("stopped")
|
||||
|
||||
await controller.process_frame(VADUserStartedSpeakingFrame())
|
||||
await controller.process_frame(
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
|
||||
)
|
||||
await controller.process_frame(VADUserStoppedSpeakingFrame())
|
||||
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
|
||||
|
||||
self.assertEqual(events, ["inference_triggered", "stopped"])
|
||||
|
||||
async def test_deferred_wrapper_skips_stopped(self):
|
||||
"""A deferred() wrapper drops the inner strategy's on_user_turn_stopped event."""
|
||||
wrapped = deferred(
|
||||
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
|
||||
)
|
||||
controller = UserTurnController(user_turn_strategies=UserTurnStrategies(stop=[wrapped]))
|
||||
|
||||
await controller.setup(self.task_manager)
|
||||
|
||||
events: list[str] = []
|
||||
|
||||
@controller.event_handler("on_user_turn_inference_triggered")
|
||||
async def on_user_turn_inference_triggered(controller, strategy):
|
||||
events.append("inference_triggered")
|
||||
|
||||
@controller.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(controller, strategy, params):
|
||||
events.append("stopped")
|
||||
|
||||
await controller.process_frame(VADUserStartedSpeakingFrame())
|
||||
await controller.process_frame(
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
|
||||
)
|
||||
await controller.process_frame(VADUserStoppedSpeakingFrame())
|
||||
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
|
||||
|
||||
# The inner strategy fires inference-triggered (forwarded by the
|
||||
# wrapper). Finalization is suppressed, but the controller's
|
||||
# stop watchdog eventually fires `stopped`.
|
||||
self.assertEqual(events[0], "inference_triggered")
|
||||
|
||||
async def test_user_turn_start_reset(self):
|
||||
controller = UserTurnController(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
|
||||
Reference in New Issue
Block a user