PipelineTask: use PipelineSource/PipelineSink and remove tasks

This commit is contained in:
Aleix Conchillo Flaqué
2025-08-20 07:34:10 -07:00
parent 12bcb7db64
commit 6831e63ec9
2 changed files with 49 additions and 138 deletions

View File

@@ -54,6 +54,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Performance
- Improve `PipelineTask` performance by using direct mode processors and by
removing unnecessary tasks.
- Improve `ParallelPipeline` performance by using direct mode, by not
creating a task for each frame and every sub-pipeline and also by removing
other unnecessary tasks.

View File

@@ -42,7 +42,7 @@ from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import (
@@ -101,70 +101,6 @@ class PipelineParams(BaseModel):
start_metadata: Dict[str, Any] = Field(default_factory=dict)
class PipelineTaskSource(FrameProcessor):
"""Source processor for pipeline tasks that handles frame routing.
This is the source processor that is linked at the beginning of the
pipeline given to the pipeline task. It allows us to easily push frames
downstream to the pipeline and also receive upstream frames coming from the
pipeline.
"""
def __init__(self, up_queue: asyncio.Queue):
"""Initialize the pipeline task source.
Args:
up_queue: Queue for upstream frame processing.
**kwargs: Additional arguments passed to the parent class.
"""
super().__init__(enable_direct_mode=True)
self._up_queue = up_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and route them based on direction.
Args:
frame: The frame to process.
direction: The direction of frame flow.
"""
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self._up_queue.put(frame)
case FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
class PipelineTaskSink(FrameProcessor):
"""Sink processor for pipeline tasks that handles final frame processing.
This is the sink processor that is linked at the end of the pipeline
given to the pipeline task. It allows us to receive downstream frames and
act on them, for example, waiting to receive an EndFrame.
"""
def __init__(self, down_queue: asyncio.Queue):
"""Initialize the pipeline task sink.
Args:
down_queue: Queue for downstream frame processing.
**kwargs: Additional arguments passed to the parent class.
"""
super().__init__(enable_direct_mode=True)
self._down_queue = down_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and route them to the downstream queue.
Args:
frame: The frame to process.
direction: The direction of frame flow.
"""
await super().process_frame(frame, direction)
await self._down_queue.put(frame)
class PipelineTask(BasePipelineTask):
"""Manages the execution of a pipeline, handling frame processing and task lifecycle.
@@ -287,12 +223,6 @@ class PipelineTask(BasePipelineTask):
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
# This queue receives frames coming from the pipeline upstream.
self._up_queue = WatchdogQueue(self._task_manager)
self._process_up_task: Optional[asyncio.Task] = None
# This queue receives frames coming from the pipeline downstream.
self._down_queue = WatchdogQueue(self._task_manager)
self._process_down_task: Optional[asyncio.Task] = None
# This queue is the queue used to push frames to the pipeline.
self._push_queue = WatchdogQueue(self._task_manager)
self._process_push_task: Optional[asyncio.Task] = None
@@ -314,9 +244,9 @@ class PipelineTask(BasePipelineTask):
# followed by the user pipeline, and ending with a sink processor. The
# source allows us to receive and react to upstream frames, and the sink
# allows us to receive and react to downstream frames.
source = PipelineTaskSource(self._up_queue)
sink = PipelineTaskSink(self._down_queue)
self._pipeline = Pipeline([source, pipeline, sink])
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
self._pipeline = Pipeline([pipeline], source=source, sink=sink)
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
@@ -501,12 +431,6 @@ class PipelineTask(BasePipelineTask):
async def _create_tasks(self):
"""Create and start all pipeline processing tasks."""
self._process_up_task = self._task_manager.create_task(
self._process_up_queue(), f"{self}::_process_up_queue"
)
self._process_down_task = self._task_manager.create_task(
self._process_down_queue(), f"{self}::_process_down_queue"
)
self._process_push_task = self._task_manager.create_task(
self._process_push_queue(), f"{self}::_process_push_queue"
)
@@ -540,14 +464,6 @@ class PipelineTask(BasePipelineTask):
await self._task_manager.cancel_task(self._process_push_task)
self._process_push_task = None
if self._process_up_task:
await self._task_manager.cancel_task(self._process_up_task)
self._process_up_task = None
if self._process_down_task:
await self._task_manager.cancel_task(self._process_down_task)
self._process_down_task = None
await self._maybe_cancel_heartbeat_tasks()
await self._maybe_cancel_idle_task()
@@ -655,7 +571,7 @@ class PipelineTask(BasePipelineTask):
self._push_queue.task_done()
await self._cleanup(cleanup_pipeline)
async def _process_up_queue(self):
async def _source_push_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames coming upstream from the pipeline.
This is the task that processes frames coming upstream from the
@@ -663,33 +579,29 @@ class PipelineTask(BasePipelineTask):
pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
an EndFrame down the pipeline.
"""
while True:
frame = await self._up_queue.get()
if isinstance(frame, self._reached_upstream_types):
await self._call_event_handler("on_frame_reached_upstream", frame)
if isinstance(frame, self._reached_upstream_types):
await self._call_event_handler("on_frame_reached_upstream", frame)
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
# Tell the task we should stop nicely.
await self.queue_frame(StopFrame())
elif isinstance(frame, ErrorFrame):
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
# Cancel all tasks downstream.
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
# Tell the task we should stop nicely.
await self.queue_frame(StopFrame())
elif isinstance(frame, ErrorFrame):
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
# Cancel all tasks downstream.
await self.queue_frame(CancelFrame())
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
else:
logger.warning(f"Something went wrong: {frame}")
self._up_queue.task_done()
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
else:
logger.warning(f"Something went wrong: {frame}")
async def _process_down_queue(self):
async def _sink_push_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames coming downstream from the pipeline.
This tasks process frames coming downstream from the pipeline. For
@@ -697,34 +609,30 @@ class PipelineTask(BasePipelineTask):
processors have handled the EndFrame and therefore we can exit the task
cleanly.
"""
while True:
frame = await self._down_queue.get()
# Queue received frame to the idle queue so we can monitor idle
# pipelines.
await self._idle_queue.put(frame)
# Queue received frame to the idle queue so we can monitor idle
# pipelines.
await self._idle_queue.put(frame)
if isinstance(frame, self._reached_downstream_types):
await self._call_event_handler("on_frame_reached_downstream", frame)
if isinstance(frame, self._reached_downstream_types):
await self._call_event_handler("on_frame_reached_downstream", frame)
if isinstance(frame, StartFrame):
await self._call_event_handler("on_pipeline_started", frame)
if isinstance(frame, StartFrame):
await self._call_event_handler("on_pipeline_started", frame)
# Start heartbeat tasks now that StartFrame has been processed
# by all processors in the pipeline
self._maybe_start_heartbeat_tasks()
elif isinstance(frame, EndFrame):
await self._call_event_handler("on_pipeline_ended", frame)
self._pipeline_end_event.set()
elif isinstance(frame, StopFrame):
await self._call_event_handler("on_pipeline_stopped", frame)
self._pipeline_end_event.set()
elif isinstance(frame, CancelFrame):
await self._call_event_handler("on_pipeline_cancelled", frame)
self._pipeline_end_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
self._down_queue.task_done()
# Start heartbeat tasks now that StartFrame has been processed
# by all processors in the pipeline
self._maybe_start_heartbeat_tasks()
elif isinstance(frame, EndFrame):
await self._call_event_handler("on_pipeline_ended", frame)
self._pipeline_end_event.set()
elif isinstance(frame, StopFrame):
await self._call_event_handler("on_pipeline_stopped", frame)
self._pipeline_end_event.set()
elif isinstance(frame, CancelFrame):
await self._call_event_handler("on_pipeline_cancelled", frame)
self._pipeline_end_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
async def _heartbeat_push_handler(self):
"""Push heartbeat frames at regular intervals."""