From beb4e86b5f6dab95fe702cd6295bcd0f90aea291 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 11 Feb 2026 16:17:28 -0300 Subject: [PATCH] Fixing an issue in RTVI where we were sometimes receiving bot output messages before the bot started speaking. --- src/pipecat/processors/frameworks/rtvi.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index d85cae11f..c1497b40b 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1116,6 +1116,10 @@ class RTVIObserver(BaseObserver): self._last_user_audio_level = 0 self._last_bot_audio_level = 0 + # Track bot speaking state for queuing aggregated text frames + self._bot_is_speaking = False + self._queued_aggregated_text_frames: List[AggregatedTextFrame] = [] + if self._params.system_logs_enabled: self._system_logger_id = logger.add(self._logger_sink) @@ -1384,17 +1388,30 @@ class RTVIObserver(BaseObserver): async def _handle_bot_speaking(self, frame: Frame): """Handle bot speaking event frames.""" - message = None if isinstance(frame, BotStartedSpeakingFrame): message = RTVIBotStartedSpeakingMessage() + await self.send_rtvi_message(message) + # Flush any queued aggregated text frames + for queued_frame in self._queued_aggregated_text_frames: + await self._send_aggregated_llm_text(queued_frame) + self._queued_aggregated_text_frames.clear() + self._bot_is_speaking = True elif isinstance(frame, BotStoppedSpeakingFrame): message = RTVIBotStoppedSpeakingMessage() - - if message: await self.send_rtvi_message(message) + self._bot_is_speaking = False async def _handle_aggregated_llm_text(self, frame: AggregatedTextFrame): """Handle aggregated LLM text output frames.""" + if self._bot_is_speaking: + # Bot has already started speaking, send directly + await self._send_aggregated_llm_text(frame) + else: + # Bot hasn't started speaking yet, queue the frame + self._queued_aggregated_text_frames.append(frame) + + async def _send_aggregated_llm_text(self, frame: AggregatedTextFrame): + """Send aggregated LLM text messages.""" # Skip certain aggregator types if configured to do so. if ( self._params.skip_aggregator_types