From 403d22e62c6ecce9831788081be4c209e2da9549 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Fri, 26 Sep 2025 10:28:19 -0300 Subject: [PATCH] Creating the whatsapp routes inside the runner. --- src/pipecat/runner/run.py | 134 +++++++++++++++++++++++++++++++++++++- 1 file changed, 133 insertions(+), 1 deletion(-) diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index dcf0d05ad..6672f9b37 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -70,7 +70,9 @@ import asyncio import os import sys from contextlib import asynccontextmanager +from typing import Optional +import aiohttp from loguru import logger from pipecat.runner.types import ( @@ -82,7 +84,7 @@ from pipecat.runner.types import ( try: import uvicorn from dotenv import load_dotenv - from fastapi import BackgroundTasks, FastAPI, Request, WebSocket + from fastapi import BackgroundTasks, FastAPI, HTTPException, Request, WebSocket from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, RedirectResponse except ImportError as e: @@ -166,6 +168,7 @@ def _create_server_app( # Set up transport-specific routes if transport_type == "webrtc": _setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host) + _setup_whatsapp_routes(app) elif transport_type == "daily": _setup_daily_routes(app) elif transport_type in ["twilio", "telnyx", "plivo", "exotel"]: @@ -229,6 +232,135 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo app.router.lifespan_context = lifespan +def _setup_whatsapp_routes(app: FastAPI): + """Set up WebRTC-specific routes.""" + try: + from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI + + from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection + from pipecat.transports.smallwebrtc.request_handler import ( + SmallWebRTCRequest, + SmallWebRTCRequestHandler, + ) + from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest + from pipecat.transports.whatsapp.client import WhatsAppClient + except ImportError as e: + logger.error(f"WebRTC transport dependencies not installed: {e}") + return + + WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN") + WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID") + WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN") + + if not all( + [ + WHATSAPP_TOKEN, + WHATSAPP_PHONE_NUMBER_ID, + WHATSAPP_WEBHOOK_VERIFICATION_TOKEN, + ] + ): + logger.debug( + "Missing required environment variables for WhatsApp transport. Keeping it disabled." + ) + return + + # Global WhatsApp client instance + whatsapp_client: Optional[WhatsAppClient] = None + + @app.get( + "/whatsapp", + summary="Verify WhatsApp webhook", + description="Handles WhatsApp webhook verification requests from Meta", + ) + async def verify_webhook(request: Request): + """Verify WhatsApp webhook endpoint. + + This endpoint is called by Meta's WhatsApp Business API to verify + the webhook URL during setup. It validates the verification token + and returns the challenge parameter if successful. + """ + params = dict(request.query_params) + logger.debug(f"Webhook verification request received with params: {list(params.keys())}") + + try: + result = await whatsapp_client.handle_verify_webhook_request( + params=params, expected_verification_token=WHATSAPP_WEBHOOK_VERIFICATION_TOKEN + ) + logger.info("Webhook verification successful") + return result + except ValueError as e: + logger.warning(f"Webhook verification failed: {e}") + raise HTTPException(status_code=403, detail="Verification failed") + + @app.post( + "/whatsapp", + summary="Handle WhatsApp webhook events", + description="Processes incoming WhatsApp messages and call events", + ) + async def whatsapp_webhook(body: WhatsAppWebhookRequest, background_tasks: BackgroundTasks): + """Handle incoming WhatsApp webhook events. + + For call events, establishes WebRTC connections and spawns bot instances + in the background to handle real-time communication. + """ + # Validate webhook object type + if body.object != "whatsapp_business_account": + logger.warning(f"Invalid webhook object type: {body.object}") + raise HTTPException(status_code=400, detail="Invalid object type") + + logger.debug(f"Processing WhatsApp webhook: {body.model_dump()}") + + async def connection_callback(connection: SmallWebRTCConnection): + """Handle new WebRTC connections from WhatsApp calls. + + Called when a WebRTC connection is established for a WhatsApp call. + Spawns a bot instance to handle the conversation. + + Args: + connection: The established WebRTC connection + """ + bot_module = _get_bot_module() + runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection) + background_tasks.add_task(bot_module.bot, runner_args) + + try: + # Process the webhook request + result = await whatsapp_client.handle_webhook_request(body, connection_callback) + logger.debug(f"Webhook processed successfully: {result}") + return {"status": "success", "message": "Webhook processed successfully"} + except ValueError as ve: + logger.warning(f"Invalid webhook request format: {ve}") + raise HTTPException(status_code=400, detail=f"Invalid request: {str(ve)}") + except Exception as e: + logger.error(f"Internal error processing webhook: {e}") + raise HTTPException(status_code=500, detail="Internal server error processing webhook") + + @asynccontextmanager + async def lifespan(app: FastAPI): + """Manage FastAPI application lifecycle and cleanup connections.""" + global whatsapp_client + + # Initialize WhatsApp client with persistent HTTP session + async with aiohttp.ClientSession() as session: + whatsapp_client = WhatsAppClient( + whatsapp_token=WHATSAPP_TOKEN, + phone_number_id=WHATSAPP_PHONE_NUMBER_ID, + session=session, + ) + logger.info("WhatsApp client initialized successfully") + + try: + yield # Run the application + finally: + # Cleanup all active calls on shutdown + logger.info("Cleaning up WhatsApp client resources...") + if whatsapp_client: + await whatsapp_client.terminate_all_calls() + logger.info("Cleanup completed") + + app.router.lifespan_context = lifespan + + def _setup_daily_routes(app: FastAPI): """Set up Daily-specific routes."""