Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
f55733be21 introduce pipeline nodes
Make FrameProcessors async iterators and decouple them from pipeline. A new
PipelineNode now handles routing frames between processors.
2025-11-03 18:07:09 -08:00
4 changed files with 198 additions and 80 deletions

View File

@@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Refactored pipeline architecture by introducing a new `PipelineNode`
abstraction. Frame processors are now standalone async iterators, and
`PipelineNode` is responsible for routing frames upstream or downstream. This
decouples frame processors from direct linking, simplifies processor reuse,
and provides a clearer separation between processing logic and pipeline
wiring. This is an internal, transparent improvement and does not require any
changes to existing frame processor code.
- `EndFrame` and `EndTaskFrame` have an optional `reason` field to indicate why
the pipeline is being ended.
@@ -26,8 +34,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added support for Sarvam Speech-to-Text service (`SarvamSTTService`) with streaming WebSocket
support for `saarika` (STT) and `saaras` (STT-translate) models.
- Added support for Sarvam Speech-to-Text service (`SarvamSTTService`) with
streaming WebSocket support for `saarika` (STT) and `saaras` (STT-translate)
models.
- Added a new `DeepgramHttpTTSService`, which delivers a meaningful reduction
in latency when compared to the `DeepgramTTSService`.

View File

@@ -15,6 +15,7 @@ from typing import Callable, Coroutine, List, Optional
from pipecat.frames.frames import Frame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline_node import PipelineNode
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
@@ -117,8 +118,7 @@ class Pipeline(BasePipeline):
self._source = source or PipelineSource(self.push_frame, name=f"{self}::Source")
self._sink = sink or PipelineSink(self.push_frame, name=f"{self}::Sink")
self._processors: List[FrameProcessor] = [self._source] + processors + [self._sink]
self._link_processors()
self._nodes = self._link_processors()
#
# Frame processor
@@ -196,17 +196,22 @@ class Pipeline(BasePipeline):
async def _setup_processors(self, setup: FrameProcessorSetup):
"""Set up all processors in the pipeline."""
for p in self._processors:
await p.setup(setup)
for n in self._nodes:
await n.setup(setup)
async def _cleanup_processors(self):
"""Clean up all processors in the pipeline."""
for p in self._processors:
await p.cleanup()
for n in self._nodes:
await n.cleanup()
def _link_processors(self):
"""Link all processors in sequence and set their parent."""
prev = self._processors[0]
def _link_processors(self) -> List[PipelineNode]:
"""Link all processors in sequence."""
nodes = []
prev_node = PipelineNode(self._processors[0])
nodes.append(prev_node)
for curr in self._processors[1:]:
prev.link(curr)
prev = curr
curr_node = PipelineNode(curr)
nodes.append(curr_node)
prev_node.link(curr_node)
prev_node = curr_node
return nodes

View File

