From 7c15a8c800c63349812c91aa18dcffcb3badb76b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 7 Oct 2025 17:42:35 -0700 Subject: [PATCH 1/2] Revert "fix context order when using interruption strategies" This reverts commit de8ee9692737225c9dbadb31201bf7d197587eda. --- src/pipecat/frames/frames.py | 23 +++++------------------ src/pipecat/pipeline/task.py | 16 ++-------------- src/pipecat/processors/frame_processor.py | 16 +++++++--------- tests/test_context_aggregators.py | 3 --- tests/test_dtmf_aggregator.py | 11 +---------- tests/test_frame_processor.py | 2 -- 6 files changed, 15 insertions(+), 56 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index a871fe74b..2a9651b83 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -849,15 +849,13 @@ class FrameProcessorResumeUrgentFrame(SystemFrame): class InterruptionFrame(SystemFrame): """Frame indicating user started speaking (interruption detected). - Usually emitted by the pipeline task to indicate that all frame processors - should be interrrupted. - - Parameters: - pushed_by_task: Whether this interruption was pushed from the pipeline - task. + Emitted by the BaseInputTransport to indicate that a user has started + speaking (i.e. is interrupting). This is similar to + UserStartedSpeakingFrame except that it should be pushed concurrently + with other frames (so the order is not guaranteed). """ - pushed_by_task: bool = False + pass @dataclass @@ -888,17 +886,6 @@ class StartInterruptionFrame(InterruptionFrame): ) -@dataclass -class InterruptionCompletedFrame(SystemFrame): - """Frame indicating that the whole pipeline has been interrupted. - - This is emitted by the pipeline task when an InterruptionFrame has made it - all the way to the end of pipeline, interrupting all the frame processors. - """ - - pass - - @dataclass class UserStartedSpeakingFrame(SystemFrame): """Frame indicating user has started speaking. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index d040beac1..9ce3baf7f 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -30,7 +30,6 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, HeartbeatFrame, - InterruptionCompletedFrame, InterruptionFrame, InterruptionTaskFrame, MetricsFrame, @@ -692,11 +691,8 @@ class PipelineTask(BasePipelineTask): # bypassing the push queue and directly queue into the # pipeline. This is in case the push task is blocked waiting for a # pipeline-ending frame to finish traversing the pipeline. - interruption_frame = InterruptionFrame(pushed_by_task=True) - logger.debug( - f"{self}: received interruption task frame {frame}, pushing {interruption_frame}" - ) - await self._pipeline.queue_frame(interruption_frame) + logger.debug(f"{self}: received interruption task frame {frame}") + await self._pipeline.queue_frame(InterruptionFrame()) elif isinstance(frame, ErrorFrame): if frame.fatal: logger.error(f"A fatal error occurred: {frame}") @@ -740,14 +736,6 @@ class PipelineTask(BasePipelineTask): self._pipeline_end_event.set() elif isinstance(frame, CancelFrame): self._pipeline_end_event.set() - elif isinstance(frame, InterruptionFrame) and frame.pushed_by_task: - # If an interruption frame made it all the way to the end of the - # pipeline, send an InterruptionCompleteFrame. Note that we are - # bypassing the push queue and directly queue into the - # pipeline. This is in case the push task is blocked waiting for a - # pipeline-ending frame to finish traversing the pipeline. - logger.debug(f"{self}: interruption completed with {frame}") - await self._pipeline.queue_frame(InterruptionCompletedFrame()) elif isinstance(frame, HeartbeatFrame): await self._heartbeat_queue.put(frame) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 25dc2716e..0b55082aa 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -28,7 +28,6 @@ from pipecat.frames.frames import ( FrameProcessorPauseUrgentFrame, FrameProcessorResumeFrame, FrameProcessorResumeUrgentFrame, - InterruptionCompletedFrame, InterruptionFrame, InterruptionTaskFrame, StartFrame, @@ -572,9 +571,7 @@ class FrameProcessor(BaseObject): # frames and we will process the frame right away. This is because a # previous system frame might be waiting for the interruption frame and # it's blocking the input task. - if self._wait_for_interruption and isinstance( - frame, (InterruptionFrame, InterruptionCompletedFrame) - ): + if self._wait_for_interruption and isinstance(frame, InterruptionFrame): await self.__process_frame(frame, direction, callback) return @@ -664,17 +661,18 @@ class FrameProcessor(BaseObject): await self._call_event_handler("on_after_push_frame", frame) - # If we are waiting for an interruption and one completed, then we can - # unblock `push_interruption_task_frame_and_wait()`. - if self._wait_for_interruption and isinstance(frame, InterruptionCompletedFrame): + # If we are waiting for an interruption and we get an interruption, then + # we can unblock `push_interruption_task_frame_and_wait()`. + if self._wait_for_interruption and isinstance(frame, InterruptionFrame): self._wait_interruption_event.set() async def push_interruption_task_frame_and_wait(self): """Push an interruption task frame upstream and wait for the interruption. This function sends an `InterruptionTaskFrame` upstream to the pipeline - task and waits to receive an `InterruptionCompletedFrame`. This - guarantees the whole pipeline has been interrupted. + task and waits to receive the corresponding `InterruptionFrame`. When + the function finishes it is guaranteed that the `InterruptionFrame` has + been pushed downstream. """ self._wait_for_interruption = True diff --git a/tests/test_context_aggregators.py b/tests/test_context_aggregators.py index accbfd669..77b6acc87 100644 --- a/tests/test_context_aggregators.py +++ b/tests/test_context_aggregators.py @@ -5,7 +5,6 @@ # import json -import sys import unittest from typing import Any @@ -21,7 +20,6 @@ from pipecat.frames.frames import ( FunctionCallResultFrame, FunctionCallResultProperties, InterimTranscriptionFrame, - InterruptionCompletedFrame, InterruptionFrame, InterruptionTaskFrame, LLMFullResponseEndFrame, @@ -570,7 +568,6 @@ class BaseTestUserContextAggregator: BotStartedSpeakingFrame, UserStartedSpeakingFrame, InterruptionFrame, - InterruptionCompletedFrame, UserStoppedSpeakingFrame, *self.EXPECTED_CONTEXT_FRAMES, ] diff --git a/tests/test_dtmf_aggregator.py b/tests/test_dtmf_aggregator.py index 18082a992..c7590ae47 100644 --- a/tests/test_dtmf_aggregator.py +++ b/tests/test_dtmf_aggregator.py @@ -10,7 +10,6 @@ from pipecat.audio.dtmf.types import KeypadEntry from pipecat.frames.frames import ( EndFrame, InputDTMFFrame, - InterruptionCompletedFrame, InterruptionFrame, TranscriptionFrame, ) @@ -31,7 +30,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, InputDTMFFrame, InputDTMFFrame, @@ -64,12 +62,10 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, TranscriptionFrame, # First aggregation "12" InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, TranscriptionFrame, # Second aggregation "3" ] @@ -102,13 +98,11 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, InputDTMFFrame, TranscriptionFrame, # "12#" InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, TranscriptionFrame, # "45" ] @@ -138,7 +132,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, TranscriptionFrame, # Should flush before EndFrame EndFrame, @@ -167,7 +160,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, TranscriptionFrame, ] @@ -195,7 +187,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InputDTMFFrame, InterruptionFrame, - InterruptionCompletedFrame, InputDTMFFrame, InputDTMFFrame, TranscriptionFrame, @@ -233,7 +224,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase): # All the InputDTMFFrames plus one TranscriptionFrame expected_down_frames = ( - [InputDTMFFrame, InterruptionFrame, InterruptionCompletedFrame] + [InputDTMFFrame, InterruptionFrame] + [InputDTMFFrame] * (len(frames_to_send) - 1) + [TranscriptionFrame] ) diff --git a/tests/test_frame_processor.py b/tests/test_frame_processor.py index 5c1ee1b38..d0072e5fb 100644 --- a/tests/test_frame_processor.py +++ b/tests/test_frame_processor.py @@ -10,7 +10,6 @@ import unittest from pipecat.frames.frames import ( EndFrame, Frame, - InterruptionCompletedFrame, InterruptionFrame, OutputTransportMessageUrgentFrame, TextFrame, @@ -102,7 +101,6 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ InterruptionFrame, InterruptionFrame, - InterruptionCompletedFrame, OutputTransportMessageUrgentFrame, EndFrame, ] From 0669daec3dbae394e5466b039abd0d44fde1091e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 7 Oct 2025 17:44:10 -0700 Subject: [PATCH 2/2] update CHANGELOG for 0.0.89 --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eed99d367..926cff71d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.0.89] - 2025-10-07 + +### Fixed + +- Reverted a change that was causing pipelines to be frozen when using + interruption strategies. + ## [0.0.88] - 2025-10-07 ### Added