From 42344125b1cc6dfe7cc5c42e2d5200d77d3d288a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 16 Sep 2025 18:08:05 -0700 Subject: [PATCH] tests: add unit tests for push_interruption_task_frame_and_wait() --- tests/test_frame_processor.py | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 tests/test_frame_processor.py diff --git a/tests/test_frame_processor.py b/tests/test_frame_processor.py new file mode 100644 index 000000000..7cbe258f8 --- /dev/null +++ b/tests/test_frame_processor.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2024-2025 Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import unittest + +from pipecat.frames.frames import ( + EndFrame, + Frame, + InterruptionFrame, + TextFrame, + TransportMessageUrgentFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.tests.utils import SleepFrame, run_test + + +class TestFrameProcessor(unittest.IsolatedAsyncioTestCase): + async def test_interruption_and_wait(self): + class DelayFrameProcessor(FrameProcessor): + """This processors just gives time to the event loop to change + between tasks. Otherwise things happen to fast.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await asyncio.sleep(0.1) + await self.push_frame(frame, direction) + + class InterruptFrameProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, TextFrame): + await self.push_interruption_task_frame_and_wait() + await self.push_frame(TransportMessageUrgentFrame(message=frame.text)) + else: + await self.push_frame(frame, direction) + + pipeline = Pipeline([DelayFrameProcessor(), InterruptFrameProcessor()]) + + frames_to_send = [ + # Just a random interruption to make sure we don't clear anything + # before the actual `InterruptionTaskFrame` interruption. + InterruptionFrame(), + # This will generate an `InterruptionTaskFrame` and will wait for an + # `InterruptionFrame`. + TextFrame(text="Hello from Pipecat!"), + # Just give time for everything to complete. + SleepFrame(sleep=0.5), + EndFrame(), + ] + expected_down_frames = [ + InterruptionFrame, + InterruptionFrame, + TransportMessageUrgentFrame, + EndFrame, + ] + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + send_end_frame=False, + )