Commit Graph

9601 Commits

Author SHA1 Message Date
Mark Backman
9054912dfb Update to add_workers 2026-05-21 23:20:40 -04:00
Mark Backman
0b9500aae4 Match shopping-list client styling to the other UI demos
Restyle from a bespoke dark theme to the light theme the other UI demos
share: the canonical :root tokens (--border, --muted, --highlight), the
#fafafa/#18181b body, the sticky white header with the light/red Connect
button, the fixed bottom-right #status toast, and the amber
ui-highlight-pulse keyframe. index.html drops the custom topbar wrapper for
the standard <header> plus a standalone #status element.
2026-05-21 23:20:40 -04:00
Mark Backman
10b8feb9ea Add shopping-list UIWorker example (bridge-free voice + UI)
Demonstrates the 'every input acts, may speak' pattern without bridging: a
standard voice pipeline (STT → LLM → TTS) whose LLM only converses, plus a
separate UIWorker that does all the list work. The voice pipeline's user
aggregator fires on_user_turn_stopped each turn and dispatches the transcript
to the UIWorker as a respond job (a bus message); the UIWorker reads the
auto-injected <ui_state> snapshot and drives the list silently via add_item /
set_checked / remove_item commands (plus the standard highlight). Items are
checkboxes whose label and checked state the snapshot exposes.

Includes a vanilla-JS client following the existing UI-demo client style.
2026-05-21 23:20:40 -04:00
Mark Backman
1c94feaaff Inject <ui_state> via the LLM's on_before_process_frame hook
Move <ui_state> snapshot injection out of respond_with_llm into a
cross-cutting on_before_process_frame handler on the UIWorker's LLM, so it
appends the current snapshot to the context the request is built from, just
before each inference. Injection is gated to the user-turn-initiating
inference so a tool-calling turn never stacks duplicate <ui_state> blocks;
respond_with_llm no longer injects manually.

Also drop the bridged parameter from UIWorker: there is no viable way to
bridge a UIWorker between workers — a shared, teed context would be polluted
by the injection, and per-worker turn detection off teed frames isn't
supported. Other workers keep their PipelineWorker bridging.
2026-05-21 23:20:40 -04:00
Mark Backman
950fc10f05 Add document-review UIWorker example
Synthesis example: a ReplyToolMixin UIWorker adds a start_review tool that fans
out to clarity/tone peers via start_user_job_group, translates each reviewer
response into an add_note command in on_job_response, handles a client
note_click event via @on_ui_event, and keeps history across turns.
2026-05-21 23:20:40 -04:00
Mark Backman
07725429b2 Add async-tasks UIWorker example
A UIWorker with a custom reply tool fans research out to three BaseWorker peers
via start_user_job_group; their progress streams to the client as ui-task cards
and the user can cancel a group mid-flight.
2026-05-21 23:20:40 -04:00
Mark Backman
6b0e204d66 Add form-fill UIWorker example
A ReplyToolMixin UIWorker that fills inputs (fills) and toggles checkboxes /
presses submit (click) by voice — the state-changing half of the standard
action set.
2026-05-21 23:20:40 -04:00
Mark Backman
f826da9ac9 Add deixis UIWorker example
A ReplyToolMixin UIWorker that grounds in the user's text selection (the
<selection> block in the snapshot) and points back via select_text — both
directions of deictic reference.
2026-05-21 23:20:40 -04:00
Mark Backman
81b956d963 Add pointing UIWorker example
The voice LLM delegates to a ReplyToolMixin UIWorker that scrolls offscreen
items into view and highlights the phones it names — exercising the scroll_to /
highlight UI commands and the [offscreen] state tag.
2026-05-21 23:20:40 -04:00
Mark Backman
2254a8d0a2 Add hello-snapshot UIWorker example
Smallest UIWorker demo: a voice LLM in the main pipeline delegates
screen-relevant utterances to a UIWorker via a respond job; the UIWorker
auto-injects the current <ui_state> and answers grounded in what's on screen.
Includes a vanilla-JS client that streams accessibility snapshots over RTVI.
2026-05-21 23:20:40 -04:00
Mark Backman
f1f5a986e8 Add UIWorker
UIWorker is an LLMContextWorker that observes and drives a client GUI over the
RTVI UI channel: it stores accessibility snapshots, auto-injects <ui_state> at
the start of each respond job, dispatches client events to @on_ui_event
handlers, sends UI commands back to the client, and surfaces fan-out work as
cancellable task cards via user_job_group(). The optional ReplyToolMixin exposes
a bundled reply tool.

