Merge pull request #2820 from pipecat-ai/pk/gemini-live-vertex-support

Support Gemini Live + Vertex AI
This commit is contained in:
kompfner
2025-10-09 11:53:41 -04:00
committed by GitHub
20 changed files with 2206 additions and 1746 deletions

View File

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `GeminiVertexMultimodalLiveLLMService`, for accessing Gemini Live via
Google Vertex AI.
- Added some new configuration options to `GeminiMultimodalLiveLLMService`:
- `thinking`

View File

@@ -17,7 +17,7 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -65,7 +65,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
Respond to what the user said in a creative and helpful way.
"""
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck

View File

@@ -20,7 +20,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -65,7 +65,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."

View File

@@ -22,7 +22,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -122,12 +122,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexMultimodalLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,

View File

@@ -24,7 +24,7 @@ from pipecat.runner.utils import (
maybe_capture_participant_camera,
maybe_capture_participant_screen,
)
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -58,7 +58,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."

View File

@@ -20,9 +20,9 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.gemini_multimodal_live.gemini import (
GeminiMultimodalLiveLLMService,
GeminiMultimodalModalities,
from pipecat.services.gemini_live.gemini import (
GeminiLiveLLMService,
GeminiModalities,
InputParams,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
@@ -80,11 +80,13 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GeminiMultimodalLiveLLMService(
# KNOWN ISSUE: If using GeminiVertexMultimodalLiveLLMService, it appears
# you cannot specify a modality other than AUDIO.
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=SYSTEM_INSTRUCTION,
tools=[{"google_search": {}}, {"code_execution": {}}],
params=InputParams(modalities=GeminiMultimodalModalities.TEXT),
params=InputParams(modalities=GeminiModalities.TEXT),
)
# Optionally, you can set the response modalities via a function

View File

@@ -19,7 +19,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -83,7 +83,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Initialize the Gemini Multimodal Live model
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck
system_instruction=system_instruction,

View File

@@ -19,8 +19,8 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import (
GeminiMultimodalLiveLLMService,
from pipecat.services.gemini_live.gemini import (
GeminiLiveLLMService,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""
# Initialize Gemini service with File API support
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck

View File

@@ -14,7 +14,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -105,7 +105,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]},
)
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=SYSTEM_INSTRUCTION,
voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck

View File

@@ -0,0 +1,134 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.services.gemini_live.vertex import GeminiLiveVertexLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
# Load environment variables
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create the Gemini Vertex Multimodal Live LLM service
system_instruction = f"""
You are a helpful AI assistant.
Your goal is to demonstrate your capabilities in a helpful and engaging way.
Your output will be converted to audio so don't include special characters in your answers.
Respond to what the user said in a creative and helpful way.
"""
llm = GeminiLiveVertexLLMService(
credentials=os.getenv("GOOGLE_VERTEX_TEST_CREDENTIALS"),
project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
location=os.getenv("GOOGLE_CLOUD_LOCATION"),
system_instruction=system_instruction,
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck
)
# Build the pipeline
pipeline = Pipeline(
[
transport.input(),
llm,
transport.output(),
]
)
# Configure the pipeline task
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Handle client connection event
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames(
[
LLMMessagesAppendFrame(
messages=[
{
"role": "user",
"content": f"Greet the user and introduce yourself.",
}
]
)
]
)
# Handle client disconnection events
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
# Run the pipeline
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -24,7 +24,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live.gemini import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -144,7 +144,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,

View File

@@ -20,7 +20,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live import GeminiMultimodalLiveLLMService
from pipecat.services.gemini_live import GeminiLiveLLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.daily.transport import DailyParams, DailyTransport
@@ -94,7 +94,7 @@ Respond to what the user said in a creative and helpful way. Keep your responses
async def run_bot(pipecat_transport):
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck
transcribe_user_audio=True,

View File

@@ -87,9 +87,11 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
Includes both converted standard tools and any custom Gemini-specific tools.
"""
functions_schema = tools_schema.standard_tools
formatted_standard_tools = [
{"function_declarations": [func.to_default_dict() for func in functions_schema]}
]
formatted_standard_tools = (
[{"function_declarations": [func.to_default_dict() for func in functions_schema]}]
if functions_schema
else []
)
custom_gemini_tools = []
if tools_schema.custom_tools:
custom_gemini_tools = tools_schema.custom_tools.get(AdapterType.GEMINI, [])

View File

@@ -0,0 +1,3 @@
from .file_api import GeminiFileAPI
from .gemini import GeminiLiveLLMService
from .vertex import GeminiLiveVertexLLMService

View File

