Compare commits

...

3 Commits

Author SHA1 Message Date
James Hush
9786c4f8da Update docstring 2026-02-12 12:30:10 +08:00
James Hush
eb0ce5aea1 Add changelog for #3722 2026-02-12 12:11:25 +08:00
James Hush
67ea485566 Fix race condition in SpeechTimeoutUserTurnStopStrategy finalized transcript handling
When a finalized transcript arrived after user_speech_timeout had elapsed
from the VAD stop, the strategy would trigger the turn stop immediately
without giving the user time to resume speaking. This happened because
STT processing latency consumed the user_speech_timeout window — by the
time the transcript arrived, the elapsed time check passed even though
the user was still mid-sentence.

The fix removes the immediate early trigger path and instead lets the
original timeout (which includes the STT wait component) complete
naturally. When remaining user_speech_timeout > 0, the timeout is
shortened since STT is done. When it has elapsed, the existing timeout
continues running, providing a buffer for VAD to detect resumed speech.
2026-02-12 12:10:34 +08:00
3 changed files with 98 additions and 20 deletions

5
changelog/3722.fixed.md Normal file
View File

@@ -0,0 +1,5 @@
- Fixed a race condition in `SpeechTimeoutUserTurnStopStrategy` where a finalized
transcript arriving after `user_speech_timeout` elapsed from VAD stop would
immediately trigger a turn stop, even if the user was still speaking. STT
processing latency was consuming the `user_speech_timeout` window, leaving no
time for the user to resume speaking.

View File

@@ -34,8 +34,12 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
after the user stops speaking, adjusted by the VAD stop_secs.
For services that support finalization (TranscriptionFrame.finalized=True),
the turn can be triggered immediately once the finalized transcript is
received and the user resume speaking timeout has elapsed.
receiving the finalized transcript allows the strategy to shorten the
timeout by removing the STT wait component, since only the
`user_speech_timeout` portion is still needed. If `user_speech_timeout`
has already elapsed when the transcript arrives, the original timeout
continues running to provide a buffer for VAD to detect any resumed
speech before triggering.
"""
def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
@@ -126,8 +130,26 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._text += frame.text
if frame.finalized:
self._transcript_finalized = True
# For finalized transcripts, check if we can trigger early
await self._maybe_trigger_user_turn_stopped()
# With the transcript finalized, we no longer need to wait for
# STT latency. If a timeout is running (from VAD stop), recalculate
# to use only user_speech_timeout, potentially shortening the wait.
if self._timeout_task and self._vad_stopped_time is not None:
elapsed = time.time() - self._vad_stopped_time
remaining = self._user_speech_timeout - elapsed
if remaining > 0:
# Shorten timeout: replace STT+speech timeout with just
# remaining speech timeout since STT is done.
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = self.task_manager.create_task(
self._timeout_handler(remaining), f"{self}::_timeout_handler"
)
# If remaining <= 0: user_speech_timeout has elapsed, but the
# original timeout (which may include extra STT wait time) is
# still running. Let it complete naturally — this provides a
# buffer for VAD to detect any resumed speech before triggering.
elif self._timeout_task is None:
# Timeout already completed, check if we should trigger now
await self._maybe_trigger_user_turn_stopped()
# Fallback: handle transcripts when no VAD stop was received.
# This handles edge cases where transcripts arrive without VAD firing.
@@ -178,25 +200,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
Conditions:
- User is not currently speaking
- We have transcription text
- Either the timeout has elapsed OR we have a finalized transcript
and user_speech_timeout has elapsed
- The timeout has fully elapsed (timeout task completed)
"""
if self._vad_user_speaking or not self._text:
return
# For finalized transcripts, check if user_speech_timeout has elapsed.
# If elapsed, trigger user turn stopped immediately. Else, wait for user resume
# speaking timeout.
if self._transcript_finalized and self._vad_stopped_time is not None:
elapsed = time.time() - self._vad_stopped_time
if elapsed >= self._user_speech_timeout:
# Cancel any remaining timeout since we're triggering now
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
await self.trigger_user_turn_stopped()
return
# For non-finalized, only trigger if timeout task has completed
if self._timeout_task is None:
await self.trigger_user_turn_stopped()

View File

@@ -452,6 +452,72 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_finalized_transcript_does_not_trigger_early_with_slow_stt(self):
"""Test that a finalized transcript arriving after user_speech_timeout
but before the full timeout does not trigger immediately.
This reproduces a race condition where:
- STT has high latency (effective_stt_wait > user_speech_timeout)
- User pauses briefly, VAD fires stop
- The full timeout = max(effective_stt_wait, user_speech_timeout)
- The finalized transcript arrives after user_speech_timeout from VAD stop
but before the full timeout
- The user resumes speaking before the full timeout
Previously, the early trigger path would fire because
time.time() - vad_stopped_time >= user_speech_timeout, even though the
user was about to resume speaking.
"""
user_speech_timeout = 0.1
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=user_speech_timeout)
await strategy.setup(self.task_manager)
# Set high STT P99 latency so effective_stt_wait > user_speech_timeout
stt_timeout = 0.5
stop_secs = 0.1
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
# effective_stt_wait = max(0, 0.5 - 0.1) = 0.4
# timeout = max(0.4, 0.1) = 0.4
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# S - user starts speaking
await strategy.process_frame(VADUserStartedSpeakingFrame())
# E - user pauses briefly
await strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=stop_secs))
# Wait for user_speech_timeout to elapse but NOT the full timeout
await asyncio.sleep(user_speech_timeout + 0.05) # 0.15s elapsed
self.assertIsNone(should_start)
# Finalized transcript arrives (simulating slow STT).
# At this point, elapsed from VAD stop (~0.15s) > user_speech_timeout (0.1s).
# The old code would trigger immediately here.
await strategy.process_frame(
TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True)
)
# Should NOT trigger — the full timeout (0.4s) hasn't elapsed yet,
# giving the user time to resume speaking
self.assertIsNone(should_start)
# User resumes speaking — this cancels the timeout
await strategy.process_frame(VADUserStartedSpeakingFrame())
# Wait well past the original timeout
await asyncio.sleep(0.5)
# Should still not have triggered — user resumed speaking
self.assertIsNone(should_start)
async def test_sie_delay_it(self):
strategy = await self._create_strategy()