Files
pipecat/examples/phone-chatbot/bot_runner.py
Dominic Stewart 1ba037865b Call Transfer demo (#1348)
* Updated code to dial out to an operator, keep track of operator conversation while escalated and then return to conversation when finished

* Removed unnecessary imports

* Updated bot runner code, added call routing file and then updated the call transfer and voicemail detection examples

* Updated the bot files

* Made prompt one level higher in the body and an array

* Updated call transfer examples to work correctly

* Updated Gemini voicemail detection example to work

* Added Twilio bot support back to the bot_runner

* Moved some state management, participant management and other logic to the helper file.

* Updated comments

* Updated env and requirements file

* Ran the examples and made sure code works. Still need to work on the prompts a bit

* Fixed format issue

* Add support to disable summary in call transfer

* Added support for operator transfer mode

* Updated readme file

* Updated readme based on feedback, and handling of various properties in the json to be more flexible for future examples

* Updated number of endpoints

* Updated readme to remove fly deployment text and replaced with Pipecat Cloud

* Starting to tweak function calls and prompts

* Updated examples to more consistently call the functions and say what they need to say

* Updated examples

* Updated examples

* Updated examples to work correctly

* Add simple bot versions of dialin and dialout

* Refactored the bot runner file to make adding future examples easier

* Based on feedback, removed examples for multiple LLMs and also adjusted voicemail detection code to be simpler

* Made sure to only capture the users transcription once

* Updated readme with latest changes

* Forgot to update the order of examples in one place

* Fixed formatting issue

* Adjusted based on James's feedback

* Changed default_mode to default_calltransfer_mode
2025-04-03 09:03:23 +09:00

338 lines
11 KiB
Python

import argparse
import json
import os
import shlex
import subprocess
from contextlib import asynccontextmanager
from typing import Any, Dict
import aiohttp
from bot_constants import (
MAX_SESSION_TIME,
REQUIRED_ENV_VARS,
)
from bot_definitions import bot_registry
from bot_runner_helpers import (
determine_room_capabilities,
ensure_prompt_config,
process_dialin_request,
)
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse
from twilio.twiml.voice_response import VoiceResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomParams,
DailyRoomProperties,
DailyRoomSipParams,
)
# Load settings from a .env file. override=True asks python-dotenv to let values
# from the file take precedence over variables already present in the environment.
load_dotenv(override=True)
# Module-level registry of shared helpers. lifespan() stores the Daily REST
# helper under the "rest" key, and the room functions below read it from there.
daily_helpers = {}
# ----------------- Daily Room Management ----------------- #
async def create_daily_room(room_url: str = None, config_body: Dict[str, Any] = None):
    """Create a new Daily room, or look up an existing one, and fetch a bot token.

    Args:
        room_url: URL of an existing room to reuse. When falsy, a new room is
            created instead.
        config_body: Bot configuration passed to ``determine_room_capabilities``
            to decide whether a new room needs dial-in and/or dial-out.
            Only consulted when a new room is created.

    Returns:
        Dict with the keys ``room`` (room URL), ``token`` (meeting token) and
        ``sip_endpoint`` (read from the room's config).

    Raises:
        HTTPException: 500 if the existing room cannot be fetched, or if no
            room or token could be obtained.
    """
    rest_helper = daily_helpers["rest"]

    if not room_url:
        # Get room capabilities based on the configuration
        capabilities = determine_room_capabilities(config_body)

        # Configure SIP parameters only if dial-in is needed
        sip_params = None
        if capabilities["enable_dialin"]:
            sip_params = DailyRoomSipParams(
                display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=2
            )

        # Create the properties object with the appropriate settings
        properties = DailyRoomProperties(sip=sip_params)

        # Set dial-out capability if needed
        if capabilities["enable_dialout"]:
            properties.enable_dialout = True

        # Log the capabilities being used
        capability_str = ", ".join(f"{k}={v}" for k, v in capabilities.items())
        print(f"Creating room with capabilities: {capability_str}")

        params = DailyRoomParams(properties=properties)

        print("Creating new room...")
        room = await rest_helper.create_room(params=params)
    else:
        # Check that the passed room URL exists
        try:
            room = await rest_helper.get_room_from_url(room_url)
        except Exception as exc:
            # Keep the original failure attached as the cause for debugging.
            raise HTTPException(status_code=500, detail=f"Room not found: {room_url}") from exc

    # Validate the room *before* touching its attributes; otherwise a missing
    # room would surface as an AttributeError instead of a clean HTTP error.
    if not room:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    print(f"Daily room: {room.url} {room.config.sip_endpoint}")

    # Get token for the agent
    token = await rest_helper.get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    return {"room": room.url, "token": token, "sip_endpoint": room.config.sip_endpoint}
