Creating the whatsapp routes inside the runner.

This commit is contained in:
Filipi Fuchter
2025-09-26 10:28:19 -03:00
parent ee00ee5c57
commit 403d22e62c

View File

@@ -70,7 +70,9 @@ import asyncio
import os
import sys
from contextlib import asynccontextmanager
from typing import Optional
import aiohttp
from loguru import logger
from pipecat.runner.types import (
@@ -82,7 +84,7 @@ from pipecat.runner.types import (
try:
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, Request, WebSocket
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
except ImportError as e:
@@ -166,6 +168,7 @@ def _create_server_app(
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host)
_setup_whatsapp_routes(app)
elif transport_type == "daily":
_setup_daily_routes(app)
elif transport_type in ["twilio", "telnyx", "plivo", "exotel"]:
@@ -229,6 +232,135 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
app.router.lifespan_context = lifespan
def _setup_whatsapp_routes(app: FastAPI):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
if not all(
[
WHATSAPP_TOKEN,
WHATSAPP_PHONE_NUMBER_ID,
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
]
):
logger.debug(
"Missing required environment variables for WhatsApp transport. Keeping it disabled."
)
return
# Global WhatsApp client instance
whatsapp_client: Optional[WhatsAppClient] = None
@app.get(
"/whatsapp",
summary="Verify WhatsApp webhook",
description="Handles WhatsApp webhook verification requests from Meta",
)
async def verify_webhook(request: Request):
"""Verify WhatsApp webhook endpoint.
This endpoint is called by Meta's WhatsApp Business API to verify
the webhook URL during setup. It validates the verification token
and returns the challenge parameter if successful.
"""
params = dict(request.query_params)
logger.debug(f"Webhook verification request received with params: {list(params.keys())}")
try:
result = await whatsapp_client.handle_verify_webhook_request(
params=params, expected_verification_token=WHATSAPP_WEBHOOK_VERIFICATION_TOKEN
)
logger.info("Webhook verification successful")
return result
except ValueError as e:
logger.warning(f"Webhook verification failed: {e}")
raise HTTPException(status_code=403, detail="Verification failed")
@app.post(
"/whatsapp",
summary="Handle WhatsApp webhook events",
description="Processes incoming WhatsApp messages and call events",
)
async def whatsapp_webhook(body: WhatsAppWebhookRequest, background_tasks: BackgroundTasks):
"""Handle incoming WhatsApp webhook events.
For call events, establishes WebRTC connections and spawns bot instances
in the background to handle real-time communication.
"""
# Validate webhook object type
if body.object != "whatsapp_business_account":
logger.warning(f"Invalid webhook object type: {body.object}")
raise HTTPException(status_code=400, detail="Invalid object type")
logger.debug(f"Processing WhatsApp webhook: {body.model_dump()}")
async def connection_callback(connection: SmallWebRTCConnection):
"""Handle new WebRTC connections from WhatsApp calls.
Called when a WebRTC connection is established for a WhatsApp call.
Spawns a bot instance to handle the conversation.
Args:
connection: The established WebRTC connection
"""
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
background_tasks.add_task(bot_module.bot, runner_args)
try:
# Process the webhook request
result = await whatsapp_client.handle_webhook_request(body, connection_callback)
logger.debug(f"Webhook processed successfully: {result}")
return {"status": "success", "message": "Webhook processed successfully"}
except ValueError as ve:
logger.warning(f"Invalid webhook request format: {ve}")
raise HTTPException(status_code=400, detail=f"Invalid request: {str(ve)}")
except Exception as e:
logger.error(f"Internal error processing webhook: {e}")
raise HTTPException(status_code=500, detail="Internal server error processing webhook")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
global whatsapp_client
# Initialize WhatsApp client with persistent HTTP session
async with aiohttp.ClientSession() as session:
whatsapp_client = WhatsAppClient(
whatsapp_token=WHATSAPP_TOKEN,
phone_number_id=WHATSAPP_PHONE_NUMBER_ID,
session=session,
)
logger.info("WhatsApp client initialized successfully")
try:
yield # Run the application
finally:
# Cleanup all active calls on shutdown
logger.info("Cleaning up WhatsApp client resources...")
if whatsapp_client:
await whatsapp_client.terminate_all_calls()
logger.info("Cleanup completed")
app.router.lifespan_context = lifespan
def _setup_daily_routes(app: FastAPI):
"""Set up Daily-specific routes."""