From 2345090b10541adbb52c4395b336ccdfed1fcec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 5 Feb 2026 22:14:50 -0800 Subject: [PATCH 1/5] Attach asyncio.Event to InterruptionFrame for completion signaling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the interruption wait event from per-processor instance state to the frame itself. The event is created in push_interruption_task_frame_and_wait(), threaded through InterruptionTaskFrame → InterruptionFrame, and set when the frame reaches the pipeline sink. This scopes the event to each interruption flow rather than sharing mutable state on the processor. Also adds a 2s timeout warning to help diagnose cases where InterruptionFrame.complete() is never called. --- src/pipecat/frames/frames.py | 43 +++++++++---- src/pipecat/pipeline/task.py | 4 +- .../aggregators/llm_response_universal.py | 6 ++ src/pipecat/processors/frame_processor.py | 60 ++++++++++--------- 4 files changed, 73 insertions(+), 40 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index b0e196a22..b7be8045e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -11,6 +11,7 @@ including data frames, system frames, and control frames for audio, video, text, and LLM processing. """ +import asyncio from dataclasses import dataclass, field from enum import Enum from typing import ( @@ -1118,15 +1119,29 @@ class FrameProcessorResumeUrgentFrame(SystemFrame): @dataclass class InterruptionFrame(SystemFrame): - """Frame indicating user started speaking (interruption detected). + """Frame pushed to interrupt the pipeline. + + This frame is used to interrupt the pipeline. For example, when a user + starts speaking to cancel any in-progress bot output. It can also be pushed + by any processor. + + Parameters: + event: Optional event set when the frame has fully traversed the + pipeline. - Emitted by the BaseInputTransport to indicate that a user has started - speaking (i.e. is interrupting). This is similar to - UserStartedSpeakingFrame except that it should be pushed concurrently - with other frames (so the order is not guaranteed). """ - pass + event: Optional[asyncio.Event] = None + + def complete(self): + """Signal that this interruption has been fully processed. + + Called automatically when the frame reaches the pipeline sink, or + manually when the frame is consumed before reaching it (e.g. when + the user is muted). + """ + if self.event: + self.event.set() @dataclass @@ -1706,15 +1721,19 @@ class StopTaskFrame(TaskFrame): @dataclass class InterruptionTaskFrame(TaskFrame): - """Frame indicating the bot should be interrupted. + """Frame indicating the pipeline should be interrupted. + + This frame should be pushed upstream to indicate the pipeline should be + interrupted. The pipeline task converts this into an `InterruptionFrame` and + sends it downstream. The `event` is passed to the `InterruptionFrame` so it + can signal when the interruption has fully traversed the pipeline. + + Parameters: + event: Optional event passed to the corresponding `InterruptionFrame`. - Emitted when the bot should be interrupted. This will mainly cause the - same actions as if the user interrupted except that the - UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated. - This frame should be pushed upstream. """ - pass + event: Optional[asyncio.Event] = None @dataclass diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index e643164eb..8bda19b18 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -864,7 +864,7 @@ class PipelineTask(BasePipelineTask): # pipeline. This is in case the push task is blocked waiting for a # pipeline-ending frame to finish traversing the pipeline. logger.debug(f"{self}: received interruption task frame {frame}") - await self._pipeline.queue_frame(InterruptionFrame()) + await self._pipeline.queue_frame(InterruptionFrame(event=frame.event)) elif isinstance(frame, ErrorFrame): await self._call_event_handler("on_pipeline_error", frame) if frame.fatal: @@ -903,6 +903,8 @@ class PipelineTask(BasePipelineTask): self._pipeline_end_event.set() elif isinstance(frame, CancelFrame): self._pipeline_end_event.set() + elif isinstance(frame, InterruptionFrame): + frame.complete() elif isinstance(frame, HeartbeatFrame): await self._heartbeat_queue.put(frame) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index db28b85c1..cf65c6443 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -552,6 +552,12 @@ class LLMUserAggregator(LLMContextAggregator): if should_mute_frame: logger.trace(f"{frame.name} suppressed - user currently muted") + # When muted, the InterruptionFrame won't propagate further and + # will never reach the pipeline sink. Complete it here so + # push_interruption_task_frame_and_wait() doesn't hang. + if should_mute_frame and isinstance(frame, InterruptionFrame): + frame.complete() + should_mute_next_time = False for s in self._params.user_mute_strategies: should_mute_next_time |= await s.process_frame(frame) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 740c19ae6..f0c9e7183 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -52,6 +52,8 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMet from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject +INTERRUPTION_COMPLETION_TIMEOUT = 2.0 + class FrameDirection(Enum): """Direction of frame flow in the processing pipeline. @@ -240,13 +242,9 @@ class FrameProcessor(BaseObject): self.__process_frame_task: Optional[asyncio.Task] = None self.__process_current_frame: Optional[Frame] = None - # To interrupt a pipeline, we push an `InterruptionTaskFrame` upstream. - # Then we wait for the corresponding `InterruptionFrame` to travel from - # the start of the pipeline back to the processor that sent the - # `InterruptionTaskFrame`. This wait is handled using the following - # event. + # Set while awaiting push_interruption_task_frame_and_wait() so that + # _start_interruption() knows not to cancel the process task. self._wait_for_interruption = False - self._wait_interruption_event = asyncio.Event() # Frame processor events. self._register_event_handler("on_before_process_frame", sync=True) @@ -600,10 +598,11 @@ class FrameProcessor(BaseObject): if self._cancelling: return - # If we are waiting for an interruption we will bypass all queued system - # frames and we will process the frame right away. This is because a - # previous system frame might be waiting for the interruption frame and - # it's blocking the input task. + # If we are waiting for an interruption, bypass all queued system frames + # and process the frame right away. This is because a previous system + # frame might be waiting for the interruption frame blocking the input + # task, so this InterruptionFrame would never be dequeued and we'd + # deadlock. if self._wait_for_interruption and isinstance(frame, InterruptionFrame): await self.__process_frame(frame, direction, callback) return @@ -742,31 +741,38 @@ class FrameProcessor(BaseObject): await self._call_event_handler("on_after_push_frame", frame) - # If we are waiting for an interruption and we get an interruption, then - # we can unblock `push_interruption_task_frame_and_wait()`. - if self._wait_for_interruption and isinstance(frame, InterruptionFrame): - self._wait_interruption_event.set() - async def push_interruption_task_frame_and_wait(self): """Push an interruption task frame upstream and wait for the interruption. - This function sends an `InterruptionTaskFrame` upstream to the pipeline - task and waits to receive the corresponding `InterruptionFrame`. When - the function finishes it is guaranteed that the `InterruptionFrame` has - been pushed downstream. + This function sends an `InterruptionTaskFrame` upstream to the + pipeline task. The task creates a corresponding `InterruptionFrame` + and sends it downstream through the pipeline. An `asyncio.Event` is + attached to both frames so the caller can wait until the interruption + has fully traversed the pipeline. The event is set when the + `InterruptionFrame` reaches the pipeline sink. If the frame does + not complete within `INTERRUPTION_COMPLETION_TIMEOUT` seconds, a + warning is logged periodically until it completes. + """ self._wait_for_interruption = True - await self.push_frame(InterruptionTaskFrame(), FrameDirection.UPSTREAM) + event = asyncio.Event() - # Wait for an `InterruptionFrame` to come to this processor and be - # pushed. Take a look at `push_frame()` to see how we first push the - # `InterruptionFrame` and then we set the event in order to maintain - # frame ordering. - await self._wait_interruption_event.wait() + await self.push_frame(InterruptionTaskFrame(event=event), FrameDirection.UPSTREAM) - # Clean the event. - self._wait_interruption_event.clear() + # Wait for the `InterruptionFrame` to complete and log a warning + # periodically if it takes too long. + while not event.is_set(): + try: + await asyncio.wait_for(event.wait(), timeout=INTERRUPTION_COMPLETION_TIMEOUT) + except asyncio.TimeoutError: + logger.warning( + f"{self}: InterruptionFrame has not completed after" + f" {INTERRUPTION_COMPLETION_TIMEOUT}s. Make sure" + " InterruptionFrame.complete() is being called (e.g. if the" + " frame is being blocked or consumed before reaching the" + " pipeline sink)." + ) self._wait_for_interruption = False From a352b2d7a01cfebc8bc610b8210ef73f63be02f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 5 Feb 2026 22:54:36 -0800 Subject: [PATCH 2/5] Add tests for InterruptionFrame completion event Add tests for the event-based interruption completion: complete() sets the event, complete() is safe without an event, the event fires at the pipeline sink, and a warning is logged when the frame is blocked. Also remove the unconditional await after the timeout so the function returns instead of hanging when complete() is never called. --- tests/test_frame_processor.py | 111 +++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/tests/test_frame_processor.py b/tests/test_frame_processor.py index 3a47520b3..2ce4b7880 100644 --- a/tests/test_frame_processor.py +++ b/tests/test_frame_processor.py @@ -9,6 +9,8 @@ import unittest from dataclasses import dataclass, field from typing import List +from loguru import logger + from pipecat.frames.frames import ( DataFrame, EndFrame, @@ -22,7 +24,11 @@ from pipecat.frames.frames import ( ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.filters.identity_filter import IdentityFilter -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frame_processor import ( + INTERRUPTION_COMPLETION_TIMEOUT, + FrameDirection, + FrameProcessor, +) from pipecat.tests.utils import SleepFrame, run_test @@ -449,6 +455,109 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase): stop_frames = [f for f in received_frames if isinstance(f, StopFrame)] self.assertEqual(len(stop_frames), 1, "StopFrame should survive interruption") + async def test_interruption_frame_complete_sets_event(self): + """Test that InterruptionFrame.complete() sets the event.""" + event = asyncio.Event() + frame = InterruptionFrame(event=event) + self.assertFalse(event.is_set()) + frame.complete() + self.assertTrue(event.is_set()) + + async def test_interruption_frame_complete_without_event(self): + """Test that InterruptionFrame.complete() is safe without an event.""" + frame = InterruptionFrame() + frame.complete() # Should not raise + + async def test_interruption_event_set_at_pipeline_sink(self): + """Test that the event from push_interruption_task_frame_and_wait() + is set when the InterruptionFrame reaches the pipeline sink.""" + event_was_set = False + + class InterruptOnTextProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + nonlocal event_was_set + + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): + await self.push_interruption_task_frame_and_wait() + + event_was_set = True + await self.push_frame(OutputTransportMessageUrgentFrame(message="done")) + else: + await self.push_frame(frame, direction) + + pipeline = Pipeline([InterruptOnTextProcessor()]) + + frames_to_send = [ + TextFrame(text="trigger"), + ] + expected_down_frames = [ + InterruptionFrame, + OutputTransportMessageUrgentFrame, + ] + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + self.assertTrue(event_was_set, "Event should be set after InterruptionFrame completes") + + async def test_interruption_completion_timeout_warning(self): + """Test that a warning is logged when an InterruptionFrame is blocked + and never reaches the pipeline sink.""" + warnings = [] + handler_id = logger.add( + lambda msg: warnings.append(str(msg)), level="WARNING", format="{message}" + ) + + try: + + class BlockInterruptionProcessor(FrameProcessor): + """Blocks InterruptionFrames, completing them after a delay.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, InterruptionFrame): + # Complete after the timeout so the warning fires + # but the test doesn't hang. + async def delayed_complete(): + await asyncio.sleep(INTERRUPTION_COMPLETION_TIMEOUT + 1.0) + frame.complete() + + asyncio.create_task(delayed_complete()) + return + await self.push_frame(frame, direction) + + class InterruptOnTextProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): + await self.push_interruption_task_frame_and_wait() + await self.push_frame(OutputTransportMessageUrgentFrame(message="done")) + else: + await self.push_frame(frame, direction) + + pipeline = Pipeline([BlockInterruptionProcessor(), InterruptOnTextProcessor()]) + + frames_to_send = [ + TextFrame(text="trigger"), + ] + expected_down_frames = [ + OutputTransportMessageUrgentFrame, + ] + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + finally: + logger.remove(handler_id) + + self.assertTrue( + any("InterruptionFrame has not completed" in w for w in warnings), + "Expected a timeout warning about InterruptionFrame not completing", + ) + if __name__ == "__main__": unittest.main() From d5105a78e6994c94f74af4ee488c8d58713b90cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 6 Feb 2026 09:45:20 -0800 Subject: [PATCH 3/5] STTMuteFilter should call frame.complete() when InterruptionFrame is blocked --- .../processors/filters/stt_mute_filter.py | 6 ++++ tests/test_stt_mute_filter.py | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/pipecat/processors/filters/stt_mute_filter.py b/src/pipecat/processors/filters/stt_mute_filter.py index 9f522a20d..f5d008e28 100644 --- a/src/pipecat/processors/filters/stt_mute_filter.py +++ b/src/pipecat/processors/filters/stt_mute_filter.py @@ -234,6 +234,12 @@ class STTMuteFilter(FrameProcessor): await self.push_frame(frame, direction) else: logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted") + + # When muted, the InterruptionFrame won't propagate further + # and will never reach the pipeline sink. Complete it here so + # push_interruption_task_frame_and_wait() doesn't hang. + if isinstance(frame, InterruptionFrame): + frame.complete() else: # Pass all other frames through await self.push_frame(frame, direction) diff --git a/tests/test_stt_mute_filter.py b/tests/test_stt_mute_filter.py index 5682aaf22..adf4611df 100644 --- a/tests/test_stt_mute_filter.py +++ b/tests/test_stt_mute_filter.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import asyncio import unittest from pipecat.frames.frames import ( @@ -14,6 +15,7 @@ from pipecat.frames.frames import ( FunctionCallsStartedFrame, InputAudioRawFrame, InterimTranscriptionFrame, + InterruptionFrame, TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, @@ -327,6 +329,33 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): expected_down_frames=expected_returned_frames, ) + async def test_interruption_frame_completed_when_muted(self): + """Test that InterruptionFrame.complete() is called when the frame is + suppressed due to muting, so push_interruption_task_frame_and_wait() + doesn't hang.""" + filter = STTMuteFilter(config=STTMuteConfig(strategies={STTMuteStrategy.ALWAYS})) + + event = asyncio.Event() + + frames_to_send = [ + BotStartedSpeakingFrame(), + InterruptionFrame(event=event), + BotStoppedSpeakingFrame(), + ] + + expected_returned_frames = [ + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + ] + + await run_test( + filter, + frames_to_send=frames_to_send, + expected_down_frames=expected_returned_frames, + ) + + self.assertTrue(event.is_set(), "InterruptionFrame.complete() should be called when muted") + if __name__ == "__main__": unittest.main() From b9e79bd06afec69958b4166980b335fd31e3d980 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 6 Feb 2026 10:00:15 -0800 Subject: [PATCH 4/5] CLAUDE.md: explain about InterruptionFrame.complete() --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 1d745755b..162615ab2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -68,7 +68,7 @@ Transport Input → Pipeline Source → [Processor1] → [Processor2] → ... - **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively. -- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. +- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers. - **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`. From 5b67e76de7711d2bc7d1d5e5634a019b12f3d86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 5 Feb 2026 22:17:31 -0800 Subject: [PATCH 5/5] Add changelog for PR #3660 --- changelog/3660.changed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/3660.changed.md diff --git a/changelog/3660.changed.md b/changelog/3660.changed.md new file mode 100644 index 000000000..e5077b73b --- /dev/null +++ b/changelog/3660.changed.md @@ -0,0 +1 @@ +- Moved interruption wait event from per-processor instance state to `InterruptionFrame` itself. Added `InterruptionFrame.complete()` to signal when the interruption has fully traversed the pipeline. Custom processors that block or consume an `InterruptionFrame` before it reaches the pipeline sink must call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()`. A warning is logged if completion does not happen within 2 seconds.