Improve runner optional transport handling

This commit is contained in:
Mark Backman
2026-05-19 16:42:18 -04:00
parent c09f6d5adb
commit 1487da53a9
2 changed files with 339 additions and 8 deletions

View File

@@ -90,6 +90,7 @@ To run locally:
import argparse
import asyncio
import importlib.util
import mimetypes
import os
import sys
@@ -131,6 +132,11 @@ load_dotenv(override=True)
os.environ["ENV"] = "local"
TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"]
TRANSPORT_ROUTE_DEPENDENCIES = {
"daily": ("daily",),
"webrtc": ("aiortc",),
"websocket": ("fastapi", "websockets"),
}
# Mirror Pipecat Cloud's 4-hour max session limit so dev rooms get cleaned up.
PIPECAT_ROOM_EXP_HOURS = 4.0
@@ -156,6 +162,47 @@ Import this to add custom routes from other packages before calling
"""
def _is_module_available(module: str) -> bool:
"""Check whether a module can be imported without importing it.
Args:
module: Fully-qualified module name to check.
Returns:
``True`` if Python can resolve the module, ``False`` otherwise.
"""
try:
return importlib.util.find_spec(module) is not None
except (ImportError, ModuleNotFoundError, ValueError):
return False
def _transport_route_dependencies(transport: str) -> tuple[str, ...]:
"""Return module dependencies required for a transport route.
Args:
transport: Transport name from the runner request or CLI.
Returns:
Module names required to enable the transport route.
"""
if transport in TELEPHONY_TRANSPORTS:
return TRANSPORT_ROUTE_DEPENDENCIES["websocket"]
return TRANSPORT_ROUTE_DEPENDENCIES.get(transport, ())
def _transport_routes_enabled(transport: str) -> bool:
"""Return whether a transport route can run in this environment.
Args:
transport: Transport name from the runner request or CLI.
Returns:
``True`` if the requested transport is enabled.
"""
return all(_is_module_available(module) for module in _transport_route_dependencies(transport))
def _get_bot_module():
"""Get the bot module from the calling script."""
import importlib.util
@@ -227,6 +274,8 @@ async def _run_websocket_bot(websocket: WebSocket, args: argparse.Namespace):
def _setup_websocket_routes(app: FastAPI, args: argparse.Namespace):
"""Set up the plain WebSocket route at ``/ws-client``."""
if not _transport_routes_enabled("websocket"):
return
@app.websocket("/ws-client")
async def websocket_client_endpoint(websocket: WebSocket):
@@ -338,6 +387,15 @@ def _setup_unified_start_route(
),
)
if not _transport_routes_enabled(transport):
raise HTTPException(
status_code=400,
detail=(
f"Transport '{transport}' is disabled in this runner environment. "
"Check the startup banner for enabled transports."
),
)
if transport == "webrtc":
# WebRTC: register the session; the bot starts when the WebRTC offer arrives.
session_id = str(uuid.uuid4())
@@ -471,6 +529,9 @@ def _setup_webrtc_routes(
app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]]
):
"""Set up WebRTC-specific routes."""
if not _transport_routes_enabled("webrtc"):
return
try:
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
@@ -480,7 +541,7 @@ def _setup_webrtc_routes(
SmallWebRTCRequestHandler,
)
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
logger.warning(f"WebRTC routes disabled after dependency check passed: {e}")
return
@app.get("/files/{filename:path}")
@@ -765,6 +826,8 @@ def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace):
def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
"""Set up Daily-specific routes."""
if not _transport_routes_enabled("daily"):
return
@app.get("/daily")
async def create_room_and_start_agent():
@@ -908,6 +971,9 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
specific telephony transport is chosen via ``-t`` because the XML template
is provider-specific and requires a proxy hostname (``--proxy``).
"""
if not _transport_routes_enabled("twilio"):
return
if args.transport in TELEPHONY_TRANSPORTS:
# XML response templates (Exotel doesn't use XML webhooks)
XML_TEMPLATES = {
@@ -1163,9 +1229,22 @@ def main(parser: argparse.ArgumentParser | None = None):
print()
if args.transport is None:
print("🚀 Bot ready!")
print(f" → WebRTC: http://{args.host}:{args.port}/client")
print(f"Daily: http://{args.host}:{args.port}/daily")
print(f" → Telephony: ws://{args.host}:{args.port}/ws")
if _transport_routes_enabled("webrtc"):
print(f"WebRTC: http://{args.host}:{args.port}/client")
else:
print(" → WebRTC: disabled (install pipecat-ai[webrtc])")
if _transport_routes_enabled("daily"):
print(f" → Daily: http://{args.host}:{args.port}/daily")
else:
print(" → Daily: disabled (install pipecat-ai[daily])")
if _transport_routes_enabled("twilio"):
print(f" → Telephony: ws://{args.host}:{args.port}/ws")
else:
print(" → Telephony: disabled (install pipecat-ai[websocket])")
if _transport_routes_enabled("websocket"):
print(f" → WebSocket: ws://{args.host}:{args.port}/ws-client")
else:
print(" → WebSocket: disabled (install pipecat-ai[websocket])")
elif args.transport == "webrtc":
if args.esp32:
print("🚀 Bot ready! (ESP32 mode)")
@@ -1173,10 +1252,15 @@ def main(parser: argparse.ArgumentParser | None = None):
print("🚀 Bot ready! (WhatsApp)")
else:
print("🚀 Bot ready! (WebRTC)")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
if _transport_routes_enabled("webrtc"):
print(f" → Open http://{args.host}:{args.port}/client in your browser")
else:
print(" → WebRTC disabled (install pipecat-ai[webrtc])")
elif args.transport == "daily":
print("🚀 Bot ready! (Daily)")
if args.dialin:
if not _transport_routes_enabled("daily"):
print(" → Daily disabled (install pipecat-ai[daily])")
elif args.dialin:
print(
f" → Daily dial-in webhook: http://{args.host}:{args.port}/daily-dialin-webhook"
)
@@ -1187,9 +1271,12 @@ def main(parser: argparse.ArgumentParser | None = None):
)
elif args.transport in TELEPHONY_TRANSPORTS:
print(f"🚀 Bot ready! ({args.transport.capitalize()})")
if args.proxy:
if not _transport_routes_enabled(args.transport):
print(" → Telephony disabled (install pipecat-ai[websocket])")
elif args.proxy:
print(f" → XML webhook: http://{args.host}:{args.port}/")
print(f" → WebSocket: ws://{args.host}:{args.port}/ws")
if _transport_routes_enabled(args.transport):
print(f" → WebSocket: ws://{args.host}:{args.port}/ws")
elif args.transport == "vonage":
print()
print(f"🚀 Bot ready!")

