diff --git a/CHANGELOG.md b/CHANGELOG.md index 75dad0be3..49c35c689 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last + received OpenAI LLM context frame and it doesn't let it through until the + notifier is notified. + +- Added `WakeNotifierFilter`. This processor expects a list of frame types and + will execute a given callback predicate when a frame of any of those type is + being processed. If the callback returns true the notifier will be notified. + +- Added `NullFilter`. A null filter doesn't push any frames upstream or + downstream. This is usually used to disable one of the pipelines in + `ParallelPipeline`. + +- Added `EventNotifier`. This can be used as a very simple synchronization + feature between processors. + - Added `TavusVideoService`. This is an integration for Tavus digital twins. (see https://www.tavus.io/) diff --git a/src/pipecat/processors/aggregators/gated_openai_llm_context.py b/src/pipecat/processors/aggregators/gated_openai_llm_context.py new file mode 100644 index 000000000..71a540dd4 --- /dev/null +++ b/src/pipecat/processors/aggregators/gated_openai_llm_context.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio + +from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.sync.base_notifier import BaseNotifier + + +class GatedOpenAILLMContextAggregator(FrameProcessor): + """This aggregator keeps the last received OpenAI LLM context frame and it + doesn't let it through until the notifier is notified. + + """ + + def __init__(self, notifier: BaseNotifier, **kwargs): + super().__init__(**kwargs) + self._notifier = notifier + self._last_context_frame = None + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, StartFrame): + await self.push_frame(frame) + await self._start() + if isinstance(frame, (EndFrame, CancelFrame)): + await self._stop() + await self.push_frame(frame) + elif isinstance(frame, OpenAILLMContextFrame): + self._last_context_frame = frame + else: + await self.push_frame(frame, direction) + + async def _start(self): + self._gate_task = self.get_event_loop().create_task(self._gate_task_handler()) + + async def _stop(self): + self._gate_task.cancel() + await self._gate_task + + async def _gate_task_handler(self): + while True: + try: + await self._notifier.wait() + if self._last_context_frame: + await self.push_frame(self._last_context_frame) + self._last_context_frame = None + except asyncio.CancelledError: + break diff --git a/src/pipecat/processors/filters/frame_filter.py b/src/pipecat/processors/filters/frame_filter.py index 45927a604..f4c4b0f61 100644 --- a/src/pipecat/processors/filters/frame_filter.py +++ b/src/pipecat/processors/filters/frame_filter.py @@ -4,14 +4,14 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import List +from typing import Tuple, Type from pipecat.frames.frames import AppFrame, ControlFrame, Frame, SystemFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class FrameFilter(FrameProcessor): - def __init__(self, types: List[type]): + def __init__(self, types: Tuple[Type[Frame]]): super().__init__() self._types = types @@ -20,9 +20,8 @@ class FrameFilter(FrameProcessor): # def _should_passthrough_frame(self, frame): - for t in self._types: - if isinstance(frame, t): - return True + if isinstance(frame, self._types): + return True return ( isinstance(frame, AppFrame) diff --git a/src/pipecat/processors/filters/null_filter.py b/src/pipecat/processors/filters/null_filter.py new file mode 100644 index 000000000..7e9ca6725 --- /dev/null +++ b/src/pipecat/processors/filters/null_filter.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from pipecat.processors.frame_processor import FrameProcessor + + +class NullFilter(FrameProcessor): + """This filter doesn't allow passing any frames up or downstream.""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) diff --git a/src/pipecat/processors/filters/wake_notifier_filter.py b/src/pipecat/processors/filters/wake_notifier_filter.py new file mode 100644 index 000000000..a7f074ccb --- /dev/null +++ b/src/pipecat/processors/filters/wake_notifier_filter.py @@ -0,0 +1,40 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from typing import Awaitable, Callable, Tuple, Type + +from pipecat.frames.frames import Frame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.sync.base_notifier import BaseNotifier + + +class WakeNotifierFilter(FrameProcessor): + """This processor expects a list of frame types and will execute a given + callback predicate when a frame of any of those type is being processed. If + the callback returns true the notifier will be notified. + + """ + + def __init__( + self, + notifier: BaseNotifier, + *, + types: Tuple[Type[Frame]], + filter: Callable[[Frame], Awaitable[bool]], + **kwargs, + ): + super().__init__(**kwargs) + self._notifier = notifier + self._types = types + self._filter = filter + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, self._types) and await self._filter(frame): + await self._notifier.notify() + + await self.push_frame(frame, direction) diff --git a/src/pipecat/sync/__init__.py b/src/pipecat/sync/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/pipecat/sync/base_notifier.py b/src/pipecat/sync/base_notifier.py new file mode 100644 index 000000000..c7770ab26 --- /dev/null +++ b/src/pipecat/sync/base_notifier.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from abc import ABC, abstractmethod + + +class BaseNotifier(ABC): + @abstractmethod + async def notify(self): + pass + + @abstractmethod + async def wait(self): + pass diff --git a/src/pipecat/sync/event_notifier.py b/src/pipecat/sync/event_notifier.py new file mode 100644 index 000000000..f02dcbdae --- /dev/null +++ b/src/pipecat/sync/event_notifier.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio + +from pipecat.sync.base_notifier import BaseNotifier + + +class EventNotifier(BaseNotifier): + def __init__(self): + self._event = asyncio.Event() + + async def notify(self): + self._event.set() + + async def wait(self): + await self._event.wait() + self._event.clear()