Merge pull request #4405 from pipecat-ai/aleix/user-turn-inference-event

Split user-turn-stop into inference-triggered and finalized events
Aleix Conchillo Flaqué
2026-05-07 17:51:57 -07:00
committed by GitHub
22 changed files with 958 additions and 103 deletions


@@ -0,0 +1 @@
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.


@@ -0,0 +1 @@
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.
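
The wrapper's effect can be sketched without Pipecat itself. `MiniStrategy` and `MiniDeferred` below are hypothetical stand-ins, not the library's classes; they only model the idea that subscriptions are forwarded to the inner strategy while `on_user_turn_stopped` is dropped.

```python
from collections import defaultdict


class MiniStrategy:
    """Hypothetical stand-in for a stop strategy (not the Pipecat class)."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def add_event_handler(self, event_name, handler):
        self._handlers[event_name].append(handler)

    def fire(self, event_name):
        # Call every handler subscribed to this event name.
        for handler in self._handlers[event_name]:
            handler(event_name)


class MiniDeferred:
    """Forwards subscriptions to the inner strategy, except the stop event."""

    def __init__(self, inner):
        self.inner = inner

    def add_event_handler(self, event_name, handler):
        if event_name == "on_user_turn_stopped":
            return  # dropped: finalization belongs to another strategy
        self.inner.add_event_handler(event_name, handler)


def run_demo():
    seen = []
    detector = MiniStrategy()
    wrapped = MiniDeferred(detector)
    wrapped.add_event_handler("on_user_turn_inference_triggered", seen.append)
    wrapped.add_event_handler("on_user_turn_stopped", seen.append)
    # The inner detector decides the turn ended and fires both events.
    detector.fire("on_user_turn_inference_triggered")
    detector.fire("on_user_turn_stopped")
    return seen


print(run_demo())
```

Only the inference-triggered event reaches the listener in this model; the stop subscription was never attached, so nothing has to be filtered at fire time.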


@@ -0,0 +1 @@
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.


@@ -0,0 +1 @@
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.
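
As a rough illustration of the producer-agnostic design, the sketch below models a finalizer that reacts to a fieldless completion frame and ignores everything else. `CompletedFrame`, `OtherFrame` and `MiniExternalFinalizer` are hypothetical names for this example and do not use the Pipecat API.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CompletedFrame:
    """Hypothetical stand-in for UserTurnInferenceCompletedFrame (fieldless)."""


@dataclass
class OtherFrame:
    """Any other frame; the finalizer ignores it."""


class MiniExternalFinalizer:
    """Counts how many times the turn would be finalized."""

    def __init__(self):
        self.stopped_count = 0

    async def process_frame(self, frame):
        # The frame carries no payload, so the producer's identity is irrelevant.
        if isinstance(frame, CompletedFrame):
            self.stopped_count += 1


async def run_demo():
    finalizer = MiniExternalFinalizer()
    for frame in (OtherFrame(), OtherFrame(), CompletedFrame()):
        await finalizer.process_frame(frame)
    return finalizer.stopped_count


print(asyncio.run(run_demo()))
```

Because the frame has no fields, an LLM mixin, an STT service or a custom classifier can all drive the same finalizer in this model.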

changelog/4405.added.md

@@ -0,0 +1 @@
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.
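
The two firing modes can be modeled in a few lines. `MiniStopStrategy` is a hypothetical class for this sketch; only the three method names mirror the trigger methods added to the base stop strategy in this PR.

```python
class MiniStopStrategy:
    """Hypothetical model of the three trigger methods; not the Pipecat class."""

    def __init__(self, log):
        self._log = log

    def trigger_user_turn_inference_triggered(self):
        self._log.append("on_user_turn_inference_triggered")

    def trigger_user_turn_finalized(self):
        self._log.append("on_user_turn_stopped")

    def trigger_user_turn_stopped(self):
        # Default path: inference trigger first, then finalization.
        self.trigger_user_turn_inference_triggered()
        self.trigger_user_turn_finalized()


def default_path():
    log = []
    MiniStopStrategy(log).trigger_user_turn_stopped()
    return log


def gated_path():
    log = []
    detector = MiniStopStrategy(log)
    finalizer = MiniStopStrategy(log)
    detector.trigger_user_turn_inference_triggered()  # start LLM inference
    log.append("(turn judged complete)")  # placeholder for the external verdict
    finalizer.trigger_user_turn_finalized()  # a peer strategy finalizes later
    return log


print(default_path())
print(gated_path())
```

In the gated path the stop event is separated from the inference trigger by whatever time the completeness judgment takes; in the default path the two are back to back.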


@@ -0,0 +1 @@
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.
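
The deprecation pattern itself is plain standard-library Python. The sketch below uses a hypothetical `MiniParams` dataclass (not the real `LLMUserAggregatorParams`) to show how a `__post_init__` hook can warn when a legacy flag is set, and how a test might capture that warning.

```python
import warnings
from dataclasses import dataclass


@dataclass
class MiniParams:
    """Hypothetical stand-in for a params class with one legacy flag."""

    filter_incomplete_user_turns: bool = False

    def __post_init__(self):
        # Warn only when the legacy flag is actually used.
        if self.filter_incomplete_user_turns:
            warnings.warn(
                "filter_incomplete_user_turns is deprecated. "
                "Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.",
                DeprecationWarning,
                stacklevel=2,
            )


def collect_warnings(**kwargs):
    """Build MiniParams and return the categories of any warnings raised."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        MiniParams(**kwargs)
    return [w.category for w in caught]


print(collect_warnings(filter_incomplete_user_turns=True))
print(collect_warnings())
```

Python's default filters hide most `DeprecationWarning`s, so enabling them (for example with `python -W default`) helps when auditing a codebase for the legacy flag.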

changelog/4405.fixed.md

@@ -0,0 +1 @@
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.
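
A simplified model of the fixed behavior, assuming both inference triggers fall within one user turn: each trigger pushes only the new transcript segment, and the single stop event carries the concatenation. `MiniUserAggregator` and its method names are hypothetical; the concatenation logic follows the aggregator changes in this PR.

```python
class MiniUserAggregator:
    """Hypothetical model of the aggregator's segment bookkeeping."""

    def __init__(self):
        self._aggregation = []  # text received since the last push
        self._full_turn = None  # segments already pushed during this turn
        self.stopped_events = []  # payloads of the public stop event

    def add_transcript(self, text):
        self._aggregation.append(text)

    def _push_aggregation(self):
        # Return the pending segment and reset the buffer.
        segment = " ".join(self._aggregation)
        self._aggregation = []
        return segment

    def on_inference_triggered(self):
        segment = self._push_aggregation()
        if segment:
            if self._full_turn:
                self._full_turn = f"{self._full_turn} {segment}".strip()
            else:
                self._full_turn = segment

    def on_turn_stopped(self):
        # Flush anything that arrived after the last inference trigger.
        segment = self._push_aggregation()
        full, self._full_turn = self._full_turn, None
        if segment and full:
            content = f"{full} {segment}".strip()
        else:
            content = full or segment
        self.stopped_events.append(content)


def run_demo():
    agg = MiniUserAggregator()
    agg.add_transcript("I would like to")
    agg.on_inference_triggered()  # verdict: incomplete, so no stop event
    agg.add_transcript("book a flight")
    agg.on_inference_triggered()  # verdict: complete
    agg.on_turn_stopped()  # the finalizer fires exactly once
    return agg.stopped_events


print(run_demo())
```

The demo ends with one stop event whose content is the full sentence, rather than one event per tentative stop.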


@@ -10,7 +10,7 @@ Demonstrates LLM-based turn completion detection to suppress bot responses when
the user was cut off mid-thought. The LLM outputs one of three markers:
- ✓ (complete): User finished their thought, respond normally
- ○ (incomplete short): User was cut off, wait ~5s then prompt
-- ◐ (incomplete long): User needs time to think, wait ~15s then prompt
+- ◐ (incomplete long): User needs time to think, wait ~10s then prompt
When incomplete is detected, the bot's response is suppressed. After the timeout
expires, the LLM is automatically prompted to re-engage the user.
@@ -41,6 +41,7 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies
load_dotenv(override=True)
@@ -83,23 +84,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
context = LLMContext()
+# `FilterIncompleteUserTurnStrategies` pairs the default detector
+# chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors
+# trigger LLM inference but the public `on_user_turn_stopped` event
+# fires only when the LLM confirms ✓. The LLM marks each response
+# with one of:
+# ✓ = complete (respond normally)
+# ○ = incomplete short (wait 5s, then prompt)
+# ◐ = incomplete long (wait 10s, then prompt)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
-# Enable turn completion filtering - the LLM will output:
-# ✓ = complete (respond normally)
-# ○ = incomplete short (wait 5s, then prompt)
-# ◐ = incomplete long (wait 15s, then prompt)
-filter_incomplete_user_turns=True,
-# Optional: customize turn completion behavior
-# turn_completion_config=TurnCompletionConfig(
-# incomplete_short_timeout=5.0,
-# incomplete_long_timeout=15.0,
-# incomplete_short_prompt="Custom prompt...",
-# incomplete_long_prompt="Custom prompt...",
-# instructions="Custom turn completion instructions...",
-# ),
+user_turn_strategies=FilterIncompleteUserTurnStrategies(
+# Optional: customize turn completion behavior
+# config=UserTurnCompletionConfig(
+# incomplete_short_timeout=5.0,
+# incomplete_long_timeout=10.0,
+# incomplete_short_prompt="Custom prompt...",
+# incomplete_long_prompt="Custom prompt...",
+# instructions="Custom turn completion instructions...",
+# ),
+),
),
)


@@ -339,6 +339,40 @@ class LLMTextFrame(TextFrame):
self.includes_inter_frame_spaces = True
@dataclass
class LLMMarkerFrame(DataFrame):
"""Sideband marker emitted by an LLM service.
A marker is short, structured assistant output that should be
persisted in the conversation context but should not flow through
the standard text path (TTS, transcript). The assistant aggregator
writes the marker to the context so the LLM can self-condition on
prior markers on subsequent turns.
The primary use today is the ``filter_incomplete_user_turns``
protocol, where ``UserTurnCompletionLLMServiceMixin`` emits the
turn-completion markers ✓ / ○ / ◐ on every response. The frame is
intentionally generic so other components — STT services with
built-in turn signals, end-of-turn classifiers, custom annotations,
etc. — can use the same mechanism to inject sideband signals into
the assistant context.
Parameters:
marker: The marker payload (typically a short string such as a
single character).
append_to_context_immediately: If True, the marker is written
to the context as its own standalone assistant message as
soon as it's received. If False, the marker is appended to
the running assistant aggregation and flushed to the
context together with the following text as a single
message (e.g. for the ✓ case the context message ends up
as "✓ <response>").
"""
marker: str
append_to_context_immediately: bool = True
@dataclass
class AggregatedTextFrame(TextFrame):
"""Text frame representing an aggregation of TextFrames.
@@ -970,6 +1004,24 @@ class UserSpeakingFrame(SystemFrame):
pass
@dataclass
class UserTurnInferenceCompletedFrame(SystemFrame):
"""Frame indicating that the user turn is semantically complete.
Emitted by any component that can judge conversational turn
completeness — for example an LLM with turn-completion markers, an
STT service with built-in turn detection, or a dedicated
end-of-turn classifier. Stop strategies that gate the
user-turn-stop event on an external completeness signal (e.g.
``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to
finalize the turn. Producers should emit this frame only when they
judge the turn complete; an absence of this frame means the turn is
not yet considered complete.
"""
pass
@dataclass
class VADUserStartedSpeakingFrame(SystemFrame):
"""Frame emitted when VAD definitively detects user started speaking.


@@ -44,6 +44,7 @@ from pipecat.frames.frames import (
LLMContextSummaryRequestFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
+LLMMarkerFrame,
LLMMessagesAppendFrame,
LLMMessagesTransformFrame,
LLMMessagesUpdateFrame,
@@ -53,7 +54,6 @@ from pipecat.frames.frames import (
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
-LLMUpdateSettingsFrame,
StartFrame,
TextFrame,
TranscriptionFrame,
@@ -79,14 +79,16 @@ from pipecat.processors.aggregators.llm_context_summarizer import (
SummaryAppliedEvent,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
-from pipecat.services.settings import LLMSettings
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
from pipecat.turns.user_turn_controller import UserTurnController
-from pipecat.turns.user_turn_strategies import UserTurnStrategies
+from pipecat.turns.user_turn_strategies import (
+FilterIncompleteUserTurnStrategies,
+UserTurnStrategies,
+)
from pipecat.utils.context.llm_context_summarization import (
LLMAutoContextSummarizationConfig,
LLMContextSummarizationConfig,
@@ -100,25 +102,6 @@ class LLMUserAggregatorParams:
"""Parameters for configuring LLM user aggregation behavior.
Parameters:
-user_turn_strategies: User turn start and stop strategies.
-user_mute_strategies: List of user mute strategies.
-user_turn_stop_timeout: Time in seconds to wait before considering the
-user's turn finished.
-user_idle_timeout: Timeout in seconds for detecting user idle state.
-The aggregator will emit an `on_user_turn_idle` event when the user
-has been idle (not speaking) for this duration. Set to 0 to disable
-idle detection.
-vad_analyzer: Voice Activity Detection analyzer instance.
-audio_idle_timeout: Timeout in seconds to force speech stop when
-no audio frames are received while in SPEAKING state (e.g. user mutes
-mic mid-speech). Set to 0 to disable. Defaults to 1.0.
-filter_incomplete_user_turns: Whether to filter out incomplete user turns.
-When enabled, the LLM outputs a turn completion marker at the start of
-each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long).
-Incomplete responses are suppressed and timeouts trigger re-prompting.
-user_turn_completion_config: Configuration for turn completion behavior including
-custom instructions, timeouts, and prompts. Only used when
-filter_incomplete_user_turns is True.
add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the
aggregator computes the diff against the currently advertised tools
and appends a developer-role message to the context describing
@@ -131,17 +114,75 @@ class LLMUserAggregatorParams:
(LLM-specific) tools are ignored. When using
``LLMContextAggregatorPair``, prefer setting this via its
``add_tool_change_messages`` argument instead. Defaults to False.
+audio_idle_timeout: Timeout in seconds to force speech stop when
+no audio frames are received while in SPEAKING state (e.g. user mutes
+mic mid-speech). Set to 0 to disable. Defaults to 1.0.
+user_turn_strategies: User turn start and stop strategies.
+user_mute_strategies: List of user mute strategies.
+user_turn_stop_timeout: Time in seconds to wait before considering the
+user's turn finished.
+user_idle_timeout: Timeout in seconds for detecting user idle state.
+The aggregator will emit an `on_user_turn_idle` event when the user
+has been idle (not speaking) for this duration. Set to 0 to disable
+idle detection.
+vad_analyzer: Voice Activity Detection analyzer instance.
+filter_incomplete_user_turns: [DEPRECATED] Use
+``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
+instead. When enabled, the LLM outputs a turn-completion
+marker at the start of each response: ✓ (complete), ○
+(incomplete short), or ◐ (incomplete long). Incomplete
+responses are suppressed and timeouts trigger re-prompting.
+.. deprecated:: 1.2.0
+Use ``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
+instead. Will be removed in version 2.0.0.
+user_turn_completion_config: [DEPRECATED] Configuration for turn
+completion behavior including custom instructions, timeouts, and
+prompts. Only used when filter_incomplete_user_turns is True
+(deprecated path) — for the new strategy-based API, pass the config
+directly to ``FilterIncompleteUserTurnStrategies(config=...)``.
+.. deprecated:: 1.2.0
+Pass the config directly to
+``FilterIncompleteUserTurnStrategies(config=...)`` instead.
+Will be removed in version 2.0.0.
"""
+add_tool_change_messages: bool = False
+audio_idle_timeout: float = 1.0
user_turn_strategies: UserTurnStrategies | None = None
user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list)
user_turn_stop_timeout: float = 5.0
user_idle_timeout: float = 0
vad_analyzer: VADAnalyzer | None = None
-audio_idle_timeout: float = 1.0
filter_incomplete_user_turns: bool = False
user_turn_completion_config: UserTurnCompletionConfig | None = None
-add_tool_change_messages: bool = False
+def __post_init__(self):
+if self.filter_incomplete_user_turns:
+warnings.warn(
+"LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. "
+"Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.",
+DeprecationWarning,
+stacklevel=2,
+)
+if self.user_turn_completion_config:
+warnings.warn(
+"LLMUserAggregatorParams.user_turn_completion_config is deprecated. "
+"Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.",
+DeprecationWarning,
+stacklevel=2,
+)
+if self.user_turn_completion_config is not None:
+warnings.warn(
+"LLMUserAggregatorParams.user_turn_completion_config is deprecated. "
+"Pass the config directly to "
+"FilterIncompleteUserTurnStrategies(config=...) instead.",
+DeprecationWarning,
+stacklevel=2,
+)
@dataclass
@@ -178,6 +219,11 @@ class LLMAssistantAggregatorParams:
# ---------------------------------------------------------------------------
# Deprecated field names — kept for backward compatibility.
# Use enable_auto_context_summarization and auto_context_summarization_config instead.
#
# .. deprecated:: 1.2.0
# Use ``enable_auto_context_summarization`` and
# ``auto_context_summarization_config`` instead. Will be removed in
# version 2.0.0.
# ---------------------------------------------------------------------------
enable_context_summarization: bool | None = None
context_summarization_config: LLMContextSummarizationConfig | None = None
@@ -541,13 +587,33 @@ class LLMUserAggregator(LLMContextAggregator):
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._register_event_handler("on_user_turn_inference_triggered")
self._register_event_handler("on_user_mute_started")
self._register_event_handler("on_user_mute_stopped")
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
# Deprecated path: translate filter_incomplete_user_turns into
# the equivalent FilterIncompleteUserTurnStrategies wiring. The
# DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__.
if self._params.filter_incomplete_user_turns:
user_turn_strategies = FilterIncompleteUserTurnStrategies(
start=user_turn_strategies.start,
stop=user_turn_strategies.stop,
config=self._params.user_turn_completion_config,
)
self._params.user_turn_strategies = user_turn_strategies
self._user_is_muted = False
self._user_turn_start_timestamp = ""
# Full transcript across the user turn. Each
# `_on_user_turn_inference_triggered` push captures only the
# new segment since the previous push (push_aggregation resets
# `_aggregation` after writing to context); we accumulate those
# segments here so the eventual `on_user_turn_stopped` event
# surfaces the full turn transcript even when several
# inferences fire before finalization.
self._full_user_turn_aggregation: str | None = None
self._user_turn_controller = UserTurnController(
user_turn_strategies=user_turn_strategies,
@@ -558,6 +624,9 @@ class LLMUserAggregator(LLMContextAggregator):
self._user_turn_controller.add_event_handler(
"on_user_turn_started", self._on_user_turn_started
)
self._user_turn_controller.add_event_handler(
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
)
self._user_turn_controller.add_event_handler(
"on_user_turn_stopped", self._on_user_turn_stopped
)
@@ -676,21 +745,6 @@ class LLMUserAggregator(LLMContextAggregator):
for s in self._params.user_mute_strategies:
await s.setup(self.task_manager)
-# Enable incomplete turn filtering on the LLM if configured
-if self._params.filter_incomplete_user_turns:
-# Get config or use defaults
-config = self._params.user_turn_completion_config or UserTurnCompletionConfig()
-# Enable the feature on the LLM with config
-await self.push_frame(
-LLMUpdateSettingsFrame(
-delta=LLMSettings(
-filter_incomplete_user_turns=True,
-user_turn_completion_config=config,
-)
-)
-)
async def _stop(self, frame: EndFrame):
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
@@ -830,6 +884,7 @@ class LLMUserAggregator(LLMContextAggregator):
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
self._user_turn_start_timestamp = time_now_iso8601()
self._full_user_turn_aggregation = None
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStartedSpeakingFrame)
@@ -841,6 +896,30 @@ class LLMUserAggregator(LLMContextAggregator):
await self._call_event_handler("on_user_turn_started", strategy)
async def _on_user_turn_inference_triggered(
self,
controller: UserTurnController,
strategy: BaseUserTurnStopStrategy,
):
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
# Push aggregation now: this writes the user message segment to
# the context and emits LLMContextFrame, which kicks LLM
# inference. Concatenate the segment into
# `_full_user_turn_aggregation` so multiple inferences in the
# same turn don't lose earlier segments from the eventual
# `on_user_turn_stopped` event.
segment = await self.push_aggregation()
if segment:
if self._full_user_turn_aggregation:
self._full_user_turn_aggregation = (
f"{self._full_user_turn_aggregation} {segment}".strip()
)
else:
self._full_user_turn_aggregation = segment
await self._call_event_handler("on_user_turn_inference_triggered", strategy)
async def _on_user_turn_stopped(
self,
controller: UserTurnController,
@@ -875,15 +954,29 @@ class LLMUserAggregator(LLMContextAggregator):
):
"""Maybe emit user turn stopped event.
+Earlier inference triggers in the same turn have already pushed
+their segments to the context and accumulated them into
+``self._full_user_turn_aggregation``. Any aggregation that
+arrived after the last inference trigger is flushed here so
+end-of-turn content is never lost from the public event.
Args:
strategy: The strategy that triggered the turn stop.
on_session_end: If True, only emit if there's unemitted content
(avoids duplicate events when session ends).
"""
-aggregation = await self.push_aggregation()
-if not on_session_end or aggregation:
+segment = await self.push_aggregation()
+full_aggregation = self._full_user_turn_aggregation
+self._full_user_turn_aggregation = None
+if segment and full_aggregation:
+content = f"{full_aggregation} {segment}".strip()
+else:
+content = full_aggregation or segment
+if not on_session_end or content:
message = UserTurnStoppedMessage(
-content=aggregation, timestamp=self._user_turn_start_timestamp
+content=content, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, message)
self._user_turn_start_timestamp = ""
@@ -1041,6 +1134,8 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self._handle_llm_end(frame)
elif isinstance(frame, TextFrame):
await self._handle_text(frame)
elif isinstance(frame, LLMMarkerFrame):
await self._handle_marker_frame(frame)
elif isinstance(frame, LLMThoughtStartFrame):
await self._handle_thought_start(frame)
elif isinstance(frame, LLMThoughtTextFrame):
@@ -1446,6 +1541,31 @@ class LLMAssistantAggregator(LLMContextAggregator):
)
)
async def _handle_marker_frame(self, frame: LLMMarkerFrame):
if frame.append_to_context_immediately:
# Stand-alone marker: write it to the context now as its
# own assistant message. Used when the marker is the entire
# assistant turn — e.g. the ○ / ◐ incomplete-turn signals,
# where the spoken response is suppressed and the marker
# is the only artifact.
self._context.add_message({"role": "assistant", "content": frame.marker})
await self.push_context_frame()
timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
await self.push_frame(timestamp_frame)
return
# Marker is part of an in-progress assistant response. Append
# it to the running aggregation so `push_aggregation` writes
# marker + text as a single context message — e.g. the ✓
# complete-turn signal that prefixes the spoken response,
# producing "✓ <response>" in context. Markers are stripped
# from the transcript via
# `_maybe_strip_turn_completion_markers` so consumers see
# clean text.
self._aggregation.append(
TextPartForConcatenation(frame.marker, includes_inter_part_spaces=False)
)
async def _handle_thought_start(self, frame: LLMThoughtStartFrame):
await self._reset_thought_aggregation()
self._thought_append_to_context = frame.append_to_context


@@ -5,14 +5,21 @@
#
from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStoppedParams
from .deferred_user_turn_stop_strategy import DeferredUserTurnStopStrategy, deferred
from .external_user_turn_completion_stop_strategy import ExternalUserTurnCompletionStopStrategy
from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
from .llm_turn_completion_user_turn_stop_strategy import LLMTurnCompletionUserTurnStopStrategy
from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy
from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy
__all__ = [
"BaseUserTurnStopStrategy",
"DeferredUserTurnStopStrategy",
"ExternalUserTurnCompletionStopStrategy",
"ExternalUserTurnStopStrategy",
"LLMTurnCompletionUserTurnStopStrategy",
"SpeechTimeoutUserTurnStopStrategy",
"UserTurnStoppedParams",
"TurnAnalyzerUserTurnStopStrategy",
"deferred",
]


@@ -45,7 +45,16 @@ class BaseUserTurnStopStrategy(BaseObject):
Events triggered by strategies:
- `on_push_frame`: Indicates the strategy wants to push a frame.
-- `on_user_turn_stopped`: Signals that the user stopped speaking.
+- `on_user_turn_inference_triggered`: Signals that enough evidence
+exists to start LLM inference for the current user turn. In most
+cases this fires together with `on_user_turn_stopped`. Strategies
+that gate finalization on the LLM (e.g.
+``LLMTurnCompletionUserTurnStopStrategy``) fire only this event
+upstream and a separate strategy fires `on_user_turn_stopped` once
+the LLM confirms the turn is complete.
+- `on_user_turn_stopped`: Signals that the user turn is semantically
+final. Observers, transcript appenders, and UI indicators should
+bind this event.
"""
@@ -64,6 +73,7 @@ class BaseUserTurnStopStrategy(BaseObject):
self._task_manager: BaseTaskManager | None = None
self._register_event_handler("on_push_frame", sync=True)
self._register_event_handler("on_broadcast_frame", sync=True)
self._register_event_handler("on_user_turn_inference_triggered", sync=True)
self._register_event_handler("on_user_turn_stopped", sync=True)
@property
@@ -123,7 +133,23 @@ class BaseUserTurnStopStrategy(BaseObject):
await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)
async def trigger_user_turn_stopped(self):
-"""Trigger the `on_user_turn_stopped` event."""
+"""Fire both ``on_user_turn_inference_triggered`` and ``on_user_turn_stopped``.
+Most strategies call this when they decide a turn has ended. To
+defer finalization to another strategy (so this strategy fires
+only the inference-triggered event), wrap this strategy with
+:func:`~pipecat.turns.user_stop.deferred` instead of changing
+the trigger call.
+"""
+await self.trigger_user_turn_inference_triggered()
+await self.trigger_user_turn_finalized()
+async def trigger_user_turn_inference_triggered(self):
+"""Trigger only the `on_user_turn_inference_triggered` event."""
+await self._call_event_handler("on_user_turn_inference_triggered")
+async def trigger_user_turn_finalized(self):
+"""Trigger only the `on_user_turn_stopped` event."""
await self._call_event_handler(
"on_user_turn_stopped",
UserTurnStoppedParams(enable_user_speaking_frames=self._enable_user_speaking_frames),


@@ -0,0 +1,104 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Wrapper that defers a stop strategy's finalization to another strategy."""
from pipecat.frames.frames import Frame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class DeferredUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""Wraps a stop strategy and suppresses its ``on_user_turn_stopped`` event.
Event subscriptions added to the wrapper are forwarded directly to
the inner strategy, except for ``on_user_turn_stopped``, which is
dropped. The inner strategy's frame-side and inference-triggered
events therefore reach external listeners (the controller, etc.)
unchanged; finalization is left to another strategy in the chain
such as ``LLMTurnCompletionUserTurnStopStrategy``.
Use the :func:`deferred` helper for ergonomic construction::
stop=[
deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)),
LLMTurnCompletionUserTurnStopStrategy(),
]
"""
def __init__(self, inner: BaseUserTurnStopStrategy, **kwargs):
"""Initialize the deferred wrapper.
Args:
inner: The strategy whose finalization should be deferred.
**kwargs: Additional keyword arguments forwarded to the base
class.
"""
super().__init__(**kwargs)
self._inner = inner
@property
def inner(self) -> BaseUserTurnStopStrategy:
"""Return the wrapped strategy."""
return self._inner
def add_event_handler(self, event_name: str, handler):
"""Forward event subscriptions to the inner strategy.
``on_user_turn_stopped`` is silently dropped — that's the whole
point of the wrapper. Every other event handler is attached to
the inner strategy directly, so the inner's events reach the
listener without any per-event proxy method on the wrapper.
"""
if event_name == "on_user_turn_stopped":
return
self._inner.add_event_handler(event_name, handler)
async def setup(self, task_manager: BaseTaskManager):
"""Set up the inner strategy."""
await super().setup(task_manager)
await self._inner.setup(task_manager)
async def cleanup(self):
"""Clean up the inner strategy."""
await super().cleanup()
await self._inner.cleanup()
async def reset(self):
"""Reset the inner strategy for a new user turn."""
await super().reset()
await self._inner.reset()
async def process_frame(self, frame: Frame) -> ProcessFrameResult | None:
"""Forward frame processing to the inner strategy."""
return await self._inner.process_frame(frame)
def deferred(strategy: BaseUserTurnStopStrategy) -> DeferredUserTurnStopStrategy:
"""Defer this stop strategy's finalization to another strategy.
Wraps ``strategy`` in a :class:`DeferredUserTurnStopStrategy`: the
inner strategy continues to drive inference-triggered events, but
its ``on_user_turn_stopped`` event is suppressed. Use when another
strategy in the chain (e.g.
``LLMTurnCompletionUserTurnStopStrategy``) owns finalization.
Example::
stop=[
deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)),
LLMTurnCompletionUserTurnStopStrategy(),
]
Args:
strategy: The stop strategy to defer.
Returns:
A wrapper that exposes the inner strategy's behavior with
finalization suppressed.
"""
return DeferredUserTurnStopStrategy(strategy)


@@ -0,0 +1,47 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn stop strategy that finalizes on ``UserTurnInferenceCompletedFrame``."""
from pipecat.frames.frames import Frame, UserTurnInferenceCompletedFrame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
class ExternalUserTurnCompletionStopStrategy(BaseUserTurnStopStrategy):
"""Finalize the user turn whenever a ``UserTurnInferenceCompletedFrame`` arrives.
Generic stop strategy for pipelines where some external component
(LLM with completion markers, STT with built-in turn detection, a
dedicated end-of-turn classifier, custom user code, etc.) judges
when a turn is semantically complete and emits
:class:`~pipecat.frames.frames.UserTurnInferenceCompletedFrame`.
Pair this with one or more ``deferred(...)``-wrapped detector
strategies that drive ``on_user_turn_inference_triggered`` but
leave finalization to this strategy::
stop=[
deferred(TurnAnalyzerUserTurnStopStrategy(turn_analyzer=...)),
ExternalUserTurnCompletionStopStrategy(),
]
For LLM-completion-marker gating specifically, use the subclass
:class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy`
instead, which additionally pushes the ``LLMUpdateSettingsFrame``
that enables the marker protocol on the LLM.
If the producer never emits ``UserTurnInferenceCompletedFrame``, the
controller's ``user_turn_stop_timeout`` watchdog finalizes the
turn after no activity. Tune that timeout if your producer can
take longer than the default to respond.
"""
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Fire ``on_user_turn_stopped`` whenever ``UserTurnInferenceCompletedFrame`` is seen."""
if isinstance(frame, UserTurnInferenceCompletedFrame):
await self.trigger_user_turn_finalized()
return ProcessFrameResult.CONTINUE


@@ -0,0 +1,82 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn stop strategy gated on the LLM's turn-completion verdict."""
from pipecat.frames.frames import Frame, LLMUpdateSettingsFrame, StartFrame
from pipecat.services.settings import LLMSettings
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.external_user_turn_completion_stop_strategy import (
ExternalUserTurnCompletionStopStrategy,
)
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
class LLMTurnCompletionUserTurnStopStrategy(ExternalUserTurnCompletionStopStrategy):
"""LLM-gated stop strategy.
Extends
:class:`~pipecat.turns.user_stop.ExternalUserTurnCompletionStopStrategy`
with the LLM-specific setup needed for the marker-based completion
protocol: on ``StartFrame``, pushes an ``LLMUpdateSettingsFrame``
upstream that enables ``filter_incomplete_user_turns`` on the LLM
and seeds the
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`.
Finalization itself is inherited: when the LLM service's
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionLLMServiceMixin`
detects a ``✓`` marker, it broadcasts a
:class:`~pipecat.frames.frames.UserTurnInferenceCompletedFrame` and the
base class fires ``on_user_turn_stopped``. On
``incomplete_short`` / ``incomplete_long`` markers the mixin
re-prompts internally and no completion frame is emitted, so the
public stop event stays deferred.
Install alongside one or more ``deferred(...)``-wrapped detector
strategies that drive ``on_user_turn_inference_triggered`` but
leave finalization to this strategy. The aggregator's deprecation
path for ``filter_incomplete_user_turns`` does this rewiring
automatically.
"""
def __init__(
self,
*,
config: UserTurnCompletionConfig | None = None,
**kwargs,
):
"""Initialize the LLM turn-completion stop strategy.
Args:
config: Configuration applied to the LLM via the
``filter_incomplete_user_turns`` setting on
``StartFrame``. Defaults to ``UserTurnCompletionConfig()``.
**kwargs: Additional keyword arguments forwarded to the base
class.
"""
super().__init__(**kwargs)
self._config = config or UserTurnCompletionConfig()
@property
def config(self) -> UserTurnCompletionConfig:
"""Return the configured ``UserTurnCompletionConfig``."""
return self._config
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Configure the LLM on start and delegate completion handling to the base."""
if isinstance(frame, StartFrame):
await self._configure_llm()
return await super().process_frame(frame)
async def _configure_llm(self):
await self.push_frame(
LLMUpdateSettingsFrame(
delta=LLMSettings(
filter_incomplete_user_turns=True,
user_turn_completion_config=self._config,
)
)
)
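
The gating flow described in the docstring above can be illustrated with a small self-contained sketch. It does not import pipecat: `ToyDetector`, `ToyCompletionGate`, `run_chain` and the string "frames" are hypothetical stand-ins chosen for illustration, not the library's classes or frame types. The only behavior taken from the source is that a deferred detector fires just the inference-triggered event, and that finalization happens only when a completion signal arrives.

```python
import asyncio


class ToyDetector:
    """Hypothetical detector: decides the user has probably stopped talking."""

    def __init__(self, log, deferred=False):
        self.log = log
        self.deferred = deferred

    async def process_frame(self, frame):
        if frame == "speech_timeout":
            self.log.append("inference_triggered")
            # A deferred detector never finalizes the turn on its own.
            if not self.deferred:
                self.log.append("stopped")


class ToyCompletionGate:
    """Hypothetical finalizer: fires `stopped` only on a completion frame."""

    def __init__(self, log):
        self.log = log

    async def process_frame(self, frame):
        if frame == "inference_completed":
            self.log.append("stopped")


async def run_chain(frames):
    """Feed every frame to each strategy in order and return the event log."""
    log = []
    chain = [ToyDetector(log, deferred=True), ToyCompletionGate(log)]
    for frame in frames:
        for strategy in chain:
            await strategy.process_frame(frame)
    return log


# No completion signal: inference is triggered but the turn stays open.
incomplete = asyncio.run(run_chain(["speech_timeout"]))
# Completion signal arrives: the gate finalizes the turn.
complete = asyncio.run(run_chain(["speech_timeout", "inference_completed"]))
print(incomplete)
print(complete)
```

In this toy model the absence of a completion frame is what keeps the turn open, which mirrors the "no completion frame is emitted" case for the incomplete markers.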


@@ -22,9 +22,11 @@ from pipecat.frames.frames import (
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMMarkerFrame,
LLMMessagesAppendFrame,
LLMRunFrame,
LLMTextFrame,
UserTurnInferenceCompletedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -185,8 +187,8 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
It processes turn completion markers to enable smarter conversation flow:
- ✓ (COMPLETE): Push response normally
- ○ (INCOMPLETE SHORT): Suppress response, wait ~5s, then prompt
- ◐ (INCOMPLETE LONG): Suppress response, wait ~15s, then prompt
- ○ (INCOMPLETE SHORT): Suppress response, wait 5s, then prompt
- ◐ (INCOMPLETE LONG): Suppress response, wait 10s, then prompt
When incomplete timeouts expire, the mixin automatically prompts the LLM
with a contextual follow-up message to re-engage the user.
@@ -279,7 +281,7 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
await asyncio.sleep(timeout)
# Timeout expired - reset state before prompting LLM
logger.info(f"Incomplete {incomplete_type} timeout expired, prompting LLM")
logger.debug(f"Incomplete {incomplete_type} timeout expired, prompting LLM")
await self._turn_reset()
self._incomplete_timeout_task = None
self._incomplete_type = None
@@ -402,11 +404,15 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
)
self._turn_suppressed = True
# Push the marker with skip_tts=True so it's added to context (maintains
# conversation continuity per prompt instructions) but not spoken by TTS
frame = LLMTextFrame(self._turn_text_buffer)
frame.skip_tts = True
await self.push_frame(frame)
# No UserTurnInferenceCompletedFrame is broadcast here: the turn is
# explicitly not complete. The re-prompt path is driven by
# this mixin's own timeout.
# Persist the marker to context as a stand-alone assistant
# message via LLMMarkerFrame: the bot produces no spoken
# output for incomplete turns, so the marker is the entire
# context entry.
await self.push_frame(LLMMarkerFrame(marker))
self._turn_text_buffer = ""
await self._start_incomplete_timeout(incomplete_type)
@@ -416,17 +422,25 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
if USER_TURN_COMPLETE_MARKER in self._turn_text_buffer:
logger.debug(f"COMPLETE ({USER_TURN_COMPLETE_MARKER}) detected, pushing buffered text")
# Broadcast that the user turn is complete so a stop strategy
# gating finalization on this signal (e.g.
# LLMTurnCompletionUserTurnStopStrategy) can fire
# `on_user_turn_stopped`. Must fire before the marker so
# downstream consumers see the signal before the response.
await self.broadcast_frame(UserTurnInferenceCompletedFrame)
# Push the marker as a sideband signal that the assistant
# aggregator will prepend to the upcoming aggregated text,
# so the context message ends up as "✓ <response>".
await self.push_frame(
LLMMarkerFrame(USER_TURN_COMPLETE_MARKER, append_to_context_immediately=False)
)
# Split buffer at the marker to handle cases where marker and text
# arrive in the same chunk (e.g., "✓ Hello!" from some LLMs)
marker_pos = self._turn_text_buffer.index(USER_TURN_COMPLETE_MARKER)
marker_end = marker_pos + len(USER_TURN_COMPLETE_MARKER)
# Push the marker with skip_tts=True - adds to context but not spoken
marker_text = self._turn_text_buffer[:marker_end]
frame = LLMTextFrame(marker_text)
frame.skip_tts = True
await self.push_frame(frame)
# Push remaining text after marker as normal speech
remaining_text = self._turn_text_buffer[marker_end:]
if remaining_text:
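
The buffer split in this hunk can be sketched in isolation. The helper below is hypothetical (`split_at_marker` is not a pipecat function), it assumes the complete marker is the single character `✓`, and it assumes surrounding whitespace is trimmed from the spoken text, which matches the updated test expecting `"Hello there!"` but is not shown in the hunk itself.

```python
COMPLETE_MARKER = "✓"  # assumed value of USER_TURN_COMPLETE_MARKER


def split_at_marker(buffer, marker=COMPLETE_MARKER):
    """Return (marker_found, spoken_text) for a buffered LLM chunk.

    Hypothetical helper: the marker is reported separately (the diff sends
    it as a sideband marker frame) and only the text after it is returned
    as speech.
    """
    if marker not in buffer:
        return False, ""
    marker_end = buffer.index(marker) + len(marker)
    # Whitespace trimming is an assumption made for this sketch.
    return True, buffer[marker_end:].strip()


same_chunk = split_at_marker("✓ Hello there!")
marker_only = split_at_marker("✓")
no_marker = split_at_marker("Hello")
print(same_chunk, marker_only, no_marker)
```

Handling the marker and the text in one pass covers LLMs that emit both in the same chunk, as the comment in the hunk notes.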


@@ -38,7 +38,11 @@ class UserTurnController(BaseObject):
Event handlers available:
- on_user_turn_started: Emitted when a user turn starts.
- on_user_turn_stopped: Emitted when a user turn stops.
- on_user_turn_inference_triggered: Emitted when enough signal exists to
start LLM inference. Fires together with `on_user_turn_stopped` for
most strategies; fires alone when a downstream strategy gates
finalization on the LLM's verdict.
- on_user_turn_stopped: Emitted when a user turn is semantically final.
- on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout.
- on_push_frame: Emitted when a strategy wants to push a frame.
- on_broadcast_frame: Emitted when a strategy wants to broadcast a frame.
@@ -49,6 +53,10 @@ class UserTurnController(BaseObject):
async def on_user_turn_started(controller, strategy: BaseUserTurnStartStrategy, params: UserTurnStartedParams):
...
@controller.event_handler("on_user_turn_inference_triggered")
async def on_user_turn_inference_triggered(controller, strategy: BaseUserTurnStopStrategy):
...
@controller.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(controller, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams):
...
@@ -95,6 +103,7 @@ class UserTurnController(BaseObject):
self._register_event_handler("on_push_frame", sync=True)
self._register_event_handler("on_broadcast_frame", sync=True)
self._register_event_handler("on_user_turn_started", sync=True)
self._register_event_handler("on_user_turn_inference_triggered", sync=True)
self._register_event_handler("on_user_turn_stopped", sync=True)
self._register_event_handler("on_user_turn_stop_timeout", sync=True)
self._register_event_handler("on_reset_aggregation", sync=True)
@@ -186,6 +195,9 @@ class UserTurnController(BaseObject):
await s.setup(self.task_manager)
s.add_event_handler("on_push_frame", self._on_push_frame)
s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
s.add_event_handler(
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
)
s.add_event_handler("on_user_turn_stopped", self._on_user_turn_stopped)
async def _cleanup_strategies(self):
@@ -246,6 +258,9 @@ class UserTurnController(BaseObject):
):
await self._trigger_user_turn_start(strategy, params)
async def _on_user_turn_inference_triggered(self, strategy: BaseUserTurnStopStrategy):
await self._trigger_user_turn_inference_triggered(strategy)
async def _on_user_turn_stopped(
self, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams
):
@@ -274,6 +289,20 @@ class UserTurnController(BaseObject):
await self._call_event_handler("on_user_turn_started", strategy, params)
async def _trigger_user_turn_inference_triggered(
self, strategy: BaseUserTurnStopStrategy | None
):
# Inference-triggered fires only while a turn is active. The turn
# remains active afterward — only `on_user_turn_stopped` flips state.
if not self._user_turn:
return
# Re-arm the stop watchdog so a stuck turn (inference fired but
# finalization never arrives) still times out and finalizes.
self._user_turn_stop_timeout_event.set()
await self._call_event_handler("on_user_turn_inference_triggered", strategy)
async def _trigger_user_turn_stop(
self, strategy: BaseUserTurnStopStrategy | None, params: UserTurnStoppedParams
):
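
The two guards in `_trigger_user_turn_inference_triggered` (ignore the event when no turn is active, and re-arm the stop watchdog) can be modeled with plain asyncio. This is a sketch under assumptions: `ToyTurnController` and its methods are hypothetical, and the watchdog loop (wait on an event with a timeout, restart the countdown when the event is set) is a guess at how such a watchdog could work, not the controller's actual implementation.

```python
import asyncio


class ToyTurnController:
    """Hypothetical model of a turn controller with a stop watchdog."""

    def __init__(self, stop_timeout):
        self._user_turn = False
        self._stop_timeout = stop_timeout
        self._rearm = asyncio.Event()
        self.events = []

    async def start_turn(self):
        self._user_turn = True
        self.events.append("started")

    async def inference_triggered(self):
        # Only meaningful while a turn is active; the turn stays active after.
        if not self._user_turn:
            return
        self._rearm.set()  # restart the watchdog countdown
        self.events.append("inference_triggered")

    async def stop(self):
        if not self._user_turn:
            return
        self._user_turn = False
        self.events.append("stopped")

    async def watchdog(self):
        while self._user_turn:
            try:
                await asyncio.wait_for(self._rearm.wait(), timeout=self._stop_timeout)
                self._rearm.clear()  # activity seen: count down again
            except asyncio.TimeoutError:
                # Nothing finalized the turn in time, so force the stop.
                self.events.append("stop_timeout")
                await self.stop()


async def main():
    controller = ToyTurnController(stop_timeout=0.1)
    await controller.inference_triggered()  # ignored: no active turn yet
    await controller.start_turn()
    task = asyncio.create_task(controller.watchdog())
    await asyncio.sleep(0.02)
    await controller.inference_triggered()  # re-arms the watchdog
    await task  # finalization never arrives, so the watchdog stops the turn
    return controller.events


events = asyncio.run(main())
print(events)
```

The point of re-arming is visible in the event order: a turn whose inference fired but whose finalization never arrived still ends through the timeout path.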


@@ -35,7 +35,11 @@ class UserTurnProcessor(FrameProcessor):
Event handlers available:
- on_user_turn_started: Emitted when a user turn starts.
- on_user_turn_stopped: Emitted when a user turn stops.
- on_user_turn_inference_triggered: Emitted when enough signal exists to
start LLM inference. Fires together with `on_user_turn_stopped` for
most strategies; fires alone when a downstream strategy gates
finalization on the LLM's verdict.
- on_user_turn_stopped: Emitted when a user turn is semantically final.
- on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout.
- on_user_turn_idle: Emitted when the user has been idle for the configured timeout.
@@ -45,6 +49,10 @@ class UserTurnProcessor(FrameProcessor):
async def on_user_turn_started(processor, strategy: BaseUserTurnStartStrategy):
...
@processor.event_handler("on_user_turn_inference_triggered")
async def on_user_turn_inference_triggered(processor, strategy: BaseUserTurnStopStrategy):
...
@processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy: BaseUserTurnStopStrategy):
...
@@ -85,6 +93,7 @@ class UserTurnProcessor(FrameProcessor):
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._register_event_handler("on_user_turn_inference_triggered")
self._user_turn_controller = UserTurnController(
user_turn_strategies=user_turn_strategies or UserTurnStrategies(),
@@ -101,6 +110,9 @@ class UserTurnProcessor(FrameProcessor):
self._user_turn_controller.add_event_handler(
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
)
self._user_turn_controller.add_event_handler(
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
)
self._user_idle_controller = UserIdleController(user_idle_timeout=user_idle_timeout)
self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle)
@@ -204,3 +216,11 @@ class UserTurnProcessor(FrameProcessor):
async def _on_user_turn_idle(self, controller):
await self._call_event_handler("on_user_turn_idle")
async def _on_user_turn_inference_triggered(
self,
controller: UserTurnController,
strategy: BaseUserTurnStopStrategy,
):
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
await self._call_event_handler("on_user_turn_inference_triggered", strategy)


@@ -17,8 +17,11 @@ from pipecat.turns.user_start import (
from pipecat.turns.user_stop import (
BaseUserTurnStopStrategy,
ExternalUserTurnStopStrategy,
LLMTurnCompletionUserTurnStopStrategy,
TurnAnalyzerUserTurnStopStrategy,
deferred,
)
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
def default_user_turn_start_strategies() -> list[BaseUserTurnStartStrategy]:
@@ -95,3 +98,57 @@ class ExternalUserTurnStrategies(UserTurnStrategies):
def __post_init__(self):
self.start = [ExternalUserTurnStartStrategy()]
self.stop = [ExternalUserTurnStopStrategy()]
@dataclass
class FilterIncompleteUserTurnStrategies(UserTurnStrategies):
"""Stop strategies gated on the LLM's turn-completion verdict.
The LLM is asked to begin every response with one of three markers:
✓ (complete), ○ (incomplete short), or ◐ (incomplete long). Only ✓
finalizes the user turn; ○ / ◐ keep the turn open so the user can
continue speaking and the LLM can re-evaluate later.
Configuring strategies this way preserves the existing detector
chain (defaults or user-supplied) for inference triggering and
appends :class:`~pipecat.turns.user_stop.LLMTurnCompletionUserTurnStopStrategy`
as the finalizer. The detector strategies are wrapped with
:func:`~pipecat.turns.user_stop.deferred` automatically so they fire
only ``on_user_turn_inference_triggered`` and leave finalization to
the LLM gate.
Parameters:
config: Optional configuration applied to the LLM via the
``filter_incomplete_user_turns`` setting. Customizes the
turn-completion instructions, incomplete-turn timeouts, and
re-prompts. If None, defaults from
:class:`~pipecat.turns.user_turn_completion_mixin.UserTurnCompletionConfig`
are used.
Example::
user_turn_strategies=FilterIncompleteUserTurnStrategies()
# Custom detector chain:
user_turn_strategies=FilterIncompleteUserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy(...)],
)
# Custom completion config:
user_turn_strategies=FilterIncompleteUserTurnStrategies(
config=UserTurnCompletionConfig(
incomplete_short_timeout=5.0,
incomplete_long_timeout=10.0,
),
)
"""
config: UserTurnCompletionConfig | None = None
def __post_init__(self):
super().__post_init__()
# Defer the detector chain so it only fires inference-triggered,
# then append the LLM gate as the sole finalizer.
gated: list[BaseUserTurnStopStrategy] = [deferred(s) for s in self.stop or []]
gated.append(LLMTurnCompletionUserTurnStopStrategy(config=self.config))
self.stop = gated
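
The `__post_init__` rewiring above follows a common dataclass pattern: let the base class fill in defaults, then wrap each detector and append a single finalizer. The sketch below reproduces only that pattern with hypothetical stand-ins (`ToyStrategy`, `ToyDeferred`, `ToyStrategies`, `ToyFilterIncomplete`); the assumption that the base class fills a default detector list when `stop` is `None` is made for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyStrategy:
    name: str


@dataclass
class ToyDeferred:
    """Hypothetical wrapper marking a detector as inference-only."""

    inner: ToyStrategy


@dataclass
class ToyStrategies:
    stop: Optional[List[object]] = None

    def __post_init__(self):
        # Assumed base behavior: fall back to a default detector chain.
        if self.stop is None:
            self.stop = [ToyStrategy("default_detector")]


@dataclass
class ToyFilterIncomplete(ToyStrategies):
    def __post_init__(self):
        super().__post_init__()
        # Wrap every detector, then append the gate as the sole finalizer.
        gated = [ToyDeferred(s) for s in self.stop or []]
        gated.append(ToyStrategy("llm_gate"))
        self.stop = gated


default = ToyFilterIncomplete()
custom = ToyFilterIncomplete(stop=[ToyStrategy("custom_detector")])
empty = ToyFilterIncomplete(stop=[])
print(default.stop)
print(custom.stop)
print(empty.stop)
```

Because the wrapping happens after the base class resolves defaults, both the default chain and a user-supplied chain end up with the same shape: deferred detectors followed by one gate.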


@@ -64,7 +64,10 @@ from pipecat.turns.user_mute import (
MuteUntilFirstBotCompleteUserMuteStrategy,
)
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_turn_strategies import (
FilterIncompleteUserTurnStrategies,
UserTurnStrategies,
)
from pipecat.utils.text.base_text_aggregator import AggregationType
USER_TURN_STOP_TIMEOUT = 0.2
@@ -180,7 +183,9 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
async def test_llm_messages_update_does_not_inject_turn_completion_into_context(self):
context = LLMContext()
params = LLMUserAggregatorParams(filter_incomplete_user_turns=True)
params = LLMUserAggregatorParams(
user_turn_strategies=FilterIncompleteUserTurnStrategies(),
)
pipeline = Pipeline([LLMUserAggregator(context, params=params)])
new_messages = [
@@ -291,8 +296,8 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
UserStartedSpeakingFrame,
InterruptionFrame,
VADUserStoppedSpeakingFrame,
UserStoppedSpeakingFrame,
LLMContextFrame,
UserStoppedSpeakingFrame,
]
await run_test(
pipeline,
@@ -563,6 +568,201 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames=[SpeechControlParamsFrame],
)
async def test_inference_triggered_event_fires_on_default_strategies(self):
"""Default flow fires inference-triggered before stopped, both with the same strategy."""
from pipecat.frames.frames import UserTurnInferenceCompletedFrame # noqa: F401
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
]
),
),
)
events: list[str] = []
@user_aggregator.event_handler("on_user_turn_inference_triggered")
async def on_inference_triggered(aggregator, strategy):
events.append("inference_triggered")
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append(f"stopped:{message.content}")
pipeline = Pipeline([user_aggregator])
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="Hi!", user_id="", timestamp="now"),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
]
await run_test(pipeline, frames_to_send=frames_to_send)
self.assertEqual(events, ["inference_triggered", "stopped:Hi!"])
async def test_filter_incomplete_user_turns_emits_deprecation_warning(self):
"""Setting the legacy flag emits a DeprecationWarning."""
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
LLMUserAggregatorParams(filter_incomplete_user_turns=True)
matched = [
x
for x in w
if issubclass(x.category, DeprecationWarning)
and "filter_incomplete_user_turns" in str(x.message)
]
self.assertTrue(matched, "expected a DeprecationWarning")
async def test_filter_incomplete_user_turns_installs_strategy(self):
"""Legacy flag wraps existing stops with deferred() and appends the LLM strategy."""
import warnings
from pipecat.turns.user_stop import (
DeferredUserTurnStopStrategy,
LLMTurnCompletionUserTurnStopStrategy,
SpeechTimeoutUserTurnStopStrategy,
)
existing = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
context = LLMContext()
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
params = LLMUserAggregatorParams(
filter_incomplete_user_turns=True,
user_turn_strategies=UserTurnStrategies(stop=[existing]),
)
aggregator = LLMUserAggregator(context, params=params)
stop_strategies = aggregator._params.user_turn_strategies.stop
self.assertEqual(len(stop_strategies), 2)
self.assertIsInstance(stop_strategies[0], DeferredUserTurnStopStrategy)
self.assertIs(stop_strategies[0].inner, existing)
self.assertIsInstance(stop_strategies[1], LLMTurnCompletionUserTurnStopStrategy)
async def test_llm_completion_strategy_finalizes_on_complete_marker(self):
"""LLMTurnCompletionUserTurnStopStrategy finalizes only when a UserTurnInferenceCompletedFrame arrives."""
from pipecat.frames.frames import UserTurnInferenceCompletedFrame
from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred
gating = LLMTurnCompletionUserTurnStopStrategy()
upstream = deferred(
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
)
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]),
),
)
events: list[str] = []
@user_aggregator.event_handler("on_user_turn_inference_triggered")
async def on_inference_triggered(aggregator, strategy):
events.append("inference_triggered")
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
events.append("stopped")
pipeline = Pipeline([user_aggregator])
# Drive the pipeline. Inference fires after the upstream
# strategy's timeout. Stop fires only when UserTurnInferenceCompletedFrame
# arrives (producer absence == "not yet complete").
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="Hi", user_id="", timestamp="now"),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
# At this point inference_triggered should have fired but NOT stopped.
UserTurnInferenceCompletedFrame(),
SleepFrame(),
]
await run_test(pipeline, frames_to_send=frames_to_send)
self.assertEqual(events, ["inference_triggered", "stopped"])
async def test_multiple_inferences_in_one_turn_preserve_aggregation(self):
"""Two inference triggers before finalization should preserve the full user transcript.
When the LLM marks the first inference incomplete (○ / ◐) and the
user keeps speaking, the deferred upstream strategy fires a
second inference. Both the public ``on_user_turn_stopped`` event
and the conversation context should reflect the full user
utterance, not just the segment from the last inference.
"""
from pipecat.frames.frames import UserTurnInferenceCompletedFrame
from pipecat.turns.user_stop import LLMTurnCompletionUserTurnStopStrategy, deferred
gating = LLMTurnCompletionUserTurnStopStrategy()
upstream = deferred(
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
)
context = LLMContext()
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(stop=[upstream, gating]),
),
)
inference_count = 0
stop_message = None
@user_aggregator.event_handler("on_user_turn_inference_triggered")
async def on_inference_triggered(aggregator, strategy):
nonlocal inference_count
inference_count += 1
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_stopped(aggregator, strategy, message):
nonlocal stop_message
stop_message = message
pipeline = Pipeline([user_aggregator])
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="I'm thinking", user_id="", timestamp="now"),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
# First inference fired here. Imagine the LLM returned ○;
# the turn is not yet finalized, so the user keeps talking.
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="about pizza", user_id="", timestamp="now"),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
# Second inference fired here. Now the LLM returns ✓ and the
# turn finalizes via UserTurnInferenceCompletedFrame.
UserTurnInferenceCompletedFrame(),
SleepFrame(),
]
await run_test(pipeline, frames_to_send=frames_to_send)
self.assertEqual(inference_count, 2)
self.assertIsNotNone(stop_message)
# The public event should report the full transcript, even
# though each inference push only writes its own segment to
# the context.
self.assertEqual(stop_message.content, "I'm thinking about pizza")
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["I'm thinking", "about pizza"])
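
The bookkeeping that the last test checks (each inference writes only its own segment to the context, while the stop event reports the whole utterance) can be sketched without pipecat. `ToyAggregator` and its method names are hypothetical, and joining segments with a single space is an assumption that happens to match the expected `"I'm thinking about pizza"`.

```python
class ToyAggregator:
    """Hypothetical model of per-segment context writes within one turn."""

    def __init__(self):
        self.context = []   # messages visible to the LLM
        self._segment = []  # text since the last inference
        self._turn = []     # text since the turn started

    def add_transcript(self, text):
        self._segment.append(text)
        self._turn.append(text)

    def inference_triggered(self):
        # Each inference pushes only the new segment to the context.
        if self._segment:
            self.context.append({"role": "user", "content": " ".join(self._segment)})
            self._segment = []

    def stopped(self):
        # The public stop event reports the full utterance for the turn.
        full = " ".join(self._turn)
        self._turn = []
        return full


aggregator = ToyAggregator()
aggregator.add_transcript("I'm thinking")
aggregator.inference_triggered()  # first inference; assume the LLM says "incomplete"
aggregator.add_transcript("about pizza")
aggregator.inference_triggered()  # second inference; assume the LLM says "complete"
stop_content = aggregator.stopped()
context_contents = [m["content"] for m in aggregator.context]
print(stop_content)
print(context_contents)
```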
class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
async def test_empty(self):


@@ -8,7 +8,12 @@ import unittest
import unittest.mock
from unittest.mock import AsyncMock
from pipecat.frames.frames import LLMFullResponseEndFrame, LLMTextFrame
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMMarkerFrame,
LLMTextFrame,
UserTurnInferenceCompletedFrame,
)
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import LLMSettings
@@ -44,21 +49,24 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
# Simulate LLM generating: "✓ Hello there!"
await processor._push_turn_text(f"{USER_TURN_COMPLETE_MARKER} Hello there!")
# Should have 2 text frames: marker (skip_tts) and content (normal)
self.assertEqual(len(pushed_frames), 2)
# The marker rides as LLMMarkerFrame(append_to_context_immediately=False);
# only the spoken text is pushed as an LLMTextFrame.
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
self.assertEqual(len(text_frames), 1)
self.assertEqual(text_frames[0].text, "Hello there!")
self.assertFalse(text_frames[0].skip_tts)
# First frame should be the marker with skip_tts=True
self.assertIsInstance(pushed_frames[0], LLMTextFrame)
self.assertEqual(pushed_frames[0].text, USER_TURN_COMPLETE_MARKER)
self.assertTrue(pushed_frames[0].skip_tts)
marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)]
self.assertEqual(len(marker_frames), 1)
self.assertEqual(marker_frames[0].marker, USER_TURN_COMPLETE_MARKER)
self.assertFalse(marker_frames[0].append_to_context_immediately)
# Second frame should be the actual text without skip_tts
self.assertIsInstance(pushed_frames[1], LLMTextFrame)
self.assertEqual(pushed_frames[1].text, "Hello there!")
self.assertFalse(pushed_frames[1].skip_tts)
# UserTurnInferenceCompletedFrame broadcast in both directions.
completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)]
self.assertEqual(len(completed), 2)
async def test_incomplete_short_marker_suppresses_text(self):
"""Test that ○ marker suppresses text with skip_tts."""
"""Test that ○ marker suppresses text and is emitted as a stand-alone marker frame."""
processor = MockProcessor()
pushed_frames = []
@@ -70,14 +78,21 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
await processor._push_turn_text(USER_TURN_INCOMPLETE_SHORT_MARKER)
# Should have 1 text frame with skip_tts=True
self.assertEqual(len(pushed_frames), 1)
self.assertIsInstance(pushed_frames[0], LLMTextFrame)
self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_SHORT_MARKER)
self.assertTrue(pushed_frames[0].skip_tts)
# No LLMTextFrame: response is suppressed.
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
self.assertEqual(len(text_frames), 0)
marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)]
self.assertEqual(len(marker_frames), 1)
self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_SHORT_MARKER)
self.assertTrue(marker_frames[0].append_to_context_immediately)
# Incomplete markers do not emit UserTurnInferenceCompletedFrame.
completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)]
self.assertEqual(len(completed), 0)
async def test_incomplete_long_marker_suppresses_text(self):
"""Test that ◐ marker suppresses text with skip_tts."""
"""Test that ◐ marker suppresses text and is emitted as a stand-alone marker frame."""
processor = MockProcessor()
pushed_frames = []
@@ -89,11 +104,16 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
await processor._push_turn_text(USER_TURN_INCOMPLETE_LONG_MARKER)
# Should have 1 text frame with skip_tts=True
self.assertEqual(len(pushed_frames), 1)
self.assertIsInstance(pushed_frames[0], LLMTextFrame)
self.assertEqual(pushed_frames[0].text, USER_TURN_INCOMPLETE_LONG_MARKER)
self.assertTrue(pushed_frames[0].skip_tts)
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
self.assertEqual(len(text_frames), 0)
marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)]
self.assertEqual(len(marker_frames), 1)
self.assertEqual(marker_frames[0].marker, USER_TURN_INCOMPLETE_LONG_MARKER)
self.assertTrue(marker_frames[0].append_to_context_immediately)
completed = [f for f in pushed_frames if isinstance(f, UserTurnInferenceCompletedFrame)]
self.assertEqual(len(completed), 0)
async def test_text_buffered_until_marker_found(self):
"""Test that text is buffered until a marker is detected."""
@@ -114,8 +134,12 @@ class TestUserUserTurnCompletionLLMServiceMixin(unittest.IsolatedAsyncioTestCase
# Now send the complete marker
await processor._push_turn_text(f" {USER_TURN_COMPLETE_MARKER} How are you?")
# Now frames should be pushed
self.assertEqual(len(pushed_frames), 2)
# One LLMTextFrame for the spoken portion; one LLMMarkerFrame for
# the marker; UserTurnInferenceCompletedFrame broadcast in both directions.
text_frames = [f for f in pushed_frames if isinstance(f, LLMTextFrame)]
self.assertEqual(len(text_frames), 1)
marker_frames = [f for f in pushed_frames if isinstance(f, LLMMarkerFrame)]
self.assertEqual(len(marker_frames), 1)
async def test_turn_state_reset_after_llm_full_response_end_frame(self):
"""Test that _turn_complete_found is reset when LLMFullResponseEndFrame is pushed."""


@@ -19,7 +19,7 @@ from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
MinWordsUserTurnStartStrategy,
)
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy, deferred
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
@@ -71,6 +71,66 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
self.assertTrue(should_stop)
async def test_inference_triggered_fires_alongside_stopped(self):
"""Default strategies fire both inference-triggered and stopped, in order."""
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
)
)
await controller.setup(self.task_manager)
events: list[str] = []
@controller.event_handler("on_user_turn_inference_triggered")
async def on_user_turn_inference_triggered(controller, strategy):
events.append("inference_triggered")
@controller.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(controller, strategy, params):
events.append("stopped")
await controller.process_frame(VADUserStartedSpeakingFrame())
await controller.process_frame(
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
)
await controller.process_frame(VADUserStoppedSpeakingFrame())
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
self.assertEqual(events, ["inference_triggered", "stopped"])
async def test_deferred_wrapper_skips_stopped(self):
"""A deferred() wrapper drops the inner strategy's on_user_turn_stopped event."""
wrapped = deferred(
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
)
controller = UserTurnController(user_turn_strategies=UserTurnStrategies(stop=[wrapped]))
await controller.setup(self.task_manager)
events: list[str] = []
@controller.event_handler("on_user_turn_inference_triggered")
async def on_user_turn_inference_triggered(controller, strategy):
events.append("inference_triggered")
@controller.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(controller, strategy, params):
events.append("stopped")
await controller.process_frame(VADUserStartedSpeakingFrame())
await controller.process_frame(
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
)
await controller.process_frame(VADUserStoppedSpeakingFrame())
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
# The inner strategy fires inference-triggered (forwarded by the
# wrapper). Finalization is suppressed, but the controller's
# stop watchdog eventually fires `stopped`.
self.assertEqual(events[0], "inference_triggered")
async def test_user_turn_start_reset(self):
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(