Merge pull request #3542 from lukepayyapilli/fix/terminal-frames-uninterruptible
fix: make EndFrame and StopFrame uninterruptible to prevent pipeline freeze
This commit is contained in:
1
changelog/3542.fixed.md
Normal file
1
changelog/3542.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed pipeline freeze when `InterruptionFrame` discards `EndFrame` or `StopFrame` by making terminal frames uninterruptible.
|
||||
@@ -1751,7 +1751,7 @@ class BotInterruptionFrame(InterruptionTaskFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class EndFrame(ControlFrame):
|
||||
class EndFrame(ControlFrame, UninterruptibleFrame):
|
||||
"""Frame indicating pipeline has ended and should shut down.
|
||||
|
||||
Indicates that a pipeline has ended and frame processors and pipelines
|
||||
@@ -1760,6 +1760,10 @@ class EndFrame(ControlFrame):
|
||||
that this is a control frame, which means it will be received in the order it
|
||||
was sent.
|
||||
|
||||
This frame is marked as UninterruptibleFrame to ensure it is not lost when
|
||||
an InterruptionFrame is processed. Terminal frames must survive interruption
|
||||
to guarantee proper pipeline shutdown.
|
||||
|
||||
Parameters:
|
||||
reason: Optional reason for pushing an end frame.
|
||||
"""
|
||||
@@ -1771,12 +1775,16 @@ class EndFrame(ControlFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class StopFrame(ControlFrame):
|
||||
class StopFrame(ControlFrame, UninterruptibleFrame):
|
||||
"""Frame indicating pipeline should stop but keep processors running.
|
||||
|
||||
Indicates that a pipeline should be stopped but that the pipeline
|
||||
processors should be kept in a running state. This is normally queued from
|
||||
the pipeline task.
|
||||
|
||||
This frame is marked as UninterruptibleFrame to ensure it is not lost when
|
||||
an InterruptionFrame is processed. Terminal frames must survive interruption
|
||||
to guarantee proper pipeline control.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
@@ -15,6 +15,7 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterruptionFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StopFrame,
|
||||
SystemFrame,
|
||||
TextFrame,
|
||||
UninterruptibleFrame,
|
||||
@@ -348,6 +349,106 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
self.assertIs(down_frame.metadata, orig.metadata)
|
||||
self.assertIs(up_frame.metadata, orig.metadata)
|
||||
|
||||
async def test_terminal_frames_survive_interruption(self):
|
||||
"""Test that EndFrame survives interruption (it is uninterruptible).
|
||||
|
||||
This test simulates issue #3524 where an InterruptionFrame during slow
|
||||
processing would cause terminal frames to be lost, freezing the pipeline.
|
||||
"""
|
||||
received_frames: List[Frame] = []
|
||||
|
||||
class DelayAndInterruptProcessor(FrameProcessor):
|
||||
"""This processor delays processing and then generates an interruption.
|
||||
|
||||
When processing a TextFrame, it sleeps and then pushes an
|
||||
InterruptionFrame to simulate what happens when interruption occurs
|
||||
while a terminal frame is in the queue.
|
||||
"""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TextFrame):
|
||||
# Delay to allow EndFrame to be queued
|
||||
await asyncio.sleep(0.1)
|
||||
# Push interruption - this should NOT discard the EndFrame
|
||||
await self.push_frame(InterruptionFrame(), direction)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
class CaptureFrameProcessor(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
received_frames.append(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
pipeline = Pipeline([DelayAndInterruptProcessor(), CaptureFrameProcessor()])
|
||||
|
||||
frames_to_send = [
|
||||
TextFrame(text="trigger"),
|
||||
]
|
||||
expected_down_frames = [
|
||||
InterruptionFrame,
|
||||
TextFrame,
|
||||
]
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_down_frames,
|
||||
)
|
||||
|
||||
# Verify EndFrame was received by our capture processor (survived interruption)
|
||||
# Note: run_test filters EndFrame from expected_down_frames when send_end_frame=True,
|
||||
# but our capture processor sees it before that filtering.
|
||||
end_frames = [f for f in received_frames if isinstance(f, EndFrame)]
|
||||
self.assertEqual(len(end_frames), 1, "EndFrame should survive interruption")
|
||||
|
||||
async def test_stop_frame_survives_interruption(self):
|
||||
"""Test that StopFrame survives interruption (it is uninterruptible).
|
||||
|
||||
Similar to test_terminal_frames_survive_interruption but specifically
|
||||
for StopFrame.
|
||||
"""
|
||||
received_frames: List[Frame] = []
|
||||
|
||||
class DelayAndInterruptProcessor(FrameProcessor):
|
||||
"""This processor delays processing and then generates an interruption."""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TextFrame):
|
||||
# Delay to allow StopFrame to be queued
|
||||
await asyncio.sleep(0.1)
|
||||
# Push interruption - this should NOT discard the StopFrame
|
||||
await self.push_frame(InterruptionFrame(), direction)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
class CaptureFrameProcessor(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
received_frames.append(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
pipeline = Pipeline([DelayAndInterruptProcessor(), CaptureFrameProcessor()])
|
||||
|
||||
frames_to_send = [
|
||||
TextFrame(text="trigger"),
|
||||
StopFrame(),
|
||||
]
|
||||
expected_down_frames = [
|
||||
InterruptionFrame,
|
||||
TextFrame,
|
||||
StopFrame,
|
||||
]
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_down_frames,
|
||||
send_end_frame=False,
|
||||
)
|
||||
|
||||
# Verify StopFrame was received (survived interruption)
|
||||
stop_frames = [f for f in received_frames if isinstance(f, StopFrame)]
|
||||
self.assertEqual(len(stop_frames), 1, "StopFrame should survive interruption")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user