diff --git a/CHANGELOG.md b/CHANGELOG.md index b8d62fb16..0beaeab2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added support for loading external observers. You can now register custom + pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment + variable. This variable should contain a colon-separated list of Python files + (e.g. `export PIPECAT_OBSERVER_FILES="observer1.py:observer2.py:..."`). Each + file must define a function with the following signature: + + ```python + async def create_observers(task: PipelineTask) -> Iterable[BaseObserver]: + ... + ``` + - Added support for new sonic-3 languages in `CartesiaTTSService` and `CartesiaHttpTTSService`. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index ebcb9146d..90976b52c 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -12,6 +12,9 @@ including heartbeats, idle detection, and observer integration. """ import asyncio +import importlib.util +import os +from pathlib import Path from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type from loguru import logger @@ -641,6 +644,9 @@ class PipelineTask(BasePipelineTask): async def _setup(self, params: PipelineTaskParams): """Set up the pipeline task and all processors.""" + # Load additional observers. + await self._load_observer_files() + mgr_params = TaskManagerParams(loop=params.loop) self._task_manager.setup(mgr_params) @@ -844,6 +850,27 @@ class PipelineTask(BasePipelineTask): return False return True + async def _load_observer_files(self): + observer_files = os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":") + for f in observer_files: + try: + path = Path(f).resolve() + module_name = path.stem + spec = importlib.util.spec_from_file_location(module_name, str(path)) + if spec: + logger.debug(f"{self} loading observers from {path}") + + # Load module. + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Create observers. + observers = await module.create_observers(self) + for observer in observers: + self.add_observer(observer) + except Exception as e: + logger.error(f"{self} error loading external observers from {f}: {e}") + def _print_dangling_tasks(self): """Log any dangling tasks that haven't been properly cleaned up.""" tasks = [t.get_name() for t in self._task_manager.current_tasks()]