The prompt_guide parameter auto-appends the UI wire-format guide to the LLM's
system instruction (default UI_STATE_PROMPT_GUIDE; override with a string or
disable with None), so the LLM can parse the injected <ui_state> / <ui_event>
messages without the app concatenating the guide by hand.
2026-05-21 23:20:40 -04:00
Mark Backman
02667a7255 Add native RTVI⇄bus UI bridge to PipelineWorker
When RTVI is enabled, PipelineWorker now republishes inbound ui-event /
ui-snapshot / ui-cancel-task messages onto the bus as a broadcast
BusUIEventMessage, and translates outbound BusUICommandMessage / BusUITask*
carriers into the matching RTVI frames. This lets a UIWorker on the bus observe
and drive the client UI with no decorator or manual wiring; when no UIWorker is
present the events are simply unconsumed.

The BusUI* carriers live in the bus layer so both pipeline and workers can
reference them without an import cycle.
2026-05-21 23:03:37 -04:00
Mark Backman
ee3d1128ec Add LLMService.append_system_instruction()
Composes durable text onto a user-provided system instruction (alongside the
turn-completion and async-tool-cancellation addons) so it is prepended on every
inference and survives context-message resets. The user's base prompt is now
snapshotted once and the effective instruction is always rebuilt from it,
replacing the prior lazy capture/restore logic with a single invariant.
2026-05-21 23:03:37 -04:00
Aleix Conchillo Flaqué
e8ec7c585f Rename PipelineRunner.add_worker() to variadic add_workers(*workers)
Lets callers register multiple workers in a single call instead of
awaiting add_worker() repeatedly. Updates all examples, docs, tests,
and proxy worker docstrings to use the new API.
2026-05-21 19:46:53 -07:00
Aleix Conchillo Flaqué
f91179a640 Forward active from PipelineWorker through to BaseWorker
PipelineWorker.__init__ was only forwarding `name` to BaseWorker, so
the `active` flag (the other BaseWorker constructor arg) wasn't
reachable from PipelineWorker callers. Add `active: bool = True` to
the signature and pass it through.
2026-05-21 19:07:13 -07:00
Aleix Conchillo Flaqué
e85f3fe606 update uv.lock 2026-05-21 19:07:13 -07:00
Aleix Conchillo Flaqué
d07ba562eb Separate bus messages from pipeline frames
BusMessage was a mixin tacked onto DataFrame / SystemFrame so the bus
could reuse the frame priority machinery. That made every bus message
also a Frame, which is misleading — bus messages travel on the bus, not
through pipelines. If a worker actually needs to ship a frame, it wraps
it in BusFrameMessage.

BusMessage is now a plain dataclass base carrying source/target.
BusDataMessage and BusSystemMessage are empty subclasses that exist
only as priority markers. The bus router and the priority queue check
``isinstance(item, BusSystemMessage)`` directly instead of
``isinstance(item, SystemFrame)``.

The serializer test that round-tripped DataFrame.name (a non-init
field) is rewritten against a local _MessageWithNonInit(BusDataMessage)
subclass so the serializer's init=False path stays covered.
2026-05-21 19:07:13 -07:00
Aleix Conchillo Flaqué
b03247f360 Rename BaseTask → BaseWorker and reserve "task" for asyncio
Replaces every "task" identifier that referred to the BaseTask
abstraction with "worker". Asyncio task plumbing (asyncio.Task,
BaseTaskManager, TaskManager, create_task, cancel_task, etc.) stays
untouched. Highlights:

