diff --git a/changelog/3446.fixed.md b/changelog/3446.fixed.md new file mode 100644 index 000000000..64cc3cb32 --- /dev/null +++ b/changelog/3446.fixed.md @@ -0,0 +1,8 @@ +- Fixed an issue where the "bot-llm-text" RTVI event would not fire for realtime (speech-to-speech) services: + + - `AWSNovaSonicLLMService` + - `GeminiLiveLLMService` + - `OpenAIRealtimeLLMService` + - `GrokRealtimeLLMService` + + The issue was that these services weren't pushing `LLMTextFrame`s. Now they do. diff --git a/src/pipecat/services/aws/nova_sonic/llm.py b/src/pipecat/services/aws/nova_sonic/llm.py index 14ebde729..fbcbe292e 100644 --- a/src/pipecat/services/aws/nova_sonic/llm.py +++ b/src/pipecat/services/aws/nova_sonic/llm.py @@ -38,6 +38,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + LLMTextFrame, StartFrame, TranscriptionFrame, TTSAudioRawFrame, @@ -1077,9 +1078,7 @@ class AWSNovaSonicLLMService(LLMService): logger.debug(f"Assistant response text added: {text}") # Report the text of the assistant response. - frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE) - frame.includes_inter_frame_spaces = True - await self.push_frame(frame) + await self._push_assistant_response_text_frames(text) # HACK: here we're also buffering the assistant text ourselves as a # backup rather than relying solely on the assistant context aggregator @@ -1112,11 +1111,7 @@ class AWSNovaSonicLLMService(LLMService): # TTSTextFrame would be ignored otherwise (the interruption frame # would have cleared the assistant aggregator state). await self.push_frame(LLMFullResponseStartFrame()) - frame = TTSTextFrame( - self._assistant_text_buffer, aggregated_by=AggregationType.SENTENCE - ) - frame.includes_inter_frame_spaces = True - await self.push_frame(frame) + await self._push_assistant_response_text_frames(self._assistant_text_buffer) self._may_need_repush_assistant_text = False # Report the end of the assistant response. @@ -1128,6 +1123,25 @@ class AWSNovaSonicLLMService(LLMService): # Clear out the buffered assistant text self._assistant_text_buffer = "" + async def _push_assistant_response_text_frames(self, text: str): + # In a typical "cascade" LLM + TTS setup, LLMTextFrames would not + # proceed beyond the TTS service. Therefore, since a speech-to-speech + # service like Nova Sonic combines both LLM and TTS functionality, you + # would think we wouldn't need to push LLMTextFrames at all. However, + # RTVI relies on LLMTextFrames being pushed to trigger its + # "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid + # appending it to context to avoid context message duplication. + + # Push LLMTextFrame + llm_text_frame = LLMTextFrame(text) + llm_text_frame.append_to_context = False + await self.push_frame(llm_text_frame) + + # Push TTSTextFrame + tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE) + tts_text_frame.includes_inter_frame_spaces = True + await self.push_frame(tts_text_frame) + # # user transcription reporting # diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 13b5fb18c..f61c9826c 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1710,11 +1710,26 @@ class GeminiLiveLLMService(LLMService): await self.push_frame(TTSStartedFrame()) await self.push_frame(LLMFullResponseStartFrame()) - frame = TTSTextFrame(text=text, aggregated_by=AggregationType.SENTENCE) - # Gemini Live text already includes any necessary inter-chunk spaces - frame.includes_inter_frame_spaces = True + await self._push_output_transcription_text_frames(text) - await self.push_frame(frame) + async def _push_output_transcription_text_frames(self, text: str): + # In a typical "cascade" LLM + TTS setup, LLMTextFrames would not + # proceed beyond the TTS service. Therefore, since a speech-to-speech + # service like Gemini Live combines both LLM and TTS functionality, you + # might think we wouldn't need to push LLMTextFrames at all. However, + # RTVI relies on LLMTextFrames being pushed to trigger its + # "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid + # appending it to context to avoid context message duplication. + + # Push LLMTextFrame + llm_text_frame = LLMTextFrame(text) + llm_text_frame.append_to_context = False + await self.push_frame(llm_text_frame) + + # Push TTSTextFrame + tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE) + tts_text_frame.includes_inter_frame_spaces = True + await self.push_frame(tts_text_frame) async def _handle_msg_grounding_metadata(self, message: LiveServerMessage): """Handle dedicated grounding metadata messages.""" diff --git a/src/pipecat/services/grok/realtime/llm.py b/src/pipecat/services/grok/realtime/llm.py index bcd701cd3..5e368b62b 100644 --- a/src/pipecat/services/grok/realtime/llm.py +++ b/src/pipecat/services/grok/realtime/llm.py @@ -33,6 +33,7 @@ from pipecat.frames.frames import ( LLMFullResponseStartFrame, LLMMessagesAppendFrame, LLMSetToolsFrame, + LLMTextFrame, LLMUpdateSettingsFrame, StartFrame, TranscriptionFrame, @@ -619,9 +620,26 @@ class GrokRealtimeLLMService(LLMService): async def _handle_evt_audio_transcript_delta(self, evt): """Handle audio transcript delta event.""" if evt.delta: - frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE) - frame.includes_inter_frame_spaces = True - await self.push_frame(frame) + await self._push_output_transcript_text_frames(evt.delta) + + async def _push_output_transcript_text_frames(self, text: str): + # In a typical "cascade" LLM + TTS setup, LLMTextFrames would not + # proceed beyond the TTS service. Therefore, since a speech-to-speech + # service like Grok Realtime combines both LLM and TTS functionality, + # you might think we wouldn't need to push LLMTextFrames at all. + # However, RTVI relies on LLMTextFrames being pushed to trigger its + # "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid + # appending it to context to avoid context message duplication. + + # Push LLMTextFrame + llm_text_frame = LLMTextFrame(text) + llm_text_frame.append_to_context = False + await self.push_frame(llm_text_frame) + + # Push TTSTextFrame + tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE) + tts_text_frame.includes_inter_frame_spaces = True + await self.push_frame(tts_text_frame) async def _handle_evt_function_call_arguments_done(self, evt): """Handle function call arguments done event.""" diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index 7c48d7e34..11a83741e 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -724,10 +724,26 @@ class OpenAIRealtimeLLMService(LLMService): # We receive audio transcript deltas (as opposed to text deltas) when # the output modality is "audio" (the default) if evt.delta: - frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE) - # OpenAI Realtime text already includes any necessary inter-chunk spaces - frame.includes_inter_frame_spaces = True - await self.push_frame(frame) + await self._push_output_transcript_text_frames(evt.delta) + + async def _push_output_transcript_text_frames(self, text: str): + # In a typical "cascade" LLM + TTS setup, LLMTextFrames would not + # proceed beyond the TTS service. Therefore, since a speech-to-speech + # service like OpenAI Realtime combines both LLM and TTS functionality, + # you might think we wouldn't need to push LLMTextFrames at all. + # However, RTVI relies on LLMTextFrames being pushed to trigger its + # "bot-llm-text" event. So here we push an LLMTextFrame, too, but avoid + # appending it to context to avoid context message duplication. + + # Push LLMTextFrame + llm_text_frame = LLMTextFrame(text) + llm_text_frame.append_to_context = False + await self.push_frame(llm_text_frame) + + # Push TTSTextFrame + tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE) + tts_text_frame.includes_inter_frame_spaces = True + await self.push_frame(tts_text_frame) async def _handle_evt_function_call_arguments_done(self, evt): """Handle completion of function call arguments.