Merge pull request #3397 from pipecat-ai/aleix/add-setup-pipeline-task

PipelineTask: add external pipeline task setup files
This commit is contained in:
Aleix Conchillo Flaqué
2026-01-09 17:39:25 -08:00
committed by GitHub
3 changed files with 52 additions and 1 deletions

6
changelog/3397.added.md Normal file
View File

@@ -0,0 +1,6 @@
- Added support for setting up a pipeline task from external files. You can now register custom pipeline task setup files by setting the `PIPECAT_SETUP_FILES` environment variable. This variable should contain a colon-separated list of Python files (e.g. `export PIPECAT_SETUP_FILES="setup1.py:setup.py:..."`). Each file must define a function with the following signature:
```python
async def setup_pipeline_task(task: PipelineTask):
...
```

View File

@@ -0,0 +1 @@
- Loading external observers from files is deprecated, use the new pipeline task setup files and `PIPECAT_SETUP_FILES` environment variable instead.

View File

@@ -666,6 +666,9 @@ class PipelineTask(BasePipelineTask):
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
# Do any additional pipeline task setup externally.
await self._load_setup_files()
# Load additional observers.
await self._load_observer_files()
@@ -872,10 +875,51 @@ class PipelineTask(BasePipelineTask):
return False
return True
async def _load_setup_files(self):
"""Dynamically setup pipeline task from files listed in PIPECAT_SETUP_FILES.
Each file should contain a `setup_pipeline_task(task)` async function
that receives the `PipelineTask` instance and can perform any custom
setup (e.g., adding event handlers, observers, or modifying task
configuration).
"""
setup_files = [f for f in os.environ.get("PIPECAT_SETUP_FILES", "").split(":") if f]
for f in setup_files:
try:
path = Path(f).resolve()
module_name = path.stem
spec = importlib.util.spec_from_file_location(module_name, str(path))
if spec and spec.loader:
logger.debug(f"{self} running setup from {path}")
# Load module.
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Run setup function.
if hasattr(module, "setup_pipeline_task"):
await module.setup_pipeline_task(self)
else:
logger.warning(
f"{self} setup file {path} has no setup_pipeline_task function"
)
except Exception as e:
logger.error(f"{self} error running external setup from {f}: {e}")
async def _load_observer_files(self):
"""Dynamically load observers from files listed in PIPECAT_OBSERVER_FILES."""
observer_files = os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":")
observer_files = [f for f in os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":") if f]
for f in observer_files:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Observer files (and environment variable `PIPECAT_OBSERVER_FILES`) is deprecated, use setup files instead (and `PIPECAT_SETUP_FILES`) instead.",
DeprecationWarning,
)
try:
path = Path(f).resolve()
module_name = path.stem