Rename transcript-gather to post-turn transcript wait

Switch the vocabulary for the timer-driven phase that runs when
`wait_for_transcript_to_end_user_turn=False`. "Transcript gather" was
too vague to be self-documenting; "post-turn transcript wait" names
when it happens (after the user turn ends) and what it's for (waiting
for late-arriving transcripts).

Renames the internal property to `_wait_for_post_turn_transcripts`
and the supporting state/method names to match
(`_post_turn_transcript_wait_task`, `_complete_post_turn_transcript_wait`,
etc.). Updates docstrings, comments, log messages, the example
inline doc, and the test prose to use the new vocabulary consistently.
This commit is contained in:
Paul Kompfner
2026-05-18 10:51:14 -04:00
parent 666c619113
commit ea96b7aec7
3 changed files with 138 additions and 131 deletions

View File

@@ -80,10 +80,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple timer
# gives it time to gather them after the user turn ends, then
# the aggregator emits `on_user_turn_message_finalized` with the
# new user context message.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
@@ -123,8 +124,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, the aggregator
# hasn't gathered user transcripts yet at this point, so
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
@@ -133,8 +134,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator has gathered the
# realtime service's user transcripts.
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage

View File

@@ -138,16 +138,16 @@ class LLMUserAggregatorParams:
- Turn strategies do not consider user transcripts, so the
user turn ends sooner. ``on_user_turn_stopped`` fires at
the end of turn with empty content (transcripts haven't
been gathered yet). To achieve this, the aggregator
drops ``TranscriptionUserTurnStartStrategy`` from start
strategies and flips ``wait_for_transcript=False`` on
any stop strategy that supports it.
the end of turn with empty content. To achieve this,
the aggregator drops ``TranscriptionUserTurnStartStrategy``
from start strategies and flips
``wait_for_transcript=False`` on any stop strategy that
supports it.
- User transcripts are handled by the aggregator: a simple
timer gives it time to gather them after the user turn
ends, then the aggregator emits a new
``on_user_turn_message_finalized`` event with the new
user context message.
post-turn transcript wait gives it time to receive them
after the user turn ends, then the aggregator emits a
new ``on_user_turn_message_finalized`` event with the
new user context message.
filter_incomplete_user_turns: [DEPRECATED] Use
``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
instead. When enabled, the LLM outputs a turn-completion
@@ -286,14 +286,15 @@ class UserTurnStoppedMessage:
With ``wait_for_transcript_to_end_user_turn=True`` (the default),
the user message is finalized at the end of the turn, so
``content`` carries the aggregated transcript. With it set to
False, the aggregator is still gathering user transcripts at this
point, so ``content`` is ``None`` — subscribe to
False, the aggregator is still in its post-turn transcript wait
at this point, so ``content`` is ``None`` — subscribe to
``on_user_turn_message_finalized`` for the assembled message.
Parameters:
content: The aggregated user transcript, or ``None`` when
``wait_for_transcript_to_end_user_turn=False`` (the
aggregator is still gathering transcripts at this point).
aggregator is still in its post-turn transcript wait at
this point).
timestamp: When the user turn started.
user_id: Optional identifier for the user.
@@ -311,9 +312,9 @@ class UserMessageFinalizedMessage:
Fired when the user message has been finalized into the context.
With ``wait_for_transcript_to_end_user_turn=True`` (the default)
this coincides with ``on_user_turn_stopped``. With it set to
False, the aggregator first gathers user transcripts after the
end of turn, so this event fires later than
``on_user_turn_stopped``. ``content`` is always populated.
False, the aggregator first runs a post-turn transcript wait, so
this event fires later than ``on_user_turn_stopped``.
``content`` is always populated.
Parameters:
content: The aggregated user transcript.
@@ -582,15 +583,15 @@ class LLMUserAggregator(LLMContextAggregator):
``UserTurnStoppedMessage``. With
``wait_for_transcript_to_end_user_turn=True`` (the default),
``message.content`` carries the aggregated transcript. With it
set to False, the aggregator is still gathering user transcripts
at this point, so ``message.content`` is ``None``; subscribe to
``on_user_turn_message_finalized`` for the assembled message.
set to False, the aggregator is still in its post-turn transcript
wait at this point, so ``message.content`` is ``None``; subscribe
to ``on_user_turn_message_finalized`` for the assembled message.
- on_user_turn_message_finalized: Called when the user message
has been finalized into the context, with a
``UserMessageFinalizedMessage``. With
``wait_for_transcript_to_end_user_turn=True`` this coincides
with ``on_user_turn_stopped``; with it set to False it fires
later, after the aggregator's transcript-gather window
later, after the aggregator's post-turn transcript wait window
completes. ``message.content`` is always populated.
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
- on_user_turn_idle: Called when the user has been idle for the configured timeout
@@ -697,18 +698,18 @@ class LLMUserAggregator(LLMContextAggregator):
# inferences fire before finalization.
self._full_user_turn_aggregation: str | None = None
# Transcript-gather state, used when the aggregator gathers
# user transcripts after the user turn ends
# (`_aggregator_gathers_transcripts == True`):
# Post-turn transcript wait state, used when the aggregator
# waits for transcripts after the user turn ends
# (`_wait_for_post_turn_transcripts == True`):
# `on_user_turn_stopped` has fired with empty content, and the
# aggregator is waiting on `_transcript_gather_task` before
# finalizing the user message into context. The gather window
# duration is taken from the last `STTMetadataFrame` seen
# (`STTMetadataFrame.ttfs_p99_latency`), falling back to
# aggregator is waiting on `_post_turn_transcript_wait_task`
# before finalizing the user message into context. The wait
# window duration is taken from the last `STTMetadataFrame`
# seen (`STTMetadataFrame.ttfs_p99_latency`), falling back to
# `DEFAULT_TTFS_P99` if no STT service has reported one.
self._gathering_for_strategy: BaseUserTurnStopStrategy | None = None
self._inference_during_gather: bool = False
self._transcript_gather_task: asyncio.Task | None = None
self._post_turn_transcript_wait_strategy: BaseUserTurnStopStrategy | None = None
self._inference_during_post_turn_transcript_wait: bool = False
self._post_turn_transcript_wait_task: asyncio.Task | None = None
self._stt_ttfs_p99_latency: float | None = None
self._user_turn_controller = UserTurnController(
@@ -754,16 +755,17 @@ class LLMUserAggregator(LLMContextAggregator):
self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
@property
def _aggregator_gathers_transcripts(self) -> bool:
"""True when the aggregator gathers user transcripts after the turn ends.
def _wait_for_post_turn_transcripts(self) -> bool:
"""True when the aggregator runs a post-turn transcript wait.
Internal alias for ``wait_for_transcript_to_end_user_turn=False``.
In this mode, turn strategies don't consider user transcripts
(so the user turn ends sooner), and the aggregator runs a
simple timer after the end of turn to gather any transcripts
that arrive, then emits ``on_user_turn_message_finalized``
with the assembled user context message. Always travels with
the strategy-mutation bundle applied at init.
Inverse of the public ``wait_for_transcript_to_end_user_turn``
param: when that's False, this is True. In this mode, turn
strategies don't consider user transcripts (so the user turn
ends sooner), and the aggregator runs a simple timer after the
end of turn to receive any transcripts that arrive, then emits
``on_user_turn_message_finalized`` with the assembled user
context message. Always travels with the strategy-mutation
bundle applied at init.
"""
return not self._params.wait_for_transcript_to_end_user_turn
@@ -868,7 +870,7 @@ class LLMUserAggregator(LLMContextAggregator):
pass
elif isinstance(frame, STTMetadataFrame):
# Record the STT service's reported P99 TTFS so the
# transcript-gather timer can size itself to the real
# post-turn transcript wait timer can size itself to the real
# latency. Frame is also pushed downstream so other
# processors keep seeing it.
self._stt_ttfs_p99_latency = frame.ttfs_p99_latency
@@ -933,15 +935,17 @@ class LLMUserAggregator(LLMContextAggregator):
async def _finalize_on_session_end(self):
"""Flush any pending user message on session end.
If a transcript-gather is in flight (the aggregator hasn't
finished gathering transcripts yet), complete it now so the
user message is captured before the session shuts down.
If a post-turn transcript wait is in flight, complete it now so
the user message is captured before the session shuts down.
Otherwise, run the mode-appropriate finalize path on whatever
is currently in the buffer.
"""
if self._gathering_for_strategy is not None or self._inference_during_gather:
await self._complete_transcript_gather(on_session_end=True)
elif self._aggregator_gathers_transcripts:
if (
self._post_turn_transcript_wait_strategy is not None
or self._inference_during_post_turn_transcript_wait
):
await self._complete_post_turn_transcript_wait(on_session_end=True)
elif self._wait_for_post_turn_transcripts:
await self._finalize_user_message(on_session_end=True)
else:
await self._finalize_user_turn(on_session_end=True)
@@ -1076,17 +1080,20 @@ class LLMUserAggregator(LLMContextAggregator):
):
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
# Precondition guard: if the previous turn's transcript-gather
# window is still active when the next turn starts, the
# assumption that transcripts arrive before the next turn
# has been violated. Complete the previous turn's gather now
# so its user message is finalized before this turn proceeds.
if self._gathering_for_strategy is not None or self._inference_during_gather:
# Precondition guard: if the previous turn's post-turn
# transcript wait is still active when the next turn starts,
# the assumption that transcripts arrive before the next turn
# has been violated. Complete the previous turn's wait now so
# its user message is finalized before this turn proceeds.
if (
self._post_turn_transcript_wait_strategy is not None
or self._inference_during_post_turn_transcript_wait
):
logger.warning(
f"{self}: user turn started before previous turn's transcripts "
f"were gathered; flushing previous turn now"
f"arrived; flushing previous turn now"
)
await self._complete_transcript_gather()
await self._complete_post_turn_transcript_wait()
self._user_turn_start_timestamp = time_now_iso8601()
self._full_user_turn_aggregation = None
@@ -1108,12 +1115,12 @@ class LLMUserAggregator(LLMContextAggregator):
):
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
if self._aggregator_gathers_transcripts:
# The aggregator is gathering transcripts after the user
# turn end. Defer push_aggregation and event emission;
# they'll run alongside user message finalization when the
# transcript-gather window completes.
self._inference_during_gather = True
if self._wait_for_post_turn_transcripts:
# The aggregator is in its post-turn transcript wait.
# Defer push_aggregation and event emission; they'll run
# alongside user message finalization when the wait window
# completes.
self._inference_during_post_turn_transcript_wait = True
return
# Push aggregation now: this writes the user message segment to
@@ -1143,51 +1150,50 @@ class LLMUserAggregator(LLMContextAggregator):
# End-of-turn side effects always fire on the strategy event,
# regardless of whether user message finalization is deferred
# to a transcript-gather window.
# to a post-turn transcript wait window.
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
if self._aggregator_gathers_transcripts:
if self._wait_for_post_turn_transcripts:
# Fire `on_user_turn_stopped` now for the end of turn —
# content is `None` because the aggregator hasn't gathered
# transcripts yet. Start the transcript-gather timer; when
# it completes, the aggregator finalizes the user message
# and emits `on_user_turn_message_finalized`. Consumers
# wanting the assembled message subscribe to
# content is `None` because no transcripts have arrived
# yet. Start the post-turn transcript wait timer; when it
# completes, the aggregator finalizes the user message and
# emits `on_user_turn_message_finalized`. Consumers wanting
# the assembled message subscribe to
# `on_user_turn_message_finalized`.
end_of_turn_message = UserTurnStoppedMessage(
content=None, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, end_of_turn_message)
self._gathering_for_strategy = strategy
gather_timeout = (
self._post_turn_transcript_wait_strategy = strategy
wait_timeout = (
self._stt_ttfs_p99_latency
if self._stt_ttfs_p99_latency is not None
else DEFAULT_TTFS_P99
)
self._transcript_gather_task = self.create_task(
self._transcript_gather_handler(gather_timeout),
f"{self}::transcript_gather",
self._post_turn_transcript_wait_task = self.create_task(
self._post_turn_transcript_wait_handler(wait_timeout),
f"{self}::post_turn_transcript_wait",
)
return
await self._finalize_user_turn(strategy)
async def _transcript_gather_handler(self, timeout: float):
"""Transcript-gather timer.
async def _post_turn_transcript_wait_handler(self, timeout: float):
"""Post-turn transcript wait timer.
Waits ``timeout`` seconds — giving transcripts time to arrive
after the end of turn — then completes the gather and
finalizes the user message into context, with whatever
transcripts the aggregator has captured by then (possibly
nothing).
after the end of turn — then completes the wait and finalizes
the user message into context, with whatever transcripts the
aggregator has received by then (possibly none).
The simple-timer approach relies on the assumptions that
transcripts don't arrive too late and that the bot response
won't finish before this timer.
transcripts don't arrive too late and that the assistant
response won't finish before this timer.
Cancelled by reset, the next-turn precondition guard, or
session end.
@@ -1197,30 +1203,30 @@ class LLMUserAggregator(LLMContextAggregator):
except asyncio.CancelledError:
return
finally:
self._transcript_gather_task = None
self._post_turn_transcript_wait_task = None
await self._complete_transcript_gather()
await self._complete_post_turn_transcript_wait()
async def _complete_transcript_gather(self, *, on_session_end: bool = False):
"""Complete the active transcript-gather window.
async def _complete_post_turn_transcript_wait(self, *, on_session_end: bool = False):
"""Complete the active post-turn transcript wait window.
``on_user_turn_stopped`` already fired at the end of turn (with
empty content) and the aggregator has been gathering
empty content) and the aggregator has been receiving
transcripts since. This finalizes that work: flushes any
inference-triggered segment whose push was deferred during the
gather, then emits ``on_user_turn_message_finalized`` with the
assembled user context message. Called from the
transcript-gather timer (the normal path), the precondition
guard in ``_on_user_turn_started``, and the session-end paths.
wait, then emits ``on_user_turn_message_finalized`` with the
assembled user context message. Called from the post-turn
transcript wait timer (the normal path), the precondition guard
in ``_on_user_turn_started``, and the session-end paths.
"""
if self._transcript_gather_task:
await self.cancel_task(self._transcript_gather_task)
self._transcript_gather_task = None
if self._post_turn_transcript_wait_task:
await self.cancel_task(self._post_turn_transcript_wait_task)
self._post_turn_transcript_wait_task = None
gather_strategy = self._gathering_for_strategy
had_pending_inference = self._inference_during_gather
self._gathering_for_strategy = None
self._inference_during_gather = False
wait_strategy = self._post_turn_transcript_wait_strategy
had_pending_inference = self._inference_during_post_turn_transcript_wait
self._post_turn_transcript_wait_strategy = None
self._inference_during_post_turn_transcript_wait = False
if had_pending_inference:
segment = await self.push_aggregation()
@@ -1231,32 +1237,32 @@ class LLMUserAggregator(LLMContextAggregator):
)
else:
self._full_user_turn_aggregation = segment
await self._call_event_handler("on_user_turn_inference_triggered", gather_strategy)
await self._call_event_handler("on_user_turn_inference_triggered", wait_strategy)
if gather_strategy is not None or on_session_end:
if wait_strategy is not None or on_session_end:
# `on_user_turn_stopped` already fired at the end of turn;
# this is the deferred user message finalization.
await self._finalize_user_message(gather_strategy, on_session_end=on_session_end)
await self._finalize_user_message(wait_strategy, on_session_end=on_session_end)
async def _on_reset_aggregation(
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
):
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
await self._cancel_transcript_gather()
await self._cancel_post_turn_transcript_wait()
await self.reset()
async def _cancel_transcript_gather(self):
"""Cancel any active transcript-gather window without finalizing.
async def _cancel_post_turn_transcript_wait(self):
"""Cancel any active post-turn transcript wait window without finalizing.
Called from reset paths (interruption, explicit reset).
"Reset" means "throw it away" — we don't flush a partial
transcript that was about to be invalidated anyway.
"""
if self._transcript_gather_task:
await self.cancel_task(self._transcript_gather_task)
self._transcript_gather_task = None
self._gathering_for_strategy = None
self._inference_during_gather = False
if self._post_turn_transcript_wait_task:
await self.cancel_task(self._post_turn_transcript_wait_task)
self._post_turn_transcript_wait_task = None
self._post_turn_transcript_wait_strategy = None
self._inference_during_post_turn_transcript_wait = False
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")
@@ -1304,7 +1310,7 @@ class LLMUserAggregator(LLMContextAggregator):
):
"""Finalize the user turn: flush the message, emit both events.
Used in the default mode (``_aggregator_gathers_transcripts ==
Used in the default mode (``_wait_for_post_turn_transcripts ==
False``), where end of turn and user message finalization
coincide. Emits both ``on_user_turn_stopped`` and
``on_user_turn_message_finalized``.
@@ -1325,9 +1331,9 @@ class LLMUserAggregator(LLMContextAggregator):
):
"""Finalize the user message: flush to context, emit one event.
Used when the aggregator gathers transcripts after the user
turn ends (``_aggregator_gathers_transcripts == True``), where
user message finalization fires after the end of turn. Emits
Used when the aggregator runs a post-turn transcript wait
(``_wait_for_post_turn_transcripts == True``), where user
message finalization fires after the end of turn. Emits
``on_user_turn_message_finalized`` only; ``on_user_turn_stopped``
was already emitted at the end of turn.
"""

