Add frame_order parameter to SyncParallelPipeline

Adds a FrameOrder enum with ARRIVAL (default, existing behavior) and
PIPELINE (pushes frames in pipeline definition order). This lets callers
guarantee output ordering between parallel pipelines — e.g. ensuring
image frames precede audio frames — without needing a separate reordering
processor downstream.

Updates the 05-sync-speech-and-image example to use FrameOrder.PIPELINE,
removing the ImageBeforeAudioReorderer class entirely.
This commit is contained in:
Paul Kompfner
2026-03-16 09:59:13 -04:00
parent 26fc238eb7
commit d702ebd6a2
4 changed files with 214 additions and 88 deletions

View File

@@ -0,0 +1 @@
- Added `frame_order` parameter to `SyncParallelPipeline`. Set `frame_order=FrameOrder.PIPELINE` to push synchronized output frames in pipeline definition order (all frames from the first pipeline, then the second, etc.) instead of the default arrival order.

View File

@@ -12,21 +12,16 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
AggregatedTextFrame,
DataFrame,
Frame,
LLMContextFrame,
LLMFullResponseStartFrame,
OutputImageRawFrame,
TextFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.sentence import SentenceAggregator
@@ -63,61 +58,6 @@ class MarkImageForPlaybackSync(FrameProcessor):
await self.push_frame(frame, direction)
class ImageBeforeAudioReorderer(FrameProcessor):
"""Ensures each image frame precedes its corresponding TTS audio frames.
SyncParallelPipeline guarantees that each image is in the same synchronized
batch as its audio, but doesn't guarantee which branch's output comes first.
This processor detects when TTS frames arrive before their image and holds
them until the image arrives.
All frames pass through immediately unless we detect an ordering problem:
TTS frames arrived without a preceding image for the current batch (identified
by context_id). In that case, the TTS frames are held until the next image
frame, which is pushed first.
"""
def __init__(self):
super().__init__()
self._held_tts_frames = []
self._seen_image = False
self._current_context_id = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, OutputImageRawFrame):
self._seen_image = True
if self._held_tts_frames:
# Image arrived after TTS frames — push image first, then release held frames.
logger.debug("ImageBeforeAudioReorderer: reordered — moved image before audio")
await self.push_frame(frame, direction)
for f in self._held_tts_frames:
await self.push_frame(f, direction)
self._held_tts_frames = []
else:
logger.debug(
"ImageBeforeAudioReorderer: no reorder needed — image was already first"
)
await self.push_frame(frame, direction)
elif isinstance(
frame,
(AggregatedTextFrame, TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame, TTSTextFrame),
):
# A new context_id means a new batch — reset image tracking.
context_id = frame.context_id
if context_id and context_id != self._current_context_id:
self._current_context_id = context_id
self._seen_image = False
if self._seen_image:
await self.push_frame(frame, direction)
else:
self._held_tts_frames.append(frame)
else:
await self.push_frame(frame, direction)
class MonthPrepender(FrameProcessor):
def __init__(self):
super().__init__()
@@ -197,22 +137,27 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# We use `FrameOrder.PIPELINE` so that each synchronized batch of output
# frames is pushed in the order the pipelines are listed: image first,
# then audio. This ensures the transport receives the image before the
# audio frames it should accompany.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# `FalImageGenService` and `CartesiaHttpTTSService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[
imagegen, # Generate image
MarkImageForPlaybackSync(), # Mark image as needing sync w/audio during playback
],
[month_prepender, tts], # Create "Month: sentence" and output audio
frame_order=FrameOrder.PIPELINE,
),
ImageBeforeAudioReorderer(), # Ensure each image precedes its audio (important for playback)
transport.output(), # Transport output
]
)

View File

