diff --git a/CHANGELOG.md b/CHANGELOG.md index 30742d536..eb28247fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Performance + +- Remove unncessary push task in each `FrameProcessor`. + ## [0.0.74] - 2025-07-03 ### Added diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 0d5a6db62..a049fabdb 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -152,11 +152,6 @@ class FrameProcessor(BaseObject): self.__input_event = None self.__input_frame_task: Optional[asyncio.Task] = None - # Every processor in Pipecat should only output frames from a single - # task. This avoid problems like audio overlapping. System frames are the - # exception to this rule. This create this task. - self.__push_frame_task: Optional[asyncio.Task] = None - @property def id(self) -> int: """Get the unique identifier for this processor. @@ -385,7 +380,6 @@ class FrameProcessor(BaseObject): """Clean up processor resources.""" await super().cleanup() await self.__cancel_input_task() - await self.__cancel_push_task() if self._metrics is not None: await self._metrics.cleanup() @@ -512,10 +506,7 @@ class FrameProcessor(BaseObject): if not self._check_started(frame): return - if isinstance(frame, SystemFrame): - await self.__internal_push_frame(frame, direction) - else: - await self.__push_queue.put((frame, direction)) + await self.__internal_push_frame(frame, direction) async def __start(self, frame: StartFrame): """Handle the start frame to initialize processor state. @@ -530,7 +521,6 @@ class FrameProcessor(BaseObject): self._interruption_strategies = frame.interruption_strategies self._report_only_initial_ttfb = frame.report_only_initial_ttfb self.__create_input_task() - self.__create_push_task() async def __cancel(self, frame: CancelFrame): """Handle the cancel frame to stop processor operation. @@ -540,7 +530,6 @@ class FrameProcessor(BaseObject): """ self._cancelling = True await self.__cancel_input_task() - await self.__cancel_push_task() async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame): """Handle pause frame to pause processor operation. @@ -567,9 +556,6 @@ class FrameProcessor(BaseObject): async def _start_interruption(self): """Start handling an interruption by canceling current tasks.""" try: - # Cancel the push frame task. This will stop pushing frames downstream. - await self.__cancel_push_task() - # Cancel the input task. This will stop processing queued frames. await self.__cancel_input_task() except Exception as e: @@ -579,9 +565,6 @@ class FrameProcessor(BaseObject): # Create a new input queue and task. self.__create_input_task() - # Create a new output queue and task. - self.__create_push_task() - async def _stop_interruption(self): """Stop handling an interruption.""" # Nothing to do right now. @@ -677,23 +660,3 @@ class FrameProcessor(BaseObject): await self.push_error(ErrorFrame(str(e))) finally: self.__input_queue.task_done() - - def __create_push_task(self): - """Create the frame pushing task.""" - if not self.__push_frame_task: - self.__push_queue = WatchdogQueue(self.task_manager) - self.__push_frame_task = self.create_task(self.__push_frame_task_handler()) - - async def __cancel_push_task(self): - """Cancel the frame pushing task.""" - if self.__push_frame_task: - self.__push_queue.cancel() - await self.cancel_task(self.__push_frame_task) - self.__push_frame_task = None - - async def __push_frame_task_handler(self): - """Handle frames from the push queue.""" - while True: - (frame, direction) = await self.__push_queue.get() - await self.__internal_push_frame(frame, direction) - self.__push_queue.task_done()