- Classes: BaseTask → BaseWorker, PipelineTask → PipelineWorker,
  LLMTask → LLMWorker, LLMContextTask → LLMContextWorker, TaskBus →
  WorkerBus, TaskRegistry → WorkerRegistry, TaskActivationArgs →
  WorkerActivationArgs, TaskReadyData → WorkerReadyData,
  TaskRegistryEntry → WorkerRegistryEntry, TaskObserver →
  WorkerObserver, all Bus*TaskMessage → Bus*WorkerMessage,
  BusAddTaskMessage.task field → worker, BusWorkerRegistryMessage.tasks
  field → workers.
- Methods/decorators: activate_task → activate_worker, deactivate_task
  → deactivate_worker, add_task → add_worker, watch_task →
  watch_worker, @task_ready → @worker_ready, setup_pipeline_task hook
  → setup_pipeline_worker.
- Params/fields: FrameProcessorSetup.pipeline_task and
  FunctionCallParams.pipeline_task → pipeline_worker. Parameter names
  like task_name → worker_name; spawn/run accept worker:.
- Files: pipeline/base_task.py → base_worker.py, pipeline/task.py →
  worker.py (plus a re-export shim at pipeline/task.py),
  task_observer.py → worker_observer.py, task_ready_decorator.py →
  worker_ready_decorator.py, pipecat.tasks → pipecat.workers,
  llm_task.py → llm_worker.py, llm_context_task.py →
  llm_context_worker.py, examples/multi-task → examples/multi-worker.

Back-compat:
- PipelineTask kept as a deprecated subclass of PipelineWorker that
  warns on construction.
- pipecat.pipeline.task re-exports PipelineWorker/PipelineTask/etc. so
  existing user imports keep working.
- FrameProcessor.pipeline_task kept as a deprecated property that
  forwards to pipeline_worker.

Local variables in examples that hold a worker (task = PipelineTask(...))
are renamed to worker = PipelineWorker(...). Asyncio-task locals
(runner_task, etc.) are preserved.
2026-05-21 19:07:13 -07:00
Aleix Conchillo Flaqué
b9aed0d673 Rename BaseTask.send_error to send_bus_error_message
Symmetric with send_bus_message; "send_bus_error" on its own reads
ambiguously (sounds like an error about the bus, à la SIGBUS) and the
underlying types are BusTaskErrorMessage / BusTaskLocalErrorMessage,
so keeping "_message" in the name matches what's actually sent.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
d8947c68a9 Rename BaseTask.send_message to send_bus_message
Mirrors on_bus_message and makes it explicit that the call goes out on
the task bus, not on a transport (transports have their own
send_message for client/peer messaging).
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
373894fc65 Fold BaseTask.handoff_to into activate_task(deactivate_self=...)
BaseTask.handoff_to was just deactivate_self + activate_task. Remove
it and add a deactivate_self flag on activate_task instead, so there's
one entry point for activating another task.

LLMTask now overrides activate_task (mirroring its end() override) to
keep the messages / result_callback hooks that finish an in-progress
tool call before the target is activated. All multi-task examples and
unit tests switch to the new call.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
e8bbb5ee09 Add setup_pipeline_runner hook to PIPECAT_SETUP_FILES
PipelineRunner now picks up an async setup_pipeline_runner(runner) hook
from the same PIPECAT_SETUP_FILES env var that PipelineTask already uses
for setup_pipeline_task. Previously the runner used a separate
PIPECAT_RUNNER_SETUP_FILES variable and a setup_runner function — both
are removed.