@@ -0,0 +1,189 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Gemini File API client for uploading and managing files.
This module provides a client for Google's Gemini File API, enabling file
uploads, metadata retrieval, listing, and deletion. Files uploaded through
this API can be referenced in Gemini generative model calls.
"""
import mimetypes
from typing import Any, Dict, Optional
import aiohttp
from loguru import logger
class GeminiFileAPI:
"""Client for the Gemini File API.
This class provides methods for uploading, fetching, listing, and deleting files
through Google's Gemini File API.
Files uploaded through this API remain available for 48 hours and can be referenced
in calls to the Gemini generative models. Maximum file size is 2GB, with total
project storage limited to 20GB.
"""
def __init__(
self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files"
):
"""Initialize the Gemini File API client.
Args:
api_key: Google AI API key
base_url: Base URL for the Gemini File API (default is the v1beta endpoint)
"""
self._api_key = api_key
self._base_url = base_url
# Upload URL uses the /upload/ path
self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files"
async def upload_file(
self, file_path: str, display_name: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file to the Gemini File API using the correct resumable upload protocol.
Args:
file_path: Path to the file to upload
display_name: Optional display name for the file
Returns:
File metadata including uri, name, and display_name
"""
logger.info(f"Uploading file: {file_path}")
async with aiohttp.ClientSession() as session:
# Determine the file's MIME type
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Read the file
with open(file_path, "rb") as f:
file_data = f.read()
# Create the metadata payload
metadata = {}
if display_name:
metadata = {"file": {"display_name": display_name}}
# Step 1: Initial resumable request to get upload URL
headers = {
"X-Goog-Upload-Protocol": "resumable",
"X-Goog-Upload-Command": "start",
"X-Goog-Upload-Header-Content-Length": str(len(file_data)),
"X-Goog-Upload-Header-Content-Type": mime_type,
"Content-Type": "application/json",
}
logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}")
async with session.post(
f"{self.upload_base_url}?key={self._api_key}", headers=headers, json=metadata
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error initiating file upload: {error_text}")
raise Exception(f"Failed to initiate upload: {response.status} - {error_text}")
# Get the upload URL from the response header
upload_url = response.headers.get("X-Goog-Upload-URL")
if not upload_url:
logger.error(f"Response headers: {dict(response.headers)}")
raise Exception("No upload URL in response headers")
logger.debug(f"Got upload URL: {upload_url}")
# Step 2: Upload the actual file data
upload_headers = {
"Content-Length": str(len(file_data)),
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
}
logger.debug(f"Step 2: Uploading file data to {upload_url}")
async with session.post(upload_url, headers=upload_headers, data=file_data) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error uploading file data: {error_text}")
raise Exception(f"Failed to upload file: {response.status} - {error_text}")
file_info = await response.json()
logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}")
return file_info
async def get_file(self, name: str) -> Dict[str, Any]:
"""Get metadata for a file.
Args:
name: File name (or full path)
Returns:
File metadata
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.get(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error getting file metadata: {error_text}")
raise Exception(f"Failed to get file metadata: {response.status}")
file_info = await response.json()
return file_info
async def list_files(
self, page_size: int = 10, page_token: Optional[str] = None
) -> Dict[str, Any]:
"""List uploaded files.
Args:
page_size: Number of files to return per page
page_token: Token for pagination
Returns:
List of files and next page token if available
"""
params = {"key": self._api_key, "pageSize": page_size}
if page_token:
params["pageToken"] = page_token
async with aiohttp.ClientSession() as session:
async with session.get(self._base_url, params=params) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error listing files: {error_text}")
raise Exception(f"Failed to list files: {response.status}")
result = await response.json()
return result
async def delete_file(self, name: str) -> bool:
"""Delete a file.
Args:
name: File name (or full path)
Returns:
True if deleted successfully
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.delete(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error deleting file: {error_text}")
raise Exception(f"Failed to delete file: {response.status}")
return True

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,187 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Service for accessing Gemini Live via Google Vertex AI.
This module provides integration with Google's Gemini Live model via
Vertex AI, supporting both text and audio modalities with voice transcription,
streaming responses, and tool usage.
"""
import json
import os
from typing import List, Optional, Union
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.services.gemini_live.gemini import (
GeminiLiveLLMService,
HttpOptions,
InputParams,
)
try:
from google.auth import default
from google.auth.exceptions import GoogleAuthError
from google.auth.transport.requests import Request
from google.genai import Client
from google.oauth2 import service_account
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Google Vertex AI, you need to `pip install pipecat-ai[google]`. Also, set up your Google credentials properly."
)
raise Exception(f"Missing module: {e}")
class GeminiLiveVertexLLMService(GeminiLiveLLMService):
"""Provides access to Google's Gemini Live model via Vertex AI.
This service enables real-time conversations with Gemini, supporting both
text and audio modalities. It handles voice transcription, streaming audio
responses, and tool usage.
"""
def __init__(
self,
*,
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
location: str = "us-east4",
project_id: str,
model="google/gemini-2.0-flash-live-preview-04-09",
voice_id: str = "Charon",
start_audio_paused: bool = False,
start_video_paused: bool = False,
system_instruction: Optional[str] = None,
tools: Optional[Union[List[dict], ToolsSchema]] = None,
params: Optional[InputParams] = None,
inference_on_context_initialization: bool = True,
file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files",
http_options: Optional[HttpOptions] = None,
**kwargs,
):
"""Initialize the service for accessing Gemini Live via Google Vertex AI.
Args:
credentials: JSON string of service account credentials.
credentials_path: Path to the service account JSON file.
location: GCP region for Vertex AI endpoint (e.g., "us-east4").
project_id: Google Cloud project ID.
model: Model identifier to use. Defaults to "models/gemini-2.0-flash-live-preview-04-09".
voice_id: TTS voice identifier. Defaults to "Charon".
start_audio_paused: Whether to start with audio input paused. Defaults to False.
start_video_paused: Whether to start with video input paused. Defaults to False.
system_instruction: System prompt for the model. Defaults to None.
tools: Tools/functions available to the model. Defaults to None.
params: Configuration parameters for the model along with Vertex AI
location and project ID.
inference_on_context_initialization: Whether to generate a response when context
is first set. Defaults to True.
file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint.
http_options: HTTP options for the client.
**kwargs: Additional arguments passed to parent GeminiLiveLLMService.
"""
# Check if user incorrectly passed api_key, which is used by parent
# class but not here.
if "api_key" in kwargs:
logger.error(
"GeminiLiveVertexLLMService does not accept 'api_key' parameter. "
"Use 'credentials' or 'credentials_path' instead for Vertex AI authentication."
)
raise ValueError(
"Invalid parameter 'api_key'. Use 'credentials' or 'credentials_path' for Vertex AI authentication."
)
# These need to be set before calling super().__init__() because
# super().__init__() invokes create_client(), which needs these.
self._credentials = self._get_credentials(credentials, credentials_path)
self._project_id = project_id
self._location = location
# Call parent constructor with the obtained API key
super().__init__(
# api_key is required by parent class, but actually not used with
# Vertex
api_key="dummy",
model=model,
voice_id=voice_id,
start_audio_paused=start_audio_paused,
start_video_paused=start_video_paused,
system_instruction=system_instruction,
tools=tools,
params=params,
inference_on_context_initialization=inference_on_context_initialization,
file_api_base_url=file_api_base_url,
http_options=http_options,
**kwargs,
)
def create_client(self):
"""Create the Gemini client instance."""
self._client = Client(
vertexai=True,
credentials=self._credentials,
project=self._project_id,
location=self._location,
)
@property
def file_api(self):
"""Gemini File API is not supported with Vertex AI."""
raise NotImplementedError(
"When using Vertex AI, the recommended approach is to use Google Cloud Storage for file handling. The Gemini File API is not directly supported in this context."
)
@staticmethod
def _get_credentials(credentials: Optional[str], credentials_path: Optional[str]) -> str:
"""Retrieve Credentials using Google service account credentials JSON.
Supports multiple authentication methods:
1. Direct JSON credentials string
2. Path to service account JSON file
3. Default application credentials (ADC)
Args:
credentials: JSON string of service account credentials.
credentials_path: Path to the service account JSON file.
Returns:
OAuth token for API authentication.
Raises:
ValueError: If no valid credentials are provided or found.
"""
creds: Optional[service_account.Credentials] = None
if credentials:
# Parse and load credentials from JSON string
creds = service_account.Credentials.from_service_account_info(
json.loads(credentials),
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
elif credentials_path:
# Load credentials from JSON file
creds = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
else:
try:
creds, project_id = default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except GoogleAuthError:
pass
if not creds:
raise ValueError("No valid credentials provided.")
creds.refresh(Request()) # Ensure token is up-to-date, lifetime is 1 hour.
return creds

View File

@@ -30,12 +30,15 @@ except ModuleNotFoundError as e:
# These aliases are just here for backward compatibility, since we used to
# define public-facing StartSensitivity and EndSensitivity enums in this
# module.
warnings.warn(
"Importing StartSensitivity and EndSensitivity from "
"pipecat.services.gemini_multimodal_live.events is deprecated. "
"Please import them directly from google.genai.types instead.",
DeprecationWarning,
stacklevel=2,
)
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Importing StartSensitivity and EndSensitivity from "
"pipecat.services.gemini_multimodal_live.events is deprecated. "
"Please import them directly from google.genai.types instead.",
DeprecationWarning,
stacklevel=2,
)
StartSensitivity = _StartSensitivity
EndSensitivity = _EndSensitivity

View File

@@ -9,181 +9,31 @@
This module provides a client for Google's Gemini File API, enabling file
uploads, metadata retrieval, listing, and deletion. Files uploaded through
this API can be referenced in Gemini generative model calls.
.. deprecated:: 0.0.90
Importing GeminiFileAPI from this module is deprecated.
Import it from pipecat.services.gemini_live.file_api instead.
"""
import mimetypes
from typing import Any, Dict, Optional
import warnings
import aiohttp
from loguru import logger
try:
from pipecat.services.gemini_live.file_api import GeminiFileAPI as _GeminiFileAPI
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
class GeminiFileAPI:
"""Client for the Gemini File API.
This class provides methods for uploading, fetching, listing, and deleting files
through Google's Gemini File API.
Files uploaded through this API remain available for 48 hours and can be referenced
in calls to the Gemini generative models. Maximum file size is 2GB, with total
project storage limited to 20GB.
"""
def __init__(
self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files"
):
"""Initialize the Gemini File API client.
Args:
api_key: Google AI API key
base_url: Base URL for the Gemini File API (default is the v1beta endpoint)
"""
self._api_key = api_key
self._base_url = base_url
# Upload URL uses the /upload/ path
self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files"
async def upload_file(
self, file_path: str, display_name: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file to the Gemini File API using the correct resumable upload protocol.
Args:
file_path: Path to the file to upload
display_name: Optional display name for the file
Returns:
File metadata including uri, name, and display_name
"""
logger.info(f"Uploading file: {file_path}")
async with aiohttp.ClientSession() as session:
# Determine the file's MIME type
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Read the file
with open(file_path, "rb") as f:
file_data = f.read()
# Create the metadata payload
metadata = {}
if display_name:
metadata = {"file": {"display_name": display_name}}
# Step 1: Initial resumable request to get upload URL
headers = {
"X-Goog-Upload-Protocol": "resumable",
"X-Goog-Upload-Command": "start",
"X-Goog-Upload-Header-Content-Length": str(len(file_data)),
"X-Goog-Upload-Header-Content-Type": mime_type,
"Content-Type": "application/json",
}
logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}")
async with session.post(
f"{self.upload_base_url}?key={self._api_key}", headers=headers, json=metadata
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error initiating file upload: {error_text}")
raise Exception(f"Failed to initiate upload: {response.status} - {error_text}")
# Get the upload URL from the response header
upload_url = response.headers.get("X-Goog-Upload-URL")
if not upload_url:
logger.error(f"Response headers: {dict(response.headers)}")
raise Exception("No upload URL in response headers")
logger.debug(f"Got upload URL: {upload_url}")
# Step 2: Upload the actual file data
upload_headers = {
"Content-Length": str(len(file_data)),
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
}
logger.debug(f"Step 2: Uploading file data to {upload_url}")
async with session.post(upload_url, headers=upload_headers, data=file_data) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error uploading file data: {error_text}")
raise Exception(f"Failed to upload file: {response.status} - {error_text}")
file_info = await response.json()
logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}")
return file_info
async def get_file(self, name: str) -> Dict[str, Any]:
"""Get metadata for a file.
Args:
name: File name (or full path)
Returns:
File metadata
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.get(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error getting file metadata: {error_text}")
raise Exception(f"Failed to get file metadata: {response.status}")
file_info = await response.json()
return file_info
async def list_files(
self, page_size: int = 10, page_token: Optional[str] = None
) -> Dict[str, Any]:
"""List uploaded files.
Args:
page_size: Number of files to return per page
page_token: Token for pagination
Returns:
List of files and next page token if available
"""
params = {"key": self._api_key, "pageSize": page_size}
if page_token:
params["pageToken"] = page_token
async with aiohttp.ClientSession() as session:
async with session.get(self._base_url, params=params) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error listing files: {error_text}")
raise Exception(f"Failed to list files: {response.status}")
result = await response.json()
return result
async def delete_file(self, name: str) -> bool:
"""Delete a file.
Args:
name: File name (or full path)
Returns:
True if deleted successfully
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.delete(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error deleting file: {error_text}")
raise Exception(f"Failed to delete file: {response.status}")
return True
# These aliases are just here for backward compatibility, since we used to
# define public-facing StartSensitivity and EndSensitivity enums in this
# module.
warnings.warn(
"Importing GeminiFileAPI from "
"pipecat.services.gemini_multimodal_live.file_api is deprecated. "
"Please import it from pipecat.services.gemini_live.file_api instead.",
DeprecationWarning,
stacklevel=2,
)
GeminiFileAPI = _GeminiFileAPI

File diff suppressed because it is too large Load Diff