From 1ede8460a2d775ea9580f53af0f7a772b39c7df6 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Fri, 13 Mar 2026 14:17:35 -0400 Subject: [PATCH] Fix SyncParallelPipeline race condition with concurrent SystemFrame processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The FrameProcessor two-queue architecture processes SystemFrames and non-SystemFrames on separate concurrent async tasks. Both paths called SyncParallelPipeline.process_frame(), which used the same per-pipeline sink queues. A SystemFrame's wait_for_sync could steal frames from a concurrent non-SystemFrame's wait_for_sync, corrupting synchronization and stalling the pipeline. This was triggered by the auto-embedded RTVI processor (added in v0.0.101) which floods OutputTransportMessageUrgentFrame SystemFrames through the pipeline during LLM responses. Fix: SystemFrames (except EndFrame) now take a fast path — passed through internal pipelines and pushed downstream directly without touching the sink queues or drain logic. EndFrame retains the full drain behavior as a lifecycle frame. --- .../pipeline/sync_parallel_pipeline.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 3597b0e9e..00af35bbf 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -221,6 +221,20 @@ class SyncParallelPipeline(BasePipeline): """ await super().process_frame(frame, direction) + # SystemFrames (but not EndFrame) are simply passed through all + # internal pipelines without draining queued output. This avoids + # the race condition where a SystemFrame's wait_for_sync steals + # frames from a concurrent non-SystemFrame's wait_for_sync. + if isinstance(frame, SystemFrame) and not isinstance(frame, EndFrame): + if direction == FrameDirection.UPSTREAM: + for s in self._sinks: + await s["processor"].process_frame(frame, direction) + elif direction == FrameDirection.DOWNSTREAM: + for s in self._sources: + await s["processor"].process_frame(frame, direction) + await self.push_frame(frame, direction) + return + # The last processor of each pipeline needs to be synchronous otherwise # this element won't work. Since, we know it should be synchronous we # push a SyncFrame. Since frames are ordered we know this frame will be @@ -235,12 +249,12 @@ class SyncParallelPipeline(BasePipeline): await processor.process_frame(frame, direction) - if isinstance(frame, (SystemFrame, EndFrame)): + if isinstance(frame, EndFrame): new_frame = await queue.get() - if isinstance(new_frame, (SystemFrame, EndFrame)): + if isinstance(new_frame, EndFrame): await main_queue.put(new_frame) else: - while not isinstance(new_frame, (SystemFrame, EndFrame)): + while not isinstance(new_frame, EndFrame): await main_queue.put(new_frame) queue.task_done() new_frame = await queue.get()