Remove an unnecessary check in SyncParallelPipeline

This commit is contained in:
Paul Kompfner
2026-03-23 10:00:32 -04:00
parent ec3bd8c5b1
commit e93b0ace06

View File

@@ -263,11 +263,11 @@ class SyncParallelPipeline(BasePipeline):
"""
await super().process_frame(frame, direction)
# SystemFrames (but not EndFrame) are simply passed through all
# internal pipelines without draining queued output. This avoids
# the race condition where a SystemFrame's wait_for_sync steals
# frames from a concurrent non-SystemFrame's wait_for_sync.
if isinstance(frame, SystemFrame) and not isinstance(frame, EndFrame):
# SystemFrames are simply passed through all internal pipelines without
# draining queued output. This avoids the race condition where a
# SystemFrame's wait_for_sync steals frames from a concurrent
# non-SystemFrame's wait_for_sync.
if isinstance(frame, SystemFrame):
if direction == FrameDirection.UPSTREAM:
for s in self._sinks:
await s["processor"].process_frame(frame, direction)