diff --git a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py index 66d6fa703..ec94d9176 100644 --- a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py @@ -64,6 +64,9 @@ class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = False self._transcript_finalized = False self._vad_stopped_time = None + if self._timeout_task: + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = None async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. diff --git a/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py index f141a75b7..232bde223 100644 --- a/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py @@ -68,6 +68,9 @@ class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy): self._vad_user_speaking = False self._vad_stopped_time = None self._transcript_finalized = False + if self._timeout_task: + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = None async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index adaafd298..a064fc47b 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -256,6 +256,10 @@ class UserTurnController(BaseObject): for s in self._user_turn_strategies.start or []: await s.reset() + # Reset all user turn stop strategies to start fresh for the new turn. + for s in self._user_turn_strategies.stop or []: + await s.reset() + await self._call_event_handler("on_user_turn_started", strategy, params) async def _trigger_user_turn_stop( diff --git a/tests/test_user_turn_controller.py b/tests/test_user_turn_controller.py index 72a04a519..2883d39bd 100644 --- a/tests/test_user_turn_controller.py +++ b/tests/test_user_turn_controller.py @@ -15,6 +15,7 @@ from pipecat.frames.frames import ( VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) +from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_start.min_words_user_turn_start_strategy import ( MinWordsUserTurnStartStrategy, ) @@ -199,6 +200,73 @@ class TestUserTurnController(unittest.IsolatedAsyncioTestCase): self.assertTrue(should_stop) self.assertTrue(timeout) + async def test_late_transcription_between_turns_no_premature_stop(self): + """Test that a late transcription arriving between turns does not cause a premature stop. + + Reproduces the bug from issue #4053: after turn 1 completes and reset() + clears state, a late TranscriptionFrame sets _text to stale content. On + the next turn, that stale _text gates a premature turn stop via timeout(0) + before the current turn's transcript arrives. + + Uses only VADUserTurnStartStrategy (no TranscriptionUserTurnStartStrategy) + so the late transcription doesn't trigger a spurious turn start. + """ + controller = UserTurnController( + user_turn_strategies=UserTurnStrategies( + start=[VADUserTurnStartStrategy()], + stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)], + ), + user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT, + ) + + await controller.setup(self.task_manager) + + start_count = 0 + stop_count = 0 + + @controller.event_handler("on_user_turn_started") + async def on_user_turn_started(controller, strategy, params): + nonlocal start_count + start_count += 1 + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + nonlocal stop_count + stop_count += 1 + + # === Turn 1: S-T-E === + await controller.process_frame(VADUserStartedSpeakingFrame()) + self.assertEqual(start_count, 1) + + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 1) + + # === Between turns: late transcription arrives === + # This sets _text on the stop strategy while _user_turn is False. + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + + # === Turn 2: S-T-E (transcription arrives during turn) === + # The fix resets stop strategies at turn start, clearing stale _text. + await controller.process_frame(VADUserStartedSpeakingFrame()) + self.assertEqual(start_count, 2) + + await controller.process_frame( + TranscriptionFrame(text="How are you?", user_id="", timestamp="now") + ) + + await controller.process_frame(VADUserStoppedSpeakingFrame()) + + # Wait for user_speech_timeout to elapse — should get turn 2 stop + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 2) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_user_turn_stop_strategy.py b/tests/test_user_turn_stop_strategy.py index 80fb98efc..85f9f2752 100644 --- a/tests/test_user_turn_stop_strategy.py +++ b/tests/test_user_turn_stop_strategy.py @@ -493,6 +493,50 @@ class TestSpeechTimeoutUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): # Finalized transcript received after timeout, triggers immediately self.assertTrue(should_start) + async def test_reset_clears_stale_text_no_premature_stop(self): + """Test that reset() clears stale text and cancels timeout, preventing premature stop. + + Reproduces the bug from issue #4053: after turn 1 completes and + reset() is called, a late transcription sets _text. If reset() is + called again at turn 2 start, the stale _text should be cleared + so no premature stop occurs on VAD stop. + """ + strategy = await self._create_strategy() + + stop_count = 0 + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal stop_count + stop_count += 1 + + # === Turn 1: S-T-E === + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 1) + + # Reset after turn 1 (as controller would do at turn stop) + await strategy.reset() + + # === Late transcription arrives between turns === + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + + # Reset at turn 2 start (the fix: controller now resets stop strategies at turn start) + await strategy.reset() + + # === Turn 2: S-T-E (transcription arrives during turn) === + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame( + TranscriptionFrame(text="How are you?", user_id="cat", timestamp="") + ) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + + # Wait for timeout — should get turn 2 stop with the real transcription + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 2) + class TestExternalUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): async def test_external_strategy(self):