Files
pipecat/examples/runner-examples/cloud-simple-bot2.py
2025-07-29 15:12:26 -04:00

166 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are Twilio or SmallWebRTC."""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.cloud import SmallWebRTCSessionArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
try:
from pipecatcloud.agent import DailySessionArguments, WebSocketSessionArguments
except ImportError:
raise ImportError(
"pipecatcloud package is required for cloud-compatible bots. "
"Install with: pip install pipecat-ai[pipecatcloud]"
)
load_dotenv(override=True)
# Check if we're running locally
IS_LOCAL_RUN = os.environ.get("LOCAL_RUN", "0") == "1"
async def run_bot(transport):
"""Main bot logic that works with any transport."""
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(
session_args: DailySessionArguments | SmallWebRTCSessionArguments | WebSocketSessionArguments,
):
"""Main bot entry point compatible with Pipecat Cloud."""
if isinstance(session_args, SmallWebRTCSessionArguments):
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=session_args.webrtc_connection,
)
elif isinstance(session_args, WebSocketSessionArguments):
# Use the utility to parse WebSocket data
from pipecat.runner.utils import parse_telephony_websocket
transport_type, stream_id, call_id = await parse_telephony_websocket(session_args.websocket)
logger.info(f"Auto-detected transport: {transport_type}")
# Create transport based on detected type
if transport_type == "twilio":
from pipecat.serializers.twilio import TwilioFrameSerializer
serializer = TwilioFrameSerializer(
stream_sid=stream_id,
call_sid=call_id,
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
else:
raise ValueError(f"Unsupported WebSocket transport type: {transport_type}")
# Create the transport
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
transport = FastAPIWebsocketTransport(
websocket=session_args.websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
vad_analyzer=SileroVADAnalyzer(),
serializer=serializer,
),
)
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.cloud import main
main()