A new _setup_files module hosts the loader for both hooks and caches
each setup file's module so a single file defining both hooks (e.g. a
debugger that registers a runner-level task in one hook and a per-task
observer in the other) sees its module-level state preserved across
invocations.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
a2e58044f2 update pyproject.toml and uv.lock 2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
8867426a97 Document sensor-controller example in the multi-task README
Add a Local-section entry with the running instructions, example
questions, and architecture diagram for the new sensor-controller
example.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
d984393213 Make local-handoff builder functions public
Rename ``_build_greeter`` / ``_build_support`` to ``build_greeter`` /
``build_support`` to match the convention used by other multi-task
examples (e.g. ``build_sensor_controller``). They're public factories
the example exposes; the leading underscore was misleading.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
959fb831f1 Drop redundant name strings from create_task calls in bus + proxy
``BaseObject.create_task`` already auto-names the task based on the
coroutine; the explicit ``f"{self}::..."`` strings duplicated that
default and made the call sites noisier. Remove them.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
410190dabb Add sensor-controller multi-task example
A voice agent talking to a worker that owns a simulated temperature
sensor. Demonstrates two ``PipelineTask`` instances side by side
communicating purely via ``BusJobRequestMessage`` /
``BusJobResponseMessage`` — the worker is a plain ``PipelineTask``
(no ``LLMTask`` subclassing, not bridged) whose pipeline runs both an
autonomous sensor tick loop and its own tool-calling LLM:

    SensorReader -> SensorStats -> user_agg -> llm -> assistant_agg

The voice agent's LLM has a single tool, ``ask_controller(question)``,
that forwards the user's request verbatim to the worker and speaks
back the controller's reply. The worker LLM has direct tools to read
the current temperature, inspect rolling stats, set the target, or
change the response rate; the sensor simulation drifts toward the
target with a first-order lag plus Gaussian noise.

Job responses are paired with completed LLM turns via the assistant
aggregator's ``on_assistant_turn_stopped`` event, skipping empty
turn-stopped events that fire between a tool call and its result.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
f22350ce2f Use symmetric spawn-then-run() pattern in multi-task examples
Switch every example to ``await runner.spawn(task)`` followed by
``await runner.run()`` (no task argument), and ``await runner.cancel()``
on client-disconnected instead of ``await task.cancel()``. This makes
the main pipeline task look the same as the worker / proxy tasks
spawned alongside it, and lets ``runner.cancel()`` drive a uniform
shutdown across every root task on the bus.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
5f1b91bb89 Clarify PipelineRunner.run() docstring for the no-task form
Spell out that spawned tasks finishing on their own does not unblock
``runner.run()`` when called without a ``task`` argument. The form is
for hosts (e.g. FastAPI servers) that have no single "main" pipeline
and want to stay up across many spawned sessions; callers who want
the runner to finish when a specific pipeline finishes should pass
that pipeline as ``task``.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
cd22742e10 Default WebSocketProxyClientTask to active=False
``on_activated`` on this task opens the upstream WebSocket connection,
which is almost always something the caller wants to trigger
explicitly (e.g. on local-client-connected). With the BaseTask default
of ``active=True`` the connection was opened twice: once when the task
auto-activated at start, and once again when the caller's
``activate_task("proxy")`` re-fired ``on_activated``. The result on
the remote side was two ``PipelineRunner`` instances per session
instead of one.

