From 67ea485566b6ecc7ea456d2ddb8323271d17d09c Mon Sep 17 00:00:00 2001 From: James Hush Date: Thu, 12 Feb 2026 12:10:34 +0800 Subject: [PATCH] Fix race condition in SpeechTimeoutUserTurnStopStrategy finalized transcript handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a finalized transcript arrived after user_speech_timeout had elapsed from the VAD stop, the strategy would trigger the turn stop immediately without giving the user time to resume speaking. This happened because STT processing latency consumed the user_speech_timeout window — by the time the transcript arrived, the elapsed time check passed even though the user was still mid-sentence. The fix removes the immediate early trigger path and instead lets the original timeout (which includes the STT wait component) complete naturally. When remaining user_speech_timeout > 0, the timeout is shortened since STT is done. When it has elapsed, the existing timeout continues running, providing a buffer for VAD to detect resumed speech. --- .../speech_timeout_user_turn_stop_strategy.py | 39 ++++++----- tests/test_user_turn_stop_strategy.py | 66 +++++++++++++++++++ 2 files changed, 87 insertions(+), 18 deletions(-) diff --git a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py index 66d6fa703..92b2e15b2 100644 --- a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py @@ -126,8 +126,26 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._text += frame.text if frame.finalized: self._transcript_finalized = True - # For finalized transcripts, check if we can trigger early - await self._maybe_trigger_user_turn_stopped() + # With the transcript finalized, we no longer need to wait for + # STT latency. If a timeout is running (from VAD stop), recalculate + # to use only user_speech_timeout, potentially shortening the wait. + if self._timeout_task and self._vad_stopped_time is not None: + elapsed = time.time() - self._vad_stopped_time + remaining = self._user_speech_timeout - elapsed + if remaining > 0: + # Shorten timeout: replace STT+speech timeout with just + # remaining speech timeout since STT is done. + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = self.task_manager.create_task( + self._timeout_handler(remaining), f"{self}::_timeout_handler" + ) + # If remaining <= 0: user_speech_timeout has elapsed, but the + # original timeout (which may include extra STT wait time) is + # still running. Let it complete naturally — this provides a + # buffer for VAD to detect any resumed speech before triggering. + elif self._timeout_task is None: + # Timeout already completed, check if we should trigger now + await self._maybe_trigger_user_turn_stopped() # Fallback: handle transcripts when no VAD stop was received. # This handles edge cases where transcripts arrive without VAD firing. @@ -178,25 +196,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): Conditions: - User is not currently speaking - We have transcription text - - Either the timeout has elapsed OR we have a finalized transcript - and user_speech_timeout has elapsed + - The timeout has fully elapsed (timeout task completed) """ if self._vad_user_speaking or not self._text: return - # For finalized transcripts, check if user_speech_timeout has elapsed. - # If elapsed, trigger user turn stopped immediately. Else, wait for user resume - # speaking timeout. - if self._transcript_finalized and self._vad_stopped_time is not None: - elapsed = time.time() - self._vad_stopped_time - if elapsed >= self._user_speech_timeout: - # Cancel any remaining timeout since we're triggering now - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - self._timeout_task = None - await self.trigger_user_turn_stopped() - return - - # For non-finalized, only trigger if timeout task has completed if self._timeout_task is None: await self.trigger_user_turn_stopped() diff --git a/tests/test_user_turn_stop_strategy.py b/tests/test_user_turn_stop_strategy.py index 80fb98efc..2fa4c1248 100644 --- a/tests/test_user_turn_stop_strategy.py +++ b/tests/test_user_turn_stop_strategy.py @@ -452,6 +452,72 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) self.assertTrue(should_start) + async def test_finalized_transcript_does_not_trigger_early_with_slow_stt(self): + """Test that a finalized transcript arriving after user_speech_timeout + but before the full timeout does not trigger immediately. + + This reproduces a race condition where: + - STT has high latency (effective_stt_wait > user_speech_timeout) + - User pauses briefly, VAD fires stop + - The full timeout = max(effective_stt_wait, user_speech_timeout) + - The finalized transcript arrives after user_speech_timeout from VAD stop + but before the full timeout + - The user resumes speaking before the full timeout + + Previously, the early trigger path would fire because + time.time() - vad_stopped_time >= user_speech_timeout, even though the + user was about to resume speaking. + """ + user_speech_timeout = 0.1 + strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=user_speech_timeout) + await strategy.setup(self.task_manager) + + # Set high STT P99 latency so effective_stt_wait > user_speech_timeout + stt_timeout = 0.5 + stop_secs = 0.1 + await strategy.process_frame( + STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout) + ) + # effective_stt_wait = max(0, 0.5 - 0.1) = 0.4 + # timeout = max(0.4, 0.1) = 0.4 + + should_start = None + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal should_start + should_start = True + + # S - user starts speaking + await strategy.process_frame(VADUserStartedSpeakingFrame()) + + # E - user pauses briefly + await strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=stop_secs)) + + # Wait for user_speech_timeout to elapse but NOT the full timeout + await asyncio.sleep(user_speech_timeout + 0.05) # 0.15s elapsed + self.assertIsNone(should_start) + + # Finalized transcript arrives (simulating slow STT). + # At this point, elapsed from VAD stop (~0.15s) > user_speech_timeout (0.1s). + # The old code would trigger immediately here. + await strategy.process_frame( + TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True) + ) + + # Should NOT trigger — the full timeout (0.4s) hasn't elapsed yet, + # giving the user time to resume speaking + self.assertIsNone(should_start) + + # User resumes speaking — this cancels the timeout + await strategy.process_frame(VADUserStartedSpeakingFrame()) + + # Wait well past the original timeout + await asyncio.sleep(0.5) + + # Should still not have triggered — user resumed speaking + self.assertIsNone(should_start) + async def test_sie_delay_it(self): strategy = await self._create_strategy()