From 07b9be53086172c0ec6a80cffac7edb77051da2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 24 Feb 2025 17:33:10 -0800 Subject: [PATCH] PipelineTask: add check_dangling_tasks parameter --- CHANGELOG.md | 4 ++++ src/pipecat/pipeline/task.py | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 299ead7b0..e7ae19967 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or + disable checking for frame processors' dangling tasks when the Pipeline + finishes running. + - Added new `on_completion_timeout` event for LLM services (all OpenAI-based services, Anthropic and Google). Note that this event will only get triggered if LLM timeouts are setup and if the timeout was reached. It can be useful to diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 66b1e7647..1813878e1 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -123,6 +123,7 @@ class PipelineTask(BaseTask): pipeline: The pipeline to execute. params: Configuration parameters for the pipeline. clock: Clock implementation for timing operations. + check_dangling_tasks: Whether to check for processors' tasks finishing properly. """ def __init__( @@ -130,6 +131,7 @@ class PipelineTask(BaseTask): pipeline: BasePipeline, params: PipelineParams = PipelineParams(), clock: BaseClock = SystemClock(), + check_dangling_tasks: bool = True, ): self._id: int = obj_id() self._name: str = f"{self.__class__.__name__}#{obj_count(self)}" @@ -137,6 +139,7 @@ class PipelineTask(BaseTask): self._pipeline = pipeline self._clock = clock self._params = params + self._check_dangling_tasks = check_dangling_tasks self._finished = False # This queue receives frames coming from the pipeline upstream. @@ -220,7 +223,8 @@ class PipelineTask(BaseTask): pass await self._cancel_tasks() await self._cleanup() - self._print_dangling_tasks() + if self._check_dangling_tasks: + self._print_dangling_tasks() self._finished = True async def queue_frame(self, frame: Frame):