diff --git a/changelog/3590.added.md b/changelog/3590.added.md new file mode 100644 index 000000000..80e0c3dba --- /dev/null +++ b/changelog/3590.added.md @@ -0,0 +1 @@ +- `main()` in `pipecat.runner.run` now accepts an optional `argparse.ArgumentParser`, allowing bots to define custom CLI arguments accessible via `runner_args.cli_args`. diff --git a/examples/foundational/18-gstreamer-filesrc.py b/examples/foundational/18-gstreamer-filesrc.py index ceb400c94..56631c443 100644 --- a/examples/foundational/18-gstreamer-filesrc.py +++ b/examples/foundational/18-gstreamer-filesrc.py @@ -20,10 +20,6 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -parser = argparse.ArgumentParser(description="Pipecat Video Streaming Bot") -parser.add_argument("-i", "--input", type=str, required=True, help="Input video file") -args = parser.parse_args() - # We store functions so objects (e.g. SileroVADAnalyzer) don't get # instantiated. The function will be called when the desired transport gets # selected. @@ -46,10 +42,10 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): - logger.info(f"Starting bot with video input: {args.input}") + logger.info(f"Starting bot with video input: {runner_args.cli_args.input}") gst = GStreamerPipelineSource( - pipeline=f"filesrc location={args.input}", + pipeline=f"filesrc location={runner_args.cli_args.input}", out_params=GStreamerPipelineSource.OutputParams( video_width=1280, video_height=720, @@ -68,6 +64,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) @@ -82,4 +87,7 @@ async def bot(runner_args: RunnerArguments): if __name__ == "__main__": from pipecat.runner.run import main - main() + parser = argparse.ArgumentParser(description="Pipecat Video Streaming Bot") + parser.add_argument("-i", "--input", type=str, required=True, help="Input video file") + + main(parser) diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index 9a280d119..76c870468 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -153,26 +153,18 @@ def _get_bot_module(): ) -async def _run_telephony_bot(websocket: WebSocket): +async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace): """Run a bot for telephony transports.""" bot_module = _get_bot_module() # Just pass the WebSocket - let the bot handle parsing runner_args = WebSocketRunnerArguments(websocket=websocket) + runner_args.cli_args = args await bot_module.bot(runner_args) -def _create_server_app( - *, - transport_type: str, - host: str = "localhost", - proxy: str, - esp32_mode: bool = False, - whatsapp_enabled: bool = False, - folder: Optional[str] = None, - dialin_enabled: bool = False, -): +def _create_server_app(args: argparse.Namespace): """Create FastAPI app with transport-specific routes.""" app = FastAPI() @@ -185,23 +177,21 @@ def _create_server_app( ) # Set up transport-specific routes - if transport_type == "webrtc": - _setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host, folder=folder) - if whatsapp_enabled: - _setup_whatsapp_routes(app) - elif transport_type == "daily": - _setup_daily_routes(app, dialin_enabled=dialin_enabled) - elif transport_type in TELEPHONY_TRANSPORTS: - _setup_telephony_routes(app, transport_type=transport_type, proxy=proxy) + if args.transport == "webrtc": + _setup_webrtc_routes(app, args) + if args.whatsapp: + _setup_whatsapp_routes(app, args) + elif args.transport == "daily": + _setup_daily_routes(app, args) + elif args.transport in TELEPHONY_TRANSPORTS: + _setup_telephony_routes(app, args) else: - logger.warning(f"Unknown transport type: {transport_type}") + logger.warning(f"Unknown transport type: {args.transport}") return app -def _setup_webrtc_routes( - app: FastAPI, *, esp32_mode: bool = False, host: str = "localhost", folder: Optional[str] = None -): +def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace): """Set up WebRTC-specific routes.""" try: from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI @@ -241,11 +231,11 @@ def _setup_webrtc_routes( @app.get("/files/{filename:path}") async def download_file(filename: str): """Handle file downloads.""" - if not folder: + if not args.folder: logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.") return - file_path = Path(folder) / filename + file_path = Path(args.folder) / filename if not os.path.exists(file_path): raise HTTPException(404) @@ -255,7 +245,7 @@ def _setup_webrtc_routes( # Initialize the SmallWebRTC request handler small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler( - esp32_mode=esp32_mode, host=host + esp32_mode=args.esp32, host=args.host ) @app.post("/api/offer") @@ -269,6 +259,7 @@ def _setup_webrtc_routes( runner_args = SmallWebRTCRunnerArguments( webrtc_connection=connection, body=request.request_data ) + runner_args.cli_args = args background_tasks.add_task(bot_module.bot, runner_args) # Delegate handling to SmallWebRTCRequestHandler @@ -381,8 +372,8 @@ def _add_lifespan_to_app(app: FastAPI, new_lifespan): app.router.lifespan_context = new_lifespan -def _setup_whatsapp_routes(app: FastAPI): - """Set up WebRTC-specific routes.""" +def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace): + """Set up WhatsApp-specific routes.""" WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET") WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID") WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN") @@ -484,6 +475,7 @@ def _setup_whatsapp_routes(app: FastAPI): """ bot_module = _get_bot_module() runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection) + runner_args.cli_args = args background_tasks.add_task(bot_module.bot, runner_args) try: @@ -529,13 +521,8 @@ def _setup_whatsapp_routes(app: FastAPI): _add_lifespan_to_app(app, whatsapp_lifespan) -def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False): - """Set up Daily-specific routes. - - Args: - app: FastAPI application instance - dialin_enabled: If True, adds /daily-dialin-webhook endpoint for PSTN dial-in handling - """ +def _setup_daily_routes(app: FastAPI, args: argparse.Namespace): + """Set up Daily-specific routes.""" @app.get("/") async def create_room_and_start_agent(): @@ -552,6 +539,7 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False): # Start the bot in the background with empty body for GET requests bot_module = _get_bot_module() runner_args = DailyRunnerArguments(room_url=room_url, token=token) + runner_args.cli_args = args asyncio.create_task(bot_module.bot(runner_args)) return RedirectResponse(room_url) @@ -635,12 +623,15 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False): else: runner_args = RunnerArguments(body=body) + # Update CLI args. + runner_args.cli_args = args + # Start the bot in the background asyncio.create_task(bot_module.bot(runner_args)) return result - if dialin_enabled: + if args.dialin: @app.post("/daily-dialin-webhook") async def handle_dialin_webhook(request: Request): @@ -737,6 +728,7 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False): token=room_config.token, body=request_body.model_dump(), ) + runner_args.cli_args = args asyncio.create_task(bot_module.bot(runner_args)) @@ -751,44 +743,44 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False): } -def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str): +def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace): """Set up telephony-specific routes.""" # XML response templates (Exotel doesn't use XML webhooks) XML_TEMPLATES = { "twilio": f""" - + """, "telnyx": f""" - + """, "plivo": f""" - wss://{proxy}/ws + wss://{args.proxy}/ws """, } @app.post("/") async def start_call(): """Handle telephony webhook and return XML response.""" - if transport_type == "exotel": + if args.transport == "exotel": # Exotel doesn't use POST webhooks - redirect to proper documentation logger.debug("POST Exotel endpoint - not used") return { "error": "Exotel doesn't use POST webhooks", - "websocket_url": f"wss://{proxy}/ws", + "websocket_url": f"wss://{args.proxy}/ws", "note": "Configure the WebSocket URL above in your Exotel App Bazaar Voicebot Applet", } else: - logger.debug(f"POST {transport_type.upper()} XML") - xml_content = XML_TEMPLATES.get(transport_type, "") + logger.debug(f"POST {args.transport.upper()} XML") + xml_content = XML_TEMPLATES.get(args.transport, "") return HTMLResponse(content=xml_content, media_type="application/xml") @app.websocket("/ws") @@ -796,15 +788,15 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str): """Handle WebSocket connections for telephony.""" await websocket.accept() logger.debug("WebSocket connection accepted") - await _run_telephony_bot(websocket) + await _run_telephony_bot(websocket, args) @app.get("/") async def start_agent(): """Simple status endpoint for telephony transports.""" - return {"status": f"Bot started with {transport_type}"} + return {"status": f"Bot started with {args.transport}"} -async def _run_daily_direct(): +async def _run_daily_direct(args: argparse.Namespace): """Run Daily bot with direct connection (no FastAPI server).""" try: from pipecat.runner.daily import configure @@ -820,6 +812,7 @@ async def _run_daily_direct(): # Direct connections have no request body, so use empty dict runner_args = DailyRunnerArguments(room_url=room_url, token=token) runner_args.handle_sigint = True + runner_args.cli_args = args # Get the bot module and run it directly bot_module = _get_bot_module() @@ -867,29 +860,38 @@ def runner_port() -> int: return RUNNER_PORT -def main(): +def main(parser: Optional[argparse.ArgumentParser] = None): """Start the Pipecat development runner. Parses command-line arguments and starts a FastAPI server configured - for the specified transport type. The runner will discover and run - any bot() function found in the current directory. + for the specified transport type. + + The runner discovers and runs any ``bot(runner_args)`` function found in the + calling module. Command-line arguments: + - --host: Server host address (default: localhost) 879 + - --port: Server port (default: 7860) + - -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel) + - -x/--proxy: Public proxy hostname for telephony webhooks + - -d/--direct: Connect directly to Daily room (automatically sets transport to daily) + - -f/--folder: Path to downloads folder + - --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport) + - --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address) + - --whatsapp: Ensure requried WhatsApp environment variables are present + - -v/--verbose: Increase logging verbosity Args: - --host: Server host address (default: localhost) - --port: Server port (default: 7860) - -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel) - -x/--proxy: Public proxy hostname for telephony webhooks - --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address) - -d/--direct: Connect directly to Daily room (automatically sets transport to daily) - -v/--verbose: Increase logging verbosity + parser: Optional custom argument parser. If provided, default runner + arguments are added to it so bots can define their own CLI + arguments. Custom arguments should not conflict with the default + ones. Custom args are accessible via `runner_args.cli_args`. - The bot file must contain a `bot(runner_args)` function as the entry point. """ global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT - parser = argparse.ArgumentParser(description="Pipecat Development Runner") + if not parser: + parser = argparse.ArgumentParser(description="Pipecat Development Runner") parser.add_argument("--host", type=str, default=RUNNER_HOST, help="Host address") parser.add_argument("--port", type=int, default=RUNNER_PORT, help="Port number") parser.add_argument( @@ -900,13 +902,7 @@ def main(): default="webrtc", help="Transport type", ) - parser.add_argument("--proxy", "-x", help="Public proxy host name") - parser.add_argument( - "--esp32", - action="store_true", - default=False, - help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)", - ) + parser.add_argument("-x", "--proxy", help="Public proxy host name") parser.add_argument( "-d", "--direct", @@ -916,13 +912,7 @@ def main(): ) parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder") parser.add_argument( - "--verbose", "-v", action="count", default=0, help="Increase logging verbosity" - ) - parser.add_argument( - "--whatsapp", - action="store_true", - default=False, - help="Ensure requried WhatsApp environment variables are present", + "-v", "--verbose", action="count", default=0, help="Increase logging verbosity" ) parser.add_argument( "--dialin", @@ -930,6 +920,18 @@ def main(): default=False, help="Enable Daily PSTN dial-in webhook handling (requires Daily transport)", ) + parser.add_argument( + "--esp32", + action="store_true", + default=False, + help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)", + ) + parser.add_argument( + "--whatsapp", + action="store_true", + default=False, + help="Ensure requried WhatsApp environment variables are present", + ) args = parser.parse_args() @@ -965,7 +967,7 @@ def main(): print() # Run direct Daily connection - asyncio.run(_run_daily_direct()) + asyncio.run(_run_daily_direct(args)) return # Print startup message for server-based transports @@ -996,15 +998,7 @@ def main(): RUNNER_PORT = args.port # Create the app with transport-specific setup - app = _create_server_app( - transport_type=args.transport, - host=args.host, - proxy=args.proxy, - esp32_mode=args.esp32, - whatsapp_enabled=args.whatsapp, - folder=args.folder, - dialin_enabled=args.dialin, - ) + app = _create_server_app(args) # Run the server uvicorn.run(app, host=args.host, port=args.port) diff --git a/src/pipecat/runner/types.py b/src/pipecat/runner/types.py index 0e997b963..e48f10a08 100644 --- a/src/pipecat/runner/types.py +++ b/src/pipecat/runner/types.py @@ -10,6 +10,7 @@ These types are used by the development runner to pass transport-specific information to bot functions. """ +import argparse from dataclasses import dataclass, field from typing import Any, Dict, Optional @@ -64,6 +65,7 @@ class RunnerArguments: handle_sigterm: bool = field(init=False, kw_only=True) pipeline_idle_timeout_secs: int = field(init=False, kw_only=True) body: Optional[Any] = field(default_factory=dict, kw_only=True) + cli_args: Optional[argparse.Namespace] = field(default=None, init=False, kw_only=True) def __post_init__(self): self.handle_sigint = False