From 6831e63ec9c8a07552e0d850122da1c88122cf1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 20 Aug 2025 07:34:10 -0700 Subject: [PATCH] PipelineTask: use PipelineSource/PipelineSink and remove tasks --- CHANGELOG.md | 3 + src/pipecat/pipeline/task.py | 184 +++++++++-------------------------- 2 files changed, 49 insertions(+), 138 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b472a897a..6f962d33b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Performance +- Improve `PipelineTask` performance by using direct mode processors and by + removing unnecessary tasks. + - Improve `ParallelPipeline` performance by using direct mode, by not creating a task for each frame and every sub-pipeline and also by removing other unnecessary tasks. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index fc03cc9c9..7ff924531 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -42,7 +42,7 @@ from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData from pipecat.observers.base_observer import BaseObserver from pipecat.observers.turn_tracking_observer import TurnTrackingObserver from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams -from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource from pipecat.pipeline.task_observer import TaskObserver from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup from pipecat.utils.asyncio.task_manager import ( @@ -101,70 +101,6 @@ class PipelineParams(BaseModel): start_metadata: Dict[str, Any] = Field(default_factory=dict) -class PipelineTaskSource(FrameProcessor): - """Source processor for pipeline tasks that handles frame routing. - - This is the source processor that is linked at the beginning of the - pipeline given to the pipeline task. It allows us to easily push frames - downstream to the pipeline and also receive upstream frames coming from the - pipeline. - """ - - def __init__(self, up_queue: asyncio.Queue): - """Initialize the pipeline task source. - - Args: - up_queue: Queue for upstream frame processing. - **kwargs: Additional arguments passed to the parent class. - """ - super().__init__(enable_direct_mode=True) - self._up_queue = up_queue - - async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process frames and route them based on direction. - - Args: - frame: The frame to process. - direction: The direction of frame flow. - """ - await super().process_frame(frame, direction) - - match direction: - case FrameDirection.UPSTREAM: - await self._up_queue.put(frame) - case FrameDirection.DOWNSTREAM: - await self.push_frame(frame, direction) - - -class PipelineTaskSink(FrameProcessor): - """Sink processor for pipeline tasks that handles final frame processing. - - This is the sink processor that is linked at the end of the pipeline - given to the pipeline task. It allows us to receive downstream frames and - act on them, for example, waiting to receive an EndFrame. - """ - - def __init__(self, down_queue: asyncio.Queue): - """Initialize the pipeline task sink. - - Args: - down_queue: Queue for downstream frame processing. - **kwargs: Additional arguments passed to the parent class. - """ - super().__init__(enable_direct_mode=True) - self._down_queue = down_queue - - async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process frames and route them to the downstream queue. - - Args: - frame: The frame to process. - direction: The direction of frame flow. - """ - await super().process_frame(frame, direction) - await self._down_queue.put(frame) - - class PipelineTask(BasePipelineTask): """Manages the execution of a pipeline, handling frame processing and task lifecycle. @@ -287,12 +223,6 @@ class PipelineTask(BasePipelineTask): # PipelineTask and its frame processors. self._task_manager = task_manager or TaskManager() - # This queue receives frames coming from the pipeline upstream. - self._up_queue = WatchdogQueue(self._task_manager) - self._process_up_task: Optional[asyncio.Task] = None - # This queue receives frames coming from the pipeline downstream. - self._down_queue = WatchdogQueue(self._task_manager) - self._process_down_task: Optional[asyncio.Task] = None # This queue is the queue used to push frames to the pipeline. self._push_queue = WatchdogQueue(self._task_manager) self._process_push_task: Optional[asyncio.Task] = None @@ -314,9 +244,9 @@ class PipelineTask(BasePipelineTask): # followed by the user pipeline, and ending with a sink processor. The # source allows us to receive and react to upstream frames, and the sink # allows us to receive and react to downstream frames. - source = PipelineTaskSource(self._up_queue) - sink = PipelineTaskSink(self._down_queue) - self._pipeline = Pipeline([source, pipeline, sink]) + source = PipelineSource(self._source_push_frame, name=f"{self}::Source") + sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink") + self._pipeline = Pipeline([pipeline], source=source, sink=sink) # The task observer acts as a proxy to the provided observers. This way, # we only need to pass a single observer (using the StartFrame) which @@ -501,12 +431,6 @@ class PipelineTask(BasePipelineTask): async def _create_tasks(self): """Create and start all pipeline processing tasks.""" - self._process_up_task = self._task_manager.create_task( - self._process_up_queue(), f"{self}::_process_up_queue" - ) - self._process_down_task = self._task_manager.create_task( - self._process_down_queue(), f"{self}::_process_down_queue" - ) self._process_push_task = self._task_manager.create_task( self._process_push_queue(), f"{self}::_process_push_queue" ) @@ -540,14 +464,6 @@ class PipelineTask(BasePipelineTask): await self._task_manager.cancel_task(self._process_push_task) self._process_push_task = None - if self._process_up_task: - await self._task_manager.cancel_task(self._process_up_task) - self._process_up_task = None - - if self._process_down_task: - await self._task_manager.cancel_task(self._process_down_task) - self._process_down_task = None - await self._maybe_cancel_heartbeat_tasks() await self._maybe_cancel_idle_task() @@ -655,7 +571,7 @@ class PipelineTask(BasePipelineTask): self._push_queue.task_done() await self._cleanup(cleanup_pipeline) - async def _process_up_queue(self): + async def _source_push_frame(self, frame: Frame, direction: FrameDirection): """Process frames coming upstream from the pipeline. This is the task that processes frames coming upstream from the @@ -663,33 +579,29 @@ class PipelineTask(BasePipelineTask): pipeline to be stopped (e.g. EndTaskFrame) in which case we would send an EndFrame down the pipeline. """ - while True: - frame = await self._up_queue.get() + if isinstance(frame, self._reached_upstream_types): + await self._call_event_handler("on_frame_reached_upstream", frame) - if isinstance(frame, self._reached_upstream_types): - await self._call_event_handler("on_frame_reached_upstream", frame) - - if isinstance(frame, EndTaskFrame): - # Tell the task we should end nicely. - await self.queue_frame(EndFrame()) - elif isinstance(frame, CancelTaskFrame): - # Tell the task we should end right away. + if isinstance(frame, EndTaskFrame): + # Tell the task we should end nicely. + await self.queue_frame(EndFrame()) + elif isinstance(frame, CancelTaskFrame): + # Tell the task we should end right away. + await self.queue_frame(CancelFrame()) + elif isinstance(frame, StopTaskFrame): + # Tell the task we should stop nicely. + await self.queue_frame(StopFrame()) + elif isinstance(frame, ErrorFrame): + if frame.fatal: + logger.error(f"A fatal error occurred: {frame}") + # Cancel all tasks downstream. await self.queue_frame(CancelFrame()) - elif isinstance(frame, StopTaskFrame): - # Tell the task we should stop nicely. - await self.queue_frame(StopFrame()) - elif isinstance(frame, ErrorFrame): - if frame.fatal: - logger.error(f"A fatal error occurred: {frame}") - # Cancel all tasks downstream. - await self.queue_frame(CancelFrame()) - # Tell the task we should stop. - await self.queue_frame(StopTaskFrame()) - else: - logger.warning(f"Something went wrong: {frame}") - self._up_queue.task_done() + # Tell the task we should stop. + await self.queue_frame(StopTaskFrame()) + else: + logger.warning(f"Something went wrong: {frame}") - async def _process_down_queue(self): + async def _sink_push_frame(self, frame: Frame, direction: FrameDirection): """Process frames coming downstream from the pipeline. This tasks process frames coming downstream from the pipeline. For @@ -697,34 +609,30 @@ class PipelineTask(BasePipelineTask): processors have handled the EndFrame and therefore we can exit the task cleanly. """ - while True: - frame = await self._down_queue.get() + # Queue received frame to the idle queue so we can monitor idle + # pipelines. + await self._idle_queue.put(frame) - # Queue received frame to the idle queue so we can monitor idle - # pipelines. - await self._idle_queue.put(frame) + if isinstance(frame, self._reached_downstream_types): + await self._call_event_handler("on_frame_reached_downstream", frame) - if isinstance(frame, self._reached_downstream_types): - await self._call_event_handler("on_frame_reached_downstream", frame) + if isinstance(frame, StartFrame): + await self._call_event_handler("on_pipeline_started", frame) - if isinstance(frame, StartFrame): - await self._call_event_handler("on_pipeline_started", frame) - - # Start heartbeat tasks now that StartFrame has been processed - # by all processors in the pipeline - self._maybe_start_heartbeat_tasks() - elif isinstance(frame, EndFrame): - await self._call_event_handler("on_pipeline_ended", frame) - self._pipeline_end_event.set() - elif isinstance(frame, StopFrame): - await self._call_event_handler("on_pipeline_stopped", frame) - self._pipeline_end_event.set() - elif isinstance(frame, CancelFrame): - await self._call_event_handler("on_pipeline_cancelled", frame) - self._pipeline_end_event.set() - elif isinstance(frame, HeartbeatFrame): - await self._heartbeat_queue.put(frame) - self._down_queue.task_done() + # Start heartbeat tasks now that StartFrame has been processed + # by all processors in the pipeline + self._maybe_start_heartbeat_tasks() + elif isinstance(frame, EndFrame): + await self._call_event_handler("on_pipeline_ended", frame) + self._pipeline_end_event.set() + elif isinstance(frame, StopFrame): + await self._call_event_handler("on_pipeline_stopped", frame) + self._pipeline_end_event.set() + elif isinstance(frame, CancelFrame): + await self._call_event_handler("on_pipeline_cancelled", frame) + self._pipeline_end_event.set() + elif isinstance(frame, HeartbeatFrame): + await self._heartbeat_queue.put(frame) async def _heartbeat_push_handler(self): """Push heartbeat frames at regular intervals."""