From 5000b040ddd5aa10eac3239b87fbbded0362075e Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 17 Mar 2026 11:31:08 -0400 Subject: [PATCH] Fix stale state in user turn stop strategies between turns Reset stop strategies at turn start (not just turn stop) so that late transcriptions arriving between turns do not leave stale _text that causes premature stops on the next turn. Also cancel pending timeout tasks in reset() for both SpeechTimeout and TurnAnalyzer strategies. --- .../speech_timeout_user_turn_stop_strategy.py | 3 + .../turn_analyzer_user_turn_stop_strategy.py | 3 + src/pipecat/turns/user_turn_controller.py | 4 ++ tests/test_user_turn_controller.py | 68 +++++++++++++++++++ tests/test_user_turn_stop_strategy.py | 44 ++++++++++++ 5 files changed, 122 insertions(+) diff --git a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py index 66d6fa703..ec94d9176 100644 --- a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py @@ -64,6 +64,9 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = False self._transcript_finalized = False self._vad_stopped_time = None + if self._timeout_task: + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = None async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. diff --git a/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py index f141a75b7..232bde223 100644 --- a/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py @@ -68,6 +68,9 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = False self._vad_stopped_time = None self._transcript_finalized = False + if self._timeout_task: + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = None async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index adaafd298..a064fc47b 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -256,6 +256,10 @@ class UserTurnController(BaseObject): for s in self._user_turn_strategies.start or []: await s.reset() + # Reset all user turn stop strategies to start fresh for the new turn. + for s in self._user_turn_strategies.stop or []: + await s.reset() + await self._call_event_handler("on_user_turn_started", strategy, params) async def _trigger_user_turn_stop( diff --git a/tests/test_user_turn_controller.py b/tests/test_user_turn_controller.py index 72a04a519..2883d39bd 100644 --- a/tests/test_user_turn_controller.py +++ b/tests/test_user_turn_controller.py @@ -15,6 +15,7 @@ from pipecat.frames.frames import ( VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) +from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_start.min_words_user_turn_start_strategy import ( MinWordsUserTurnStartStrategy, ) @@ -199,6 +200,73 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase): self.assertTrue(should_stop) self.assertTrue(timeout) + async def test_late_transcription_between_turns_no_premature_stop(self): + """Test that a late transcription arriving between turns does not cause a premature stop. + + Reproduces the bug from issue #4053: after turn 1 completes and reset() + clears state, a late TranscriptionFrame sets _text to stale content. On + the next turn, that stale _text gates a premature turn stop via timeout(0) + before the current turn's transcript arrives. + + Uses only VADUserTurnStartStrategy (no TranscriptionUserTurnStartStrategy) + so the late transcription doesn't trigger a spurious turn start. + """ + controller = UserTurnController( + user_turn_strategies=UserTurnStrategies( + start=[VADUserTurnStartStrategy()], + stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)], + ), + user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT, + ) + + await controller.setup(self.task_manager) + + start_count = 0 + stop_count = 0 + + @controller.event_handler("on_user_turn_started") + async def on_user_turn_started(controller, strategy, params): + nonlocal start_count + start_count += 1 + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + nonlocal stop_count + stop_count += 1 + + # === Turn 1: S-T-E === + await controller.process_frame(VADUserStartedSpeakingFrame()) + self.assertEqual(start_count, 1) + + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 1) + + # === Between turns: late transcription arrives === + # This sets _text on the stop strategy while _user_turn is False. + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + + # === Turn 2: S-T-E (transcription arrives during turn) === + # The fix resets stop strategies at turn start, clearing stale _text. + await controller.process_frame(VADUserStartedSpeakingFrame()) + self.assertEqual(start_count, 2) + + await controller.process_frame( + TranscriptionFrame(text="How are you?", user_id="", timestamp="now") + ) + + await controller.process_frame(VADUserStoppedSpeakingFrame()) + + # Wait for user_speech_timeout to elapse — should get turn 2 stop + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 2) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_user_turn_stop_strategy.py b/tests/test_user_turn_stop_strategy.py index 80fb98efc..85f9f2752 100644 --- a/tests/test_user_turn_stop_strategy.py +++ b/tests/test_user_turn_stop_strategy.py @@ -493,6 +493,50 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): # Finalized transcript received after timeout, triggers immediately self.assertTrue(should_start) + async def test_reset_clears_stale_text_no_premature_stop(self): + """Test that reset() clears stale text and cancels timeout, preventing premature stop. + + Reproduces the bug from issue #4053: after turn 1 completes and + reset() is called, a late transcription sets _text. If reset() is + called again at turn 2 start, the stale _text should be cleared + so no premature stop occurs on VAD stop. + """ + strategy = await self._create_strategy() + + stop_count = 0 + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal stop_count + stop_count += 1 + + # === Turn 1: S-T-E === + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 1) + + # Reset after turn 1 (as controller would do at turn stop) + await strategy.reset() + + # === Late transcription arrives between turns === + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + + # Reset at turn 2 start (the fix: controller now resets stop strategies at turn start) + await strategy.reset() + + # === Turn 2: S-T-E (transcription arrives during turn) === + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame( + TranscriptionFrame(text="How are you?", user_id="cat", timestamp="") + ) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + + # Wait for timeout — should get turn 2 stop with the real transcription + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 2) + class TestExternalUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): async def test_external_strategy(self):