Compare commits
1 Commits
hush/TurnT
...
aleix/intr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f55733be21 |
13
CHANGELOG.md
13
CHANGELOG.md
@@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Refactored pipeline architecture by introducing a new `PipelineNode`
|
||||
abstraction. Frame processors are now standalone async iterators, and
|
||||
`PipelineNode` is responsible for routing frames upstream or downstream. This
|
||||
decouples frame processors from direct linking, simplifies processor reuse,
|
||||
and provides a clearer separation between processing logic and pipeline
|
||||
wiring. This is an internal, transparent improvement and does not require any
|
||||
changes to existing frame processor code.
|
||||
|
||||
- `EndFrame` and `EndTaskFrame` have an optional `reason` field to indicate why
|
||||
the pipeline is being ended.
|
||||
|
||||
@@ -26,8 +34,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added support for Sarvam Speech-to-Text service (`SarvamSTTService`) with streaming WebSocket
|
||||
support for `saarika` (STT) and `saaras` (STT-translate) models.
|
||||
- Added support for Sarvam Speech-to-Text service (`SarvamSTTService`) with
|
||||
streaming WebSocket support for `saarika` (STT) and `saaras` (STT-translate)
|
||||
models.
|
||||
|
||||
- Added a new `DeepgramHttpTTSService`, which delivers a meaningful reduction
|
||||
in latency when compared to the `DeepgramTTSService`.
|
||||
|
||||
@@ -15,6 +15,7 @@ from typing import Callable, Coroutine, List, Optional
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.pipeline.base_pipeline import BasePipeline
|
||||
from pipecat.pipeline.pipeline_node import PipelineNode
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
|
||||
|
||||
|
||||
@@ -117,8 +118,7 @@ class Pipeline(BasePipeline):
|
||||
self._source = source or PipelineSource(self.push_frame, name=f"{self}::Source")
|
||||
self._sink = sink or PipelineSink(self.push_frame, name=f"{self}::Sink")
|
||||
self._processors: List[FrameProcessor] = [self._source] + processors + [self._sink]
|
||||
|
||||
self._link_processors()
|
||||
self._nodes = self._link_processors()
|
||||
|
||||
#
|
||||
# Frame processor
|
||||
@@ -196,17 +196,22 @@ class Pipeline(BasePipeline):
|
||||
|
||||
async def _setup_processors(self, setup: FrameProcessorSetup):
|
||||
"""Set up all processors in the pipeline."""
|
||||
for p in self._processors:
|
||||
await p.setup(setup)
|
||||
for n in self._nodes:
|
||||
await n.setup(setup)
|
||||
|
||||
async def _cleanup_processors(self):
|
||||
"""Clean up all processors in the pipeline."""
|
||||
for p in self._processors:
|
||||
await p.cleanup()
|
||||
for n in self._nodes:
|
||||
await n.cleanup()
|
||||
|
||||
def _link_processors(self):
|
||||
"""Link all processors in sequence and set their parent."""
|
||||
prev = self._processors[0]
|
||||
def _link_processors(self) -> List[PipelineNode]:
|
||||
"""Link all processors in sequence."""
|
||||
nodes = []
|
||||
prev_node = PipelineNode(self._processors[0])
|
||||
nodes.append(prev_node)
|
||||
for curr in self._processors[1:]:
|
||||
prev.link(curr)
|
||||
prev = curr
|
||||
curr_node = PipelineNode(curr)
|
||||
nodes.append(curr_node)
|
||||
prev_node.link(curr_node)
|
||||
prev_node = curr_node
|
||||
return nodes
|
||||
|
||||
140
src/pipecat/pipeline/pipeline_node.py
Normal file
140
src/pipecat/pipeline/pipeline_node.py
Normal file
@@ -0,0 +1,140 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""This module defines pipeline nodes.
|
||||
|
||||
A pipeline node (`PipelineNode`) wraps a frame processor (`FrameProcessor`) and
|
||||
can link to previous and next nodes in the pipeline. Pipeline nodes allow
|
||||
linking frame processors together with the benefit that stateless frame
|
||||
processors can be re-used in different pipelines, since what is linked is the
|
||||
actual pipeline node, not the frame processor itself.
|
||||
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.observers.base_observer import FramePushed
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
|
||||
class PipelineNode(BaseObject):
    """A node in a pipeline that hosts a frame processor.

    A `PipelineNode` wraps a single `FrameProcessor` and is responsible for
    connecting it to previous and next nodes in a pipeline. It pushes frames
    emitted by its processor to the appropriate neighbor based on frame
    direction (UPSTREAM or DOWNSTREAM).

    The wrapped processor is consumed as an asynchronous iterator that yields
    `(frame, direction)` tuples; a background push task created in `setup()`
    drains that iterator and forwards each frame to the neighbor's processor.
    """

    def __init__(self, processor: FrameProcessor):
        """Initialize the pipeline node with a given FrameProcessor.

        Args:
            processor: The FrameProcessor instance that this node will host.
        """
        super().__init__()
        self._processor = processor

        self._prev: Optional["PipelineNode"] = None
        self._next: Optional["PipelineNode"] = None

        self.__push_task: Optional[asyncio.Task] = None

    @property
    def processor(self) -> FrameProcessor:
        """Returns the frame processor of this pipeline node."""
        return self._processor

    @property
    def next(self) -> Optional["PipelineNode"]:
        """Get the next pipeline node.

        Returns:
            The next node, or None if there's no next node.
        """
        return self._next

    @property
    def previous(self) -> Optional["PipelineNode"]:
        """Get the previous pipeline node.

        Returns:
            The previous node, or None if there's no previous node.
        """
        return self._prev

    async def setup(self, setup: FrameProcessorSetup):
        """Set up this pipeline node.

        This sets up the wrapped frame processor, stores the clock, task
        manager and observer taken from `setup`, and starts the push task.

        Args:
            setup: Configuration object containing setup parameters.
        """
        await self.processor.setup(setup)
        self._clock = setup.clock
        self._task_manager = setup.task_manager
        self._observer = setup.observer

        self.__create_push_task()

    async def cleanup(self):
        """Clean up this pipeline node.

        Cleans up the wrapped processor and then waits for the push task to
        finish before dropping the reference to it.
        """
        await super().cleanup()
        await self.processor.cleanup()
        if self.__push_task:
            # Awaited rather than cancelled: the handler returns on its own
            # once the processor's async iterator is exhausted.
            await self.__push_task
            self.__push_task = None

    def link(self, node: "PipelineNode"):
        """Link this node to the next node in the pipeline.

        Args:
            node: The node to link to.
        """
        self._next = node
        node._prev = self
        logger.debug(f"Linking {self.processor} -> {node.processor}")

    def __create_push_task(self):
        """Create the frame push task (no-op if it already exists)."""
        if not self.__push_task:
            self.__push_task = self._task_manager.create_task(
                self.__push_task_handler(), f"{self.processor}::_push_task"
            )

    async def __push_task_handler(self):
        """Push task handler.

        Receive frames from the wrapped frame processor and push them to the
        next or previous node depending on the direction. Frames with no
        neighbor in their direction are dropped without notifying observers.
        """
        async for frame, direction in self.processor:
            if direction == FrameDirection.DOWNSTREAM and self.next:
                destination = self.next.processor
                logger.trace(f"Pushing {frame} from {self.processor} to {destination}")
            elif direction == FrameDirection.UPSTREAM and self.previous:
                destination = self.previous.processor
                # Log the processors (not the node objects) so upstream and
                # downstream trace messages are consistent.
                logger.trace(f"Pushing {frame} upstream from {self.processor} to {destination}")
            else:
                # No neighbor in that direction: nothing to deliver.
                continue

            # Notify the observer BEFORE delivering the frame, as the
            # processor-level push code did previously. Delivering first would
            # let the destination start pushing the frame further before this
            # hop is reported, so observers could see hops out of order.
            if self._observer:
                timestamp = self._clock.get_time() if self._clock else 0
                data = FramePushed(
                    source=self.processor,
                    destination=destination,
                    frame=frame,
                    direction=direction,
                    timestamp=timestamp,
                )
                await self._observer.on_push_frame(data)

            await destination.queue_frame(frame, direction)
