From 855f4842dd9f156835db322a07cd3ec1d8f57031 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Fri, 17 Oct 2025 10:10:19 -0300 Subject: [PATCH] Creating the WebRTC routes that mimic the ones provided by Pipecat Cloud. --- src/pipecat/runner/run.py | 84 +++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 7 deletions(-) diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index 9b1d233f6..a90c96ac5 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -70,12 +70,14 @@ import asyncio import mimetypes import os import sys +import uuid from contextlib import asynccontextmanager +from http import HTTPMethod from pathlib import Path -from typing import Optional +from typing import Any, Dict, List, Optional, TypedDict import aiohttp -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, Response from loguru import logger from pipecat.runner.types import ( @@ -202,8 +204,9 @@ def _setup_webrtc_routes( try: from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI - from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection + from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection from pipecat.transports.smallwebrtc.request_handler import ( + IceCandidate, SmallWebRTCPatchRequest, SmallWebRTCRequest, SmallWebRTCRequestHandler, @@ -212,6 +215,16 @@ def _setup_webrtc_routes( logger.error(f"WebRTC transport dependencies not installed: {e}") return + class IceConfig(TypedDict): + iceServers: List[IceServer] + + class StartBotResult(TypedDict, total=False): + sessionId: str + iceConfig: Optional[IceConfig] + + # In-memory store of active sessions: session_id -> session info + active_sessions: Dict[str, Dict[str, Any]] = {} + # Mount the frontend app.mount("/client", SmallWebRTCPrebuiltUI) @@ -264,6 +277,67 @@ def _setup_webrtc_routes( await small_webrtc_handler.handle_patch_request(request) return {"status": "success"} + @app.post("/start") + async def rtvi_start(request: Request): + """Mimic Pipecat Cloud's /start endpoint.""" + # Parse the request body + try: + request_data = await request.json() + logger.debug(f"Received request: {request_data}") + except Exception as e: + logger.error(f"Failed to parse request body: {e}") + request_data = {} + + # Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud + session_id = str(uuid.uuid4()) + active_sessions[session_id] = request_data + + result: StartBotResult = {"sessionId": session_id} + if request_data.get("enableDefaultIceServers"): + result["iceConfig"] = IceConfig( + iceServers=[IceServer(urls="stun:stun.l.google.com:19302")] + ) + + return result + + @app.api_route( + "/sessions/{session_id}/{path:path}", + methods=["GET", "POST", "PUT", "PATCH", "DELETE"], + ) + async def proxy_request( + session_id: str, path: str, request: Request, background_tasks: BackgroundTasks + ): + """Mimic Pipecat Cloud's proxy.""" + active_session = active_sessions.get(session_id) + if not active_session: + return Response(content="Invalid or not-yet-ready session_id", status_code=404) + + if path.endswith("api/offer"): + # Parse the request body and convert to SmallWebRTCRequest + try: + request_data = await request.json() + if request.method == HTTPMethod.POST.value: + webrtc_request = SmallWebRTCRequest( + sdp=request_data["sdp"], + type=request_data["type"], + pc_id=request_data.get("pc_id"), + restart_pc=request_data.get("restart_pc"), + request_data=request_data, + ) + return await offer(webrtc_request, background_tasks) + elif request.method == HTTPMethod.PATCH.value: + patch_request = SmallWebRTCPatchRequest( + pc_id=request_data["pc_id"], + candidates=[IceCandidate(**c) for c in request_data.get("candidates", [])], + ) + return await ice_candidate(patch_request) + except Exception as e: + logger.error(f"Failed to parse WebRTC request: {e}") + return Response(content="Invalid WebRTC request", status_code=400) + + logger.info(f"Received request for path: {path}") + return Response(status_code=200) + @asynccontextmanager async def smallwebrtc_lifespan(app: FastAPI): """Manage FastAPI application lifecycle and cleanup connections.""" @@ -503,8 +577,6 @@ def _setup_daily_routes(app: FastAPI): else: logger.debug("No body data provided in request") - import aiohttp - from pipecat.runner.daily import configure async with aiohttp.ClientSession() as session: @@ -592,8 +664,6 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str): async def _run_daily_direct(): """Run Daily bot with direct connection (no FastAPI server).""" try: - import aiohttp - from pipecat.runner.daily import configure except ImportError as e: logger.error("Daily transport dependencies not installed.")