From a0128516ff8e47a7e4446566c0c68544f12b20fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 22 Aug 2025 18:53:00 -0700 Subject: [PATCH] PipelineTask: handle cancellations gracefully --- scripts/evals/eval.py | 4 +++- src/pipecat/pipeline/task.py | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/scripts/evals/eval.py b/scripts/evals/eval.py index 3818cd568..39770e474 100644 --- a/scripts/evals/eval.py +++ b/scripts/evals/eval.py @@ -341,6 +341,8 @@ async def run_eval_pipeline( async def on_pipeline_idle_timeout(task): await eval_runner.assert_eval_false() - runner = PipelineRunner() + # TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the + # eval and the example. + runner = PipelineRunner(handle_sigint=False) await runner.run(task) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index e809a690b..021ca38a4 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -369,16 +369,37 @@ class PipelineTask(BasePipelineTask): # We have already cleaned up the pipeline inside the task. cleanup_pipeline = False + + # Pipeline has finished nicely. + self._finished = True except asyncio.CancelledError: # Raise exception back to the pipeline runner so it can cancel this # task properly. raise finally: - await self._cancel_tasks() - await self._cleanup(cleanup_pipeline) - if self._check_dangling_tasks: - self._print_dangling_tasks() - self._finished = True + # We can reach this point for different reasons: + # + # 1. The task has finished properly (e.g. `EndFrame`). + # 2. By calling `PipelineTask.cancel()`. + # 3. By asyncio task cancellation. + # + # Case (1) will execute the code below without issues because + # `self._finished` is true. + # + # Case (2) will execute the code below without issues because + # `self._cancelled` is true. + # + # Case (3) will raise the exception above (because we are cancelling + # the asyncio task). This will be then captured by the + # `PipelineRunner` which will call `PipelineTask.cancel()` and + # therefore becoming case (2). + if self._finished or self._cancelled: + logger.debug(f"Pipeline task {self} has finished, cleaning up resources") + await self._cancel_tasks() + await self._cleanup(cleanup_pipeline) + if self._check_dangling_tasks: + self._print_dangling_tasks() + self._finished = True async def queue_frame(self, frame: Frame): """Queue a single frame to be pushed down the pipeline. @@ -698,6 +719,10 @@ class PipelineTask(BasePipelineTask): Returns: Whether the pipeline task should continue running. """ + # If we are cancelling, just exit the task. + if self._cancelled: + return True + logger.warning("Idle timeout detected. Last 10 frames received:") for i, frame in enumerate(last_frames, 1): logger.warning(f"Frame {i}: {frame}")