From 06f7da44f19b8fa949b8335273df3d3a970aabd8 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 18 Mar 2026 16:39:20 -0400 Subject: [PATCH] Clarify SyncParallelPipeline docstrings Rewrite docstrings to more clearly explain what SyncParallelPipeline does: hold all output until every parallel branch finishes, so frames produced in response to a single input are released together. --- .../pipeline/sync_parallel_pipeline.py | 65 ++++++++++--------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 9870f03f3..148d29b25 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -4,11 +4,16 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Synchronous parallel pipeline implementation for concurrent frame processing. +"""Synchronized parallel pipeline that holds output until all branches finish. -This module provides a pipeline that processes frames through multiple parallel -pipelines simultaneously, synchronizing their output to maintain frame ordering -and prevent duplicate processing. +A SyncParallelPipeline fans each inbound frame out to multiple parallel pipelines +and waits for every pipeline to finish processing before releasing any of the +resulting output frames. This ensures that all frames produced in response to a +single input frame are emitted together. + +System frames (except EndFrame) are exempt from this synchronization — they pass +straight through without waiting, since they are expected to race ahead of +regular data frames. """ import asyncio @@ -46,20 +51,21 @@ class FrameOrder(Enum): @dataclass class SyncFrame(ControlFrame): - """Control frame used to synchronize parallel pipeline processing. + """Sentinel frame used to detect when a parallel pipeline has finished processing. - This frame is sent through parallel pipelines to determine when the - internal pipelines have finished processing a batch of frames. + After sending a real frame into a parallel pipeline, a SyncFrame is sent + behind it. When the SyncFrame emerges from the pipeline's output, we know + all output frames for the preceding input have been produced. """ pass class SyncParallelPipelineSource(FrameProcessor): - """Source processor for synchronous parallel pipeline processing. + """Bookend processor placed at the start of each parallel pipeline. - Routes frames to parallel pipelines and collects upstream responses - for synchronization purposes. + Forwards downstream frames into the pipeline and captures upstream frames + into a queue so the parent SyncParallelPipeline can release them later. """ def __init__(self, upstream_queue: asyncio.Queue): @@ -88,10 +94,11 @@ class SyncParallelPipelineSource(FrameProcessor): class SyncParallelPipelineSink(FrameProcessor): - """Sink processor for synchronous parallel pipeline processing. + """Bookend processor placed at the end of each parallel pipeline. - Collects downstream frames from parallel pipelines and routes - upstream frames back through the pipeline. + Captures downstream output frames into a queue so the parent + SyncParallelPipeline can release them later, and forwards upstream + frames back through the pipeline. """ def __init__(self, downstream_queue: asyncio.Queue): @@ -120,15 +127,20 @@ class SyncParallelPipelineSink(FrameProcessor): class SyncParallelPipeline(BasePipeline): - """Pipeline that processes frames through multiple parallel pipelines synchronously. + """Fans each input frame to parallel pipelines then holds output until every pipeline finishes. - Creates multiple parallel processing paths that all receive the same input frames - and produces synchronized output. Each parallel path is a separate pipeline that - processes frames independently, with synchronization points to ensure consistent - ordering and prevent duplicate frame processing. + For each inbound frame the pipeline: - The pipeline uses SyncFrame control frames to coordinate between parallel paths - and ensure all paths have completed processing before moving to the next frame. + 1. Sends the frame into every parallel pipeline. + 2. Sends a ``SyncFrame`` sentinel behind it in each pipeline. + 3. Waits until every pipeline has produced its ``SyncFrame``, meaning all + output for that input is ready. + 4. Releases the collected output frames (deduplicating by frame id, since + the same frame may emerge from more than one branch). + + System frames (except ``EndFrame``) bypass this mechanism entirely — they are + forwarded through each pipeline and pushed immediately, since system frames + are expected to race ahead of regular data frames. By default, output frames are pushed in the order they arrive from any pipeline (``FrameOrder.ARRIVAL``). Set ``frame_order=FrameOrder.PIPELINE`` to push frames @@ -239,16 +251,11 @@ class SyncParallelPipeline(BasePipeline): await asyncio.gather(*[p.cleanup() for p in self._pipelines]) async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process frames through all parallel pipelines with synchronization. + """Send a frame through all parallel pipelines and release output once all finish. - Distributes frames to all parallel pipelines and synchronizes their output - to maintain proper ordering and prevent duplicate processing. Uses SyncFrame - control frames to coordinate between parallel paths. - - When ``frame_order`` is ``FrameOrder.ARRIVAL``, output frames are pushed in - the order they arrive from any pipeline (via a shared queue). When it is - ``FrameOrder.PIPELINE``, each pipeline collects its output into a separate - list and the lists are drained in pipeline definition order. + System frames (except EndFrame) skip synchronization and pass straight + through. All other frames are fanned out to every pipeline, and output is + held until every pipeline signals completion (via SyncFrame). Args: frame: The frame to process.