@@ -13,6 +13,7 @@ and prevent duplicate processing.
import asyncio
from dataclasses import dataclass
from enum import Enum
from itertools import chain
from typing import List
@@ -24,6 +25,25 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
class FrameOrder(Enum):
"""Controls the order in which synchronized frames are pushed downstream.
When multiple parallel pipelines produce output for the same input frame,
this setting determines the order in which those output frames are pushed.
Attributes:
ARRIVAL: Frames are pushed in the order they arrive from any pipeline.
This is the default and matches the behavior of prior versions.
PIPELINE: Frames are pushed in pipeline definition order — all frames
from the first pipeline are pushed, then all frames from the second
pipeline, and so on. Useful when the relative ordering between
pipelines matters (e.g. ensuring image frames precede audio frames).
"""
ARRIVAL = "arrival"
PIPELINE = "pipeline"
@dataclass
class SyncFrame(ControlFrame):
"""Control frame used to synchronize parallel pipeline processing.
@@ -109,20 +129,30 @@ class SyncParallelPipeline(BasePipeline):
The pipeline uses SyncFrame control frames to coordinate between parallel paths
and ensure all paths have completed processing before moving to the next frame.
By default, output frames are pushed in the order they arrive from any pipeline
(``FrameOrder.ARRIVAL``). Set ``frame_order=FrameOrder.PIPELINE`` to push frames
in pipeline definition order instead — all output from the first pipeline, then
the second, and so on.
"""
def __init__(self, *args):
def __init__(self, *args, frame_order: FrameOrder = FrameOrder.ARRIVAL):
"""Initialize the synchronous parallel pipeline.
Args:
*args: Variable number of processor lists, each representing a parallel pipeline path.
Each argument should be a list of FrameProcessor instances.
*args: Variable number of processor lists, each representing a parallel
pipeline path. Each argument should be a list of FrameProcessor instances.
frame_order: Controls the order in which synchronized output frames are
pushed. ``FrameOrder.ARRIVAL`` (default) pushes frames in the order they arrive.
``FrameOrder.PIPELINE`` pushes all frames from the first pipeline
before the second, and so on.
Raises:
Exception: If no arguments are provided.
TypeError: If any argument is not a list of processors.
"""
super().__init__()
self._frame_order = frame_order
if len(args) == 0:
raise Exception(f"SyncParallelPipeline needs at least one argument")
@@ -215,6 +245,11 @@ class SyncParallelPipeline(BasePipeline):
to maintain proper ordering and prevent duplicate processing. Uses SyncFrame
control frames to coordinate between parallel paths.
When ``frame_order`` is ``FrameOrder.ARRIVAL``, output frames are pushed in
the order they arrive from any pipeline (via a shared queue). When it is
``FrameOrder.PIPELINE``, each pipeline collects its output into a separate
list and the lists are drained in pipeline definition order.
Args:
frame: The frame to process.
direction: The direction of frame flow.
@@ -235,60 +270,88 @@ class SyncParallelPipeline(BasePipeline):
await self.push_frame(frame, direction)
return
use_pipeline_order = self._frame_order == FrameOrder.PIPELINE
# The last processor of each pipeline needs to be synchronous otherwise
# this element won't work. Since, we know it should be synchronous we
# this element won't work. Since we know it should be synchronous we
# push a SyncFrame. Since frames are ordered we know this frame will be
# pushed after the synchronous processor has pushed its data allowing us
# to synchronize all the internal pipelines by waiting for the
# SyncFrame in all of them.
#
# In ARRIVAL mode, output frames are put onto a shared main_queue as
# they arrive. In PIPELINE mode, they are accumulated in a per-pipeline
# list and returned so the caller can drain them in definition order.
async def wait_for_sync(
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
):
) -> list[Frame]:
processor = obj["processor"]
queue = obj["queue"]
output_frames: list[Frame] = []
await processor.process_frame(frame, direction)
if isinstance(frame, EndFrame):
new_frame = await queue.get()
if isinstance(new_frame, EndFrame):
await main_queue.put(new_frame)
if use_pipeline_order:
output_frames.append(new_frame)
else:
await main_queue.put(new_frame)
else:
while not isinstance(new_frame, EndFrame):
await main_queue.put(new_frame)
if use_pipeline_order:
output_frames.append(new_frame)
else:
await main_queue.put(new_frame)
queue.task_done()
new_frame = await queue.get()
else:
await processor.process_frame(SyncFrame(), direction)
new_frame = await queue.get()
while not isinstance(new_frame, SyncFrame):
await main_queue.put(new_frame)
if use_pipeline_order:
output_frames.append(new_frame)
else:
await main_queue.put(new_frame)
queue.task_done()
new_frame = await queue.get()
return output_frames
if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(
frames_per_pipeline = await asyncio.gather(
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
)
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
await asyncio.gather(
frames_per_pipeline = await asyncio.gather(
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
)
seen_ids = set()
while not self._up_queue.empty():
frame = await self._up_queue.get()
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.UPSTREAM)
seen_ids.add(frame.id)
self._up_queue.task_done()
if use_pipeline_order:
# Push frames in pipeline definition order, deduplicating by id.
seen_ids = set()
for pipeline_frames in frames_per_pipeline:
for f in pipeline_frames:
if f.id not in seen_ids:
await self.push_frame(f, direction)
seen_ids.add(f.id)
else:
# ARRIVAL mode: drain the shared queues in the order frames arrived.
seen_ids = set()
while not self._up_queue.empty():
frame = await self._up_queue.get()
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.UPSTREAM)
seen_ids.add(frame.id)
self._up_queue.task_done()
seen_ids = set()
while not self._down_queue.empty():
frame = await self._down_queue.get()
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
seen_ids.add(frame.id)
self._down_queue.task_done()
seen_ids = set()
while not self._down_queue.empty():
frame = await self._down_queue.get()
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
seen_ids.add(frame.id)
self._down_queue.task_done()

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from dataclasses import dataclass
from pipecat.frames.frames import Frame, TextFrame
from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import run_test
@dataclass
class TaggedFrame(Frame):
"""A simple tagged frame for testing pipeline ordering."""
tag: str = ""
def __str__(self):
return f"{self.name}(tag: {self.tag})"
class EmitTaggedFrameProcessor(FrameProcessor):
"""Emits a TaggedFrame for every TextFrame it receives.
Used to produce distinguishable output from different pipelines so tests
can verify ordering.
"""
def __init__(self, tag: str, *, delay: float = 0, **kwargs):
super().__init__(**kwargs)
self._tag = tag
self._delay = delay
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
if self._delay > 0:
await asyncio.sleep(self._delay)
await self.push_frame(TaggedFrame(tag=self._tag))
else:
await self.push_frame(frame, direction)
class TestSyncParallelPipeline(unittest.IsolatedAsyncioTestCase):
async def test_dedup_multiple_frames(self):
"""Identical frames from multiple paths should be deduplicated."""
pipeline = SyncParallelPipeline([IdentityFilter()], [IdentityFilter()])
frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
expected_down_frames = [TextFrame, TextFrame]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
async def test_arrival_order(self):
"""With FrameOrder.ARRIVAL, a slow first pipeline's frames should
arrive after a fast second pipeline's frames."""
pipeline = SyncParallelPipeline(
[EmitTaggedFrameProcessor("slow", delay=0.05)],
[EmitTaggedFrameProcessor("fast")],
frame_order=FrameOrder.ARRIVAL,
)
frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
(down_frames, _) = await run_test(
pipeline,
frames_to_send=frames_to_send,
)
tags = [f.tag for f in down_frames if isinstance(f, TaggedFrame)]
assert tags == [
"fast",
"slow",
"fast",
"slow",
], f"Expected fast before slow in each batch, got {tags}"
async def test_pipeline_order(self):
"""With FrameOrder.PIPELINE and multiple input frames, each batch
should follow pipeline definition order regardless of processing speed."""
pipeline = SyncParallelPipeline(
[EmitTaggedFrameProcessor("slow", delay=0.05)],
[EmitTaggedFrameProcessor("fast")],
frame_order=FrameOrder.PIPELINE,
)
frames_to_send = [TextFrame(text="one"), TextFrame(text="two")]
(down_frames, _) = await run_test(
pipeline,
frames_to_send=frames_to_send,
)
tags = [f.tag for f in down_frames if isinstance(f, TaggedFrame)]
assert tags == [
"slow",
"fast",
"slow",
"fast",
], f"Expected pipeline definition order (slow, fast) in each batch, got {tags}"
async def test_default_is_arrival(self):
"""The default frame_order should be ARRIVAL."""
pipeline = SyncParallelPipeline([IdentityFilter()])
assert pipeline._frame_order == FrameOrder.ARRIVAL
if __name__ == "__main__":
unittest.main()