From 5b2991f47f338e1ceb9ea52fe472ac59d65a76aa Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Fri, 27 Mar 2026 12:20:05 -0400 Subject: [PATCH] Gate Gemini Live sending real-time input messages to the API until it's ready, i.e. after we've sent the initial conversation history (or determined that we don't need to). This fixes the 26c example when using Gemini 3.1 Flash Live, which seems to be more strict about not receiving real-time input (at least, video messages) before conversation history. --- .../services/google/gemini_live/llm.py | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index f58972ea4..706c6d475 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -803,6 +803,7 @@ class GeminiLiveLLMService(LLMService): self._audio_input_paused = start_audio_paused self._video_input_paused = start_video_paused + self._ready_for_realtime_input = False self._context = None self._api_key = api_key self._http_options = update_google_client_http_options(http_options) @@ -996,7 +997,7 @@ class GeminiLiveLLMService(LLMService): async def _handle_user_started_speaking(self, frame): self._user_is_speaking = True - if self._vad_disabled and self._session: + if self._vad_disabled and self._session and self._ready_for_realtime_input: try: await self._session.send_realtime_input(activity_start=ActivityStart()) except Exception as e: @@ -1006,7 +1007,7 @@ class GeminiLiveLLMService(LLMService): self._user_is_speaking = False self._user_audio_buffer = bytearray() await self.start_ttfb_metrics() - if self._vad_disabled and self._session: + if self._vad_disabled and self._session and self._ready_for_realtime_input: try: await self._session.send_realtime_input(activity_end=ActivityEnd()) except Exception as e: @@ -1489,13 +1490,19 @@ class GeminiLiveLLMService(LLMService): await self._session.close() self._session = None self._completed_tool_calls = set() + self._ready_for_realtime_input = False self._disconnecting = False except Exception as e: await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e) async def _send_user_audio(self, frame): """Send user audio frame to Gemini Live API.""" - if self._audio_input_paused or self._disconnecting or not self._session: + if ( + self._audio_input_paused + or self._disconnecting + or not self._session + or not self._ready_for_realtime_input + ): return # Send all audio to Gemini @@ -1530,7 +1537,7 @@ class GeminiLiveLLMService(LLMService): Args: text: The text to send as user input. """ - if self._disconnecting or not self._session: + if self._disconnecting or not self._session or not self._ready_for_realtime_input: return try: @@ -1540,7 +1547,12 @@ class GeminiLiveLLMService(LLMService): async def _send_user_video(self, frame): """Send user video frame to Gemini Live API.""" - if self._video_input_paused or self._disconnecting or not self._session: + if ( + self._video_input_paused + or self._disconnecting + or not self._session + or not self._ready_for_realtime_input + ): return now = time.time() @@ -1571,6 +1583,8 @@ class GeminiLiveLLMService(LLMService): adapter: GeminiLLMAdapter = self.get_llm_adapter() messages = adapter.get_llm_invocation_params(self._context).get("messages", []) if not messages: + # No messages to seed convo with, so we're ready for realtime input right away + self._ready_for_realtime_input = True return logger.debug(f"Creating initial response: {messages}") @@ -1594,6 +1608,8 @@ class GeminiLiveLLMService(LLMService): if not self._inference_on_context_initialization and not self._is_gemini_3: self._needs_initial_turn_complete_message = True + self._ready_for_realtime_input = True + async def _create_single_response(self, messages_list): """Create a single response from a list of messages.