Merge pull request #3466 from pipecat-ai/pk/fix-aws-nova-sonic-rtvi-bot-output

Fix realtime (speech-to-speech) services' RTVI event compatibility
This commit is contained in:
kompfner
2026-01-16 09:56:13 -05:00
committed by GitHub
5 changed files with 90 additions and 19 deletions

8
changelog/3446.fixed.md Normal file
View File

@@ -0,0 +1,8 @@
- Fixed an issue where the "bot-llm-text" RTVI event would not fire for realtime (speech-to-speech) services:
- `AWSNovaSonicLLMService`
- `GeminiLiveLLMService`
- `OpenAIRealtimeLLMService`
- `GrokRealtimeLLMService`
The issue was that these services weren't pushing `LLMTextFrame`s. Now they do.

View File

@@ -38,6 +38,7 @@ from pipecat.frames.frames import (
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
@@ -1077,9 +1078,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"Assistant response text added: {text}")
# Report the text of the assistant response.
frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_assistant_response_text_frames(text)
# HACK: here we're also buffering the assistant text ourselves as a
# backup rather than relying solely on the assistant context aggregator
@@ -1112,11 +1111,7 @@ class AWSNovaSonicLLMService(LLMService):
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(
self._assistant_text_buffer, aggregated_by=AggregationType.SENTENCE
)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_assistant_response_text_frames(self._assistant_text_buffer)
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
@@ -1128,6 +1123,25 @@ class AWSNovaSonicLLMService(LLMService):
# Clear out the buffered assistant text
self._assistant_text_buffer = ""
async def _push_assistant_response_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Nova Sonic combines both LLM and TTS functionality, you
# would think we wouldn't need to push LLMTextFrames at all. However,
# RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
#
# user transcription reporting
#

View File

@@ -1710,11 +1710,26 @@ class GeminiLiveLLMService(LLMService):
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(text=text, aggregated_by=AggregationType.SENTENCE)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self._push_output_transcription_text_frames(text)
await self.push_frame(frame)
async def _push_output_transcription_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Gemini Live combines both LLM and TTS functionality, you
# might think we wouldn't need to push LLMTextFrames at all. However,
# RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
async def _handle_msg_grounding_metadata(self, message: LiveServerMessage):
"""Handle dedicated grounding metadata messages."""

View File

@@ -33,6 +33,7 @@ from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMSetToolsFrame,
LLMTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
TranscriptionFrame,
@@ -619,9 +620,26 @@ class GrokRealtimeLLMService(LLMService):
async def _handle_evt_audio_transcript_delta(self, evt):
"""Handle audio transcript delta event."""
if evt.delta:
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_output_transcript_text_frames(evt.delta)
async def _push_output_transcript_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like Grok Realtime combines both LLM and TTS functionality,
# you might think we wouldn't need to push LLMTextFrames at all.
# However, RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
async def _handle_evt_function_call_arguments_done(self, evt):
"""Handle function call arguments done event."""

View File

@@ -724,10 +724,26 @@ class OpenAIRealtimeLLMService(LLMService):
# We receive audio transcript deltas (as opposed to text deltas) when
# the output modality is "audio" (the default)
if evt.delta:
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self._push_output_transcript_text_frames(evt.delta)
async def _push_output_transcript_text_frames(self, text: str):
# In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
# proceed beyond the TTS service. Therefore, since a speech-to-speech
# service like OpenAI Realtime combines both LLM and TTS functionality,
# you might think we wouldn't need to push LLMTextFrames at all.
# However, RTVI relies on LLMTextFrames being pushed to trigger its
# "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid
# appending it to context to avoid context message duplication.
# Push LLMTextFrame
llm_text_frame = LLMTextFrame(text)
llm_text_frame.append_to_context = False
await self.push_frame(llm_text_frame)
# Push TTSTextFrame
tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
async def _handle_evt_function_call_arguments_done(self, evt):
"""Handle completion of function call arguments.