Merge pull request #2875 from pipecat-ai/filipi/rtvi_routes
Creating the WebRTC routes that mimic the ones provided by Pipecat Cloud.
This commit is contained in:
@@ -70,12 +70,14 @@ import asyncio
|
||||
import mimetypes
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
from http import HTTPMethod
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
import aiohttp
|
||||
from fastapi.responses import FileResponse
|
||||
from fastapi.responses import FileResponse, Response
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
@@ -202,8 +204,9 @@ def _setup_webrtc_routes(
|
||||
try:
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.request_handler import (
|
||||
IceCandidate,
|
||||
SmallWebRTCPatchRequest,
|
||||
SmallWebRTCRequest,
|
||||
SmallWebRTCRequestHandler,
|
||||
@@ -212,6 +215,16 @@ def _setup_webrtc_routes(
|
||||
logger.error(f"WebRTC transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
class IceConfig(TypedDict):
|
||||
iceServers: List[IceServer]
|
||||
|
||||
class StartBotResult(TypedDict, total=False):
|
||||
sessionId: str
|
||||
iceConfig: Optional[IceConfig]
|
||||
|
||||
# In-memory store of active sessions: session_id -> session info
|
||||
active_sessions: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
# Mount the frontend
|
||||
app.mount("/client", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@@ -264,6 +277,67 @@ def _setup_webrtc_routes(
|
||||
await small_webrtc_handler.handle_patch_request(request)
|
||||
return {"status": "success"}
|
||||
|
||||
@app.post("/start")
|
||||
async def rtvi_start(request: Request):
|
||||
"""Mimic Pipecat Cloud's /start endpoint."""
|
||||
# Parse the request body
|
||||
try:
|
||||
request_data = await request.json()
|
||||
logger.debug(f"Received request: {request_data}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
|
||||
session_id = str(uuid.uuid4())
|
||||
active_sessions[session_id] = request_data
|
||||
|
||||
result: StartBotResult = {"sessionId": session_id}
|
||||
if request_data.get("enableDefaultIceServers"):
|
||||
result["iceConfig"] = IceConfig(
|
||||
iceServers=[IceServer(urls="stun:stun.l.google.com:19302")]
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
@app.api_route(
|
||||
"/sessions/{session_id}/{path:path}",
|
||||
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
|
||||
)
|
||||
async def proxy_request(
|
||||
session_id: str, path: str, request: Request, background_tasks: BackgroundTasks
|
||||
):
|
||||
"""Mimic Pipecat Cloud's proxy."""
|
||||
active_session = active_sessions.get(session_id)
|
||||
if not active_session:
|
||||
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
|
||||
|
||||
if path.endswith("api/offer"):
|
||||
# Parse the request body and convert to SmallWebRTCRequest
|
||||
try:
|
||||
request_data = await request.json()
|
||||
if request.method == HTTPMethod.POST.value:
|
||||
webrtc_request = SmallWebRTCRequest(
|
||||
sdp=request_data["sdp"],
|
||||
type=request_data["type"],
|
||||
pc_id=request_data.get("pc_id"),
|
||||
restart_pc=request_data.get("restart_pc"),
|
||||
request_data=request_data,
|
||||
)
|
||||
return await offer(webrtc_request, background_tasks)
|
||||
elif request.method == HTTPMethod.PATCH.value:
|
||||
patch_request = SmallWebRTCPatchRequest(
|
||||
pc_id=request_data["pc_id"],
|
||||
candidates=[IceCandidate(**c) for c in request_data.get("candidates", [])],
|
||||
)
|
||||
return await ice_candidate(patch_request)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse WebRTC request: {e}")
|
||||
return Response(content="Invalid WebRTC request", status_code=400)
|
||||
|
||||
logger.info(f"Received request for path: {path}")
|
||||
return Response(status_code=200)
|
||||
|
||||
@asynccontextmanager
|
||||
async def smallwebrtc_lifespan(app: FastAPI):
|
||||
"""Manage FastAPI application lifecycle and cleanup connections."""
|
||||
@@ -503,8 +577,6 @@ def _setup_daily_routes(app: FastAPI):
|
||||
else:
|
||||
logger.debug("No body data provided in request")
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
@@ -592,8 +664,6 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
|
||||
async def _run_daily_direct():
|
||||
"""Run Daily bot with direct connection (no FastAPI server)."""
|
||||
try:
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
except ImportError as e:
|
||||
logger.error("Daily transport dependencies not installed.")
|
||||
|
||||
Reference in New Issue
Block a user