diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index 18a1979e8..c667d8d94 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -39,12 +39,9 @@ spawned tasks finishing on their own does **not** unblock it. import asyncio import gc -import importlib.util -import os import signal import uuid from dataclasses import dataclass, field -from pathlib import Path from loguru import logger @@ -62,6 +59,7 @@ from pipecat.bus import ( from pipecat.bus.subscriber import BusSubscriber from pipecat.pipeline.base_task import BaseTask from pipecat.pipeline.task import PipelineTask, PipelineTaskParams +from pipecat.pipeline.utils import run_setup_hook from pipecat.registry import TaskRegistry from pipecat.registry.types import TaskReadyData, TaskRegistryEntry from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams @@ -342,28 +340,12 @@ class PipelineRunner(BaseObject, BusSubscriber): await asyncio.gather(*remaining, return_exceptions=True) async def _load_setup_files(self) -> None: - """Load setup files from ``PIPECAT_RUNNER_SETUP_FILES``. + """Run ``setup_pipeline_runner`` from each file in ``PIPECAT_SETUP_FILES``. - Each file should contain an async ``setup_runner(runner)`` function - that receives the runner instance. + A setup file may define ``setup_pipeline_runner(runner)`` to attach + spawned tasks, event handlers, or other runner-level wiring. """ - setup_files = [f for f in os.environ.get("PIPECAT_RUNNER_SETUP_FILES", "").split(":") if f] - for f in setup_files: - try: - path = Path(f).resolve() - spec = importlib.util.spec_from_file_location(path.stem, str(path)) - if spec and spec.loader: - logger.debug(f"PipelineRunner '{self}': running setup from {path}") - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - if hasattr(module, "setup_runner"): - await module.setup_runner(self) - else: - logger.warning( - f"PipelineRunner '{self}': setup file {path} has no setup_runner function" - ) - except Exception as e: - logger.error(f"PipelineRunner '{self}': error running setup from {f}: {e}") + await run_setup_hook(target=self, function_name="setup_pipeline_runner") async def _start_task(self, entry: _TaskEntry) -> None: """Run a registered task as a background asyncio task.""" diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 3634f1dc3..f8f4e993c 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -12,12 +12,9 @@ including heartbeats, idle detection, and observer integration. """ import asyncio -import importlib.util -import os import warnings from collections.abc import AsyncIterable, Iterable from dataclasses import dataclass -from pathlib import Path from typing import Any, TypeVar from loguru import logger @@ -52,6 +49,7 @@ from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.base_task import BaseTask from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource from pipecat.pipeline.task_observer import TaskObserver +from pipecat.pipeline.utils import run_setup_hook from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams @@ -1068,36 +1066,12 @@ class PipelineTask(BaseTask): return True async def _load_setup_files(self): - """Dynamically setup pipeline task from files listed in PIPECAT_SETUP_FILES. - - Each file should contain a `setup_pipeline_task(task)` async function - that receives the `PipelineTask` instance and can perform any custom - setup (e.g., adding event handlers, observers, or modifying task - configuration). + """Run ``setup_pipeline_task`` from each file in ``PIPECAT_SETUP_FILES``. + A setup file may define ``setup_pipeline_task(task)`` to attach event + handlers, observers, or other per-task wiring. """ - setup_files = [f for f in os.environ.get("PIPECAT_SETUP_FILES", "").split(":") if f] - for f in setup_files: - try: - path = Path(f).resolve() - module_name = path.stem - spec = importlib.util.spec_from_file_location(module_name, str(path)) - if spec and spec.loader: - logger.debug(f"{self} running setup from {path}") - - # Load module. - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - # Run setup function. - if hasattr(module, "setup_pipeline_task"): - await module.setup_pipeline_task(self) - else: - logger.warning( - f"{self} setup file {path} has no setup_pipeline_task function" - ) - except Exception as e: - logger.error(f"{self} error running external setup from {f}: {e}") + await run_setup_hook(target=self, function_name="setup_pipeline_task") def _print_dangling_tasks(self): """Log any dangling tasks that haven't been properly cleaned up.""" diff --git a/src/pipecat/pipeline/utils.py b/src/pipecat/pipeline/utils.py new file mode 100644 index 000000000..f695acb58 --- /dev/null +++ b/src/pipecat/pipeline/utils.py @@ -0,0 +1,77 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Shared loader for ``PIPECAT_SETUP_FILES`` hooks. + +Each file listed in the ``PIPECAT_SETUP_FILES`` environment variable (colon +separated) may define one or both of the following async functions: + +- ``setup_pipeline_runner(runner)`` — invoked once per :class:`PipelineRunner` + before its spawned tasks start. +- ``setup_pipeline_task(task)`` — invoked once per :class:`PipelineTask` while + the task sets up its pipeline. + +Setup files are imported at most once per process; module-level state (for +example, a shared debugger instance referenced by both hooks) is preserved +across hook invocations. +""" + +import importlib.util +import os +from pathlib import Path +from types import ModuleType +from typing import Any + +from loguru import logger + +_module_cache: dict[str, ModuleType] = {} + + +def _setup_file_paths() -> list[Path]: + return [Path(f).resolve() for f in os.environ.get("PIPECAT_SETUP_FILES", "").split(":") if f] + + +def _load_module(path: Path) -> ModuleType | None: + cache_key = str(path) + cached = _module_cache.get(cache_key) + if cached is not None: + return cached + + spec = importlib.util.spec_from_file_location(path.stem, str(path)) + if spec is None or spec.loader is None: + logger.error(f"unable to load setup file {path}") + return None + + module = importlib.util.module_from_spec(spec) + try: + spec.loader.exec_module(module) + except Exception as e: + logger.error(f"error loading setup file {path}: {e}") + return None + + _module_cache[cache_key] = module + return module + + +async def run_setup_hook(*, target: Any, function_name: str) -> None: + """Run ``function_name(target)`` from every ``PIPECAT_SETUP_FILES`` module. + + Args: + target: Object passed to the hook function. + function_name: Name of the async function to call in each setup file. + """ + for path in _setup_file_paths(): + module = _load_module(path) + if module is None: + continue + try: + if hasattr(module, function_name): + logger.debug(f"{target} running {function_name} from {path}") + await getattr(module, function_name)(target) + else: + logger.warning(f"{target} setup file {path} has no {function_name} function") + except Exception as e: + logger.error(f"{target} error running {function_name} from {path}: {e}")