From e47f7d0e63fd69d9af7e003b85b2f9ddfb63d240 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 21 May 2026 23:08:47 -0700 Subject: [PATCH] Auto-end PipelineRunner.run() when all root workers finish run() now defaults to auto_end=True: the runner ends once every root worker has finished, so single-pipeline bots end naturally when their pipeline does and tests no longer need an explicit runner.end() call. Multi-worker bots whose helpers run forever still trigger shutdown via end() / cancel() from an event handler (typically on transport disconnect). Hosts that add and remove workers across many sessions can pass auto_end=False to keep the runner up. --- changelog/4493.changed.3.md | 1 + changelog/4493.deprecated.md | 2 +- src/pipecat/pipeline/runner.py | 76 ++++++++++++++++++++++------------ 3 files changed, 52 insertions(+), 27 deletions(-) create mode 100644 changelog/4493.changed.3.md diff --git a/changelog/4493.changed.3.md b/changelog/4493.changed.3.md new file mode 100644 index 000000000..11aac9b5c --- /dev/null +++ b/changelog/4493.changed.3.md @@ -0,0 +1 @@ +- `PipelineRunner.run()` now ends automatically once every root worker has finished, so single-pipeline bots no longer need an explicit `runner.end()` / `runner.cancel()` call. Multi-worker bots whose helpers run forever (waiting for bus messages) still trigger shutdown by calling `end()` / `cancel()` from an event handler (typically on transport disconnect). Pass `auto_end=False` to `run()` for long-lived hosts (e.g. a FastAPI server) that add and remove workers across many sessions. diff --git a/changelog/4493.deprecated.md b/changelog/4493.deprecated.md index 121cd562c..a60bb95fc 100644 --- a/changelog/4493.deprecated.md +++ b/changelog/4493.deprecated.md @@ -1 +1 @@ -- Passing a worker to `PipelineRunner.run()` is deprecated. Register the worker with `PipelineRunner.add_workers()` before calling `run()` instead; `run()` now blocks until `end()` / `cancel()` is called rather than until the passed worker finishes. The `worker` argument still works but emits a `DeprecationWarning` and will be removed in a future release. +- Passing a worker to `PipelineRunner.run()` is deprecated. Register the worker with `PipelineRunner.add_workers()` before calling `run()` instead. The `worker` argument still works but emits a `DeprecationWarning` and will be removed in a future release. diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index b161d8078..bdf74c212 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -29,12 +29,14 @@ For multi-worker setups, register every worker the same way: await runner.add_workers(CodeWorker("code_worker", ...), worker) await runner.run() -``run()`` blocks until :meth:`PipelineRunner.end` / -:meth:`PipelineRunner.cancel` is called (or an incoming ``BusEndMessage`` / -``BusCancelMessage`` triggers the same path). Added workers finishing on -their own does **not** unblock it — use ``end()`` / ``cancel()`` from an -event handler (e.g. when the transport disconnects) to shut the runner -down. +By default, ``run()`` ends once every root worker has finished — so a +single-pipeline bot naturally ends when its pipeline does. Multi-worker +bots whose helpers run forever (e.g. waiting for bus messages) end by +calling :meth:`PipelineRunner.end` / :meth:`PipelineRunner.cancel` from +an event handler (typically on transport disconnect). For long-lived +hosts that add and remove workers over many sessions (e.g. a FastAPI +server), pass ``auto_end=False`` to ``run()`` so the runner does not +exit when no workers are left. """ import asyncio @@ -87,11 +89,12 @@ class PipelineRunner(BaseObject, BusSubscriber): - :meth:`add_workers(*workers)` — register one or more workers on the runner's bus and start them in the background. Workers run - concurrently and are cancelled when :meth:`end` / :meth:`cancel` - is called. - - :meth:`run` — block until :meth:`end` / :meth:`cancel` is called - (or until an incoming ``BusEndMessage`` / ``BusCancelMessage`` - triggers the same path). + concurrently and remaining workers are cancelled when the runner + ends. + - :meth:`run` — block until the runner ends. By default + (``auto_end=True``) the runner ends once every root worker has + finished; pass ``auto_end=False`` to keep the runner up until + :meth:`end` / :meth:`cancel` is called. Event handlers available: @@ -132,6 +135,7 @@ class PipelineRunner(BaseObject, BusSubscriber): self._entries: dict[str, _WorkerEntry] = {} self._known_runners: set[str] = set() self._running: bool = False + self._auto_end: bool = True self._shutdown_event = asyncio.Event() self._sig_task: asyncio.Task | None = None @@ -184,15 +188,23 @@ class PipelineRunner(BaseObject, BusSubscriber): if self._running: await self._start_worker(entry) - async def run(self, worker: PipelineWorker | None = None) -> None: + async def run( + self, + worker: PipelineWorker | None = None, + *, + auto_end: bool = True, + ) -> None: """Run all added workers until the runner is stopped. - Blocks until :meth:`end` or :meth:`cancel` is called (or until an - incoming ``BusEndMessage`` / ``BusCancelMessage`` triggers the - same path). Added workers finishing on their own does **not** - unblock the runner — call ``end()`` / ``cancel()`` from an - event handler (e.g. when the transport disconnects) to shut the - runner down. + By default (``auto_end=True``), the runner ends once every root + worker has finished — so a single-pipeline bot naturally ends + when its pipeline does. Multi-worker bots whose helpers run + forever (e.g. waiting for bus messages) end by calling + :meth:`end` / :meth:`cancel` from an event handler (typically on + transport disconnect). For long-lived hosts that add and remove + workers over many sessions (e.g. a FastAPI server), pass + ``auto_end=False`` so the runner does not exit when no workers + are left. Args: worker: Optional pipeline worker to run. @@ -201,6 +213,10 @@ class PipelineRunner(BaseObject, BusSubscriber): Register the worker with :meth:`add_workers` before calling ``run()`` instead. Passing ``worker`` here will be removed in a future release. + auto_end: When ``True`` (the default), the runner ends once + every root worker has finished. When ``False``, the + runner blocks until :meth:`end` or :meth:`cancel` is + called. """ if worker is not None: warnings.warn( @@ -211,6 +227,7 @@ class PipelineRunner(BaseObject, BusSubscriber): ) logger.debug(f"PipelineRunner '{self}': started running") + self._auto_end = auto_end self._shutdown_event.clear() # Treat the main worker as any other added worker: ``add_workers`` attaches @@ -222,15 +239,10 @@ class PipelineRunner(BaseObject, BusSubscriber): await self._setup_session() await self._call_event_handler("on_ready") - # Wait for the main worker's background runner worker to finish - # (or for an explicit shutdown when there's no main worker). + # Wait for shutdown. With ``auto_end=True``, ``_run_worker`` sets + # ``_shutdown_event`` as soon as any root worker finishes. try: - if worker is not None: - runner_task = self._entries[worker.name].runner_task - if runner_task is not None: - await runner_task - else: - await self._shutdown_event.wait() + await self._shutdown_event.wait() except asyncio.CancelledError: pass @@ -383,6 +395,18 @@ class PipelineRunner(BaseObject, BusSubscriber): await worker.run(params) except asyncio.CancelledError: pass + finally: + # End the runner once every root worker has finished. The + # current worker's task is still "running" (we're inside its + # body), so exclude it from the check. + if self._auto_end and worker.parent is None: + others_running = any( + e.runner_task is not None and not e.runner_task.done() + for e in self._entries.values() + if e.worker.parent is None and e.worker is not worker + ) + if not others_running: + self._shutdown_event.set() async def _on_local_worker_ready(self, data: WorkerReadyData) -> None: """Called when a local added worker registers as ready."""