Update grounding metadata example with final refinements

- Reorganize imports and transport_params structure
- Remove copyright header for consistency
- Enhance grounding metadata logging with better formatting
- Remove unnecessary PipelineParams configuration
- Update message content formatting

Completes incorporation of draft PR #2121 changes
This commit is contained in:
Pete
2025-07-03 17:53:55 -04:00
parent 4951c97eab
commit d565e9ae53

View File

@@ -1,13 +1,12 @@
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.frames.frames import Frame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -16,12 +15,37 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
}
SYSTEM_INSTRUCTION = """
You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information.
@@ -50,48 +74,34 @@ class GroundingMetadataProcessor(FrameProcessor):
self._grounding_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
# Always call super().process_frame first
await super().process_frame(frame, direction)
if isinstance(frame, LLMSearchResponseFrame):
self._grounding_count += 1
logger.info(f"🔍 GROUNDING METADATA RECEIVED #{self._grounding_count}")
logger.info(f"📝 Search Result: {frame.search_result[:200]}...")
if frame.origins:
logger.info(f"📍 Origins: {len(frame.origins)} sources")
for i, origin in enumerate(frame.origins):
logger.info(f" {i+1}. {origin.site_title} - {origin.site_uri}")
logger.info(f"\n\n🔍 GROUNDING METADATA RECEIVED #{self._grounding_count}\n")
logger.info(f"📝 Search Result Text: {frame.search_result[:200]}...")
if frame.rendered_content:
logger.info(f"🔗 Rendered Content Available: {len(frame.rendered_content)} chars")
logger.info(f"🔗 Rendered Content: {frame.rendered_content}")
if frame.origins:
logger.info(f"📍 Number of Origins: {len(frame.origins)}")
for i, origin in enumerate(frame.origins):
logger.info(f" Origin {i + 1}: {origin.site_title} - {origin.site_uri}")
if origin.results:
logger.info(f" Results: {len(origin.results)} items")
# Always push the frame downstream
await self.push_frame(frame, direction)
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
# Initialize the SmallWebRTCTransport with the connection
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=False,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
)
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting Gemini Live Grounding Metadata Test Bot")
# Create tools using ToolsSchema with custom tools for Gemini
tools = ToolsSchema(
standard_tools=[], # No standard function declarations needed
custom_tools={
AdapterType.GEMINI: [
{"google_search": {}},
{"code_execution": {}}
]
}
custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]},
)
llm = GeminiMultimodalLiveLLMService(
@@ -108,7 +118,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
messages = [
{
"role": "user",
"content": 'Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I\'d like to know about.',
"content": "Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I'd like to know about.",
},
]
@@ -150,6 +160,6 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
if __name__ == "__main__":
from run import main
from pipecat.examples.run import main
main()
main(run_example, transport_params=transport_params)