Compare commits
3 Commits
main
...
fix/speech
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9786c4f8da | ||
|
|
eb0ce5aea1 | ||
|
|
67ea485566 |
5
changelog/3722.fixed.md
Normal file
5
changelog/3722.fixed.md
Normal file
@@ -0,0 +1,5 @@
|
||||
- Fixed a race condition in `SpeechTimeoutUserTurnStopStrategy` where a finalized
|
||||
transcript arriving after `user_speech_timeout` elapsed from VAD stop would
|
||||
immediately trigger a turn stop, even if the user was still speaking. STT
|
||||
processing latency was consuming the `user_speech_timeout` window, leaving no
|
||||
time for the user to resume speaking.
|
||||
@@ -34,8 +34,12 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
after the user stops speaking, adjusted by the VAD stop_secs.
|
||||
|
||||
For services that support finalization (TranscriptionFrame.finalized=True),
|
||||
the turn can be triggered immediately once the finalized transcript is
|
||||
received and the user resume speaking timeout has elapsed.
|
||||
receiving the finalized transcript allows the strategy to shorten the
|
||||
timeout by removing the STT wait component, since only the
|
||||
`user_speech_timeout` portion is still needed. If `user_speech_timeout`
|
||||
has already elapsed when the transcript arrives, the original timeout
|
||||
continues running to provide a buffer for VAD to detect any resumed
|
||||
speech before triggering.
|
||||
"""
|
||||
|
||||
def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
|
||||
@@ -126,8 +130,26 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
self._text += frame.text
|
||||
if frame.finalized:
|
||||
self._transcript_finalized = True
|
||||
# For finalized transcripts, check if we can trigger early
|
||||
await self._maybe_trigger_user_turn_stopped()
|
||||
# With the transcript finalized, we no longer need to wait for
|
||||
# STT latency. If a timeout is running (from VAD stop), recalculate
|
||||
# to use only user_speech_timeout, potentially shortening the wait.
|
||||
if self._timeout_task and self._vad_stopped_time is not None:
|
||||
elapsed = time.time() - self._vad_stopped_time
|
||||
remaining = self._user_speech_timeout - elapsed
|
||||
if remaining > 0:
|
||||
# Shorten timeout: replace STT+speech timeout with just
|
||||
# remaining speech timeout since STT is done.
|
||||
await self.task_manager.cancel_task(self._timeout_task)
|
||||
self._timeout_task = self.task_manager.create_task(
|
||||
self._timeout_handler(remaining), f"{self}::_timeout_handler"
|
||||
)
|
||||
# If remaining <= 0: user_speech_timeout has elapsed, but the
|
||||
# original timeout (which may include extra STT wait time) is
|
||||
# still running. Let it complete naturally — this provides a
|
||||
# buffer for VAD to detect any resumed speech before triggering.
|
||||
elif self._timeout_task is None:
|
||||
# Timeout already completed, check if we should trigger now
|
||||
await self._maybe_trigger_user_turn_stopped()
|
||||
|
||||
# Fallback: handle transcripts when no VAD stop was received.
|
||||
# This handles edge cases where transcripts arrive without VAD firing.
|
||||
@@ -178,25 +200,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
|
||||
Conditions:
|
||||
- User is not currently speaking
|
||||
- We have transcription text
|
||||
- Either the timeout has elapsed OR we have a finalized transcript
|
||||
and user_speech_timeout has elapsed
|
||||
- The timeout has fully elapsed (timeout task completed)
|
||||
"""
|
||||
if self._vad_user_speaking or not self._text:
|
||||
return
|
||||
|
||||
# For finalized transcripts, check if user_speech_timeout has elapsed.
|
||||
# If elapsed, trigger user turn stopped immediately. Else, wait for user resume
|
||||
# speaking timeout.
|
||||
if self._transcript_finalized and self._vad_stopped_time is not None:
|
||||
elapsed = time.time() - self._vad_stopped_time
|
||||
if elapsed >= self._user_speech_timeout:
|
||||
# Cancel any remaining timeout since we're triggering now
|
||||
if self._timeout_task:
|
||||
await self.task_manager.cancel_task(self._timeout_task)
|
||||
self._timeout_task = None
|
||||
await self.trigger_user_turn_stopped()
|
||||
return
|
||||
|
||||
# For non-finalized, only trigger if timeout task has completed
|
||||
if self._timeout_task is None:
|
||||
await self.trigger_user_turn_stopped()
|
||||
|
||||
@@ -452,6 +452,72 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
|
||||
self.assertTrue(should_start)
|
||||
|
||||
async def test_finalized_transcript_does_not_trigger_early_with_slow_stt(self):
|
||||
"""Test that a finalized transcript arriving after user_speech_timeout
|
||||
but before the full timeout does not trigger immediately.
|
||||
|
||||
This reproduces a race condition where:
|
||||
- STT has high latency (effective_stt_wait > user_speech_timeout)
|
||||
- User pauses briefly, VAD fires stop
|
||||
- The full timeout = max(effective_stt_wait, user_speech_timeout)
|
||||
- The finalized transcript arrives after user_speech_timeout from VAD stop
|
||||
but before the full timeout
|
||||
- The user resumes speaking before the full timeout
|
||||
|
||||
Previously, the early trigger path would fire because
|
||||
time.time() - vad_stopped_time >= user_speech_timeout, even though the
|
||||
user was about to resume speaking.
|
||||
"""
|
||||
user_speech_timeout = 0.1
|
||||
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=user_speech_timeout)
|
||||
await strategy.setup(self.task_manager)
|
||||
|
||||
# Set high STT P99 latency so effective_stt_wait > user_speech_timeout
|
||||
stt_timeout = 0.5
|
||||
stop_secs = 0.1
|
||||
await strategy.process_frame(
|
||||
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
|
||||
)
|
||||
# effective_stt_wait = max(0, 0.5 - 0.1) = 0.4
|
||||
# timeout = max(0.4, 0.1) = 0.4
|
||||
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(strategy, params):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
# S - user starts speaking
|
||||
await strategy.process_frame(VADUserStartedSpeakingFrame())
|
||||
|
||||
# E - user pauses briefly
|
||||
await strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=stop_secs))
|
||||
|
||||
# Wait for user_speech_timeout to elapse but NOT the full timeout
|
||||
await asyncio.sleep(user_speech_timeout + 0.05) # 0.15s elapsed
|
||||
self.assertIsNone(should_start)
|
||||
|
||||
# Finalized transcript arrives (simulating slow STT).
|
||||
# At this point, elapsed from VAD stop (~0.15s) > user_speech_timeout (0.1s).
|
||||
# The old code would trigger immediately here.
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True)
|
||||
)
|
||||
|
||||
# Should NOT trigger — the full timeout (0.4s) hasn't elapsed yet,
|
||||
# giving the user time to resume speaking
|
||||
self.assertIsNone(should_start)
|
||||
|
||||
# User resumes speaking — this cancels the timeout
|
||||
await strategy.process_frame(VADUserStartedSpeakingFrame())
|
||||
|
||||
# Wait well past the original timeout
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# Should still not have triggered — user resumed speaking
|
||||
self.assertIsNone(should_start)
|
||||
|
||||
async def test_sie_delay_it(self):
|
||||
strategy = await self._create_strategy()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user