Rename PipelineRunner.add_worker() to variadic add_workers(*workers)

Lets callers register multiple workers in a single call instead of
awaiting add_worker() repeatedly. Updates all examples, docs, tests,
and proxy worker docstrings to use the new API.
This commit is contained in:
Aleix Conchillo Flaqué
2026-05-21 19:45:47 -07:00
parent f91179a640
commit e8ec7c585f
13 changed files with 58 additions and 62 deletions

View File

@@ -1,6 +1,6 @@
# Pipecat Multi-Worker Examples
This directory contains example bots that use the multi-worker framework in `pipecat.workers`, `pipecat.pipeline.runner` (with `add_worker()`), and the `WorkerBus`. Each example shows a different cooperation pattern between workers: hand-off, parallel fan-out, remote workers, etc.
This directory contains example bots that use the multi-worker framework in `pipecat.workers`, `pipecat.pipeline.runner` (with `add_workers()`), and the `WorkerBus`. Each example shows a different cooperation pattern between workers: hand-off, parallel fan-out, remote workers, etc.
## Setup

View File

@@ -161,8 +161,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Client disconnected")
await runner.cancel()
await runner.add_worker(CodeWorker("code_worker", project_path=PROJECT_PATH))
await runner.add_worker(worker)
await runner.add_workers(CodeWorker("code_worker", project_path=PROJECT_PATH), worker)
await runner.run()

View File

@@ -251,9 +251,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Client disconnected")
await runner.cancel()
await runner.add_worker(build_greeter())
await runner.add_worker(build_support())
await runner.add_worker(worker)
await runner.add_workers(build_greeter(), build_support(), worker)
await runner.run()

View File

@@ -224,9 +224,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Client disconnected")
await runner.cancel()
await runner.add_worker(build_greeter())
await runner.add_worker(build_support())
await runner.add_worker(worker)
await runner.add_workers(build_greeter(), build_support(), worker)
await runner.run()

View File

@@ -217,9 +217,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Client disconnected")
await runner.cancel()
for role in ROLE_PROMPTS:
await runner.add_worker(DebateWorker(role))
await runner.add_worker(worker)
await runner.add_workers(
DebateWorker("advocate"),
DebateWorker("critic"),
DebateWorker("analyst"),
worker,
)
await runner.run()

View File

@@ -103,8 +103,7 @@ async def websocket_endpoint(websocket: WebSocket):
assistant = AcmeAssistant()
await runner.add_worker(proxy)
await runner.add_worker(assistant)
await runner.add_workers(proxy, assistant)
logger.info("Assistant server ready, waiting for activation")
await runner.run()

View File

@@ -146,8 +146,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Client disconnected")
await runner.cancel()
await runner.add_worker(proxy)
await runner.add_worker(worker)
await runner.add_workers(proxy, worker)
await runner.run()

View File

@@ -316,8 +316,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Client disconnected")
await runner.cancel()
await runner.add_worker(build_sensor_controller())
await runner.add_worker(worker)
await runner.add_workers(build_sensor_controller(), worker)
await runner.run()

View File

