diff --git a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py index 66d6fa703..92b2e15b2 100644 --- a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py @@ -126,8 +126,26 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._text += frame.text if frame.finalized: self._transcript_finalized = True - # For finalized transcripts, check if we can trigger early - await self._maybe_trigger_user_turn_stopped() + # With the transcript finalized, we no longer need to wait for + # STT latency. If a timeout is running (from VAD stop), recalculate + # to use only user_speech_timeout, potentially shortening the wait. + if self._timeout_task and self._vad_stopped_time is not None: + elapsed = time.time() - self._vad_stopped_time + remaining = self._user_speech_timeout - elapsed + if remaining > 0: + # Shorten timeout: replace STT+speech timeout with just + # remaining speech timeout since STT is done. + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = self.task_manager.create_task( + self._timeout_handler(remaining), f"{self}::_timeout_handler" + ) + # If remaining <= 0: user_speech_timeout has elapsed, but the + # original timeout (which may include extra STT wait time) is + # still running. Let it complete naturally — this provides a + # buffer for VAD to detect any resumed speech before triggering. + elif self._timeout_task is None: + # Timeout already completed, check if we should trigger now + await self._maybe_trigger_user_turn_stopped() # Fallback: handle transcripts when no VAD stop was received. # This handles edge cases where transcripts arrive without VAD firing. @@ -178,25 +196,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): Conditions: - User is not currently speaking - We have transcription text - - Either the timeout has elapsed OR we have a finalized transcript - and user_speech_timeout has elapsed + - The timeout has fully elapsed (timeout task completed) """ if self._vad_user_speaking or not self._text: return - # For finalized transcripts, check if user_speech_timeout has elapsed. - # If elapsed, trigger user turn stopped immediately. Else, wait for user resume - # speaking timeout. - if self._transcript_finalized and self._vad_stopped_time is not None: - elapsed = time.time() - self._vad_stopped_time - if elapsed >= self._user_speech_timeout: - # Cancel any remaining timeout since we're triggering now - if self._timeout_task: - await self.task_manager.cancel_task(self._timeout_task) - self._timeout_task = None - await self.trigger_user_turn_stopped() - return - - # For non-finalized, only trigger if timeout task has completed if self._timeout_task is None: await self.trigger_user_turn_stopped() diff --git a/tests/test_user_turn_stop_strategy.py b/tests/test_user_turn_stop_strategy.py index 80fb98efc..2fa4c1248 100644 --- a/tests/test_user_turn_stop_strategy.py +++ b/tests/test_user_turn_stop_strategy.py @@ -452,6 +452,72 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) self.assertTrue(should_start) + async def test_finalized_transcript_does_not_trigger_early_with_slow_stt(self): + """Test that a finalized transcript arriving after user_speech_timeout + but before the full timeout does not trigger immediately. + + This reproduces a race condition where: + - STT has high latency (effective_stt_wait > user_speech_timeout) + - User pauses briefly, VAD fires stop + - The full timeout = max(effective_stt_wait, user_speech_timeout) + - The finalized transcript arrives after user_speech_timeout from VAD stop + but before the full timeout + - The user resumes speaking before the full timeout + + Previously, the early trigger path would fire because + time.time() - vad_stopped_time >= user_speech_timeout, even though the + user was about to resume speaking. + """ + user_speech_timeout = 0.1 + strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=user_speech_timeout) + await strategy.setup(self.task_manager) + + # Set high STT P99 latency so effective_stt_wait > user_speech_timeout + stt_timeout = 0.5 + stop_secs = 0.1 + await strategy.process_frame( + STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout) + ) + # effective_stt_wait = max(0, 0.5 - 0.1) = 0.4 + # timeout = max(0.4, 0.1) = 0.4 + + should_start = None + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal should_start + should_start = True + + # S - user starts speaking + await strategy.process_frame(VADUserStartedSpeakingFrame()) + + # E - user pauses briefly + await strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=stop_secs)) + + # Wait for user_speech_timeout to elapse but NOT the full timeout + await asyncio.sleep(user_speech_timeout + 0.05) # 0.15s elapsed + self.assertIsNone(should_start) + + # Finalized transcript arrives (simulating slow STT). + # At this point, elapsed from VAD stop (~0.15s) > user_speech_timeout (0.1s). + # The old code would trigger immediately here. + await strategy.process_frame( + TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True) + ) + + # Should NOT trigger — the full timeout (0.4s) hasn't elapsed yet, + # giving the user time to resume speaking + self.assertIsNone(should_start) + + # User resumes speaking — this cancels the timeout + await strategy.process_frame(VADUserStartedSpeakingFrame()) + + # Wait well past the original timeout + await asyncio.sleep(0.5) + + # Should still not have triggered — user resumed speaking + self.assertIsNone(should_start) + async def test_sie_delay_it(self): strategy = await self._create_strategy()