@@ -0,0 +1,140 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""This module defines pipeline nodes.
A pipeline node (`PipelineNode`) wraps a frame processor (`FrameProcessor`) and
can link to previous and next nodes in the pipeline. Pipeline nodes allow
linking frame processors together with the benefit that stateless frame
processors can be re-used in different pipelines, since what is linked is the
actual pipeline node, not the frame processor itself.
"""
import asyncio
from typing import Optional
from loguru import logger
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.base_object import BaseObject
class PipelineNode(BaseObject):
    """A node in a pipeline that hosts a frame processor.

    A `PipelineNode` wraps a single `FrameProcessor` and is responsible for
    connecting it to previous and next nodes in a pipeline. It consumes the
    `(frame, direction)` items emitted by its processor and queues each frame
    on the processor of the appropriate neighbor based on frame direction
    (UPSTREAM or DOWNSTREAM). Frames with no neighbor in their direction are
    dropped.
    """

    def __init__(self, processor: FrameProcessor):
        """Initialize the pipeline node with a given FrameProcessor.

        Args:
            processor: The FrameProcessor instance that this node will host.
        """
        super().__init__()
        self._processor = processor
        self._prev: Optional["PipelineNode"] = None
        self._next: Optional["PipelineNode"] = None

        # Populated by setup(). Declared here so the attributes always exist,
        # even on a node that has not been set up yet.
        self._clock = None
        self._task_manager = None
        self._observer = None

        self.__push_task: Optional[asyncio.Task] = None

    @property
    def processor(self) -> FrameProcessor:
        """Returns the frame processor of this pipeline node."""
        return self._processor

    @property
    def next(self) -> Optional["PipelineNode"]:
        """Get the next pipeline node.

        Returns:
            The next node, or None if there's no next node.
        """
        return self._next

    @property
    def previous(self) -> Optional["PipelineNode"]:
        """Get the previous pipeline node.

        Returns:
            The previous node, or None if there's no previous node.
        """
        return self._prev

    async def setup(self, setup: FrameProcessorSetup):
        """Set up this pipeline node.

        Sets up the wrapped frame processor, stores the clock, task manager
        and observer from `setup`, and starts the task that routes frames
        emitted by the processor.

        Args:
            setup: Configuration object containing setup parameters.
        """
        await self.processor.setup(setup)
        self._clock = setup.clock
        self._task_manager = setup.task_manager
        self._observer = setup.observer
        self.__create_push_task()

    async def cleanup(self):
        """Clean up this pipeline node.

        Cleans up the wrapped processor and then waits for the push task to
        finish (it ends once the processor's async iterator is exhausted).
        """
        await super().cleanup()
        await self.processor.cleanup()
        if self.__push_task:
            await self.__push_task
            self.__push_task = None

    def link(self, node: "PipelineNode"):
        """Link this node to the next node in the pipeline.

        Args:
            node: The node to link to.
        """
        self._next = node
        node._prev = self
        logger.debug(f"Linking {self.processor} -> {node.processor}")

    def __create_push_task(self):
        """Create the frame push task (only once per node)."""
        if not self.__push_task:
            self.__push_task = self._task_manager.create_task(
                self.__push_task_handler(), f"{self.processor}::_push_task"
            )

    async def __push_task_handler(self):
        """Push task handler.

        Receive frames from the wrapped frame processor and push them to the
        next or previous node depending on the direction.
        """
        async for frame, direction in self.processor:
            destination = None
            if direction == FrameDirection.DOWNSTREAM and self.next:
                destination = self.next.processor
                logger.trace(f"Pushing {frame} from {self.processor} to {destination}")
            elif direction == FrameDirection.UPSTREAM and self.previous:
                destination = self.previous.processor
                # Log processors (not nodes) so both directions read the same.
                logger.trace(f"Pushing {frame} upstream from {self.processor} to {destination}")

            if destination is None:
                # No neighbor in that direction: nothing to route or observe.
                continue

            # Notify the observer *before* delivering the frame so the push
            # event and its timestamp describe the moment of the push, and so
            # it cannot be reported after events caused by the destination
            # handling the frame.
            if self._observer:
                timestamp = self._clock.get_time() if self._clock else 0
                data = FramePushed(
                    source=self.processor,
                    destination=destination,
                    frame=frame,
                    direction=direction,
                    timestamp=timestamp,
                )
                await self._observer.on_push_frame(data)

            await destination.queue_frame(frame, direction)

View File

@@ -132,14 +132,17 @@ INPUT_TASK_CANCEL_TIMEOUT_SECS = 3
class FrameProcessor(BaseObject):
"""Base class for all frame processors in the pipeline.
"""Base class for all frame processors in Pipecat.
Frame processors are the building blocks of Pipecat pipelines, they can be
linked to form complex processing pipelines. They receive frames, process
them, and pass them to the next or previous processor in the chain. Each
frame processor guarantees frame ordering and processes frames in its own
task. System frames are also processed in a separate task which guarantees
frame priority.
A FrameProcessor is an independent, asynchronous component that consumes
input frames and produces zero or more output frames. Frames are delivered
to the processor via the `queue_frame(frame, direction)` method. The
processor internally manages queues and background tasks to handle incoming
frames and generate output frames.
Output frames are made available through the processor's asynchronous
iterator interface, allowing consumers to iterate over `(frame, direction)`
items using `async for frame, direction in processor`. Frame ordering is
guaranteed.
Event handlers available:
@@ -147,6 +150,7 @@ class FrameProcessor(BaseObject):
- on_after_process_frame: Called after a frame is processed
- on_before_push_frame: Called before a frame is pushed
- on_after_push_frame: Called after a frame is pushed
"""
def __init__(
@@ -166,8 +170,6 @@ class FrameProcessor(BaseObject):
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(name=name, **kwargs)
self._prev: Optional["FrameProcessor"] = None
self._next: Optional["FrameProcessor"] = None
# Enable direct mode to skip queues and process frames right away.
self._enable_direct_mode = enable_direct_mode
@@ -234,6 +236,9 @@ class FrameProcessor(BaseObject):
self._wait_for_interruption = False
self._wait_interruption_event = asyncio.Event()
# Push queue
self.__push_queue = asyncio.Queue()
# Frame processor events.
self._register_event_handler("on_before_process_frame", sync=True)
self._register_event_handler("on_after_process_frame", sync=True)
@@ -284,24 +289,6 @@ class FrameProcessor(BaseObject):
"""
return []
@property
def next(self) -> Optional["FrameProcessor"]:
"""Get the next processor.
Returns:
The next processor, or None if there's no next processor.
"""
return self._next
@property
def previous(self) -> Optional["FrameProcessor"]:
"""Get the previous processor.
Returns:
The previous processor, or None if there's no previous processor.
"""
return self._prev
@property
def interruptions_allowed(self):
"""Check if interruptions are allowed for this processor.
@@ -518,16 +505,7 @@ class FrameProcessor(BaseObject):
await self.__cancel_process_task()
if self._metrics is not None:
await self._metrics.cleanup()
def link(self, processor: "FrameProcessor"):
"""Link this processor to the next processor in the pipeline.
Args:
processor: The processor to link to.
"""
self._next = processor
processor._prev = self
logger.debug(f"Linking {self} -> {self._next}")
await self.__push_queue.put(None)
def get_clock(self) -> BaseClock:
"""Get the clock used by this processor.
@@ -761,36 +739,7 @@ class FrameProcessor(BaseObject):
frame: The frame to push.
direction: The direction to push the frame.
"""
try:
timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
if self._observer:
data = FramePushed(
source=self,
destination=self._next,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self._observer.on_push_frame(data)
await self._next.queue_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
if self._observer:
data = FramePushed(
source=self,
destination=self._prev,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self._observer.on_push_frame(data)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.__push_queue.put((frame, direction))
def _check_started(self, frame: Frame):
"""Check if the processor has been started.
@@ -912,3 +861,18 @@ class FrameProcessor(BaseObject):
await self.__process_frame(frame, direction, callback)
self.__process_queue.task_done()
def __aiter__(self):
    """Return the asynchronous iterator for this processor: the processor itself."""
    return self
async def __anext__(self):
    """Wait for and return the next item this processor wants to push.

    Returns:
        The next (frame, direction) item to push from this processor.

    Raises:
        StopAsyncIteration: When the `None` sentinel is read from the push
            queue, which ends iteration.
    """
    item = await self.__push_queue.get()
    if item is not None:
        return item
    # `None` is the end-of-stream sentinel.
    raise StopAsyncIteration