244
tests/test_runner_run.py Normal file
View File

@@ -0,0 +1,244 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import sys
import types
import unittest
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
from pipecat.runner.run import (
_setup_daily_routes,
_setup_telephony_routes,
_setup_unified_start_route,
_setup_webrtc_routes,
_setup_websocket_routes,
_transport_route_dependencies,
_transport_routes_enabled,
)
class TestRunnerRun(unittest.TestCase):
def test_transport_route_dependencies_maps_transports_to_modules(self):
self.assertEqual(_transport_route_dependencies("daily"), ("daily",))
self.assertEqual(_transport_route_dependencies("webrtc"), ("aiortc",))
self.assertEqual(_transport_route_dependencies("websocket"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("twilio"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("telnyx"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("plivo"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("exotel"), ("fastapi", "websockets"))
self.assertEqual(_transport_route_dependencies("vonage"), ())
def test_transport_routes_enabled_maps_transports_to_dependency_checks(self):
def module_available(module: str) -> bool:
return module in {"fastapi", "websockets"}
with patch("pipecat.runner.run._is_module_available", side_effect=module_available):
self.assertFalse(_transport_routes_enabled("daily"))
self.assertFalse(_transport_routes_enabled("webrtc"))
self.assertTrue(_transport_routes_enabled("websocket"))
self.assertTrue(_transport_routes_enabled("twilio"))
self.assertTrue(_transport_routes_enabled("vonage"))
def test_setup_webrtc_routes_skips_when_aiortc_is_missing(self):
"""WebRTC routes should be optional when the webrtc extra is not installed."""
app = FastAPI()
args = argparse.Namespace(folder=None, esp32=False, host="localhost")
with (
patch("pipecat.runner.run._transport_routes_enabled", return_value=False),
patch("pipecat.runner.run.logger") as logger,
):
_setup_webrtc_routes(app, args, {})
paths = {route.path for route in app.routes}
self.assertNotIn("/api/offer", paths)
logger.info.assert_not_called()
def test_setup_webrtc_routes_registers_routes_when_webrtc_is_available(self):
"""WebRTC routes should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace(folder=None, esp32=False, host="localhost")
connection_module = types.ModuleType("pipecat.transports.smallwebrtc.connection")
connection_module.SmallWebRTCConnection = MagicMock()
request_handler_module = types.ModuleType(
"pipecat.transports.smallwebrtc.request_handler"
)
class IceCandidate(BaseModel):
candidate: str
sdp_mid: str
sdp_mline_index: int
class SmallWebRTCPatchRequest(BaseModel):
pc_id: str
candidates: list[IceCandidate] = []
class SmallWebRTCRequest(BaseModel):
sdp: str
type: str
pc_id: str | None = None
restart_pc: bool | None = None
request_data: dict | None = None
request_handler_module.IceCandidate = IceCandidate
request_handler_module.SmallWebRTCPatchRequest = SmallWebRTCPatchRequest
request_handler_module.SmallWebRTCRequest = SmallWebRTCRequest
class MockSmallWebRTCRequestHandler:
def __init__(self, *args, **kwargs):
pass
async def close(self):
pass
request_handler_module.SmallWebRTCRequestHandler = MockSmallWebRTCRequestHandler
with (
patch("pipecat.runner.run._transport_routes_enabled", return_value=True),
patch.dict(
sys.modules,
{
"pipecat.transports.smallwebrtc.connection": connection_module,
"pipecat.transports.smallwebrtc.request_handler": request_handler_module,
},
),
):
_setup_webrtc_routes(app, args, {})
paths = {route.path for route in app.routes}
self.assertIn("/api/offer", paths)
self.assertIn("/files/{filename:path}", paths)
def test_setup_websocket_routes_skips_when_websocket_is_missing(self):
"""Plain WebSocket routes should be optional."""
app = FastAPI()
args = argparse.Namespace()
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
_setup_websocket_routes(app, args)
paths = {route.path for route in app.routes}
self.assertNotIn("/ws-client", paths)
def test_setup_websocket_routes_registers_when_websocket_is_available(self):
"""Plain WebSocket route should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace()
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_websocket_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/ws-client", paths)
def test_setup_telephony_routes_skips_when_websocket_is_missing(self):
"""Telephony WebSocket routes should be optional."""
app = FastAPI()
args = argparse.Namespace(transport=None)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
_setup_telephony_routes(app, args)
paths = {route.path for route in app.routes}
self.assertNotIn("/ws", paths)
def test_setup_telephony_routes_registers_when_websocket_is_available(self):
"""Telephony WebSocket route should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace(transport=None)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_telephony_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/ws", paths)
def test_setup_telephony_routes_registers_provider_webhook_for_selected_transport(self):
"""Provider webhook route should be registered for selected telephony transports."""
app = FastAPI()
args = argparse.Namespace(transport="twilio", proxy="example.ngrok.io")
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_telephony_routes(app, args)
post_root_routes = [
route for route in app.routes if route.path == "/" and "POST" in route.methods
]
self.assertEqual(len(post_root_routes), 1)
def test_setup_daily_routes_skips_when_daily_is_missing(self):
"""Daily routes should be optional."""
app = FastAPI()
args = argparse.Namespace(dialin=False)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
_setup_daily_routes(app, args)
paths = {route.path for route in app.routes}
self.assertNotIn("/daily", paths)
def test_setup_daily_routes_registers_when_daily_is_available(self):
"""Daily route should be registered when dependencies are available."""
app = FastAPI()
args = argparse.Namespace(dialin=False)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_daily_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/daily", paths)
def test_setup_daily_routes_registers_dialin_route_when_enabled(self):
"""Daily dial-in route should be registered when requested and available."""
app = FastAPI()
args = argparse.Namespace(dialin=True)
with patch("pipecat.runner.run._transport_routes_enabled", return_value=True):
_setup_daily_routes(app, args)
paths = {route.path for route in app.routes}
self.assertIn("/daily", paths)
self.assertIn("/daily-dialin-webhook", paths)
def test_websocket_routes_require_fastapi_and_websockets(self):
with patch(
"pipecat.runner.run._is_module_available",
side_effect=lambda module: module == "fastapi",
) as is_module_available:
self.assertFalse(_transport_routes_enabled("websocket"))
self.assertEqual(
[call.args[0] for call in is_module_available.call_args_list],
["fastapi", "websockets"],
)
def test_start_rejects_disabled_transport_before_running_bot(self):
app = FastAPI()
args = argparse.Namespace(transport=None)
_setup_unified_start_route(app, args, {})
with patch("pipecat.runner.run._transport_routes_enabled", return_value=False):
response = TestClient(app).post("/start", json={"transport": "daily"})
self.assertEqual(response.status_code, 400)
self.assertEqual(
response.json()["detail"],
(
"Transport 'daily' is disabled in this runner environment. "
"Check the startup banner for enabled transports."
),
)
if __name__ == "__main__":
unittest.main()