Compare commits
11 Commits
main
...
pk/optiona
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fee91ddec | ||
|
|
638294c1cc | ||
|
|
ea96b7aec7 | ||
|
|
666c619113 | ||
|
|
797d09a1d5 | ||
|
|
ee1538d18e | ||
|
|
8330c3487d | ||
|
|
4479a3a6af | ||
|
|
8631518388 | ||
|
|
47e2f7a037 | ||
|
|
6d21507e95 |
1
changelog/4480.added.md
Normal file
1
changelog/4480.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `wait_for_transcript_to_end_user_turn` on `LLMUserAggregatorParams` for pipelines where local turn detection drives a realtime service like Gemini Live. Set it to False to avoid unnecessary latency from transcript delay — the realtime service consumes user audio directly, so we don't need user transcripts in context before it can respond. The option makes it so that (1) turn strategies do not consider user transcripts, letting the user turn end sooner, and (2) user transcripts are then handled by the aggregator: a simple timer gives it time to gather those transcripts after the user turn ends, and once gathered, the aggregator emits a new `on_user_turn_message_finalized` event with the new user context message. The new event also fires in the default mode (coinciding with `on_user_turn_stopped`), so consumers that want the populated user transcript can subscribe to it uniformly. See `examples/realtime/realtime-gemini-live-local-vad.py` for the full pattern.
|
||||
@@ -20,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
AssistantTurnStoppedMessage,
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
UserMessageFinalizedMessage,
|
||||
UserTurnStoppedMessage,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
@@ -70,10 +71,25 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
},
|
||||
],
|
||||
)
|
||||
# `wait_for_transcript_to_end_user_turn=False` is the right setting
|
||||
# for pipelines like this one — local turn detection driving a
|
||||
# realtime service. It avoids unnecessary latency from transcript
|
||||
# delay: the realtime service consumes user audio directly, so
|
||||
# we don't need user transcripts in context before it can respond.
|
||||
# With this option:
|
||||
#
|
||||
# - Turn strategies do not consider user transcripts, so the user
|
||||
# turn ends sooner.
|
||||
# - User transcripts are handled by the aggregator: a simple
|
||||
# post-turn transcript wait gives them time to arrive after the
|
||||
# user turn ends, then the aggregator emits
|
||||
# `on_user_turn_message_finalized` with the new user context
|
||||
# message.
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -107,8 +123,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# `on_user_turn_stopped` fires at the end of the user turn. With
|
||||
# `wait_for_transcript_to_end_user_turn=False`, no user
|
||||
# transcripts have arrived yet at this point, so
|
||||
# `message.content` is empty. Logged here to make the end-of-turn
|
||||
# signal visible alongside the later finalization event.
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
|
||||
|
||||
# `on_user_turn_message_finalized` fires when the user message has
|
||||
# been finalized into the context. Here it fires later than
|
||||
# `on_user_turn_stopped`, after the aggregator's post-turn
|
||||
# transcript wait completes.
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_user_turn_message_finalized(
|
||||
aggregator, strategy, message: UserMessageFinalizedMessage
|
||||
):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}user: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
182
examples/realtime/realtime-openai-local-vad.py
Normal file
182
examples/realtime/realtime-openai-local-vad.py
Normal file
@@ -0,0 +1,182 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
AssistantTurnStoppedMessage,
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
UserMessageFinalizedMessage,
|
||||
UserTurnStoppedMessage,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.realtime.events import (
|
||||
AudioConfiguration,
|
||||
AudioInput,
|
||||
InputAudioTranscription,
|
||||
SessionProperties,
|
||||
)
|
||||
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# `turn_detection=False` disables OpenAI Realtime's server-side VAD,
|
||||
# so this pipeline's local turn detection drives turn boundaries.
|
||||
# The service then sends `input_audio_buffer.commit` +
|
||||
# `response.create` when it sees `UserStoppedSpeakingFrame`.
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.environ["OPENAI_API_KEY"],
|
||||
settings=OpenAIRealtimeLLMService.Settings(
|
||||
session_properties=SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
turn_detection=False,
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
context = LLMContext(
|
||||
[
|
||||
{
|
||||
"role": "developer",
|
||||
"content": "Say hello. Then ask if I want to hear a joke.",
|
||||
},
|
||||
],
|
||||
)
|
||||
# `wait_for_transcript_to_end_user_turn=False` is the right setting
|
||||
# for pipelines like this one — local turn detection driving a
|
||||
# realtime service. It avoids unnecessary latency from transcript
|
||||
# delay: the realtime service consumes user audio directly, so
|
||||
# we don't need user transcripts in context before it can respond.
|
||||
# With this option:
|
||||
#
|
||||
# - Turn strategies do not consider user transcripts, so the user
|
||||
# turn ends sooner.
|
||||
# - User transcripts are handled by the aggregator: a simple
|
||||
# post-turn transcript wait gives them time to arrive after the
|
||||
# user turn ends, then the aggregator emits
|
||||
# `on_user_turn_message_finalized` with the new user context
|
||||
# message.
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
user_aggregator,
|
||||
llm,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# `on_user_turn_stopped` fires at the end of the user turn. With
|
||||
# `wait_for_transcript_to_end_user_turn=False`, no user
|
||||
# transcripts have arrived yet at this point, so
|
||||
# `message.content` is empty. Logged here to make the end-of-turn
|
||||
# signal visible alongside the later finalization event.
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
|
||||
|
||||
# `on_user_turn_message_finalized` fires when the user message has
|
||||
# been finalized into the context. Here it fires later than
|
||||
# `on_user_turn_stopped`, after the aggregator's post-turn
|
||||
# transcript wait completes.
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_user_turn_message_finalized(
|
||||
aggregator, strategy, message: UserMessageFinalizedMessage
|
||||
):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}user: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
|
||||
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
line = f"{timestamp}assistant: {message.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -55,6 +55,7 @@ from pipecat.frames.frames import (
|
||||
LLMThoughtStartFrame,
|
||||
LLMThoughtTextFrame,
|
||||
StartFrame,
|
||||
STTMetadataFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TranslationFrame,
|
||||
@@ -80,6 +81,7 @@ from pipecat.processors.aggregators.llm_context_summarizer import (
|
||||
SummaryAppliedEvent,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
|
||||
from pipecat.turns.user_idle_controller import UserIdleController
|
||||
from pipecat.turns.user_mute import BaseUserMuteStrategy
|
||||
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
|
||||
@@ -127,6 +129,25 @@ class LLMUserAggregatorParams:
|
||||
has been idle (not speaking) for this duration. Set to 0 to disable
|
||||
idle detection.
|
||||
vad_analyzer: Voice Activity Detection analyzer instance.
|
||||
wait_for_transcript_to_end_user_turn: Defaults to True. Set to
|
||||
False for pipelines where local turn detection drives a
|
||||
realtime service like Gemini Live. The realtime service
|
||||
consumes user audio directly, so we don't need user
|
||||
transcripts in context before it can respond, and waiting
|
||||
for them is pure latency. When False:
|
||||
|
||||
- Turn strategies do not consider user transcripts, so the
|
||||
user turn ends sooner. ``on_user_turn_stopped`` fires at
|
||||
the end of turn with empty content. To achieve this,
|
||||
the aggregator drops ``TranscriptionUserTurnStartStrategy``
|
||||
from start strategies and flips
|
||||
``wait_for_transcript=False`` on any stop strategy that
|
||||
supports it.
|
||||
- User transcripts are handled by the aggregator: a simple
|
||||
post-turn transcript wait gives it time to receive them
|
||||
after the user turn ends, then the aggregator emits a
|
||||
new ``on_user_turn_message_finalized`` event with the
|
||||
new user context message.
|
||||
filter_incomplete_user_turns: [DEPRECATED] Use
|
||||
``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
|
||||
instead. When enabled, the LLM outputs a turn-completion
|
||||
@@ -157,6 +178,7 @@ class LLMUserAggregatorParams:
|
||||
user_turn_stop_timeout: float = 5.0
|
||||
user_idle_timeout: float = 0
|
||||
vad_analyzer: VADAnalyzer | None = None
|
||||
wait_for_transcript_to_end_user_turn: bool = True
|
||||
filter_incomplete_user_turns: bool = False
|
||||
user_turn_completion_config: UserTurnCompletionConfig | None = None
|
||||
|
||||
@@ -259,13 +281,43 @@ class LLMAssistantAggregatorParams:
|
||||
|
||||
@dataclass
|
||||
class UserTurnStoppedMessage:
|
||||
"""A user turn stopped message containing a user transcript update.
|
||||
"""A message accompanying ``on_user_turn_stopped`` (end of user turn).
|
||||
|
||||
A message in a conversation transcript containing the user content. This is
|
||||
the aggregated transcript that is then used in the context.
|
||||
With ``wait_for_transcript_to_end_user_turn=True`` (the default),
|
||||
the user message is finalized at the end of the turn, so
|
||||
``content`` carries the aggregated transcript. With it set to
|
||||
False, the aggregator is still in its post-turn transcript wait
|
||||
at this point, so ``content`` is ``None`` — subscribe to
|
||||
``on_user_turn_message_finalized`` for the assembled message.
|
||||
|
||||
Parameters:
|
||||
content: The message content/text.
|
||||
content: The aggregated user transcript, or ``None`` when
|
||||
``wait_for_transcript_to_end_user_turn=False`` (the
|
||||
aggregator is still in its post-turn transcript wait at
|
||||
this point).
|
||||
timestamp: When the user turn started.
|
||||
user_id: Optional identifier for the user.
|
||||
|
||||
"""
|
||||
|
||||
content: str | None
|
||||
timestamp: str
|
||||
user_id: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserMessageFinalizedMessage:
|
||||
"""A message accompanying ``on_user_turn_message_finalized``.
|
||||
|
||||
Fired when the user message has been finalized into the context.
|
||||
With ``wait_for_transcript_to_end_user_turn=True`` (the default)
|
||||
this coincides with ``on_user_turn_stopped``. With it set to
|
||||
False, the aggregator first runs a post-turn transcript wait, so
|
||||
this event fires later than ``on_user_turn_stopped``.
|
||||
``content`` is always populated.
|
||||
|
||||
Parameters:
|
||||
content: The aggregated user transcript.
|
||||
timestamp: When the user turn started.
|
||||
user_id: Optional identifier for the user.
|
||||
|
||||
@@ -526,8 +578,21 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
Event handlers available:
|
||||
|
||||
- on_user_turn_started: Called when the user turn starts
|
||||
- on_user_turn_stopped: Called when the user turn ends
|
||||
- on_user_turn_started: Called when the user turn starts.
|
||||
- on_user_turn_stopped: Called at the end of turn, with a
|
||||
``UserTurnStoppedMessage``. With
|
||||
``wait_for_transcript_to_end_user_turn=True`` (the default),
|
||||
``message.content`` carries the aggregated transcript. With it
|
||||
set to False, the aggregator is still in its post-turn transcript
|
||||
wait at this point, so ``message.content`` is ``None``; subscribe
|
||||
to ``on_user_turn_message_finalized`` for the assembled message.
|
||||
- on_user_turn_message_finalized: Called when the user message
|
||||
has been finalized into the context, with a
|
||||
``UserMessageFinalizedMessage``. With
|
||||
``wait_for_transcript_to_end_user_turn=True`` this coincides
|
||||
with ``on_user_turn_stopped``; with it set to False it fires
|
||||
later, after the aggregator's post-turn transcript wait window
|
||||
completes. ``message.content`` is always populated.
|
||||
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
|
||||
- on_user_turn_idle: Called when the user has been idle for the configured timeout
|
||||
- on_user_mute_started: Called when the user becomes muted
|
||||
@@ -543,6 +608,10 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
async def on_user_turn_stopped(aggregator, strategy: BaseUserTurnStopStrategy, message: UserTurnStoppedMessage):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_user_turn_message_finalized(aggregator, strategy: BaseUserTurnStopStrategy, message: UserMessageFinalizedMessage):
|
||||
...
|
||||
|
||||
@aggregator.event_handler("on_user_turn_stop_timeout")
|
||||
async def on_user_turn_stop_timeout(aggregator):
|
||||
...
|
||||
@@ -586,12 +655,14 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
|
||||
self._register_event_handler("on_user_turn_started")
|
||||
self._register_event_handler("on_user_turn_stopped")
|
||||
self._register_event_handler("on_user_turn_message_finalized")
|
||||
self._register_event_handler("on_user_turn_stop_timeout")
|
||||
self._register_event_handler("on_user_turn_idle")
|
||||
self._register_event_handler("on_user_turn_inference_triggered")
|
||||
self._register_event_handler("on_user_mute_started")
|
||||
self._register_event_handler("on_user_mute_stopped")
|
||||
|
||||
user_provided_strategies = self._params.user_turn_strategies is not None
|
||||
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
|
||||
|
||||
# Deprecated path: translate filter_incomplete_user_turns into
|
||||
@@ -605,6 +676,17 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
)
|
||||
self._params.user_turn_strategies = user_turn_strategies
|
||||
|
||||
# When `wait_for_transcript_to_end_user_turn=False`, mutate the
|
||||
# user turn strategies so they don't consider user transcripts:
|
||||
# drop the transcription start strategy, flip
|
||||
# `wait_for_transcript=False` on stop strategies that support
|
||||
# it. Loud log if the user passed their own strategies (we're
|
||||
# overwriting parts of their config); quiet log otherwise.
|
||||
if not self._params.wait_for_transcript_to_end_user_turn:
|
||||
self._apply_no_transcript_wait_bundle(
|
||||
user_turn_strategies, user_provided_strategies=user_provided_strategies
|
||||
)
|
||||
|
||||
self._user_is_muted = False
|
||||
self._user_turn_start_timestamp = ""
|
||||
# Full transcript across the user turn. Each
|
||||
@@ -616,6 +698,20 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
# inferences fire before finalization.
|
||||
self._full_user_turn_aggregation: str | None = None
|
||||
|
||||
# Post-turn transcript wait state, used when the aggregator
|
||||
# waits for transcripts after the user turn ends
|
||||
# (`_wait_for_post_turn_transcripts == True`):
|
||||
# `on_user_turn_stopped` has fired with empty content, and the
|
||||
# aggregator is waiting on `_post_turn_transcript_wait_task`
|
||||
# before finalizing the user message into context. The wait
|
||||
# window duration is taken from the last `STTMetadataFrame`
|
||||
# seen (`STTMetadataFrame.ttfs_p99_latency`), falling back to
|
||||
# `DEFAULT_TTFS_P99` if no STT service has reported one.
|
||||
self._post_turn_transcript_wait_strategy: BaseUserTurnStopStrategy | None = None
|
||||
self._inference_during_post_turn_transcript_wait: bool = False
|
||||
self._post_turn_transcript_wait_task: asyncio.Task | None = None
|
||||
self._stt_ttfs_p99_latency: float | None = None
|
||||
|
||||
self._user_turn_controller = UserTurnController(
|
||||
user_turn_strategies=user_turn_strategies,
|
||||
user_turn_stop_timeout=self._params.user_turn_stop_timeout,
|
||||
@@ -658,6 +754,81 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._vad_controller.add_event_handler("on_push_frame", self._on_push_frame)
|
||||
self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
|
||||
|
||||
@property
|
||||
def _wait_for_post_turn_transcripts(self) -> bool:
|
||||
"""True when the aggregator runs a post-turn transcript wait.
|
||||
|
||||
Inverse of the public ``wait_for_transcript_to_end_user_turn``
|
||||
param: when that's False, this is True. In this mode, turn
|
||||
strategies don't consider user transcripts (so the user turn
|
||||
ends sooner), and the aggregator runs a simple timer after the
|
||||
end of turn to receive any transcripts that arrive, then emits
|
||||
``on_user_turn_message_finalized`` with the assembled user
|
||||
context message. Always travels with the strategy-mutation
|
||||
bundle applied at init.
|
||||
"""
|
||||
return not self._params.wait_for_transcript_to_end_user_turn
|
||||
|
||||
def _apply_no_transcript_wait_bundle(
|
||||
self,
|
||||
user_turn_strategies: UserTurnStrategies,
|
||||
*,
|
||||
user_provided_strategies: bool,
|
||||
):
|
||||
"""Adjust strategies to match ``wait_for_transcript_to_end_user_turn=False``.
|
||||
|
||||
Mutates the user turn strategies so they don't consider user
|
||||
transcripts: drops ``TranscriptionUserTurnStartStrategy`` from
|
||||
start strategies (so late-arriving transcripts don't start
|
||||
new turns), and sets ``wait_for_transcript=False`` on any
|
||||
stop strategy that supports it. The net effect: the user turn
|
||||
ends sooner.
|
||||
|
||||
Logs loudly when adjusting user-provided strategies — we're
|
||||
mutating objects the caller passed in. Logs quietly when only
|
||||
synthesized defaults are in play.
|
||||
"""
|
||||
# Local import to avoid a top-level cycle with `turns.user_start`.
|
||||
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy
|
||||
|
||||
adjustments: list[str] = []
|
||||
|
||||
if user_turn_strategies.start:
|
||||
filtered = [
|
||||
s
|
||||
for s in user_turn_strategies.start
|
||||
if not isinstance(s, TranscriptionUserTurnStartStrategy)
|
||||
]
|
||||
dropped = len(user_turn_strategies.start) - len(filtered)
|
||||
if dropped:
|
||||
user_turn_strategies.start = filtered
|
||||
adjustments.append(
|
||||
f"dropped {dropped} TranscriptionUserTurnStartStrategy from start strategies"
|
||||
)
|
||||
|
||||
flipped = 0
|
||||
for s in user_turn_strategies.stop or []:
|
||||
if hasattr(s, "_wait_for_transcript") and s._wait_for_transcript:
|
||||
s._wait_for_transcript = False
|
||||
flipped += 1
|
||||
if flipped:
|
||||
adjustments.append(
|
||||
f"set wait_for_transcript=False on {flipped} stop "
|
||||
f"strateg{'y' if flipped == 1 else 'ies'}"
|
||||
)
|
||||
|
||||
if not adjustments:
|
||||
return
|
||||
|
||||
message = (
|
||||
f"{self}: wait_for_transcript_to_end_user_turn=False adjusted "
|
||||
f"user turn strategies: {'; '.join(adjustments)}."
|
||||
)
|
||||
if user_provided_strategies:
|
||||
logger.warning(message)
|
||||
else:
|
||||
logger.info(message)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up processor resources."""
|
||||
await super().cleanup()
|
||||
@@ -697,6 +868,13 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
# Interim transcriptions and translations are consumed here
|
||||
# and not pushed downstream, same as final TranscriptionFrame.
|
||||
pass
|
||||
elif isinstance(frame, STTMetadataFrame):
|
||||
# Record the STT service's reported P99 TTFS so the
|
||||
# post-turn transcript wait timer can size itself to the real
|
||||
# latency. Frame is also pushed downstream so other
|
||||
# processors keep seeing it.
|
||||
self._stt_ttfs_p99_latency = frame.ttfs_p99_latency
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, LLMRunFrame):
|
||||
await self._handle_llm_run(frame)
|
||||
elif isinstance(frame, LLMMessagesAppendFrame):
|
||||
@@ -747,13 +925,31 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await s.setup(self.task_manager)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
await self._maybe_emit_user_turn_stopped(on_session_end=True)
|
||||
await self._finalize_on_session_end()
|
||||
await self._cleanup()
|
||||
|
||||
async def _cancel(self, frame: CancelFrame):
|
||||
await self._maybe_emit_user_turn_stopped(on_session_end=True)
|
||||
await self._finalize_on_session_end()
|
||||
await self._cleanup()
|
||||
|
||||
async def _finalize_on_session_end(self):
|
||||
"""Flush any pending user message on session end.
|
||||
|
||||
If a post-turn transcript wait is in flight, complete it now so
|
||||
the user message is captured before the session shuts down.
|
||||
Otherwise, run the mode-appropriate finalize path on whatever
|
||||
is currently in the buffer.
|
||||
"""
|
||||
if (
|
||||
self._post_turn_transcript_wait_strategy is not None
|
||||
or self._inference_during_post_turn_transcript_wait
|
||||
):
|
||||
await self._complete_post_turn_transcript_wait(on_session_end=True)
|
||||
elif self._wait_for_post_turn_transcripts:
|
||||
await self._finalize_user_message(on_session_end=True)
|
||||
else:
|
||||
await self._finalize_user_turn(on_session_end=True)
|
||||
|
||||
async def _cleanup(self):
|
||||
if self._vad_controller:
|
||||
await self._vad_controller.cleanup()
|
||||
@@ -884,6 +1080,21 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
|
||||
|
||||
# Precondition guard: if the previous turn's post-turn
|
||||
# transcript wait is still active when the next turn starts,
|
||||
# the assumption that transcripts arrive before the next turn
|
||||
# has been violated. Complete the previous turn's wait now so
|
||||
# its user message is finalized before this turn proceeds.
|
||||
if (
|
||||
self._post_turn_transcript_wait_strategy is not None
|
||||
or self._inference_during_post_turn_transcript_wait
|
||||
):
|
||||
logger.warning(
|
||||
f"{self}: user turn started before previous turn's transcripts "
|
||||
f"arrived; flushing previous turn now"
|
||||
)
|
||||
await self._complete_post_turn_transcript_wait()
|
||||
|
||||
self._user_turn_start_timestamp = time_now_iso8601()
|
||||
self._full_user_turn_aggregation = None
|
||||
|
||||
@@ -904,6 +1115,14 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
|
||||
|
||||
if self._wait_for_post_turn_transcripts:
|
||||
# The aggregator is in its post-turn transcript wait.
|
||||
# Defer push_aggregation and event emission; they'll run
|
||||
# alongside user message finalization when the wait window
|
||||
# completes.
|
||||
self._inference_during_post_turn_transcript_wait = True
|
||||
return
|
||||
|
||||
# Push aggregation now: this writes the user message segment to
|
||||
# the context and emits LLMContextFrame, which kicks LLM
|
||||
# inference. Concatenate the segment into
|
||||
@@ -929,42 +1148,144 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
logger.debug(f"{self}: User stopped speaking (strategy: {strategy})")
|
||||
|
||||
# End-of-turn side effects always fire on the strategy event,
|
||||
# regardless of whether user message finalization is deferred
|
||||
# to a post-turn transcript wait window.
|
||||
if params.enable_user_speaking_frames:
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
|
||||
|
||||
await self._maybe_emit_user_turn_stopped(strategy)
|
||||
if self._wait_for_post_turn_transcripts:
|
||||
# Fire `on_user_turn_stopped` now for the end of turn —
|
||||
# content is `None` because no transcripts have arrived
|
||||
# yet. Start the post-turn transcript wait timer; when it
|
||||
# completes, the aggregator finalizes the user message and
|
||||
# emits `on_user_turn_message_finalized`. Consumers wanting
|
||||
# the assembled message subscribe to
|
||||
# `on_user_turn_message_finalized`.
|
||||
end_of_turn_message = UserTurnStoppedMessage(
|
||||
content=None, timestamp=self._user_turn_start_timestamp
|
||||
)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, end_of_turn_message)
|
||||
|
||||
self._post_turn_transcript_wait_strategy = strategy
|
||||
wait_timeout = (
|
||||
self._stt_ttfs_p99_latency
|
||||
if self._stt_ttfs_p99_latency is not None
|
||||
else DEFAULT_TTFS_P99
|
||||
)
|
||||
self._post_turn_transcript_wait_task = self.create_task(
|
||||
self._post_turn_transcript_wait_handler(wait_timeout),
|
||||
f"{self}::post_turn_transcript_wait",
|
||||
)
|
||||
return
|
||||
|
||||
await self._finalize_user_turn(strategy)
|
||||
|
||||
async def _post_turn_transcript_wait_handler(self, timeout: float):
|
||||
"""Post-turn transcript wait timer.
|
||||
|
||||
Waits ``timeout`` seconds — giving transcripts time to arrive
|
||||
after the end of turn — then completes the wait and finalizes
|
||||
the user message into context, with whatever transcripts the
|
||||
aggregator has received by then (possibly none).
|
||||
|
||||
The simple-timer approach relies on the assumptions that
|
||||
transcripts don't arrive too late and that the assistant
|
||||
response won't finish before this timer.
|
||||
|
||||
Cancelled by reset, the next-turn precondition guard, or
|
||||
session end.
|
||||
"""
|
||||
try:
|
||||
await asyncio.sleep(timeout)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
finally:
|
||||
self._post_turn_transcript_wait_task = None
|
||||
|
||||
await self._complete_post_turn_transcript_wait()
|
||||
|
||||
async def _complete_post_turn_transcript_wait(self, *, on_session_end: bool = False):
|
||||
"""Complete the active post-turn transcript wait window.
|
||||
|
||||
``on_user_turn_stopped`` already fired at the end of turn (with
|
||||
empty content) and the aggregator has been receiving
|
||||
transcripts since. This finalizes that work: flushes any
|
||||
inference-triggered segment whose push was deferred during the
|
||||
wait, then emits ``on_user_turn_message_finalized`` with the
|
||||
assembled user context message. Called from the post-turn
|
||||
transcript wait timer (the normal path), the precondition guard
|
||||
in ``_on_user_turn_started``, and the session-end paths.
|
||||
"""
|
||||
if self._post_turn_transcript_wait_task:
|
||||
await self.cancel_task(self._post_turn_transcript_wait_task)
|
||||
self._post_turn_transcript_wait_task = None
|
||||
|
||||
wait_strategy = self._post_turn_transcript_wait_strategy
|
||||
had_pending_inference = self._inference_during_post_turn_transcript_wait
|
||||
self._post_turn_transcript_wait_strategy = None
|
||||
self._inference_during_post_turn_transcript_wait = False
|
||||
|
||||
if had_pending_inference:
|
||||
segment = await self.push_aggregation()
|
||||
if segment:
|
||||
if self._full_user_turn_aggregation:
|
||||
self._full_user_turn_aggregation = (
|
||||
f"{self._full_user_turn_aggregation} {segment}".strip()
|
||||
)
|
||||
else:
|
||||
self._full_user_turn_aggregation = segment
|
||||
await self._call_event_handler("on_user_turn_inference_triggered", wait_strategy)
|
||||
|
||||
if wait_strategy is not None or on_session_end:
|
||||
# `on_user_turn_stopped` already fired at the end of turn;
|
||||
# this is the deferred user message finalization.
|
||||
await self._finalize_user_message(wait_strategy, on_session_end=on_session_end)
|
||||
|
||||
async def _on_reset_aggregation(
|
||||
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
|
||||
):
|
||||
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
|
||||
await self._cancel_post_turn_transcript_wait()
|
||||
await self.reset()
|
||||
|
||||
async def _cancel_post_turn_transcript_wait(self):
|
||||
"""Cancel any active post-turn transcript wait window without finalizing.
|
||||
|
||||
Called from reset paths (interruption, explicit reset).
|
||||
"Reset" means "throw it away" — we don't flush a partial
|
||||
transcript that was about to be invalidated anyway.
|
||||
"""
|
||||
if self._post_turn_transcript_wait_task:
|
||||
await self.cancel_task(self._post_turn_transcript_wait_task)
|
||||
self._post_turn_transcript_wait_task = None
|
||||
self._post_turn_transcript_wait_strategy = None
|
||||
self._inference_during_post_turn_transcript_wait = False
|
||||
|
||||
async def _on_user_turn_stop_timeout(self, controller):
|
||||
await self._call_event_handler("on_user_turn_stop_timeout")
|
||||
|
||||
async def _on_user_turn_idle(self, controller):
|
||||
await self._call_event_handler("on_user_turn_idle")
|
||||
|
||||
async def _maybe_emit_user_turn_stopped(
|
||||
self,
|
||||
strategy: BaseUserTurnStopStrategy | None = None,
|
||||
on_session_end: bool = False,
|
||||
):
|
||||
"""Maybe emit user turn stopped event.
|
||||
async def _flush_user_message_to_context(
|
||||
self, on_session_end: bool = False
|
||||
) -> tuple[str, str] | None:
|
||||
"""Push the aggregated user message to context, return ``(content, timestamp)``.
|
||||
|
||||
Earlier inference triggers in the same turn have already pushed
|
||||
their segments to the context and accumulated them into
|
||||
``self._full_user_turn_aggregation``. Any aggregation that
|
||||
arrived after the last inference trigger is flushed here so
|
||||
end-of-turn content is never lost from the public event.
|
||||
Earlier inference triggers in the same turn already pushed their
|
||||
segments to the context and accumulated them in
|
||||
``self._full_user_turn_aggregation``; whatever arrived after the
|
||||
last inference trigger is flushed here so end-of-turn content is
|
||||
never lost.
|
||||
|
||||
Args:
|
||||
strategy: The strategy that triggered the turn stop.
|
||||
on_session_end: If True, only emit if there's unemitted content
|
||||
(avoids duplicate events when session ends).
|
||||
Returns ``(content, timestamp)`` for the just-finalized user
|
||||
message, or ``None`` when there's no content to flush and
|
||||
``on_session_end=True`` (avoids emitting empty events during
|
||||
session shutdown). Callers construct the appropriate message
|
||||
dataclass for each event they emit.
|
||||
"""
|
||||
segment = await self.push_aggregation()
|
||||
full_aggregation = self._full_user_turn_aggregation
|
||||
@@ -975,12 +1296,53 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
else:
|
||||
content = full_aggregation or segment
|
||||
|
||||
if not on_session_end or content:
|
||||
message = UserTurnStoppedMessage(
|
||||
content=content, timestamp=self._user_turn_start_timestamp
|
||||
)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, message)
|
||||
self._user_turn_start_timestamp = ""
|
||||
if on_session_end and not content:
|
||||
return None
|
||||
|
||||
timestamp = self._user_turn_start_timestamp
|
||||
self._user_turn_start_timestamp = ""
|
||||
return content, timestamp
|
||||
|
||||
async def _finalize_user_turn(
|
||||
self,
|
||||
strategy: BaseUserTurnStopStrategy | None = None,
|
||||
on_session_end: bool = False,
|
||||
):
|
||||
"""Finalize the user turn: flush the message, emit both events.
|
||||
|
||||
Used in the default mode (``_wait_for_post_turn_transcripts ==
|
||||
False``), where end of turn and user message finalization
|
||||
coincide. Emits both ``on_user_turn_stopped`` and
|
||||
``on_user_turn_message_finalized``.
|
||||
"""
|
||||
result = await self._flush_user_message_to_context(on_session_end=on_session_end)
|
||||
if result is None:
|
||||
return
|
||||
content, timestamp = result
|
||||
stopped_msg = UserTurnStoppedMessage(content=content, timestamp=timestamp)
|
||||
finalized_msg = UserMessageFinalizedMessage(content=content, timestamp=timestamp)
|
||||
await self._call_event_handler("on_user_turn_stopped", strategy, stopped_msg)
|
||||
await self._call_event_handler("on_user_turn_message_finalized", strategy, finalized_msg)
|
||||
|
||||
async def _finalize_user_message(
|
||||
self,
|
||||
strategy: BaseUserTurnStopStrategy | None = None,
|
||||
on_session_end: bool = False,
|
||||
):
|
||||
"""Finalize the user message: flush to context, emit one event.
|
||||
|
||||
Used when the aggregator runs a post-turn transcript wait
|
||||
(``_wait_for_post_turn_transcripts == True``), where user
|
||||
message finalization fires after the end of turn. Emits
|
||||
``on_user_turn_message_finalized`` only; ``on_user_turn_stopped``
|
||||
was already emitted at the end of turn.
|
||||
"""
|
||||
result = await self._flush_user_message_to_context(on_session_end=on_session_end)
|
||||
if result is None:
|
||||
return
|
||||
content, timestamp = result
|
||||
finalized_msg = UserMessageFinalizedMessage(content=content, timestamp=timestamp)
|
||||
await self._call_event_handler("on_user_turn_message_finalized", strategy, finalized_msg)
|
||||
|
||||
|
||||
class LLMAssistantAggregator(LLMContextAggregator):
|
||||
|
||||
@@ -43,18 +43,42 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
(rearmed on each transcript). stt_timeout has no meaning here since it
|
||||
is defined relative to VAD stop, and STT has already emitted a
|
||||
transcript — so the stt wait is marked done immediately.
|
||||
|
||||
Set ``wait_for_transcript=False`` to make this strategy not consider
|
||||
user transcripts, so the user turn ends sooner — as soon as the
|
||||
user_speech_timeout elapses. Most callers don't set this directly:
|
||||
it's flipped automatically by
|
||||
``wait_for_transcript_to_end_user_turn=False`` on
|
||||
``LLMUserAggregatorParams``, which also wires the aggregator to
|
||||
gather user transcripts after the turn ends. That pattern fits
|
||||
pipelines where local turn detection drives a realtime service like
|
||||
Gemini Live — the realtime service consumes user audio directly,
|
||||
so user transcripts don't need to be in context before it can
|
||||
respond.
|
||||
"""
|
||||
|
||||
def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
user_speech_timeout: float = 0.6,
|
||||
wait_for_transcript: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the speech timeout-based user turn stop strategy.
|
||||
|
||||
Args:
|
||||
user_speech_timeout: Time to wait for the user to potentially
|
||||
say more after they pause speaking. Defaults to 0.6 seconds.
|
||||
wait_for_transcript: Whether the strategy considers user
|
||||
transcripts in deciding when the user turn ends.
|
||||
Defaults to True. Usually flipped indirectly via
|
||||
``wait_for_transcript_to_end_user_turn=False`` on
|
||||
``LLMUserAggregatorParams``.
|
||||
**kwargs: Additional keyword arguments.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._user_speech_timeout = user_speech_timeout
|
||||
self._wait_for_transcript = wait_for_transcript
|
||||
self._stt_timeout: float = 0.0 # STT P99 latency from STTMetadataFrame
|
||||
self._stop_secs: float = 0.0 # VAD stop_secs from VADUserStoppedSpeakingFrame
|
||||
self._stop_secs_warned: bool = False
|
||||
@@ -158,11 +182,12 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
# fallback-mode run of the same timer is superseded here.
|
||||
await self._restart_user_speech_timer()
|
||||
|
||||
# stt_timeout is a safety net. Short-circuit it if the transcript is
|
||||
# already finalized, or if the VAD stop_secs already covered it.
|
||||
# stt_timeout is a safety net. Short-circuit it if we're not waiting
|
||||
# for a transcript, if the transcript is already finalized, or if the
|
||||
# VAD stop_secs already covered it.
|
||||
self._stt_wait_done = False
|
||||
effective_stt_wait = max(0.0, self._stt_timeout - self._stop_secs)
|
||||
if self._transcript_finalized or effective_stt_wait <= 0:
|
||||
if not self._wait_for_transcript or self._transcript_finalized or effective_stt_wait <= 0:
|
||||
self._stt_wait_done = True
|
||||
else:
|
||||
self._stt_timeout_task = self.task_manager.create_task(
|
||||
@@ -253,9 +278,11 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
Both timers must be done (stt is marked done immediately on the
|
||||
fallback path and when finalization short-circuits the safety net),
|
||||
the user must not be currently speaking, and at least one transcript
|
||||
must have been received.
|
||||
must have been received (skipped when ``wait_for_transcript`` is False).
|
||||
"""
|
||||
if self._vad_user_speaking or not self._text:
|
||||
if self._vad_user_speaking:
|
||||
return
|
||||
if self._wait_for_transcript and not self._text:
|
||||
return
|
||||
|
||||
if self._user_speech_wait_done and self._stt_wait_done:
|
||||
|
||||
@@ -42,17 +42,41 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
the turn can be triggered immediately once the finalized transcript is
|
||||
received. Otherwise, an STT timeout (adjusted by VAD stop_secs) is used
|
||||
as a fallback.
|
||||
|
||||
Set ``wait_for_transcript=False`` to make this strategy not consider
|
||||
user transcripts, so the user turn ends sooner — as soon as the
|
||||
analyzer concludes the turn is complete. Most callers don't set
|
||||
this directly: it's flipped automatically by
|
||||
``wait_for_transcript_to_end_user_turn=False`` on
|
||||
``LLMUserAggregatorParams``, which also wires the aggregator to
|
||||
gather user transcripts after the turn ends. That pattern fits
|
||||
pipelines where local turn detection drives a realtime service like
|
||||
Gemini Live — the realtime service consumes user audio directly,
|
||||
so user transcripts don't need to be in context before it can
|
||||
respond.
|
||||
"""
|
||||
|
||||
def __init__(self, *, turn_analyzer: BaseTurnAnalyzer, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
turn_analyzer: BaseTurnAnalyzer,
|
||||
wait_for_transcript: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the user turn stop strategy.
|
||||
|
||||
Args:
|
||||
turn_analyzer: The turn detection analyzer instance to detect end of user turn.
|
||||
wait_for_transcript: Whether the strategy considers user
|
||||
transcripts in deciding when the user turn ends.
|
||||
Defaults to True. Usually flipped indirectly via
|
||||
``wait_for_transcript_to_end_user_turn=False`` on
|
||||
``LLMUserAggregatorParams``.
|
||||
**kwargs: Additional keyword arguments.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._turn_analyzer = turn_analyzer
|
||||
self._wait_for_transcript = wait_for_transcript
|
||||
self._stt_timeout: float = 0.0 # STT P99 latency from STTMetadataFrame
|
||||
self._stop_secs: float = 0.0 # VAD stop_secs from VADUserStoppedSpeakingFrame
|
||||
|
||||
@@ -169,6 +193,13 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
# wait for transcriptions.
|
||||
self._turn_complete = state == EndOfTurnState.COMPLETE
|
||||
|
||||
if not self._wait_for_transcript:
|
||||
# No transcript to wait for. Trigger now if the turn is already
|
||||
# complete; otherwise the analyzer's audio path will trigger once
|
||||
# it indicates completion.
|
||||
await self._maybe_trigger_user_turn_stopped()
|
||||
return
|
||||
|
||||
# Start the STT timeout (adjusted by VAD stop_secs since that time already elapsed)
|
||||
timeout = max(0, self._stt_timeout - self._stop_secs)
|
||||
|
||||
@@ -256,11 +287,13 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
"""Trigger user turn stopped if conditions are met.
|
||||
|
||||
Conditions:
|
||||
- We have transcription text
|
||||
- We have transcription text (skipped when ``wait_for_transcript`` is False)
|
||||
- Turn analyzer indicates turn is complete
|
||||
- Either the timeout has elapsed OR we have a finalized transcript
|
||||
"""
|
||||
if not self._text or not self._turn_complete:
|
||||
if not self._turn_complete:
|
||||
return
|
||||
if self._wait_for_transcript and not self._text:
|
||||
return
|
||||
|
||||
# For finalized transcripts, trigger immediately
|
||||
|
||||
@@ -763,6 +763,474 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
|
||||
self.assertEqual([m["content"] for m in user_messages], ["I'm thinking", "about pizza"])
|
||||
|
||||
async def test_no_wait_for_transcript_basic_flow(self):
|
||||
"""``wait_for_transcript_to_end_user_turn=False`` splits the lifecycle:
|
||||
|
||||
- ``on_user_turn_stopped`` fires at the end of turn with empty
|
||||
content (no transcripts have arrived yet).
|
||||
- Transcripts arriving after the end of turn are captured into
|
||||
``_aggregation``.
|
||||
- When the post-turn transcript wait timer fires,
|
||||
``on_user_turn_message_finalized`` fires with the populated
|
||||
user context message.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from pipecat.processors.aggregators import llm_response_universal
|
||||
|
||||
# Shrink the timer so the test runs quickly.
|
||||
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", TRANSCRIPTION_TIMEOUT):
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT
|
||||
)
|
||||
],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
events: list[tuple[str, str]] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(("stopped", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_finalized(aggregator, strategy, message):
|
||||
events.append(("finalized", message.content))
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
frames_to_send = [
|
||||
VADUserStartedSpeakingFrame(),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
# Let the user_speech_timeout fire so the strategy
|
||||
# fires turn-stopped.
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
# Transcripts arrive after the end of turn (just one
|
||||
# here for the basic case).
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
|
||||
# Wait for the post-turn transcript wait timer to fire.
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
# Two events fired in order: end of turn first (empty),
|
||||
# user message finalization later (populated).
|
||||
self.assertEqual(events, [("stopped", None), ("finalized", "Hello!")])
|
||||
|
||||
# Context contains the user message.
|
||||
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
|
||||
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
|
||||
|
||||
async def test_no_wait_for_transcript_uses_stt_metadata_for_wait_timer(self):
|
||||
"""The post-turn transcript wait timer prefers the STT-reported P99 TTFS
|
||||
over ``DEFAULT_TTFS_P99``. With a long ``DEFAULT_TTFS_P99`` and
|
||||
a short STT-reported value, the wait completes by the shorter
|
||||
time — if the timer fell back to ``DEFAULT_TTFS_P99``, this test
|
||||
would hang.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from pipecat.frames.frames import STTMetadataFrame
|
||||
from pipecat.processors.aggregators import llm_response_universal
|
||||
|
||||
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", 60.0):
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT
|
||||
)
|
||||
],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
events: list[tuple[str, str | None]] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(("stopped", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_finalized(aggregator, strategy, message):
|
||||
events.append(("finalized", message.content))
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
frames_to_send = [
|
||||
# STT service advertises its P99 TTFS latency.
|
||||
STTMetadataFrame(service_name="TestSTT", ttfs_p99_latency=TRANSCRIPTION_TIMEOUT),
|
||||
VADUserStartedSpeakingFrame(),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
# Let the user_speech_timeout fire so the strategy
|
||||
# fires turn-stopped.
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
|
||||
# Wait for the post-turn transcript wait timer to fire (sized
|
||||
# to the STT-reported TTFS, not DEFAULT_TTFS_P99).
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
self.assertEqual(events, [("stopped", None), ("finalized", "Hello!")])
|
||||
|
||||
async def test_no_wait_for_transcript_no_transcripts_arrive(self):
|
||||
"""When no transcripts arrive, the post-turn transcript wait timer still
|
||||
runs — ``on_user_turn_message_finalized`` fires with empty
|
||||
content and nothing is written to context.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from pipecat.processors.aggregators import llm_response_universal
|
||||
|
||||
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", TRANSCRIPTION_TIMEOUT):
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT
|
||||
)
|
||||
],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
events: list[tuple[str, str]] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(("stopped", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_finalized(aggregator, strategy, message):
|
||||
events.append(("finalized", message.content))
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
frames_to_send = [
|
||||
VADUserStartedSpeakingFrame(),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
# Strategy fires turn-stopped.
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
# Pending-finalization timer fires without any transcripts.
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
self.assertEqual(events, [("stopped", None), ("finalized", "")])
|
||||
|
||||
# No user message added to context (empty aggregation).
|
||||
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
|
||||
self.assertEqual(user_messages, [])
|
||||
|
||||
async def test_no_wait_for_transcript_next_turn_force_flushes_previous(self):
|
||||
"""If a new user turn starts while the previous turn's
|
||||
finalization is still pending (precondition violation), the
|
||||
previous turn's finalization fires before the new turn's start.
|
||||
Whatever transcripts were captured by then are what lands in
|
||||
context.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from pipecat.processors.aggregators import llm_response_universal
|
||||
|
||||
with patch.object(
|
||||
llm_response_universal,
|
||||
"DEFAULT_TTFS_P99",
|
||||
TRANSCRIPTION_TIMEOUT * 10, # timer should NOT fire during the test
|
||||
):
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT
|
||||
)
|
||||
],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
events: list[tuple[str, str]] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(("stopped", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_finalized(aggregator, strategy, message):
|
||||
events.append(("finalized", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_started")
|
||||
async def on_started(aggregator, strategy):
|
||||
events.append(("started", ""))
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
frames_to_send = [
|
||||
# Turn 1
|
||||
VADUserStartedSpeakingFrame(),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
# Late transcript for turn 1 arrives (just one here for
|
||||
# simplicity).
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
|
||||
SleepFrame(),
|
||||
# Turn 2 starts before turn 1's post-turn transcript wait timer
|
||||
# fires — precondition violation. The aggregator should
|
||||
# force-flush turn 1 first.
|
||||
VADUserStartedSpeakingFrame(),
|
||||
SleepFrame(),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
# The sequence must show turn 1's end of turn and user message
|
||||
# finalization firing before turn 2's start event.
|
||||
self.assertEqual(
|
||||
events,
|
||||
[
|
||||
("started", ""), # turn 1 starts
|
||||
("stopped", None), # turn 1 end of turn
|
||||
("finalized", "Hello!"), # forced flush before turn 2 starts
|
||||
("started", ""), # turn 2 starts
|
||||
],
|
||||
)
|
||||
|
||||
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
|
||||
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
|
||||
|
||||
async def test_no_wait_for_transcript_context_order_with_assistant_response(self):
|
||||
"""End-to-end ordering test: with both aggregators, verify the user
|
||||
message lands in context *before* the assistant message, even
|
||||
though the user's transcripts arrive after the end of turn.
|
||||
|
||||
Correct ordering requires the user aggregator's deferred
|
||||
``push_aggregation`` to run before the assistant aggregator's
|
||||
``push_aggregation`` (which fires on ``LLMFullResponseEndFrame``).
|
||||
The patched-short post-turn transcript wait timer plus the sleep
|
||||
between LLM start and end make that constraint hold here.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from pipecat.processors.aggregators import llm_response_universal
|
||||
|
||||
# Short timer so the user flush fires while the assistant
|
||||
# response is still streaming.
|
||||
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", 0.05):
|
||||
context = LLMContext()
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT
|
||||
)
|
||||
],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline([user_aggregator, assistant_aggregator])
|
||||
|
||||
frames_to_send = [
|
||||
VADUserStartedSpeakingFrame(),
|
||||
SleepFrame(),
|
||||
VADUserStoppedSpeakingFrame(),
|
||||
# Strategy fires turn-stopped (end of turn).
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
# User transcripts arrive after end of turn (the realtime
|
||||
# service has finally emitted them — just one here).
|
||||
TranscriptionFrame(text="What's the weather?", user_id="", timestamp="now"),
|
||||
# Bot starts responding. Ordering correctness depends on
|
||||
# the user's post-turn transcript wait timer firing before
|
||||
# LLMFullResponseEndFrame below.
|
||||
LLMFullResponseStartFrame(),
|
||||
LLMTextFrame("It's sunny."),
|
||||
# Allow time for the user's post-turn transcript wait timer to
|
||||
# fire (flushing the user message to context) before
|
||||
# the assistant turn ends.
|
||||
SleepFrame(sleep=0.1),
|
||||
LLMFullResponseEndFrame(),
|
||||
SleepFrame(),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
# Context must contain the user message before the assistant message.
|
||||
roles_and_content = [(m.get("role"), m.get("content")) for m in context.get_messages()]
|
||||
self.assertEqual(
|
||||
roles_and_content,
|
||||
[
|
||||
("user", "What's the weather?"),
|
||||
("assistant", "It's sunny."),
|
||||
],
|
||||
)
|
||||
|
||||
async def test_no_wait_for_transcript_strategies_are_mutated(self):
|
||||
"""``wait_for_transcript_to_end_user_turn=False`` mutates the
|
||||
provided strategies: drops ``TranscriptionUserTurnStartStrategy``
|
||||
from start, flips ``wait_for_transcript=False`` on stop.
|
||||
"""
|
||||
from pipecat.turns.user_start import (
|
||||
TranscriptionUserTurnStartStrategy,
|
||||
VADUserTurnStartStrategy,
|
||||
)
|
||||
|
||||
context = LLMContext()
|
||||
stop = SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT,
|
||||
wait_for_transcript=True, # explicitly True; bundle should flip to False
|
||||
)
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
start=[
|
||||
VADUserTurnStartStrategy(),
|
||||
TranscriptionUserTurnStartStrategy(),
|
||||
],
|
||||
stop=[stop],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
# Start strategies: TranscriptionUserTurnStartStrategy dropped.
|
||||
start_types = [type(s) for s in (user_aggregator._params.user_turn_strategies.start or [])]
|
||||
self.assertEqual(start_types, [VADUserTurnStartStrategy])
|
||||
|
||||
# Stop strategy: wait_for_transcript flipped to False.
|
||||
self.assertFalse(stop._wait_for_transcript)
|
||||
|
||||
async def test_transcript_fallback_default_mode(self):
|
||||
"""The strategy's fallback path (transcripts with no prior VAD)
|
||||
triggers turn-stopped correctly in default mode, and the user
|
||||
message lands in context with the aggregated content.
|
||||
"""
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
|
||||
],
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
events: list[tuple[str, str]] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(("stopped", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_finalized(aggregator, strategy, message):
|
||||
events.append(("finalized", message.content))
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
# No VAD frames — fallback path: transcripts with no prior VAD
|
||||
# (just one transcript here for simplicity).
|
||||
frames_to_send = [
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
|
||||
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
# Both events fire with the aggregated content.
|
||||
self.assertEqual(events, [("stopped", "Hello!"), ("finalized", "Hello!")])
|
||||
|
||||
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
|
||||
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
|
||||
|
||||
async def test_transcript_fallback_no_wait_for_transcript_mode(self):
|
||||
"""The strategy's fallback path still gets the user message into
|
||||
context when ``wait_for_transcript_to_end_user_turn=False``,
|
||||
even though no end-of-turn event ever fires (the bundle drops
|
||||
``TranscriptionUserTurnStartStrategy``, so a transcript-only
|
||||
flow never starts a turn in the controller; the strategy's
|
||||
stop-fire is dropped by the controller too).
|
||||
|
||||
At session end the aggregated text is flushed and
|
||||
``on_user_turn_message_finalized`` fires with the content.
|
||||
``on_user_turn_stopped`` doesn't fire — when the aggregator
|
||||
runs a post-turn transcript wait, that event is reserved for
|
||||
the end-of-turn path.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from pipecat.processors.aggregators import llm_response_universal
|
||||
|
||||
with patch.object(llm_response_universal, "DEFAULT_TTFS_P99", TRANSCRIPTION_TIMEOUT):
|
||||
context = LLMContext()
|
||||
user_aggregator = LLMUserAggregator(
|
||||
context,
|
||||
params=LLMUserAggregatorParams(
|
||||
user_turn_strategies=UserTurnStrategies(
|
||||
stop=[
|
||||
SpeechTimeoutUserTurnStopStrategy(
|
||||
user_speech_timeout=TRANSCRIPTION_TIMEOUT
|
||||
)
|
||||
],
|
||||
),
|
||||
wait_for_transcript_to_end_user_turn=False,
|
||||
),
|
||||
)
|
||||
|
||||
events: list[tuple[str, str]] = []
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_stopped(aggregator, strategy, message):
|
||||
events.append(("stopped", message.content))
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_message_finalized")
|
||||
async def on_finalized(aggregator, strategy, message):
|
||||
events.append(("finalized", message.content))
|
||||
|
||||
pipeline = Pipeline([user_aggregator])
|
||||
|
||||
frames_to_send = [
|
||||
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
|
||||
# Wait long enough that the strategy's fallback timer
|
||||
# has elapsed (its stop-fire is dropped by the
|
||||
# controller, since no turn ever started).
|
||||
SleepFrame(sleep=2 * TRANSCRIPTION_TIMEOUT + 0.1),
|
||||
]
|
||||
await run_test(pipeline, frames_to_send=frames_to_send)
|
||||
|
||||
# No end-of-turn event (no turn ever started in the controller).
|
||||
# Only message_finalized fires, with the populated transcript.
|
||||
self.assertEqual(events, [("finalized", "Hello!")])
|
||||
|
||||
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
|
||||
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
|
||||
|
||||
|
||||
class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_empty(self):
|
||||
|
||||
Reference in New Issue
Block a user