Default to ``active=False`` so the activation is a deliberate signal;
pass ``active=True`` explicitly to restore the eager-connect behavior.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
9ecb00d097 Skip pgmq/redis lazy-import tests when their extras are not installed
``test_pgmq_bus_lazy_import`` and ``test_redis_bus_lazy_import``
import ``pipecat.bus.network.pgmq`` / ``redis`` directly, which raises
when the optional ``pgmq`` / ``redis`` packages are missing. Gate each
test with ``@unittest.skipUnless`` on a top-level probe of the
underlying package so they're skipped (not errored) in environments
without the extras. ``test_unknown_attribute_raises`` is unaffected.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
79ae9740cc Skip pgmq/redis bus tests when their extras are not installed
The PGMQ and Redis bus modules raise an ``Exception`` at import time
when the optional ``pgmq`` / ``redis`` packages are missing, which broke
``pytest`` collection in environments without those extras (e.g. CI
that uses ``--no-extra gstreamer --no-extra local``). Wrap the imports
in ``try/except`` and ``raise unittest.SkipTest`` so the whole test
module is skipped cleanly instead of failing collection.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
df704a34f1 Move _wait_tasks_ready into the job-group internals section in BaseTask
``_wait_tasks_ready`` is only called from
``create_job_group_and_request_job``, so it belongs with the other
job-group internals (``_create_job_group``, ``_send_job_request``,
``_task_timeout``, ...) rather than next to the task-readiness
helpers (``_register_ready``, ``_on_watched_task_ready``).
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
7dc2b41412 Drive task end/cancel shutdown from BaseTask by default
``BaseTask._handle_task_end`` and ``_handle_task_cancel`` now call
``stop()`` after propagating to children, so bus-only subclasses
(``WebSocketProxyClientTask``, ``WebSocketProxyServerTask``, custom
worker tasks like ``CodeWorker``) don't need to override these
handlers just to set ``_finished_event``.

Children-propagation is extracted into ``_propagate_end_to_children``
and ``_propagate_cancel_to_children`` so ``PipelineTask`` can call
them directly without invoking ``stop()`` prematurely — the pipeline
still drives its own shutdown through the ``EndFrame`` / ``CancelFrame``
path, which triggers ``on_pipeline_finished`` and ``stop()`` after the
pipeline drains.

Drop the now-redundant overrides from the WebSocket proxy tasks.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
4d9e258e55 Collapse _pipeline_finished_event into BaseTask._finished_event
PipelineTask had its own ``_pipeline_finished_event`` that signalled
"pipeline run has truly finished" — the same role ``BaseTask._finished_event``
plays for bus-only tasks. They were two events with the same intent.

Set ``_finished_event`` directly when the pipeline-end frame propagates
through the sink, drop the now-redundant field, and drop the
``clear()`` after wait so the event stays set for the lifetime of the
task. As a side-effect, ``await pipeline_task.wait()`` from outside now
resolves at the moment the pipeline finishes, matching the semantics
of bus-only tasks.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
de1bd7cb7e code-assistant: work around CancelledError swallow in ClaudeSDKClient
claude_agent_sdk's _AsyncioTaskHandle.wait() uses
`with suppress(asyncio.CancelledError)` to silence the inner read
task's expected cancellation, but it also swallows the outer task's
cancellation if it lands on the same await — causing cancel_task to
time out.

Bypass `async with ClaudeSDKClient` and drive connect/disconnect
ourselves so disconnect() runs in a finally where the outer
CancelledError has already been raised and suspended by Python's
exception machinery, out of reach of the SDK's suppress.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
a5bb9f65de Fix LLMTask._finish_function_call to bypass deferral
self.queue_frame would defer the LLMMessagesAppendFrame because
_finish_function_call always runs inside a tool call. The subsequent
_flush_pipeline() then returned before the goodbye/handoff LLM output
was actually delivered. Use super().queue_frame to push the frame
straight into the pipeline, matching the pattern used in
_flush_pipeline().
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
402cf8dade Port multi-task unit tests from pipecat-subagents
Brings over 215 tests across 15 files covering the new
multi-task framework: BaseTask / PipelineTask bus lifecycle,
job RPC and job groups, the bus message hierarchy and serializers,
TaskBus + AsyncQueueBus + RedisBus + PgmqBus (with direct and
isolated backends), TaskRegistry, the BusBridgeProcessor, the
WebSocket proxy tasks, the LLMTask deferral logic, and the
PipelineRunner spawn-and-attach flow.
2026-05-21 10:13:21 -07:00
Jon Taylor
d757d8d06d Split PgmqBus into orchestrator + pluggable backends
Move the wire-side of PGMQ operations into a new
``pipecat.bus.network.pgmq_backends`` module with a ``PgmqBackend``
Protocol, a ``DirectPgmqBackend`` (peers discovered by queue prefix),
and an ``IsolatedPgmqBackend`` (SECURITY DEFINER ``public.bus_*``
wrappers over an asyncpg pool). ``PgmqBus`` now delegates join,
publish, read, archive, and leave to the configured backend.

