Merge pull request #2809 from pipecat-ai/aleix/revert-interruption-strategies-ordering

revert interruption strategies ordering
This commit is contained in:
Aleix Conchillo Flaqué
2025-10-07 18:07:07 -07:00
committed by GitHub
7 changed files with 22 additions and 56 deletions

View File

@@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.89] - 2025-10-07
### Fixed
- Reverted a change that was causing pipelines to be frozen when using
interruption strategies.
## [0.0.88] - 2025-10-07
### Added

View File

@@ -849,15 +849,13 @@ class FrameProcessorResumeUrgentFrame(SystemFrame):
class InterruptionFrame(SystemFrame):
"""Frame indicating user started speaking (interruption detected).
Usually emitted by the pipeline task to indicate that all frame processors
should be interrrupted.
Parameters:
pushed_by_task: Whether this interruption was pushed from the pipeline
task.
Emitted by the BaseInputTransport to indicate that a user has started
speaking (i.e. is interrupting). This is similar to
UserStartedSpeakingFrame except that it should be pushed concurrently
with other frames (so the order is not guaranteed).
"""
pushed_by_task: bool = False
pass
@dataclass
@@ -888,17 +886,6 @@ class StartInterruptionFrame(InterruptionFrame):
)
@dataclass
class InterruptionCompletedFrame(SystemFrame):
"""Frame indicating that the whole pipeline has been interrupted.
This is emitted by the pipeline task when an InterruptionFrame has made it
all the way to the end of pipeline, interrupting all the frame processors.
"""
pass
@dataclass
class UserStartedSpeakingFrame(SystemFrame):
"""Frame indicating user has started speaking.

View File

@@ -30,7 +30,6 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
HeartbeatFrame,
InterruptionCompletedFrame,
InterruptionFrame,
InterruptionTaskFrame,
MetricsFrame,
@@ -692,11 +691,8 @@ class PipelineTask(BasePipelineTask):
# bypassing the push queue and directly queue into the
# pipeline. This is in case the push task is blocked waiting for a
# pipeline-ending frame to finish traversing the pipeline.
interruption_frame = InterruptionFrame(pushed_by_task=True)
logger.debug(
f"{self}: received interruption task frame {frame}, pushing {interruption_frame}"
)
await self._pipeline.queue_frame(interruption_frame)
logger.debug(f"{self}: received interruption task frame {frame}")
await self._pipeline.queue_frame(InterruptionFrame())
elif isinstance(frame, ErrorFrame):
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
@@ -740,14 +736,6 @@ class PipelineTask(BasePipelineTask):
self._pipeline_end_event.set()
elif isinstance(frame, CancelFrame):
self._pipeline_end_event.set()
elif isinstance(frame, InterruptionFrame) and frame.pushed_by_task:
# If an interruption frame made it all the way to the end of the
# pipeline, send an InterruptionCompleteFrame. Note that we are
# bypassing the push queue and directly queue into the
# pipeline. This is in case the push task is blocked waiting for a
# pipeline-ending frame to finish traversing the pipeline.
logger.debug(f"{self}: interruption completed with {frame}")
await self._pipeline.queue_frame(InterruptionCompletedFrame())
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)

View File

@@ -28,7 +28,6 @@ from pipecat.frames.frames import (
FrameProcessorPauseUrgentFrame,
FrameProcessorResumeFrame,
FrameProcessorResumeUrgentFrame,
InterruptionCompletedFrame,
InterruptionFrame,
InterruptionTaskFrame,
StartFrame,
@@ -572,9 +571,7 @@ class FrameProcessor(BaseObject):
# frames and we will process the frame right away. This is because a
# previous system frame might be waiting for the interruption frame and
# it's blocking the input task.
if self._wait_for_interruption and isinstance(
frame, (InterruptionFrame, InterruptionCompletedFrame)
):
if self._wait_for_interruption and isinstance(frame, InterruptionFrame):
await self.__process_frame(frame, direction, callback)
return
@@ -664,17 +661,18 @@ class FrameProcessor(BaseObject):
await self._call_event_handler("on_after_push_frame", frame)
# If we are waiting for an interruption and one completed, then we can
# unblock `push_interruption_task_frame_and_wait()`.
if self._wait_for_interruption and isinstance(frame, InterruptionCompletedFrame):
# If we are waiting for an interruption and we get an interruption, then
# we can unblock `push_interruption_task_frame_and_wait()`.
if self._wait_for_interruption and isinstance(frame, InterruptionFrame):
self._wait_interruption_event.set()
async def push_interruption_task_frame_and_wait(self):
"""Push an interruption task frame upstream and wait for the interruption.
This function sends an `InterruptionTaskFrame` upstream to the pipeline
task and waits to receive an `InterruptionCompletedFrame`. This
guarantees the whole pipeline has been interrupted.
task and waits to receive the corresponding `InterruptionFrame`. When
the function finishes it is guaranteed that the `InterruptionFrame` has
been pushed downstream.
"""
self._wait_for_interruption = True

View File

@@ -5,7 +5,6 @@
#
import json
import sys
import unittest
from typing import Any
@@ -21,7 +20,6 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
FunctionCallResultProperties,
InterimTranscriptionFrame,
InterruptionCompletedFrame,
InterruptionFrame,
InterruptionTaskFrame,
LLMFullResponseEndFrame,
@@ -570,7 +568,6 @@ class BaseTestUserContextAggregator:
BotStartedSpeakingFrame,
UserStartedSpeakingFrame,
InterruptionFrame,
InterruptionCompletedFrame,
UserStoppedSpeakingFrame,
*self.EXPECTED_CONTEXT_FRAMES,
]

View File

@@ -10,7 +10,6 @@ from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.frames.frames import (
EndFrame,
InputDTMFFrame,
InterruptionCompletedFrame,
InterruptionFrame,
TranscriptionFrame,
)
@@ -31,7 +30,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
InputDTMFFrame,
InputDTMFFrame,
@@ -64,12 +62,10 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
TranscriptionFrame, # First aggregation "12"
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
TranscriptionFrame, # Second aggregation "3"
]
@@ -102,13 +98,11 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
InputDTMFFrame,
TranscriptionFrame, # "12#"
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
TranscriptionFrame, # "45"
]
@@ -138,7 +132,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
TranscriptionFrame, # Should flush before EndFrame
EndFrame,
@@ -167,7 +160,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
TranscriptionFrame,
]
@@ -195,7 +187,6 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InputDTMFFrame,
InterruptionFrame,
InterruptionCompletedFrame,
InputDTMFFrame,
InputDTMFFrame,
TranscriptionFrame,
@@ -233,7 +224,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
# All the InputDTMFFrames plus one TranscriptionFrame
expected_down_frames = (
[InputDTMFFrame, InterruptionFrame, InterruptionCompletedFrame]
[InputDTMFFrame, InterruptionFrame]
+ [InputDTMFFrame] * (len(frames_to_send) - 1)
+ [TranscriptionFrame]
)

View File

@@ -10,7 +10,6 @@ import unittest
from pipecat.frames.frames import (
EndFrame,
Frame,
InterruptionCompletedFrame,
InterruptionFrame,
OutputTransportMessageUrgentFrame,
TextFrame,
@@ -102,7 +101,6 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
InterruptionFrame,
InterruptionFrame,
InterruptionCompletedFrame,
OutputTransportMessageUrgentFrame,
EndFrame,
]