Fix stale state in user turn stop strategies between turns

Reset stop strategies at turn start (not just turn stop) so that late
transcriptions arriving between turns do not leave stale _text that
causes premature stops on the next turn. Also cancel pending timeout
tasks in reset() for both SpeechTimeout and TurnAnalyzer strategies.
This commit is contained in:
Mark Backman
2026-03-17 11:31:08 -04:00
parent fffb16ad39
commit 55fb274d5a
5 changed files with 122 additions and 0 deletions

View File

@@ -64,6 +64,9 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = False
self._transcript_finalized = False
self._vad_stopped_time = None
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.

View File

@@ -68,6 +68,9 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
self._vad_user_speaking = False
self._vad_stopped_time = None
self._transcript_finalized = False
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.

View File

@@ -256,6 +256,10 @@ class UserTurnController(BaseObject):
for s in self._user_turn_strategies.start or []:
await s.reset()
# Reset all user turn stop strategies to start fresh for the new turn.
for s in self._user_turn_strategies.stop or []:
await s.reset()
await self._call_event_handler("on_user_turn_started", strategy, params)
async def _trigger_user_turn_stop(

View File

@@ -15,6 +15,7 @@ from pipecat.frames.frames import (
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
MinWordsUserTurnStartStrategy,
)
@@ -199,6 +200,73 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
self.assertTrue(should_stop)
self.assertTrue(timeout)
async def test_late_transcription_between_turns_no_premature_stop(self):
"""Test that a late transcription arriving between turns does not cause a premature stop.
Reproduces the bug from issue #4053: after turn 1 completes and reset()
clears state, a late TranscriptionFrame sets _text to stale content. On
the next turn, that stale _text gates a premature turn stop via timeout(0)
before the current turn's transcript arrives.
Uses only VADUserTurnStartStrategy (no TranscriptionUserTurnStartStrategy)
so the late transcription doesn't trigger a spurious turn start.
"""
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
await controller.setup(self.task_manager)
start_count = 0
stop_count = 0
@controller.event_handler("on_user_turn_started")
async def on_user_turn_started(controller, strategy, params):
nonlocal start_count
start_count += 1
@controller.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(controller, strategy, params):
nonlocal stop_count
stop_count += 1
# === Turn 1: S-T-E ===
await controller.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(start_count, 1)
await controller.process_frame(
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
)
await controller.process_frame(VADUserStoppedSpeakingFrame())
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 1)
# === Between turns: late transcription arrives ===
# This sets _text on the stop strategy while _user_turn is False.
await controller.process_frame(
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
)
# === Turn 2: S-T-E (transcription arrives during turn) ===
# The fix resets stop strategies at turn start, clearing stale _text.
await controller.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(start_count, 2)
await controller.process_frame(
TranscriptionFrame(text="How are you?", user_id="", timestamp="now")
)
await controller.process_frame(VADUserStoppedSpeakingFrame())
# Wait for user_speech_timeout to elapse — should get turn 2 stop
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 2)
if __name__ == "__main__":
unittest.main()

View File

@@ -493,6 +493,50 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
# Finalized transcript received after timeout, triggers immediately
self.assertTrue(should_start)
async def test_reset_clears_stale_text_no_premature_stop(self):
"""Test that reset() clears stale text and cancels timeout, preventing premature stop.
Reproduces the bug from issue #4053: after turn 1 completes and
reset() is called, a late transcription sets _text. If reset() is
called again at turn 2 start, the stale _text should be cleared
so no premature stop occurs on VAD stop.
"""
strategy = await self._create_strategy()
stop_count = 0
@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal stop_count
stop_count += 1
# === Turn 1: S-T-E ===
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
await strategy.process_frame(VADUserStoppedSpeakingFrame())
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 1)
# Reset after turn 1 (as controller would do at turn stop)
await strategy.reset()
# === Late transcription arrives between turns ===
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
# Reset at turn 2 start (the fix: controller now resets stop strategies at turn start)
await strategy.reset()
# === Turn 2: S-T-E (transcription arrives during turn) ===
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(
TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
)
await strategy.process_frame(VADUserStoppedSpeakingFrame())
# Wait for timeout — should get turn 2 stop with the real transcription
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 2)
class TestExternalUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
async def test_external_strategy(self):