Construct ``PgmqBus`` with either ``pgmq=PGMQueue`` (uses
``DirectPgmqBackend``) or ``backend=PgmqBackend`` (any backend); the
two are mutually exclusive.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
a63abc41b6 Add README and env.example for multi-task examples
Adapts the pipecat-subagents `examples/README.md` to the new
layout (`multi-task/` umbrella, `local-handoff/`, `distributed-handoff/`,
`remote-proxy-assistant/`, `parallel-debate/`, `code-assistant/`),
updates the agent→task / job-RPC vocabulary, drops the
single-agent and llm-and-flows examples (gone in the port), and
adds a new section for the PGMQ handoff transport.
2026-05-21 10:13:21 -07:00
Aleix Conchillo Flaqué
4c5fb85856 Add pgmq and redis extras for the distributed bus implementations
`pipecat.bus.network.pgmq` and `pipecat.bus.network.redis` need
optional dependencies. Adding `pgmq` and `redis` extras so users
can `pip install pipecat-ai[pgmq]` / `pip install pipecat-ai[redis]`
to opt in.
2026-05-21 10:13:18 -07:00
Aleix Conchillo Flaqué
4fbeb5fbcb Add remote-proxy-assistant example
Demonstrates the WebSocket proxy tasks: a local `main.py` voice
bot uses `WebSocketProxyClientTask` to forward bus messages
(including `BusFrameMessage`s) to a remote `assistant.py`
FastAPI server. Each incoming connection spawns a
`WebSocketProxyServerTask` plus an `LLMTask` assistant on a
per-session `PipelineRunner`.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
4509caa724 Add distributed-handoff examples (redis and pgmq)
Two transports of the same shape: a main task that hosts the
voice pipeline plus a network-backed `TaskBus` (`RedisBus` or
`PgmqBus`), and a standalone `llm.py` worker process for the
greeter / support LLM. Workers connect to the same bus channel,
register on the shared `TaskRegistry`, and the main task waits
on `runner.registry.watch("greeter", ...)` before sending the
welcome activation so it doesn't fire before the worker is up.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
0f7211d072 Add parallel-debate example
A voice moderator that fans out a debate topic to three worker
tasks (advocate, critic, analyst) via `task.job_group(...)`,
then synthesizes their replies. Workers are `LLMContextTask`s
that keep their own conversation context across rounds and use
the assistant-aggregator's `on_assistant_turn_stopped` event
to ship the completed turn back as a job response.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
7c4294b7f6 Add local-handoff-two-agents-tts example
Variant of the local handoff example with per-task TTS voices.
Each child task wraps the LLM with its own `CartesiaTTSService`
in a custom pipeline override, so the main task has no TTS and
audio comes from whichever child is active over the bus.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
6964686808 Add code-assistant example
Voice code assistant that dispatches questions to a Claude Agent
SDK worker. The main task runs the voice pipeline (STT + LLM + TTS)
and an `ask_code` direct function. `CodeWorker` is a bus-only
`BaseTask` spawned on the runner: it accepts `@job`-style
requests through the bus, queues them onto an asyncio queue, and
runs them sequentially through a persistent Claude SDK session so
follow-ups share context. The example shows the job-RPC surface
(`task.job("code_worker", ...)`), bus-only tasks (no pipeline),
and the `pipeline_task` field on `FunctionCallParams`.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
f364c088cf Add local-handoff-two-agents example
Two LLM tasks (greeter and support) handing off to each other over
the local `AsyncQueueBus`. The main task owns the transport
pipeline (STT, TTS, transport I/O) and the child tasks each run
their own LLM behind a `BusBridgeProcessor`. Each child uses
`bridged=()` so `PipelineTask` auto-wraps its pipeline with
the bus edge processors, and `transfer_to_agent` / `end_conversation`
tools demonstrate `handoff_to(...)` and `end(...)`.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
42204c4d0f Fix pyright errors in new bus/task/proxy code
- `TaskBus._router_task`: cast the narrowed `SystemFrame` back
  to `BusMessage` for the subscriber callback.