@@ -287,7 +287,7 @@ class BaseWorker(BaseObject, BusSubscriber):
def attach(self, *, registry: WorkerRegistry, bus: WorkerBus) -> None:
"""Attach the worker to a runner-provided registry and bus.
Called by the runner (typically from ``add_worker()``) before the
Called by the runner (typically from ``add_workers()``) before the
worker is run. After this call, :attr:`registry` and :attr:`bus`
return the provided instances.

View File

@@ -27,10 +27,10 @@ For multi-worker setups, add the additional workers alongside the main one:
.. code-block:: python
runner = PipelineRunner()
await runner.add_worker(CodeWorker("code_worker", ...))
await runner.add_workers(CodeWorker("code_worker", ...))
await runner.run(worker)
Optionally, ``add_worker`` every worker (including the main pipeline) and call
Optionally, ``add_workers`` every worker (including the main pipeline) and call
``run()`` with no argument. In that form ``run()`` blocks until
:meth:`PipelineRunner.end` / :meth:`PipelineRunner.cancel` is called (or an
incoming ``BusEndMessage`` / ``BusCancelMessage`` triggers the same path) —
@@ -86,10 +86,10 @@ class PipelineRunner(BaseObject, BusSubscriber):
- :meth:`run(worker)` — block until the given pipeline worker finishes.
The most common case for a single-pipeline bot.
- :meth:`add_worker(worker)` — register a worker on the runner's bus and
start it in the background. Added workers run alongside the main
worker and are cancelled when the main worker finishes (or when
:meth:`end` / :meth:`cancel` is called).
- :meth:`add_workers(*workers)` — register one or more workers on the
runner's bus and start them in the background. Added workers run
alongside the main worker and are cancelled when the main worker
finishes (or when :meth:`end` / :meth:`cancel` is called).
Event handlers available:
@@ -151,35 +151,36 @@ class PipelineRunner(BaseObject, BusSubscriber):
"""The worker registry this runner owns."""
return self._registry
async def add_worker(self, worker: BaseWorker) -> None:
"""Add a worker to the runner.
async def add_workers(self, *workers: BaseWorker) -> None:
"""Add one or more workers to the runner.
Adding a worker attaches it to the runner's bus and registry, and
starts it in the background. If the runner is not yet running
(``add_worker`` was called before :meth:`run`), the worker is
(``add_workers`` was called before :meth:`run`), workers are
queued and started during run setup; if the runner is already
running, the worker starts immediately.
running, each worker starts immediately.
Added workers run alongside the main worker and are cancelled
when the main worker finishes (or when :meth:`end` /
:meth:`cancel` is called).
Args:
worker: The worker to add.
*workers: One or more workers to add.
"""
if worker.name in self._entries:
logger.error(
f"PipelineRunner '{self}': worker '{worker.name}' already exists, skipping"
)
return
worker.attach(registry=self._registry, bus=self._bus)
await self._registry.watch(worker.name, self._on_local_worker_ready)
entry = _WorkerEntry(worker=worker)
self._entries[worker.name] = entry
logger.debug(f"PipelineRunner '{self}': added worker '{worker.name}'")
for worker in workers:
if worker.name in self._entries:
logger.error(
f"PipelineRunner '{self}': worker '{worker.name}' already exists, skipping"
)
continue
worker.attach(registry=self._registry, bus=self._bus)
await self._registry.watch(worker.name, self._on_local_worker_ready)
entry = _WorkerEntry(worker=worker)
self._entries[worker.name] = entry
logger.debug(f"PipelineRunner '{self}': added worker '{worker.name}'")
if self._running:
await self._start_worker(entry)
if self._running:
await self._start_worker(entry)
async def run(self, worker: PipelineWorker | None = None) -> None:
"""Run a pipeline worker to completion (optionally alongside added workers).
@@ -203,11 +204,11 @@ class PipelineRunner(BaseObject, BusSubscriber):
logger.debug(f"PipelineRunner '{self}': started running {worker}")
self._shutdown_event.clear()
# Treat the main worker as any other added worker: ``add_worker`` attaches
# Treat the main worker as any other added worker: ``add_workers`` attaches
# it to the bus and registry, and ``_setup_session`` then starts every
# entry (main and pre-added) through the same code path.
if worker is not None:
await self.add_worker(worker)
await self.add_workers(worker)
await self._setup_session()
await self._call_event_handler("on_ready")
@@ -299,7 +300,7 @@ class PipelineRunner(BaseObject, BusSubscriber):
elif isinstance(message, BusCancelMessage):
self.create_task(self.cancel(message.reason), "cancel")
elif isinstance(message, BusAddWorkerMessage) and message.worker:
await self.add_worker(message.worker)
await self.add_workers(message.worker)
elif isinstance(message, BusWorkerRegistryMessage):
await self._handle_worker_registry(message)

View File

@@ -56,7 +56,7 @@ class WebSocketProxyClientTask(BaseWorker):
async def on_disconnected(worker, websocket):
logger.info("Disconnected from remote server")
await runner.add_worker(proxy)
await runner.add_workers(proxy)
"""
def __init__(

View File

@@ -58,7 +58,7 @@ class WebSocketProxyServerTask(BaseWorker):
async def on_client_disconnected(worker, websocket):
logger.info("Client disconnected")
await runner.add_worker(proxy)
await runner.add_workers(proxy)
"""
def __init__(

View File

@@ -32,20 +32,20 @@ class StubTask(BaseWorker):
class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
async def test_spawn_registers_task(self):
"""add_worker() registers the task by name (duplicate is silently skipped)."""
"""add_workers() registers the task by name (duplicate is silently skipped)."""
runner = PipelineRunner(handle_sigint=False)
task = StubTask("task_a")
await runner.add_worker(task)
await runner.add_workers(task)
# Duplicate is silently skipped (logs error)
await runner.add_worker(StubTask("task_a"))
await runner.add_workers(StubTask("task_a"))
async def test_run_starts_bus_and_tasks(self):
"""run() starts bus, starts all tasks, fires on_ready."""
runner = PipelineRunner(handle_sigint=False)
task = StubTask("task_a")
await runner.add_worker(task)
await runner.add_workers(task)
runner_started = asyncio.Event()
@@ -63,7 +63,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
"""end() is idempotent — subsequent calls are no-ops."""
runner = PipelineRunner(handle_sigint=False)
task = StubTask("task_a")
await runner.add_worker(task)
await runner.add_workers(task)
@runner.event_handler("on_ready")
async def on_ready(runner):
@@ -77,7 +77,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
"""cancel() is idempotent — subsequent calls are no-ops."""
runner = PipelineRunner(handle_sigint=False)
task = StubTask("task_a")
await runner.add_worker(task)
await runner.add_workers(task)
@runner.event_handler("on_ready")
async def on_ready(runner):
@@ -96,8 +96,8 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
child = StubTask("child")
# Manually mark child as having root as parent
child._parent = root.name
await runner.add_worker(root)
await runner.add_worker(child)
await runner.add_workers(root)
await runner.add_workers(child)
sent = []
bus = runner.bus
@@ -123,8 +123,8 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
root = StubTask("root")
child = StubTask("child")
child._parent = root.name
await runner.add_worker(root)
await runner.add_worker(child)
await runner.add_workers(root)
await runner.add_workers(child)
sent = []
bus = runner.bus
@@ -148,7 +148,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
"""BusEndMessage on bus triggers runner.end()."""
runner = PipelineRunner(handle_sigint=False)
task = StubTask("task_a")
await runner.add_worker(task)
await runner.add_workers(task)
bus = runner.bus
@@ -164,7 +164,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
"""BusCancelMessage on bus triggers runner.cancel()."""
runner = PipelineRunner(handle_sigint=False)
task = StubTask("task_a")
await runner.add_worker(task)
await runner.add_workers(task)
bus = runner.bus
@@ -178,10 +178,10 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
pass
async def test_bus_add_task_message_triggers_add(self):
"""BusAddWorkerMessage on bus triggers add_worker()."""
"""BusAddWorkerMessage on bus triggers add_workers()."""
runner = PipelineRunner(handle_sigint=False)
task_a = StubTask("task_a")
await runner.add_worker(task_a)
await runner.add_workers(task_a)
task_b = StubTask("task_b")
bus = runner.bus
@@ -195,7 +195,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase):
await asyncio.wait_for(runner.run(), timeout=5.0)
# Verify task_b was added (duplicate is silently skipped)
await runner.add_worker(StubTask("task_b"))
await runner.add_workers(StubTask("task_b"))
if __name__ == "__main__":