From be621fbc5cfe3b792299c03991777a2efe519868 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 5 Jan 2026 15:34:19 -0500 Subject: [PATCH] Add timeout for handling user transcript messages --- changelog/3356.fixed.md | 1 + .../services/google/gemini_live/llm.py | 56 ++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 changelog/3356.fixed.md diff --git a/changelog/3356.fixed.md b/changelog/3356.fixed.md new file mode 100644 index 000000000..0532e742a --- /dev/null +++ b/changelog/3356.fixed.md @@ -0,0 +1 @@ +- Fixed an issue in GeminiLiveLLMService where TranscriptionFrames were occasionally not pushed. diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 6071da151..088aaeb98 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -11,6 +11,7 @@ Gemini Live API, supporting both text and audio modalities with voice transcription, streaming responses, and tool usage. """ +import asyncio import base64 import io import time @@ -696,6 +697,7 @@ class GeminiLiveLLMService(LLMService): self._bot_audio_buffer = bytearray() self._bot_text_buffer = "" self._llm_output_buffer = "" + self._transcription_timeout_task = None self._sample_rate = 24000 @@ -1278,6 +1280,9 @@ class GeminiLiveLLMService(LLMService): if self._connection_task: await self.cancel_task(self._connection_task, timeout=1.0) self._connection_task = None + if self._transcription_timeout_task: + await self.cancel_task(self._transcription_timeout_task) + self._transcription_timeout_task = None if self._session: await self._session.close() self._session = None @@ -1564,12 +1569,49 @@ class GeminiLiveLLMService(LLMService): """Handle a transcription result with tracing.""" pass + async def _transcription_timeout_handler(self): + """Handle timeout for user transcription buffer. + + If no new transcription messages arrive within the timeout period, + flush any remaining text in the buffer as a complete sentence. + """ + try: + # Wait for timeout period (0.5 seconds) + await asyncio.sleep(0.5) + + # If we still have buffered text after timeout, flush it + if self._user_transcription_buffer: + logger.trace( + f"[Transcription:user:timeout] Flushing buffer: [{self._user_transcription_buffer}]" + ) + complete_sentence = self._user_transcription_buffer + self._user_transcription_buffer = "" + + await self._handle_user_transcription( + complete_sentence, True, self._settings["language"] + ) + await self.push_frame( + TranscriptionFrame( + text=complete_sentence, + user_id="", + timestamp=time_now_iso8601(), + result=None, + ), + FrameDirection.UPSTREAM, + ) + except asyncio.CancelledError: + # Task was cancelled because new transcription arrived. This is expected + # when back to back transcription messages arrive. + logger.trace("Transcription timeout task cancelled (new text arrived)") + raise + async def _handle_msg_input_transcription(self, message: LiveServerMessage): """Handle the input transcription message. Gemini Live sends user transcriptions in either single words or multi-word phrases. As a result, we have to aggregate the input transcription. This handler - aggregates into sentences, splitting on the end of sentence markers. + aggregates into sentences, splitting on the end of sentence markers. If no + punctuation arrives within a timeout period, the buffer is flushed automatically. """ if not message.server_content.input_transcription: return @@ -1579,6 +1621,11 @@ class GeminiLiveLLMService(LLMService): if not text: return + # Cancel any existing timeout task since we received new text + if self._transcription_timeout_task: + await self.cancel_task(self._transcription_timeout_task) + self._transcription_timeout_task = None + # Strip leading space from sentence starts if buffer is empty if text.startswith(" ") and not self._user_transcription_buffer: text = text.lstrip() @@ -1612,6 +1659,13 @@ class GeminiLiveLLMService(LLMService): FrameDirection.UPSTREAM, ) + # If there's still text in the buffer (no end-of-sentence marker found), + # start a timeout task to flush it later + if self._user_transcription_buffer: + self._transcription_timeout_task = self.create_task( + self._transcription_timeout_handler() + ) + async def _handle_msg_output_transcription(self, message: LiveServerMessage): """Handle the output transcription message.""" if not message.server_content.output_transcription: