add notifiers and more frame filters

This commit is contained in:
Aleix Conchillo Flaqué
2024-10-30 16:34:05 -07:00
parent b050143952
commit 018e51e8a3
8 changed files with 166 additions and 5 deletions

View File

@@ -9,6 +9,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last
received OpenAI LLM context frame and it doesn't let it through until the
notifier is notified.
- Added `WakeNotifierFilter`. This processor expects a list of frame types and
will execute a given callback predicate when a frame of any of those type is
being processed. If the callback returns true the notifier will be notified.
- Added `NullFilter`. A null filter doesn't push any frames upstream or
downstream. This is usually used to disable one of the pipelines in
`ParallelPipeline`.
- Added `EventNotifier`. This can be used as a very simple synchronization
feature between processors.
- Added `TavusVideoService`. This is an integration for Tavus digital twins.
(see https://www.tavus.io/)

View File

@@ -0,0 +1,55 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.sync.base_notifier import BaseNotifier
class GatedOpenAILLMContextAggregator(FrameProcessor):
"""This aggregator keeps the last received OpenAI LLM context frame and it
doesn't let it through until the notifier is notified.
"""
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
self._last_context_frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.push_frame(frame)
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
await self.push_frame(frame)
elif isinstance(frame, OpenAILLMContextFrame):
self._last_context_frame = frame
else:
await self.push_frame(frame, direction)
async def _start(self):
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
try:
await self._notifier.wait()
if self._last_context_frame:
await self.push_frame(self._last_context_frame)
self._last_context_frame = None
except asyncio.CancelledError:
break

View File

@@ -4,14 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List
from typing import Tuple, Type
from pipecat.frames.frames import AppFrame, ControlFrame, Frame, SystemFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameFilter(FrameProcessor):
def __init__(self, types: List[type]):
def __init__(self, types: Tuple[Type[Frame]]):
super().__init__()
self._types = types
@@ -20,9 +20,8 @@ class FrameFilter(FrameProcessor):
#
def _should_passthrough_frame(self, frame):
for t in self._types:
if isinstance(frame, t):
return True
if isinstance(frame, self._types):
return True
return (
isinstance(frame, AppFrame)

View File

@@ -0,0 +1,14 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.processors.frame_processor import FrameProcessor
class NullFilter(FrameProcessor):
"""This filter doesn't allow passing any frames up or downstream."""
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@@ -0,0 +1,40 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Awaitable, Callable, Tuple, Type
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.sync.base_notifier import BaseNotifier
class WakeNotifierFilter(FrameProcessor):
"""This processor expects a list of frame types and will execute a given
callback predicate when a frame of any of those type is being processed. If
the callback returns true the notifier will be notified.
"""
def __init__(
self,
notifier: BaseNotifier,
*,
types: Tuple[Type[Frame]],
filter: Callable[[Frame], Awaitable[bool]],
**kwargs,
):
super().__init__(**kwargs)
self._notifier = notifier
self._types = types
self._filter = filter
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, self._types) and await self._filter(frame):
await self._notifier.notify()
await self.push_frame(frame, direction)

View File

View File

@@ -0,0 +1,17 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class BaseNotifier(ABC):
@abstractmethod
async def notify(self):
pass
@abstractmethod
async def wait(self):
pass

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from pipecat.sync.base_notifier import BaseNotifier
class EventNotifier(BaseNotifier):
def __init__(self):
self._event = asyncio.Event()
async def notify(self):
self._event.set()
async def wait(self):
await self._event.wait()
self._event.clear()