Adds a FrameOrder enum with ARRIVAL (default, existing behavior) and PIPELINE (pushes frames in pipeline definition order). This lets callers guarantee output ordering between parallel pipelines — e.g. ensuring image frames precede audio frames — without needing a separate reordering processor downstream. Updates the 05-sync-speech-and-image example to use FrameOrder.PIPELINE, removing the ImageBeforeAudioReorderer class entirely.
118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
#
|
|
# Copyright (c) 2024-2026, Daily
|
|
#
|
|
# SPDX-License-Identifier: BSD 2-Clause License
|
|
#
|
|
|
|
import asyncio
|
|
import unittest
|
|
from dataclasses import dataclass
|
|
|
|
from pipecat.frames.frames import Frame, TextFrame
|
|
from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline
|
|
from pipecat.processors.filters.identity_filter import IdentityFilter
|
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
|
from pipecat.tests.utils import run_test
|
|
|
|
|
|
@dataclass
class TaggedFrame(Frame):
    """Frame carrying a string label so tests can tell which pipeline branch produced it."""

    # Label identifying the emitting branch; empty when not specified.
    tag: str = ""

    def __str__(self):
        # `name` is not defined in this class; presumably inherited from Frame.
        return "{}(tag: {})".format(self.name, self.tag)
|
|
|
|
|
|
class EmitTaggedFrameProcessor(FrameProcessor):
    """Replace each incoming TextFrame with a TaggedFrame carrying a fixed tag.

    Each instance is given its own tag, so the output of different parallel
    branches can be told apart when checking ordering. An optional delay makes
    one branch slower than another.
    """

    def __init__(self, tag: str, *, delay: float = 0, **kwargs):
        """Store the tag and the optional delay.

        Args:
            tag: Value placed on every TaggedFrame this processor emits.
            delay: Seconds to sleep before emitting; no sleep when not positive.
            **kwargs: Forwarded unchanged to FrameProcessor.__init__.
        """
        super().__init__(**kwargs)
        self._delay = delay
        self._tag = tag

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Emit a TaggedFrame for a TextFrame; forward any other frame untouched."""
        await super().process_frame(frame, direction)

        # Anything that is not a TextFrame passes through in its original direction.
        if not isinstance(frame, TextFrame):
            await self.push_frame(frame, direction)
            return

        if self._delay > 0:
            await asyncio.sleep(self._delay)

        # The TextFrame itself is dropped; the tagged frame is pushed with
        # push_frame's default direction.
        tagged = TaggedFrame(tag=self._tag)
        await self.push_frame(tagged)
|
|
|
|
|
|
class TestSyncParallelPipeline(unittest.IsolatedAsyncioTestCase):
    """Tests for SyncParallelPipeline deduplication and output frame ordering."""

    async def _run_slow_then_fast(self, frame_order):
        """Run two TextFrames through a slow-first / fast-second parallel pipeline.

        The first branch sleeps 0.05 s before emitting a frame tagged "slow";
        the second branch emits a frame tagged "fast" with no delay.

        Args:
            frame_order: FrameOrder value passed to SyncParallelPipeline.

        Returns:
            Tags of the TaggedFrame instances among the downstream frames
            returned by run_test, in the order they were received.
        """
        pipeline = SyncParallelPipeline(
            [EmitTaggedFrameProcessor("slow", delay=0.05)],
            [EmitTaggedFrameProcessor("fast")],
            frame_order=frame_order,
        )

        down_frames, _ = await run_test(
            pipeline,
            frames_to_send=[TextFrame(text="one"), TextFrame(text="two")],
        )

        return [f.tag for f in down_frames if isinstance(f, TaggedFrame)]

    async def test_dedup_multiple_frames(self):
        """Identical frames from multiple paths should be deduplicated."""
        pipeline = SyncParallelPipeline([IdentityFilter()], [IdentityFilter()])

        frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
        # Two branches each forward both frames, yet only two frames are expected.
        expected_down_frames = [TextFrame, TextFrame]
        await run_test(
            pipeline,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

    async def test_arrival_order(self):
        """With FrameOrder.ARRIVAL, a slow first pipeline's frames should
        arrive after a fast second pipeline's frames."""
        tags = await self._run_slow_then_fast(FrameOrder.ARRIVAL)

        # assertEqual (not a bare assert) so the check survives `python -O`.
        self.assertEqual(
            tags,
            ["fast", "slow", "fast", "slow"],
            f"Expected fast before slow in each batch, got {tags}",
        )

    async def test_pipeline_order(self):
        """With FrameOrder.PIPELINE and multiple input frames, each batch
        should follow pipeline definition order regardless of processing speed."""
        tags = await self._run_slow_then_fast(FrameOrder.PIPELINE)

        self.assertEqual(
            tags,
            ["slow", "fast", "slow", "fast"],
            f"Expected pipeline definition order (slow, fast) in each batch, got {tags}",
        )

    async def test_default_is_arrival(self):
        """The default frame_order should be ARRIVAL."""
        pipeline = SyncParallelPipeline([IdentityFilter()])
        self.assertEqual(pipeline._frame_order, FrameOrder.ARRIVAL)
|
|
|
|
|
|
# Run the tests through unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|