From 3d88b42e0b533e4dcdb9283d06c77b7d7ecf39fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 19 Sep 2025 13:44:49 -0700 Subject: [PATCH] FrameProcessor: add before/after events for processed/pushed frames --- CHANGELOG.md | 6 +++ src/pipecat/processors/frame_processor.py | 20 ++++++++++ tests/test_frame_processor.py | 45 +++++++++++++++++++++++ 3 files changed, 71 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65d3c388f..bc432db50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `on_before_process_frame`, `on_after_process_frame`, + `on_before_push_frame` and `on_after_push_frame`. These are synchronous events + that get called before and after a frame is processed or pushed. Note that + these events are synchrnous so they should ideally perform lightweight tasks + in order to not block the pipeline. + - Added `on_before_leave` synchronous event to `DailyTransport`. - Added `on_before_disconnect` synchronous event to `LiveKitTransport`. diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 5723512a3..894dabbe7 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -141,6 +141,12 @@ class FrameProcessor(BaseObject): task. System frames are also processed in a separate task which guarantees frame priority. + Event handlers available: + + - on_before_process_frame: Called before a frame is processed + - on_after_process_frame: Called after a frame is processed + - on_before_push_frame: Called before a frame is pushed + - on_after_push_frame: Called after a frame is pushed """ def __init__( @@ -228,6 +234,12 @@ class FrameProcessor(BaseObject): self._wait_for_interruption = False self._wait_interruption_event = asyncio.Event() + # Frame processor events. + self._register_event_handler("on_before_process_frame", sync=True) + self._register_event_handler("on_after_process_frame", sync=True) + self._register_event_handler("on_before_push_frame", sync=True) + self._register_event_handler("on_after_push_frame", sync=True) + @property def id(self) -> int: """Get the unique identifier for this processor. @@ -639,8 +651,12 @@ class FrameProcessor(BaseObject): if not self._check_started(frame): return + await self._call_event_handler("on_before_push_frame", frame) + await self.__internal_push_frame(frame, direction) + await self._call_event_handler("on_after_push_frame", frame) + # If we are waiting for an interruption and we get an interruption, then # we can unblock `push_interruption_task_frame_and_wait()`. if self._wait_for_interruption and isinstance(frame, InterruptionFrame): @@ -836,11 +852,15 @@ class FrameProcessor(BaseObject): self, frame: Frame, direction: FrameDirection, callback: Optional[FrameCallback] ): try: + await self._call_event_handler("on_before_process_frame", frame) + # Process the frame. await self.process_frame(frame, direction) # If this frame has an associated callback, call it now. if callback: await callback(self, frame, direction) + + await self._call_event_handler("on_after_process_frame", frame) except Exception as e: logger.exception(f"{self}: error processing frame: {e}") await self.push_error(ErrorFrame(str(e))) diff --git a/tests/test_frame_processor.py b/tests/test_frame_processor.py index 7cbe258f8..f514c94b0 100644 --- a/tests/test_frame_processor.py +++ b/tests/test_frame_processor.py @@ -15,11 +15,56 @@ from pipecat.frames.frames import ( TransportMessageUrgentFrame, ) from pipecat.pipeline.pipeline import Pipeline +from pipecat.processors.filters.identity_filter import IdentityFilter from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.tests.utils import SleepFrame, run_test class TestFrameProcessor(unittest.IsolatedAsyncioTestCase): + async def test_before_after_events(self): + identity = IdentityFilter() + + before_process_called = False + after_process_called = False + before_push_called = False + after_push_called = False + + @identity.event_handler("on_before_process_frame") + async def on_before_process_frame(filter, frame): + nonlocal before_process_called + before_process_called = True + + @identity.event_handler("on_after_process_frame") + async def on_after_process_frame(filter, frame): + nonlocal after_process_called + after_process_called = True + + @identity.event_handler("on_before_push_frame") + async def on_before_push_frame(filter, frame): + nonlocal before_push_called + before_push_called = True + + @identity.event_handler("on_after_push_frame") + async def on_after_push_frame(filter, frame): + nonlocal after_push_called + after_push_called = True + + pipeline = Pipeline([identity]) + + frames_to_send = [TextFrame(text="Hello cat!")] + + expected_down_frames = [TextFrame] + + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + assert before_process_called + assert after_process_called + assert before_push_called + assert after_push_called + async def test_interruption_and_wait(self): class DelayFrameProcessor(FrameProcessor): """This processors just gives time to the event loop to change