Consume InterimTranscriptionFrame and TranslationFrame in LLMUserAggregator.

This patch touches three files:

  * changelog/3825.fixed.md: new changelog entry describing the fix.
  * src/pipecat/processors/aggregators/llm_response_universal.py: adds an
    explicit no-op branch for InterimTranscriptionFrame and TranslationFrame
    to the isinstance dispatch chain, placed right after the
    TranscriptionFrame branch, so these frames are consumed instead of being
    pushed downstream.
  * tests/test_context_aggregators_universal.py: imports the two frame types
    and adds two regression tests to TestLLMUserAggregator.

NOTE(review): the aggregator hunk adds no import, so both frame names are
presumably already imported in llm_response_universal.py; confirm before
merging.
NOTE(review): the translation test passes language="es" as a plain string;
verify that this matches the type TranslationFrame expects for `language`.

diff --git a/changelog/3825.fixed.md b/changelog/3825.fixed.md
new file mode 100644
index 000000000..7cd9ba508
--- /dev/null
+++ b/changelog/3825.fixed.md
@@ -0,0 +1 @@
+- Fixed `InterimTranscriptionFrame` and `TranslationFrame` being unintentionally pushed downstream in `LLMUserAggregator`. They are now consumed like `TranscriptionFrame`.
diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py
index e5884a868..4a28b38d5 100644
--- a/src/pipecat/processors/aggregators/llm_response_universal.py
+++ b/src/pipecat/processors/aggregators/llm_response_universal.py
@@ -461,6 +461,10 @@ class LLMUserAggregator(LLMContextAggregator):
             await self.push_frame(frame, direction)
         elif isinstance(frame, TranscriptionFrame):
             await self._handle_transcription(frame)
+        elif isinstance(frame, (InterimTranscriptionFrame, TranslationFrame)):
+            # Interim transcriptions and translations are consumed here
+            # and not pushed downstream, same as final TranscriptionFrame.
+            pass
         elif isinstance(frame, LLMRunFrame):
             await self._handle_llm_run(frame)
         elif isinstance(frame, LLMMessagesAppendFrame):
diff --git a/tests/test_context_aggregators_universal.py b/tests/test_context_aggregators_universal.py
index 1bba463b0..e86905e1c 100644
--- a/tests/test_context_aggregators_universal.py
+++ b/tests/test_context_aggregators_universal.py
@@ -12,6 +12,7 @@ from pipecat.frames.frames import (
     FunctionCallFromLLM,
     FunctionCallResultFrame,
     FunctionCallsStartedFrame,
+    InterimTranscriptionFrame,
     InterruptionFrame,
     LLMContextAssistantTimestampFrame,
     LLMContextFrame,
@@ -26,6 +27,7 @@ from pipecat.frames.frames import (
     LLMThoughtTextFrame,
     StartFrame,
     TranscriptionFrame,
+    TranslationFrame,
     UserMuteStartedFrame,
     UserStartedSpeakingFrame,
     UserStoppedSpeakingFrame,
@@ -428,6 +430,44 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
             ignore_start=False,
         )
 
+    async def test_interim_transcription_not_pushed_downstream(self):
+        """InterimTranscriptionFrame should be consumed and not pushed downstream."""
+        context = LLMContext()
+        pipeline = Pipeline([LLMUserAggregator(context)])
+
+        frames_to_send = [
+            InterimTranscriptionFrame(text="Hel", user_id="", timestamp="now"),
+            InterimTranscriptionFrame(text="Hello", user_id="", timestamp="now"),
+        ]
+        # The interim transcription triggers a user turn start via the default
+        # TranscriptionUserTurnStartStrategy, so we expect turn-related frames
+        # but NOT the InterimTranscriptionFrame itself.
+        expected_down_frames = [
+            UserStartedSpeakingFrame,
+            InterruptionFrame,
+        ]
+        (down_frames, _) = await run_test(
+            pipeline,
+            frames_to_send=frames_to_send,
+            expected_down_frames=expected_down_frames,
+        )
+        self.assertFalse(any(isinstance(f, InterimTranscriptionFrame) for f in down_frames))
+
+    async def test_translation_not_pushed_downstream(self):
+        """TranslationFrame should be consumed and not pushed downstream."""
+        context = LLMContext()
+        pipeline = Pipeline([LLMUserAggregator(context)])
+
+        frames_to_send = [
+            TranslationFrame(text="Hola!", user_id="", timestamp="now", language="es"),
+        ]
+        # No downstream frames expected — translations are consumed.
+        await run_test(
+            pipeline,
+            frames_to_send=frames_to_send,
+            expected_down_frames=[],
+        )
+
 
 class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
     async def test_empty(self):