# # Copyright (c) 2024-2026, Daily # # SPDX-License-Identifier: BSD 2-Clause License # import asyncio import os from dotenv import load_dotenv from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame, ManuallySwitchServiceFrame from pipecat.pipeline.llm_switcher import LLMSwitcher from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.service_switcher import ServiceSwitcher from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, LLMUserAggregatorParams, ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.stt import CartesiaSTTService from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.deepgram.tts import DeepgramTTSService from pipecat.services.google.llm import GoogleLLMService from pipecat.services.llm_service import FunctionCallParams from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams load_dotenv(override=True) # "Classic" function async def fetch_weather_from_api(params: FunctionCallParams): await params.result_callback({"conditions": "nice", "temperature": "75"}) # "Direct" function async def get_restaurant_recommendation(params: FunctionCallParams, location: str): """ Get a restaurant recommendation. Args: location (str): The city and state, e.g. "San Francisco, CA". """ await params.result_callback({"name": "The Golden Dragon"}) # We use lambdas to defer transport parameter creation until the transport # type is selected at runtime. transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, ), "twilio": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, ), } async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") weather_function = FunctionSchema( name="get_current_weather", description="Get the current weather", properties={ "location": { "type": "string", "description": "The city and state, e.g. San Francisco, CA", }, "format": { "type": "string", "enum": ["celsius", "fahrenheit"], "description": "The temperature unit to use. Infer this from the user's location.", }, }, required=["location", "format"], ) stt_cartesia = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY")) stt_deepgram = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) # Uses ServiceSwitcherStrategyManual by default stt_switcher = ServiceSwitcher(services=[stt_cartesia, stt_deepgram]) tts_cartesia = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), settings=CartesiaTTSService.Settings( voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ), ) tts_deepgram = DeepgramTTSService( api_key=os.getenv("DEEPGRAM_API_KEY"), settings=DeepgramTTSService.Settings( voice="aura-2-helena-en", ), ) # Uses ServiceSwitcherStrategyManual by default tts_switcher = ServiceSwitcher(services=[tts_cartesia, tts_deepgram]) system_prompt = "You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way." llm_openai = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), settings=OpenAILLMService.Settings(system_instruction=system_prompt), ) llm_google = GoogleLLMService( api_key=os.getenv("GOOGLE_API_KEY"), settings=GoogleLLMService.Settings(system_instruction=system_prompt), ) # Uses ServiceSwitcherStrategyManual by default llm_switcher = LLMSwitcher(llms=[llm_openai, llm_google]) # Register a "classic" function llm_switcher.register_function("get_current_weather", fetch_weather_from_api) # Register a "direct" function llm_switcher.register_direct_function(get_restaurant_recommendation) tools = ToolsSchema(standard_tools=[weather_function, get_restaurant_recommendation]) context = LLMContext(tools=tools) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), ) pipeline = Pipeline( [ transport.input(), # Transport user input stt_switcher, user_aggregator, # User responses llm_switcher, # LLM tts_switcher, # TTS transport.output(), # Transport bot output assistant_aggregator, # Assistant spoken responses ] ) task = PipelineTask( pipeline, params=PipelineParams( enable_metrics=True, enable_usage_metrics=True, ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected") # Kick off the conversation. context.add_message( {"role": "developer", "content": "Please introduce yourself to the user."} ) await task.queue_frames([LLMRunFrame()]) await asyncio.sleep(15) print(f"Switching to {stt_deepgram}") await task.queue_frames([ManuallySwitchServiceFrame(service=stt_deepgram)]) await asyncio.sleep(15) print(f"Switching to {llm_google}") await task.queue_frames([ManuallySwitchServiceFrame(service=llm_google)]) await asyncio.sleep(15) print(f"Switching to {tts_deepgram}") await task.queue_frames([ManuallySwitchServiceFrame(service=tts_deepgram)]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): logger.info(f"Client disconnected") await task.cancel() runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) async def bot(runner_args: RunnerArguments): """Main bot entry point compatible with Pipecat Cloud.""" transport = await create_transport(runner_args, transport_params) await run_bot(transport, runner_args) if __name__ == "__main__": from pipecat.runner.run import main main()