Merge pull request #4337 from pipecat-ai/mb/fix-speech-stop-strategy

Split user-turn stop timeout into independent speech and STT timers
This commit is contained in:
Mark Backman
2026-04-22 10:23:03 -04:00
committed by GitHub
4 changed files with 216 additions and 89 deletions

View File

@@ -0,0 +1 @@
- `SpeechTimeoutUserTurnStopStrategy` now waits only `user_speech_timeout` when a transcript arrives without a VAD stop event, rather than `max(ttfs_p99_latency, user_speech_timeout)`. If you had `ttfs_p99_latency > user_speech_timeout`, turn detection in that path is slightly faster than before.

View File

@@ -0,0 +1 @@
- If you use an STT service that emits finalized transcripts (Speechmatics, Soniox, Deepgram Flux, AssemblyAI) with `SpeechTimeoutUserTurnStopStrategy`, user turns now end as soon as `user_speech_timeout` elapses after VAD stop. Previously the strategy also waited for the STT P99 latency (`ttfs_p99_latency`) even when the transcript was already marked final. `user_speech_timeout` is still honored as a floor — STT finalization never shortens it.

View File

@@ -7,7 +7,6 @@
"""Speech timeout-based user turn stop strategy."""
import asyncio
import time
from loguru import logger
@@ -25,20 +24,25 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager
class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""User turn stop strategy that uses a configurable timeout to determine if the user is done speaking.
"""User turn stop strategy using two independent timers after VAD stop.
After the user stops speaking (detected by VAD), this strategy waits for a
configurable timeout before triggering the end of the user's turn. The
timeout accounts for two factors:
After the user stops speaking (detected by VAD), this strategy runs two
independent timers. The user turn stop is triggered only when both have
finished and at least one transcript has been received:
- user_speech_timeout: Time to wait for the user to potentially say more
after they pause.
- stt_timeout: The P99 time for the STT service to return a transcription
after the user stops speaking, adjusted by the VAD stop_secs.
- user_speech_timeout: Policy floor — the window in which the user may
resume speaking after a pause. Always runs to completion.
- stt_timeout: Safety net for STT latency — the P99 time for the STT
service to return a final transcript after VAD stop, adjusted by the
VAD stop_secs. Short-circuited when the STT service emits a finalized
transcript (TranscriptionFrame.finalized=True), since finalization
means STT has nothing more to send.
For services that support finalization (TranscriptionFrame.finalized=True),
the turn can be triggered immediately once the finalized transcript is
received and the user resume speaking timeout has elapsed.
Fallback: when a transcript arrives without a VAD stop event, the
user_speech_timeout timer measures inactivity since the last transcript
(rearmed on each transcript). stt_timeout has no meaning here since it
is defined relative to VAD stop, and STT has already emitted a
transcript — so the stt wait is marked done immediately.
"""
def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
@@ -59,8 +63,11 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = False
self._transcript_finalized = False
self._vad_stopped_time: float | None = None
self._timeout_task: asyncio.Task | None = None
self._timeout_expired: bool = False
self._user_speech_timeout_task: asyncio.Task | None = None
self._stt_timeout_task: asyncio.Task | None = None
self._user_speech_wait_done: bool = False
self._stt_wait_done: bool = False
async def reset(self):
"""Reset the strategy to its initial state."""
@@ -69,10 +76,9 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = False
self._transcript_finalized = False
self._vad_stopped_time = None
self._timeout_expired = False
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
self._user_speech_wait_done = False
self._stt_wait_done = False
await self._cancel_all_tasks()
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
@@ -85,9 +91,7 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
async def cleanup(self):
"""Cleanup the strategy."""
await super().cleanup()
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
await self._cancel_all_tasks()
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to update strategy state.
@@ -105,8 +109,10 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._stt_timeout = frame.ttfs_p99_latency
self._stop_secs_warned = False
elif isinstance(frame, VADUserStartedSpeakingFrame):
logger.debug(f"{self} VADUserStartedSpeakingFrame received")
await self._handle_vad_user_started_speaking(frame)
elif isinstance(frame, VADUserStoppedSpeakingFrame):
logger.debug(f"{self} VADUserStoppedSpeakingFrame received")
await self._handle_vad_user_stopped_speaking(frame)
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
@@ -118,11 +124,9 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = True
self._transcript_finalized = False
self._vad_stopped_time = None
self._timeout_expired = False
# Cancel any pending timeout
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
self._user_speech_wait_done = False
self._stt_wait_done = False
await self._cancel_all_tasks()
async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
"""Handle when the VAD indicates the user has stopped speaking."""
@@ -150,59 +154,69 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
f"user_turn_stop_timeout parameter in the LLMUserAggregatorParams."
)
# Start the timeout task
timeout = self._calculate_timeout()
self._timeout_task = self.task_manager.create_task(
self._timeout_handler(timeout), f"{self}::_timeout_handler"
)
# Make sure the task is scheduled.
# user_speech_timeout is the policy floor and always runs. A prior
# fallback-mode run of the same timer is superseded here.
await self._restart_user_speech_timer()
# stt_timeout is a safety net. Short-circuit it if the transcript is
# already finalized, or if the VAD stop_secs already covered it.
self._stt_wait_done = False
effective_stt_wait = max(0.0, self._stt_timeout - self._stop_secs)
if self._transcript_finalized or effective_stt_wait <= 0:
self._stt_wait_done = True
else:
self._stt_timeout_task = self.task_manager.create_task(
self._stt_timeout_handler(effective_stt_wait),
f"{self}::_stt_timeout_handler",
)
# Make sure the tasks are scheduled.
await asyncio.sleep(0)
async def _handle_transcription(self, frame: TranscriptionFrame):
"""Handle user transcription."""
self._text += frame.text
if frame.finalized:
self._transcript_finalized = True
# For finalized transcripts, check if we can trigger early
# Short-circuit the stt_timeout safety net: STT has told us
# there's nothing more coming.
if not self._stt_wait_done:
self._stt_wait_done = True
if self._stt_timeout_task:
await self.task_manager.cancel_task(self._stt_timeout_task)
self._stt_timeout_task = None
# If both waits are already done, the turn was waiting on text —
# trigger now.
if self._user_speech_wait_done and self._stt_wait_done:
await self._maybe_trigger_user_turn_stopped()
elif self._timeout_expired:
# The p99 timeout already elapsed without a transcript. Now that
# we have one, trigger the turn stop immediately.
await self.trigger_user_turn_stopped()
return
# Fallback: handle transcripts when no VAD stop was received.
# This handles edge cases where transcripts arrive without VAD firing.
# _vad_stopped_time is None means VAD stopped hasn't been received yet.
# In fallback mode, reset timeout on each transcript to wait for inactivity.
# Fallback: transcript arrived without a VAD stop. Measure inactivity
# since the last transcript with the user_speech_timer. stt_timeout
# has no meaning here (it's defined relative to VAD stop), so mark
# the stt wait done immediately.
if not self._vad_user_speaking and self._vad_stopped_time is None:
# Cancel existing fallback timeout if any
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
timeout = self._calculate_timeout()
self._timeout_task = self.task_manager.create_task(
self._timeout_handler(timeout), f"{self}::_timeout_handler"
)
# Make sure the task is scheduled.
await asyncio.sleep(0)
self._stt_wait_done = True
await self._restart_user_speech_timer()
def _calculate_timeout(self) -> float:
"""Calculate the timeout value based on current state.
async def _restart_user_speech_timer(self):
"""Cancel any running user_speech timer and start a fresh one."""
if self._user_speech_timeout_task:
await self.task_manager.cancel_task(self._user_speech_timeout_task)
self._user_speech_timeout_task = None
self._user_speech_wait_done = False
self._user_speech_timeout_task = self.task_manager.create_task(
self._user_speech_timeout_handler(self._user_speech_timeout),
f"{self}::_user_speech_timeout_handler",
)
# Make sure the task is scheduled so it can't be cancelled before
# starting (which would leave its coroutine un-awaited).
await asyncio.sleep(0)
Returns:
The timeout in seconds to wait after VAD stopped speaking.
"""
# Adjust STT timeout by VAD stop_secs since that time has already elapsed
effective_stt_wait = max(0, self._stt_timeout - self._stop_secs)
# If transcript is already finalized, we don't need to wait for STT
if self._transcript_finalized:
return self._user_speech_timeout
return max(effective_stt_wait, self._user_speech_timeout)
async def _timeout_handler(self, timeout: float):
"""Wait for the timeout then trigger user turn stopped if conditions met.
async def _user_speech_timeout_handler(self, timeout: float):
"""Wait user_speech_timeout then attempt to trigger user turn stopped.
Args:
timeout: The timeout in seconds to wait.
@@ -212,36 +226,46 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
except asyncio.CancelledError:
return
finally:
self._timeout_task = None
self._user_speech_timeout_task = None
self._timeout_expired = True
self._user_speech_wait_done = True
await self._maybe_trigger_user_turn_stopped()
async def _stt_timeout_handler(self, timeout: float):
"""Wait stt_timeout then attempt to trigger user turn stopped.
Args:
timeout: The timeout in seconds to wait.
"""
try:
await asyncio.sleep(timeout)
except asyncio.CancelledError:
return
finally:
self._stt_timeout_task = None
self._stt_wait_done = True
await self._maybe_trigger_user_turn_stopped()
async def _maybe_trigger_user_turn_stopped(self):
"""Trigger user turn stopped if conditions are met.
"""Trigger user turn stopped if all required conditions are met.
Conditions:
- User is not currently speaking
- We have transcription text
- Either the timeout has elapsed OR we have a finalized transcript
and user_speech_timeout has elapsed
Both timers must be done (stt is marked done immediately on the
fallback path and when finalization short-circuits the safety net),
the user must not be currently speaking, and at least one transcript
must have been received.
"""
if self._vad_user_speaking or not self._text:
return
# For finalized transcripts, check if user_speech_timeout has elapsed.
# If elapsed, trigger user turn stopped immediately. Else, wait for user resume
# speaking timeout.
if self._transcript_finalized and self._vad_stopped_time is not None:
elapsed = time.time() - self._vad_stopped_time
if elapsed >= self._user_speech_timeout:
# Cancel any remaining timeout since we're triggering now
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
await self.trigger_user_turn_stopped()
return
# For non-finalized, only trigger if timeout task has completed
if self._timeout_task is None:
if self._user_speech_wait_done and self._stt_wait_done:
await self.trigger_user_turn_stopped()
async def _cancel_all_tasks(self):
"""Cancel any running timer tasks and clear the handles."""
if self._user_speech_timeout_task:
await self.task_manager.cancel_task(self._user_speech_timeout_task)
self._user_speech_timeout_task = None
if self._stt_timeout_task:
await self.task_manager.cancel_task(self._stt_timeout_task)
self._stt_timeout_task = None

View File

@@ -529,6 +529,107 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
# Non-finalized transcript received after timeout, triggers immediately
self.assertTrue(should_start)
async def test_finalized_short_circuits_stt_wait(self):
"""Finalized transcript cancels the stt_timeout safety net.
user_speech_timeout still runs to completion as a policy floor,
but stt_timeout is skipped once STT says it's done. Net effect:
the turn stops at user_speech_timeout, not stt_timeout.
"""
stt_timeout = AGGREGATION_TIMEOUT * 4
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT)
await strategy.setup(self.task_manager)
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# S → E: starts user_speech_timeout (short) and stt_timeout (long).
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(VADUserStoppedSpeakingFrame())
# Finalized transcript arrives before user_speech_timeout elapses.
await strategy.process_frame(
TranscriptionFrame(text="Hello!", user_id="cat", timestamp="", finalized=True)
)
# user_speech_timeout is still running, so no trigger yet.
self.assertIsNone(should_start)
# user_speech_timeout elapses — stt_timeout was short-circuited,
# so the turn stops now rather than waiting for stt_timeout.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_non_finalized_waits_full_stt_timeout(self):
"""Non-finalized transcript does not short-circuit stt_timeout.
When STT never signals finalization, the stt_timeout safety net
must run its full course — the turn should not stop until the
longer of the two timers has elapsed.
"""
stt_timeout = AGGREGATION_TIMEOUT * 4
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT)
await strategy.setup(self.task_manager)
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# S → E: both timers start.
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(VADUserStoppedSpeakingFrame())
# Non-finalized transcript during the wait.
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
# user_speech_timeout elapses but stt_timeout has not — no trigger.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertIsNone(should_start)
# Wait for the remainder of stt_timeout.
await asyncio.sleep(stt_timeout - AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_fallback_uses_only_user_speech_timeout(self):
"""Fallback path (no VAD) ignores stt_timeout and uses only user_speech_timeout.
stt_timeout is defined as "p99 after VAD stop" — without a VAD
reference point it has no meaning. The fallback measures
inactivity since the last transcript, which is user_speech_timeout.
"""
stt_timeout = AGGREGATION_TIMEOUT * 4
strategy = SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=AGGREGATION_TIMEOUT)
await strategy.setup(self.task_manager)
await strategy.process_frame(
STTMetadataFrame(service_name="test", ttfs_p99_latency=stt_timeout)
)
should_start = None
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal should_start
should_start = True
# Transcript arrives without any VAD frame — fallback path.
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
# The fallback timer is user_speech_timeout, not stt_timeout.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
async def test_reset_clears_stale_text_no_premature_stop(self):
"""Test that reset() clears stale text and cancels timeout, preventing premature stop.