diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 3597b0e9e..00af35bbf 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -221,6 +221,20 @@ class SyncParallelPipeline(BasePipeline): """ await super().process_frame(frame, direction) + # SystemFrames (but not EndFrame) are simply passed through all + # internal pipelines without draining queued output. This avoids + # the race condition where a SystemFrame's wait_for_sync steals + # frames from a concurrent non-SystemFrame's wait_for_sync. + if isinstance(frame, SystemFrame) and not isinstance(frame, EndFrame): + if direction == FrameDirection.UPSTREAM: + for s in self._sinks: + await s["processor"].process_frame(frame, direction) + elif direction == FrameDirection.DOWNSTREAM: + for s in self._sources: + await s["processor"].process_frame(frame, direction) + await self.push_frame(frame, direction) + return + # The last processor of each pipeline needs to be synchronous otherwise # this element won't work. Since, we know it should be synchronous we # push a SyncFrame. Since frames are ordered we know this frame will be @@ -235,12 +249,12 @@ class SyncParallelPipeline(BasePipeline): await processor.process_frame(frame, direction) - if isinstance(frame, (SystemFrame, EndFrame)): + if isinstance(frame, EndFrame): new_frame = await queue.get() - if isinstance(new_frame, (SystemFrame, EndFrame)): + if isinstance(new_frame, EndFrame): await main_queue.put(new_frame) else: - while not isinstance(new_frame, (SystemFrame, EndFrame)): + while not isinstance(new_frame, EndFrame): await main_queue.put(new_frame) queue.task_done() new_frame = await queue.get()