- `bus.network.__init__`: expose `PgmqBus` / `RedisBus` to
  the type-checker via a TYPE_CHECKING block so `__all__` is
  satisfied; runtime path still goes through `__getattr__`.
- `RedisBus`: subscribe through a local before assigning
  `self._pubsub`, and `assert self._pubsub is not None` in
  the reader loop.
- `BaseTask.on_job_error` accepts
  `BusJobResponseMessage | BusJobResponseUrgentMessage` to match
  what is dispatched.
- `JobGroupContext.__aexit__` / `JobContext.__aexit__`: assert
  `self._group is not None` before `wait()`.
- `@task_ready` collector: type handlers dict as `dict[str, Callable]`
  so the `.__name__` read on a duplicate handler typechecks.
- WebSocket proxy client/server: assert the socket is set in
  `_receive_loop`, and decode `str` payloads to bytes before
  handing them to the serializer.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
5f86e39038 Port WebSocket proxies to the new BaseTask API
`WebSocketProxyServerAgent` / `WebSocketProxyClientAgent` are
renamed to `WebSocketProxyServerTask` / `WebSocketProxyClientTask`
and updated for the post-refactor surface:

- Drop `bus=` from the constructor; the bus arrives via
  `BaseTask.attach` from the runner.
- Constructor params `agent_name` / `remote_agent_name` /
  `local_agent_name` → `task_name` / `remote_task_name` /
  `local_task_name` (matching `BusBridgeProcessor`).
- Move setup logic from the now-removed `on_ready` hook into
  `start()`; replace `_stop()` overrides with `stop()`.
- Add `_handle_task_end` / `_handle_task_cancel` overrides that
  set `_finished_event` so `PipelineRunner._cancel_spawned_tasks`
  can drive these bus-only tasks to a clean exit.
- Update the registry-message field reference
  (`agents=`/`message.agents` → `tasks=`/`message.tasks`)
  and `TaskReadyData.task_name` access.
- Tighten the server's `_send_ws` exception handling to only
  catch `WebSocketDisconnect`.
- Update install hints (`pipecat-ai[websockets-base]` for the
  client, `starlette` for the server) and refresh docstrings/
  examples to use `runner.spawn(...)`.
2026-05-21 10:12:51 -07:00
Aleix Conchillo Flaqué
86f8f137a8 Sweep agent->task and task->job in docstrings and identifiers
Cleans up leftover "agent" terminology in module/class/method
docstrings across `pipecat.bus`, `pipecat.registry`,
`pipecat.pipeline`, and `pipecat.tasks.llm`, and renames
job-RPC phrasing ("task request", "task identifier",
"task group execution") to use "job" consistently.

API-visible changes:

- `BusBridgeProcessor(agent_name=, target_agent=)` → `task_name=` /
  `target_task=`.
- `@task_ready` decorator's internal marker
  `fn.agent_ready_name` → `fn.task_ready_name`.
- `@tool` decorator's internal marker
  `fn.is_agent_tool` → `fn.is_llm_tool`.
- `PIPECAT_SUBAGENTS_SETUP_FILES` env var →
  `PIPECAT_RUNNER_SETUP_FILES`.
- pgmq/redis bus install hints point at `pipecat-ai\[extra\]`
  rather than the old `pipecat-ai-subagents\[extra\]` package.
2026-05-21 10:12:51 -07:00