From db5bcfaa51186e86daf04b38f21846d0e495a9dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 16 Sep 2025 14:47:12 -0700 Subject: [PATCH] FrameProcessor: fix push_interruption_task_frame_and_wait() --- CHANGELOG.md | 5 ++++- src/pipecat/processors/frame_processor.py | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a5feb0b..b3404d1ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Update `livekit` to 1.0.13. +- Updated `livekit` to 1.0.13. - `torch` and `torchaudio` are no longer required for running Smart Turn locally. This avoids gigabytes of dependencies being installed. @@ -44,6 +44,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that would cause `push_interruption_task_frame_and_wait()` to + not wait if a previous interruption had already happened. + - Fixed a couple of bugs in `ServiceSwitcher`: - Using multiple `ServiceSwitcher`s in a pipeline would result in an error. - `ServiceSwitcherFrame`s (such as `ManuallySwitchServiceFrame`s) were having diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 76400fc22..85ce18548 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -220,6 +220,11 @@ class FrameProcessor(BaseObject): self.__process_event: Optional[asyncio.Event] = None self.__process_frame_task: Optional[asyncio.Task] = None + # To interrupt a pipeline, we push an `InterruptionTaskFrame` upstream. + # Then we wait for the corresponding `InterruptionFrame` to travel from + # the start of the pipeline back to the processor that sent the + # `InterruptionTaskFrame`. This wait is handled using the following + # event. self._wait_for_interruption = False self._wait_interruption_event = asyncio.Event() @@ -632,7 +637,9 @@ class FrameProcessor(BaseObject): await self.__internal_push_frame(frame, direction) - if isinstance(frame, InterruptionFrame): + # If we are waiting for an interruption and we get an interruption, then + # we can unblock `push_interruption_task_frame_and_wait()`. + if self._wait_for_interruption and isinstance(frame, InterruptionFrame): self._wait_interruption_event.set() async def push_interruption_task_frame_and_wait(self):