Merge pull request #4171 from pipecat-ai/pk/fix-gemini-3.1-flash-live-video

Gate Gemini Live sending real-time input messages to the API until it…
This commit is contained in:
kompfner
2026-03-27 16:26:03 -04:00
committed by GitHub

View File

@@ -803,6 +803,7 @@ class GeminiLiveLLMService(LLMService):
self._audio_input_paused = start_audio_paused
self._video_input_paused = start_video_paused
self._ready_for_realtime_input = False
self._context = None
self._api_key = api_key
self._http_options = update_google_client_http_options(http_options)
@@ -996,7 +997,7 @@ class GeminiLiveLLMService(LLMService):
async def _handle_user_started_speaking(self, frame):
self._user_is_speaking = True
if self._vad_disabled and self._session:
if self._vad_disabled and self._session and self._ready_for_realtime_input:
try:
await self._session.send_realtime_input(activity_start=ActivityStart())
except Exception as e:
@@ -1006,7 +1007,7 @@ class GeminiLiveLLMService(LLMService):
self._user_is_speaking = False
self._user_audio_buffer = bytearray()
await self.start_ttfb_metrics()
if self._vad_disabled and self._session:
if self._vad_disabled and self._session and self._ready_for_realtime_input:
try:
await self._session.send_realtime_input(activity_end=ActivityEnd())
except Exception as e:
@@ -1489,13 +1490,19 @@ class GeminiLiveLLMService(LLMService):
await self._session.close()
self._session = None
self._completed_tool_calls = set()
self._ready_for_realtime_input = False
self._disconnecting = False
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _send_user_audio(self, frame):
"""Send user audio frame to Gemini Live API."""
if self._audio_input_paused or self._disconnecting or not self._session:
if (
self._audio_input_paused
or self._disconnecting
or not self._session
or not self._ready_for_realtime_input
):
return
# Send all audio to Gemini
@@ -1530,7 +1537,7 @@ class GeminiLiveLLMService(LLMService):
Args:
text: The text to send as user input.
"""
if self._disconnecting or not self._session:
if self._disconnecting or not self._session or not self._ready_for_realtime_input:
return
try:
@@ -1540,7 +1547,12 @@ class GeminiLiveLLMService(LLMService):
async def _send_user_video(self, frame):
"""Send user video frame to Gemini Live API."""
if self._video_input_paused or self._disconnecting or not self._session:
if (
self._video_input_paused
or self._disconnecting
or not self._session
or not self._ready_for_realtime_input
):
return
now = time.time()
@@ -1571,6 +1583,8 @@ class GeminiLiveLLMService(LLMService):
adapter: GeminiLLMAdapter = self.get_llm_adapter()
messages = adapter.get_llm_invocation_params(self._context).get("messages", [])
if not messages:
# No messages to seed convo with, so we're ready for realtime input right away
self._ready_for_realtime_input = True
return
logger.debug(f"Creating initial response: {messages}")
@@ -1594,6 +1608,8 @@ class GeminiLiveLLMService(LLMService):
if not self._inference_on_context_initialization and not self._is_gemini_3:
self._needs_initial_turn_complete_message = True
self._ready_for_realtime_input = True
async def _create_single_response(self, messages_list):
"""Create a single response from a list of messages.