Fix SyncParallelPipeline race condition with concurrent SystemFrame processing
The FrameProcessor two-queue architecture processes SystemFrames and non-SystemFrames on separate concurrent async tasks. Both paths called SyncParallelPipeline.process_frame(), which used the same per-pipeline sink queues. A SystemFrame's wait_for_sync could steal frames from a concurrent non-SystemFrame's wait_for_sync, corrupting synchronization and stalling the pipeline. This was triggered by the auto-embedded RTVI processor (added in v0.0.101) which floods OutputTransportMessageUrgentFrame SystemFrames through the pipeline during LLM responses. Fix: SystemFrames (except EndFrame) now take a fast path — passed through internal pipelines and pushed downstream directly without touching the sink queues or drain logic. EndFrame retains the full drain behavior as a lifecycle frame.
This commit is contained in:
@@ -221,6 +221,20 @@ class SyncParallelPipeline(BasePipeline):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# SystemFrames (but not EndFrame) are simply passed through all
|
||||
# internal pipelines without draining queued output. This avoids
|
||||
# the race condition where a SystemFrame's wait_for_sync steals
|
||||
# frames from a concurrent non-SystemFrame's wait_for_sync.
|
||||
if isinstance(frame, SystemFrame) and not isinstance(frame, EndFrame):
|
||||
if direction == FrameDirection.UPSTREAM:
|
||||
for s in self._sinks:
|
||||
await s["processor"].process_frame(frame, direction)
|
||||
elif direction == FrameDirection.DOWNSTREAM:
|
||||
for s in self._sources:
|
||||
await s["processor"].process_frame(frame, direction)
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# The last processor of each pipeline needs to be synchronous otherwise
|
||||
# this element won't work. Since, we know it should be synchronous we
|
||||
# push a SyncFrame. Since frames are ordered we know this frame will be
|
||||
@@ -235,12 +249,12 @@ class SyncParallelPipeline(BasePipeline):
|
||||
|
||||
await processor.process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, (SystemFrame, EndFrame)):
|
||||
if isinstance(frame, EndFrame):
|
||||
new_frame = await queue.get()
|
||||
if isinstance(new_frame, (SystemFrame, EndFrame)):
|
||||
if isinstance(new_frame, EndFrame):
|
||||
await main_queue.put(new_frame)
|
||||
else:
|
||||
while not isinstance(new_frame, (SystemFrame, EndFrame)):
|
||||
while not isinstance(new_frame, EndFrame):
|
||||
await main_queue.put(new_frame)
|
||||
queue.task_done()
|
||||
new_frame = await queue.get()
|
||||
|
||||
Reference in New Issue
Block a user