Add timeout for handling user transcript messages

This commit is contained in:
Mark Backman
2026-01-05 15:34:19 -05:00
parent c53c49558f
commit be621fbc5c
2 changed files with 56 additions and 1 deletions

1
changelog/3356.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed an issue in GeminiLiveLLMService where TranscriptionFrames were occasionally not pushed.

View File

@@ -11,6 +11,7 @@ Gemini Live API, supporting both text and audio modalities with
voice transcription, streaming responses, and tool usage.
"""
import asyncio
import base64
import io
import time
@@ -696,6 +697,7 @@ class GeminiLiveLLMService(LLMService):
self._bot_audio_buffer = bytearray()
self._bot_text_buffer = ""
self._llm_output_buffer = ""
self._transcription_timeout_task = None
self._sample_rate = 24000
@@ -1278,6 +1280,9 @@ class GeminiLiveLLMService(LLMService):
if self._connection_task:
await self.cancel_task(self._connection_task, timeout=1.0)
self._connection_task = None
if self._transcription_timeout_task:
await self.cancel_task(self._transcription_timeout_task)
self._transcription_timeout_task = None
if self._session:
await self._session.close()
self._session = None
@@ -1564,12 +1569,49 @@ class GeminiLiveLLMService(LLMService):
"""Handle a transcription result with tracing."""
pass
async def _transcription_timeout_handler(self):
"""Handle timeout for user transcription buffer.
If no new transcription messages arrive within the timeout period,
flush any remaining text in the buffer as a complete sentence.
"""
try:
# Wait for timeout period (0.5 seconds)
await asyncio.sleep(0.5)
# If we still have buffered text after timeout, flush it
if self._user_transcription_buffer:
logger.trace(
f"[Transcription:user:timeout] Flushing buffer: [{self._user_transcription_buffer}]"
)
complete_sentence = self._user_transcription_buffer
self._user_transcription_buffer = ""
await self._handle_user_transcription(
complete_sentence, True, self._settings["language"]
)
await self.push_frame(
TranscriptionFrame(
text=complete_sentence,
user_id="",
timestamp=time_now_iso8601(),
result=None,
),
FrameDirection.UPSTREAM,
)
except asyncio.CancelledError:
# Task was cancelled because new transcription arrived. This is expected
# when back to back transcription messages arrive.
logger.trace("Transcription timeout task cancelled (new text arrived)")
raise
async def _handle_msg_input_transcription(self, message: LiveServerMessage):
"""Handle the input transcription message.
Gemini Live sends user transcriptions in either single words or multi-word
phrases. As a result, we have to aggregate the input transcription. This handler
aggregates into sentences, splitting on the end of sentence markers.
aggregates into sentences, splitting on the end of sentence markers. If no
punctuation arrives within a timeout period, the buffer is flushed automatically.
"""
if not message.server_content.input_transcription:
return
@@ -1579,6 +1621,11 @@ class GeminiLiveLLMService(LLMService):
if not text:
return
# Cancel any existing timeout task since we received new text
if self._transcription_timeout_task:
await self.cancel_task(self._transcription_timeout_task)
self._transcription_timeout_task = None
# Strip leading space from sentence starts if buffer is empty
if text.startswith(" ") and not self._user_transcription_buffer:
text = text.lstrip()
@@ -1612,6 +1659,13 @@ class GeminiLiveLLMService(LLMService):
FrameDirection.UPSTREAM,
)
# If there's still text in the buffer (no end-of-sentence marker found),
# start a timeout task to flush it later
if self._user_transcription_buffer:
self._transcription_timeout_task = self.create_task(
self._transcription_timeout_handler()
)
async def _handle_msg_output_transcription(self, message: LiveServerMessage):
"""Handle the output transcription message."""
if not message.server_content.output_transcription: