diff --git a/CHANGELOG.md b/CHANGELOG.md index 50e17df0f..820b45cc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Refactored pipeline architecture by introducing a new `PipelineNode` + abstraction. Frame processors are now standalone async iterators, and + `PipelineNode` is responsible for routing frames upstream or downstream. This + decouples frame processors from direct linking, simplifies processor reuse, + and provides a clearer separation between processing logic and pipeline + wiring. This is an internal, transparent improvement and does not require any + changes to existing frame processor code. + - `EndFrame` and `EndTaskFrame` have an optional `reason` field to indicate why the pipeline is being ended. @@ -26,8 +34,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added supprt for Sarvam Speech-to-Text service (`SarvamSTTService`) with streaming WebSocket - support for `saarika` (STT) and `saaras` (STT-translate) models. +- Added support for Sarvam Speech-to-Text service (`SarvamSTTService`) with + streaming WebSocket support for `saarika` (STT) and `saaras` (STT-translate) + models. - Added a new `DeepgramHttpTTSService`, which delivers a meaningful reduction in latency when compared to the `DeepgramTTSService`. 
diff --git a/src/pipecat/pipeline/pipeline.py b/src/pipecat/pipeline/pipeline.py index 48cd4edcf..7281a66e1 100644 --- a/src/pipecat/pipeline/pipeline.py +++ b/src/pipecat/pipeline/pipeline.py @@ -15,6 +15,7 @@ from typing import Callable, Coroutine, List, Optional from pipecat.frames.frames import Frame from pipecat.pipeline.base_pipeline import BasePipeline +from pipecat.pipeline.pipeline_node import PipelineNode from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup @@ -117,8 +118,7 @@ class Pipeline(BasePipeline): self._source = source or PipelineSource(self.push_frame, name=f"{self}::Source") self._sink = sink or PipelineSink(self.push_frame, name=f"{self}::Sink") self._processors: List[FrameProcessor] = [self._source] + processors + [self._sink] - - self._link_processors() + self._nodes = self._link_processors() # # Frame processor @@ -196,17 +196,22 @@ class Pipeline(BasePipeline): async def _setup_processors(self, setup: FrameProcessorSetup): """Set up all processors in the pipeline.""" - for p in self._processors: - await p.setup(setup) + for n in self._nodes: + await n.setup(setup) async def _cleanup_processors(self): """Clean up all processors in the pipeline.""" - for p in self._processors: - await p.cleanup() + for n in self._nodes: + await n.cleanup() - def _link_processors(self): - """Link all processors in sequence and set their parent.""" - prev = self._processors[0] + def _link_processors(self) -> List[PipelineNode]: + """Wrap processors in pipeline nodes, link them in sequence and return the nodes.""" + nodes = [] + prev_node = PipelineNode(self._processors[0]) + nodes.append(prev_node) for curr in self._processors[1:]: - prev.link(curr) - prev = curr + curr_node = PipelineNode(curr) + nodes.append(curr_node) + prev_node.link(curr_node) + prev_node = curr_node + return nodes diff --git a/src/pipecat/pipeline/pipeline_node.py b/src/pipecat/pipeline/pipeline_node.py new file mode 100644 index 000000000..37b2a3184 --- /dev/null +++ 
b/src/pipecat/pipeline/pipeline_node.py @@ -0,0 +1,140 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""This module defines pipeline nodes. + +A pipeline node (`PipelineNode`) wraps a frame processor (`FrameProcessor`) and +can link to previous and next nodes in the pipeline. Pipeline nodes allow +linking frame processors together with the benefit that stateless frame +processors can be re-used in different pipelines, since what is linked is the +actual pipeline node, not the frame processor itself. + +""" + +import asyncio +from typing import Optional + +from loguru import logger + +from pipecat.observers.base_observer import FramePushed +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup +from pipecat.utils.base_object import BaseObject + + +class PipelineNode(BaseObject): + """A node in a pipeline that hosts a frame processor. + + A `PipelineNode` wraps a single `FrameProcessor` and is responsible for + connecting it to previous and next nodes in a pipeline. It pushes frames + emitted by its processor to the appropriate neighbor based on frame + direction (UPSTREAM or DOWNSTREAM). + """ + + def __init__(self, processor: FrameProcessor): + """Initialize the pipeline node with a given FrameProcessor. + + Args: + processor: The FrameProcessor instance that this node will host. + """ + super().__init__() + self._processor = processor + + self._prev: Optional["PipelineNode"] = None + self._next: Optional["PipelineNode"] = None + + self.__push_task: Optional[asyncio.Task] = None + + @property + def processor(self) -> FrameProcessor: + """Get the frame processor hosted by this pipeline node.""" + return self._processor + + @property + def next(self) -> Optional["PipelineNode"]: + """Get the next pipeline node. + + Returns: + The next node, or None if there's no next node. 
+ """ + return self._next + + @property + def previous(self) -> Optional["PipelineNode"]: + """Get the previous pipeline node. + + Returns: + The previous node, or None if there's no previous node. + """ + return self._prev + + async def setup(self, setup: FrameProcessorSetup): + """Set up this pipeline node. + + This sets up the wrapped frame processor with required components. + + Args: + setup: Configuration object containing setup parameters. + """ + await self.processor.setup(setup) + self._clock = setup.clock + self._task_manager = setup.task_manager + self._observer = setup.observer + + self.__create_push_task() + + async def cleanup(self): + """Clean up this node and its processor, then wait for the push task to finish.""" + await super().cleanup() + await self.processor.cleanup() + if self.__push_task: + await self.__push_task + self.__push_task = None + + def link(self, node: "PipelineNode"): + """Link this node to the next node in the pipeline. + + Args: + node: The node to link to. + """ + self._next = node + node._prev = self + logger.debug(f"Linking {self.processor} -> {node.processor}") + + def __create_push_task(self): + """Create the frame push task if it has not been created yet.""" + if not self.__push_task: + self.__push_task = self._task_manager.create_task( + self.__push_task_handler(), f"{self.processor}::_push_task" + ) + + async def __push_task_handler(self): + """Push task handler. + + Receive frames from the wrapped frame processor and push them to the + next or previous node depending on the direction. 
+ """ + async for frame, direction in self.processor: + destination = None + if direction == FrameDirection.DOWNSTREAM and self.next: + logger.trace(f"Pushing {frame} from {self.processor} to {self.next.processor}") + destination = self.next.processor + elif direction == FrameDirection.UPSTREAM and self.previous: + logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}") + destination = self.previous.processor + + if destination: + await destination.queue_frame(frame, direction) + + if self._observer and destination: + timestamp = self._clock.get_time() if self._clock else 0 + data = FramePushed( + source=self.processor, + destination=destination, + frame=frame, + direction=direction, + timestamp=timestamp, + ) + await self._observer.on_push_frame(data) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 1ca3333b5..78bf0d1f5 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -132,14 +132,17 @@ INPUT_TASK_CANCEL_TIMEOUT_SECS = 3 class FrameProcessor(BaseObject): - """Base class for all frame processors in the pipeline. + """Base class for all frame processors in Pipecat. - Frame processors are the building blocks of Pipecat pipelines, they can be - linked to form complex processing pipelines. They receive frames, process - them, and pass them to the next or previous processor in the chain. Each - frame processor guarantees frame ordering and processes frames in its own - task. System frames are also processed in a separate task which guarantees - frame priority. + A FrameProcessor is an independent, asynchronous component that consumes + input frames and produces zero or more output frames. Frames are delivered + to the processor via the `queue_frame(frame, direction)` method. The + processor internally manages queues and background tasks to handle incoming + frames and generate output frames. 
+ Output frames are made available through the processor's asynchronous + iterator interface, allowing consumers to iterate over processed frames + using `async for frame, direction in processor`. Frame ordering is guaranteed. Event handlers available: @@ -147,6 +150,7 @@ class FrameProcessor(BaseObject): - on_after_process_frame: Called after a frame is processed - on_before_push_frame: Called before a frame is pushed - on_after_push_frame: Called after a frame is pushed + """ def __init__( @@ -166,8 +170,6 @@ class FrameProcessor(BaseObject): **kwargs: Additional arguments passed to parent class. """ super().__init__(name=name, **kwargs) - self._prev: Optional["FrameProcessor"] = None - self._next: Optional["FrameProcessor"] = None # Enable direct mode to skip queues and process frames right away. self._enable_direct_mode = enable_direct_mode @@ -234,6 +236,9 @@ class FrameProcessor(BaseObject): self._wait_for_interruption = False self._wait_interruption_event = asyncio.Event() + # Push queue + self.__push_queue = asyncio.Queue() + # Frame processor events. self._register_event_handler("on_before_process_frame", sync=True) self._register_event_handler("on_after_process_frame", sync=True) @@ -284,24 +289,6 @@ class FrameProcessor(BaseObject): """ return [] - @property - def next(self) -> Optional["FrameProcessor"]: - """Get the next processor. - - Returns: - The next processor, or None if there's no next processor. - """ - return self._next - - @property - def previous(self) -> Optional["FrameProcessor"]: - """Get the previous processor. - - Returns: - The previous processor, or None if there's no previous processor. - """ - return self._prev - @property def interruptions_allowed(self): """Check if interruptions are allowed for this processor. 
@@ -518,16 +505,7 @@ class FrameProcessor(BaseObject): await self.__cancel_process_task() if self._metrics is not None: await self._metrics.cleanup() - - def link(self, processor: "FrameProcessor"): - """Link this processor to the next processor in the pipeline. - - Args: - processor: The processor to link to. - """ - self._next = processor - processor._prev = self - logger.debug(f"Linking {self} -> {self._next}") + await self.__push_queue.put(None) def get_clock(self) -> BaseClock: """Get the clock used by this processor. @@ -761,36 +739,7 @@ class FrameProcessor(BaseObject): frame: The frame to push. direction: The direction to push the frame. """ - try: - timestamp = self._clock.get_time() if self._clock else 0 - if direction == FrameDirection.DOWNSTREAM and self._next: - logger.trace(f"Pushing {frame} from {self} to {self._next}") - - if self._observer: - data = FramePushed( - source=self, - destination=self._next, - frame=frame, - direction=direction, - timestamp=timestamp, - ) - await self._observer.on_push_frame(data) - await self._next.queue_frame(frame, direction) - elif direction == FrameDirection.UPSTREAM and self._prev: - logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}") - if self._observer: - data = FramePushed( - source=self, - destination=self._prev, - frame=frame, - direction=direction, - timestamp=timestamp, - ) - await self._observer.on_push_frame(data) - await self._prev.queue_frame(frame, direction) - except Exception as e: - logger.exception(f"Uncaught exception in {self}: {e}") - await self.push_error(ErrorFrame(str(e))) + await self.__push_queue.put((frame, direction)) def _check_started(self, frame: Frame): """Check if the processor has been started. 
@@ -912,3 +861,18 @@ class FrameProcessor(BaseObject): await self.__process_frame(frame, direction, callback) self.__process_queue.task_done() + + def __aiter__(self): + """A frame processor is an asynchronous iterator itself.""" + return self + + async def __anext__(self): + """Retrieve the next frame to push from this processor. + + Returns: + The next (frame, direction) item to push from this processor. + """ + data = await self.__push_queue.get() + if data is None: + raise StopAsyncIteration + return data