From c4e94e280eca9e6a2fcbfacae2bcba8b3448f57d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 25 Sep 2024 16:35:33 -0700 Subject: [PATCH] processors: add support for event handlers --- CHANGELOG.md | 10 ++++++++ src/pipecat/processors/frame_processor.py | 30 +++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c4bf3c92..f35978dcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- All `FrameProcessors` can now register event handlers. + +``` +tts = SomeTTSService(...) + +@tts.event_handler("on_connected"): +async def on_connected(processor): + ... +``` + - Added `AsyncGeneratorProcessor`. This processor can be used together with a `FrameSerializer` as an async generator. It provides a `generator()` function that returns an `AsyncGenerator` and that yields serialized frames. diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index b56846aa6..f71e066d7 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -5,6 +5,7 @@ # import asyncio +import inspect from enum import Enum @@ -48,6 +49,8 @@ class FrameProcessor: self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop() self._sync = sync + self._event_handlers: dict = {} + # Clock self._clock: BaseClock | None = None @@ -169,6 +172,23 @@ class FrameProcessor: else: await self.__push_queue.put((frame, direction)) + def event_handler(self, event_name: str): + def decorator(handler): + self.add_event_handler(event_name, handler) + return handler + + return decorator + + def add_event_handler(self, event_name: str, handler): + if event_name not in self._event_handlers: + raise Exception(f"Event handler {event_name} not registered") + self._event_handlers[event_name].append(handler) + + def _register_event_handler(self, event_name: str): + if event_name in self._event_handlers: + raise Exception(f"Event handler {event_name} already registered") + self._event_handlers[event_name] = [] + # # Handle interruptions # @@ -212,5 +232,15 @@ class FrameProcessor: except asyncio.CancelledError: break + async def _call_event_handler(self, event_name: str, *args, **kwargs): + try: + for handler in self._event_handlers[event_name]: + if inspect.iscoroutinefunction(handler): + await handler(self, *args, **kwargs) + else: + handler(self, *args, **kwargs) + except Exception as e: + logger.exception(f"Exception in event handler {event_name}: {e}") + def __str__(self): return self.name