Restyle from a bespoke dark theme to the light theme the other UI demos
share: the canonical :root tokens (--border, --muted, --highlight), the
#fafafa/#18181b body, the sticky white header with the light/red Connect
button, the fixed bottom-right #status toast, and the amber
ui-highlight-pulse keyframe. index.html drops the custom topbar wrapper for
the standard <header> plus a standalone #status element.
Demonstrates the 'every input acts, may speak' pattern without bridging: a
standard voice pipeline (STT → LLM → TTS) whose LLM only converses, plus a
separate UIWorker that does all the list work. The voice pipeline's user
aggregator fires on_user_turn_stopped each turn and dispatches the transcript
to the UIWorker as a respond job (a bus message); the UIWorker reads the
auto-injected <ui_state> snapshot and drives the list silently via add_item /
set_checked / remove_item commands (plus the standard highlight). Items are
checkboxes whose label and checked state the snapshot exposes.
Includes a vanilla-JS client following the existing UI-demo client style.
Move <ui_state> snapshot injection out of respond_with_llm into a
cross-cutting on_before_process_frame handler on the UIWorker's LLM, so it
appends the current snapshot to the context the request is built from, just
before each inference. Injection is gated to the user-turn-initiating
inference so a tool-calling turn never stacks duplicate <ui_state> blocks;
respond_with_llm no longer injects manually.
Also drop the bridged parameter from UIWorker: there is no viable way to
bridge a UIWorker between workers — a shared, teed context would be polluted
by the injection, and per-worker turn detection off teed frames isn't
supported. Other workers keep their PipelineWorker bridging.
Synthesis example: a ReplyToolMixin UIWorker adds a start_review tool that fans
out to clarity/tone peers via start_user_job_group, translates each reviewer
response into an add_note command in on_job_response, handles a client
note_click event via @on_ui_event, and keeps history across turns.
A UIWorker with a custom reply tool fans research out to three BaseWorker peers
via start_user_job_group; their progress streams to the client as ui-task cards
and the user can cancel a group mid-flight.
A ReplyToolMixin UIWorker that fills inputs (fills) and toggles checkboxes /
presses submit (click) by voice — the state-changing half of the standard
action set.
A ReplyToolMixin UIWorker that grounds in the user's text selection (the
<selection> block in the snapshot) and points back via select_text — both
directions of deictic reference.
The voice LLM delegates to a ReplyToolMixin UIWorker that scrolls offscreen
items into view and highlights the phones it names — exercising the scroll_to /
highlight UI commands and the [offscreen] state tag.
Smallest UIWorker demo: a voice LLM in the main pipeline delegates
screen-relevant utterances to a UIWorker via a respond job; the UIWorker
auto-injects the current <ui_state> and answers grounded in what's on screen.
Includes a vanilla-JS client that streams accessibility snapshots over RTVI.
UIWorker is an LLMContextWorker that observes and drives a client GUI over the
RTVI UI channel: it stores accessibility snapshots, auto-injects <ui_state> at
the start of each respond job, dispatches client events to @on_ui_event
handlers, sends UI commands back to the client, and surfaces fan-out work as
cancellable task cards via user_job_group(). The optional ReplyToolMixin exposes
a bundled reply tool.
The prompt_guide parameter auto-appends the UI wire-format guide to the LLM's
system instruction (default UI_STATE_PROMPT_GUIDE; override with a string or
disable with None), so the LLM can parse the injected <ui_state> / <ui_event>
messages without the app concatenating the guide by hand.
When RTVI is enabled, PipelineWorker now republishes inbound ui-event /
ui-snapshot / ui-cancel-task messages onto the bus as a broadcast
BusUIEventMessage, and translates outbound BusUICommandMessage / BusUITask*
carriers into the matching RTVI frames. This lets a UIWorker on the bus observe
and drive the client UI with no decorator or manual wiring; when no UIWorker is
present the events are simply unconsumed.
The BusUI* carriers live in the bus layer so both pipeline and workers can
reference them without an import cycle.
Composes durable text onto a user-provided system instruction (alongside the
turn-completion and async-tool-cancellation addons) so it is prepended on every
inference and survives context-message resets. The user's base prompt is now
snapshotted once and the effective instruction is always rebuilt from it,
replacing the prior lazy capture/restore logic with a single invariant.
Lets callers register multiple workers in a single call instead of
awaiting add_worker() repeatedly. Updates all examples, docs, tests,
and proxy worker docstrings to use the new API.
PipelineWorker.__init__ was only forwarding `name` to BaseWorker, so
the `active` flag (the other BaseWorker constructor arg) wasn't
reachable from PipelineWorker callers. Add `active: bool = True` to
the signature and pass it through.
BusMessage was a mixin tacked onto DataFrame / SystemFrame so the bus
could reuse the frame priority machinery. That made every bus message
also a Frame, which is misleading — bus messages travel on the bus, not
through pipelines. If a worker actually needs to ship a frame, it wraps
it in BusFrameMessage.
BusMessage is now a plain dataclass base carrying source/target.
BusDataMessage and BusSystemMessage are empty subclasses that exist
only as priority markers. The bus router and the priority queue check
``isinstance(item, BusSystemMessage)`` directly instead of
``isinstance(item, SystemFrame)``.
The serializer test that round-tripped DataFrame.name (a non-init
field) is rewritten against a local _MessageWithNonInit(BusDataMessage)
subclass so the serializer's init=False path stays covered.
Symmetric with send_bus_message; "send_bus_error" on its own reads
ambiguously (sounds like an error about the bus, à la SIGBUS) and the
underlying types are BusTaskErrorMessage / BusTaskLocalErrorMessage,
so keeping "_message" in the name matches what's actually sent.
Mirrors on_bus_message and makes it explicit that the call goes out on
the task bus, not on a transport (transports have their own
send_message for client/peer messaging).
BaseTask.handoff_to was just deactivate_self + activate_task. Remove
it and add a deactivate_self flag on activate_task instead, so there's
one entry point for activating another task.
LLMTask now overrides activate_task (mirroring its end() override) to
keep the messages / result_callback hooks that finish an in-progress
tool call before the target is activated. All multi-task examples and
unit tests switch to the new call.
PipelineRunner now picks up an async setup_pipeline_runner(runner) hook
from the same PIPECAT_SETUP_FILES env var that PipelineTask already uses
for setup_pipeline_task. Previously the runner used a separate
PIPECAT_RUNNER_SETUP_FILES variable and a setup_runner function — both
are removed.
A new _setup_files module hosts the loader for both hooks and caches
each setup file's module so a single file defining both hooks (e.g. a
debugger that registers a runner-level task in one hook and a per-task
observer in the other) sees its module-level state preserved across
invocations.
Rename ``_build_greeter`` / ``_build_support`` to ``build_greeter`` /
``build_support`` to match the convention used by other multi-task
examples (e.g. ``build_sensor_controller``). They're public factories
the example exposes; the leading underscore was misleading.
``BaseObject.create_task`` already auto-names the task based on the
coroutine; the explicit ``f"{self}::..."`` strings duplicated that
default and made the call sites noisier. Remove them.
A voice agent talking to a worker that owns a simulated temperature
sensor. Demonstrates two ``PipelineTask`` instances side by side
communicating purely via ``BusJobRequestMessage`` /
``BusJobResponseMessage`` — the worker is a plain ``PipelineTask``
(no ``LLMTask`` subclassing, not bridged) whose pipeline runs both an
autonomous sensor tick loop and its own tool-calling LLM:
SensorReader -> SensorStats -> user_agg -> llm -> assistant_agg
The voice agent's LLM has a single tool, ``ask_controller(question)``,
that forwards the user's request verbatim to the worker and speaks
back the controller's reply. The worker LLM has direct tools to read
the current temperature, inspect rolling stats, set the target, or
change the response rate; the sensor simulation drifts toward the
target with a first-order lag plus Gaussian noise.
Job responses are paired with completed LLM turns via the assistant
aggregator's ``on_assistant_turn_stopped`` event, skipping empty
turn-stopped events that fire between a tool call and its result.
Switch every example to ``await runner.spawn(task)`` followed by
``await runner.run()`` (no task argument), and ``await runner.cancel()``
on client-disconnected instead of ``await task.cancel()``. This makes
the main pipeline task look the same as the worker / proxy tasks
spawned alongside it, and lets ``runner.cancel()`` drive a uniform
shutdown across every root task on the bus.
Spell out that spawned tasks finishing on their own does not unblock
``runner.run()`` when called without a ``task`` argument. The form is
for hosts (e.g. FastAPI servers) that have no single "main" pipeline
and want to stay up across many spawned sessions; callers who want
the runner to finish when a specific pipeline finishes should pass
that pipeline as ``task``.
``on_activated`` on this task opens the upstream WebSocket connection,
which is almost always something the caller wants to trigger
explicitly (e.g. on local-client-connected). With the BaseTask default
of ``active=True`` the connection was opened twice: once when the task
auto-activated at start, and once again when the caller's
``activate_task("proxy")`` re-fired ``on_activated``. The result on
the remote side was two ``PipelineRunner`` instances per session
instead of one.
Default to ``active=False`` so the activation is a deliberate signal;
pass ``active=True`` explicitly to restore the eager-connect behavior.
``test_pgmq_bus_lazy_import`` and ``test_redis_bus_lazy_import``
import ``pipecat.bus.network.pgmq`` / ``redis`` directly, which raises
when the optional ``pgmq`` / ``redis`` packages are missing. Gate each
test with ``@unittest.skipUnless`` on a top-level probe of the
underlying package so they're skipped (not errored) in environments
without the extras. ``test_unknown_attribute_raises`` is unaffected.
The PGMQ and Redis bus modules raise an ``Exception`` at import time
when the optional ``pgmq`` / ``redis`` packages are missing, which broke
``pytest`` collection in environments without those extras (e.g. CI
that uses ``--no-extra gstreamer --no-extra local``). Wrap the imports
in ``try/except`` and ``raise unittest.SkipTest`` so the whole test
module is skipped cleanly instead of failing collection.
``_wait_tasks_ready`` is only called from
``create_job_group_and_request_job``, so it belongs with the other
job-group internals (``_create_job_group``, ``_send_job_request``,
``_task_timeout``, ...) rather than next to the task-readiness
helpers (``_register_ready``, ``_on_watched_task_ready``).
``BaseTask._handle_task_end`` and ``_handle_task_cancel`` now call
``stop()`` after propagating to children, so bus-only subclasses
(``WebSocketProxyClientTask``, ``WebSocketProxyServerTask``, custom
worker tasks like ``CodeWorker``) don't need to override these
handlers just to set ``_finished_event``.
Children-propagation is extracted into ``_propagate_end_to_children``
and ``_propagate_cancel_to_children`` so ``PipelineTask`` can call
them directly without invoking ``stop()`` prematurely — the pipeline
still drives its own shutdown through the ``EndFrame`` / ``CancelFrame``
path, which triggers ``on_pipeline_finished`` and ``stop()`` after the
pipeline drains.
Drop the now-redundant overrides from the WebSocket proxy tasks.
PipelineTask had its own ``_pipeline_finished_event`` that signalled
"pipeline run has truly finished" — the same role ``BaseTask._finished_event``
plays for bus-only tasks. They were two events with the same intent.
Set ``_finished_event`` directly when the pipeline-end frame propagates
through the sink, drop the now-redundant field, and drop the
``clear()`` after wait so the event stays set for the lifetime of the
task. As a side-effect, ``await pipeline_task.wait()`` from outside now
resolves at the moment the pipeline finishes, matching the semantics
of bus-only tasks.
claude_agent_sdk's _AsyncioTaskHandle.wait() uses
`with suppress(asyncio.CancelledError)` to silence the inner read
task's expected cancellation, but it also swallows the outer task's
cancellation if it lands on the same await — causing cancel_task to
time out.
Bypass `async with ClaudeSDKClient` and drive connect/disconnect
ourselves so disconnect() runs in a finally where the outer
CancelledError has already been raised and suspended by Python's
exception machinery, out of reach of the SDK's suppress.
self.queue_frame would defer the LLMMessagesAppendFrame because
_finish_function_call always runs inside a tool call. The subsequent
_flush_pipeline() then returned before the goodbye/handoff LLM output
was actually delivered. Use super().queue_frame to push the frame
straight into the pipeline, matching the pattern used in
_flush_pipeline().
Brings over 215 tests across 15 files covering the new
multi-task framework: BaseTask / PipelineTask bus lifecycle,
job RPC and job groups, the bus message hierarchy and serializers,
TaskBus + AsyncQueueBus + RedisBus + PgmqBus (with direct and
isolated backends), TaskRegistry, the BusBridgeProcessor, the
WebSocket proxy tasks, the LLMTask deferral logic, and the
PipelineRunner spawn-and-attach flow.
Move the wire-side of PGMQ operations into a new
``pipecat.bus.network.pgmq_backends`` module with a ``PgmqBackend``
Protocol, a ``DirectPgmqBackend`` (peers discovered by queue prefix),
and an ``IsolatedPgmqBackend`` (SECURITY DEFINER ``public.bus_*``
wrappers over an asyncpg pool). ``PgmqBus`` now delegates join,
publish, read, archive, and leave to the configured backend.
Construct ``PgmqBus`` with either ``pgmq=PGMQueue`` (uses
``DirectPgmqBackend``) or ``backend=PgmqBackend`` (any backend); the
two are mutually exclusive.
Adapts the pipecat-subagents `examples/README.md` to the new
layout (`multi-task/` umbrella, `local-handoff/`, `distributed-handoff/`,
`remote-proxy-assistant/`, `parallel-debate/`, `code-assistant/`),
updates the agent→task / job-RPC vocabulary, drops the
single-agent and llm-and-flows examples (gone in the port), and
adds a new section for the PGMQ handoff transport.
`pipecat.bus.network.pgmq` and `pipecat.bus.network.redis` need
optional dependencies. Adding `pgmq` and `redis` extras so users
can `pip install pipecat-ai[pgmq]` / `pip install pipecat-ai[redis]`
to opt in.
Demonstrates the WebSocket proxy tasks: a local `main.py` voice
bot uses `WebSocketProxyClientTask` to forward bus messages
(including `BusFrameMessage`s) to a remote `assistant.py`
FastAPI server. Each incoming connection spawns a
`WebSocketProxyServerTask` plus an `LLMTask` assistant on a
per-session `PipelineRunner`.
Two transports of the same shape: a main task that hosts the
voice pipeline plus a network-backed `TaskBus` (`RedisBus` or
`PgmqBus`), and a standalone `llm.py` worker process for the
greeter / support LLM. Workers connect to the same bus channel,
register on the shared `TaskRegistry`, and the main task waits
on `runner.registry.watch("greeter", ...)` before sending the
welcome activation so it doesn't fire before the worker is up.
A voice moderator that fans out a debate topic to three worker
tasks (advocate, critic, analyst) via `task.job_group(...)`,
then synthesizes their replies. Workers are `LLMContextTask`s
that keep their own conversation context across rounds and use
the assistant-aggregator's `on_assistant_turn_stopped` event
to ship the completed turn back as a job response.
Variant of the local handoff example with per-task TTS voices.
Each child task wraps the LLM with its own `CartesiaTTSService`
in a custom pipeline override, so the main task has no TTS and
audio comes from whichever child is active over the bus.
Voice code assistant that dispatches questions to a Claude Agent
SDK worker. The main task runs the voice pipeline (STT + LLM + TTS)
and an `ask_code` direct function. `CodeWorker` is a bus-only
`BaseTask` spawned on the runner: it accepts `@job`-style
requests through the bus, queues them onto an asyncio queue, and
runs them sequentially through a persistent Claude SDK session so
follow-ups share context. The example shows the job-RPC surface
(`task.job("code_worker", ...)`), bus-only tasks (no pipeline),
and the `pipeline_task` field on `FunctionCallParams`.
Two LLM tasks (greeter and support) handing off to each other over
the local `AsyncQueueBus`. The main task owns the transport
pipeline (STT, TTS, transport I/O) and the child tasks each run
their own LLM behind a `BusBridgeProcessor`. Each child uses
`bridged=()` so `PipelineTask` auto-wraps its pipeline with
the bus edge processors, and `transfer_to_agent` / `end_conversation`
tools demonstrate `handoff_to(...)` and `end(...)`.
- `TaskBus._router_task`: cast the narrowed `SystemFrame` back
to `BusMessage` for the subscriber callback.
- `bus.network.__init__`: expose `PgmqBus` / `RedisBus` to
the type-checker via a TYPE_CHECKING block so `__all__` is
satisfied; runtime path still goes through `__getattr__`.
- `RedisBus`: subscribe through a local before assigning
`self._pubsub`, and `assert self._pubsub is not None` in
the reader loop.
- `BaseTask.on_job_error` accepts
`BusJobResponseMessage | BusJobResponseUrgentMessage` to match
what is dispatched.
- `JobGroupContext.__aexit__` / `JobContext.__aexit__`: assert
`self._group is not None` before `wait()`.
- `@task_ready` collector: type handlers dict as `dict[str, Callable]`
so the `.__name__` read on a duplicate handler typechecks.
- WebSocket proxy client/server: assert the socket is set in
`_receive_loop`, and decode `str` payloads to bytes before
handing them to the serializer.
`WebSocketProxyServerAgent` / `WebSocketProxyClientAgent` are
renamed to `WebSocketProxyServerTask` / `WebSocketProxyClientTask`
and updated for the post-refactor surface:
- Drop `bus=` from the constructor; the bus arrives via
`BaseTask.attach` from the runner.
- Constructor params `agent_name` / `remote_agent_name` /
`local_agent_name` → `task_name` / `remote_task_name` /
`local_task_name` (matching `BusBridgeProcessor`).
- Move setup logic from the now-removed `on_ready` hook into
`start()`; replace `_stop()` overrides with `stop()`.
- Add `_handle_task_end` / `_handle_task_cancel` overrides that
set `_finished_event` so `PipelineRunner._cancel_spawned_tasks`
can drive these bus-only tasks to a clean exit.
- Update the registry-message field reference
(`agents=`/`message.agents` → `tasks=`/`message.tasks`)
and `TaskReadyData.task_name` access.
- Tighten the server's `_send_ws` exception handling to only
catch `WebSocketDisconnect`.
- Update install hints (`pipecat-ai[websockets-base]` for the
client, `starlette` for the server) and refresh docstrings/
examples to use `runner.spawn(...)`.
Cleans up leftover "agent" terminology in module/class/method
docstrings across `pipecat.bus`, `pipecat.registry`,
`pipecat.pipeline`, and `pipecat.tasks.llm`, and renames
job-RPC phrasing ("task request", "task identifier",
"task group execution") to use "job" consistently.
API-visible changes:
- `BusBridgeProcessor(agent_name=, target_agent=)` → `task_name=` /
`target_task=`.
- `@task_ready` decorator's internal marker
`fn.agent_ready_name` → `fn.task_ready_name`.
- `@tool` decorator's internal marker
`fn.is_agent_tool` → `fn.is_llm_tool`.
- `PIPECAT_SUBAGENTS_SETUP_FILES` env var →
`PIPECAT_RUNNER_SETUP_FILES`.
- pgmq/redis bus install hints point at `pipecat-ai\[extra\]`
rather than the old `pipecat-ai-subagents\[extra\]` package.