From d2696be03be115e843d64d5f24e981cff638952e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 9 Jan 2026 16:35:11 -0800 Subject: [PATCH] PipelineTask: add external pipeline task setup files --- changelog/3397.added.md | 6 +++++ changelog/3397.deprecated.md | 1 + src/pipecat/pipeline/task.py | 46 +++++++++++++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 changelog/3397.added.md create mode 100644 changelog/3397.deprecated.md diff --git a/changelog/3397.added.md b/changelog/3397.added.md new file mode 100644 index 000000000..2f819cc43 --- /dev/null +++ b/changelog/3397.added.md @@ -0,0 +1,6 @@ +- Added support for setting up a pipeline task from external files. You can now register custom pipeline task setup files by setting the `PIPECAT_SETUP_FILES` environment variable. This variable should contain a colon-separated list of Python files (e.g. `export PIPECAT_SETUP_FILES="setup1.py:setup.py:..."`). Each file must define a function with the following signature: + + ```python + async def setup_pipeline_task(task: PipelineTask): + ... + ``` diff --git a/changelog/3397.deprecated.md b/changelog/3397.deprecated.md new file mode 100644 index 000000000..b9028c5be --- /dev/null +++ b/changelog/3397.deprecated.md @@ -0,0 +1 @@ +- Loading external observers from files is deprecated, use the new pipeline task setup files and `PIPECAT_SETUP_FILES` environment variable instead. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index b3d64c893..faff787e4 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -654,6 +654,9 @@ class PipelineTask(BasePipelineTask): async def _setup(self, params: PipelineTaskParams): """Set up the pipeline task and all processors.""" + # Do any additional pipeline task setup externally. + await self._load_setup_files() + # Load additional observers. await self._load_observer_files() @@ -860,10 +863,51 @@ class PipelineTask(BasePipelineTask): return False return True + async def _load_setup_files(self): + """Dynamically setup pipeline task from files listed in PIPECAT_SETUP_FILES. + + Each file should contain a `setup_pipeline_task(task)` async function + that receives the `PipelineTask` instance and can perform any custom + setup (e.g., adding event handlers, observers, or modifying task + configuration). + + """ + setup_files = [f for f in os.environ.get("PIPECAT_SETUP_FILES", "").split(":") if f] + for f in setup_files: + try: + path = Path(f).resolve() + module_name = path.stem + spec = importlib.util.spec_from_file_location(module_name, str(path)) + if spec and spec.loader: + logger.debug(f"{self} running setup from {path}") + + # Load module. + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Run setup function. + if hasattr(module, "setup_pipeline_task"): + await module.setup_pipeline_task(self) + else: + logger.warning( + f"{self} setup file {path} has no setup_pipeline_task function" + ) + except Exception as e: + logger.error(f"{self} error running external setup from {f}: {e}") + async def _load_observer_files(self): """Dynamically load observers from files listed in PIPECAT_OBSERVER_FILES.""" - observer_files = os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":") + observer_files = [f for f in os.environ.get("PIPECAT_OBSERVER_FILES", "").split(":") if f] for f in observer_files: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Observer files (and environment variable `PIPECAT_OBSERVER_FILES`) is deprecated, use setup files instead (and `PIPECAT_SETUP_FILES`) instead.", + DeprecationWarning, + ) + try: path = Path(f).resolve() module_name = path.stem