View File

@@ -767,10 +767,10 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
"""``wait_for_transcript_to_end_user_turn=False`` splits the lifecycle:
- ``on_user_turn_stopped`` fires at the end of turn with empty
content (the aggregator hasn't gathered transcripts yet).
content (no transcripts have arrived yet).
- Transcripts arriving after the end of turn are captured into
``_aggregation``.
- When the transcript-gather timer fires,
- When the post-turn transcript wait timer fires,
``on_user_turn_message_finalized`` fires with the populated
user context message.
"""
@@ -817,7 +817,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
# Transcripts arrive after the end of turn (just one
# here for the basic case).
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
# Wait for the transcript-gather timer to fire.
# Wait for the post-turn transcript wait timer to fire.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
await run_test(pipeline, frames_to_send=frames_to_send)
@@ -830,10 +830,10 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
user_messages = [m for m in context.get_messages() if m.get("role") == "user"]
self.assertEqual([m["content"] for m in user_messages], ["Hello!"])
async def test_no_wait_for_transcript_uses_stt_metadata_for_gather_timer(self):
"""The transcript-gather timer prefers the STT-reported P99 TTFS
async def test_no_wait_for_transcript_uses_stt_metadata_for_wait_timer(self):
"""The post-turn transcript wait timer prefers the STT-reported P99 TTFS
over ``DEFAULT_TTFS_P99``. With a long ``DEFAULT_TTFS_P99`` and
a short STT-reported value, the gather completes by the shorter
a short STT-reported value, the wait completes by the shorter
time — if the timer fell back to ``DEFAULT_TTFS_P99``, this test
would hang.
"""
@@ -880,7 +880,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
# fires turn-stopped.
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
# Wait for the transcript-gather timer to fire (sized
# Wait for the post-turn transcript wait timer to fire (sized
# to the STT-reported TTFS, not DEFAULT_TTFS_P99).
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
@@ -889,7 +889,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
self.assertEqual(events, [("stopped", None), ("finalized", "Hello!")])
async def test_no_wait_for_transcript_no_transcripts_arrive(self):
"""When no transcripts arrive, the transcript-gather timer still
"""When no transcripts arrive, the post-turn transcript wait timer still
runs — ``on_user_turn_message_finalized`` fires with empty
content and nothing is written to context.
"""
@@ -999,7 +999,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
# simplicity).
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
SleepFrame(),
# Turn 2 starts before turn 1's transcript-gather timer
# Turn 2 starts before turn 1's post-turn transcript wait timer
# fires — precondition violation. The aggregator should
# force-flush turn 1 first.
VADUserStartedSpeakingFrame(),
@@ -1030,7 +1030,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
Correct ordering requires the user aggregator's deferred
``push_aggregation`` to run before the assistant aggregator's
``push_aggregation`` (which fires on ``LLMFullResponseEndFrame``).
The patched-short transcript-gather timer plus the sleep
The patched-short post-turn transcript wait timer plus the sleep
between LLM start and end make that constraint hold here.
"""
from unittest.mock import patch
@@ -1067,11 +1067,11 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
# service has finally emitted them — just one here).
TranscriptionFrame(text="What's the weather?", user_id="", timestamp="now"),
# Bot starts responding. Ordering correctness depends on
# the user's transcript-gather timer firing before
# the user's post-turn transcript wait timer firing before
# LLMFullResponseEndFrame below.
LLMFullResponseStartFrame(),
LLMTextFrame("It's sunny."),
# Allow time for the user's transcript-gather timer to
# Allow time for the user's post-turn transcript wait timer to
# fire (flushing the user message to context) before
# the assistant turn ends.
SleepFrame(sleep=0.1),
@@ -1180,7 +1180,7 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
At session end the aggregated text is flushed and
``on_user_turn_message_finalized`` fires with the content.
``on_user_turn_stopped`` doesn't fire — when the aggregator
gathers transcripts after the turn ends, it's reserved for
runs a post-turn transcript wait, that event is reserved for
the end-of-turn path.
"""
from unittest.mock import patch