From 36c654942635aa0f4ce74ad72153f22c126cfdbf Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 15 Dec 2025 18:19:11 -0500 Subject: [PATCH] Added Daily PSTN dial-in support to the development runner --- changelog/3235.added.md | 6 ++ src/pipecat/runner/run.py | 142 +++++++++++++++++++++++++++++++++++- src/pipecat/runner/types.py | 41 ++++++++++- 3 files changed, 184 insertions(+), 5 deletions(-) create mode 100644 changelog/3235.added.md diff --git a/changelog/3235.added.md b/changelog/3235.added.md new file mode 100644 index 000000000..ed0eb378e --- /dev/null +++ b/changelog/3235.added.md @@ -0,0 +1,6 @@ +- Added Daily PSTN dial-in support to the development runner with `--dialin` flag. This includes: + + - `/daily-dialin-webhook` endpoint that handles incoming Daily PSTN webhooks + - Automatic Daily room creation with SIP configuration + - `DialinSettings` and `DailyDialinRequest` types in `pipecat.runner.types` for type-safe dial-in data + - The runner now mimics Pipecat Cloud's dial-in webhook handling for local development diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index 083dabc52..62433fcbc 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -171,6 +171,7 @@ def _create_server_app( esp32_mode: bool = False, whatsapp_enabled: bool = False, folder: Optional[str] = None, + dialin_enabled: bool = False, ): """Create FastAPI app with transport-specific routes.""" app = FastAPI() @@ -189,7 +190,7 @@ def _create_server_app( if whatsapp_enabled: _setup_whatsapp_routes(app) elif transport_type == "daily": - _setup_daily_routes(app) + _setup_daily_routes(app, dialin_enabled=dialin_enabled) elif transport_type in TELEPHONY_TRANSPORTS: _setup_telephony_routes(app, transport_type=transport_type, proxy=proxy) else: @@ -533,8 +534,13 @@ def _setup_whatsapp_routes(app: FastAPI): _add_lifespan_to_app(app, whatsapp_lifespan) -def _setup_daily_routes(app: FastAPI): - """Set up Daily-specific routes.""" +def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False): + """Set up Daily-specific routes. + + Args: + app: FastAPI application instance + dialin_enabled: If True, adds /daily-dialin-webhook endpoint for PSTN dial-in handling + """ @app.get("/") async def create_room_and_start_agent(): @@ -639,6 +645,116 @@ def _setup_daily_routes(app: FastAPI): return result + if dialin_enabled: + + @app.post("/daily-dialin-webhook") + async def handle_dialin_webhook(request: Request): + """Handle incoming Daily PSTN dial-in webhook. + + This endpoint mimics Pipecat Cloud's dial-in webhook handler. + It receives Daily webhook data, creates a SIP-enabled room, and starts the bot. + + Expected webhook payload:: + + { + "From": "+15551234567", + "To": "+15559876543", + "callId": "uuid-call-id", + "callDomain": "uuid-call-domain", + "sipHeaders": {...} // optional + } + + Returns:: + + { + "dailyRoom": "https://...", + "dailyToken": "...", + "sessionId": "uuid" + } + """ + logger.debug("Received Daily dial-in webhook") + + try: + data = await request.json() + logger.debug(f"Webhook data: {data}") + except Exception as e: + logger.error(f"Failed to parse webhook data: {e}") + raise HTTPException(status_code=400, detail="Invalid JSON payload") + + # Handle webhook verification test (sent by Daily when configuring webhook) + if data.get("test") or data.get("Test"): + logger.debug("Webhook verification test received") + return {"status": "OK"} + + # Validate required fields + if not all(key in data for key in ["From", "To", "callId", "callDomain"]): + raise HTTPException( + status_code=400, + detail="Missing required fields: From, To, callId, callDomain", + ) + + import aiohttp + + from pipecat.runner.daily import configure + from pipecat.runner.types import DailyDialinRequest, DialinSettings + + # Create Daily room with SIP capabilities + async with aiohttp.ClientSession() as session: + try: + room_config = await configure(session, sip_caller_phone=data.get("From")) + except Exception as e: + logger.error(f"Failed to create Daily room: {e}") + raise HTTPException( + status_code=500, detail=f"Failed to create Daily room: {str(e)}" + ) + + # Get Daily API URL from environment, fallback to production + daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1") + + # Get Daily API key from environment + daily_api_key = os.getenv("DAILY_API_KEY") + if not daily_api_key: + logger.error("DAILY_API_KEY not found in environment") + raise HTTPException( + status_code=500, detail="DAILY_API_KEY not configured on server" + ) + + # Prepare dial-in settings matching Pipecat Cloud structure + dialin_settings = DialinSettings( + call_id=data.get("callId"), + call_domain=data.get("callDomain"), + To=data.get("To"), + From=data.get("From"), + sip_headers=data.get("sipHeaders"), + ) + + # Create request body matching Pipecat Cloud payload + request_body = DailyDialinRequest( + dialin_settings=dialin_settings, + daily_api_key=daily_api_key, + daily_api_url=daily_api_url, + ) + + # Start bot with dial-in context + bot_module = _get_bot_module() + runner_args = DailyRunnerArguments( + room_url=room_config.room_url, + token=room_config.token, + body=request_body.model_dump(), + ) + + asyncio.create_task(bot_module.bot(runner_args)) + + # Generate session ID + session_id = str(uuid.uuid4()) + + # Return response matching Pipecat Cloud format + return { + "dailyRoom": room_config.room_url, + "dailyToken": room_config.token, + "sessionId": session_id, + } + def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str): """Set up telephony-specific routes.""" @@ -813,6 +929,12 @@ def main(): default=False, help="Ensure requried WhatsApp environment variables are present", ) + parser.add_argument( + "--dialin", + action="store_true", + default=False, + help="Enable Daily PSTN dial-in webhook handling (requires Daily transport)", + ) args = parser.parse_args() @@ -832,6 +954,11 @@ def main(): logger.error("For ESP32, you need to specify `--host IP` so we can do SDP munging.") return + # Validate dial-in requirements + if args.dialin and args.transport != "daily": + logger.error("--dialin flag only works with Daily transport (-t daily)") + return + # Log level logger.remove() logger.add(sys.stderr, level="TRACE" if args.verbose else "DEBUG") @@ -860,7 +987,13 @@ def main(): elif args.transport == "daily": print() print(f"🚀 Bot ready!") - print(f" → Open http://{args.host}:{args.port} in your browser to start a session") + if args.dialin: + print( + f" → Daily dial-in webhook: http://{args.host}:{args.port}/daily-dialin-webhook" + ) + print(f" → Configure this URL in your Daily phone number settings") + else: + print(f" → Open http://{args.host}:{args.port} in your browser to start a session") print() RUNNER_DOWNLOADS_FOLDER = args.folder @@ -875,6 +1008,7 @@ def main(): esp32_mode=args.esp32, whatsapp_enabled=args.whatsapp, folder=args.folder, + dialin_enabled=args.dialin, ) # Run the server diff --git a/src/pipecat/runner/types.py b/src/pipecat/runner/types.py index fab2404f3..d7803b23c 100644 --- a/src/pipecat/runner/types.py +++ b/src/pipecat/runner/types.py @@ -11,9 +11,48 @@ information to bot functions. """ from dataclasses import dataclass, field -from typing import Any, Optional +from typing import Any, Dict, Optional from fastapi import WebSocket +from pydantic import BaseModel + + +class DialinSettings(BaseModel): + """Dial-in settings from the Daily webhook. + + This model matches the structure sent by Pipecat Cloud and Daily.co webhooks + for incoming PSTN/SIP calls. + + Parameters: + call_id: Unique identifier for the call (UUID representing sessionId in SIP Network) + call_domain: Daily domain for the call (UUID representing Daily Domain on SIP Network) + To: The dialed phone number (optional) + From: The caller's phone number (optional) + sip_headers: Optional SIP headers from the call + """ + + call_id: str + call_domain: str + To: Optional[str] = None + From: Optional[str] = None + sip_headers: Optional[Dict[str, str]] = None + + +class DailyDialinRequest(BaseModel): + """Request data for Daily PSTN dial-in requests. + + This is the structure passed in runner_args.body for dial-in calls. + It matches the payload structure from Pipecat Cloud's dial-in webhook handler. + + Parameters: + dialin_settings: Dial-in configuration including call_id, call_domain, To, From + daily_api_key: Daily API key for pinlessCallUpdate (required for dial-in) + daily_api_url: Daily API URL (staging or production) + """ + + dialin_settings: DialinSettings + daily_api_key: str + daily_api_url: str @dataclass