diff --git a/changelog/4385.added.md b/changelog/4385.added.md new file mode 100644 index 000000000..16da37174 --- /dev/null +++ b/changelog/4385.added.md @@ -0,0 +1 @@ +- Added a `session_id` field to `RunnerArguments` so bots can log or trace a per-session identifier in local development the same way they can in Pipecat Cloud. The development runner now mints a UUID at every construction site, and paths that already returned a `sessionId` to the caller (Daily `/start`, dial-in webhook) share that same UUID with the runner args instead of generating two. The SmallWebRTC `/api/offer` endpoint also accepts an optional `session_id` query parameter so the `/sessions/{session_id}/...` proxy can thread it through. diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index be0b8575e..4deda3b7d 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -180,7 +180,7 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace): bot_module = _get_bot_module() # Just pass the WebSocket - let the bot handle parsing - runner_args = WebSocketRunnerArguments(websocket=websocket) + runner_args = WebSocketRunnerArguments(websocket=websocket, session_id=str(uuid.uuid4())) runner_args.cli_args = args await bot_module.bot(runner_args) @@ -267,15 +267,25 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace): ) @app.post("/api/offer") - async def offer(request: SmallWebRTCRequest, background_tasks: BackgroundTasks): + async def offer( + request: SmallWebRTCRequest, + background_tasks: BackgroundTasks, + session_id: str | None = None, + ): """Handle WebRTC offer requests via SmallWebRTCRequestHandler.""" + # When called via the /sessions/{session_id}/api/offer proxy the + # session_id is threaded through; for direct /api/offer calls we mint + # one so bots see a stable identifier in either path. + resolved_session_id = session_id or str(uuid.uuid4()) # Prepare runner arguments with the callback to run your bot async def webrtc_connection_callback(connection: SmallWebRTCConnection): bot_module = _get_bot_module() runner_args = SmallWebRTCRunnerArguments( - webrtc_connection=connection, body=request.request_data + webrtc_connection=connection, + body=request.request_data, + session_id=resolved_session_id, ) runner_args.cli_args = args background_tasks.add_task(bot_module.bot, runner_args) @@ -343,7 +353,7 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace): or request_data.get("requestData") or active_session, ) - return await offer(webrtc_request, background_tasks) + return await offer(webrtc_request, background_tasks, session_id=session_id) elif request.method == HTTPMethod.PATCH.value: patch_request = SmallWebRTCPatchRequest( pc_id=request_data["pc_id"], @@ -490,7 +500,9 @@ def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace): connection: The established WebRTC connection """ bot_module = _get_bot_module() - runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection) + runner_args = SmallWebRTCRunnerArguments( + webrtc_connection=connection, session_id=str(uuid.uuid4()) + ) runner_args.cli_args = args background_tasks.add_task(bot_module.bot, runner_args) @@ -554,7 +566,9 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace): # Start the bot in the background with empty body for GET requests bot_module = _get_bot_module() - runner_args = DailyRunnerArguments(room_url=room_url, token=token) + runner_args = DailyRunnerArguments( + room_url=room_url, token=token, session_id=str(uuid.uuid4()) + ) runner_args.cli_args = args asyncio.create_task(bot_module.bot(runner_args)) return RedirectResponse(room_url) @@ -590,6 +604,7 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace): existing_room_url = os.getenv("DAILY_ROOM_URL") + session_id = str(uuid.uuid4()) result = None # Configure room if: @@ -638,14 +653,16 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace): room_properties=room_properties, token_properties=token_properties, ) - runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=body) + runner_args = DailyRunnerArguments( + room_url=room_url, token=token, body=body, session_id=session_id + ) result = { "dailyRoom": room_url, "dailyToken": token, - "sessionId": str(uuid.uuid4()), + "sessionId": session_id, } else: - runner_args = RunnerArguments(body=body) + runner_args = RunnerArguments(body=body, session_id=session_id) # Update CLI args. runner_args.cli_args = args @@ -749,20 +766,21 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace): daily_api_url=daily_api_url, ) + # Generate session ID for both the runner args and the response + session_id = str(uuid.uuid4()) + # Start bot with dial-in context bot_module = _get_bot_module() runner_args = DailyRunnerArguments( room_url=room_config.room_url, token=room_config.token, body=request_body.model_dump(), + session_id=session_id, ) runner_args.cli_args = args asyncio.create_task(bot_module.bot(runner_args)) - # Generate session ID - session_id = str(uuid.uuid4()) - # Return response matching Pipecat Cloud format return { "dailyRoom": room_config.room_url, @@ -838,7 +856,9 @@ async def _run_daily_direct(args: argparse.Namespace): room_url, token = await configure(session, room_exp_duration=PIPECAT_ROOM_EXP_HOURS) # Direct connections have no request body, so use empty dict - runner_args = DailyRunnerArguments(room_url=room_url, token=token) + runner_args = DailyRunnerArguments( + room_url=room_url, token=token, session_id=str(uuid.uuid4()) + ) runner_args.handle_sigint = True runner_args.cli_args = args diff --git a/src/pipecat/runner/types.py b/src/pipecat/runner/types.py index b009b3350..70ce03f29 100644 --- a/src/pipecat/runner/types.py +++ b/src/pipecat/runner/types.py @@ -58,13 +58,25 @@ class DailyDialinRequest(BaseModel): @dataclass class RunnerArguments: - """Base class for runner session arguments.""" + """Base class for runner session arguments. + + Parameters: + handle_sigint: Whether the bot should install a SIGINT handler. + handle_sigterm: Whether the bot should install a SIGTERM handler. + pipeline_idle_timeout_secs: Seconds the pipeline may stay idle before + shutting down. + body: Optional request body data passed from the runner entry point. + session_id: Identifier for this bot session. + cli_args: Parsed CLI arguments from the runner, when launched via the + development runner. + """ # Use kw_only so subclasses don't need to worry about ordering. handle_sigint: bool = field(init=False, kw_only=True) handle_sigterm: bool = field(init=False, kw_only=True) pipeline_idle_timeout_secs: int = field(init=False, kw_only=True) body: Any | None = field(default_factory=dict, kw_only=True) + session_id: str | None = field(default=None, kw_only=True) cli_args: argparse.Namespace | None = field(default=None, init=False, kw_only=True) def __post_init__(self):