Split user-turn stop timeout into independent speech and STT timers

SpeechTimeoutUserTurnStopStrategy previously collapsed two waits into
max(stt_timeout, user_speech_timeout), which over-waited for finalizing
STT services and could also end the turn early in a legacy code path.
Run them as independent timers instead:

- user_speech_timeout: policy floor, always runs to completion.
- stt_timeout: latency safety net, short-circuited by a finalized
  transcript since STT has signaled it has nothing more to send.

The no-VAD fallback now waits only user_speech_timeout rather than
max(stt_timeout, user_speech_timeout); stt_timeout is defined relative
to VAD stop and has no meaning when no VAD event occurred. This
shortens the fallback wait for users who set stt_timeout greater than
user_speech_timeout.
This commit is contained in:
Mark Backman
2026-04-20 11:55:09 -04:00
parent e8c3f73968
commit b59c4775da
2 changed files with 237 additions and 83 deletions

View File

@@ -7,7 +7,6 @@
"""Speech timeout-based user turn stop strategy."""
import asyncio
import time
from loguru import logger
@@ -25,20 +24,24 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager
class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""User turn stop strategy that uses a configurable timeout to determine if the user is done speaking.
"""User turn stop strategy using two independent timers after VAD stop.
After the user stops speaking (detected by VAD), this strategy waits for a
configurable timeout before triggering the end of the user's turn. The
timeout accounts for two factors:
After the user stops speaking (detected by VAD), this strategy runs two
independent timers. The user turn stop is triggered only when both have
finished and at least one transcript has been received:
- user_speech_timeout: Time to wait for the user to potentially say more
after they pause.
- stt_timeout: The P99 time for the STT service to return a transcription
after the user stops speaking, adjusted by the VAD stop_secs.
- user_speech_timeout: Policy floor — the window in which the user may
resume speaking after a pause. Always runs to completion.
- stt_timeout: Safety net for STT latency — the P99 time for the STT
service to return a final transcript after VAD stop, adjusted by the
VAD stop_secs. Short-circuited when the STT service emits a finalized
transcript (TranscriptionFrame.finalized=True), since finalization
means STT has nothing more to send.
For services that support finalization (TranscriptionFrame.finalized=True),
the turn can be triggered immediately once the finalized transcript is
received and the user resume speaking timeout has elapsed.
Fallback: when a transcript arrives without a VAD stop event, the
strategy waits only user_speech_timeout for inactivity (rearmed on each
transcript). stt_timeout has no meaning here since it is defined
relative to VAD stop, and STT has already emitted a transcript.
"""
def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
@@ -59,8 +62,16 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = False
self._transcript_finalized = False
self._vad_stopped_time: float | None = None
self._timeout_task: asyncio.Task | None = None
self._timeout_expired: bool = False
# VAD-driven timers and completion flags.
self._user_speech_timeout_task: asyncio.Task | None = None
self._stt_timeout_task: asyncio.Task | None = None
self._user_speech_expired: bool = False
self._stt_wait_done: bool = False
# Fallback timer (transcript arrived without VAD stop).
self._fallback_timeout_task: asyncio.Task | None = None
self._fallback_expired: bool = False
async def reset(self):
"""Reset the strategy to its initial state."""
@@ -69,10 +80,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = False
self._transcript_finalized = False
self._vad_stopped_time = None
self._timeout_expired = False
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
self._user_speech_expired = False
self._stt_wait_done = False
self._fallback_expired = False
await self._cancel_all_tasks()
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
@@ -85,9 +96,7 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
async def cleanup(self):
"""Cleanup the strategy."""
await super().cleanup()
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
await self._cancel_all_tasks()
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to update strategy state.
@@ -105,8 +114,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._stt_timeout = frame.ttfs_p99_latency
self._stop_secs_warned = False
elif isinstance(frame, VADUserStartedSpeakingFrame):
logger.debug(f"{self} VADUserStartedSpeakingFrame received")
await self._handle_vad_user_started_speaking(frame)
elif isinstance(frame, VADUserStoppedSpeakingFrame):
logger.debug(f"{self} VADUserStoppedSpeakingFrame received")
await self._handle_vad_user_stopped_speaking(frame)
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
@@ -118,11 +129,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = True
self._transcript_finalized = False
self._vad_stopped_time = None
self._timeout_expired = False
# Cancel any pending timeout
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
self._user_speech_expired = False
self._stt_wait_done = False
self._fallback_expired = False
await self._cancel_all_tasks()
async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
"""Handle when the VAD indicates the user has stopped speaking."""
@@ -150,59 +160,66 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
f"user_turn_stop_timeout parameter in the LLMUserAggregatorParams."
)
# Start the timeout task
timeout = self._calculate_timeout()
self._timeout_task = self.task_manager.create_task(
self._timeout_handler(timeout), f"{self}::_timeout_handler"
# Any prior fallback timer is superseded by the VAD-driven path.
if self._fallback_timeout_task:
await self.task_manager.cancel_task(self._fallback_timeout_task)
self._fallback_timeout_task = None
self._fallback_expired = False
# user_speech_timeout is the policy floor and always runs.
self._user_speech_timeout_task = self.task_manager.create_task(
self._user_speech_timeout_handler(self._user_speech_timeout),
f"{self}::_user_speech_timeout_handler",
)
# Make sure the task is scheduled.
# stt_timeout is a safety net. Short-circuit it if the transcript is
# already finalized, or if the VAD stop_secs already covered it.
effective_stt_wait = max(0.0, self._stt_timeout - self._stop_secs)
if self._transcript_finalized or effective_stt_wait <= 0:
self._stt_wait_done = True
else:
self._stt_timeout_task = self.task_manager.create_task(
self._stt_timeout_handler(effective_stt_wait),
f"{self}::_stt_timeout_handler",
)
# Make sure the tasks are scheduled.
await asyncio.sleep(0)
async def _handle_transcription(self, frame: TranscriptionFrame):
"""Handle user transcription."""
self._text += frame.text
if frame.finalized:
self._transcript_finalized = True
# For finalized transcripts, check if we can trigger early
# Short-circuit the stt_timeout safety net: STT has told us
# there's nothing more coming.
if not self._stt_wait_done:
self._stt_wait_done = True
if self._stt_timeout_task:
await self.task_manager.cancel_task(self._stt_timeout_task)
self._stt_timeout_task = None
# If both VAD-path timers are done (or the fallback timer already
# expired), the turn was waiting on text — trigger now.
if self._fallback_expired or (self._user_speech_expired and self._stt_wait_done):
await self._maybe_trigger_user_turn_stopped()
elif self._timeout_expired:
# The p99 timeout already elapsed without a transcript. Now that
# we have one, trigger the turn stop immediately.
await self.trigger_user_turn_stopped()
return
# Fallback: handle transcripts when no VAD stop was received.
# This handles edge cases where transcripts arrive without VAD firing.
# _vad_stopped_time is None means VAD stopped hasn't been received yet.
# In fallback mode, reset timeout on each transcript to wait for inactivity.
# Rearm the fallback timer on each transcript to wait for inactivity.
if not self._vad_user_speaking and self._vad_stopped_time is None:
# Cancel existing fallback timeout if any
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
timeout = self._calculate_timeout()
self._timeout_task = self.task_manager.create_task(
self._timeout_handler(timeout), f"{self}::_timeout_handler"
if self._fallback_timeout_task:
await self.task_manager.cancel_task(self._fallback_timeout_task)
self._fallback_timeout_task = self.task_manager.create_task(
self._fallback_timeout_handler(self._user_speech_timeout),
f"{self}::_fallback_timeout_handler",
)
# Make sure the task is scheduled.
await asyncio.sleep(0)
def _calculate_timeout(self) -> float:
"""Calculate the timeout value based on current state.
Returns:
The timeout in seconds to wait after VAD stopped speaking.
"""
# Adjust STT timeout by VAD stop_secs since that time has already elapsed
effective_stt_wait = max(0, self._stt_timeout - self._stop_secs)
# If transcript is already finalized, we don't need to wait for STT
if self._transcript_finalized:
return self._user_speech_timeout
return max(effective_stt_wait, self._user_speech_timeout)
async def _timeout_handler(self, timeout: float):
"""Wait for the timeout then trigger user turn stopped if conditions met.
async def _user_speech_timeout_handler(self, timeout: float):
"""Wait user_speech_timeout then attempt to trigger user turn stopped.
Args:
timeout: The timeout in seconds to wait.
@@ -212,36 +229,72 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
except asyncio.CancelledError:
return
finally:
self._timeout_task = None
self._user_speech_timeout_task = None
self._timeout_expired = True
self._user_speech_expired = True
await self._maybe_trigger_user_turn_stopped()
async def _stt_timeout_handler(self, timeout: float):
"""Wait stt_timeout then attempt to trigger user turn stopped.
Args:
timeout: The timeout in seconds to wait.
"""
try:
await asyncio.sleep(timeout)
except asyncio.CancelledError:
return
finally:
self._stt_timeout_task = None
self._stt_wait_done = True
await self._maybe_trigger_user_turn_stopped()
async def _fallback_timeout_handler(self, timeout: float):
"""Wait user_speech_timeout of inactivity on the fallback path.
Args:
timeout: The timeout in seconds to wait.
"""
try:
await asyncio.sleep(timeout)
except asyncio.CancelledError:
return
finally:
self._fallback_timeout_task = None
self._fallback_expired = True
await self._maybe_trigger_user_turn_stopped()
async def _maybe_trigger_user_turn_stopped(self):
"""Trigger user turn stopped if conditions are met.
"""Trigger user turn stopped if all required conditions are met.
Conditions:
- User is not currently speaking
- We have transcription text
- Either the timeout has elapsed OR we have a finalized transcript
and user_speech_timeout has elapsed
VAD path: both user_speech_timeout and stt_timeout must have
completed (stt short-circuited by finalization counts as complete).
Fallback path: the fallback timer must have completed.
In all cases, the user must not be currently speaking and at least
one transcript must have been received.
"""
if self._vad_user_speaking or not self._text:
return
# For finalized transcripts, check if user_speech_timeout has elapsed.
# If elapsed, trigger user turn stopped immediately. Else, wait for user resume
# speaking timeout.
if self._transcript_finalized and self._vad_stopped_time is not None:
elapsed = time.time() - self._vad_stopped_time
if elapsed >= self._user_speech_timeout:
# Cancel any remaining timeout since we're triggering now
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
if self._vad_stopped_time is not None:
if self._user_speech_expired and self._stt_wait_done:
await self.trigger_user_turn_stopped()
return
return
# For non-finalized, only trigger if timeout task has completed
if self._timeout_task is None:
if self._fallback_expired:
await self.trigger_user_turn_stopped()
async def _cancel_all_tasks(self):
"""Cancel any running timer tasks and clear the handles."""
if self._user_speech_timeout_task:
await self.task_manager.cancel_task(self._user_speech_timeout_task)
self._user_speech_timeout_task = None
if self._stt_timeout_task:
await self.task_manager.cancel_task(self._stt_timeout_task)
self._stt_timeout_task = None
if self._fallback_timeout_task:
await self.task_manager.cancel_task(self._fallback_timeout_task)
self._fallback_timeout_task = None

View File

@@ -529,6 +529,107 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
# Non-finalized transcript received after timeout, triggers immediately
self.assertTrue(should_start)
async def test_finalized_short_circuits_stt_wait(self):
"""Finalized transcript cancels the stt_timeout safety net.
user_speech_timeout still runs to completion as a policy floor,
but stt_timeout is skipped once STT says it's done. Net effect:
the turn stops at user_speech_timeout, not stt_timeout.
"""
stt_timeout = AGGREGATION_TIMEOUT * 4
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT)
await strategy.setup(self.task_manager)
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# S → E: starts user_speech_timeout (short) and stt_timeout (long).
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(VADUserStoppedSpeakingFrame())
# Finalized transcript arrives before user_speech_timeout elapses.
await strategy.process_frame(
TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True)
)
# user_speech_timeout is still running, so no trigger yet.
self.assertIsNone(should_start)
# user_speech_timeout elapses — stt_timeout was short-circuited,
# so the turn stops now rather than waiting for stt_timeout.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_non_finalized_waits_full_stt_timeout(self):
"""Non-finalized transcript does not short-circuit stt_timeout.
When STT never signals finalization, the stt_timeout safety net
must run its full course — the turn should not stop until the
longer of the two timers has elapsed.
"""
stt_timeout = AGGREGATION_TIMEOUT * 4
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT)
await strategy.setup(self.task_manager)
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# S → E: both timers start.
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(VADUserStoppedSpeakingFrame())
# Non-finalized transcript during the wait.
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
# user_speech_timeout elapses but stt_timeout has not — no trigger.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertIsNone(should_start)
# Wait for the remainder of stt_timeout.
await asyncio.sleep(stt_timeout - AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_fallback_uses_only_user_speech_timeout(self):
"""Fallback path (no VAD) ignores stt_timeout and uses only user_speech_timeout.
stt_timeout is defined as "p99 after VAD stop" — without a VAD
reference point it has no meaning. The fallback measures
inactivity since the last transcript, which is user_speech_timeout.
"""
stt_timeout = AGGREGATION_TIMEOUT * 4
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT)
await strategy.setup(self.task_manager)
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# Transcript arrives without any VAD frame — fallback path.
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
# The fallback timer is user_speech_timeout, not stt_timeout.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_reset_clears_stale_text_no_premature_stop(self):
"""Test that reset() clears stale text and cancels timeout, preventing premature stop.