From 10069719e4f3018463e6e93049803daf2ad2f099 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 9 Oct 2025 15:48:59 -0400 Subject: [PATCH] Make `pause_processing_frames()` and `pause_processing_system_frames()` more robust in `FrameProcessor`. To understand this fix, let's look exclusively at `pause_processing_frames()` (`pause_processing_system_frames()` works the same way). `pause_processing_frames()` works by setting a `__should_block_frames` flag, which is then read each time through the loop in the long-running `__process_frame_task_handler`. if `__should_block_frames` is `True`, it pauses processing frames until it's resumed. Prior to this fix, the check for `__should_block_frames` was before `await self.__process_queue.get()`. The problem is that a lot of the time spent in the loop is waiting for a frame from the process queue. So if `pause_processing_frames()` is set at any time other than within `process_frame()` itself, it actually won't have an effect by the next frame, only on the frame *after* the next, which is later than intended. Because thus far in the Pipecat codebase we've only ever called `pause_processing_frames()` and `pause_processing_system_frames()` from within `process_frame()`, this change should have no behavioral effect. But it will be helpful if we ever need to call it from anywhere else. I noticed this issue while developing a feature that did exactly that (though I later abandoned that code). --- src/pipecat/processors/frame_processor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 0b55082aa..1ca3333b5 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -877,6 +877,8 @@ class FrameProcessor(BaseObject): """ while True: + (frame, direction, callback) = await self.__input_queue.get() + if self.__should_block_system_frames and self.__input_event: logger.trace(f"{self}: system frame processing paused") await self.__input_event.wait() @@ -884,8 +886,6 @@ class FrameProcessor(BaseObject): self.__should_block_system_frames = False logger.trace(f"{self}: system frame processing resumed") - (frame, direction, callback) = await self.__input_queue.get() - if isinstance(frame, SystemFrame): await self.__process_frame(frame, direction, callback) elif self.__process_queue: @@ -900,6 +900,8 @@ class FrameProcessor(BaseObject): async def __process_frame_task_handler(self): """Handle non-system frames from the process queue.""" while True: + (frame, direction, callback) = await self.__process_queue.get() + if self.__should_block_frames and self.__process_event: logger.trace(f"{self}: frame processing paused") await self.__process_event.wait() @@ -907,8 +909,6 @@ class FrameProcessor(BaseObject): self.__should_block_frames = False logger.trace(f"{self}: frame processing resumed") - (frame, direction, callback) = await self.__process_queue.get() - await self.__process_frame(frame, direction, callback) self.__process_queue.task_done()