From 728361a6a7d83f4074e62581ae86ee91b4c00e18 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 6 Oct 2025 10:17:50 -0400 Subject: [PATCH 1/6] Add `GeminiVertexMultimodalLiveLLMService` --- CHANGELOG.md | 3 + .../26h-gemini-multimodal-live-vertex.py | 134 +++++++++++++ .../services/gemini_multimodal_live/gemini.py | 42 +++- .../services/gemini_multimodal_live/vertex.py | 187 ++++++++++++++++++ 4 files changed, 360 insertions(+), 6 deletions(-) create mode 100644 examples/foundational/26h-gemini-multimodal-live-vertex.py create mode 100644 src/pipecat/services/gemini_multimodal_live/vertex.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 10cd8738f..6999bd37c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `GeminiVertexMultimodalLiveLLMService`, for accessing Gemini Live via + Google Vertex AI. + - Added some new configuration options to `GeminiMultimodalLiveLLMService`: - `thinking` diff --git a/examples/foundational/26h-gemini-multimodal-live-vertex.py b/examples/foundational/26h-gemini-multimodal-live-vertex.py new file mode 100644 index 000000000..581d2524a --- /dev/null +++ b/examples/foundational/26h-gemini-multimodal-live-vertex.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMMessagesAppendFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_multimodal_live.vertex import GeminiVertexMultimodalLiveLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +# Load environment variables +load_dotenv(override=True) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + # set stop_secs to something roughly similar to the internal setting + # of the Multimodal Live api, just to align events. + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + # set stop_secs to something roughly similar to the internal setting + # of the Multimodal Live api, just to align events. + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + # set stop_secs to something roughly similar to the internal setting + # of the Multimodal Live api, just to align events. + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + # Create the Gemini Vertex Multimodal Live LLM service + system_instruction = f""" + You are a helpful AI assistant. + Your goal is to demonstrate your capabilities in a helpful and engaging way. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + """ + + llm = GeminiVertexMultimodalLiveLLMService( + credentials=os.getenv("GOOGLE_VERTEX_TEST_CREDENTIALS"), + project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"), + location=os.getenv("GOOGLE_CLOUD_LOCATION"), + system_instruction=system_instruction, + voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck + ) + + # Build the pipeline + pipeline = Pipeline( + [ + transport.input(), + llm, + transport.output(), + ] + ) + + # Configure the pipeline task + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + # Handle client connection event + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames( + [ + LLMMessagesAppendFrame( + messages=[ + { + "role": "user", + "content": f"Greet the user and introduce yourself.", + } + ] + ) + ] + ) + + # Handle client disconnection events + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + # Run the pipeline + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 65b5eb781..0de76fbf1 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -601,7 +601,8 @@ class GeminiMultimodalLiveLLMService(LLMService): self._audio_input_paused = start_audio_paused self._video_input_paused = start_video_paused self._context = None - self._create_client(api_key, http_options) + self._api_key = api_key + self._http_options = http_options self._session: AsyncSession = None self._connection_task = None @@ -649,8 +650,8 @@ class GeminiMultimodalLiveLLMService(LLMService): "extra": params.extra if isinstance(params.extra, dict) else {}, } - # Initialize the File API client - self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url) + self._file_api_base_url = file_api_base_url + self._file_api: Optional[GeminiFileAPI] = None # Grounding metadata tracking self._search_result_buffer = "" @@ -662,8 +663,23 @@ class GeminiMultimodalLiveLLMService(LLMService): # Bookkeeping for ending gracefully (i.e. after the bot is finished) self._end_frame_pending_bot_turn_finished: Optional[EndFrame] = None - def _create_client(self, api_key: str, http_options: Optional[HttpOptions] = None): - self._client = Client(api_key=api_key, http_options=http_options) + # Initialize the API client. Subclasses can override this if needed. + self.create_client() + + def create_client(self): + """Create the Gemini API client instance. Subclasses can override this.""" + self._client = Client(api_key=self._api_key, http_options=self._http_options) + + @property + def file_api(self) -> GeminiFileAPI: + """Get the Gemini File API client instance. Subclasses can override this. + + Returns: + The Gemini File API client. + """ + if not self._file_api: + self._file_api = GeminiFileAPI(api_key=self._api_key, base_url=self._file_api_base_url) + return self._file_api def can_generate_metrics(self) -> bool: """Check if the service can generate usage metrics. @@ -1282,7 +1298,21 @@ class GeminiMultimodalLiveLLMService(LLMService): inline_data = part.inline_data if not inline_data: return - if inline_data.mime_type != f"audio/pcm;rate={self._sample_rate}": + + # Check if mime type matches expected format + expected_mime_type = f"audio/pcm;rate={self._sample_rate}" + if inline_data.mime_type == expected_mime_type: + # Perfect match, continue processing + pass + elif inline_data.mime_type == "audio/pcm": + # Sample rate not provided in mime type, assume default + if not hasattr(self, "_sample_rate_warning_logged"): + logger.warning( + f"Sample rate not provided in mime type '{inline_data.mime_type}', assuming rate of {self._sample_rate}" + ) + self._sample_rate_warning_logged = True + else: + # Unrecognized format logger.warning(f"Unrecognized server_content format {inline_data.mime_type}") return diff --git a/src/pipecat/services/gemini_multimodal_live/vertex.py b/src/pipecat/services/gemini_multimodal_live/vertex.py new file mode 100644 index 000000000..e07eb474f --- /dev/null +++ b/src/pipecat/services/gemini_multimodal_live/vertex.py @@ -0,0 +1,187 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Google Vertex AI Gemini Multimodal Live service. + +This module provides integration with Google's Gemini Multimodal Live model via +Vertex AI, supporting both text and audio modalities with voice transcription, +streaming responses, and tool usage. +""" + +import json +import os +from typing import List, Optional, Union + +from loguru import logger + +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.services.gemini_multimodal_live.gemini import ( + GeminiMultimodalLiveLLMService, + HttpOptions, + InputParams, +) + +try: + from google.auth import default + from google.auth.exceptions import GoogleAuthError + from google.auth.transport.requests import Request + from google.genai import Client + from google.oauth2 import service_account + +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Google Vertex AI, you need to `pip install pipecat-ai[google]`. Also, set up your Google credentials properly." + ) + raise Exception(f"Missing module: {e}") + + +class GeminiVertexMultimodalLiveLLMService(GeminiMultimodalLiveLLMService): + """Google Vertex AI Gemini Multimodal Live service. + + Provides access to Google's Gemini Multimodal Live model via Vertex AI, + supporting both text and audio modalities. It handles voice transcription, + streaming audio responses, and tool usage. + """ + + def __init__( + self, + *, + credentials: Optional[str] = None, + credentials_path: Optional[str] = None, + location: str = "us-east4", + project_id: str, + model="google/gemini-2.0-flash-live-preview-04-09", + voice_id: str = "Charon", + start_audio_paused: bool = False, + start_video_paused: bool = False, + system_instruction: Optional[str] = None, + tools: Optional[Union[List[dict], ToolsSchema]] = None, + params: Optional[InputParams] = None, + inference_on_context_initialization: bool = True, + file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files", + http_options: Optional[HttpOptions] = None, + **kwargs, + ): + """Initialize the Google Vertex AI Gemini Multimodal Live service. + + Args: + credentials: JSON string of service account credentials. + credentials_path: Path to the service account JSON file. + location: GCP region for Vertex AI endpoint (e.g., "us-east4"). + project_id: Google Cloud project ID. + model: Model identifier to use. Defaults to "models/gemini-2.0-flash-live-preview-04-09". + voice_id: TTS voice identifier. Defaults to "Charon". + start_audio_paused: Whether to start with audio input paused. Defaults to False. + start_video_paused: Whether to start with video input paused. Defaults to False. + system_instruction: System prompt for the model. Defaults to None. + tools: Tools/functions available to the model. Defaults to None. + params: Configuration parameters for the model along with Vertex AI + location and project ID. + inference_on_context_initialization: Whether to generate a response when context + is first set. Defaults to True. + file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint. + http_options: HTTP options for the client. + **kwargs: Additional arguments passed to parent GeminiMultimodalLiveLLMService. + """ + # Check if user incorrectly passed api_key, which is used by parent + # class but not here. + if "api_key" in kwargs: + logger.error( + "GeminiVertexMultimodalLiveLLMService does not accept 'api_key' parameter. " + "Use 'credentials' or 'credentials_path' instead for Vertex AI authentication." + ) + raise ValueError( + "Invalid parameter 'api_key'. Use 'credentials' or 'credentials_path' for Vertex AI authentication." + ) + + # These need to be set before calling super().__init__() because + # super().__init__() invokes create_client(), which needs these. + self._credentials = self._get_credentials(credentials, credentials_path) + self._project_id = project_id + self._location = location + + # Call parent constructor with the obtained API key + super().__init__( + # api_key is required by parent class, but actually not used with + # Vertex + api_key="dummy", + model=model, + voice_id=voice_id, + start_audio_paused=start_audio_paused, + start_video_paused=start_video_paused, + system_instruction=system_instruction, + tools=tools, + params=params, + inference_on_context_initialization=inference_on_context_initialization, + file_api_base_url=file_api_base_url, + http_options=http_options, + **kwargs, + ) + + def create_client(self): + """Create the Gemini client instance.""" + self._client = Client( + vertexai=True, + credentials=self._credentials, + project=self._project_id, + location=self._location, + ) + + @property + def file_api(self): + """Gemini File API is not supported with Vertex AI.""" + raise NotImplementedError( + "When using Vertex AI, the recommended approach is to use Google Cloud Storage for file handling. The Gemini File API is not directly supported in this context." + ) + + @staticmethod + def _get_credentials(credentials: Optional[str], credentials_path: Optional[str]) -> str: + """Retrieve Credentials using Google service account credentials JSON. + + Supports multiple authentication methods: + 1. Direct JSON credentials string + 2. Path to service account JSON file + 3. Default application credentials (ADC) + + Args: + credentials: JSON string of service account credentials. + credentials_path: Path to the service account JSON file. + + Returns: + OAuth token for API authentication. + + Raises: + ValueError: If no valid credentials are provided or found. + """ + creds: Optional[service_account.Credentials] = None + + if credentials: + # Parse and load credentials from JSON string + creds = service_account.Credentials.from_service_account_info( + json.loads(credentials), + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + elif credentials_path: + # Load credentials from JSON file + creds = service_account.Credentials.from_service_account_file( + credentials_path, + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + else: + try: + creds, project_id = default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) + except GoogleAuthError: + pass + + if not creds: + raise ValueError("No valid credentials provided.") + + creds.refresh(Request()) # Ensure token is up-to-date, lifetime is 1 hour. + + return creds From a14fb20d15e976b49e41fe09e27326db5515e58e Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 6 Oct 2025 15:10:23 -0400 Subject: [PATCH 2/6] Fix Gemini Live w/Vertex AI not being able to handle an empty list provided for "function_declarations" --- src/pipecat/adapters/services/gemini_adapter.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pipecat/adapters/services/gemini_adapter.py b/src/pipecat/adapters/services/gemini_adapter.py index 63a86e6d2..abe33a8ec 100644 --- a/src/pipecat/adapters/services/gemini_adapter.py +++ b/src/pipecat/adapters/services/gemini_adapter.py @@ -87,9 +87,11 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): Includes both converted standard tools and any custom Gemini-specific tools. """ functions_schema = tools_schema.standard_tools - formatted_standard_tools = [ - {"function_declarations": [func.to_default_dict() for func in functions_schema]} - ] + formatted_standard_tools = ( + [{"function_declarations": [func.to_default_dict() for func in functions_schema]}] + if functions_schema + else [] + ) custom_gemini_tools = [] if tools_schema.custom_tools: custom_gemini_tools = tools_schema.custom_tools.get(AdapterType.GEMINI, []) From 0b6dd98000e2426c7a7bd8539f2b31d65288d6b0 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 6 Oct 2025 15:13:05 -0400 Subject: [PATCH 3/6] Make a note in our examples that there's an issue with Gemini Live + Vertex around using "google_search" alongside other tools --- .../26b-gemini-multimodal-live-function-calling.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/foundational/26b-gemini-multimodal-live-function-calling.py b/examples/foundational/26b-gemini-multimodal-live-function-calling.py index f14713a5c..a5024d4a6 100644 --- a/examples/foundational/26b-gemini-multimodal-live-function-calling.py +++ b/examples/foundational/26b-gemini-multimodal-live-function-calling.py @@ -122,6 +122,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): required=["location"], ) search_tool = {"google_search": {}} + # KNOWN ISSUE: If using GeminiVertexMultimodalLiveLLMService, it appears + # you cannot use the "google_search" tool alongside other tools. + # See https://github.com/googleapis/python-genai/issues/941. tools = ToolsSchema( standard_tools=[weather_function, restaurant_function], custom_tools={AdapterType.GEMINI: [search_tool]}, From 2699f0c2a61fee9ffb8e60766f90a75caee5ff8d Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 6 Oct 2025 15:39:08 -0400 Subject: [PATCH 4/6] Fix tool calls when using Gemini Live + Vertex AI --- src/pipecat/services/gemini_multimodal_live/gemini.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 0de76fbf1..8ab836173 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -16,6 +16,7 @@ import io import json import random import time +import uuid from dataclasses import dataclass from enum import Enum from typing import Any, Dict, List, Optional, Union @@ -1345,7 +1346,11 @@ class GeminiMultimodalLiveLLMService(LLMService): function_calls_llm = [ FunctionCallFromLLM( context=self._context, - tool_call_id=f.id, + tool_call_id=( + # NOTE: when using Vertex AI we don't get server-provided + # tool call IDs here + f.id or str(uuid.uuid4()) + ), function_name=f.name, arguments=f.args, ) From 99f008e9279b9565c21fc9585d4aaabf0aa7991b Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 6 Oct 2025 15:54:56 -0400 Subject: [PATCH 5/6] Make a note in our examples that there's an issue with Gemini Live + Vertex around specifying a modality other than AUDIO --- examples/foundational/26d-gemini-multimodal-live-text.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/foundational/26d-gemini-multimodal-live-text.py b/examples/foundational/26d-gemini-multimodal-live-text.py index 667887b03..dd3e0d65b 100644 --- a/examples/foundational/26d-gemini-multimodal-live-text.py +++ b/examples/foundational/26d-gemini-multimodal-live-text.py @@ -80,6 +80,8 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") + # KNOWN ISSUE: If using GeminiVertexMultimodalLiveLLMService, it appears + # you cannot specify a modality other than AUDIO. llm = GeminiMultimodalLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=SYSTEM_INSTRUCTION, From f2d90639845b90d25c9f8218697a32936c4e856e Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 9 Oct 2025 09:40:47 -0400 Subject: [PATCH 6/6] Renames: remove "multimodal" from Gemini Live types --- .../foundational/26-gemini-multimodal-live.py | 4 +- ...6a-gemini-multimodal-live-transcription.py | 4 +- ...gemini-multimodal-live-function-calling.py | 4 +- .../26c-gemini-multimodal-live-video.py | 4 +- .../26d-gemini-multimodal-live-text.py | 10 +- .../26e-gemini-multimodal-google-search.py | 4 +- .../26f-gemini-multimodal-live-files-api.py | 6 +- ...emini-multimodal-live-groundingMetadata.py | 4 +- .../26h-gemini-multimodal-live-vertex.py | 6 +- ...26i-gemini-multimodal-live-graceful-end.py | 4 +- examples/foundational/46-video-processing.py | 4 +- src/pipecat/services/gemini_live/__init__.py | 3 + src/pipecat/services/gemini_live/file_api.py | 189 ++ src/pipecat/services/gemini_live/gemini.py | 1582 ++++++++++++++++ .../vertex.py | 24 +- .../services/gemini_multimodal_live/events.py | 17 +- .../gemini_multimodal_live/file_api.py | 194 +- .../services/gemini_multimodal_live/gemini.py | 1617 +---------------- 18 files changed, 1887 insertions(+), 1793 deletions(-) create mode 100644 src/pipecat/services/gemini_live/__init__.py create mode 100644 src/pipecat/services/gemini_live/file_api.py create mode 100644 src/pipecat/services/gemini_live/gemini.py rename src/pipecat/services/{gemini_multimodal_live => gemini_live}/vertex.py (89%) diff --git a/examples/foundational/26-gemini-multimodal-live.py b/examples/foundational/26-gemini-multimodal-live.py index b446a9b8c..f063c247f 100644 --- a/examples/foundational/26-gemini-multimodal-live.py +++ b/examples/foundational/26-gemini-multimodal-live.py @@ -17,7 +17,7 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -65,7 +65,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): Respond to what the user said in a creative and helpful way. """ - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=system_instruction, voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck diff --git a/examples/foundational/26a-gemini-multimodal-live-transcription.py b/examples/foundational/26a-gemini-multimodal-live-transcription.py index ad3cd06ee..3c0eb19ce 100644 --- a/examples/foundational/26a-gemini-multimodal-live-transcription.py +++ b/examples/foundational/26a-gemini-multimodal-live-transcription.py @@ -20,7 +20,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -65,7 +65,7 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede # system_instruction="Talk like a pirate." diff --git a/examples/foundational/26b-gemini-multimodal-live-function-calling.py b/examples/foundational/26b-gemini-multimodal-live-function-calling.py index a5024d4a6..a86822a20 100644 --- a/examples/foundational/26b-gemini-multimodal-live-function-calling.py +++ b/examples/foundational/26b-gemini-multimodal-live-function-calling.py @@ -22,7 +22,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -130,7 +130,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): custom_tools={AdapterType.GEMINI: [search_tool]}, ) - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=system_instruction, tools=tools, diff --git a/examples/foundational/26c-gemini-multimodal-live-video.py b/examples/foundational/26c-gemini-multimodal-live-video.py index a28eaaacf..7eac0bc0b 100644 --- a/examples/foundational/26c-gemini-multimodal-live-video.py +++ b/examples/foundational/26c-gemini-multimodal-live-video.py @@ -24,7 +24,7 @@ from pipecat.runner.utils import ( maybe_capture_participant_camera, maybe_capture_participant_screen, ) -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -58,7 +58,7 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede # system_instruction="Talk like a pirate." diff --git a/examples/foundational/26d-gemini-multimodal-live-text.py b/examples/foundational/26d-gemini-multimodal-live-text.py index dd3e0d65b..625ca17d0 100644 --- a/examples/foundational/26d-gemini-multimodal-live-text.py +++ b/examples/foundational/26d-gemini-multimodal-live-text.py @@ -20,9 +20,9 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService -from pipecat.services.gemini_multimodal_live.gemini import ( - GeminiMultimodalLiveLLMService, - GeminiMultimodalModalities, +from pipecat.services.gemini_live.gemini import ( + GeminiLiveLLMService, + GeminiModalities, InputParams, ) from pipecat.transports.base_transport import BaseTransport, TransportParams @@ -82,11 +82,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # KNOWN ISSUE: If using GeminiVertexMultimodalLiveLLMService, it appears # you cannot specify a modality other than AUDIO. - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=SYSTEM_INSTRUCTION, tools=[{"google_search": {}}, {"code_execution": {}}], - params=InputParams(modalities=GeminiMultimodalModalities.TEXT), + params=InputParams(modalities=GeminiModalities.TEXT), ) # Optionally, you can set the response modalities via a function diff --git a/examples/foundational/26e-gemini-multimodal-google-search.py b/examples/foundational/26e-gemini-multimodal-google-search.py index 31fad54fe..1bcf02db9 100644 --- a/examples/foundational/26e-gemini-multimodal-google-search.py +++ b/examples/foundational/26e-gemini-multimodal-google-search.py @@ -19,7 +19,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -83,7 +83,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") # Initialize the Gemini Multimodal Live model - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck system_instruction=system_instruction, diff --git a/examples/foundational/26f-gemini-multimodal-live-files-api.py b/examples/foundational/26f-gemini-multimodal-live-files-api.py index b01c21803..7ce4a483d 100644 --- a/examples/foundational/26f-gemini-multimodal-live-files-api.py +++ b/examples/foundational/26f-gemini-multimodal-live-files-api.py @@ -19,8 +19,8 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import ( - GeminiMultimodalLiveLLMService, +from pipecat.services.gemini_live.gemini import ( + GeminiLiveLLMService, ) from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """ # Initialize Gemini service with File API support - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=system_instruction, voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py index 6cfeed51b..9b8b25315 100644 --- a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -14,7 +14,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.services.google.frames import LLMSearchResponseFrame from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -105,7 +105,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]}, ) - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=SYSTEM_INSTRUCTION, voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck diff --git a/examples/foundational/26h-gemini-multimodal-live-vertex.py b/examples/foundational/26h-gemini-multimodal-live-vertex.py index 581d2524a..4210dc0e2 100644 --- a/examples/foundational/26h-gemini-multimodal-live-vertex.py +++ b/examples/foundational/26h-gemini-multimodal-live-vertex.py @@ -17,8 +17,8 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService -from pipecat.services.gemini_multimodal_live.vertex import GeminiVertexMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService +from pipecat.services.gemini_live.vertex import GeminiLiveVertexLLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -66,7 +66,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): Respond to what the user said in a creative and helpful way. """ - llm = GeminiVertexMultimodalLiveLLMService( + llm = GeminiLiveVertexLLMService( credentials=os.getenv("GOOGLE_VERTEX_TEST_CREDENTIALS"), project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"), location=os.getenv("GOOGLE_CLOUD_LOCATION"), diff --git a/examples/foundational/26i-gemini-multimodal-live-graceful-end.py b/examples/foundational/26i-gemini-multimodal-live-graceful-end.py index e178997b8..9451d0ef0 100644 --- a/examples/foundational/26i-gemini-multimodal-live-graceful-end.py +++ b/examples/foundational/26i-gemini-multimodal-live-graceful-end.py @@ -24,7 +24,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.frame_processor import FrameDirection from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live.gemini import GeminiLiveLLMService from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -144,7 +144,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): custom_tools={AdapterType.GEMINI: [search_tool]}, ) - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), system_instruction=system_instruction, tools=tools, diff --git a/examples/foundational/46-video-processing.py b/examples/foundational/46-video-processing.py index bfb718ff2..7f7363538 100644 --- a/examples/foundational/46-video-processing.py +++ b/examples/foundational/46-video-processing.py @@ -20,7 +20,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.gemini_multimodal_live import GeminiMultimodalLiveLLMService +from pipecat.services.gemini_live import GeminiLiveLLMService from pipecat.transports.base_transport import TransportParams from pipecat.transports.daily.transport import DailyParams, DailyTransport @@ -94,7 +94,7 @@ Respond to what the user said in a creative and helpful way. Keep your responses async def run_bot(pipecat_transport): - llm = GeminiMultimodalLiveLLMService( + llm = GeminiLiveLLMService( api_key=os.getenv("GOOGLE_API_KEY"), voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck transcribe_user_audio=True, diff --git a/src/pipecat/services/gemini_live/__init__.py b/src/pipecat/services/gemini_live/__init__.py new file mode 100644 index 000000000..6a2d33bd8 --- /dev/null +++ b/src/pipecat/services/gemini_live/__init__.py @@ -0,0 +1,3 @@ +from .file_api import GeminiFileAPI +from .gemini import GeminiLiveLLMService +from .vertex import GeminiLiveVertexLLMService diff --git a/src/pipecat/services/gemini_live/file_api.py b/src/pipecat/services/gemini_live/file_api.py new file mode 100644 index 000000000..5ae7fdbb7 --- /dev/null +++ b/src/pipecat/services/gemini_live/file_api.py @@ -0,0 +1,189 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Gemini File API client for uploading and managing files. + +This module provides a client for Google's Gemini File API, enabling file +uploads, metadata retrieval, listing, and deletion. Files uploaded through +this API can be referenced in Gemini generative model calls. +""" + +import mimetypes +from typing import Any, Dict, Optional + +import aiohttp +from loguru import logger + + +class GeminiFileAPI: + """Client for the Gemini File API. + + This class provides methods for uploading, fetching, listing, and deleting files + through Google's Gemini File API. + + Files uploaded through this API remain available for 48 hours and can be referenced + in calls to the Gemini generative models. Maximum file size is 2GB, with total + project storage limited to 20GB. + """ + + def __init__( + self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files" + ): + """Initialize the Gemini File API client. + + Args: + api_key: Google AI API key + base_url: Base URL for the Gemini File API (default is the v1beta endpoint) + """ + self._api_key = api_key + self._base_url = base_url + # Upload URL uses the /upload/ path + self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files" + + async def upload_file( + self, file_path: str, display_name: Optional[str] = None + ) -> Dict[str, Any]: + """Upload a file to the Gemini File API using the correct resumable upload protocol. + + Args: + file_path: Path to the file to upload + display_name: Optional display name for the file + + Returns: + File metadata including uri, name, and display_name + """ + logger.info(f"Uploading file: {file_path}") + + async with aiohttp.ClientSession() as session: + # Determine the file's MIME type + mime_type, _ = mimetypes.guess_type(file_path) + if not mime_type: + mime_type = "application/octet-stream" + + # Read the file + with open(file_path, "rb") as f: + file_data = f.read() + + # Create the metadata payload + metadata = {} + if display_name: + metadata = {"file": {"display_name": display_name}} + + # Step 1: Initial resumable request to get upload URL + headers = { + "X-Goog-Upload-Protocol": "resumable", + "X-Goog-Upload-Command": "start", + "X-Goog-Upload-Header-Content-Length": str(len(file_data)), + "X-Goog-Upload-Header-Content-Type": mime_type, + "Content-Type": "application/json", + } + + logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}") + async with session.post( + f"{self.upload_base_url}?key={self._api_key}", headers=headers, json=metadata + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error initiating file upload: {error_text}") + raise Exception(f"Failed to initiate upload: {response.status} - {error_text}") + + # Get the upload URL from the response header + upload_url = response.headers.get("X-Goog-Upload-URL") + if not upload_url: + logger.error(f"Response headers: {dict(response.headers)}") + raise Exception("No upload URL in response headers") + + logger.debug(f"Got upload URL: {upload_url}") + + # Step 2: Upload the actual file data + upload_headers = { + "Content-Length": str(len(file_data)), + "X-Goog-Upload-Offset": "0", + "X-Goog-Upload-Command": "upload, finalize", + } + + logger.debug(f"Step 2: Uploading file data to {upload_url}") + async with session.post(upload_url, headers=upload_headers, data=file_data) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error uploading file data: {error_text}") + raise Exception(f"Failed to upload file: {response.status} - {error_text}") + + file_info = await response.json() + logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}") + return file_info + + async def get_file(self, name: str) -> Dict[str, Any]: + """Get metadata for a file. + + Args: + name: File name (or full path) + + Returns: + File metadata + """ + # Extract just the name part if a full path is provided + if "/" in name: + name = name.split("/")[-1] + + async with aiohttp.ClientSession() as session: + async with session.get(f"{self._base_url}/{name}?key={self._api_key}") as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error getting file metadata: {error_text}") + raise Exception(f"Failed to get file metadata: {response.status}") + + file_info = await response.json() + return file_info + + async def list_files( + self, page_size: int = 10, page_token: Optional[str] = None + ) -> Dict[str, Any]: + """List uploaded files. + + Args: + page_size: Number of files to return per page + page_token: Token for pagination + + Returns: + List of files and next page token if available + """ + params = {"key": self._api_key, "pageSize": page_size} + + if page_token: + params["pageToken"] = page_token + + async with aiohttp.ClientSession() as session: + async with session.get(self._base_url, params=params) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error listing files: {error_text}") + raise Exception(f"Failed to list files: {response.status}") + + result = await response.json() + return result + + async def delete_file(self, name: str) -> bool: + """Delete a file. + + Args: + name: File name (or full path) + + Returns: + True if deleted successfully + """ + # Extract just the name part if a full path is provided + if "/" in name: + name = name.split("/")[-1] + + async with aiohttp.ClientSession() as session: + async with session.delete(f"{self._base_url}/{name}?key={self._api_key}") as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error deleting file: {error_text}") + raise Exception(f"Failed to delete file: {response.status}") + + return True diff --git a/src/pipecat/services/gemini_live/gemini.py b/src/pipecat/services/gemini_live/gemini.py new file mode 100644 index 000000000..4b5f05209 --- /dev/null +++ b/src/pipecat/services/gemini_live/gemini.py @@ -0,0 +1,1582 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Google Gemini Live API service implementation. + +This module provides real-time conversational AI capabilities using Google's +Gemini Live API, supporting both text and audio modalities with +voice transcription, streaming responses, and tool usage. +""" + +import base64 +import io +import json +import random +import time +import uuid +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List, Optional, Union + +from loguru import logger +from PIL import Image +from pydantic import BaseModel, Field + +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + InputAudioRawFrame, + InputImageRawFrame, + InputTextRawFrame, + InterruptionFrame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMMessagesAppendFrame, + LLMSetToolsFrame, + LLMTextFrame, + LLMUpdateSettingsFrame, + StartFrame, + TranscriptionFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, + TTSTextFrame, + UserImageRawFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.metrics.metrics import LLMTokenUsage +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantAggregatorParams, + LLMUserAggregatorParams, +) +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult +from pipecat.services.llm_service import FunctionCallFromLLM, LLMService +from pipecat.services.openai.llm import ( + OpenAIAssistantContextAggregator, + OpenAIUserContextAggregator, +) +from pipecat.transcriptions.language import Language +from pipecat.utils.string import match_endofsentence +from pipecat.utils.time import time_now_iso8601 +from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt + +from .file_api import GeminiFileAPI + +try: + from google.genai import Client + from google.genai.live import AsyncSession + from google.genai.types import ( + AudioTranscriptionConfig, + AutomaticActivityDetection, + Blob, + Content, + ContextWindowCompressionConfig, + EndSensitivity, + FileData, + FunctionResponse, + GenerationConfig, + GroundingMetadata, + HttpOptions, + LiveConnectConfig, + LiveServerMessage, + MediaResolution, + Modality, + Part, + ProactivityConfig, + RealtimeInputConfig, + SessionResumptionConfig, + SlidingWindow, + SpeechConfig, + StartSensitivity, + ThinkingConfig, + VoiceConfig, + ) +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.") + raise Exception(f"Missing module: {e}") + + +# Connection management constants +MAX_CONSECUTIVE_FAILURES = 3 +CONNECTION_ESTABLISHED_THRESHOLD = 10.0 # seconds + + +def language_to_gemini_language(language: Language) -> Optional[str]: + """Maps a Language enum value to a Gemini Live supported language code. + + Source: + https://ai.google.dev/api/generate-content#MediaResolution + + Args: + language: The language enum value to convert. + + Returns: + The Gemini language code string, or None if the language is not supported. + """ + language_map = { + # Arabic + Language.AR: "ar-XA", + # Bengali + Language.BN_IN: "bn-IN", + # Chinese (Mandarin) + Language.CMN: "cmn-CN", + Language.CMN_CN: "cmn-CN", + Language.ZH: "cmn-CN", # Map general Chinese to Mandarin for Gemini + Language.ZH_CN: "cmn-CN", # Map Simplified Chinese to Mandarin for Gemini + # German + Language.DE: "de-DE", + Language.DE_DE: "de-DE", + # English + Language.EN: "en-US", # Default to US English (though not explicitly listed in supported codes) + Language.EN_US: "en-US", + Language.EN_AU: "en-AU", + Language.EN_GB: "en-GB", + Language.EN_IN: "en-IN", + # Spanish + Language.ES: "es-ES", # Default to Spain Spanish + Language.ES_ES: "es-ES", + Language.ES_US: "es-US", + # French + Language.FR: "fr-FR", # Default to France French + Language.FR_FR: "fr-FR", + Language.FR_CA: "fr-CA", + # Gujarati + Language.GU: "gu-IN", + Language.GU_IN: "gu-IN", + # Hindi + Language.HI: "hi-IN", + Language.HI_IN: "hi-IN", + # Indonesian + Language.ID: "id-ID", + Language.ID_ID: "id-ID", + # Italian + Language.IT: "it-IT", + Language.IT_IT: "it-IT", + # Japanese + Language.JA: "ja-JP", + Language.JA_JP: "ja-JP", + # Kannada + Language.KN: "kn-IN", + Language.KN_IN: "kn-IN", + # Korean + Language.KO: "ko-KR", + Language.KO_KR: "ko-KR", + # Malayalam + Language.ML: "ml-IN", + Language.ML_IN: "ml-IN", + # Marathi + Language.MR: "mr-IN", + Language.MR_IN: "mr-IN", + # Dutch + Language.NL: "nl-NL", + Language.NL_NL: "nl-NL", + # Polish + Language.PL: "pl-PL", + Language.PL_PL: "pl-PL", + # Portuguese (Brazil) + Language.PT_BR: "pt-BR", + # Russian + Language.RU: "ru-RU", + Language.RU_RU: "ru-RU", + # Tamil + Language.TA: "ta-IN", + Language.TA_IN: "ta-IN", + # Telugu + Language.TE: "te-IN", + Language.TE_IN: "te-IN", + # Thai + Language.TH: "th-TH", + Language.TH_TH: "th-TH", + # Turkish + Language.TR: "tr-TR", + Language.TR_TR: "tr-TR", + # Vietnamese + Language.VI: "vi-VN", + Language.VI_VN: "vi-VN", + } + return language_map.get(language) + + +class GeminiLiveContext(OpenAILLMContext): + """Extended OpenAI context for Gemini Live API. + + Provides Gemini-specific context management including system instruction + extraction and message format conversion for the Live API. + """ + + @staticmethod + def upgrade(obj: OpenAILLMContext) -> "GeminiLiveContext": + """Upgrade an OpenAI context to Gemini context. + + Args: + obj: The OpenAI context to upgrade. + + Returns: + The upgraded Gemini context instance. + """ + if isinstance(obj, OpenAILLMContext) and not isinstance(obj, GeminiLiveContext): + logger.debug(f"Upgrading to Gemini Live Context: {obj}") + obj.__class__ = GeminiLiveContext + obj._restructure_from_openai_messages() + return obj + + def _restructure_from_openai_messages(self): + pass + + def extract_system_instructions(self): + """Extract system instructions from context messages. + + Returns: + Combined system instruction text from all system messages. + """ + system_instruction = "" + for item in self.messages: + if item.get("role") == "system": + content = item.get("content", "") + if content: + if system_instruction and not system_instruction.endswith("\n"): + system_instruction += "\n" + system_instruction += str(content) + return system_instruction + + def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] = None): + """Add a file reference to the context. + + This adds a user message with a file reference that will be sent during context initialization. + + Args: + file_uri: URI of the uploaded file + mime_type: MIME type of the file + text: Optional text prompt to accompany the file + """ + # Create parts list with file reference + parts = [] + if text: + parts.append({"type": "text", "text": text}) + + # Add file reference part + parts.append( + {"type": "file_data", "file_data": {"mime_type": mime_type, "file_uri": file_uri}} + ) + + # Add to messages + message = {"role": "user", "content": parts} + self.messages.append(message) + logger.info(f"Added file reference to context: {file_uri}") + + def get_messages_for_initializing_history(self) -> List[Content]: + """Get messages formatted for Gemini history initialization. + + Returns: + List of messages in Gemini format for conversation history. + """ + messages: List[Content] = [] + for item in self.messages: + role = item.get("role") + + if role == "system": + continue + + elif role == "assistant": + role = "model" + + content = item.get("content") + parts: List[Part] = [] + if isinstance(content, str): + parts = [Part(text=content)] + elif isinstance(content, list): + for part in content: + if part.get("type") == "text": + parts.append(Part(text=part.get("text"))) + elif part.get("type") == "file_data": + file_data = part.get("file_data", {}) + parts.append( + Part( + file_data=FileData( + mime_type=file_data.get("mime_type"), + file_uri=file_data.get("file_uri"), + ) + ) + ) + else: + logger.warning(f"Unsupported content type: {str(part)[:80]}") + else: + logger.warning(f"Unsupported content type: {str(content)[:80]}") + messages.append(Content(role=role, parts=parts)) + return messages + + +class GeminiLiveUserContextAggregator(OpenAIUserContextAggregator): + """User context aggregator for Gemini Live. + + Extends OpenAI user aggregator to handle Gemini-specific message passing + while maintaining compatibility with the standard aggregation pipeline. + """ + + async def process_frame(self, frame, direction): + """Process incoming frames for user context aggregation. + + Args: + frame: The frame to process. + direction: The frame processing direction. + """ + await super().process_frame(frame, direction) + # kind of a hack just to pass the LLMMessagesAppendFrame through, but it's fine for now + if isinstance(frame, LLMMessagesAppendFrame): + await self.push_frame(frame, direction) + + +class GeminiLiveAssistantContextAggregator(OpenAIAssistantContextAggregator): + """Assistant context aggregator for Gemini Live. + + Handles assistant response aggregation while filtering out LLMTextFrames + to prevent duplicate context entries, as Gemini Live pushes both + LLMTextFrames and TTSTextFrames. + """ + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames for assistant context aggregation. + + Args: + frame: The frame to process. + direction: The frame processing direction. + """ + # The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM output, + # but the GeminiLiveAssistantContextAggregator pushes LLMTextFrames and TTSTextFrames. We + # need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames + # are process. This ensures that the context gets only one set of messages. + if not isinstance(frame, LLMTextFrame): + await super().process_frame(frame, direction) + + async def handle_user_image_frame(self, frame: UserImageRawFrame): + """Handle user image frames. + + Args: + frame: The user image frame to handle. + """ + # We don't want to store any images in the context. Revisit this later + # when the API evolves. + pass + + +@dataclass +class GeminiLiveContextAggregatorPair: + """Pair of user and assistant context aggregators for Gemini Live. + + Parameters: + _user: The user context aggregator instance. + _assistant: The assistant context aggregator instance. + """ + + _user: GeminiLiveUserContextAggregator + _assistant: GeminiLiveAssistantContextAggregator + + def user(self) -> GeminiLiveUserContextAggregator: + """Get the user context aggregator. + + Returns: + The user context aggregator instance. + """ + return self._user + + def assistant(self) -> GeminiLiveAssistantContextAggregator: + """Get the assistant context aggregator. + + Returns: + The assistant context aggregator instance. + """ + return self._assistant + + +class GeminiModalities(Enum): + """Supported modalities for Gemini Live. + + Parameters: + TEXT: Text responses. + AUDIO: Audio responses. + """ + + TEXT = "TEXT" + AUDIO = "AUDIO" + + +class GeminiMediaResolution(str, Enum): + """Media resolution options for Gemini Live. + + Parameters: + UNSPECIFIED: Use default resolution setting. + LOW: Low resolution with 64 tokens. + MEDIUM: Medium resolution with 256 tokens. + HIGH: High resolution with zoomed reframing and 256 tokens. + """ + + UNSPECIFIED = "MEDIA_RESOLUTION_UNSPECIFIED" # Use default + LOW = "MEDIA_RESOLUTION_LOW" # 64 tokens + MEDIUM = "MEDIA_RESOLUTION_MEDIUM" # 256 tokens + HIGH = "MEDIA_RESOLUTION_HIGH" # Zoomed reframing with 256 tokens + + +class GeminiVADParams(BaseModel): + """Voice Activity Detection parameters for Gemini Live. + + Parameters: + disabled: Whether to disable VAD. Defaults to None. + start_sensitivity: Sensitivity for speech start detection. Defaults to None. + end_sensitivity: Sensitivity for speech end detection. Defaults to None. + prefix_padding_ms: Prefix padding in milliseconds. Defaults to None. + silence_duration_ms: Silence duration threshold in milliseconds. Defaults to None. + """ + + disabled: Optional[bool] = Field(default=None) + start_sensitivity: Optional[StartSensitivity] = Field(default=None) + end_sensitivity: Optional[EndSensitivity] = Field(default=None) + prefix_padding_ms: Optional[int] = Field(default=None) + silence_duration_ms: Optional[int] = Field(default=None) + + +class ContextWindowCompressionParams(BaseModel): + """Parameters for context window compression in Gemini Live. + + Parameters: + enabled: Whether compression is enabled. Defaults to False. + trigger_tokens: Token count to trigger compression. None uses 80% of context window. + """ + + enabled: bool = Field(default=False) + trigger_tokens: Optional[int] = Field( + default=None + ) # None = use default (80% of context window) + + +class InputParams(BaseModel): + """Input parameters for Gemini Live generation. + + Parameters: + frequency_penalty: Frequency penalty for generation (0.0-2.0). Defaults to None. + max_tokens: Maximum tokens to generate. Must be >= 1. Defaults to 4096. + presence_penalty: Presence penalty for generation (0.0-2.0). Defaults to None. + temperature: Sampling temperature (0.0-2.0). Defaults to None. + top_k: Top-k sampling parameter. Must be >= 0. Defaults to None. + top_p: Top-p sampling parameter (0.0-1.0). Defaults to None. + modalities: Response modalities. Defaults to AUDIO. + language: Language for generation. Defaults to EN_US. + media_resolution: Media resolution setting. Defaults to UNSPECIFIED. + vad: Voice activity detection parameters. Defaults to None. + context_window_compression: Context compression settings. Defaults to None. + thinking: Thinking settings. Defaults to None. + Note that these settings may require specifying a model that + supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025". + enable_affective_dialog: Enable affective dialog, which allows Gemini + to adapt to expression and tone. Defaults to None. + Note that these settings may require specifying a model that + supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025". + Also note that this setting may require specifying an API version that + supports it, e.g. HttpOptions(api_version="v1alpha"). + proactivity: Proactivity settings, which allows Gemini to proactively + decide how to behave, such as whether to avoid responding to + content that is not relevant. Defaults to None. + Note that these settings may require specifying a model that + supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025". + Also note that this setting may require specifying an API version that + supports it, e.g. HttpOptions(api_version="v1alpha"). + extra: Additional parameters. Defaults to empty dict. + """ + + frequency_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0) + max_tokens: Optional[int] = Field(default=4096, ge=1) + presence_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0) + temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) + top_k: Optional[int] = Field(default=None, ge=0) + top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) + modalities: Optional[GeminiModalities] = Field(default=GeminiModalities.AUDIO) + language: Optional[Language] = Field(default=Language.EN_US) + media_resolution: Optional[GeminiMediaResolution] = Field( + default=GeminiMediaResolution.UNSPECIFIED + ) + vad: Optional[GeminiVADParams] = Field(default=None) + context_window_compression: Optional[ContextWindowCompressionParams] = Field(default=None) + thinking: Optional[ThinkingConfig] = Field(default=None) + enable_affective_dialog: Optional[bool] = Field(default=None) + proactivity: Optional[ProactivityConfig] = Field(default=None) + extra: Optional[Dict[str, Any]] = Field(default_factory=dict) + + +class GeminiLiveLLMService(LLMService): + """Provides access to Google's Gemini Live API. + + This service enables real-time conversations with Gemini, supporting both + text and audio modalities. It handles voice transcription, streaming audio + responses, and tool usage. + """ + + # Overriding the default adapter to use the Gemini one. + adapter_class = GeminiLLMAdapter + + def __init__( + self, + *, + api_key: str, + base_url: Optional[str] = None, + model="models/gemini-2.0-flash-live-001", + voice_id: str = "Charon", + start_audio_paused: bool = False, + start_video_paused: bool = False, + system_instruction: Optional[str] = None, + tools: Optional[Union[List[dict], ToolsSchema]] = None, + params: Optional[InputParams] = None, + inference_on_context_initialization: bool = True, + file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files", + http_options: Optional[HttpOptions] = None, + **kwargs, + ): + """Initialize the Gemini Live LLM service. + + Args: + api_key: Google AI API key for authentication. + base_url: API endpoint base URL. Defaults to the official Gemini Live endpoint. + + .. deprecated:: 0.0.90 + This parameter is deprecated and no longer has any effect. + Please use `http_options` to customize requests made by the + API client. + + model: Model identifier to use. Defaults to "models/gemini-2.0-flash-live-001". + voice_id: TTS voice identifier. Defaults to "Charon". + start_audio_paused: Whether to start with audio input paused. Defaults to False. + start_video_paused: Whether to start with video input paused. Defaults to False. + system_instruction: System prompt for the model. Defaults to None. + tools: Tools/functions available to the model. Defaults to None. + params: Configuration parameters for the model. Defaults to InputParams(). + inference_on_context_initialization: Whether to generate a response when context + is first set. Defaults to True. + file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint. + http_options: HTTP options for the client. + **kwargs: Additional arguments passed to parent LLMService. + """ + # Check for deprecated parameter usage + if base_url is not None: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Parameter 'base_url' is deprecated and no longer has any effect. Please use 'http_options' to customize requests made by the API client.", + DeprecationWarning, + stacklevel=2, + ) + + super().__init__(base_url=base_url, **kwargs) + + params = params or InputParams() + + self._last_sent_time = 0 + self._base_url = base_url + self.set_model_name(model) + self._voice_id = voice_id + self._language_code = params.language + + self._system_instruction = system_instruction + self._tools = tools + self._inference_on_context_initialization = inference_on_context_initialization + self._needs_turn_complete_message = False + + self._audio_input_paused = start_audio_paused + self._video_input_paused = start_video_paused + self._context = None + self._api_key = api_key + self._http_options = http_options + self._session: AsyncSession = None + self._connection_task = None + + self._disconnecting = False + self._run_llm_when_session_ready = False + + self._user_is_speaking = False + self._bot_is_speaking = False + self._user_audio_buffer = bytearray() + self._user_transcription_buffer = "" + self._last_transcription_sent = "" + self._bot_audio_buffer = bytearray() + self._bot_text_buffer = "" + self._llm_output_buffer = "" + + self._sample_rate = 24000 + + self._language = params.language + self._language_code = ( + language_to_gemini_language(params.language) if params.language else "en-US" + ) + self._vad_params = params.vad + + # Reconnection tracking + self._consecutive_failures = 0 + self._connection_start_time = None + + self._settings = { + "frequency_penalty": params.frequency_penalty, + "max_tokens": params.max_tokens, + "presence_penalty": params.presence_penalty, + "temperature": params.temperature, + "top_k": params.top_k, + "top_p": params.top_p, + "modalities": params.modalities, + "language": self._language_code, + "media_resolution": params.media_resolution, + "vad": params.vad, + "context_window_compression": params.context_window_compression.model_dump() + if params.context_window_compression + else {}, + "thinking": params.thinking or {}, + "enable_affective_dialog": params.enable_affective_dialog or False, + "proactivity": params.proactivity or {}, + "extra": params.extra if isinstance(params.extra, dict) else {}, + } + + self._file_api_base_url = file_api_base_url + self._file_api: Optional[GeminiFileAPI] = None + + # Grounding metadata tracking + self._search_result_buffer = "" + self._accumulated_grounding_metadata = None + + # Session resumption + self._session_resumption_handle: Optional[str] = None + + # Bookkeeping for ending gracefully (i.e. after the bot is finished) + self._end_frame_pending_bot_turn_finished: Optional[EndFrame] = None + + # Initialize the API client. Subclasses can override this if needed. + self.create_client() + + def create_client(self): + """Create the Gemini API client instance. Subclasses can override this.""" + self._client = Client(api_key=self._api_key, http_options=self._http_options) + + @property + def file_api(self) -> GeminiFileAPI: + """Get the Gemini File API client instance. Subclasses can override this. + + Returns: + The Gemini File API client. + """ + if not self._file_api: + self._file_api = GeminiFileAPI(api_key=self._api_key, base_url=self._file_api_base_url) + return self._file_api + + def can_generate_metrics(self) -> bool: + """Check if the service can generate usage metrics. + + Returns: + True as Gemini Live supports token usage metrics. + """ + return True + + def needs_mcp_alternate_schema(self) -> bool: + """Check if this LLM service requires alternate MCP schema. + + Google/Gemini has stricter JSON schema validation and requires + certain properties to be removed or modified for compatibility. + + Returns: + True for Google/Gemini services. + """ + return True + + def set_audio_input_paused(self, paused: bool): + """Set the audio input pause state. + + Args: + paused: Whether to pause audio input. + """ + self._audio_input_paused = paused + + def set_video_input_paused(self, paused: bool): + """Set the video input pause state. + + Args: + paused: Whether to pause video input. + """ + self._video_input_paused = paused + + def set_model_modalities(self, modalities: GeminiModalities): + """Set the model response modalities. + + Args: + modalities: The modalities to use for responses. + """ + self._settings["modalities"] = modalities + + def set_language(self, language: Language): + """Set the language for generation. + + Args: + language: The language to use for generation. + """ + self._language = language + self._language_code = language_to_gemini_language(language) or "en-US" + self._settings["language"] = self._language_code + logger.info(f"Set Gemini language to: {self._language_code}") + + async def set_context(self, context: OpenAILLMContext): + """Set the context explicitly from outside the pipeline. + + This is useful when initializing a conversation because in server-side VAD mode we might not have a + way to trigger the pipeline. This sends the history to the server. The `inference_on_context_initialization` + flag controls whether to set the turnComplete flag when we do this. Without that flag, the model will + not respond. This is often what we want when setting the context at the beginning of a conversation. + + Args: + context: The OpenAI LLM context to set. + """ + if self._context: + logger.error("Context already set. Can only set up Gemini Live context once.") + return + self._context = GeminiLiveContext.upgrade(context) + await self._create_initial_response() + + # + # standard AIService frame handling + # + + async def start(self, frame: StartFrame): + """Start the service and establish connection. + + Args: + frame: The start frame. + """ + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the service and close connections. + + Args: + frame: The end frame. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the service and close connections. + + Args: + frame: The cancel frame. + """ + await super().cancel(frame) + await self._disconnect() + + # + # speech and interruption handling + # + + async def _handle_interruption(self): + await self._set_bot_is_speaking(False) + await self.push_frame(TTSStoppedFrame()) + await self.push_frame(LLMFullResponseEndFrame()) + + async def _handle_user_started_speaking(self, frame): + self._user_is_speaking = True + pass + + async def _handle_user_stopped_speaking(self, frame): + self._user_is_speaking = False + self._user_audio_buffer = bytearray() + await self.start_ttfb_metrics() + if self._needs_turn_complete_message: + self._needs_turn_complete_message = False + # NOTE: without this, the model ignores the context it's been + # seeded with before the user started speaking + await self._session.send_client_content(turn_complete=True) + + # + # frame processing + # + # StartFrame, StopFrame, CancelFrame implemented in base class + # + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames for the Gemini Live service. + + Args: + frame: The frame to process. + direction: The frame processing direction. + """ + # Defer EndFrame handling until after the bot turn is finished + if isinstance(frame, EndFrame): + if self._bot_is_speaking: + logger.debug("Deferring handling EndFrame until bot turn is finished") + self._end_frame_pending_bot_turn_finished = frame + return + + await super().process_frame(frame, direction) + + if isinstance(frame, TranscriptionFrame): + await self.push_frame(frame, direction) + elif isinstance(frame, OpenAILLMContextFrame): + context: GeminiLiveContext = GeminiLiveContext.upgrade(frame.context) + # For now, we'll only trigger inference here when either: + # 1. We have not seen a context frame before + # 2. The last message is a tool call result + if not self._context: + self._context = context + if frame.context.tools: + self._tools = frame.context.tools + await self._create_initial_response() + elif context.messages and context.messages[-1].get("role") == "tool": + # Support just one tool call per context frame for now + tool_result_message = context.messages[-1] + await self._tool_result(tool_result_message) + elif isinstance(frame, LLMContextFrame): + raise NotImplementedError("Universal LLMContext is not yet supported for Gemini Live.") + elif isinstance(frame, InputTextRawFrame): + await self._send_user_text(frame.text) + await self.push_frame(frame, direction) + elif isinstance(frame, InputAudioRawFrame): + await self._send_user_audio(frame) + await self.push_frame(frame, direction) + elif isinstance(frame, InputImageRawFrame): + await self._send_user_video(frame) + await self.push_frame(frame, direction) + elif isinstance(frame, InterruptionFrame): + await self._handle_interruption() + await self.push_frame(frame, direction) + elif isinstance(frame, UserStartedSpeakingFrame): + await self._handle_user_started_speaking(frame) + await self.push_frame(frame, direction) + elif isinstance(frame, UserStoppedSpeakingFrame): + await self._handle_user_stopped_speaking(frame) + await self.push_frame(frame, direction) + elif isinstance(frame, BotStartedSpeakingFrame): + # Ignore this frame. Use the serverContent API message instead + await self.push_frame(frame, direction) + elif isinstance(frame, BotStoppedSpeakingFrame): + # ignore this frame. Use the serverContent.turnComplete API message + await self.push_frame(frame, direction) + elif isinstance(frame, LLMMessagesAppendFrame): + # NOTE: handling LLMMessagesAppendFrame here in the LLMService is + # unusual - typically this would be handled in the user context + # aggregator. Leaving this handling here so that user code that + # uses this frame *without* a user context aggregator still works + # (we have an example that does just that, actually). + await self._create_single_response(frame.messages) + elif isinstance(frame, LLMUpdateSettingsFrame): + await self._update_settings(frame.settings) + elif isinstance(frame, LLMSetToolsFrame): + await self._update_settings() + else: + await self.push_frame(frame, direction) + + async def _set_bot_is_speaking(self, speaking: bool): + if self._bot_is_speaking == speaking: + return + + self._bot_is_speaking = speaking + + if not self._bot_is_speaking and self._end_frame_pending_bot_turn_finished: + await self.queue_frame(self._end_frame_pending_bot_turn_finished) + self._end_frame_pending_bot_turn_finished = None + + async def _connect(self, session_resumption_handle: Optional[str] = None): + """Establish client connection to Gemini Live API.""" + if self._session: + # Here we assume that if we have a client, we are connected. We + # handle disconnections in the send/recv code paths. + return + + if session_resumption_handle: + logger.info( + f"Connecting to Gemini service with session_resumption_handle: {session_resumption_handle}" + ) + else: + logger.info("Connecting to Gemini service") + try: + # Assemble basic configuration + config = LiveConnectConfig( + generation_config=GenerationConfig( + frequency_penalty=self._settings["frequency_penalty"], + max_output_tokens=self._settings["max_tokens"], + presence_penalty=self._settings["presence_penalty"], + temperature=self._settings["temperature"], + top_k=self._settings["top_k"], + top_p=self._settings["top_p"], + response_modalities=[Modality(self._settings["modalities"].value)], + speech_config=SpeechConfig( + voice_config=VoiceConfig( + prebuilt_voice_config={"voice_name": self._voice_id} + ), + language_code=self._settings["language"], + ), + media_resolution=MediaResolution(self._settings["media_resolution"].value), + ), + input_audio_transcription=AudioTranscriptionConfig(), + output_audio_transcription=AudioTranscriptionConfig(), + session_resumption=SessionResumptionConfig(handle=session_resumption_handle), + ) + + # Add context window compression to configuration, if enabled + if self._settings.get("context_window_compression", {}).get("enabled", False): + compression_config = ContextWindowCompressionConfig() + + # Add sliding window (always true if compression is enabled) + compression_config.sliding_window = SlidingWindow() + + # Add trigger_tokens if specified + trigger_tokens = self._settings.get("context_window_compression", {}).get( + "trigger_tokens" + ) + if trigger_tokens is not None: + compression_config.trigger_tokens = trigger_tokens + + config.context_window_compression = compression_config + + # Add thinking configuration to configuration, if provided + if self._settings.get("thinking"): + config.thinking_config = self._settings["thinking"] + + # Add affective dialog setting, if provided + if self._settings.get("enable_affective_dialog", False): + config.enable_affective_dialog = self._settings["enable_affective_dialog"] + + # Add proactivity configuration to configuration, if provided + if self._settings.get("proactivity"): + config.proactivity = self._settings["proactivity"] + + # Add VAD configuration to configuration, if provided + if self._settings.get("vad"): + vad_config = AutomaticActivityDetection() + vad_params = self._settings["vad"] + has_vad_settings = False + + # Only add parameters that are explicitly set + if vad_params.disabled is not None: + vad_config.disabled = vad_params.disabled + has_vad_settings = True + + if vad_params.start_sensitivity: + vad_config.start_of_speech_sensitivity = vad_params.start_sensitivity + has_vad_settings = True + + if vad_params.end_sensitivity: + vad_config.end_of_speech_sensitivity = vad_params.end_sensitivity + has_vad_settings = True + + if vad_params.prefix_padding_ms is not None: + vad_config.prefix_padding_ms = vad_params.prefix_padding_ms + has_vad_settings = True + + if vad_params.silence_duration_ms is not None: + vad_config.silence_duration_ms = vad_params.silence_duration_ms + has_vad_settings = True + + # Only add automatic_activity_detection if we have VAD settings + if has_vad_settings: + config.realtime_input_config = RealtimeInputConfig( + automatic_activity_detection=vad_config + ) + + # Add system instruction to configuration, if provided + system_instruction = self._system_instruction or "" + if self._context and hasattr(self._context, "extract_system_instructions"): + system_instruction += "\n" + self._context.extract_system_instructions() + if system_instruction: + logger.debug(f"Setting system instruction: {system_instruction}") + config.system_instruction = system_instruction + + # Add tools to configuration, if provided + if self._tools: + logger.debug(f"Setting tools: {self._tools}") + config.tools = self.get_llm_adapter().from_standard_tools(self._tools) + + # Start the connection + self._connection_task = self.create_task(self._connection_task_handler(config=config)) + + except Exception as e: + await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True)) + + async def _connection_task_handler(self, config: LiveConnectConfig): + async with self._client.aio.live.connect(model=self._model_name, config=config) as session: + logger.info("Connected to Gemini service") + + # Mark connection start time + self._connection_start_time = time.time() + + await self._handle_session_ready(session) + + while True: + try: + turn = self._session.receive() + async for message in turn: + # Reset failure counter if connection has been stable + self._check_and_reset_failure_counter() + + if message.server_content and message.server_content.model_turn: + await self._handle_msg_model_turn(message) + elif ( + message.server_content + and message.server_content.turn_complete + and message.usage_metadata + ): + await self._handle_msg_turn_complete(message) + await self._handle_msg_usage_metadata(message) + elif message.server_content and message.server_content.input_transcription: + await self._handle_msg_input_transcription(message) + elif message.server_content and message.server_content.output_transcription: + await self._handle_msg_output_transcription(message) + elif message.server_content and message.server_content.grounding_metadata: + await self._handle_msg_grounding_metadata(message) + elif message.tool_call: + await self._handle_msg_tool_call(message) + elif message.session_resumption_update: + self._handle_msg_resumption_update(message) + except Exception as e: + if not self._disconnecting: + should_reconnect = await self._handle_connection_error(e) + if should_reconnect: + await self._reconnect() + return # Exit this connection handler, _reconnect will start a new one + break + + def _check_and_reset_failure_counter(self): + """Check if connection has been stable long enough to reset the failure counter. + + If the connection has been active for longer than the established threshold + and there are accumulated failures, reset the counter to 0. + """ + if ( + self._connection_start_time + and self._consecutive_failures > 0 + and time.time() - self._connection_start_time >= CONNECTION_ESTABLISHED_THRESHOLD + ): + logger.info( + f"Connection stable for {CONNECTION_ESTABLISHED_THRESHOLD}s, " + f"resetting failure counter from {self._consecutive_failures} to 0" + ) + self._consecutive_failures = 0 + + async def _handle_connection_error(self, error: Exception) -> bool: + """Handle a connection error and determine if reconnection should be attempted. + + Args: + error: The exception that caused the connection error. + + Returns: + True if reconnection should be attempted, False if a fatal error should be pushed. + """ + self._consecutive_failures += 1 + logger.warning( + f"Connection error (failure {self._consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {error}" + ) + + if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + logger.error( + f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, " + "treating as fatal error" + ) + await self.push_error( + ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True) + ) + return False + else: + logger.info( + f"Attempting reconnection ({self._consecutive_failures}/{MAX_CONSECUTIVE_FAILURES})" + ) + return True + + async def _reconnect(self): + """Reconnect to Gemini Live API.""" + await self._disconnect() + await self._connect(session_resumption_handle=self._session_resumption_handle) + + async def _disconnect(self): + """Disconnect from Gemini Live API and clean up resources.""" + logger.info("Disconnecting from Gemini service") + try: + self._disconnecting = True + await self.stop_all_metrics() + if self._connection_task: + await self.cancel_task(self._connection_task, timeout=1.0) + self._connection_task = None + if self._session: + await self._session.close() + self._session = None + self._disconnecting = False + except Exception as e: + logger.error(f"{self} error disconnecting: {e}") + + async def _send_user_audio(self, frame): + """Send user audio frame to Gemini Live API.""" + if self._audio_input_paused or self._disconnecting or not self._session: + return + + # Send all audio to Gemini + try: + await self._session.send_realtime_input( + audio=Blob(data=frame.audio, mime_type=f"audio/pcm;rate={frame.sample_rate}") + ) + except Exception as e: + await self._handle_send_error(e) + + # Manage a buffer of audio to use for transcription + audio = frame.audio + if self._user_is_speaking: + self._user_audio_buffer.extend(audio) + else: + # Keep 1/2 second of audio in the buffer even when not speaking. + self._user_audio_buffer.extend(audio) + length = int((frame.sample_rate * frame.num_channels * 2) * 0.5) + self._user_audio_buffer = self._user_audio_buffer[-length:] + + async def _send_user_text(self, text: str): + """Send user text via Gemini Live API's realtime input stream. + + This method sends text through the realtimeInput stream (via TextInputMessage) + rather than the clientContent stream. This ensures text input is synchronized + with audio and video inputs, preventing temporal misalignment that can occur + when different modalities are processed through separate API pathways. + + For realtimeInput, turn completion is automatically inferred by the API based + on user activity, so no explicit turnComplete signal is needed. + + Args: + text: The text to send as user input. + """ + if self._disconnecting or not self._session: + return + + try: + await self._session.send_realtime_input(text=text) + except Exception as e: + await self._handle_send_error(e) + + async def _send_user_video(self, frame): + """Send user video frame to Gemini Live API.""" + if self._video_input_paused or self._disconnecting or not self._session: + return + + now = time.time() + if now - self._last_sent_time < 1: + return # Ignore if less than 1 second has passed + + self._last_sent_time = now # Update last sent time + logger.debug(f"Sending video frame to Gemini: {frame}") + + buffer = io.BytesIO() + Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG") + data = base64.b64encode(buffer.getvalue()).decode("utf-8") + + try: + await self._session.send_realtime_input(video=Blob(data=data, mime_type="image/jpeg")) + except Exception as e: + await self._handle_send_error(e) + + async def _create_initial_response(self): + """Create initial response based on context history.""" + if self._disconnecting: + return + + if not self._session: + self._run_llm_when_session_ready = True + return + + messages = self._context.get_messages_for_initializing_history() + if not messages: + return + + logger.debug(f"Creating initial response: {messages}") + + await self.start_ttfb_metrics() + + try: + await self._session.send_client_content( + turns=messages, turn_complete=self._inference_on_context_initialization + ) + except Exception as e: + await self._handle_send_error(e) + + # If we're generating a response right away upon initializing + # conversation history, set a flag saying that we need a turn complete + # message when the user stops speaking. + if not self._inference_on_context_initialization: + self._needs_turn_complete_message = True + + async def _create_single_response(self, messages_list): + """Create a single response from a list of messages.""" + if self._disconnecting or not self._session: + return + + # Create a throwaway context just for the purpose of getting messages + # in the right format + context = GeminiLiveContext.upgrade(OpenAILLMContext(messages=messages_list)) + messages = context.get_messages_for_initializing_history() + + if not messages: + return + + logger.debug(f"Creating response: {messages}") + + await self.start_ttfb_metrics() + + try: + await self._session.send_client_content(turns=messages, turn_complete=True) + except Exception as e: + await self._handle_send_error(e) + + @traced_gemini_live(operation="llm_tool_result") + async def _tool_result(self, tool_result_message): + """Send tool result back to the API.""" + if self._disconnecting or not self._session: + return + + # For now we're shoving the name into the tool_call_id field, so this + # will work until we revisit that. + id = tool_result_message.get("tool_call_id") + name = tool_result_message.get("tool_call_name") + result = json.loads(tool_result_message.get("content") or "") + response = FunctionResponse(name=name, id=id, response=result) + + try: + await self._session.send_tool_response(function_responses=response) + except Exception as e: + await self._handle_send_error(e) + + @traced_gemini_live(operation="llm_setup") + async def _handle_session_ready(self, session: AsyncSession): + """Handle the session being ready.""" + self._session = session + # If we were just waititng for the session to be ready to run the LLM, + # do that now. + if self._run_llm_when_session_ready: + self._run_llm_when_session_ready = False + await self._create_initial_response() + + async def _handle_msg_model_turn(self, msg: LiveServerMessage): + """Handle the model turn message.""" + part = msg.server_content.model_turn.parts[0] + if not part: + return + + await self.stop_ttfb_metrics() + + # part.text is added when `modalities` is set to TEXT; otherwise, it's None + text = part.text + if text: + if not self._bot_text_buffer: + await self.push_frame(LLMFullResponseStartFrame()) + + self._bot_text_buffer += text + self._search_result_buffer += text # Also accumulate for grounding + await self.push_frame(LLMTextFrame(text=text)) + + # Check for grounding metadata in server content + if msg.server_content and msg.server_content.grounding_metadata: + self._accumulated_grounding_metadata = msg.server_content.grounding_metadata + + inline_data = part.inline_data + if not inline_data: + return + + # Check if mime type matches expected format + expected_mime_type = f"audio/pcm;rate={self._sample_rate}" + if inline_data.mime_type == expected_mime_type: + # Perfect match, continue processing + pass + elif inline_data.mime_type == "audio/pcm": + # Sample rate not provided in mime type, assume default + if not hasattr(self, "_sample_rate_warning_logged"): + logger.warning( + f"Sample rate not provided in mime type '{inline_data.mime_type}', assuming rate of {self._sample_rate}" + ) + self._sample_rate_warning_logged = True + else: + # Unrecognized format + logger.warning(f"Unrecognized server_content format {inline_data.mime_type}") + return + + audio = inline_data.data + if not audio: + return + + if not self._bot_is_speaking: + await self._set_bot_is_speaking(True) + await self.push_frame(TTSStartedFrame()) + await self.push_frame(LLMFullResponseStartFrame()) + + self._bot_audio_buffer.extend(audio) + frame = TTSAudioRawFrame( + audio=audio, + sample_rate=self._sample_rate, + num_channels=1, + ) + await self.push_frame(frame) + + @traced_gemini_live(operation="llm_tool_call") + async def _handle_msg_tool_call(self, message: LiveServerMessage): + """Handle tool call messages.""" + function_calls = message.tool_call.function_calls + if not function_calls: + return + if not self._context: + logger.error("Function calls are not supported without a context object.") + + function_calls_llm = [ + FunctionCallFromLLM( + context=self._context, + tool_call_id=( + # NOTE: when using Vertex AI we don't get server-provided + # tool call IDs here + f.id or str(uuid.uuid4()) + ), + function_name=f.name, + arguments=f.args, + ) + for f in function_calls + ] + + await self.run_function_calls(function_calls_llm) + + @traced_gemini_live(operation="llm_response") + async def _handle_msg_turn_complete(self, message: LiveServerMessage): + """Handle the turn complete message.""" + await self._set_bot_is_speaking(False) + text = self._bot_text_buffer + + # Trace the complete LLM response (this will be handled by the decorator) + # The decorator will extract the output text and usage metadata from the message + + self._bot_text_buffer = "" + self._llm_output_buffer = "" + + # Process grounding metadata if we have accumulated any + if self._accumulated_grounding_metadata: + await self._process_grounding_metadata( + self._accumulated_grounding_metadata, self._search_result_buffer + ) + + # Reset grounding tracking for next response + self._search_result_buffer = "" + self._accumulated_grounding_metadata = None + + # Only push the TTSStoppedFrame if the bot is outputting audio + # when text is found, modalities is set to TEXT and no audio + # is produced. + if not text: + await self.push_frame(TTSStoppedFrame()) + + await self.push_frame(LLMFullResponseEndFrame()) + + @traced_stt + async def _handle_user_transcription( + self, transcript: str, is_final: bool, language: Optional[Language] = None + ): + """Handle a transcription result with tracing.""" + pass + + async def _handle_msg_input_transcription(self, message: LiveServerMessage): + """Handle the input transcription message. + + Gemini Live sends user transcriptions in either single words or multi-word + phrases. As a result, we have to aggregate the input transcription. This handler + aggregates into sentences, splitting on the end of sentence markers. + """ + if not message.server_content.input_transcription: + return + + text = message.server_content.input_transcription.text + + if not text: + return + + # Strip leading space from sentence starts if buffer is empty + if text.startswith(" ") and not self._user_transcription_buffer: + text = text.lstrip() + + # Accumulate text in the buffer + self._user_transcription_buffer += text + + # Check for complete sentences + while True: + eos_end_marker = match_endofsentence(self._user_transcription_buffer) + if not eos_end_marker: + break + + # Extract the complete sentence + complete_sentence = self._user_transcription_buffer[:eos_end_marker] + # Keep the remainder for the next chunk + self._user_transcription_buffer = self._user_transcription_buffer[eos_end_marker:] + + # Send a TranscriptionFrame with the complete sentence + logger.debug(f"[Transcription:user] [{complete_sentence}]") + await self._handle_user_transcription( + complete_sentence, True, self._settings["language"] + ) + await self.push_frame( + TranscriptionFrame( + text=complete_sentence, + user_id="", + timestamp=time_now_iso8601(), + result=message, + ), + FrameDirection.UPSTREAM, + ) + + async def _handle_msg_output_transcription(self, message: LiveServerMessage): + """Handle the output transcription message.""" + if not message.server_content.output_transcription: + return + + # This is the output transcription text when modalities is set to AUDIO. + # In this case, we push LLMTextFrame and TTSTextFrame to be handled by the + # downstream assistant context aggregator. + text = message.server_content.output_transcription.text + + if not text: + return + + # Accumulate text for grounding as well + self._search_result_buffer += text + + # Check for grounding metadata in server content + if message.server_content and message.server_content.grounding_metadata: + self._accumulated_grounding_metadata = message.server_content.grounding_metadata + # Collect text for tracing + self._llm_output_buffer += text + + await self.push_frame(LLMTextFrame(text=text)) + await self.push_frame(TTSTextFrame(text=text)) + + async def _handle_msg_grounding_metadata(self, message: LiveServerMessage): + """Handle dedicated grounding metadata messages.""" + if message.server_content and message.server_content.grounding_metadata: + grounding_metadata = message.server_content.grounding_metadata + # Process the grounding metadata immediately + await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer) + + async def _process_grounding_metadata( + self, grounding_metadata: GroundingMetadata, search_result: str = "" + ): + """Process grounding metadata and emit LLMSearchResponseFrame.""" + if not grounding_metadata: + return + + # Extract rendered content for search suggestions + rendered_content = None + if ( + grounding_metadata.search_entry_point + and grounding_metadata.search_entry_point.rendered_content + ): + rendered_content = grounding_metadata.search_entry_point.rendered_content + + # Convert grounding chunks and supports to LLMSearchOrigin format + origins = [] + + if grounding_metadata.grounding_chunks and grounding_metadata.grounding_supports: + # Create a mapping of chunk indices to origins + chunk_to_origin: Dict[int, LLMSearchOrigin] = {} + + for index, chunk in enumerate(grounding_metadata.grounding_chunks): + if chunk.web: + origin = LLMSearchOrigin( + site_uri=chunk.web.uri, site_title=chunk.web.title, results=[] + ) + chunk_to_origin[index] = origin + origins.append(origin) + + # Add grounding support results to the appropriate origins + for support in grounding_metadata.grounding_supports: + if support.segment and support.grounding_chunk_indices: + text = support.segment.text or "" + confidence_scores = support.confidence_scores or [] + + # Add this result to all origins referenced by this support + for chunk_index in support.grounding_chunk_indices: + if chunk_index in chunk_to_origin: + result = LLMSearchResult(text=text, confidence=confidence_scores) + chunk_to_origin[chunk_index].results.append(result) + + # Create and push the search response frame + search_frame = LLMSearchResponseFrame( + search_result=search_result, origins=origins, rendered_content=rendered_content + ) + + await self.push_frame(search_frame) + + async def _handle_msg_usage_metadata(self, message: LiveServerMessage): + """Handle the usage metadata message.""" + if not message.usage_metadata: + return + + usage = message.usage_metadata + + # Ensure we have valid integers for all token counts + prompt_tokens = usage.prompt_token_count or 0 + completion_tokens = usage.response_token_count or 0 + total_tokens = usage.total_token_count or (prompt_tokens + completion_tokens) + + tokens = LLMTokenUsage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + ) + + await self.start_llm_usage_metrics(tokens) + + def _handle_msg_resumption_update(self, message: LiveServerMessage): + update = message.session_resumption_update + if update.resumable and update.new_handle: + self._session_resumption_handle = update.new_handle + + async def _handle_send_error(self, error: Exception): + # In server-to-server contexts, a WebSocket error should be quite rare. + # Given how hard it is to recover from a send-side error with proper + # state management, and that exponential backoff for retries can have + # cost/stability implications for a service cluster, let's just treat a + # send-side error as fatal. + if not self._disconnecting: + await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True)) + + def create_context_aggregator( + self, + context: OpenAILLMContext, + *, + user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(), + assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(), + ) -> GeminiLiveContextAggregatorPair: + """Create an instance of GeminiLiveContextAggregatorPair from an OpenAILLMContext. + + Constructor keyword arguments for both the user and assistant aggregators can be provided. + + Args: + context: The LLM context to use. + user_params: User aggregator parameters. Defaults to LLMUserAggregatorParams(). + assistant_params: Assistant aggregator parameters. Defaults to LLMAssistantAggregatorParams(). + + Returns: + GeminiLiveContextAggregatorPair: A pair of context + aggregators, one for the user and one for the assistant, + encapsulated in an GeminiLiveContextAggregatorPair. + """ + context.set_llm_adapter(self.get_llm_adapter()) + + GeminiLiveContext.upgrade(context) + user = GeminiLiveUserContextAggregator(context, params=user_params) + + assistant_params.expect_stripped_words = False + assistant = GeminiLiveAssistantContextAggregator(context, params=assistant_params) + return GeminiLiveContextAggregatorPair(_user=user, _assistant=assistant) diff --git a/src/pipecat/services/gemini_multimodal_live/vertex.py b/src/pipecat/services/gemini_live/vertex.py similarity index 89% rename from src/pipecat/services/gemini_multimodal_live/vertex.py rename to src/pipecat/services/gemini_live/vertex.py index e07eb474f..c03f589b8 100644 --- a/src/pipecat/services/gemini_multimodal_live/vertex.py +++ b/src/pipecat/services/gemini_live/vertex.py @@ -4,9 +4,9 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Google Vertex AI Gemini Multimodal Live service. +"""Service for accessing Gemini Live via Google Vertex AI. -This module provides integration with Google's Gemini Multimodal Live model via +This module provides integration with Google's Gemini Live model via Vertex AI, supporting both text and audio modalities with voice transcription, streaming responses, and tool usage. """ @@ -18,8 +18,8 @@ from typing import List, Optional, Union from loguru import logger from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.services.gemini_multimodal_live.gemini import ( - GeminiMultimodalLiveLLMService, +from pipecat.services.gemini_live.gemini import ( + GeminiLiveLLMService, HttpOptions, InputParams, ) @@ -39,12 +39,12 @@ except ModuleNotFoundError as e: raise Exception(f"Missing module: {e}") -class GeminiVertexMultimodalLiveLLMService(GeminiMultimodalLiveLLMService): - """Google Vertex AI Gemini Multimodal Live service. +class GeminiLiveVertexLLMService(GeminiLiveLLMService): + """Provides access to Google's Gemini Live model via Vertex AI. - Provides access to Google's Gemini Multimodal Live model via Vertex AI, - supporting both text and audio modalities. It handles voice transcription, - streaming audio responses, and tool usage. + This service enables real-time conversations with Gemini, supporting both + text and audio modalities. It handles voice transcription, streaming audio + responses, and tool usage. """ def __init__( @@ -66,7 +66,7 @@ class GeminiVertexMultimodalLiveLLMService(GeminiMultimodalLiveLLMService): http_options: Optional[HttpOptions] = None, **kwargs, ): - """Initialize the Google Vertex AI Gemini Multimodal Live service. + """Initialize the service for accessing Gemini Live via Google Vertex AI. Args: credentials: JSON string of service account credentials. @@ -85,13 +85,13 @@ class GeminiVertexMultimodalLiveLLMService(GeminiMultimodalLiveLLMService): is first set. Defaults to True. file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint. http_options: HTTP options for the client. - **kwargs: Additional arguments passed to parent GeminiMultimodalLiveLLMService. + **kwargs: Additional arguments passed to parent GeminiLiveLLMService. """ # Check if user incorrectly passed api_key, which is used by parent # class but not here. if "api_key" in kwargs: logger.error( - "GeminiVertexMultimodalLiveLLMService does not accept 'api_key' parameter. " + "GeminiLiveVertexLLMService does not accept 'api_key' parameter. " "Use 'credentials' or 'credentials_path' instead for Vertex AI authentication." ) raise ValueError( diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index 9705cf9f2..be69033aa 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -30,12 +30,15 @@ except ModuleNotFoundError as e: # These aliases are just here for backward compatibility, since we used to # define public-facing StartSensitivity and EndSensitivity enums in this # module. -warnings.warn( - "Importing StartSensitivity and EndSensitivity from " - "pipecat.services.gemini_multimodal_live.events is deprecated. " - "Please import them directly from google.genai.types instead.", - DeprecationWarning, - stacklevel=2, -) +with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Importing StartSensitivity and EndSensitivity from " + "pipecat.services.gemini_multimodal_live.events is deprecated. " + "Please import them directly from google.genai.types instead.", + DeprecationWarning, + stacklevel=2, + ) + StartSensitivity = _StartSensitivity EndSensitivity = _EndSensitivity diff --git a/src/pipecat/services/gemini_multimodal_live/file_api.py b/src/pipecat/services/gemini_multimodal_live/file_api.py index 5ae7fdbb7..53f8442fc 100644 --- a/src/pipecat/services/gemini_multimodal_live/file_api.py +++ b/src/pipecat/services/gemini_multimodal_live/file_api.py @@ -9,181 +9,31 @@ This module provides a client for Google's Gemini File API, enabling file uploads, metadata retrieval, listing, and deletion. Files uploaded through this API can be referenced in Gemini generative model calls. + +.. deprecated:: 0.0.90 + Importing GeminiFileAPI from this module is deprecated. + Import it from pipecat.services.gemini_live.file_api instead. """ -import mimetypes -from typing import Any, Dict, Optional +import warnings -import aiohttp from loguru import logger +try: + from pipecat.services.gemini_live.file_api import GeminiFileAPI as _GeminiFileAPI +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.") + raise Exception(f"Missing module: {e}") -class GeminiFileAPI: - """Client for the Gemini File API. - - This class provides methods for uploading, fetching, listing, and deleting files - through Google's Gemini File API. - - Files uploaded through this API remain available for 48 hours and can be referenced - in calls to the Gemini generative models. Maximum file size is 2GB, with total - project storage limited to 20GB. - """ - - def __init__( - self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files" - ): - """Initialize the Gemini File API client. - - Args: - api_key: Google AI API key - base_url: Base URL for the Gemini File API (default is the v1beta endpoint) - """ - self._api_key = api_key - self._base_url = base_url - # Upload URL uses the /upload/ path - self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files" - - async def upload_file( - self, file_path: str, display_name: Optional[str] = None - ) -> Dict[str, Any]: - """Upload a file to the Gemini File API using the correct resumable upload protocol. - - Args: - file_path: Path to the file to upload - display_name: Optional display name for the file - - Returns: - File metadata including uri, name, and display_name - """ - logger.info(f"Uploading file: {file_path}") - - async with aiohttp.ClientSession() as session: - # Determine the file's MIME type - mime_type, _ = mimetypes.guess_type(file_path) - if not mime_type: - mime_type = "application/octet-stream" - - # Read the file - with open(file_path, "rb") as f: - file_data = f.read() - - # Create the metadata payload - metadata = {} - if display_name: - metadata = {"file": {"display_name": display_name}} - - # Step 1: Initial resumable request to get upload URL - headers = { - "X-Goog-Upload-Protocol": "resumable", - "X-Goog-Upload-Command": "start", - "X-Goog-Upload-Header-Content-Length": str(len(file_data)), - "X-Goog-Upload-Header-Content-Type": mime_type, - "Content-Type": "application/json", - } - - logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}") - async with session.post( - f"{self.upload_base_url}?key={self._api_key}", headers=headers, json=metadata - ) as response: - if response.status != 200: - error_text = await response.text() - logger.error(f"Error initiating file upload: {error_text}") - raise Exception(f"Failed to initiate upload: {response.status} - {error_text}") - - # Get the upload URL from the response header - upload_url = response.headers.get("X-Goog-Upload-URL") - if not upload_url: - logger.error(f"Response headers: {dict(response.headers)}") - raise Exception("No upload URL in response headers") - - logger.debug(f"Got upload URL: {upload_url}") - - # Step 2: Upload the actual file data - upload_headers = { - "Content-Length": str(len(file_data)), - "X-Goog-Upload-Offset": "0", - "X-Goog-Upload-Command": "upload, finalize", - } - - logger.debug(f"Step 2: Uploading file data to {upload_url}") - async with session.post(upload_url, headers=upload_headers, data=file_data) as response: - if response.status != 200: - error_text = await response.text() - logger.error(f"Error uploading file data: {error_text}") - raise Exception(f"Failed to upload file: {response.status} - {error_text}") - - file_info = await response.json() - logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}") - return file_info - - async def get_file(self, name: str) -> Dict[str, Any]: - """Get metadata for a file. - - Args: - name: File name (or full path) - - Returns: - File metadata - """ - # Extract just the name part if a full path is provided - if "/" in name: - name = name.split("/")[-1] - - async with aiohttp.ClientSession() as session: - async with session.get(f"{self._base_url}/{name}?key={self._api_key}") as response: - if response.status != 200: - error_text = await response.text() - logger.error(f"Error getting file metadata: {error_text}") - raise Exception(f"Failed to get file metadata: {response.status}") - - file_info = await response.json() - return file_info - - async def list_files( - self, page_size: int = 10, page_token: Optional[str] = None - ) -> Dict[str, Any]: - """List uploaded files. - - Args: - page_size: Number of files to return per page - page_token: Token for pagination - - Returns: - List of files and next page token if available - """ - params = {"key": self._api_key, "pageSize": page_size} - - if page_token: - params["pageToken"] = page_token - - async with aiohttp.ClientSession() as session: - async with session.get(self._base_url, params=params) as response: - if response.status != 200: - error_text = await response.text() - logger.error(f"Error listing files: {error_text}") - raise Exception(f"Failed to list files: {response.status}") - - result = await response.json() - return result - - async def delete_file(self, name: str) -> bool: - """Delete a file. - - Args: - name: File name (or full path) - - Returns: - True if deleted successfully - """ - # Extract just the name part if a full path is provided - if "/" in name: - name = name.split("/")[-1] - - async with aiohttp.ClientSession() as session: - async with session.delete(f"{self._base_url}/{name}?key={self._api_key}") as response: - if response.status != 200: - error_text = await response.text() - logger.error(f"Error deleting file: {error_text}") - raise Exception(f"Failed to delete file: {response.status}") - - return True +# These aliases are just here for backward compatibility, since we used to +# define public-facing StartSensitivity and EndSensitivity enums in this +# module. +warnings.warn( + "Importing GeminiFileAPI from " + "pipecat.services.gemini_multimodal_live.file_api is deprecated. " + "Please import it from pipecat.services.gemini_live.file_api instead.", + DeprecationWarning, + stacklevel=2, +) +GeminiFileAPI = _GeminiFileAPI diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 8ab836173..4ac51e02e 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -4,1587 +4,54 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Google Gemini Multimodal Live API service implementation. +"""Google Gemini Live API service implementation. This module provides real-time conversational AI capabilities using Google's -Gemini Multimodal Live API, supporting both text and audio modalities with +Gemini Live API, supporting both text and audio modalities with voice transcription, streaming responses, and tool usage. + +.. deprecated:: 0.0.90 + This module is deprecated. Please use the equivalent types from + pipecat.services.gemini_live.gemini instead. Note that the new type names + do not include 'Multimodal'. """ -import base64 -import io -import json -import random -import time -import uuid -from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, List, Optional, Union +import warnings -from loguru import logger -from PIL import Image -from pydantic import BaseModel, Field +from pipecat.services.gemini_live.gemini import ( + ContextWindowCompressionParams as _ContextWindowCompressionParams, +) +from pipecat.services.gemini_live.gemini import ( + GeminiLiveAssistantContextAggregator, + GeminiLiveContext, + GeminiLiveContextAggregatorPair, + GeminiLiveLLMService, + GeminiLiveUserContextAggregator, + GeminiModalities, +) +from pipecat.services.gemini_live.gemini import GeminiMediaResolution as _GeminiMediaResolution +from pipecat.services.gemini_live.gemini import GeminiVADParams as _GeminiVADParams +from pipecat.services.gemini_live.gemini import InputParams as _InputParams -from pipecat.adapters.schemas.tools_schema import ToolsSchema -from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter -from pipecat.frames.frames import ( - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, - CancelFrame, - EndFrame, - ErrorFrame, - Frame, - InputAudioRawFrame, - InputImageRawFrame, - InputTextRawFrame, - InterruptionFrame, - LLMContextFrame, - LLMFullResponseEndFrame, - LLMFullResponseStartFrame, - LLMMessagesAppendFrame, - LLMSetToolsFrame, - LLMTextFrame, - LLMUpdateSettingsFrame, - StartFrame, - TranscriptionFrame, - TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, - TTSTextFrame, - UserImageRawFrame, - UserStartedSpeakingFrame, - UserStoppedSpeakingFrame, -) -from pipecat.metrics.metrics import LLMTokenUsage -from pipecat.processors.aggregators.llm_response import ( - LLMAssistantAggregatorParams, - LLMUserAggregatorParams, -) -from pipecat.processors.aggregators.openai_llm_context import ( - OpenAILLMContext, - OpenAILLMContextFrame, -) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult -from pipecat.services.llm_service import FunctionCallFromLLM, LLMService -from pipecat.services.openai.llm import ( - OpenAIAssistantContextAggregator, - OpenAIUserContextAggregator, -) -from pipecat.transcriptions.language import Language -from pipecat.utils.string import match_endofsentence -from pipecat.utils.time import time_now_iso8601 -from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt - -from .file_api import GeminiFileAPI - -try: - from google.genai import Client - from google.genai.live import AsyncSession - from google.genai.types import ( - AudioTranscriptionConfig, - AutomaticActivityDetection, - Blob, - Content, - ContextWindowCompressionConfig, - EndSensitivity, - FileData, - FunctionResponse, - GenerationConfig, - GroundingMetadata, - HttpOptions, - LiveConnectConfig, - LiveServerMessage, - MediaResolution, - Modality, - Part, - ProactivityConfig, - RealtimeInputConfig, - SessionResumptionConfig, - SlidingWindow, - SpeechConfig, - StartSensitivity, - ThinkingConfig, - VoiceConfig, +with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Types in pipecat.services.gemini_multimodal_live.gemini are deprecated. " + "Please use the equivalent types from " + "pipecat.services.gemini_live.gemini instead. Note that the new type " + "names do not include 'Multimodal' " + "(e.g. `GeminiMultimodalLiveLLMService` is now `GeminiLiveLLMService`).", + DeprecationWarning, + stacklevel=2, ) -except ModuleNotFoundError as e: - logger.error(f"Exception: {e}") - logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.") - raise Exception(f"Missing module: {e}") - -# Connection management constants -MAX_CONSECUTIVE_FAILURES = 3 -CONNECTION_ESTABLISHED_THRESHOLD = 10.0 # seconds - - -def language_to_gemini_language(language: Language) -> Optional[str]: - """Maps a Language enum value to a Gemini Live supported language code. - - Source: - https://ai.google.dev/api/generate-content#MediaResolution - - Args: - language: The language enum value to convert. - - Returns: - The Gemini language code string, or None if the language is not supported. - """ - language_map = { - # Arabic - Language.AR: "ar-XA", - # Bengali - Language.BN_IN: "bn-IN", - # Chinese (Mandarin) - Language.CMN: "cmn-CN", - Language.CMN_CN: "cmn-CN", - Language.ZH: "cmn-CN", # Map general Chinese to Mandarin for Gemini - Language.ZH_CN: "cmn-CN", # Map Simplified Chinese to Mandarin for Gemini - # German - Language.DE: "de-DE", - Language.DE_DE: "de-DE", - # English - Language.EN: "en-US", # Default to US English (though not explicitly listed in supported codes) - Language.EN_US: "en-US", - Language.EN_AU: "en-AU", - Language.EN_GB: "en-GB", - Language.EN_IN: "en-IN", - # Spanish - Language.ES: "es-ES", # Default to Spain Spanish - Language.ES_ES: "es-ES", - Language.ES_US: "es-US", - # French - Language.FR: "fr-FR", # Default to France French - Language.FR_FR: "fr-FR", - Language.FR_CA: "fr-CA", - # Gujarati - Language.GU: "gu-IN", - Language.GU_IN: "gu-IN", - # Hindi - Language.HI: "hi-IN", - Language.HI_IN: "hi-IN", - # Indonesian - Language.ID: "id-ID", - Language.ID_ID: "id-ID", - # Italian - Language.IT: "it-IT", - Language.IT_IT: "it-IT", - # Japanese - Language.JA: "ja-JP", - Language.JA_JP: "ja-JP", - # Kannada - Language.KN: "kn-IN", - Language.KN_IN: "kn-IN", - # Korean - Language.KO: "ko-KR", - Language.KO_KR: "ko-KR", - # Malayalam - Language.ML: "ml-IN", - Language.ML_IN: "ml-IN", - # Marathi - Language.MR: "mr-IN", - Language.MR_IN: "mr-IN", - # Dutch - Language.NL: "nl-NL", - Language.NL_NL: "nl-NL", - # Polish - Language.PL: "pl-PL", - Language.PL_PL: "pl-PL", - # Portuguese (Brazil) - Language.PT_BR: "pt-BR", - # Russian - Language.RU: "ru-RU", - Language.RU_RU: "ru-RU", - # Tamil - Language.TA: "ta-IN", - Language.TA_IN: "ta-IN", - # Telugu - Language.TE: "te-IN", - Language.TE_IN: "te-IN", - # Thai - Language.TH: "th-TH", - Language.TH_TH: "th-TH", - # Turkish - Language.TR: "tr-TR", - Language.TR_TR: "tr-TR", - # Vietnamese - Language.VI: "vi-VN", - Language.VI_VN: "vi-VN", - } - return language_map.get(language) - - -class GeminiMultimodalLiveContext(OpenAILLMContext): - """Extended OpenAI context for Gemini Multimodal Live API. - - Provides Gemini-specific context management including system instruction - extraction and message format conversion for the Live API. - """ - - @staticmethod - def upgrade(obj: OpenAILLMContext) -> "GeminiMultimodalLiveContext": - """Upgrade an OpenAI context to Gemini context. - - Args: - obj: The OpenAI context to upgrade. - - Returns: - The upgraded Gemini context instance. - """ - if isinstance(obj, OpenAILLMContext) and not isinstance(obj, GeminiMultimodalLiveContext): - logger.debug(f"Upgrading to Gemini Multimodal Live Context: {obj}") - obj.__class__ = GeminiMultimodalLiveContext - obj._restructure_from_openai_messages() - return obj - - def _restructure_from_openai_messages(self): - pass - - def extract_system_instructions(self): - """Extract system instructions from context messages. - - Returns: - Combined system instruction text from all system messages. - """ - system_instruction = "" - for item in self.messages: - if item.get("role") == "system": - content = item.get("content", "") - if content: - if system_instruction and not system_instruction.endswith("\n"): - system_instruction += "\n" - system_instruction += str(content) - return system_instruction - - def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] = None): - """Add a file reference to the context. - - This adds a user message with a file reference that will be sent during context initialization. - - Args: - file_uri: URI of the uploaded file - mime_type: MIME type of the file - text: Optional text prompt to accompany the file - """ - # Create parts list with file reference - parts = [] - if text: - parts.append({"type": "text", "text": text}) - - # Add file reference part - parts.append( - {"type": "file_data", "file_data": {"mime_type": mime_type, "file_uri": file_uri}} - ) - - # Add to messages - message = {"role": "user", "content": parts} - self.messages.append(message) - logger.info(f"Added file reference to context: {file_uri}") - - def get_messages_for_initializing_history(self) -> List[Content]: - """Get messages formatted for Gemini history initialization. - - Returns: - List of messages in Gemini format for conversation history. - """ - messages: List[Content] = [] - for item in self.messages: - role = item.get("role") - - if role == "system": - continue - - elif role == "assistant": - role = "model" - - content = item.get("content") - parts: List[Part] = [] - if isinstance(content, str): - parts = [Part(text=content)] - elif isinstance(content, list): - for part in content: - if part.get("type") == "text": - parts.append(Part(text=part.get("text"))) - elif part.get("type") == "file_data": - file_data = part.get("file_data", {}) - parts.append( - Part( - file_data=FileData( - mime_type=file_data.get("mime_type"), - file_uri=file_data.get("file_uri"), - ) - ) - ) - else: - logger.warning(f"Unsupported content type: {str(part)[:80]}") - else: - logger.warning(f"Unsupported content type: {str(content)[:80]}") - messages.append(Content(role=role, parts=parts)) - return messages - - -class GeminiMultimodalLiveUserContextAggregator(OpenAIUserContextAggregator): - """User context aggregator for Gemini Multimodal Live. - - Extends OpenAI user aggregator to handle Gemini-specific message passing - while maintaining compatibility with the standard aggregation pipeline. - """ - - async def process_frame(self, frame, direction): - """Process incoming frames for user context aggregation. - - Args: - frame: The frame to process. - direction: The frame processing direction. - """ - await super().process_frame(frame, direction) - # kind of a hack just to pass the LLMMessagesAppendFrame through, but it's fine for now - if isinstance(frame, LLMMessagesAppendFrame): - await self.push_frame(frame, direction) - - -class GeminiMultimodalLiveAssistantContextAggregator(OpenAIAssistantContextAggregator): - """Assistant context aggregator for Gemini Multimodal Live. - - Handles assistant response aggregation while filtering out LLMTextFrames - to prevent duplicate context entries, as Gemini Live pushes both - LLMTextFrames and TTSTextFrames. - """ - - async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process incoming frames for assistant context aggregation. - - Args: - frame: The frame to process. - direction: The frame processing direction. - """ - # The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM output, - # but the GeminiMultimodalLiveAssistantContextAggregator pushes LLMTextFrames and TTSTextFrames. We - # need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames - # are process. This ensures that the context gets only one set of messages. - if not isinstance(frame, LLMTextFrame): - await super().process_frame(frame, direction) - - async def handle_user_image_frame(self, frame: UserImageRawFrame): - """Handle user image frames. - - Args: - frame: The user image frame to handle. - """ - # We don't want to store any images in the context. Revisit this later - # when the API evolves. - pass - - -@dataclass -class GeminiMultimodalLiveContextAggregatorPair: - """Pair of user and assistant context aggregators for Gemini Multimodal Live. - - Parameters: - _user: The user context aggregator instance. - _assistant: The assistant context aggregator instance. - """ - - _user: GeminiMultimodalLiveUserContextAggregator - _assistant: GeminiMultimodalLiveAssistantContextAggregator - - def user(self) -> GeminiMultimodalLiveUserContextAggregator: - """Get the user context aggregator. - - Returns: - The user context aggregator instance. - """ - return self._user - - def assistant(self) -> GeminiMultimodalLiveAssistantContextAggregator: - """Get the assistant context aggregator. - - Returns: - The assistant context aggregator instance. - """ - return self._assistant - - -class GeminiMultimodalModalities(Enum): - """Supported modalities for Gemini Multimodal Live. - - Parameters: - TEXT: Text responses. - AUDIO: Audio responses. - """ - - TEXT = "TEXT" - AUDIO = "AUDIO" - - -class GeminiMediaResolution(str, Enum): - """Media resolution options for Gemini Multimodal Live. - - Parameters: - UNSPECIFIED: Use default resolution setting. - LOW: Low resolution with 64 tokens. - MEDIUM: Medium resolution with 256 tokens. - HIGH: High resolution with zoomed reframing and 256 tokens. - """ - - UNSPECIFIED = "MEDIA_RESOLUTION_UNSPECIFIED" # Use default - LOW = "MEDIA_RESOLUTION_LOW" # 64 tokens - MEDIUM = "MEDIA_RESOLUTION_MEDIUM" # 256 tokens - HIGH = "MEDIA_RESOLUTION_HIGH" # Zoomed reframing with 256 tokens - - -class GeminiVADParams(BaseModel): - """Voice Activity Detection parameters for Gemini Live. - - Parameters: - disabled: Whether to disable VAD. Defaults to None. - start_sensitivity: Sensitivity for speech start detection. Defaults to None. - end_sensitivity: Sensitivity for speech end detection. Defaults to None. - prefix_padding_ms: Prefix padding in milliseconds. Defaults to None. - silence_duration_ms: Silence duration threshold in milliseconds. Defaults to None. - """ - - disabled: Optional[bool] = Field(default=None) - start_sensitivity: Optional[StartSensitivity] = Field(default=None) - end_sensitivity: Optional[EndSensitivity] = Field(default=None) - prefix_padding_ms: Optional[int] = Field(default=None) - silence_duration_ms: Optional[int] = Field(default=None) - - -class ContextWindowCompressionParams(BaseModel): - """Parameters for context window compression in Gemini Live. - - Parameters: - enabled: Whether compression is enabled. Defaults to False. - trigger_tokens: Token count to trigger compression. None uses 80% of context window. - """ - - enabled: bool = Field(default=False) - trigger_tokens: Optional[int] = Field( - default=None - ) # None = use default (80% of context window) - - -class InputParams(BaseModel): - """Input parameters for Gemini Multimodal Live generation. - - Parameters: - frequency_penalty: Frequency penalty for generation (0.0-2.0). Defaults to None. - max_tokens: Maximum tokens to generate. Must be >= 1. Defaults to 4096. - presence_penalty: Presence penalty for generation (0.0-2.0). Defaults to None. - temperature: Sampling temperature (0.0-2.0). Defaults to None. - top_k: Top-k sampling parameter. Must be >= 0. Defaults to None. - top_p: Top-p sampling parameter (0.0-1.0). Defaults to None. - modalities: Response modalities. Defaults to AUDIO. - language: Language for generation. Defaults to EN_US. - media_resolution: Media resolution setting. Defaults to UNSPECIFIED. - vad: Voice activity detection parameters. Defaults to None. - context_window_compression: Context compression settings. Defaults to None. - thinking: Thinking settings. Defaults to None. - Note that these settings may require specifying a model that - supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025". - enable_affective_dialog: Enable affective dialog, which allows Gemini - to adapt to expression and tone. Defaults to None. - Note that these settings may require specifying a model that - supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025". - Also note that this setting may require specifying an API version that - supports it, e.g. HttpOptions(api_version="v1alpha"). - proactivity: Proactivity settings, which allows Gemini to proactively - decide how to behave, such as whether to avoid responding to - content that is not relevant. Defaults to None. - Note that these settings may require specifying a model that - supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025". - Also note that this setting may require specifying an API version that - supports it, e.g. HttpOptions(api_version="v1alpha"). - extra: Additional parameters. Defaults to empty dict. - """ - - frequency_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0) - max_tokens: Optional[int] = Field(default=4096, ge=1) - presence_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0) - temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) - top_k: Optional[int] = Field(default=None, ge=0) - top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) - modalities: Optional[GeminiMultimodalModalities] = Field( - default=GeminiMultimodalModalities.AUDIO - ) - language: Optional[Language] = Field(default=Language.EN_US) - media_resolution: Optional[GeminiMediaResolution] = Field( - default=GeminiMediaResolution.UNSPECIFIED - ) - vad: Optional[GeminiVADParams] = Field(default=None) - context_window_compression: Optional[ContextWindowCompressionParams] = Field(default=None) - thinking: Optional[ThinkingConfig] = Field(default=None) - enable_affective_dialog: Optional[bool] = Field(default=None) - proactivity: Optional[ProactivityConfig] = Field(default=None) - extra: Optional[Dict[str, Any]] = Field(default_factory=dict) - - -class GeminiMultimodalLiveLLMService(LLMService): - """Provides access to Google's Gemini Multimodal Live API. - - This service enables real-time conversations with Gemini, supporting both - text and audio modalities. It handles voice transcription, streaming audio - responses, and tool usage. - """ - - # Overriding the default adapter to use the Gemini one. - adapter_class = GeminiLLMAdapter - - def __init__( - self, - *, - api_key: str, - base_url: Optional[str] = None, - model="models/gemini-2.0-flash-live-001", - voice_id: str = "Charon", - start_audio_paused: bool = False, - start_video_paused: bool = False, - system_instruction: Optional[str] = None, - tools: Optional[Union[List[dict], ToolsSchema]] = None, - params: Optional[InputParams] = None, - inference_on_context_initialization: bool = True, - file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files", - http_options: Optional[HttpOptions] = None, - **kwargs, - ): - """Initialize the Gemini Multimodal Live LLM service. - - Args: - api_key: Google AI API key for authentication. - base_url: API endpoint base URL. Defaults to the official Gemini Live endpoint. - - .. deprecated:: 0.0.90 - This parameter is deprecated and no longer has any effect. - Please use `http_options` to customize requests made by the - API client. - - model: Model identifier to use. Defaults to "models/gemini-2.0-flash-live-001". - voice_id: TTS voice identifier. Defaults to "Charon". - start_audio_paused: Whether to start with audio input paused. Defaults to False. - start_video_paused: Whether to start with video input paused. Defaults to False. - system_instruction: System prompt for the model. Defaults to None. - tools: Tools/functions available to the model. Defaults to None. - params: Configuration parameters for the model. Defaults to InputParams(). - inference_on_context_initialization: Whether to generate a response when context - is first set. Defaults to True. - file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint. - http_options: HTTP options for the client. - **kwargs: Additional arguments passed to parent LLMService. - """ - # Check for deprecated parameter usage - if base_url is not None: - import warnings - - with warnings.catch_warnings(): - warnings.simplefilter("always") - warnings.warn( - "Parameter 'base_url' is deprecated and no longer has any effect. Please use 'http_options' to customize requests made by the API client.", - DeprecationWarning, - stacklevel=2, - ) - - super().__init__(base_url=base_url, **kwargs) - - params = params or InputParams() - - self._last_sent_time = 0 - self._base_url = base_url - self.set_model_name(model) - self._voice_id = voice_id - self._language_code = params.language - - self._system_instruction = system_instruction - self._tools = tools - self._inference_on_context_initialization = inference_on_context_initialization - self._needs_turn_complete_message = False - - self._audio_input_paused = start_audio_paused - self._video_input_paused = start_video_paused - self._context = None - self._api_key = api_key - self._http_options = http_options - self._session: AsyncSession = None - self._connection_task = None - - self._disconnecting = False - self._run_llm_when_session_ready = False - - self._user_is_speaking = False - self._bot_is_speaking = False - self._user_audio_buffer = bytearray() - self._user_transcription_buffer = "" - self._last_transcription_sent = "" - self._bot_audio_buffer = bytearray() - self._bot_text_buffer = "" - self._llm_output_buffer = "" - - self._sample_rate = 24000 - - self._language = params.language - self._language_code = ( - language_to_gemini_language(params.language) if params.language else "en-US" - ) - self._vad_params = params.vad - - # Reconnection tracking - self._consecutive_failures = 0 - self._connection_start_time = None - - self._settings = { - "frequency_penalty": params.frequency_penalty, - "max_tokens": params.max_tokens, - "presence_penalty": params.presence_penalty, - "temperature": params.temperature, - "top_k": params.top_k, - "top_p": params.top_p, - "modalities": params.modalities, - "language": self._language_code, - "media_resolution": params.media_resolution, - "vad": params.vad, - "context_window_compression": params.context_window_compression.model_dump() - if params.context_window_compression - else {}, - "thinking": params.thinking or {}, - "enable_affective_dialog": params.enable_affective_dialog or False, - "proactivity": params.proactivity or {}, - "extra": params.extra if isinstance(params.extra, dict) else {}, - } - - self._file_api_base_url = file_api_base_url - self._file_api: Optional[GeminiFileAPI] = None - - # Grounding metadata tracking - self._search_result_buffer = "" - self._accumulated_grounding_metadata = None - - # Session resumption - self._session_resumption_handle: Optional[str] = None - - # Bookkeeping for ending gracefully (i.e. after the bot is finished) - self._end_frame_pending_bot_turn_finished: Optional[EndFrame] = None - - # Initialize the API client. Subclasses can override this if needed. - self.create_client() - - def create_client(self): - """Create the Gemini API client instance. Subclasses can override this.""" - self._client = Client(api_key=self._api_key, http_options=self._http_options) - - @property - def file_api(self) -> GeminiFileAPI: - """Get the Gemini File API client instance. Subclasses can override this. - - Returns: - The Gemini File API client. - """ - if not self._file_api: - self._file_api = GeminiFileAPI(api_key=self._api_key, base_url=self._file_api_base_url) - return self._file_api - - def can_generate_metrics(self) -> bool: - """Check if the service can generate usage metrics. - - Returns: - True as Gemini Live supports token usage metrics. - """ - return True - - def needs_mcp_alternate_schema(self) -> bool: - """Check if this LLM service requires alternate MCP schema. - - Google/Gemini has stricter JSON schema validation and requires - certain properties to be removed or modified for compatibility. - - Returns: - True for Google/Gemini services. - """ - return True - - def set_audio_input_paused(self, paused: bool): - """Set the audio input pause state. - - Args: - paused: Whether to pause audio input. - """ - self._audio_input_paused = paused - - def set_video_input_paused(self, paused: bool): - """Set the video input pause state. - - Args: - paused: Whether to pause video input. - """ - self._video_input_paused = paused - - def set_model_modalities(self, modalities: GeminiMultimodalModalities): - """Set the model response modalities. - - Args: - modalities: The modalities to use for responses. - """ - self._settings["modalities"] = modalities - - def set_language(self, language: Language): - """Set the language for generation. - - Args: - language: The language to use for generation. - """ - self._language = language - self._language_code = language_to_gemini_language(language) or "en-US" - self._settings["language"] = self._language_code - logger.info(f"Set Gemini language to: {self._language_code}") - - async def set_context(self, context: OpenAILLMContext): - """Set the context explicitly from outside the pipeline. - - This is useful when initializing a conversation because in server-side VAD mode we might not have a - way to trigger the pipeline. This sends the history to the server. The `inference_on_context_initialization` - flag controls whether to set the turnComplete flag when we do this. Without that flag, the model will - not respond. This is often what we want when setting the context at the beginning of a conversation. - - Args: - context: The OpenAI LLM context to set. - """ - if self._context: - logger.error( - "Context already set. Can only set up Gemini Multimodal Live context once." - ) - return - self._context = GeminiMultimodalLiveContext.upgrade(context) - await self._create_initial_response() - - # - # standard AIService frame handling - # - - async def start(self, frame: StartFrame): - """Start the service and establish connection. - - Args: - frame: The start frame. - """ - await super().start(frame) - await self._connect() - - async def stop(self, frame: EndFrame): - """Stop the service and close connections. - - Args: - frame: The end frame. - """ - await super().stop(frame) - await self._disconnect() - - async def cancel(self, frame: CancelFrame): - """Cancel the service and close connections. - - Args: - frame: The cancel frame. - """ - await super().cancel(frame) - await self._disconnect() - - # - # speech and interruption handling - # - - async def _handle_interruption(self): - await self._set_bot_is_speaking(False) - await self.push_frame(TTSStoppedFrame()) - await self.push_frame(LLMFullResponseEndFrame()) - - async def _handle_user_started_speaking(self, frame): - self._user_is_speaking = True - pass - - async def _handle_user_stopped_speaking(self, frame): - self._user_is_speaking = False - self._user_audio_buffer = bytearray() - await self.start_ttfb_metrics() - if self._needs_turn_complete_message: - self._needs_turn_complete_message = False - # NOTE: without this, the model ignores the context it's been - # seeded with before the user started speaking - await self._session.send_client_content(turn_complete=True) - - # - # frame processing - # - # StartFrame, StopFrame, CancelFrame implemented in base class - # - - async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process incoming frames for the Gemini Live service. - - Args: - frame: The frame to process. - direction: The frame processing direction. - """ - # Defer EndFrame handling until after the bot turn is finished - if isinstance(frame, EndFrame): - if self._bot_is_speaking: - logger.debug("Deferring handling EndFrame until bot turn is finished") - self._end_frame_pending_bot_turn_finished = frame - return - - await super().process_frame(frame, direction) - - if isinstance(frame, TranscriptionFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, OpenAILLMContextFrame): - context: GeminiMultimodalLiveContext = GeminiMultimodalLiveContext.upgrade( - frame.context - ) - # For now, we'll only trigger inference here when either: - # 1. We have not seen a context frame before - # 2. The last message is a tool call result - if not self._context: - self._context = context - if frame.context.tools: - self._tools = frame.context.tools - await self._create_initial_response() - elif context.messages and context.messages[-1].get("role") == "tool": - # Support just one tool call per context frame for now - tool_result_message = context.messages[-1] - await self._tool_result(tool_result_message) - elif isinstance(frame, LLMContextFrame): - raise NotImplementedError( - "Universal LLMContext is not yet supported for Gemini Multimodal Live." - ) - elif isinstance(frame, InputTextRawFrame): - await self._send_user_text(frame.text) - await self.push_frame(frame, direction) - elif isinstance(frame, InputAudioRawFrame): - await self._send_user_audio(frame) - await self.push_frame(frame, direction) - elif isinstance(frame, InputImageRawFrame): - await self._send_user_video(frame) - await self.push_frame(frame, direction) - elif isinstance(frame, InterruptionFrame): - await self._handle_interruption() - await self.push_frame(frame, direction) - elif isinstance(frame, UserStartedSpeakingFrame): - await self._handle_user_started_speaking(frame) - await self.push_frame(frame, direction) - elif isinstance(frame, UserStoppedSpeakingFrame): - await self._handle_user_stopped_speaking(frame) - await self.push_frame(frame, direction) - elif isinstance(frame, BotStartedSpeakingFrame): - # Ignore this frame. Use the serverContent API message instead - await self.push_frame(frame, direction) - elif isinstance(frame, BotStoppedSpeakingFrame): - # ignore this frame. Use the serverContent.turnComplete API message - await self.push_frame(frame, direction) - elif isinstance(frame, LLMMessagesAppendFrame): - # NOTE: handling LLMMessagesAppendFrame here in the LLMService is - # unusual - typically this would be handled in the user context - # aggregator. Leaving this handling here so that user code that - # uses this frame *without* a user context aggregator still works - # (we have an example that does just that, actually). - await self._create_single_response(frame.messages) - elif isinstance(frame, LLMUpdateSettingsFrame): - await self._update_settings(frame.settings) - elif isinstance(frame, LLMSetToolsFrame): - await self._update_settings() - else: - await self.push_frame(frame, direction) - - async def _set_bot_is_speaking(self, speaking: bool): - if self._bot_is_speaking == speaking: - return - - self._bot_is_speaking = speaking - - if not self._bot_is_speaking and self._end_frame_pending_bot_turn_finished: - await self.queue_frame(self._end_frame_pending_bot_turn_finished) - self._end_frame_pending_bot_turn_finished = None - - async def _connect(self, session_resumption_handle: Optional[str] = None): - """Establish client connection to Gemini Live API.""" - if self._session: - # Here we assume that if we have a client, we are connected. We - # handle disconnections in the send/recv code paths. - return - - if session_resumption_handle: - logger.info( - f"Connecting to Gemini service with session_resumption_handle: {session_resumption_handle}" - ) - else: - logger.info("Connecting to Gemini service") - try: - # Assemble basic configuration - config = LiveConnectConfig( - generation_config=GenerationConfig( - frequency_penalty=self._settings["frequency_penalty"], - max_output_tokens=self._settings["max_tokens"], - presence_penalty=self._settings["presence_penalty"], - temperature=self._settings["temperature"], - top_k=self._settings["top_k"], - top_p=self._settings["top_p"], - response_modalities=[Modality(self._settings["modalities"].value)], - speech_config=SpeechConfig( - voice_config=VoiceConfig( - prebuilt_voice_config={"voice_name": self._voice_id} - ), - language_code=self._settings["language"], - ), - media_resolution=MediaResolution(self._settings["media_resolution"].value), - ), - input_audio_transcription=AudioTranscriptionConfig(), - output_audio_transcription=AudioTranscriptionConfig(), - session_resumption=SessionResumptionConfig(handle=session_resumption_handle), - ) - - # Add context window compression to configuration, if enabled - if self._settings.get("context_window_compression", {}).get("enabled", False): - compression_config = ContextWindowCompressionConfig() - - # Add sliding window (always true if compression is enabled) - compression_config.sliding_window = SlidingWindow() - - # Add trigger_tokens if specified - trigger_tokens = self._settings.get("context_window_compression", {}).get( - "trigger_tokens" - ) - if trigger_tokens is not None: - compression_config.trigger_tokens = trigger_tokens - - config.context_window_compression = compression_config - - # Add thinking configuration to configuration, if provided - if self._settings.get("thinking"): - config.thinking_config = self._settings["thinking"] - - # Add affective dialog setting, if provided - if self._settings.get("enable_affective_dialog", False): - config.enable_affective_dialog = self._settings["enable_affective_dialog"] - - # Add proactivity configuration to configuration, if provided - if self._settings.get("proactivity"): - config.proactivity = self._settings["proactivity"] - - # Add VAD configuration to configuration, if provided - if self._settings.get("vad"): - vad_config = AutomaticActivityDetection() - vad_params = self._settings["vad"] - has_vad_settings = False - - # Only add parameters that are explicitly set - if vad_params.disabled is not None: - vad_config.disabled = vad_params.disabled - has_vad_settings = True - - if vad_params.start_sensitivity: - vad_config.start_of_speech_sensitivity = vad_params.start_sensitivity - has_vad_settings = True - - if vad_params.end_sensitivity: - vad_config.end_of_speech_sensitivity = vad_params.end_sensitivity - has_vad_settings = True - - if vad_params.prefix_padding_ms is not None: - vad_config.prefix_padding_ms = vad_params.prefix_padding_ms - has_vad_settings = True - - if vad_params.silence_duration_ms is not None: - vad_config.silence_duration_ms = vad_params.silence_duration_ms - has_vad_settings = True - - # Only add automatic_activity_detection if we have VAD settings - if has_vad_settings: - config.realtime_input_config = RealtimeInputConfig( - automatic_activity_detection=vad_config - ) - - # Add system instruction to configuration, if provided - system_instruction = self._system_instruction or "" - if self._context and hasattr(self._context, "extract_system_instructions"): - system_instruction += "\n" + self._context.extract_system_instructions() - if system_instruction: - logger.debug(f"Setting system instruction: {system_instruction}") - config.system_instruction = system_instruction - - # Add tools to configuration, if provided - if self._tools: - logger.debug(f"Setting tools: {self._tools}") - config.tools = self.get_llm_adapter().from_standard_tools(self._tools) - - # Start the connection - self._connection_task = self.create_task(self._connection_task_handler(config=config)) - - except Exception as e: - await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True)) - - async def _connection_task_handler(self, config: LiveConnectConfig): - async with self._client.aio.live.connect(model=self._model_name, config=config) as session: - logger.info("Connected to Gemini service") - - # Mark connection start time - self._connection_start_time = time.time() - - await self._handle_session_ready(session) - - while True: - try: - turn = self._session.receive() - async for message in turn: - # Reset failure counter if connection has been stable - self._check_and_reset_failure_counter() - - if message.server_content and message.server_content.model_turn: - await self._handle_msg_model_turn(message) - elif ( - message.server_content - and message.server_content.turn_complete - and message.usage_metadata - ): - await self._handle_msg_turn_complete(message) - await self._handle_msg_usage_metadata(message) - elif message.server_content and message.server_content.input_transcription: - await self._handle_msg_input_transcription(message) - elif message.server_content and message.server_content.output_transcription: - await self._handle_msg_output_transcription(message) - elif message.server_content and message.server_content.grounding_metadata: - await self._handle_msg_grounding_metadata(message) - elif message.tool_call: - await self._handle_msg_tool_call(message) - elif message.session_resumption_update: - self._handle_msg_resumption_update(message) - except Exception as e: - if not self._disconnecting: - should_reconnect = await self._handle_connection_error(e) - if should_reconnect: - await self._reconnect() - return # Exit this connection handler, _reconnect will start a new one - break - - def _check_and_reset_failure_counter(self): - """Check if connection has been stable long enough to reset the failure counter. - - If the connection has been active for longer than the established threshold - and there are accumulated failures, reset the counter to 0. - """ - if ( - self._connection_start_time - and self._consecutive_failures > 0 - and time.time() - self._connection_start_time >= CONNECTION_ESTABLISHED_THRESHOLD - ): - logger.info( - f"Connection stable for {CONNECTION_ESTABLISHED_THRESHOLD}s, " - f"resetting failure counter from {self._consecutive_failures} to 0" - ) - self._consecutive_failures = 0 - - async def _handle_connection_error(self, error: Exception) -> bool: - """Handle a connection error and determine if reconnection should be attempted. - - Args: - error: The exception that caused the connection error. - - Returns: - True if reconnection should be attempted, False if a fatal error should be pushed. - """ - self._consecutive_failures += 1 - logger.warning( - f"Connection error (failure {self._consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {error}" - ) - - if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: - logger.error( - f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, " - "treating as fatal error" - ) - await self.push_error( - ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True) - ) - return False - else: - logger.info( - f"Attempting reconnection ({self._consecutive_failures}/{MAX_CONSECUTIVE_FAILURES})" - ) - return True - - async def _reconnect(self): - """Reconnect to Gemini Live API.""" - await self._disconnect() - await self._connect(session_resumption_handle=self._session_resumption_handle) - - async def _disconnect(self): - """Disconnect from Gemini Live API and clean up resources.""" - logger.info("Disconnecting from Gemini service") - try: - self._disconnecting = True - await self.stop_all_metrics() - if self._connection_task: - await self.cancel_task(self._connection_task, timeout=1.0) - self._connection_task = None - if self._session: - await self._session.close() - self._session = None - self._disconnecting = False - except Exception as e: - logger.error(f"{self} error disconnecting: {e}") - - async def _send_user_audio(self, frame): - """Send user audio frame to Gemini Live API.""" - if self._audio_input_paused or self._disconnecting or not self._session: - return - - # Send all audio to Gemini - try: - await self._session.send_realtime_input( - audio=Blob(data=frame.audio, mime_type=f"audio/pcm;rate={frame.sample_rate}") - ) - except Exception as e: - await self._handle_send_error(e) - - # Manage a buffer of audio to use for transcription - audio = frame.audio - if self._user_is_speaking: - self._user_audio_buffer.extend(audio) - else: - # Keep 1/2 second of audio in the buffer even when not speaking. - self._user_audio_buffer.extend(audio) - length = int((frame.sample_rate * frame.num_channels * 2) * 0.5) - self._user_audio_buffer = self._user_audio_buffer[-length:] - - async def _send_user_text(self, text: str): - """Send user text via Gemini Live API's realtime input stream. - - This method sends text through the realtimeInput stream (via TextInputMessage) - rather than the clientContent stream. This ensures text input is synchronized - with audio and video inputs, preventing temporal misalignment that can occur - when different modalities are processed through separate API pathways. - - For realtimeInput, turn completion is automatically inferred by the API based - on user activity, so no explicit turnComplete signal is needed. - - Args: - text: The text to send as user input. - """ - if self._disconnecting or not self._session: - return - - try: - await self._session.send_realtime_input(text=text) - except Exception as e: - await self._handle_send_error(e) - - async def _send_user_video(self, frame): - """Send user video frame to Gemini Live API.""" - if self._video_input_paused or self._disconnecting or not self._session: - return - - now = time.time() - if now - self._last_sent_time < 1: - return # Ignore if less than 1 second has passed - - self._last_sent_time = now # Update last sent time - logger.debug(f"Sending video frame to Gemini: {frame}") - - buffer = io.BytesIO() - Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG") - data = base64.b64encode(buffer.getvalue()).decode("utf-8") - - try: - await self._session.send_realtime_input(video=Blob(data=data, mime_type="image/jpeg")) - except Exception as e: - await self._handle_send_error(e) - - async def _create_initial_response(self): - """Create initial response based on context history.""" - if self._disconnecting: - return - - if not self._session: - self._run_llm_when_session_ready = True - return - - messages = self._context.get_messages_for_initializing_history() - if not messages: - return - - logger.debug(f"Creating initial response: {messages}") - - await self.start_ttfb_metrics() - - try: - await self._session.send_client_content( - turns=messages, turn_complete=self._inference_on_context_initialization - ) - except Exception as e: - await self._handle_send_error(e) - - # If we're generating a response right away upon initializing - # conversation history, set a flag saying that we need a turn complete - # message when the user stops speaking. - if not self._inference_on_context_initialization: - self._needs_turn_complete_message = True - - async def _create_single_response(self, messages_list): - """Create a single response from a list of messages.""" - if self._disconnecting or not self._session: - return - - # Create a throwaway context just for the purpose of getting messages - # in the right format - context = GeminiMultimodalLiveContext.upgrade(OpenAILLMContext(messages=messages_list)) - messages = context.get_messages_for_initializing_history() - - if not messages: - return - - logger.debug(f"Creating response: {messages}") - - await self.start_ttfb_metrics() - - try: - await self._session.send_client_content(turns=messages, turn_complete=True) - except Exception as e: - await self._handle_send_error(e) - - @traced_gemini_live(operation="llm_tool_result") - async def _tool_result(self, tool_result_message): - """Send tool result back to the API.""" - if self._disconnecting or not self._session: - return - - # For now we're shoving the name into the tool_call_id field, so this - # will work until we revisit that. - id = tool_result_message.get("tool_call_id") - name = tool_result_message.get("tool_call_name") - result = json.loads(tool_result_message.get("content") or "") - response = FunctionResponse(name=name, id=id, response=result) - - try: - await self._session.send_tool_response(function_responses=response) - except Exception as e: - await self._handle_send_error(e) - - @traced_gemini_live(operation="llm_setup") - async def _handle_session_ready(self, session: AsyncSession): - """Handle the session being ready.""" - self._session = session - # If we were just waititng for the session to be ready to run the LLM, - # do that now. - if self._run_llm_when_session_ready: - self._run_llm_when_session_ready = False - await self._create_initial_response() - - async def _handle_msg_model_turn(self, msg: LiveServerMessage): - """Handle the model turn message.""" - part = msg.server_content.model_turn.parts[0] - if not part: - return - - await self.stop_ttfb_metrics() - - # part.text is added when `modalities` is set to TEXT; otherwise, it's None - text = part.text - if text: - if not self._bot_text_buffer: - await self.push_frame(LLMFullResponseStartFrame()) - - self._bot_text_buffer += text - self._search_result_buffer += text # Also accumulate for grounding - await self.push_frame(LLMTextFrame(text=text)) - - # Check for grounding metadata in server content - if msg.server_content and msg.server_content.grounding_metadata: - self._accumulated_grounding_metadata = msg.server_content.grounding_metadata - - inline_data = part.inline_data - if not inline_data: - return - - # Check if mime type matches expected format - expected_mime_type = f"audio/pcm;rate={self._sample_rate}" - if inline_data.mime_type == expected_mime_type: - # Perfect match, continue processing - pass - elif inline_data.mime_type == "audio/pcm": - # Sample rate not provided in mime type, assume default - if not hasattr(self, "_sample_rate_warning_logged"): - logger.warning( - f"Sample rate not provided in mime type '{inline_data.mime_type}', assuming rate of {self._sample_rate}" - ) - self._sample_rate_warning_logged = True - else: - # Unrecognized format - logger.warning(f"Unrecognized server_content format {inline_data.mime_type}") - return - - audio = inline_data.data - if not audio: - return - - if not self._bot_is_speaking: - await self._set_bot_is_speaking(True) - await self.push_frame(TTSStartedFrame()) - await self.push_frame(LLMFullResponseStartFrame()) - - self._bot_audio_buffer.extend(audio) - frame = TTSAudioRawFrame( - audio=audio, - sample_rate=self._sample_rate, - num_channels=1, - ) - await self.push_frame(frame) - - @traced_gemini_live(operation="llm_tool_call") - async def _handle_msg_tool_call(self, message: LiveServerMessage): - """Handle tool call messages.""" - function_calls = message.tool_call.function_calls - if not function_calls: - return - if not self._context: - logger.error("Function calls are not supported without a context object.") - - function_calls_llm = [ - FunctionCallFromLLM( - context=self._context, - tool_call_id=( - # NOTE: when using Vertex AI we don't get server-provided - # tool call IDs here - f.id or str(uuid.uuid4()) - ), - function_name=f.name, - arguments=f.args, - ) - for f in function_calls - ] - - await self.run_function_calls(function_calls_llm) - - @traced_gemini_live(operation="llm_response") - async def _handle_msg_turn_complete(self, message: LiveServerMessage): - """Handle the turn complete message.""" - await self._set_bot_is_speaking(False) - text = self._bot_text_buffer - - # Trace the complete LLM response (this will be handled by the decorator) - # The decorator will extract the output text and usage metadata from the message - - self._bot_text_buffer = "" - self._llm_output_buffer = "" - - # Process grounding metadata if we have accumulated any - if self._accumulated_grounding_metadata: - await self._process_grounding_metadata( - self._accumulated_grounding_metadata, self._search_result_buffer - ) - - # Reset grounding tracking for next response - self._search_result_buffer = "" - self._accumulated_grounding_metadata = None - - # Only push the TTSStoppedFrame if the bot is outputting audio - # when text is found, modalities is set to TEXT and no audio - # is produced. - if not text: - await self.push_frame(TTSStoppedFrame()) - - await self.push_frame(LLMFullResponseEndFrame()) - - @traced_stt - async def _handle_user_transcription( - self, transcript: str, is_final: bool, language: Optional[Language] = None - ): - """Handle a transcription result with tracing.""" - pass - - async def _handle_msg_input_transcription(self, message: LiveServerMessage): - """Handle the input transcription message. - - Gemini Live sends user transcriptions in either single words or multi-word - phrases. As a result, we have to aggregate the input transcription. This handler - aggregates into sentences, splitting on the end of sentence markers. - """ - if not message.server_content.input_transcription: - return - - text = message.server_content.input_transcription.text - - if not text: - return - - # Strip leading space from sentence starts if buffer is empty - if text.startswith(" ") and not self._user_transcription_buffer: - text = text.lstrip() - - # Accumulate text in the buffer - self._user_transcription_buffer += text - - # Check for complete sentences - while True: - eos_end_marker = match_endofsentence(self._user_transcription_buffer) - if not eos_end_marker: - break - - # Extract the complete sentence - complete_sentence = self._user_transcription_buffer[:eos_end_marker] - # Keep the remainder for the next chunk - self._user_transcription_buffer = self._user_transcription_buffer[eos_end_marker:] - - # Send a TranscriptionFrame with the complete sentence - logger.debug(f"[Transcription:user] [{complete_sentence}]") - await self._handle_user_transcription( - complete_sentence, True, self._settings["language"] - ) - await self.push_frame( - TranscriptionFrame( - text=complete_sentence, - user_id="", - timestamp=time_now_iso8601(), - result=message, - ), - FrameDirection.UPSTREAM, - ) - - async def _handle_msg_output_transcription(self, message: LiveServerMessage): - """Handle the output transcription message.""" - if not message.server_content.output_transcription: - return - - # This is the output transcription text when modalities is set to AUDIO. - # In this case, we push LLMTextFrame and TTSTextFrame to be handled by the - # downstream assistant context aggregator. - text = message.server_content.output_transcription.text - - if not text: - return - - # Accumulate text for grounding as well - self._search_result_buffer += text - - # Check for grounding metadata in server content - if message.server_content and message.server_content.grounding_metadata: - self._accumulated_grounding_metadata = message.server_content.grounding_metadata - # Collect text for tracing - self._llm_output_buffer += text - - await self.push_frame(LLMTextFrame(text=text)) - await self.push_frame(TTSTextFrame(text=text)) - - async def _handle_msg_grounding_metadata(self, message: LiveServerMessage): - """Handle dedicated grounding metadata messages.""" - if message.server_content and message.server_content.grounding_metadata: - grounding_metadata = message.server_content.grounding_metadata - # Process the grounding metadata immediately - await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer) - - async def _process_grounding_metadata( - self, grounding_metadata: GroundingMetadata, search_result: str = "" - ): - """Process grounding metadata and emit LLMSearchResponseFrame.""" - if not grounding_metadata: - return - - # Extract rendered content for search suggestions - rendered_content = None - if ( - grounding_metadata.search_entry_point - and grounding_metadata.search_entry_point.rendered_content - ): - rendered_content = grounding_metadata.search_entry_point.rendered_content - - # Convert grounding chunks and supports to LLMSearchOrigin format - origins = [] - - if grounding_metadata.grounding_chunks and grounding_metadata.grounding_supports: - # Create a mapping of chunk indices to origins - chunk_to_origin: Dict[int, LLMSearchOrigin] = {} - - for index, chunk in enumerate(grounding_metadata.grounding_chunks): - if chunk.web: - origin = LLMSearchOrigin( - site_uri=chunk.web.uri, site_title=chunk.web.title, results=[] - ) - chunk_to_origin[index] = origin - origins.append(origin) - - # Add grounding support results to the appropriate origins - for support in grounding_metadata.grounding_supports: - if support.segment and support.grounding_chunk_indices: - text = support.segment.text or "" - confidence_scores = support.confidence_scores or [] - - # Add this result to all origins referenced by this support - for chunk_index in support.grounding_chunk_indices: - if chunk_index in chunk_to_origin: - result = LLMSearchResult(text=text, confidence=confidence_scores) - chunk_to_origin[chunk_index].results.append(result) - - # Create and push the search response frame - search_frame = LLMSearchResponseFrame( - search_result=search_result, origins=origins, rendered_content=rendered_content - ) - - await self.push_frame(search_frame) - - async def _handle_msg_usage_metadata(self, message: LiveServerMessage): - """Handle the usage metadata message.""" - if not message.usage_metadata: - return - - usage = message.usage_metadata - - # Ensure we have valid integers for all token counts - prompt_tokens = usage.prompt_token_count or 0 - completion_tokens = usage.response_token_count or 0 - total_tokens = usage.total_token_count or (prompt_tokens + completion_tokens) - - tokens = LLMTokenUsage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - ) - - await self.start_llm_usage_metrics(tokens) - - def _handle_msg_resumption_update(self, message: LiveServerMessage): - update = message.session_resumption_update - if update.resumable and update.new_handle: - self._session_resumption_handle = update.new_handle - - async def _handle_send_error(self, error: Exception): - # In server-to-server contexts, a WebSocket error should be quite rare. - # Given how hard it is to recover from a send-side error with proper - # state management, and that exponential backoff for retries can have - # cost/stability implications for a service cluster, let's just treat a - # send-side error as fatal. - if not self._disconnecting: - await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True)) - - def create_context_aggregator( - self, - context: OpenAILLMContext, - *, - user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(), - assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(), - ) -> GeminiMultimodalLiveContextAggregatorPair: - """Create an instance of GeminiMultimodalLiveContextAggregatorPair from an OpenAILLMContext. - - Constructor keyword arguments for both the user and assistant aggregators can be provided. - - Args: - context: The LLM context to use. - user_params: User aggregator parameters. Defaults to LLMUserAggregatorParams(). - assistant_params: Assistant aggregator parameters. Defaults to LLMAssistantAggregatorParams(). - - Returns: - GeminiMultimodalLiveContextAggregatorPair: A pair of context - aggregators, one for the user and one for the assistant, - encapsulated in an GeminiMultimodalLiveContextAggregatorPair. - """ - context.set_llm_adapter(self.get_llm_adapter()) - - GeminiMultimodalLiveContext.upgrade(context) - user = GeminiMultimodalLiveUserContextAggregator(context, params=user_params) - - assistant_params.expect_stripped_words = False - assistant = GeminiMultimodalLiveAssistantContextAggregator(context, params=assistant_params) - return GeminiMultimodalLiveContextAggregatorPair(_user=user, _assistant=assistant) +GeminiMultimodalLiveContext = GeminiLiveContext +GeminiMultimodalLiveUserContextAggregator = GeminiLiveUserContextAggregator +GeminiMultimodalLiveAssistantContextAggregator = GeminiLiveAssistantContextAggregator +GeminiMultimodalLiveContextAggregatorPair = GeminiLiveContextAggregatorPair +GeminiMultimodalModalities = GeminiModalities +GeminiMediaResolution = _GeminiMediaResolution +GeminiVADParams = _GeminiVADParams +ContextWindowCompressionParams = _ContextWindowCompressionParams +InputParams = _InputParams +GeminiMultimodalLiveLLMService = GeminiLiveLLMService