runner: allow custom CLI arguments

This commit is contained in:
Aleix Conchillo Flaqué
2026-01-29 10:08:25 -08:00
parent f3b72e9263
commit bbb8b53d03
4 changed files with 95 additions and 90 deletions

1
changelog/3590.added.md Normal file
View File

@@ -0,0 +1 @@
- `main()` in `pipecat.runner.run` now accepts an optional `argparse.ArgumentParser`, allowing bots to define custom CLI arguments accessible via `runner_args.cli_args`.

View File

@@ -20,10 +20,6 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
parser = argparse.ArgumentParser(description="Pipecat Video Streaming Bot")
parser.add_argument("-i", "--input", type=str, required=True, help="Input video file")
args = parser.parse_args()
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -46,10 +42,10 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot with video input: {args.input}")
logger.info(f"Starting bot with video input: {runner_args.cli_args.input}")
gst = GStreamerPipelineSource(
pipeline=f"filesrc location={args.input}",
pipeline=f"filesrc location={runner_args.cli_args.input}",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
@@ -68,6 +64,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
@@ -82,4 +87,7 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
from pipecat.runner.run import main
main()
parser = argparse.ArgumentParser(description="Pipecat Video Streaming Bot")
parser.add_argument("-i", "--input", type=str, required=True, help="Input video file")
main(parser)

View File

@@ -153,26 +153,18 @@ def _get_bot_module():
)
async def _run_telephony_bot(websocket: WebSocket):
async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace):
"""Run a bot for telephony transports."""
bot_module = _get_bot_module()
# Just pass the WebSocket - let the bot handle parsing
runner_args = WebSocketRunnerArguments(websocket=websocket)
runner_args.cli_args = args
await bot_module.bot(runner_args)
def _create_server_app(
*,
transport_type: str,
host: str = "localhost",
proxy: str,
esp32_mode: bool = False,
whatsapp_enabled: bool = False,
folder: Optional[str] = None,
dialin_enabled: bool = False,
):
def _create_server_app(args: argparse.Namespace):
"""Create FastAPI app with transport-specific routes."""
app = FastAPI()
@@ -185,23 +177,21 @@ def _create_server_app(
)
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host, folder=folder)
if whatsapp_enabled:
_setup_whatsapp_routes(app)
elif transport_type == "daily":
_setup_daily_routes(app, dialin_enabled=dialin_enabled)
elif transport_type in TELEPHONY_TRANSPORTS:
_setup_telephony_routes(app, transport_type=transport_type, proxy=proxy)
if args.transport == "webrtc":
_setup_webrtc_routes(app, args)
if args.whatsapp:
_setup_whatsapp_routes(app, args)
elif args.transport == "daily":
_setup_daily_routes(app, args)
elif args.transport in TELEPHONY_TRANSPORTS:
_setup_telephony_routes(app, args)
else:
logger.warning(f"Unknown transport type: {transport_type}")
logger.warning(f"Unknown transport type: {args.transport}")
return app
def _setup_webrtc_routes(
app: FastAPI, *, esp32_mode: bool = False, host: str = "localhost", folder: Optional[str] = None
):
def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
@@ -241,11 +231,11 @@ def _setup_webrtc_routes(
@app.get("/files/{filename:path}")
async def download_file(filename: str):
"""Handle file downloads."""
if not folder:
if not args.folder:
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
return
file_path = Path(folder) / filename
file_path = Path(args.folder) / filename
if not os.path.exists(file_path):
raise HTTPException(404)
@@ -255,7 +245,7 @@ def _setup_webrtc_routes(
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(
esp32_mode=esp32_mode, host=host
esp32_mode=args.esp32, host=args.host
)
@app.post("/api/offer")
@@ -269,6 +259,7 @@ def _setup_webrtc_routes(
runner_args = SmallWebRTCRunnerArguments(
webrtc_connection=connection, body=request.request_data
)
runner_args.cli_args = args
background_tasks.add_task(bot_module.bot, runner_args)
# Delegate handling to SmallWebRTCRequestHandler
@@ -381,8 +372,8 @@ def _add_lifespan_to_app(app: FastAPI, new_lifespan):
app.router.lifespan_context = new_lifespan
def _setup_whatsapp_routes(app: FastAPI):
"""Set up WebRTC-specific routes."""
def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace):
"""Set up WhatsApp-specific routes."""
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
@@ -484,6 +475,7 @@ def _setup_whatsapp_routes(app: FastAPI):
"""
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
runner_args.cli_args = args
background_tasks.add_task(bot_module.bot, runner_args)
try:
@@ -529,13 +521,8 @@ def _setup_whatsapp_routes(app: FastAPI):
_add_lifespan_to_app(app, whatsapp_lifespan)
def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False):
"""Set up Daily-specific routes.
Args:
app: FastAPI application instance
dialin_enabled: If True, adds /daily-dialin-webhook endpoint for PSTN dial-in handling
"""
def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
"""Set up Daily-specific routes."""
@app.get("/")
async def create_room_and_start_agent():
@@ -552,6 +539,7 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False):
# Start the bot in the background with empty body for GET requests
bot_module = _get_bot_module()
runner_args = DailyRunnerArguments(room_url=room_url, token=token)
runner_args.cli_args = args
asyncio.create_task(bot_module.bot(runner_args))
return RedirectResponse(room_url)
@@ -635,12 +623,15 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False):
else:
runner_args = RunnerArguments(body=body)
# Update CLI args.
runner_args.cli_args = args
# Start the bot in the background
asyncio.create_task(bot_module.bot(runner_args))
return result
if dialin_enabled:
if args.dialin:
@app.post("/daily-dialin-webhook")
async def handle_dialin_webhook(request: Request):
@@ -737,6 +728,7 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False):
token=room_config.token,
body=request_body.model_dump(),
)
runner_args.cli_args = args
asyncio.create_task(bot_module.bot(runner_args))
@@ -751,44 +743,44 @@ def _setup_daily_routes(app: FastAPI, dialin_enabled: bool = False):
}
def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
"""Set up telephony-specific routes."""
# XML response templates (Exotel doesn't use XML webhooks)
XML_TEMPLATES = {
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{proxy}/ws"></Stream>
<Stream url="wss://{args.proxy}/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{proxy}/ws" bidirectionalMode="rtp"></Stream>
<Stream url="wss://{args.proxy}/ws" bidirectionalMode="rtp"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{proxy}/ws</Stream>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{args.proxy}/ws</Stream>
</Response>""",
}
@app.post("/")
async def start_call():
"""Handle telephony webhook and return XML response."""
if transport_type == "exotel":
if args.transport == "exotel":
# Exotel doesn't use POST webhooks - redirect to proper documentation
logger.debug("POST Exotel endpoint - not used")
return {
"error": "Exotel doesn't use POST webhooks",
"websocket_url": f"wss://{proxy}/ws",
"websocket_url": f"wss://{args.proxy}/ws",
"note": "Configure the WebSocket URL above in your Exotel App Bazaar Voicebot Applet",
}
else:
logger.debug(f"POST {transport_type.upper()} XML")
xml_content = XML_TEMPLATES.get(transport_type, "<Response></Response>")
logger.debug(f"POST {args.transport.upper()} XML")
xml_content = XML_TEMPLATES.get(args.transport, "<Response></Response>")
return HTMLResponse(content=xml_content, media_type="application/xml")
@app.websocket("/ws")
@@ -796,15 +788,15 @@ def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
"""Handle WebSocket connections for telephony."""
await websocket.accept()
logger.debug("WebSocket connection accepted")
await _run_telephony_bot(websocket)
await _run_telephony_bot(websocket, args)
@app.get("/")
async def start_agent():
"""Simple status endpoint for telephony transports."""
return {"status": f"Bot started with {transport_type}"}
return {"status": f"Bot started with {args.transport}"}
async def _run_daily_direct():
async def _run_daily_direct(args: argparse.Namespace):
"""Run Daily bot with direct connection (no FastAPI server)."""
try:
from pipecat.runner.daily import configure
@@ -820,6 +812,7 @@ async def _run_daily_direct():
# Direct connections have no request body, so use empty dict
runner_args = DailyRunnerArguments(room_url=room_url, token=token)
runner_args.handle_sigint = True
runner_args.cli_args = args
# Get the bot module and run it directly
bot_module = _get_bot_module()
@@ -867,29 +860,38 @@ def runner_port() -> int:
return RUNNER_PORT
def main():
def main(parser: Optional[argparse.ArgumentParser] = None):
"""Start the Pipecat development runner.
Parses command-line arguments and starts a FastAPI server configured
for the specified transport type. The runner will discover and run
any bot() function found in the current directory.
for the specified transport type.
The runner discovers and runs any ``bot(runner_args)`` function found in the
calling module.
Command-line arguments:
- --host: Server host address (default: localhost)
- --port: Server port (default: 7860)
- -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
- -x/--proxy: Public proxy hostname for telephony webhooks
- -d/--direct: Connect directly to Daily room (automatically sets transport to daily)
- -f/--folder: Path to downloads folder
- --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport)
- --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
- --whatsapp: Ensure required WhatsApp environment variables are present
- -v/--verbose: Increase logging verbosity
Args:
--host: Server host address (default: localhost)
--port: Server port (default: 7860)
-t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
-x/--proxy: Public proxy hostname for telephony webhooks
--esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
-d/--direct: Connect directly to Daily room (automatically sets transport to daily)
-v/--verbose: Increase logging verbosity
parser: Optional custom argument parser. If provided, default runner
arguments are added to it so bots can define their own CLI
arguments. Custom arguments should not conflict with the default
ones. Custom args are accessible via `runner_args.cli_args`.
The bot file must contain a `bot(runner_args)` function as the entry point.
"""
global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
if not parser:
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
parser.add_argument("--host", type=str, default=RUNNER_HOST, help="Host address")
parser.add_argument("--port", type=int, default=RUNNER_PORT, help="Port number")
parser.add_argument(
@@ -900,13 +902,7 @@ def main():
default="webrtc",
help="Transport type",
)
parser.add_argument("--proxy", "-x", help="Public proxy host name")
parser.add_argument(
"--esp32",
action="store_true",
default=False,
help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)",
)
parser.add_argument("-x", "--proxy", help="Public proxy host name")
parser.add_argument(
"-d",
"--direct",
@@ -916,13 +912,7 @@ def main():
)
parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder")
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
)
parser.add_argument(
"--whatsapp",
action="store_true",
default=False,
help="Ensure requried WhatsApp environment variables are present",
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
)
parser.add_argument(
"--dialin",
@@ -930,6 +920,18 @@ def main():
default=False,
help="Enable Daily PSTN dial-in webhook handling (requires Daily transport)",
)
parser.add_argument(
"--esp32",
action="store_true",
default=False,
help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)",
)
parser.add_argument(
"--whatsapp",
action="store_true",
default=False,
help="Ensure requried WhatsApp environment variables are present",
)
args = parser.parse_args()
@@ -965,7 +967,7 @@ def main():
print()
# Run direct Daily connection
asyncio.run(_run_daily_direct())
asyncio.run(_run_daily_direct(args))
return
# Print startup message for server-based transports
@@ -996,15 +998,7 @@ def main():
RUNNER_PORT = args.port
# Create the app with transport-specific setup
app = _create_server_app(
transport_type=args.transport,
host=args.host,
proxy=args.proxy,
esp32_mode=args.esp32,
whatsapp_enabled=args.whatsapp,
folder=args.folder,
dialin_enabled=args.dialin,
)
app = _create_server_app(args)
# Run the server
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -10,6 +10,7 @@ These types are used by the development runner to pass transport-specific
information to bot functions.
"""
import argparse
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
@@ -64,6 +65,7 @@ class RunnerArguments:
handle_sigterm: bool = field(init=False, kw_only=True)
pipeline_idle_timeout_secs: int = field(init=False, kw_only=True)
body: Optional[Any] = field(default_factory=dict, kw_only=True)
cli_args: Optional[argparse.Namespace] = field(default=None, init=False, kw_only=True)
def __post_init__(self):
self.handle_sigint = False