diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py index b5b2a18f6..e26fbed51 100644 --- a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -1,13 +1,12 @@ - import argparse import os from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema from pipecat.frames.frames import Frame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -16,12 +15,37 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService from pipecat.services.google.frames import LLMSearchResponseFrame -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.network.small_webrtc import SmallWebRTCTransport -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams load_dotenv(override=True) + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), +} + SYSTEM_INSTRUCTION = """ You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information. @@ -50,48 +74,34 @@ class GroundingMetadataProcessor(FrameProcessor): self._grounding_count = 0 async def process_frame(self, frame: Frame, direction: FrameDirection): - # Always call super().process_frame first await super().process_frame(frame, direction) if isinstance(frame, LLMSearchResponseFrame): self._grounding_count += 1 - logger.info(f"šŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}") - logger.info(f"šŸ“ Search Result: {frame.search_result[:200]}...") - - if frame.origins: - logger.info(f"šŸ“ Origins: {len(frame.origins)} sources") - for i, origin in enumerate(frame.origins): - logger.info(f" {i+1}. {origin.site_title} - {origin.site_uri}") - + logger.info(f"\n\nšŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}\n") + logger.info(f"šŸ“ Search Result Text: {frame.search_result[:200]}...") + if frame.rendered_content: - logger.info(f"šŸ”— Rendered Content Available: {len(frame.rendered_content)} chars") + logger.info(f"šŸ”— Rendered Content: {frame.rendered_content}") + + if frame.origins: + logger.info(f"šŸ“ Number of Origins: {len(frame.origins)}") + for i, origin in enumerate(frame.origins): + logger.info(f" Origin {i + 1}: {origin.site_title} - {origin.site_uri}") + if origin.results: + logger.info(f" Results: {len(origin.results)} items") # Always push the frame downstream await self.push_frame(frame, direction) -async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): - - # Initialize the SmallWebRTCTransport with the connection - transport = SmallWebRTCTransport( - webrtc_connection=webrtc_connection, - params=TransportParams( - audio_in_enabled=True, - audio_out_enabled=True, - video_in_enabled=False, - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), - ), - ) +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): + logger.info(f"Starting Gemini Live Grounding Metadata Test Bot") # Create tools using ToolsSchema with custom tools for Gemini tools = ToolsSchema( standard_tools=[], # No standard function declarations needed - custom_tools={ - AdapterType.GEMINI: [ - {"google_search": {}}, - {"code_execution": {}} - ] - } + custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]}, ) llm = GeminiMultimodalLiveLLMService( @@ -108,7 +118,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac messages = [ { "role": "user", - "content": 'Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I\'d like to know about.', + "content": "Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I'd like to know about.", }, ] @@ -150,6 +160,6 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac if __name__ == "__main__": - from run import main + from pipecat.examples.run import main - main() + main(run_example, transport_params=transport_params)