diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index f654f110d..189bd3efd 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -90,6 +90,7 @@ To run locally: import argparse import asyncio +import importlib.util import mimetypes import os import sys @@ -131,6 +132,11 @@ load_dotenv(override=True) os.environ["ENV"] = "local" TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"] +TRANSPORT_ROUTE_DEPENDENCIES = { + "daily": ("daily",), + "webrtc": ("aiortc",), + "websocket": ("fastapi", "websockets"), +} # Mirror Pipecat Cloud's 4-hour max session limit so dev rooms get cleaned up. PIPECAT_ROOM_EXP_HOURS = 4.0 @@ -156,6 +162,47 @@ Import this to add custom routes from other packages before calling """ +def _is_module_available(module: str) -> bool: + """Check whether a module can be imported without importing it. + + Args: + module: Fully-qualified module name to check. + + Returns: + ``True`` if Python can resolve the module, ``False`` otherwise. + """ + try: + return importlib.util.find_spec(module) is not None + except (ImportError, ModuleNotFoundError, ValueError): + return False + + +def _transport_route_dependencies(transport: str) -> tuple[str, ...]: + """Return module dependencies required for a transport route. + + Args: + transport: Transport name from the runner request or CLI. + + Returns: + Module names required to enable the transport route. + """ + if transport in TELEPHONY_TRANSPORTS: + return TRANSPORT_ROUTE_DEPENDENCIES["websocket"] + return TRANSPORT_ROUTE_DEPENDENCIES.get(transport, ()) + + +def _transport_routes_enabled(transport: str) -> bool: + """Return whether a transport route can run in this environment. + + Args: + transport: Transport name from the runner request or CLI. + + Returns: + ``True`` if the requested transport is enabled. + """ + return all(_is_module_available(module) for module in _transport_route_dependencies(transport)) + + def _get_bot_module(): """Get the bot module from the calling script.""" import importlib.util @@ -227,6 +274,8 @@ async def _run_websocket_bot(websocket: WebSocket, args: argparse.Namespace): def _setup_websocket_routes(app: FastAPI, args: argparse.Namespace): """Set up the plain WebSocket route at ``/ws-client``.""" + if not _transport_routes_enabled("websocket"): + return @app.websocket("/ws-client") async def websocket_client_endpoint(websocket: WebSocket): @@ -338,6 +387,15 @@ def _setup_unified_start_route( ), ) + if not _transport_routes_enabled(transport): + raise HTTPException( + status_code=400, + detail=( + f"Transport '{transport}' is disabled in this runner environment. " + "Check the startup banner for enabled transports." + ), + ) + if transport == "webrtc": # WebRTC: register the session; the bot starts when the WebRTC offer arrives. session_id = str(uuid.uuid4()) @@ -471,6 +529,9 @@ def _setup_webrtc_routes( app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]] ): """Set up WebRTC-specific routes.""" + if not _transport_routes_enabled("webrtc"): + return + try: from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection from pipecat.transports.smallwebrtc.request_handler import ( @@ -480,7 +541,7 @@ def _setup_webrtc_routes( SmallWebRTCRequestHandler, ) except ImportError as e: - logger.error(f"WebRTC transport dependencies not installed: {e}") + logger.warning(f"WebRTC routes disabled after dependency check passed: {e}") return @app.get("/files/{filename:path}") @@ -765,6 +826,8 @@ def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace): def _setup_daily_routes(app: FastAPI, args: argparse.Namespace): """Set up Daily-specific routes.""" + if not _transport_routes_enabled("daily"): + return @app.get("/daily") async def create_room_and_start_agent(): @@ -908,6 +971,9 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace): specific telephony transport is chosen via ``-t`` because the XML template is provider-specific and requires a proxy hostname (``--proxy``). """ + if not _transport_routes_enabled("twilio"): + return + if args.transport in TELEPHONY_TRANSPORTS: # XML response templates (Exotel doesn't use XML webhooks) XML_TEMPLATES = { @@ -1163,9 +1229,22 @@ def main(parser: argparse.ArgumentParser | None = None): print() if args.transport is None: print("🚀 Bot ready!") - print(f" → WebRTC: http://{args.host}:{args.port}/client") - print(f" → Daily: http://{args.host}:{args.port}/daily") - print(f" → Telephony: ws://{args.host}:{args.port}/ws") + if _transport_routes_enabled("webrtc"): + print(f" → WebRTC: http://{args.host}:{args.port}/client") + else: + print(" → WebRTC: disabled (install pipecat-ai[webrtc])") + if _transport_routes_enabled("daily"): + print(f" → Daily: http://{args.host}:{args.port}/daily") + else: + print(" → Daily: disabled (install pipecat-ai[daily])") + if _transport_routes_enabled("twilio"): + print(f" → Telephony: ws://{args.host}:{args.port}/ws") + else: + print(" → Telephony: disabled (install pipecat-ai[websocket])") + if _transport_routes_enabled("websocket"): + print(f" → WebSocket: ws://{args.host}:{args.port}/ws-client") + else: + print(" → WebSocket: disabled (install pipecat-ai[websocket])") elif args.transport == "webrtc": if args.esp32: print("🚀 Bot ready! (ESP32 mode)") @@ -1173,10 +1252,15 @@ def main(parser: argparse.ArgumentParser | None = None): print("🚀 Bot ready! (WhatsApp)") else: print("🚀 Bot ready! (WebRTC)") - print(f" → Open http://{args.host}:{args.port}/client in your browser") + if _transport_routes_enabled("webrtc"): + print(f" → Open http://{args.host}:{args.port}/client in your browser") + else: + print(" → WebRTC disabled (install pipecat-ai[webrtc])") elif args.transport == "daily": print("🚀 Bot ready! (Daily)") - if args.dialin: + if not _transport_routes_enabled("daily"): + print(" → Daily disabled (install pipecat-ai[daily])") + elif args.dialin: print( f" → Daily dial-in webhook: http://{args.host}:{args.port}/daily-dialin-webhook" ) @@ -1187,9 +1271,12 @@ def main(parser: argparse.ArgumentParser | None = None): ) elif args.transport in TELEPHONY_TRANSPORTS: print(f"🚀 Bot ready! ({args.transport.capitalize()})") - if args.proxy: + if not _transport_routes_enabled(args.transport): + print(" → Telephony disabled (install pipecat-ai[websocket])") + elif args.proxy: print(f" → XML webhook: http://{args.host}:{args.port}/") - print(f" → WebSocket: ws://{args.host}:{args.port}/ws") + if _transport_routes_enabled(args.transport): + print(f" → WebSocket: ws://{args.host}:{args.port}/ws") elif args.transport == "vonage": print() print(f"🚀 Bot ready!") diff --git a/tests/test_runner_run.py b/tests/test_runner_run.py new file mode 100644 index 000000000..b4fc52117 --- /dev/null +++ b/tests/test_runner_run.py @@ -0,0 +1,244 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import sys +import types +import unittest +from unittest.mock import MagicMock, patch + +from fastapi import FastAPI +from fastapi.testclient import TestClient +from pydantic import BaseModel + +from pipecat.runner.run import ( + _setup_daily_routes, + _setup_telephony_routes, + _setup_unified_start_route, + _setup_webrtc_routes, + _setup_websocket_routes, + _transport_route_dependencies, + _transport_routes_enabled, +) + + +class TestRunnerRun(unittest.TestCase): + def test_transport_route_dependencies_maps_transports_to_modules(self): + self.assertEqual(_transport_route_dependencies("daily"), ("daily",)) + self.assertEqual(_transport_route_dependencies("webrtc"), ("aiortc",)) + self.assertEqual(_transport_route_dependencies("websocket"), ("fastapi", "websockets")) + self.assertEqual(_transport_route_dependencies("twilio"), ("fastapi", "websockets")) + self.assertEqual(_transport_route_dependencies("telnyx"), ("fastapi", "websockets")) + self.assertEqual(_transport_route_dependencies("plivo"), ("fastapi", "websockets")) + self.assertEqual(_transport_route_dependencies("exotel"), ("fastapi", "websockets")) + self.assertEqual(_transport_route_dependencies("vonage"), ()) + + def test_transport_routes_enabled_maps_transports_to_dependency_checks(self): + def module_available(module: str) -> bool: + return module in {"fastapi", "websockets"} + + with patch("pipecat.runner.run._is_module_available", side_effect=module_available): + self.assertFalse(_transport_routes_enabled("daily")) + self.assertFalse(_transport_routes_enabled("webrtc")) + self.assertTrue(_transport_routes_enabled("websocket")) + self.assertTrue(_transport_routes_enabled("twilio")) + self.assertTrue(_transport_routes_enabled("vonage")) + + def test_setup_webrtc_routes_skips_when_aiortc_is_missing(self): + """WebRTC routes should be optional when the webrtc extra is not installed.""" + app = FastAPI() + args = argparse.Namespace(folder=None, esp32=False, host="localhost") + + with ( + patch("pipecat.runner.run._transport_routes_enabled", return_value=False), + patch("pipecat.runner.run.logger") as logger, + ): + _setup_webrtc_routes(app, args, {}) + + paths = {route.path for route in app.routes} + self.assertNotIn("/api/offer", paths) + logger.info.assert_not_called() + + def test_setup_webrtc_routes_registers_routes_when_webrtc_is_available(self): + """WebRTC routes should be registered when dependencies are available.""" + app = FastAPI() + args = argparse.Namespace(folder=None, esp32=False, host="localhost") + + connection_module = types.ModuleType("pipecat.transports.smallwebrtc.connection") + connection_module.SmallWebRTCConnection = MagicMock() + + request_handler_module = types.ModuleType( + "pipecat.transports.smallwebrtc.request_handler" + ) + + class IceCandidate(BaseModel): + candidate: str + sdp_mid: str + sdp_mline_index: int + + class SmallWebRTCPatchRequest(BaseModel): + pc_id: str + candidates: list[IceCandidate] = [] + + class SmallWebRTCRequest(BaseModel): + sdp: str + type: str + pc_id: str | None = None + restart_pc: bool | None = None + request_data: dict | None = None + + request_handler_module.IceCandidate = IceCandidate + request_handler_module.SmallWebRTCPatchRequest = SmallWebRTCPatchRequest + request_handler_module.SmallWebRTCRequest = SmallWebRTCRequest + + class MockSmallWebRTCRequestHandler: + def __init__(self, *args, **kwargs): + pass + + async def close(self): + pass + + request_handler_module.SmallWebRTCRequestHandler = MockSmallWebRTCRequestHandler + + with ( + patch("pipecat.runner.run._transport_routes_enabled", return_value=True), + patch.dict( + sys.modules, + { + "pipecat.transports.smallwebrtc.connection": connection_module, + "pipecat.transports.smallwebrtc.request_handler": request_handler_module, + }, + ), + ): + _setup_webrtc_routes(app, args, {}) + + paths = {route.path for route in app.routes} + self.assertIn("/api/offer", paths) + self.assertIn("/files/{filename:path}", paths) + + def test_setup_websocket_routes_skips_when_websocket_is_missing(self): + """Plain WebSocket routes should be optional.""" + app = FastAPI() + args = argparse.Namespace() + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=False): + _setup_websocket_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertNotIn("/ws-client", paths) + + def test_setup_websocket_routes_registers_when_websocket_is_available(self): + """Plain WebSocket route should be registered when dependencies are available.""" + app = FastAPI() + args = argparse.Namespace() + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=True): + _setup_websocket_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertIn("/ws-client", paths) + + def test_setup_telephony_routes_skips_when_websocket_is_missing(self): + """Telephony WebSocket routes should be optional.""" + app = FastAPI() + args = argparse.Namespace(transport=None) + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=False): + _setup_telephony_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertNotIn("/ws", paths) + + def test_setup_telephony_routes_registers_when_websocket_is_available(self): + """Telephony WebSocket route should be registered when dependencies are available.""" + app = FastAPI() + args = argparse.Namespace(transport=None) + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=True): + _setup_telephony_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertIn("/ws", paths) + + def test_setup_telephony_routes_registers_provider_webhook_for_selected_transport(self): + """Provider webhook route should be registered for selected telephony transports.""" + app = FastAPI() + args = argparse.Namespace(transport="twilio", proxy="example.ngrok.io") + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=True): + _setup_telephony_routes(app, args) + + post_root_routes = [ + route for route in app.routes if route.path == "/" and "POST" in route.methods + ] + self.assertEqual(len(post_root_routes), 1) + + def test_setup_daily_routes_skips_when_daily_is_missing(self): + """Daily routes should be optional.""" + app = FastAPI() + args = argparse.Namespace(dialin=False) + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=False): + _setup_daily_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertNotIn("/daily", paths) + + def test_setup_daily_routes_registers_when_daily_is_available(self): + """Daily route should be registered when dependencies are available.""" + app = FastAPI() + args = argparse.Namespace(dialin=False) + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=True): + _setup_daily_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertIn("/daily", paths) + + def test_setup_daily_routes_registers_dialin_route_when_enabled(self): + """Daily dial-in route should be registered when requested and available.""" + app = FastAPI() + args = argparse.Namespace(dialin=True) + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=True): + _setup_daily_routes(app, args) + + paths = {route.path for route in app.routes} + self.assertIn("/daily", paths) + self.assertIn("/daily-dialin-webhook", paths) + + def test_websocket_routes_require_fastapi_and_websockets(self): + with patch( + "pipecat.runner.run._is_module_available", + side_effect=lambda module: module == "fastapi", + ) as is_module_available: + self.assertFalse(_transport_routes_enabled("websocket")) + + self.assertEqual( + [call.args[0] for call in is_module_available.call_args_list], + ["fastapi", "websockets"], + ) + + def test_start_rejects_disabled_transport_before_running_bot(self): + app = FastAPI() + args = argparse.Namespace(transport=None) + _setup_unified_start_route(app, args, {}) + + with patch("pipecat.runner.run._transport_routes_enabled", return_value=False): + response = TestClient(app).post("/start", json={"transport": "daily"}) + + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.json()["detail"], + ( + "Transport 'daily' is disabled in this runner environment. " + "Check the startup banner for enabled transports." + ), + ) + + +if __name__ == "__main__": + unittest.main()