From 6b24f89fa7e57afaa3c5b3fd2fe25723b1f16328 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Thu, 19 Jun 2025 17:05:52 -0700 Subject: [PATCH] small fix for processor pause/resume frames --- src/pipecat/frames/frames.py | 34 +++++++++++++---------- src/pipecat/processors/frame_processor.py | 4 +-- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index ec368789d..c303d99ab 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -7,6 +7,7 @@ from dataclasses import dataclass, field from enum import Enum from typing import ( + TYPE_CHECKING, Any, Awaitable, Callable, @@ -26,6 +27,9 @@ from pipecat.transcriptions.language import Language from pipecat.utils.time import nanoseconds_to_str from pipecat.utils.utils import obj_count, obj_id +if TYPE_CHECKING: + from pipecat.processors.frame_processor import FrameProcessor + class KeypadEntry(str, Enum): """DTMF entries.""" @@ -529,25 +533,25 @@ class StopTaskFrame(SystemFrame): @dataclass class FrameProcessorPauseUrgentFrame(SystemFrame): - """This processor is used to pause frame processing for the given processor - as fast as possible. Pausing frame processing will keep frames in the - internal queue which will then be processed when frame processing is resumed - with `FrameProcessorResumeFrame`. + """This frame is used to pause frame processing for the given processor as + fast as possible. Pausing frame processing will keep frames in the internal + queue which will then be processed when frame processing is resumed with + `FrameProcessorResumeFrame`. """ - processor: str + processor: "FrameProcessor" @dataclass class FrameProcessorResumeUrgentFrame(SystemFrame): - """This processor is used to resume frame processing for the given processor + """This frame is used to resume frame processing for the given processor if it was previously paused as fast as possible. After resuming frame processing all queued frames will be processed in the order received. """ - processor: str + processor: "FrameProcessor" @dataclass @@ -879,23 +883,25 @@ class StopFrame(ControlFrame): @dataclass class FrameProcessorPauseFrame(ControlFrame): - """This processor is used to pause frame processing for the given + """This frame is used to pause frame processing for the given processor. Pausing frame processing will keep frames in the internal queue which will then be processed when frame processing is resumed with - `FrameProcessorResumeFrame`.""" + `FrameProcessorResumeFrame`. - processor: str + """ + + processor: "FrameProcessor" @dataclass class FrameProcessorResumeFrame(ControlFrame): - """This processor is used to resume frame processing for the given processor - if it was previously paused. After resuming frame processing all queued - frames will be processed in the order received. + """This frame is used to resume frame processing for the given processor if + it was previously paused. After resuming frame processing all queued frames + will be processed in the order received. """ - processor: str + processor: "FrameProcessor" @dataclass diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 1d2f066ed..680465e2d 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -296,11 +296,11 @@ class FrameProcessor(BaseObject): await self.__cancel_push_task() async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame): - if frame.name == self.name: + if frame.processor.name == self.name: await self.pause_processing_frames() async def __resume(self, frame: FrameProcessorResumeFrame | FrameProcessorResumeUrgentFrame): - if frame.name == self.name: + if frame.processor.name == self.name: await self.resume_processing_frames() #