FrameProcessor: remove unnecessary push task

When we call `FrameProcessor.push_frame()` we end up calling
`FrameProcessor.queue_frame()` on the next or previous processor which already
uses the input queue and guarantees frame ordering. So, there's no need to have
a two queues next to each other.
This commit is contained in:
Aleix Conchillo Flaqué
2025-07-02 16:49:41 -07:00
parent 2f3272ea2f
commit 02b63c28a5
2 changed files with 7 additions and 38 deletions

View File

@@ -5,6 +5,12 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Performance
- Remove unncessary push task in each `FrameProcessor`.
## [0.0.74] - 2025-07-03
### Added

View File

@@ -152,11 +152,6 @@ class FrameProcessor(BaseObject):
self.__input_event = None
self.__input_frame_task: Optional[asyncio.Task] = None
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are the
# exception to this rule. This create this task.
self.__push_frame_task: Optional[asyncio.Task] = None
@property
def id(self) -> int:
"""Get the unique identifier for this processor.
@@ -385,7 +380,6 @@ class FrameProcessor(BaseObject):
"""Clean up processor resources."""
await super().cleanup()
await self.__cancel_input_task()
await self.__cancel_push_task()
if self._metrics is not None:
await self._metrics.cleanup()
@@ -512,10 +506,7 @@ class FrameProcessor(BaseObject):
if not self._check_started(frame):
return
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
await self.__internal_push_frame(frame, direction)
async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
@@ -530,7 +521,6 @@ class FrameProcessor(BaseObject):
self._interruption_strategies = frame.interruption_strategies
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
self.__create_input_task()
self.__create_push_task()
async def __cancel(self, frame: CancelFrame):
"""Handle the cancel frame to stop processor operation.
@@ -540,7 +530,6 @@ class FrameProcessor(BaseObject):
"""
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_push_task()
async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame):
"""Handle pause frame to pause processor operation.
@@ -567,9 +556,6 @@ class FrameProcessor(BaseObject):
async def _start_interruption(self):
"""Start handling an interruption by canceling current tasks."""
try:
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
except Exception as e:
@@ -579,9 +565,6 @@ class FrameProcessor(BaseObject):
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
"""Stop handling an interruption."""
# Nothing to do right now.
@@ -677,23 +660,3 @@ class FrameProcessor(BaseObject):
await self.push_error(ErrorFrame(str(e)))
finally:
self.__input_queue.task_done()
def __create_push_task(self):
"""Create the frame pushing task."""
if not self.__push_frame_task:
self.__push_queue = WatchdogQueue(self.task_manager)
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
"""Cancel the frame pushing task."""
if self.__push_frame_task:
self.__push_queue.cancel()
await self.cancel_task(self.__push_frame_task)
self.__push_frame_task = None
async def __push_frame_task_handler(self):
"""Handle frames from the push queue."""
while True:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()