Make PipelineTask inherit BaseTask and support bridged pipelines

`PipelineTask` now extends `BaseTask` so every pipeline task is
also a bus participant. Adds optional `bus`, `bridged`, and
`exclude_frames` parameters: when `bridged` is set, the user's
pipeline is wrapped with `_BusEdgeProcessor` source/sink edges so
frames are mirrored onto the bus. Bridges pipeline lifecycle
events to `start()`/`stop()`, overrides `_handle_task_end` /
`_handle_task_cancel` to drive the pipeline shutdown, subscribes
to the bus in setup, and exposes the `bridged` property to the
registry. Moves `PipelineTaskParams` here and updates the
matching test import.
This commit is contained in:
Aleix Conchillo Flaqué
2026-05-13 19:14:56 -07:00
parent 6a738bd3a0
commit b5c757ab85
2 changed files with 90 additions and 7 deletions

View File

@@ -16,12 +16,15 @@ import importlib.util
import os
import warnings
from collections.abc import AsyncIterable, Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any, TypeVar
from loguru import logger
from pydantic import BaseModel, ConfigDict, Field
from pipecat.bus import BusCancelTaskMessage, BusEndTaskMessage, TaskBus
from pipecat.bus.bridge_processor import _BusEdgeProcessor
from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
@@ -46,7 +49,7 @@ from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.base_task import BaseTask
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
@@ -67,6 +70,17 @@ CANCEL_TIMEOUT_SECS = 20.0
T = TypeVar("T")
@dataclass
class PipelineTaskParams:
"""Configuration parameters for pipeline task execution.
Parameters:
loop: The asyncio event loop to use for task execution.
"""
loop: asyncio.AbstractEventLoop
class IdleFrameObserver(BaseObserver):
"""Idle timeout observer.
@@ -139,7 +153,7 @@ class PipelineParams(BaseModel):
start_metadata: dict[str, Any] = Field(default_factory=dict)
class PipelineTask(BasePipelineTask):
class PipelineTask(BaseTask):
"""Manages the execution of a pipeline, handling frame processing and task lifecycle.
This class orchestrates pipeline execution with comprehensive monitoring,
@@ -192,9 +206,10 @@ class PipelineTask(BasePipelineTask):
self,
pipeline: BasePipeline,
*,
params: PipelineParams | None = None,
additional_span_attributes: dict | None = None,
app_resources: Any = None,
bus: TaskBus | None = None,
bridged: tuple[str, ...] | None = None,
cancel_on_idle_timeout: bool = True,
cancel_timeout_secs: float = CANCEL_TIMEOUT_SECS,
check_dangling_tasks: bool = True,
@@ -203,9 +218,12 @@ class PipelineTask(BasePipelineTask):
enable_tracing: bool = False,
enable_turn_tracking: bool = True,
enable_rtvi: bool = True,
exclude_frames: tuple[type[Frame], ...] | None = None,
idle_timeout_frames: tuple[type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame),
idle_timeout_secs: float | None = IDLE_TIMEOUT_SECS,
name: str | None = None,
observers: list[BaseObserver] | None = None,
params: PipelineParams | None = None,
rtvi_processor: RTVIProcessor | None = None,
rtvi_observer_params: RTVIObserverParams | None = None,
task_manager: BaseTaskManager | None = None,
@@ -215,7 +233,6 @@ class PipelineTask(BasePipelineTask):
Args:
pipeline: The pipeline to execute.
params: Configuration parameters for the pipeline.
additional_span_attributes: Optional dictionary of attributes to propagate as
OpenTelemetry conversation span attributes.
app_resources: Optional application-defined bag of anything your
@@ -226,6 +243,14 @@ class PipelineTask(BasePipelineTask):
``FunctionCallParams.app_resources``. The framework never
copies or clears this object; the caller retains their handle
and can read any mutations after the task finishes.
bus: Optional :class:`~pipecat.bus.TaskBus` for inter-task
communication. ``None`` means this task doesn't
participate in inter-task messaging.
bridged: Bridge configuration. ``None`` means the pipeline
is not bridged. An empty tuple ``()`` wraps the pipeline
with bus edge processors that accept frames from all
bridges. A tuple of names like ``("voice",)`` accepts
only frames from those bridges. Requires ``bus``.
cancel_on_idle_timeout: Whether the pipeline task should be cancelled if
the idle timeout is reached.
cancel_timeout_secs: Timeout (in seconds) to wait for cancellation to happen
@@ -236,12 +261,17 @@ class PipelineTask(BasePipelineTask):
enable_rtvi: Whether to automatically add RTVI support to the pipeline.
enable_tracing: Whether to enable tracing.
enable_turn_tracking: Whether to enable turn tracking.
exclude_frames: When ``bridged`` is set, extra frame types
that should not cross the bus (lifecycle frames are
always excluded).
idle_timeout_frames: A tuple with the frames that should trigger an idle
timeout if not received within `idle_timeout_seconds`.
idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or
None. If a pipeline is idle the pipeline task will be cancelled
automatically.
name: Optional task name (used for agent-style addressing on the bus).
observers: List of observers for monitoring pipeline execution.
params: Configuration parameters for the pipeline.
rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled.
rtvi_processor: The RTVI processor to add if RTVI is enabled.
task_manager: Optional task manager for handling asyncio tasks.
@@ -251,7 +281,10 @@ class PipelineTask(BasePipelineTask):
Use ``app_resources`` instead. ``tool_resources`` will be
removed in a future version.
"""
super().__init__()
super().__init__(name=name, bus=bus)
self._bridged = bridged
if bridged is not None and bus is None:
raise ValueError(f"PipelineTask '{self}': ``bridged`` requires a ``bus`` to be set.")
if tool_resources is not None:
with warnings.catch_warnings():
warnings.simplefilter("always")
@@ -371,6 +404,29 @@ class PipelineTask(BasePipelineTask):
# This event is set when the pipeline truly finishes.
self._pipeline_finished_event = asyncio.Event()
# When bridged, wrap the user pipeline with bus edge processors
# so frames tee onto the bus at the source/sink and incoming bus
# frames are injected back into the local pipeline.
if bridged is not None:
assert bus is not None # validated above
edge_source = _BusEdgeProcessor(
bus=bus,
task=self,
direction=FrameDirection.UPSTREAM,
bridges=bridged,
exclude_frames=exclude_frames,
name=f"{self}::EdgeSource",
)
edge_sink = _BusEdgeProcessor(
bus=bus,
task=self,
direction=FrameDirection.DOWNSTREAM,
bridges=bridged,
exclude_frames=exclude_frames,
name=f"{self}::EdgeSink",
)
pipeline = Pipeline([edge_source, pipeline, edge_sink])
# This is the final pipeline. It is composed of a source processor,
# followed by the user pipeline, and ending with a sink processor. The
# source allows us to receive and react to upstream frames, and the sink
@@ -403,6 +459,16 @@ class PipelineTask(BasePipelineTask):
self._register_event_handler("on_pipeline_finished")
self._register_event_handler("on_pipeline_error")
# Bridge pipeline lifecycle to the BaseTask lifecycle so the bus
# registry sees this task as ready/finished.
@self.event_handler("on_pipeline_started")
async def on_started(_task, _frame):
await self.start()
@self.event_handler("on_pipeline_finished")
async def on_finished(_task, _frame):
await self.stop()
@property
def params(self) -> PipelineParams:
"""Get the pipeline parameters for this task.
@@ -412,6 +478,11 @@ class PipelineTask(BasePipelineTask):
"""
return self._params
@property
def bridged(self) -> bool:
"""Whether this pipeline is bridged onto the bus."""
return self._bridged is not None
@property
def app_resources(self) -> Any:
"""Get the application-defined resources passed to this task.
@@ -753,6 +824,9 @@ class PipelineTask(BasePipelineTask):
"""Set up the pipeline task and all processors."""
await super().setup(self._pipeline_task_manager)
if self._bus is not None:
await self._bus.subscribe(self)
mgr_params = TaskManagerParams(loop=params.loop)
self.task_manager.setup(mgr_params)
@@ -794,6 +868,16 @@ class PipelineTask(BasePipelineTask):
if cleanup_pipeline:
await self._pipeline.cleanup()
async def _handle_task_end(self, message: BusEndTaskMessage) -> None:
"""End the pipeline after propagating end to children."""
await super()._handle_task_end(message)
await self.queue_frame(EndFrame(reason=message.reason))
async def _handle_task_cancel(self, message: BusCancelTaskMessage) -> None:
"""Cancel the pipeline after propagating cancel to children."""
await super()._handle_task_cancel(message)
await self.cancel(reason=message.reason)
async def _process_push_queue(self):
"""Process frames from the push queue and send them through the pipeline.

View File

@@ -23,10 +23,9 @@ from pipecat.frames.frames import (
TextFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask, PipelineTaskParams
from pipecat.processors.filters.frame_filter import FrameFilter
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor