Merge pull request #3825 from pipecat-ai/mb/llm-user-aggregator-interim-transcription

Consume InterimTranscriptionFrame and TranslationFrame in LLMUserAggregator
This commit is contained in:
Mark Backman
2026-02-25 09:06:34 -05:00
committed by GitHub
3 changed files with 45 additions and 0 deletions

1
changelog/3825.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `InterimTranscriptionFrame` and `TranslationFrame` being unintentionally pushed downstream in `LLMUserAggregator`. They are now consumed like `TranscriptionFrame`.

View File

@@ -461,6 +461,10 @@ class LLMUserAggregator(LLMContextAggregator):
await self.push_frame(frame, direction)
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
elif isinstance(frame, (InterimTranscriptionFrame, TranslationFrame)):
# Interim transcriptions and translations are consumed here
# and not pushed downstream, same as final TranscriptionFrame.
pass
elif isinstance(frame, LLMRunFrame):
await self._handle_llm_run(frame)
elif isinstance(frame, LLMMessagesAppendFrame):

View File

@@ -12,6 +12,7 @@ from pipecat.frames.frames import (
FunctionCallFromLLM,
FunctionCallResultFrame,
FunctionCallsStartedFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMContextAssistantTimestampFrame,
LLMContextFrame,
@@ -26,6 +27,7 @@ from pipecat.frames.frames import (
LLMThoughtTextFrame,
StartFrame,
TranscriptionFrame,
TranslationFrame,
UserMuteStartedFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
@@ -428,6 +430,44 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
ignore_start=False,
)
async def test_interim_transcription_not_pushed_downstream(self):
    """Check that interim transcriptions never leave the user aggregator.

    Two ``InterimTranscriptionFrame`` objects are fed into a pipeline that
    contains only an ``LLMUserAggregator``. The frames themselves must be
    consumed; only the turn-related frames are expected downstream.
    """
    context = LLMContext()
    pipeline = Pipeline([LLMUserAggregator(context)])

    interim_frames = [
        InterimTranscriptionFrame(text=partial_text, user_id="", timestamp="now")
        for partial_text in ("Hel", "Hello")
    ]

    # With the default TranscriptionUserTurnStartStrategy an interim
    # transcription starts a user turn, so the turn-start frames show up
    # downstream even though the interim frames do not.
    turn_start_frames = [UserStartedSpeakingFrame, InterruptionFrame]

    received_down, _received_up = await run_test(
        pipeline,
        frames_to_send=interim_frames,
        expected_down_frames=turn_start_frames,
    )

    interim_leaked = any(
        isinstance(received, InterimTranscriptionFrame) for received in received_down
    )
    self.assertFalse(interim_leaked)
async def test_translation_not_pushed_downstream(self):
    """Check that a ``TranslationFrame`` is consumed by the user aggregator.

    A single translation is sent through a pipeline containing only an
    ``LLMUserAggregator``, and the run is expected to produce no
    downstream frames at all.
    """
    context = LLMContext()
    user_aggregator = LLMUserAggregator(context)
    pipeline = Pipeline([user_aggregator])

    translation = TranslationFrame(
        text="Hola!",
        user_id="",
        timestamp="now",
        language="es",
    )

    # An empty expectation list means nothing may be pushed downstream.
    nothing_expected = []
    await run_test(
        pipeline,
        frames_to_send=[translation],
        expected_down_frames=nothing_expected,
    )
class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
async def test_empty(self):