# ----------------- Bot Process Management ----------------- #
async def start_bot(room_details: Dict[str, str], body: Dict[str, Any], example: str) -> bool:
    """Spawn a bot module as a child process without waiting for it to finish.

    Args:
        room_details: Mapping holding the ``room`` URL and ``token`` for the bot.
        body: Bot configuration; serialized to JSON and passed via ``-b``.
        example: Name of the module to run with ``python3 -m``.

    Returns:
        True once the process has been spawned.

    Raises:
        HTTPException: 500 if the subprocess cannot be started.
    """
    room_url = room_details["room"]
    token = room_details["token"]

    body_json = json.dumps(body)
    print(f"++++ Body JSON: {body_json}")

    # Hand the arguments to Popen as a list instead of building a command string
    # and re-splitting it. The JSON then reaches the bot byte-for-byte, even when
    # it contains quotes, backslashes or whitespace, and no value can be split
    # into additional command-line arguments.
    command_parts = ["python3", "-m", example, "-u", room_url, "-t", token, "-b", body_json]
    print(f"Starting bot. Example: {example}, Room: {room_url}")

    try:
        # Run from this file's directory so ``-m <example>`` resolves sibling modules.
        subprocess.Popen(command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") from e
    return True
async def start_twilio_bot(room_details: Dict[str, str], call_id: str) -> bool:
    """Spawn the ``bot_twilio`` module as a child process for a Twilio call.

    Args:
        room_details: Mapping holding the ``room`` URL, ``token`` and ``sip_endpoint``.
        call_id: Twilio call ID (CallSid).

    Returns:
        True once the process has been spawned.

    Raises:
        HTTPException: 500 if the subprocess cannot be started.
    """
    room_url = room_details["room"]
    token = room_details["token"]
    sip_endpoint = room_details["sip_endpoint"]

    # Build the argv list directly rather than formatting a string and splitting
    # it again: call_id comes from a webhook form, so whitespace or quotes in it
    # must not be able to introduce extra arguments. str() mirrors the previous
    # f-string formatting for values that are not already strings.
    command_parts = [
        "python3",
        "-m",
        "bot_twilio",
        "-u",
        str(room_url),
        "-t",
        str(token),
        "-i",
        str(call_id),
        "-s",
        str(sip_endpoint),
    ]
    print(f"Starting Twilio bot. Room: {room_url}")

    try:
        # Run from this file's directory so ``-m bot_twilio`` resolves the sibling module.
        subprocess.Popen(command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") from e
    return True
# ----------------- API Setup ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared aiohttp session and Daily REST helper for the app's lifetime.

    On startup a ``DailyRESTHelper`` is stored under ``daily_helpers["rest"]``,
    configured from the ``DAILY_API_KEY`` and ``DAILY_API_URL`` environment
    variables. The underlying aiohttp session is closed when the app shuts down.
    """
    # ``async with`` guarantees the session is closed even if building the helper
    # fails or an exception is thrown into the generator at ``yield``.
    async with aiohttp.ClientSession() as aiohttp_session:
        daily_helpers["rest"] = DailyRESTHelper(
            daily_api_key=os.getenv("DAILY_API_KEY", ""),
            daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=aiohttp_session,
        )
        yield
# Create the FastAPI application; lifespan() sets up and tears down the shared
# aiohttp session and the Daily REST helper.
app = FastAPI(lifespan=lifespan)
# CORS is left wide open: every origin, method and header is allowed, and
# credentials are permitted.
# NOTE(review): presumably fine for an example app - tighten before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ----------------- API Endpoints ----------------- #
@app.post("/twilio_start_bot", response_class=PlainTextResponse)
async def twilio_start_bot(request: Request):
    """Webhook target for incoming Twilio calls.

    Reads the Twilio form payload, prepares a Daily room, launches the Twilio
    bot for the call and answers with TwiML that plays hold music. The bot is
    expected to update the call with the SIP URI once it is ready.

    Returns:
        The TwiML document as a string.

    Raises:
        HTTPException: 400 if the form cannot be parsed or has no ``CallSid``.
    """
    print("POST /twilio_start_bot")

    # Twilio posts its webhook parameters as form fields
    try:
        twilio_fields = dict(await request.form())
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to parse Twilio form data: {str(e)}")

    # The CallSid identifies the call the bot has to pick up
    call_id = twilio_fields.get("CallSid")
    if not call_id:
        raise HTTPException(status_code=400, detail="Missing 'CallSid' in request")
    print(f"CallId: {call_id}")

    # Reuse the sample room from the environment when one is configured;
    # Twilio rooms get no special config.
    default_room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
    room_details = await create_daily_room(default_room_url, None)

    await start_twilio_bot(room_details, call_id)

    # Keep the caller on hold with music until the bot takes over
    hold_twiml = VoiceResponse()
    hold_twiml.play(
        url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10
    )
    return str(hold_twiml)
@app.post("/start")
async def handle_start_request(request: Request) -> JSONResponse:
    """Unified endpoint that configures and launches a bot for the supported scenarios.

    Accepted payloads:
      * URL-encoded form data containing ``CallSid``: treated as a Twilio
        webhook and delegated to ``twilio_start_bot``.
      * JSON containing ``test``: webhook test, answered with ``{"test": true}``.
      * JSON containing ``From``, ``To``, ``callId`` and ``callDomain``: a
        dial-in webhook, handled by ``process_dialin_request``.
      * JSON containing ``config``: an explicit bot configuration.

    Returns:
        A JSON response with ``status`` and ``bot_type`` and, when applicable,
        ``room_url``, ``llm_provider`` and ``dialing_to``. For Twilio webhooks
        the TwiML from ``twilio_start_bot`` is returned as plain text instead.

    Raises:
        HTTPException: 400 for malformed or unsupported requests. HTTP errors
            raised while handling the request (for example a 500 from room
            creation) are propagated with their original status and detail.
    """
    # Get default room URL from environment
    room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)

    try:
        # Check if this is form data (from Twilio) or JSON
        content_type = request.headers.get("content-type", "").lower()
        if "application/x-www-form-urlencoded" in content_type:
            data = dict(await request.form())

            # CallSid marks a Twilio webhook; keep handling it here for
            # backward compatibility.
            if "CallSid" in data:
                # twilio_start_bot returns the TwiML as a plain str. Wrap it so it
                # is sent verbatim, as on /twilio_start_bot, instead of being
                # JSON-encoded by this route.
                return PlainTextResponse(await twilio_start_bot(request))
        else:
            data = await request.json()

        # Everything below looks fields up by key, so only a JSON object is valid.
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Invalid request format")

        # Handle webhook test
        if "test" in data:
            return JSONResponse({"test": True})

        if all(key in data for key in ("From", "To", "callId", "callDomain")):
            # Direct dial-in webhook from Daily
            body = await process_dialin_request(data)
        elif "config" in data:
            # Use the registry to set up the bot configuration
            body = bot_registry.setup_configuration(data["config"])
        else:
            raise HTTPException(status_code=400, detail="Invalid request format")

        # Ensure prompt configuration
        body = ensure_prompt_config(body)

        # Detect which bot type to use
        bot_type_name = bot_registry.detect_bot_type(body)
        if not bot_type_name:
            raise HTTPException(
                status_code=400, detail="Configuration doesn't match any supported scenario"
            )

        room_details = await create_daily_room(room_url, body)
        await start_bot(room_details, body, bot_type_name)

        bot_type = bot_registry.get_bot(bot_type_name)

        # Build the response
        response = {"status": "Bot started", "bot_type": bot_type_name}

        # Expose the room URL in test mode
        if bot_type.has_test_mode(body):
            response["room_url"] = room_details["room"]

        # Report the configured LLM provider, if any
        if "llm" in body:
            response["llm_provider"] = body["llm"]

        # Add dial-out info for dial-out scenarios (first target only)
        if "dialout_settings" in body and len(body["dialout_settings"]) > 0:
            first_setting = body["dialout_settings"][0]
            if "phoneNumber" in first_setting:
                response["dialing_to"] = f"phone:{first_setting['phoneNumber']}"
            elif "sipUri" in first_setting:
                response["dialing_to"] = f"sip:{first_setting['sipUri']}"

        return JSONResponse(response)
    except HTTPException:
        # Deliberate HTTP errors must keep their own status code and detail
        # rather than being rewrapped as a generic 400 by the handler below.
        raise
    except json.JSONDecodeError as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body") from exc
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Request processing error: {str(e)}") from e
# ----------------- Main ----------------- #
if __name__ == "__main__":
    # Fail fast when a required environment variable is absent.
    for env_var in REQUIRED_ENV_VARS:
        if env_var not in os.environ:
            raise Exception(f"Missing environment variable: {env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument(
        "--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
    )
    # argparse applies type=int to a string default, so PORT from the environment works.
    parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
    # Reload stays on by default. The paired --no-reload flag makes it possible to
    # turn it off, which a lone store_true flag defaulting to True never allowed.
    parser.add_argument(
        "--reload",
        dest="reload",
        action="store_true",
        default=True,
        help="Reload code on change (default)",
    )
    parser.add_argument(
        "--no-reload", dest="reload", action="store_false", help="Disable reloading on change"
    )

    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")