Clarify SyncParallelPipeline docstrings

Rewrite docstrings to more clearly explain what SyncParallelPipeline
does: hold all output until every parallel branch finishes, so frames
produced in response to a single input are released together.
This commit is contained in:
Paul Kompfner
2026-03-18 16:39:20 -04:00
parent d702ebd6a2
commit 06f7da44f1

View File

@@ -4,11 +4,16 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Synchronous parallel pipeline implementation for concurrent frame processing.
"""Synchronized parallel pipeline that holds output until all branches finish.
This module provides a pipeline that processes frames through multiple parallel
pipelines simultaneously, synchronizing their output to maintain frame ordering
and prevent duplicate processing.
A SyncParallelPipeline fans each inbound frame out to multiple parallel pipelines
and waits for every pipeline to finish processing before releasing any of the
resulting output frames. This ensures that all frames produced in response to a
single input frame are emitted together.
System frames (except EndFrame) are exempt from this synchronization — they pass
straight through without waiting, since they are expected to race ahead of
regular data frames.
"""
import asyncio
@@ -46,20 +51,21 @@ class FrameOrder(Enum):
@dataclass
class SyncFrame(ControlFrame):
"""Control frame used to synchronize parallel pipeline processing.
"""Sentinel frame used to detect when a parallel pipeline has finished processing.
This frame is sent through parallel pipelines to determine when the
internal pipelines have finished processing a batch of frames.
After sending a real frame into a parallel pipeline, a SyncFrame is sent
behind it. When the SyncFrame emerges from the pipeline's output, we know
all output frames for the preceding input have been produced.
"""
pass
class SyncParallelPipelineSource(FrameProcessor):
"""Source processor for synchronous parallel pipeline processing.
"""Bookend processor placed at the start of each parallel pipeline.
Routes frames to parallel pipelines and collects upstream responses
for synchronization purposes.
Forwards downstream frames into the pipeline and captures upstream frames
into a queue so the parent SyncParallelPipeline can release them later.
"""
def __init__(self, upstream_queue: asyncio.Queue):
@@ -88,10 +94,11 @@ class SyncParallelPipelineSource(FrameProcessor):
class SyncParallelPipelineSink(FrameProcessor):
"""Sink processor for synchronous parallel pipeline processing.
"""Bookend processor placed at the end of each parallel pipeline.
Collects downstream frames from parallel pipelines and routes
upstream frames back through the pipeline.
Captures downstream output frames into a queue so the parent
SyncParallelPipeline can release them later, and forwards upstream
frames back through the pipeline.
"""
def __init__(self, downstream_queue: asyncio.Queue):
@@ -120,15 +127,20 @@ class SyncParallelPipelineSink(FrameProcessor):
class SyncParallelPipeline(BasePipeline):
"""Pipeline that processes frames through multiple parallel pipelines synchronously.
"""Fans each input frame to parallel pipelines then holds output until every pipeline finishes.
Creates multiple parallel processing paths that all receive the same input frames
and produces synchronized output. Each parallel path is a separate pipeline that
processes frames independently, with synchronization points to ensure consistent
ordering and prevent duplicate frame processing.
For each inbound frame the pipeline:
The pipeline uses SyncFrame control frames to coordinate between parallel paths
and ensure all paths have completed processing before moving to the next frame.
1. Sends the frame into every parallel pipeline.
2. Sends a ``SyncFrame`` sentinel behind it in each pipeline.
3. Waits until every pipeline has produced its ``SyncFrame``, meaning all
output for that input is ready.
4. Releases the collected output frames (deduplicating by frame id, since
the same frame may emerge from more than one branch).
System frames (except ``EndFrame``) bypass this mechanism entirely — they are
forwarded through each pipeline and pushed immediately, since system frames
are expected to race ahead of regular data frames.
By default, output frames are pushed in the order they arrive from any pipeline
(``FrameOrder.ARRIVAL``). Set ``frame_order=FrameOrder.PIPELINE`` to push frames
@@ -239,16 +251,11 @@ class SyncParallelPipeline(BasePipeline):
await asyncio.gather(*[p.cleanup() for p in self._pipelines])
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames through all parallel pipelines with synchronization.
"""Send a frame through all parallel pipelines and release output once all finish.
Distributes frames to all parallel pipelines and synchronizes their output
to maintain proper ordering and prevent duplicate processing. Uses SyncFrame
control frames to coordinate between parallel paths.
When ``frame_order`` is ``FrameOrder.ARRIVAL``, output frames are pushed in
the order they arrive from any pipeline (via a shared queue). When it is
``FrameOrder.PIPELINE``, each pipeline collects its output into a separate
list and the lists are drained in pipeline definition order.
System frames (except EndFrame) skip synchronization and pass straight
through. All other frames are fanned out to every pipeline, and output is
held until every pipeline signals completion (via SyncFrame).
Args:
frame: The frame to process.