From b59c4775da06625cec3bb864b07cd986225c177d Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 20 Apr 2026 11:55:09 -0400 Subject: [PATCH] Split user-turn stop timeout into independent speech and STT timers SpeechTimeoutUserTurnStopStrategy previously collapsed two waits into max(stt_timeout, user_speech_timeout), which over-waited for finalizing STT services and could also end the turn early in a legacy code path. Run them as independent timers instead: - user_speech_timeout: policy floor, always runs to completion. - stt_timeout: latency safety net, short-circuited by a finalized transcript since STT has signaled it has nothing more to send. The no-VAD fallback now waits only user_speech_timeout rather than max(stt_timeout, user_speech_timeout); stt_timeout is defined relative to VAD stop and has no meaning when no VAD event occurred. This shortens the fallback wait for users who set stt_timeout greater than user_speech_timeout. --- .../speech_timeout_user_turn_stop_strategy.py | 219 +++++++++++------- tests/test_user_turn_stop_strategy.py | 101 ++++++++ 2 files changed, 237 insertions(+), 83 deletions(-) diff --git a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py index e37b55756..9a4060f1c 100644 --- a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py @@ -7,7 +7,6 @@ """Speech timeout-based user turn stop strategy.""" import asyncio -import time from loguru import logger @@ -25,20 +24,24 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): - """User turn stop strategy that uses a configurable timeout to determine if the user is done speaking. + """User turn stop strategy using two independent timers after VAD stop. - After the user stops speaking (detected by VAD), this strategy waits for a - configurable timeout before triggering the end of the user's turn. The - timeout accounts for two factors: + After the user stops speaking (detected by VAD), this strategy runs two + independent timers. The user turn stop is triggered only when both have + finished and at least one transcript has been received: - - user_speech_timeout: Time to wait for the user to potentially say more - after they pause. - - stt_timeout: The P99 time for the STT service to return a transcription - after the user stops speaking, adjusted by the VAD stop_secs. + - user_speech_timeout: Policy floor — the window in which the user may + resume speaking after a pause. Always runs to completion. + - stt_timeout: Safety net for STT latency — the P99 time for the STT + service to return a final transcript after VAD stop, adjusted by the + VAD stop_secs. Short-circuited when the STT service emits a finalized + transcript (TranscriptionFrame.finalized=True), since finalization + means STT has nothing more to send. - For services that support finalization (TranscriptionFrame.finalized=True), - the turn can be triggered immediately once the finalized transcript is - received and the user resume speaking timeout has elapsed. + Fallback: when a transcript arrives without a VAD stop event, the + strategy waits only user_speech_timeout for inactivity (rearmed on each + transcript). stt_timeout has no meaning here since it is defined + relative to VAD stop, and STT has already emitted a transcript. """ def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs): @@ -59,8 +62,16 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = False self._transcript_finalized = False self._vad_stopped_time: float | None = None - self._timeout_task: asyncio.Task | None = None - self._timeout_expired: bool = False + + # VAD-driven timers and completion flags. + self._user_speech_timeout_task: asyncio.Task | None = None + self._stt_timeout_task: asyncio.Task | None = None + self._user_speech_expired: bool = False + self._stt_wait_done: bool = False + + # Fallback timer (transcript arrived without VAD stop). + self._fallback_timeout_task: asyncio.Task | None = None + self._fallback_expired: bool = False async def reset(self): """Reset the strategy to its initial state.""" @@ -69,10 +80,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = False self._transcript_finalized = False self._vad_stopped_time = None - self._timeout_expired = False - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - self._timeout_task = None + self._user_speech_expired = False + self._stt_wait_done = False + self._fallback_expired = False + await self._cancel_all_tasks() async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. @@ -85,9 +96,7 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): async def cleanup(self): """Cleanup the strategy.""" await super().cleanup() - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - self._timeout_task = None + await self._cancel_all_tasks() async def process_frame(self, frame: Frame) -> ProcessFrameResult: """Process an incoming frame to update strategy state. @@ -105,8 +114,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._stt_timeout = frame.ttfs_p99_latency self._stop_secs_warned = False elif isinstance(frame, VADUserStartedSpeakingFrame): + logger.debug(f"{self} VADUserStartedSpeakingFrame received") await self._handle_vad_user_started_speaking(frame) elif isinstance(frame, VADUserStoppedSpeakingFrame): + logger.debug(f"{self} VADUserStoppedSpeakingFrame received") await self._handle_vad_user_stopped_speaking(frame) elif isinstance(frame, TranscriptionFrame): await self._handle_transcription(frame) @@ -118,11 +129,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = True self._transcript_finalized = False self._vad_stopped_time = None - self._timeout_expired = False - # Cancel any pending timeout - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - self._timeout_task = None + self._user_speech_expired = False + self._stt_wait_done = False + self._fallback_expired = False + await self._cancel_all_tasks() async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame): """Handle when the VAD indicates the user has stopped speaking.""" @@ -150,59 +160,66 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): f"user_turn_stop_timeout parameter in the LLMUserAggregatorParams." ) - # Start the timeout task - timeout = self._calculate_timeout() - self._timeout_task = self.task_manager.create_task( - self._timeout_handler(timeout), f"{self}::_timeout_handler" + # Any prior fallback timer is superseded by the VAD-driven path. + if self._fallback_timeout_task: + await self.task_manager.cancel_task(self._fallback_timeout_task) + self._fallback_timeout_task = None + self._fallback_expired = False + + # user_speech_timeout is the policy floor and always runs. + self._user_speech_timeout_task = self.task_manager.create_task( + self._user_speech_timeout_handler(self._user_speech_timeout), + f"{self}::_user_speech_timeout_handler", ) - # Make sure the task is scheduled. + + # stt_timeout is a safety net. Short-circuit it if the transcript is + # already finalized, or if the VAD stop_secs already covered it. + effective_stt_wait = max(0.0, self._stt_timeout - self._stop_secs) + if self._transcript_finalized or effective_stt_wait <= 0: + self._stt_wait_done = True + else: + self._stt_timeout_task = self.task_manager.create_task( + self._stt_timeout_handler(effective_stt_wait), + f"{self}::_stt_timeout_handler", + ) + + # Make sure the tasks are scheduled. await asyncio.sleep(0) async def _handle_transcription(self, frame: TranscriptionFrame): """Handle user transcription.""" self._text += frame.text + if frame.finalized: self._transcript_finalized = True - # For finalized transcripts, check if we can trigger early + # Short-circuit the stt_timeout safety net: STT has told us + # there's nothing more coming. + if not self._stt_wait_done: + self._stt_wait_done = True + if self._stt_timeout_task: + await self.task_manager.cancel_task(self._stt_timeout_task) + self._stt_timeout_task = None + + # If both VAD-path timers are done (or the fallback timer already + # expired), the turn was waiting on text — trigger now. + if self._fallback_expired or (self._user_speech_expired and self._stt_wait_done): await self._maybe_trigger_user_turn_stopped() - elif self._timeout_expired: - # The p99 timeout already elapsed without a transcript. Now that - # we have one, trigger the turn stop immediately. - await self.trigger_user_turn_stopped() return # Fallback: handle transcripts when no VAD stop was received. - # This handles edge cases where transcripts arrive without VAD firing. - # _vad_stopped_time is None means VAD stopped hasn't been received yet. - # In fallback mode, reset timeout on each transcript to wait for inactivity. + # Rearm the fallback timer on each transcript to wait for inactivity. if not self._vad_user_speaking and self._vad_stopped_time is None: - # Cancel existing fallback timeout if any - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - timeout = self._calculate_timeout() - self._timeout_task = self.task_manager.create_task( - self._timeout_handler(timeout), f"{self}::_timeout_handler" + if self._fallback_timeout_task: + await self.task_manager.cancel_task(self._fallback_timeout_task) + self._fallback_timeout_task = self.task_manager.create_task( + self._fallback_timeout_handler(self._user_speech_timeout), + f"{self}::_fallback_timeout_handler", ) # Make sure the task is scheduled. await asyncio.sleep(0) - def _calculate_timeout(self) -> float: - """Calculate the timeout value based on current state. - - Returns: - The timeout in seconds to wait after VAD stopped speaking. - """ - # Adjust STT timeout by VAD stop_secs since that time has already elapsed - effective_stt_wait = max(0, self._stt_timeout - self._stop_secs) - - # If transcript is already finalized, we don't need to wait for STT - if self._transcript_finalized: - return self._user_speech_timeout - - return max(effective_stt_wait, self._user_speech_timeout) - - async def _timeout_handler(self, timeout: float): - """Wait for the timeout then trigger user turn stopped if conditions met. + async def _user_speech_timeout_handler(self, timeout: float): + """Wait user_speech_timeout then attempt to trigger user turn stopped. Args: timeout: The timeout in seconds to wait. @@ -212,36 +229,72 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): except asyncio.CancelledError: return finally: - self._timeout_task = None + self._user_speech_timeout_task = None - self._timeout_expired = True + self._user_speech_expired = True + await self._maybe_trigger_user_turn_stopped() + + async def _stt_timeout_handler(self, timeout: float): + """Wait stt_timeout then attempt to trigger user turn stopped. + + Args: + timeout: The timeout in seconds to wait. + """ + try: + await asyncio.sleep(timeout) + except asyncio.CancelledError: + return + finally: + self._stt_timeout_task = None + + self._stt_wait_done = True + await self._maybe_trigger_user_turn_stopped() + + async def _fallback_timeout_handler(self, timeout: float): + """Wait user_speech_timeout of inactivity on the fallback path. + + Args: + timeout: The timeout in seconds to wait. + """ + try: + await asyncio.sleep(timeout) + except asyncio.CancelledError: + return + finally: + self._fallback_timeout_task = None + + self._fallback_expired = True await self._maybe_trigger_user_turn_stopped() async def _maybe_trigger_user_turn_stopped(self): - """Trigger user turn stopped if conditions are met. + """Trigger user turn stopped if all required conditions are met. - Conditions: - - User is not currently speaking - - We have transcription text - - Either the timeout has elapsed OR we have a finalized transcript - and user_speech_timeout has elapsed + VAD path: both user_speech_timeout and stt_timeout must have + completed (stt short-circuited by finalization counts as complete). + Fallback path: the fallback timer must have completed. + + In all cases, the user must not be currently speaking and at least + one transcript must have been received. """ if self._vad_user_speaking or not self._text: return - # For finalized transcripts, check if user_speech_timeout has elapsed. - # If elapsed, trigger user turn stopped immediately. Else, wait for user resume - # speaking timeout. - if self._transcript_finalized and self._vad_stopped_time is not None: - elapsed = time.time() - self._vad_stopped_time - if elapsed >= self._user_speech_timeout: - # Cancel any remaining timeout since we're triggering now - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - self._timeout_task = None + if self._vad_stopped_time is not None: + if self._user_speech_expired and self._stt_wait_done: await self.trigger_user_turn_stopped() - return + return - # For non-finalized, only trigger if timeout task has completed - if self._timeout_task is None: + if self._fallback_expired: await self.trigger_user_turn_stopped() + + async def _cancel_all_tasks(self): + """Cancel any running timer tasks and clear the handles.""" + if self._user_speech_timeout_task: + await self.task_manager.cancel_task(self._user_speech_timeout_task) + self._user_speech_timeout_task = None + if self._stt_timeout_task: + await self.task_manager.cancel_task(self._stt_timeout_task) + self._stt_timeout_task = None + if self._fallback_timeout_task: + await self.task_manager.cancel_task(self._fallback_timeout_task) + self._fallback_timeout_task = None diff --git a/tests/test_user_turn_stop_strategy.py b/tests/test_user_turn_stop_strategy.py index ecaf5317f..9acceaf0f 100644 --- a/tests/test_user_turn_stop_strategy.py +++ b/tests/test_user_turn_stop_strategy.py @@ -529,6 +529,107 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): # Non-finalized transcript received after timeout, triggers immediately self.assertTrue(should_start) + async def test_finalized_short_circuits_stt_wait(self): + """Finalized transcript cancels the stt_timeout safety net. + + user_speech_timeout still runs to completion as a policy floor, + but stt_timeout is skipped once STT says it's done. Net effect: + the turn stops at user_speech_timeout, not stt_timeout. + """ + stt_timeout = AGGREGATION_TIMEOUT * 4 + strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT) + await strategy.setup(self.task_manager) + await strategy.process_frame( + STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout) + ) + + should_start = None + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal should_start + should_start = True + + # S → E: starts user_speech_timeout (short) and stt_timeout (long). + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + + # Finalized transcript arrives before user_speech_timeout elapses. + await strategy.process_frame( + TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True) + ) + # user_speech_timeout is still running, so no trigger yet. + self.assertIsNone(should_start) + + # user_speech_timeout elapses — stt_timeout was short-circuited, + # so the turn stops now rather than waiting for stt_timeout. + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertTrue(should_start) + + async def test_non_finalized_waits_full_stt_timeout(self): + """Non-finalized transcript does not short-circuit stt_timeout. + + When STT never signals finalization, the stt_timeout safety net + must run its full course — the turn should not stop until the + longer of the two timers has elapsed. + """ + stt_timeout = AGGREGATION_TIMEOUT * 4 + strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT) + await strategy.setup(self.task_manager) + await strategy.process_frame( + STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout) + ) + + should_start = None + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal should_start + should_start = True + + # S → E: both timers start. + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + + # Non-finalized transcript during the wait. + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + + # user_speech_timeout elapses but stt_timeout has not — no trigger. + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertIsNone(should_start) + + # Wait for the remainder of stt_timeout. + await asyncio.sleep(stt_timeout - AGGREGATION_TIMEOUT + 0.1) + self.assertTrue(should_start) + + async def test_fallback_uses_only_user_speech_timeout(self): + """Fallback path (no VAD) ignores stt_timeout and uses only user_speech_timeout. + + stt_timeout is defined as "p99 after VAD stop" — without a VAD + reference point it has no meaning. The fallback measures + inactivity since the last transcript, which is user_speech_timeout. + """ + stt_timeout = AGGREGATION_TIMEOUT * 4 + strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT) + await strategy.setup(self.task_manager) + await strategy.process_frame( + STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout) + ) + + should_start = None + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal should_start + should_start = True + + # Transcript arrives without any VAD frame — fallback path. + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + + # The fallback timer is user_speech_timeout, not stt_timeout. + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertTrue(should_start) + async def test_reset_clears_stale_text_no_premature_stop(self): """Test that reset() clears stale text and cancels timeout, preventing premature stop.