diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b7b3d7f4..ee8a60b22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,13 @@ All notable changes to **pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - -## Unreleased +## [Unreleased] ### Added +- Added a new `FunctionFilter`. This filter will let you filter frames based on + a given function, except system messages which should never be filtered. + - Added `enable_metrics` to `PipelineParams`. - Added `MetricsFrame`. The `MetricsFrame` will report different metrics in the diff --git a/src/pipecat/processors/filters/function_filter.py b/src/pipecat/processors/filters/function_filter.py new file mode 100644 index 000000000..421fcc80c --- /dev/null +++ b/src/pipecat/processors/filters/function_filter.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from typing import Awaitable, Callable + +from pipecat.frames.frames import Frame, SystemFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class FunctionFilter(FrameProcessor): + + def __init__(self, filter: Callable[[Frame], Awaitable[bool]]): + super().__init__() + self._filter = filter + + # + # Frame processor + # + + def _should_passthrough_frame(self, frame): + return isinstance(frame, SystemFrame) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + passthrough = self._should_passthrough_frame(frame) + allowed = await self._filter(frame) + if passthrough or allowed: + await self.push_frame(frame, direction)