|
||||
@@ -132,14 +132,17 @@ INPUT_TASK_CANCEL_TIMEOUT_SECS = 3
|
||||
|
||||
|
||||
class FrameProcessor(BaseObject):
|
||||
"""Base class for all frame processors in the pipeline.
|
||||
"""Base class for all frame processors in Pipecat.
|
||||
|
||||
Frame processors are the building blocks of Pipecat pipelines, they can be
|
||||
linked to form complex processing pipelines. They receive frames, process
|
||||
them, and pass them to the next or previous processor in the chain. Each
|
||||
frame processor guarantees frame ordering and processes frames in its own
|
||||
task. System frames are also processed in a separate task which guarantees
|
||||
frame priority.
|
||||
A FrameProcessor is an independent, asynchronous component that consumes
|
||||
input frames and produces zero or more output frames. Frames are delivered
|
||||
to the processor via the `queue_frame(frame, direction)` method. The
|
||||
processor internally manages queues and background tasks to handle incoming
|
||||
frames and generate output frames.
|
||||
|
||||
Output frames are made available through the processor's asynchronous
|
||||
iterator interface, allowing consumers to iterate over processed frames
|
||||
using `async for frame, direction in processor`. Frame ordering is guaranteed.
|
||||
|
||||
Event handlers available:
|
||||
|
||||
@@ -147,6 +150,7 @@ class FrameProcessor(BaseObject):
|
||||
- on_after_process_frame: Called after a frame is processed
|
||||
- on_before_push_frame: Called before a frame is pushed
|
||||
- on_after_push_frame: Called after a frame is pushed
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -166,8 +170,6 @@ class FrameProcessor(BaseObject):
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
super().__init__(name=name, **kwargs)
|
||||
self._prev: Optional["FrameProcessor"] = None
|
||||
self._next: Optional["FrameProcessor"] = None
|
||||
|
||||
# Enable direct mode to skip queues and process frames right away.
|
||||
self._enable_direct_mode = enable_direct_mode
|
||||
@@ -234,6 +236,9 @@ class FrameProcessor(BaseObject):
|
||||
self._wait_for_interruption = False
|
||||
self._wait_interruption_event = asyncio.Event()
|
||||
|
||||
# Push queue
|
||||
self.__push_queue = asyncio.Queue()
|
||||
|
||||
# Frame processor events.
|
||||
self._register_event_handler("on_before_process_frame", sync=True)
|
||||
self._register_event_handler("on_after_process_frame", sync=True)
|
||||
@@ -284,24 +289,6 @@ class FrameProcessor(BaseObject):
|
||||
"""
|
||||
return []
|
||||
|
||||
@property
|
||||
def next(self) -> Optional["FrameProcessor"]:
|
||||
"""Get the next processor.
|
||||
|
||||
Returns:
|
||||
The next processor, or None if there's no next processor.
|
||||
"""
|
||||
return self._next
|
||||
|
||||
@property
|
||||
def previous(self) -> Optional["FrameProcessor"]:
|
||||
"""Get the previous processor.
|
||||
|
||||
Returns:
|
||||
The previous processor, or None if there's no previous processor.
|
||||
"""
|
||||
return self._prev
|
||||
|
||||
@property
|
||||
def interruptions_allowed(self):
|
||||
"""Check if interruptions are allowed for this processor.
|
||||
@@ -518,16 +505,7 @@ class FrameProcessor(BaseObject):
|
||||
await self.__cancel_process_task()
|
||||
if self._metrics is not None:
|
||||
await self._metrics.cleanup()
|
||||
|
||||
def link(self, processor: "FrameProcessor"):
|
||||
"""Link this processor to the next processor in the pipeline.
|
||||
|
||||
Args:
|
||||
processor: The processor to link to.
|
||||
"""
|
||||
self._next = processor
|
||||
processor._prev = self
|
||||
logger.debug(f"Linking {self} -> {self._next}")
|
||||
await self.__push_queue.put(None)
|
||||
|
||||
def get_clock(self) -> BaseClock:
|
||||
"""Get the clock used by this processor.
|
||||
@@ -761,36 +739,7 @@ class FrameProcessor(BaseObject):
|
||||
frame: The frame to push.
|
||||
direction: The direction to push the frame.
|
||||
"""
|
||||
try:
|
||||
timestamp = self._clock.get_time() if self._clock else 0
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
logger.trace(f"Pushing {frame} from {self} to {self._next}")
|
||||
|
||||
if self._observer:
|
||||
data = FramePushed(
|
||||
source=self,
|
||||
destination=self._next,
|
||||
frame=frame,
|
||||
direction=direction,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
await self._observer.on_push_frame(data)
|
||||
await self._next.queue_frame(frame, direction)
|
||||
elif direction == FrameDirection.UPSTREAM and self._prev:
|
||||
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
|
||||
if self._observer:
|
||||
data = FramePushed(
|
||||
source=self,
|
||||
destination=self._prev,
|
||||
frame=frame,
|
||||
direction=direction,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
await self._observer.on_push_frame(data)
|
||||
await self._prev.queue_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
await self.__push_queue.put((frame, direction))
|
||||
|
||||
def _check_started(self, frame: Frame):
|
||||
"""Check if the processor has been started.
|
||||
@@ -912,3 +861,18 @@ class FrameProcessor(BaseObject):
|
||||
await self.__process_frame(frame, direction, callback)
|
||||
|
||||
self.__process_queue.task_done()
|
||||
|
||||
def __aiter__(self):
|
||||
"""A frame processor is an asynchronous iterator itself."""
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
"""Retrieve the next frame to push from this processor.
|
||||
|
||||
Returns:
|
||||
The next (frame, direction) item to push from this processor.
|
||||
"""
|
||||
data = await self.__push_queue.get()
|
||||
if data is None:
|
||||
raise StopAsyncIteration
|
||||
return data
|
||||
|
||||
Reference in New Issue
Block a user