#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecatcloud.agent import DailySessionArguments

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

# Check if we're in local development mode.
# NOTE: this is evaluated before load_dotenv() below, so LOCAL_RUN must already be
# present in the process environment; a value defined only in .env is not seen here.
LOCAL_RUN = os.getenv("LOCAL_RUN")
if LOCAL_RUN:
    # These modules are only needed by local_main() and the __main__ entry point.
    import asyncio
    import webbrowser

    try:
        from local_runner import configure
    except ImportError:
        # Best effort: the module still loads, but local_main() will fail (and log)
        # when it tries to call the missing `configure`.
        logger.error("Could not import local_runner module. Local development mode may not work.")

# Load environment variables
load_dotenv(override=True)


async def main(room_url: str, token: str):
    """Build the voice pipeline for a Daily room and run it until the task ends.

    The pipeline is wired as: transport input -> user context aggregator -> LLM ->
    TTS -> transport output -> assistant context aggregator.

    Args:
        room_url: The Daily room URL
        token: The Daily room token
    """
    logger.debug("Starting bot in room: {}", room_url)

    transport = DailyTransport(
        room_url,
        token,
        "bot",
        DailyParams(
            audio_out_enabled=True,
            transcription_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    # API keys are read from the environment (populated by load_dotenv above).
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22"
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    # Initial system prompt. The same list object is handed to the LLM context and is
    # appended to later by the first-participant handler.
    messages = [
        {
            "role": "system",
            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline(
        [
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,
            report_only_initial_ttfb=True,
        ),
    )

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        logger.info("First participant joined: {}", participant["id"])
        await transport.capture_participant_transcription(participant["id"])
        # Kick off the conversation.
        messages.append(
            {
                "role": "system",
                "content": "Please start with 'Hello World' and introduce yourself to the user.",
            }
        )
        await task.queue_frames([LLMMessagesFrame(messages)])

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.info("Participant left: {}", participant)
        # Any participant leaving cancels the pipeline task.
        await task.cancel()

    runner = PipelineRunner()

    await runner.run(task)


async def bot(args: DailySessionArguments):
    """Main bot entry point compatible with the FastAPI route handler.

    Args:
        args: Session arguments for this bot run. Only ``args.room_url`` (the Daily
            room URL) and ``args.token`` (the Daily room token) are read here.

    Raises:
        Exception: Any exception raised by ``main`` is logged and re-raised.
    """
    # Log only the room URL: the room token is a credential and must not be written
    # to logs.
    logger.info(f"Bot process initialized {args.room_url}")

    try:
        await main(args.room_url, args.token)
        logger.info("Bot process completed")
    except Exception as e:
        logger.exception(f"Error in bot process: {str(e)}")
        raise


# Local development functions


async def local_main():
    """Function for local development testing.

    Obtains a room URL and token from ``local_runner.configure``, opens the room in
    the default web browser, and runs ``main`` against it. Exceptions are logged and
    not re-raised.
    """
    try:
        async with aiohttp.ClientSession() as session:
            (room_url, token) = await configure(session)
            # Surround the link with filler lines so it stands out in the log output.
            logger.warning("_")
            logger.warning("_")
            logger.warning(f"Talk to your voice agent here: {room_url}")
            logger.warning("_")
            logger.warning("_")
            webbrowser.open(room_url)
            await main(room_url, token)
    except Exception as e:
        logger.exception(f"Error in local development mode: {e}")


# Local development entry point
if LOCAL_RUN and __name__ == "__main__":
    try:
        asyncio.run(local_main())
    except Exception as e:
        logger.exception(f"Failed to run in local mode: {e}")