diff --git a/changelog/xxxx.added.md b/changelog/xxxx.added.md new file mode 100644 index 000000000..f0dd7fe15 --- /dev/null +++ b/changelog/xxxx.added.md @@ -0,0 +1,3 @@ +- Added `pipecat.workers.ui.UIWorker`, an `LLMContextWorker` that observes and drives a client GUI over the RTVI UI channel: it stores live accessibility snapshots, auto-injects `` at the start of each `respond` job, dispatches client events to `@on_ui_event` handlers, and sends UI commands (`scroll_to`, `highlight`, `select_text`, `click`, `set_input_value`) back to the client. The optional `ReplyToolMixin` exposes a bundled `reply` tool, and `user_job_group(...)` surfaces fan-out work to the client as cancellable task cards. A native RTVI⇄bus UI bridge is built into `PipelineWorker` (active whenever RTVI is enabled), so no decorator or manual wiring is needed: inbound UI messages are broadcast on the bus as `BusUIEventMessage`, and outbound `BusUICommandMessage` / `BusUITask*` carriers are translated into RTVI frames for the client. + +- `UIWorker` auto-injects the UI wire-format guide (`UI_STATE_PROMPT_GUIDE`) into its LLM's system instruction by default, via a `prompt_guide` parameter — pass your own string to override the guide, or `None` to disable. Apps no longer need to concatenate `UI_STATE_PROMPT_GUIDE` into the LLM's `system_instruction` by hand. diff --git a/src/pipecat/workers/ui/__init__.py b/src/pipecat/workers/ui/__init__.py new file mode 100644 index 000000000..03ae5e6c6 --- /dev/null +++ b/src/pipecat/workers/ui/__init__.py @@ -0,0 +1,49 @@ +# +# Copyright (c) 2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""UI worker: LLM worker that observes and drives a GUI app. + +Composes the RTVI wire protocol for client UI events, accessibility-tree +snapshots, and server-emitted UI commands (in +``pipecat.processors.frameworks.rtvi.models``) with an opt-in +``ReplyToolMixin`` exposing the canonical bundled reply tool +(``answer`` + optional ``scroll_to`` / ``highlight`` / ...). + +The RTVI⇄bus bridge that connects a ``UIWorker`` to the client is built +into ``PipelineWorker`` and is active whenever RTVI is enabled — there +is no decorator or bridge to wire up. +""" + +from pipecat.bus.ui_messages import ( + BusUICommandMessage, + BusUIEventMessage, + BusUITaskCompletedMessage, + BusUITaskGroupCompletedMessage, + BusUITaskGroupStartedMessage, + BusUITaskUpdateMessage, +) +from pipecat.workers.ui.ui_event_decorator import on_ui_event +from pipecat.workers.ui.ui_prompts import UI_STATE_PROMPT_GUIDE +from pipecat.workers.ui.ui_tools import ReplyToolMixin +from pipecat.workers.ui.ui_worker import UIWorker + +# Built-in UI command payload models (Toast, Navigate, ScrollTo, +# Highlight, Focus, Click, SetInputValue, SelectText) live in +# ``pipecat.processors.frameworks.rtvi.models``. Import them from there +# directly. + +__all__ = [ + "BusUICommandMessage", + "BusUIEventMessage", + "BusUITaskCompletedMessage", + "BusUITaskGroupCompletedMessage", + "BusUITaskGroupStartedMessage", + "BusUITaskUpdateMessage", + "ReplyToolMixin", + "UIWorker", + "UI_STATE_PROMPT_GUIDE", + "on_ui_event", +] diff --git a/src/pipecat/workers/ui/ui_event_decorator.py b/src/pipecat/workers/ui/ui_event_decorator.py new file mode 100644 index 000000000..a4d5c0b22 --- /dev/null +++ b/src/pipecat/workers/ui/ui_event_decorator.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Decorator for marking worker methods as UI event handlers.""" + + +def on_ui_event(name: str): + """Mark a worker method as a handler for a named UI event. + + On ``UIWorker`` subclasses, decorated methods are automatically + dispatched when a ``BusUIEventMessage`` with a matching ``name`` + arrives. + + Example:: + + class MyUIWorker(UIWorker): + @on_ui_event("nav_click") + async def on_nav(self, message): + view = message.payload.get("view") + ... + + Args: + name: The UI event name to match. + """ + + def decorator(fn): + fn.is_ui_event_handler = True + fn.ui_event_name = name + return fn + + return decorator + + +def _collect_ui_event_handlers(obj) -> dict: + """Collect all ``@on_ui_event`` decorated bound methods from an object. + + Walks the MRO so that overridden methods in subclasses take + precedence over base-class definitions. + + Returns: + A dict mapping event name to the bound method. + + Raises: + ValueError: If two handlers share the same event name on the + same subclass level. + """ + seen: set[str] = set() + handlers: dict[str, object] = {} + source_names: dict[str, str] = {} # event name -> defining method name, for errors + for cls in type(obj).__mro__: + for attr_name, val in cls.__dict__.items(): + if attr_name in seen: + continue + seen.add(attr_name) + if callable(val) and getattr(val, "is_ui_event_handler", False): + event_name = val.ui_event_name + if event_name in handlers: + raise ValueError( + f"Duplicate @on_ui_event handler for '{event_name}': " + f"'{attr_name}' conflicts with '{source_names[event_name]}'" + ) + handlers[event_name] = getattr(obj, attr_name) + source_names[event_name] = attr_name + return handlers diff --git a/src/pipecat/workers/ui/ui_job_context.py b/src/pipecat/workers/ui/ui_job_context.py new file mode 100644 index 000000000..4b72df2ec --- /dev/null +++ b/src/pipecat/workers/ui/ui_job_context.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User-facing job group context. + +Wraps ``JobGroupContext`` so the work it dispatches is also surfaced +to the UI client through the UI Worker protocol. Apps reach this via +``UIWorker.user_job_group(...)`` rather than constructing it directly. +""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +from pipecat.bus.ui_messages import ( + BusUITaskGroupCompletedMessage, + BusUITaskGroupStartedMessage, +) +from pipecat.pipeline.job_context import JobGroupContext + +if TYPE_CHECKING: + from pipecat.workers.ui.ui_worker import UIWorker + + +class UserJobGroupContext(JobGroupContext): + """Job group whose lifecycle is forwarded to the UI client. + + Behaves exactly like ``JobGroupContext`` for the dispatching code. + Additionally, on enter the context registers the group with its + parent ``UIWorker`` and publishes a ``BusUITaskGroupStartedMessage``. + The worker forwards any subsequent ``BusJobUpdateMessage`` / + ``BusJobResponseMessage`` whose ``job_id`` matches a registered + group as ``BusUITaskUpdateMessage`` / ``BusUITaskCompletedMessage``. + On exit the context publishes ``BusUITaskGroupCompletedMessage`` and + deregisters. + + Workers don't need to know about the UI surface: any + ``send_job_update`` they emit against the group's ``job_id`` is + forwarded automatically. + + Example:: + + async with self.user_job_group( + "researcher_a", "researcher_b", + payload={"query": query}, + label=f"Research: {query}", + cancellable=True, + ) as tg: + async for event in tg: + ... + results = tg.responses + """ + + def __init__( + self, + worker: UIWorker, + worker_names: tuple[str, ...], + *, + name: str | None = None, + payload: dict | None = None, + timeout: float | None = None, + cancel_on_error: bool = True, + label: str | None = None, + cancellable: bool = True, + ): + """Initialize the UserJobGroupContext. + + Args: + worker: The parent ``UIWorker`` that owns this job group. + worker_names: Names of the workers to send the job to. + name: Optional job name for routing to named ``@job`` + handlers on the workers. + payload: Optional structured data describing the work. + timeout: Optional timeout in seconds covering both the + ready-wait and job execution. + cancel_on_error: Whether to cancel the group if a worker + errors. Defaults to True. + label: Optional human-readable label surfaced to the + client (e.g. ``"Research: Radiohead"``). The client UI + uses it to title the in-flight task card. + cancellable: Whether the client may request cancellation + of this group via the reserved ``__cancel_task`` event. + Defaults to True. + """ + super().__init__( + worker, + worker_names, + name=name, + payload=payload, + timeout=timeout, + cancel_on_error=cancel_on_error, + ) + self._ui_worker = worker + self._label = label + self._cancellable = cancellable + + @property + def label(self) -> str | None: + """The group's human-readable label, if any.""" + return self._label + + @property + def cancellable(self) -> bool: + """Whether the client may request cancellation.""" + return self._cancellable + + async def __aenter__(self) -> UserJobGroupContext: + await super().__aenter__() + job_id = self.job_id + self._ui_worker._register_user_job_group( + job_id=job_id, + worker_names=list(self._worker_names), + label=self._label, + cancellable=self._cancellable, + ) + await self._ui_worker.send_bus_message( + BusUITaskGroupStartedMessage( + source=self._ui_worker.name, + target=None, + task_id=job_id, + agents=list(self._worker_names), + label=self._label, + cancellable=self._cancellable, + at=int(time.time() * 1000), + ) + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool: + job_id = self._group.job_id if self._group else None + try: + return await super().__aexit__(exc_type, exc_val, exc_tb) + finally: + if job_id: + self._ui_worker._unregister_user_job_group(job_id) + await self._ui_worker.send_bus_message( + BusUITaskGroupCompletedMessage( + source=self._ui_worker.name, + target=None, + task_id=job_id, + at=int(time.time() * 1000), + ) + ) diff --git a/src/pipecat/workers/ui/ui_prompts.py b/src/pipecat/workers/ui/ui_prompts.py new file mode 100644 index 000000000..961c3ed05 --- /dev/null +++ b/src/pipecat/workers/ui/ui_prompts.py @@ -0,0 +1,64 @@ +# +# Copyright (c) 2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Canonical prompt fragments describing the UI worker wire format. + +Apps concatenate these constants into their system prompt so the +LLM understands the ```` and ```` developer +messages the SDK injects on its behalf. + +Example:: + + system_prompt = f''' + You are a voice-driven music player agent. + ...app-specific tool and behavior instructions... + + {UI_STATE_PROMPT_GUIDE} + ''' + +The SDK updates the guide alongside the wire format, so apps get +new tags and semantics automatically on the next release. +""" + +UI_STATE_PROMPT_GUIDE: str = """\ +## UI context + +Your developer context includes two kinds of SDK-managed messages: + +- ``payload``: an event the user just \ +triggered on the client (click, tab switch, navigation, etc.). The \ +payload is JSON for that event. +- ``...``: an accessibility snapshot of the \ +current screen, injected at the start of every task request. \ +Indented tree in Playwright-MCP style. Each line is \ +``- role "name" [state] [ref=eN]`` with children nested one level \ +deeper. + +State tags include ``[focused]``, ``[selected]``, ``[disabled]``, and \ +``[offscreen]``. A node tagged ``[offscreen]`` exists on the page \ +but is not currently in the user's viewport; only visible \ +(non-offscreen) nodes count for position-based references. + +Grids carry a ``[cols=N]`` tag. Their cells are listed in reading \ +order (left-to-right, top-to-bottom); with N columns, cell K sits \ +at row ``ceil(K/N)``, column ``((K-1) mod N) + 1``. Example with \ +``[cols=8]`` and 16 children: "top right" is cell 8, "bottom left" \ +is cell 9. + +Resolve position references ("top right", "the first one", "the \ +third new release") against the most recent ```` tree. \ +Sibling order matches reading order on screen (top-to-bottom, \ +left-to-right within each region). + +When the user has text selected on the page, the snapshot ends with \ +a ``selected text`` block inside \ +````. Treat the selection as the deictic referent for \ +"this", "that", "what I selected", and similar phrases. The ``ref`` \ +identifies the closest enclosing element that has a ref in the tree; \ +the inner text is the actual selected content (truncated if very \ +long). Text inside ```` or ``