Files
pipecat/tests/test_sync_parallel_pipeline.py
Paul Kompfner d702ebd6a2 Add frame_order parameter to SyncParallelPipeline
Adds a FrameOrder enum with ARRIVAL (default, existing behavior) and
PIPELINE (pushes frames in pipeline definition order). This lets callers
guarantee output ordering between parallel pipelines — e.g. ensuring
image frames precede audio frames — without needing a separate reordering
processor downstream.

Updates the 05-sync-speech-and-image example to use FrameOrder.PIPELINE,
removing the ImageBeforeAudioReorderer class entirely.
2026-03-19 09:43:51 -04:00

118 lines
3.8 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from dataclasses import dataclass
from pipecat.frames.frames import Frame, TextFrame
from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import run_test
@dataclass
class TaggedFrame(Frame):
"""A simple tagged frame for testing pipeline ordering."""
tag: str = ""
def __str__(self):
return f"{self.name}(tag: {self.tag})"
class EmitTaggedFrameProcessor(FrameProcessor):
"""Emits a TaggedFrame for every TextFrame it receives.
Used to produce distinguishable output from different pipelines so tests
can verify ordering.
"""
def __init__(self, tag: str, *, delay: float = 0, **kwargs):
super().__init__(**kwargs)
self._tag = tag
self._delay = delay
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
if self._delay > 0:
await asyncio.sleep(self._delay)
await self.push_frame(TaggedFrame(tag=self._tag))
else:
await self.push_frame(frame, direction)
class TestSyncParallelPipeline(unittest.IsolatedAsyncioTestCase):
async def test_dedup_multiple_frames(self):
"""Identical frames from multiple paths should be deduplicated."""
pipeline = SyncParallelPipeline([IdentityFilter()], [IdentityFilter()])
frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
expected_down_frames = [TextFrame, TextFrame]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
async def test_arrival_order(self):
"""With FrameOrder.ARRIVAL, a slow first pipeline's frames should
arrive after a fast second pipeline's frames."""
pipeline = SyncParallelPipeline(
[EmitTaggedFrameProcessor("slow", delay=0.05)],
[EmitTaggedFrameProcessor("fast")],
frame_order=FrameOrder.ARRIVAL,
)
frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
(down_frames, _) = await run_test(
pipeline,
frames_to_send=frames_to_send,
)
tags = [f.tag for f in down_frames if isinstance(f, TaggedFrame)]
assert tags == [
"fast",
"slow",
"fast",
"slow",
], f"Expected fast before slow in each batch, got {tags}"
async def test_pipeline_order(self):
"""With FrameOrder.PIPELINE and multiple input frames, each batch
should follow pipeline definition order regardless of processing speed."""
pipeline = SyncParallelPipeline(
[EmitTaggedFrameProcessor("slow", delay=0.05)],
[EmitTaggedFrameProcessor("fast")],
frame_order=FrameOrder.PIPELINE,
)
frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
(down_frames, _) = await run_test(
pipeline,
frames_to_send=frames_to_send,
)
tags = [f.tag for f in down_frames if isinstance(f, TaggedFrame)]
assert tags == [
"slow",
"fast",
"slow",
"fast",
], f"Expected pipeline definition order (slow, fast) in each batch, got {tags}"
async def test_default_is_arrival(self):
"""The default frame_order should be ARRIVAL."""
pipeline = SyncParallelPipeline([IdentityFilter()])
assert pipeline._frame_order == FrameOrder.ARRIVAL
if __name__ == "__main__":
unittest.main()