From 0a4acfa2944fc4d66cdee10fcd710d7d06a42385 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 16 Mar 2026 09:59:13 -0400 Subject: [PATCH] Add frame_order parameter to SyncParallelPipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a FrameOrder enum with ARRIVAL (default, existing behavior) and PIPELINE (pushes frames in pipeline definition order). This lets callers guarantee output ordering between parallel pipelines — e.g. ensuring image frames precede audio frames — without needing a separate reordering processor downstream. Updates the 05-sync-speech-and-image example to use FrameOrder.PIPELINE, removing the ImageBeforeAudioReorderer class entirely. --- changelog/4029.added.2.md | 1 + .../foundational/05-sync-speech-and-image.py | 73 ++--------- .../pipeline/sync_parallel_pipeline.py | 111 +++++++++++++---- tests/test_sync_parallel_pipeline.py | 117 ++++++++++++++++++ 4 files changed, 214 insertions(+), 88 deletions(-) create mode 100644 changelog/4029.added.2.md create mode 100644 tests/test_sync_parallel_pipeline.py diff --git a/changelog/4029.added.2.md b/changelog/4029.added.2.md new file mode 100644 index 000000000..1ae691442 --- /dev/null +++ b/changelog/4029.added.2.md @@ -0,0 +1 @@ +- Added `frame_order` parameter to `SyncParallelPipeline`. Set `frame_order=FrameOrder.PIPELINE` to push synchronized output frames in pipeline definition order (all frames from the first pipeline, then the second, etc.) instead of the default arrival order. diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 646235e7c..f0e2ff9c7 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -12,21 +12,16 @@ from dotenv import load_dotenv from loguru import logger from pipecat.frames.frames import ( - AggregatedTextFrame, DataFrame, Frame, LLMContextFrame, LLMFullResponseStartFrame, OutputImageRawFrame, TextFrame, - TTSAudioRawFrame, - TTSStartedFrame, - TTSStoppedFrame, - TTSTextFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline +from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.sentence import SentenceAggregator @@ -63,61 +58,6 @@ class MarkImageForPlaybackSync(FrameProcessor): await self.push_frame(frame, direction) -class ImageBeforeAudioReorderer(FrameProcessor): - """Ensures each image frame precedes its corresponding TTS audio frames. - - SyncParallelPipeline guarantees that each image is in the same synchronized - batch as its audio, but doesn't guarantee which branch's output comes first. - This processor detects when TTS frames arrive before their image and holds - them until the image arrives. - - All frames pass through immediately unless we detect an ordering problem: - TTS frames arrived without a preceding image for the current batch (identified - by context_id). In that case, the TTS frames are held until the next image - frame, which is pushed first. - """ - - def __init__(self): - super().__init__() - self._held_tts_frames = [] - self._seen_image = False - self._current_context_id = None - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, OutputImageRawFrame): - self._seen_image = True - if self._held_tts_frames: - # Image arrived after TTS frames — push image first, then release held frames. - logger.debug("ImageBeforeAudioReorderer: reordered — moved image before audio") - await self.push_frame(frame, direction) - for f in self._held_tts_frames: - await self.push_frame(f, direction) - self._held_tts_frames = [] - else: - logger.debug( - "ImageBeforeAudioReorderer: no reorder needed — image was already first" - ) - await self.push_frame(frame, direction) - elif isinstance( - frame, - (AggregatedTextFrame, TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame, TTSTextFrame), - ): - # A new context_id means a new batch — reset image tracking. - context_id = frame.context_id - if context_id and context_id != self._current_context_id: - self._current_context_id = context_id - self._seen_image = False - - if self._seen_image: - await self.push_frame(frame, direction) - else: - self._held_tts_frames.append(frame) - else: - await self.push_frame(frame, direction) - - class MonthPrepender(FrameProcessor): def __init__(self): super().__init__() @@ -197,22 +137,27 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # that, each pipeline runs concurrently and `SyncParallelPipeline` will # wait for the input frame to be processed. # + # We use `FrameOrder.PIPELINE` so that each synchronized batch of output + # frames is pushed in the order the pipelines are listed: image first, + # then audio. This ensures the transport receives the image before the + # audio frames it should accompany. + # # Note that `SyncParallelPipeline` requires the last processor in each # of the pipelines to be synchronous. In this case, we use - # `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP + # `FalImageGenService` and `CartesiaHttpTTSService` which make HTTP # requests and wait for the response. pipeline = Pipeline( [ llm, # LLM sentence_aggregator, # Aggregates LLM output into full sentences SyncParallelPipeline( # Run pipelines in parallel aggregating the result - [month_prepender, tts], # Create "Month: sentence" and output audio [ imagegen, # Generate image MarkImageForPlaybackSync(), # Mark image as needing sync w/audio during playback ], + [month_prepender, tts], # Create "Month: sentence" and output audio + frame_order=FrameOrder.PIPELINE, ), - ImageBeforeAudioReorderer(), # Ensure each image precedes its audio (important for playback) transport.output(), # Transport output ] ) diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 00af35bbf..9870f03f3 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -13,6 +13,7 @@ and prevent duplicate processing. import asyncio from dataclasses import dataclass +from enum import Enum from itertools import chain from typing import List @@ -24,6 +25,25 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup +class FrameOrder(Enum): + """Controls the order in which synchronized frames are pushed downstream. + + When multiple parallel pipelines produce output for the same input frame, + this setting determines the order in which those output frames are pushed. + + Attributes: + ARRIVAL: Frames are pushed in the order they arrive from any pipeline. + This is the default and matches the behavior of prior versions. + PIPELINE: Frames are pushed in pipeline definition order — all frames + from the first pipeline are pushed, then all frames from the second + pipeline, and so on. Useful when the relative ordering between + pipelines matters (e.g. ensuring image frames precede audio frames). + """ + + ARRIVAL = "arrival" + PIPELINE = "pipeline" + + @dataclass class SyncFrame(ControlFrame): """Control frame used to synchronize parallel pipeline processing. @@ -109,20 +129,30 @@ class SyncParallelPipeline(BasePipeline): The pipeline uses SyncFrame control frames to coordinate between parallel paths and ensure all paths have completed processing before moving to the next frame. + + By default, output frames are pushed in the order they arrive from any pipeline + (``FrameOrder.ARRIVAL``). Set ``frame_order=FrameOrder.PIPELINE`` to push frames + in pipeline definition order instead — all output from the first pipeline, then + the second, and so on. """ - def __init__(self, *args): + def __init__(self, *args, frame_order: FrameOrder = FrameOrder.ARRIVAL): """Initialize the synchronous parallel pipeline. Args: - *args: Variable number of processor lists, each representing a parallel pipeline path. - Each argument should be a list of FrameProcessor instances. + *args: Variable number of processor lists, each representing a parallel + pipeline path. Each argument should be a list of FrameProcessor instances. + frame_order: Controls the order in which synchronized output frames are + pushed. ``FrameOrder.ARRIVAL`` (default) pushes frames in the order they arrive. + ``FrameOrder.PIPELINE`` pushes all frames from the first pipeline + before the second, and so on. Raises: Exception: If no arguments are provided. TypeError: If any argument is not a list of processors. """ super().__init__() + self._frame_order = frame_order if len(args) == 0: raise Exception(f"SyncParallelPipeline needs at least one argument") @@ -215,6 +245,11 @@ class SyncParallelPipeline(BasePipeline): to maintain proper ordering and prevent duplicate processing. Uses SyncFrame control frames to coordinate between parallel paths. + When ``frame_order`` is ``FrameOrder.ARRIVAL``, output frames are pushed in + the order they arrive from any pipeline (via a shared queue). When it is + ``FrameOrder.PIPELINE``, each pipeline collects its output into a separate + list and the lists are drained in pipeline definition order. + Args: frame: The frame to process. direction: The direction of frame flow. @@ -235,60 +270,88 @@ class SyncParallelPipeline(BasePipeline): await self.push_frame(frame, direction) return + use_pipeline_order = self._frame_order == FrameOrder.PIPELINE + # The last processor of each pipeline needs to be synchronous otherwise - # this element won't work. Since, we know it should be synchronous we + # this element won't work. Since we know it should be synchronous we # push a SyncFrame. Since frames are ordered we know this frame will be # pushed after the synchronous processor has pushed its data allowing us # to synchronize all the internal pipelines by waiting for the # SyncFrame in all of them. + # + # In ARRIVAL mode, output frames are put onto a shared main_queue as + # they arrive. In PIPELINE mode, they are accumulated in a per-pipeline + # list and returned so the caller can drain them in definition order. async def wait_for_sync( obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection - ): + ) -> list[Frame]: processor = obj["processor"] queue = obj["queue"] + output_frames: list[Frame] = [] await processor.process_frame(frame, direction) if isinstance(frame, EndFrame): new_frame = await queue.get() if isinstance(new_frame, EndFrame): - await main_queue.put(new_frame) + if use_pipeline_order: + output_frames.append(new_frame) + else: + await main_queue.put(new_frame) else: while not isinstance(new_frame, EndFrame): - await main_queue.put(new_frame) + if use_pipeline_order: + output_frames.append(new_frame) + else: + await main_queue.put(new_frame) queue.task_done() new_frame = await queue.get() else: await processor.process_frame(SyncFrame(), direction) new_frame = await queue.get() while not isinstance(new_frame, SyncFrame): - await main_queue.put(new_frame) + if use_pipeline_order: + output_frames.append(new_frame) + else: + await main_queue.put(new_frame) queue.task_done() new_frame = await queue.get() + return output_frames + if direction == FrameDirection.UPSTREAM: # If we get an upstream frame we process it in each sink. - await asyncio.gather( + frames_per_pipeline = await asyncio.gather( *[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks] ) elif direction == FrameDirection.DOWNSTREAM: # If we get a downstream frame we process it in each source. - await asyncio.gather( + frames_per_pipeline = await asyncio.gather( *[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources] ) - seen_ids = set() - while not self._up_queue.empty(): - frame = await self._up_queue.get() - if frame.id not in seen_ids: - await self.push_frame(frame, FrameDirection.UPSTREAM) - seen_ids.add(frame.id) - self._up_queue.task_done() + if use_pipeline_order: + # Push frames in pipeline definition order, deduplicating by id. + seen_ids = set() + for pipeline_frames in frames_per_pipeline: + for f in pipeline_frames: + if f.id not in seen_ids: + await self.push_frame(f, direction) + seen_ids.add(f.id) + else: + # ARRIVAL mode: drain the shared queues in the order frames arrived. + seen_ids = set() + while not self._up_queue.empty(): + frame = await self._up_queue.get() + if frame.id not in seen_ids: + await self.push_frame(frame, FrameDirection.UPSTREAM) + seen_ids.add(frame.id) + self._up_queue.task_done() - seen_ids = set() - while not self._down_queue.empty(): - frame = await self._down_queue.get() - if frame.id not in seen_ids: - await self.push_frame(frame, FrameDirection.DOWNSTREAM) - seen_ids.add(frame.id) - self._down_queue.task_done() + seen_ids = set() + while not self._down_queue.empty(): + frame = await self._down_queue.get() + if frame.id not in seen_ids: + await self.push_frame(frame, FrameDirection.DOWNSTREAM) + seen_ids.add(frame.id) + self._down_queue.task_done() diff --git a/tests/test_sync_parallel_pipeline.py b/tests/test_sync_parallel_pipeline.py new file mode 100644 index 000000000..6c6faf7c7 --- /dev/null +++ b/tests/test_sync_parallel_pipeline.py @@ -0,0 +1,117 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import unittest +from dataclasses import dataclass + +from pipecat.frames.frames import Frame, TextFrame +from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline +from pipecat.processors.filters.identity_filter import IdentityFilter +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.tests.utils import run_test + + +@dataclass +class TaggedFrame(Frame): + """A simple tagged frame for testing pipeline ordering.""" + + tag: str = "" + + def __str__(self): + return f"{self.name}(tag: {self.tag})" + + +class EmitTaggedFrameProcessor(FrameProcessor): + """Emits a TaggedFrame for every TextFrame it receives. + + Used to produce distinguishable output from different pipelines so tests + can verify ordering. + """ + + def __init__(self, tag: str, *, delay: float = 0, **kwargs): + super().__init__(**kwargs) + self._tag = tag + self._delay = delay + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, TextFrame): + if self._delay > 0: + await asyncio.sleep(self._delay) + await self.push_frame(TaggedFrame(tag=self._tag)) + else: + await self.push_frame(frame, direction) + + +class TestSyncParallelPipeline(unittest.IsolatedAsyncioTestCase): + async def test_dedup_multiple_frames(self): + """Identical frames from multiple paths should be deduplicated.""" + pipeline = SyncParallelPipeline([IdentityFilter()], [IdentityFilter()]) + + frames_to_send = [TextFrame(text="one"), TextFrame(text="two")] + expected_down_frames = [TextFrame, TextFrame] + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + async def test_arrival_order(self): + """With FrameOrder.ARRIVAL, a slow first pipeline's frames should + arrive after a fast second pipeline's frames.""" + pipeline = SyncParallelPipeline( + [EmitTaggedFrameProcessor("slow", delay=0.05)], + [EmitTaggedFrameProcessor("fast")], + frame_order=FrameOrder.ARRIVAL, + ) + + frames_to_send = [TextFrame(text="one"), TextFrame(text="two")] + (down_frames, _) = await run_test( + pipeline, + frames_to_send=frames_to_send, + ) + + tags = [f.tag for f in down_frames if isinstance(f, TaggedFrame)] + assert tags == [ + "fast", + "slow", + "fast", + "slow", + ], f"Expected fast before slow in each batch, got {tags}" + + async def test_pipeline_order(self): + """With FrameOrder.PIPELINE and multiple input frames, each batch + should follow pipeline definition order regardless of processing speed.""" + pipeline = SyncParallelPipeline( + [EmitTaggedFrameProcessor("slow", delay=0.05)], + [EmitTaggedFrameProcessor("fast")], + frame_order=FrameOrder.PIPELINE, + ) + + frames_to_send = [TextFrame(text="one"), TextFrame(text="two")] + (down_frames, _) = await run_test( + pipeline, + frames_to_send=frames_to_send, + ) + + tags = [f.tag for f in down_frames if isinstance(f, TaggedFrame)] + assert tags == [ + "slow", + "fast", + "slow", + "fast", + ], f"Expected pipeline definition order (slow, fast) in each batch, got {tags}" + + async def test_default_is_arrival(self): + """The default frame_order should be ARRIVAL.""" + pipeline = SyncParallelPipeline([IdentityFilter()]) + assert pipeline._frame_order == FrameOrder.ARRIVAL + + +if __name__ == "__main__": + unittest.main()