Files
pipecat/examples/foundational/49d-thinking-functions-google.py
2026-03-07 09:15:24 -05:00

169 lines
5.8 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantThoughtMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService, GoogleThinkingConfig
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def check_flight_status(params: FunctionCallParams, flight_number: str):
"""Check the status of a flight. Returns status (e.g., "on time", "delayed") and departure time.
Args:
flight_number (str): The flight number, e.g. "AA100".
"""
await params.result_callback({"status": "delayed", "departure_time": "14:30"})
async def book_taxi(params: FunctionCallParams, time: str):
"""Book a taxi for a given time. Returns status (e.g., "done").
Args:
time (str): The time to book the taxi for, e.g. "15:00".
"""
await params.result_callback({"status": "done"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
# model="gemini-3-pro-preview", # A more powerful reasoning model, but slower
settings=GoogleLLMService.Settings(
thinking=GoogleThinkingConfig(
thinking_budget=-1, # Dynamic thinking
include_thoughts=True,
),
system_instruction="You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
),
)
llm.register_direct_function(check_flight_status)
llm.register_direct_function(book_taxi)
tools = ToolsSchema(standard_tools=[check_flight_status, book_taxi])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message({"role": "user", "content": "Say hello briefly."})
# Replace the above with one of these example prompts to demonstrate
# thinking and function calling.
# This example comes from Gemini docs.
# context.add_message(
# {
# "role": "user",
# "content": "Check the status of flight AA100 and, if it's delayed, book me a taxi 2 hours before its departure time.",
# }
# )
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@assistant_aggregator.event_handler("on_assistant_thought")
async def on_assistant_thought(aggregator, message: AssistantThoughtMessage):
logger.info(f"Thought (timestamp: {message.timestamp}): {message.content}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()