diff --git a/examples/multi-worker/README.md b/examples/multi-worker/README.md index fdb938165..6eee3ccea 100644 --- a/examples/multi-worker/README.md +++ b/examples/multi-worker/README.md @@ -1,6 +1,6 @@ # Pipecat Multi-Worker Examples -This directory contains example bots that use the multi-worker framework in `pipecat.workers`, `pipecat.pipeline.runner` (with `add_worker()`), and the `WorkerBus`. Each example shows a different cooperation pattern between workers: hand-off, parallel fan-out, remote workers, etc. +This directory contains example bots that use the multi-worker framework in `pipecat.workers`, `pipecat.pipeline.runner` (with `add_workers()`), and the `WorkerBus`. Each example shows a different cooperation pattern between workers: hand-off, parallel fan-out, remote workers, etc. ## Setup diff --git a/examples/multi-worker/code-assistant/code-assistant.py b/examples/multi-worker/code-assistant/code-assistant.py index 1b1266cd1..cc561a3f3 100644 --- a/examples/multi-worker/code-assistant/code-assistant.py +++ b/examples/multi-worker/code-assistant/code-assistant.py @@ -161,8 +161,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await runner.cancel() - await runner.add_worker(CodeWorker("code_worker", project_path=PROJECT_PATH)) - await runner.add_worker(worker) + await runner.add_workers(CodeWorker("code_worker", project_path=PROJECT_PATH), worker) await runner.run() diff --git a/examples/multi-worker/local-handoff/local-handoff-two-agents-tts.py b/examples/multi-worker/local-handoff/local-handoff-two-agents-tts.py index 9d9951e7f..3a313d86b 100644 --- a/examples/multi-worker/local-handoff/local-handoff-two-agents-tts.py +++ b/examples/multi-worker/local-handoff/local-handoff-two-agents-tts.py @@ -251,9 +251,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await runner.cancel() - await runner.add_worker(build_greeter()) - await runner.add_worker(build_support()) - await runner.add_worker(worker) + await runner.add_workers(build_greeter(), build_support(), worker) await runner.run() diff --git a/examples/multi-worker/local-handoff/local-handoff-two-agents.py b/examples/multi-worker/local-handoff/local-handoff-two-agents.py index eac7acf31..ca654d267 100644 --- a/examples/multi-worker/local-handoff/local-handoff-two-agents.py +++ b/examples/multi-worker/local-handoff/local-handoff-two-agents.py @@ -224,9 +224,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await runner.cancel() - await runner.add_worker(build_greeter()) - await runner.add_worker(build_support()) - await runner.add_worker(worker) + await runner.add_workers(build_greeter(), build_support(), worker) await runner.run() diff --git a/examples/multi-worker/parallel-debate/parallel-debate.py b/examples/multi-worker/parallel-debate/parallel-debate.py index 1ce55ef7c..e72e92696 100644 --- a/examples/multi-worker/parallel-debate/parallel-debate.py +++ b/examples/multi-worker/parallel-debate/parallel-debate.py @@ -217,9 +217,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await runner.cancel() - for role in ROLE_PROMPTS: - await runner.add_worker(DebateWorker(role)) - await runner.add_worker(worker) + await runner.add_workers( + DebateWorker("advocate"), + DebateWorker("critic"), + DebateWorker("analyst"), + worker, + ) await runner.run() diff --git a/examples/multi-worker/remote-proxy-assistant/assistant.py b/examples/multi-worker/remote-proxy-assistant/assistant.py index 36520d4b3..ba0680a5d 100644 --- a/examples/multi-worker/remote-proxy-assistant/assistant.py +++ b/examples/multi-worker/remote-proxy-assistant/assistant.py @@ -103,8 +103,7 @@ async def websocket_endpoint(websocket: WebSocket): assistant = AcmeAssistant() - await runner.add_worker(proxy) - await runner.add_worker(assistant) + await runner.add_workers(proxy, assistant) logger.info("Assistant server ready, waiting for activation") await runner.run() diff --git a/examples/multi-worker/remote-proxy-assistant/main.py b/examples/multi-worker/remote-proxy-assistant/main.py index 4120e3b12..f04d09536 100644 --- a/examples/multi-worker/remote-proxy-assistant/main.py +++ b/examples/multi-worker/remote-proxy-assistant/main.py @@ -146,8 +146,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await runner.cancel() - await runner.add_worker(proxy) - await runner.add_worker(worker) + await runner.add_workers(proxy, worker) await runner.run() diff --git a/examples/multi-worker/sensor-controller/sensor-controller.py b/examples/multi-worker/sensor-controller/sensor-controller.py index 663729f73..18f4fadd6 100644 --- a/examples/multi-worker/sensor-controller/sensor-controller.py +++ b/examples/multi-worker/sensor-controller/sensor-controller.py @@ -316,8 +316,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info("Client disconnected") await runner.cancel() - await runner.add_worker(build_sensor_controller()) - await runner.add_worker(worker) + await runner.add_workers(build_sensor_controller(), worker) await runner.run() diff --git a/src/pipecat/pipeline/base_worker.py b/src/pipecat/pipeline/base_worker.py index 7e5441e0e..b6d599fa3 100644 --- a/src/pipecat/pipeline/base_worker.py +++ b/src/pipecat/pipeline/base_worker.py @@ -287,7 +287,7 @@ class BaseWorker(BaseObject, BusSubscriber): def attach(self, *, registry: WorkerRegistry, bus: WorkerBus) -> None: """Attach the worker to a runner-provided registry and bus. - Called by the runner (typically from ``add_worker()``) before the + Called by the runner (typically from ``add_workers()``) before the worker is run. After this call, :attr:`registry` and :attr:`bus` return the provided instances. diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index 257c1996c..90edc26ae 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -27,10 +27,10 @@ For multi-worker setups, add the additional workers alongside the main one: .. code-block:: python runner = PipelineRunner() - await runner.add_worker(CodeWorker("code_worker", ...)) + await runner.add_workers(CodeWorker("code_worker", ...)) await runner.run(worker) -Optionally, ``add_worker`` every worker (including the main pipeline) and call +Optionally, ``add_workers`` every worker (including the main pipeline) and call ``run()`` with no argument. In that form ``run()`` blocks until :meth:`PipelineRunner.end` / :meth:`PipelineRunner.cancel` is called (or an incoming ``BusEndMessage`` / ``BusCancelMessage`` triggers the same path) — @@ -86,10 +86,10 @@ class PipelineRunner(BaseObject, BusSubscriber): - :meth:`run(worker)` — block until the given pipeline worker finishes. The most common case for a single-pipeline bot. - - :meth:`add_worker(worker)` — register a worker on the runner's bus and - start it in the background. Added workers run alongside the main - worker and are cancelled when the main worker finishes (or when - :meth:`end` / :meth:`cancel` is called). + - :meth:`add_workers(*workers)` — register one or more workers on the + runner's bus and start them in the background. Added workers run + alongside the main worker and are cancelled when the main worker + finishes (or when :meth:`end` / :meth:`cancel` is called). Event handlers available: @@ -151,35 +151,36 @@ class PipelineRunner(BaseObject, BusSubscriber): """The worker registry this runner owns.""" return self._registry - async def add_worker(self, worker: BaseWorker) -> None: - """Add a worker to the runner. + async def add_workers(self, *workers: BaseWorker) -> None: + """Add one or more workers to the runner. Adding a worker attaches it to the runner's bus and registry, and starts it in the background. If the runner is not yet running - (``add_worker`` was called before :meth:`run`), the worker is + (``add_workers`` was called before :meth:`run`), workers are queued and started during run setup; if the runner is already - running, the worker starts immediately. + running, each worker starts immediately. Added workers run alongside the main worker and are cancelled when the main worker finishes (or when :meth:`end` / :meth:`cancel` is called). Args: - worker: The worker to add. + *workers: One or more workers to add. """ - if worker.name in self._entries: - logger.error( - f"PipelineRunner '{self}': worker '{worker.name}' already exists, skipping" - ) - return - worker.attach(registry=self._registry, bus=self._bus) - await self._registry.watch(worker.name, self._on_local_worker_ready) - entry = _WorkerEntry(worker=worker) - self._entries[worker.name] = entry - logger.debug(f"PipelineRunner '{self}': added worker '{worker.name}'") + for worker in workers: + if worker.name in self._entries: + logger.error( + f"PipelineRunner '{self}': worker '{worker.name}' already exists, skipping" + ) + continue + worker.attach(registry=self._registry, bus=self._bus) + await self._registry.watch(worker.name, self._on_local_worker_ready) + entry = _WorkerEntry(worker=worker) + self._entries[worker.name] = entry + logger.debug(f"PipelineRunner '{self}': added worker '{worker.name}'") - if self._running: - await self._start_worker(entry) + if self._running: + await self._start_worker(entry) async def run(self, worker: PipelineWorker | None = None) -> None: """Run a pipeline worker to completion (optionally alongside added workers). @@ -203,11 +204,11 @@ class PipelineRunner(BaseObject, BusSubscriber): logger.debug(f"PipelineRunner '{self}': started running {worker}") self._shutdown_event.clear() - # Treat the main worker as any other added worker: ``add_worker`` attaches + # Treat the main worker as any other added worker: ``add_workers`` attaches # it to the bus and registry, and ``_setup_session`` then starts every # entry (main and pre-added) through the same code path. if worker is not None: - await self.add_worker(worker) + await self.add_workers(worker) await self._setup_session() await self._call_event_handler("on_ready") @@ -299,7 +300,7 @@ class PipelineRunner(BaseObject, BusSubscriber): elif isinstance(message, BusCancelMessage): self.create_task(self.cancel(message.reason), "cancel") elif isinstance(message, BusAddWorkerMessage) and message.worker: - await self.add_worker(message.worker) + await self.add_workers(message.worker) elif isinstance(message, BusWorkerRegistryMessage): await self._handle_worker_registry(message) diff --git a/src/pipecat/workers/proxy/websocket/client.py b/src/pipecat/workers/proxy/websocket/client.py index 77f9864ab..dfe87955e 100644 --- a/src/pipecat/workers/proxy/websocket/client.py +++ b/src/pipecat/workers/proxy/websocket/client.py @@ -56,7 +56,7 @@ class WebSocketProxyClientTask(BaseWorker): async def on_disconnected(worker, websocket): logger.info("Disconnected from remote server") - await runner.add_worker(proxy) + await runner.add_workers(proxy) """ def __init__( diff --git a/src/pipecat/workers/proxy/websocket/server.py b/src/pipecat/workers/proxy/websocket/server.py index e31e62ef6..252483b7c 100644 --- a/src/pipecat/workers/proxy/websocket/server.py +++ b/src/pipecat/workers/proxy/websocket/server.py @@ -58,7 +58,7 @@ class WebSocketProxyServerTask(BaseWorker): async def on_client_disconnected(worker, websocket): logger.info("Client disconnected") - await runner.add_worker(proxy) + await runner.add_workers(proxy) """ def __init__( diff --git a/tests/test_runner.py b/tests/test_runner.py index a48fcc6f3..40b3b261c 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -32,20 +32,20 @@ class StubTask(BaseWorker): class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): async def test_spawn_registers_task(self): - """add_worker() registers the task by name (duplicate is silently skipped).""" + """add_workers() registers the task by name (duplicate is silently skipped).""" runner = PipelineRunner(handle_sigint=False) task = StubTask("task_a") - await runner.add_worker(task) + await runner.add_workers(task) # Duplicate is silently skipped (logs error) - await runner.add_worker(StubTask("task_a")) + await runner.add_workers(StubTask("task_a")) async def test_run_starts_bus_and_tasks(self): """run() starts bus, starts all tasks, fires on_ready.""" runner = PipelineRunner(handle_sigint=False) task = StubTask("task_a") - await runner.add_worker(task) + await runner.add_workers(task) runner_started = asyncio.Event() @@ -63,7 +63,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): """end() is idempotent — subsequent calls are no-ops.""" runner = PipelineRunner(handle_sigint=False) task = StubTask("task_a") - await runner.add_worker(task) + await runner.add_workers(task) @runner.event_handler("on_ready") async def on_ready(runner): @@ -77,7 +77,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): """cancel() is idempotent — subsequent calls are no-ops.""" runner = PipelineRunner(handle_sigint=False) task = StubTask("task_a") - await runner.add_worker(task) + await runner.add_workers(task) @runner.event_handler("on_ready") async def on_ready(runner): @@ -96,8 +96,8 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): child = StubTask("child") # Manually mark child as having root as parent child._parent = root.name - await runner.add_worker(root) - await runner.add_worker(child) + await runner.add_workers(root) + await runner.add_workers(child) sent = [] bus = runner.bus @@ -123,8 +123,8 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): root = StubTask("root") child = StubTask("child") child._parent = root.name - await runner.add_worker(root) - await runner.add_worker(child) + await runner.add_workers(root) + await runner.add_workers(child) sent = [] bus = runner.bus @@ -148,7 +148,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): """BusEndMessage on bus triggers runner.end().""" runner = PipelineRunner(handle_sigint=False) task = StubTask("task_a") - await runner.add_worker(task) + await runner.add_workers(task) bus = runner.bus @@ -164,7 +164,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): """BusCancelMessage on bus triggers runner.cancel().""" runner = PipelineRunner(handle_sigint=False) task = StubTask("task_a") - await runner.add_worker(task) + await runner.add_workers(task) bus = runner.bus @@ -178,10 +178,10 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): pass async def test_bus_add_task_message_triggers_add(self): - """BusAddWorkerMessage on bus triggers add_worker().""" + """BusAddWorkerMessage on bus triggers add_workers().""" runner = PipelineRunner(handle_sigint=False) task_a = StubTask("task_a") - await runner.add_worker(task_a) + await runner.add_workers(task_a) task_b = StubTask("task_b") bus = runner.bus @@ -195,7 +195,7 @@ class TestPipelineRunner(unittest.IsolatedAsyncioTestCase): await asyncio.wait_for(runner.run(), timeout=5.0) # Verify task_b was added (duplicate is silently skipped) - await runner.add_worker(StubTask("task_b")) + await runner.add_workers(StubTask("task_b")) if __name__ == "__main__":