# # Copyright (c) 2024–2025, Daily # # SPDX-License-Identifier: BSD 2-Clause License # from dotenv import load_dotenv from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.aws.llm import AWSBedrockLLMService from pipecat.services.aws.stt import AWSTranscribeSTTService from pipecat.services.aws.tts import AWSPollyTTSService from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams from pipecat.transports.services.daily import DailyParams load_dotenv(override=True) async def fetch_weather_from_api(params: FunctionCallParams): await params.result_callback({"conditions": "nice", "temperature": "75"}) async def fetch_restaurant_recommendation(params: FunctionCallParams): await params.result_callback({"name": "The Golden Dragon"}) # We store functions so objects (e.g. SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets # selected. transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), ), "twilio": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), ), } async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") stt = AWSTranscribeSTTService() tts = AWSPollyTTSService( region="us-west-2", # only specific regions support generative TTS voice_id="Joanna", params=AWSPollyTTSService.InputParams(engine="generative", rate="1.1"), ) llm = AWSBedrockLLMService( aws_region="us-west-2", model="us.anthropic.claude-3-5-haiku-20241022-v1:0", params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"), ) # You can also register a function_name of None to get all functions # sent to the same callback with an additional function_name parameter. llm.register_function("get_current_weather", fetch_weather_from_api) llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) weather_function = FunctionSchema( name="get_current_weather", description="Get the current weather", properties={ "location": { "type": "string", "description": "The city and state, e.g. San Francisco, CA", }, "format": { "type": "string", "enum": ["celsius", "fahrenheit"], "description": "The temperature unit to use. Infer this from the user's location.", }, }, required=["location", "format"], ) restaurant_function = FunctionSchema( name="get_restaurant_recommendation", description="Get a restaurant recommendation", properties={ "location": { "type": "string", "description": "The city and state, e.g. San Francisco, CA", }, }, required=["location"], ) tools = ToolsSchema(standard_tools=[weather_function, restaurant_function]) messages = [ { "role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", }, ] context = OpenAILLMContext(messages, tools) context_aggregator = llm.create_context_aggregator(context) pipeline = Pipeline( [ transport.input(), stt, context_aggregator.user(), llm, tts, transport.output(), context_aggregator.assistant(), ] ) task = PipelineTask( pipeline, params=PipelineParams( enable_metrics=True, enable_usage_metrics=True, ), ) @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected") # Kick off the conversation. messages.append({"role": "user", "content": "Please introduce yourself to the user."}) await task.queue_frames([context_aggregator.user().get_context_frame()]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): logger.info(f"Client disconnected") await task.cancel() runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) async def bot(runner_args: RunnerArguments): """Main bot entry point compatible with Pipecat Cloud.""" transport = await create_transport(runner_args, transport_params) await run_bot(transport, runner_args) if __name__ == "__main__": from pipecat.runner.run import main main()