Merge pull request #2969 from pipecat-ai/aleix/pipecat-observer-files

PipelineTask: load observers from PIPECAT_OBSERVER_FILES
This commit is contained in:
Aleix Conchillo Flaqué
2025-11-04 12:34:37 -08:00
committed by GitHub
2 changed files with 38 additions and 0 deletions

View File

@@ -9,6 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added support for loading external observers. You can now register custom
pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment
variable. This variable should contain a colon-separated list of Python files
(e.g. `export PIPECAT_OBSERVER_FILES="observer1.py:observer2.py:..."`). Each
file must define a function with the following signature:
```python
async def create_observers(task: PipelineTask) -> Iterable[BaseObserver]:
...
```
- Added support for new sonic-3 languages in `CartesiaTTSService` and
`CartesiaHttpTTSService`.

View File

@@ -12,6 +12,9 @@ including heartbeats, idle detection, and observer integration.
"""
import asyncio
import importlib.util
import os
from pathlib import Path
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
from loguru import logger
@@ -641,6 +644,9 @@ class PipelineTask(BasePipelineTask):
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
# Load additional observers.
await self._load_observer_files()
mgr_params = TaskManagerParams(loop=params.loop)
self._task_manager.setup(mgr_params)
@@ -844,6 +850,27 @@ class PipelineTask(BasePipelineTask):
return False
return True
async def _load_observer_files(self):
observer_files = os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":")
for f in observer_files:
try:
path = Path(f).resolve()
module_name = path.stem
spec = importlib.util.spec_from_file_location(module_name, str(path))
if spec:
logger.debug(f"{self} loading observers from {path}")
# Load module.
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Create observers.
observers = await module.create_observers(self)
for observer in observers:
self.add_observer(observer)
except Exception as e:
logger.error(f"{self} error loading external observers from {f}: {e}")
def _print_dangling_tasks(self):
"""Log any dangling tasks that haven't been properly cleaned up."""
tasks = [t.get_name() for t in self._task_manager.current_tasks()]