diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a1651808..3d8e18746 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or + disable checking for frame processors' dangling tasks when the Pipeline + finishes running. + - Added new `on_completion_timeout` event for LLM services (all OpenAI-based services, Anthropic and Google). Note that this event will only get triggered if LLM timeouts are setup and if the timeout was reached. It can be useful to diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 66b1e7647..1813878e1 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -123,6 +123,7 @@ class PipelineTask(BaseTask): pipeline: The pipeline to execute. params: Configuration parameters for the pipeline. clock: Clock implementation for timing operations. + check_dangling_tasks: Whether to check for processors' tasks finishing properly. """ def __init__( @@ -130,6 +131,7 @@ class PipelineTask(BaseTask): pipeline: BasePipeline, params: PipelineParams = PipelineParams(), clock: BaseClock = SystemClock(), + check_dangling_tasks: bool = True, ): self._id: int = obj_id() self._name: str = f"{self.__class__.__name__}#{obj_count(self)}" @@ -137,6 +139,7 @@ class PipelineTask(BaseTask): self._pipeline = pipeline self._clock = clock self._params = params + self._check_dangling_tasks = check_dangling_tasks self._finished = False # This queue receives frames coming from the pipeline upstream. @@ -220,7 +223,8 @@ class PipelineTask(BaseTask): pass await self._cancel_tasks() await self._cleanup() - self._print_dangling_tasks() + if self._check_dangling_tasks: + self._print_dangling_tasks() self._finished = True async def queue_frame(self, frame: Frame):