diff --git a/changelog/3542.fixed.md b/changelog/3542.fixed.md new file mode 100644 index 000000000..523754b28 --- /dev/null +++ b/changelog/3542.fixed.md @@ -0,0 +1 @@ +- Fixed pipeline freeze when `InterruptionFrame` discards `EndFrame` or `StopFrame` by making terminal frames uninterruptible. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 8df97d313..b0e196a22 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1751,7 +1751,7 @@ class BotInterruptionFrame(InterruptionTaskFrame): @dataclass -class EndFrame(ControlFrame): +class EndFrame(ControlFrame, UninterruptibleFrame): """Frame indicating pipeline has ended and should shut down. Indicates that a pipeline has ended and frame processors and pipelines @@ -1760,6 +1760,10 @@ class EndFrame(ControlFrame): that this is a control frame, which means it will be received in the order it was sent. + This frame is marked as UninterruptibleFrame to ensure it is not lost when + an InterruptionFrame is processed. Terminal frames must survive interruption + to guarantee proper pipeline shutdown. + Parameters: reason: Optional reason for pushing an end frame. """ @@ -1771,12 +1775,16 @@ class EndFrame(ControlFrame): @dataclass -class StopFrame(ControlFrame): +class StopFrame(ControlFrame, UninterruptibleFrame): """Frame indicating pipeline should stop but keep processors running. Indicates that a pipeline should be stopped but that the pipeline processors should be kept in a running state. This is normally queued from the pipeline task. + + This frame is marked as UninterruptibleFrame to ensure it is not lost when + an InterruptionFrame is processed. Terminal frames must survive interruption + to guarantee proper pipeline control. """ pass diff --git a/tests/test_frame_processor.py b/tests/test_frame_processor.py index f44181858..3a47520b3 100644 --- a/tests/test_frame_processor.py +++ b/tests/test_frame_processor.py @@ -15,6 +15,7 @@ from pipecat.frames.frames import ( Frame, InterruptionFrame, OutputTransportMessageUrgentFrame, + StopFrame, SystemFrame, TextFrame, UninterruptibleFrame, @@ -348,6 +349,106 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase): self.assertIs(down_frame.metadata, orig.metadata) self.assertIs(up_frame.metadata, orig.metadata) + async def test_terminal_frames_survive_interruption(self): + """Test that EndFrame survives interruption (it is uninterruptible). + + This test simulates issue #3524 where an InterruptionFrame during slow + processing would cause terminal frames to be lost, freezing the pipeline. + """ + received_frames: List[Frame] = [] + + class DelayAndInterruptProcessor(FrameProcessor): + """This processor delays processing and then generates an interruption. + + When processing a TextFrame, it sleeps and then pushes an + InterruptionFrame to simulate what happens when interruption occurs + while a terminal frame is in the queue. + """ + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): + # Delay to allow EndFrame to be queued + await asyncio.sleep(0.1) + # Push interruption - this should NOT discard the EndFrame + await self.push_frame(InterruptionFrame(), direction) + await self.push_frame(frame, direction) + + class CaptureFrameProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + received_frames.append(frame) + await self.push_frame(frame, direction) + + pipeline = Pipeline([DelayAndInterruptProcessor(), CaptureFrameProcessor()]) + + frames_to_send = [ + TextFrame(text="trigger"), + ] + expected_down_frames = [ + InterruptionFrame, + TextFrame, + ] + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + # Verify EndFrame was received by our capture processor (survived interruption) + # Note: run_test filters EndFrame from expected_down_frames when send_end_frame=True, + # but our capture processor sees it before that filtering. + end_frames = [f for f in received_frames if isinstance(f, EndFrame)] + self.assertEqual(len(end_frames), 1, "EndFrame should survive interruption") + + async def test_stop_frame_survives_interruption(self): + """Test that StopFrame survives interruption (it is uninterruptible). + + Similar to test_terminal_frames_survive_interruption but specifically + for StopFrame. + """ + received_frames: List[Frame] = [] + + class DelayAndInterruptProcessor(FrameProcessor): + """This processor delays processing and then generates an interruption.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): + # Delay to allow StopFrame to be queued + await asyncio.sleep(0.1) + # Push interruption - this should NOT discard the StopFrame + await self.push_frame(InterruptionFrame(), direction) + await self.push_frame(frame, direction) + + class CaptureFrameProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + received_frames.append(frame) + await self.push_frame(frame, direction) + + pipeline = Pipeline([DelayAndInterruptProcessor(), CaptureFrameProcessor()]) + + frames_to_send = [ + TextFrame(text="trigger"), + StopFrame(), + ] + expected_down_frames = [ + InterruptionFrame, + TextFrame, + StopFrame, + ] + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + send_end_frame=False, + ) + + # Verify StopFrame was received (survived interruption) + stop_frames = [f for f in received_frames if isinstance(f, StopFrame)] + self.assertEqual(len(stop_frames), 1, "StopFrame should survive interruption") + if __name__ == "__main__": unittest.main()