FrameProcessor: add before/after events for processed/pushed frames

This commit is contained in:
Aleix Conchillo Flaqué
2025-09-19 13:44:49 -07:00
parent 2289409b4c
commit 3d88b42e0b
3 changed files with 71 additions and 0 deletions

View File

@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `on_before_process_frame`, `on_after_process_frame`,
`on_before_push_frame` and `on_after_push_frame`. These are synchronous events
that get called before and after a frame is processed or pushed. Note that
these events are synchrnous so they should ideally perform lightweight tasks
in order to not block the pipeline.
- Added `on_before_leave` synchronous event to `DailyTransport`.
- Added `on_before_disconnect` synchronous event to `LiveKitTransport`.

View File

@@ -141,6 +141,12 @@ class FrameProcessor(BaseObject):
task. System frames are also processed in a separate task which guarantees
frame priority.
Event handlers available:
- on_before_process_frame: Called before a frame is processed
- on_after_process_frame: Called after a frame is processed
- on_before_push_frame: Called before a frame is pushed
- on_after_push_frame: Called after a frame is pushed
"""
def __init__(
@@ -228,6 +234,12 @@ class FrameProcessor(BaseObject):
self._wait_for_interruption = False
self._wait_interruption_event = asyncio.Event()
# Frame processor events.
self._register_event_handler("on_before_process_frame", sync=True)
self._register_event_handler("on_after_process_frame", sync=True)
self._register_event_handler("on_before_push_frame", sync=True)
self._register_event_handler("on_after_push_frame", sync=True)
@property
def id(self) -> int:
"""Get the unique identifier for this processor.
@@ -639,8 +651,12 @@ class FrameProcessor(BaseObject):
if not self._check_started(frame):
return
await self._call_event_handler("on_before_push_frame", frame)
await self.__internal_push_frame(frame, direction)
await self._call_event_handler("on_after_push_frame", frame)
# If we are waiting for an interruption and we get an interruption, then
# we can unblock `push_interruption_task_frame_and_wait()`.
if self._wait_for_interruption and isinstance(frame, InterruptionFrame):
@@ -836,11 +852,15 @@ class FrameProcessor(BaseObject):
self, frame: Frame, direction: FrameDirection, callback: Optional[FrameCallback]
):
try:
await self._call_event_handler("on_before_process_frame", frame)
# Process the frame.
await self.process_frame(frame, direction)
# If this frame has an associated callback, call it now.
if callback:
await callback(self, frame, direction)
await self._call_event_handler("on_after_process_frame", frame)
except Exception as e:
logger.exception(f"{self}: error processing frame: {e}")
await self.push_error(ErrorFrame(str(e)))

View File

@@ -15,11 +15,56 @@ from pipecat.frames.frames import (
TransportMessageUrgentFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import SleepFrame, run_test
class TestFrameProcessor(unittest.IsolatedAsyncioTestCase):
async def test_before_after_events(self):
identity = IdentityFilter()
before_process_called = False
after_process_called = False
before_push_called = False
after_push_called = False
@identity.event_handler("on_before_process_frame")
async def on_before_process_frame(filter, frame):
nonlocal before_process_called
before_process_called = True
@identity.event_handler("on_after_process_frame")
async def on_after_process_frame(filter, frame):
nonlocal after_process_called
after_process_called = True
@identity.event_handler("on_before_push_frame")
async def on_before_push_frame(filter, frame):
nonlocal before_push_called
before_push_called = True
@identity.event_handler("on_after_push_frame")
async def on_after_push_frame(filter, frame):
nonlocal after_push_called
after_push_called = True
pipeline = Pipeline([identity])
frames_to_send = [TextFrame(text="Hello cat!")]
expected_down_frames = [TextFrame]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
assert before_process_called
assert after_process_called
assert before_push_called
assert after_push_called
async def test_interruption_and_wait(self):
class DelayFrameProcessor(FrameProcessor):
"""This processors just gives time to the event loop to change