Merge pull request #2497 from pipecat-ai/aleix/pipeline-task-fix-cancellation
PipelineTask: handle cancellations gracefully
This commit is contained in:
@@ -341,6 +341,8 @@ async def run_eval_pipeline(
|
||||
async def on_pipeline_idle_timeout(task):
|
||||
await eval_runner.assert_eval_false()
|
||||
|
||||
runner = PipelineRunner()
|
||||
# TODO(aleix): We should handle SIGINT and SIGTERM so we can cancel both the
|
||||
# eval and the example.
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -369,16 +369,37 @@ class PipelineTask(BasePipelineTask):
|
||||
|
||||
# We have already cleaned up the pipeline inside the task.
|
||||
cleanup_pipeline = False
|
||||
|
||||
# Pipeline has finished nicely.
|
||||
self._finished = True
|
||||
except asyncio.CancelledError:
|
||||
# Raise exception back to the pipeline runner so it can cancel this
|
||||
# task properly.
|
||||
raise
|
||||
finally:
|
||||
await self._cancel_tasks()
|
||||
await self._cleanup(cleanup_pipeline)
|
||||
if self._check_dangling_tasks:
|
||||
self._print_dangling_tasks()
|
||||
self._finished = True
|
||||
# We can reach this point for different reasons:
|
||||
#
|
||||
# 1. The task has finished properly (e.g. `EndFrame`).
|
||||
# 2. By calling `PipelineTask.cancel()`.
|
||||
# 3. By asyncio task cancellation.
|
||||
#
|
||||
# Case (1) will execute the code below without issues because
|
||||
# `self._finished` is true.
|
||||
#
|
||||
# Case (2) will execute the code below without issues because
|
||||
# `self._cancelled` is true.
|
||||
#
|
||||
# Case (3) will raise the exception above (because we are cancelling
|
||||
# the asyncio task). This will be then captured by the
|
||||
# `PipelineRunner` which will call `PipelineTask.cancel()` and
|
||||
# therefore becoming case (2).
|
||||
if self._finished or self._cancelled:
|
||||
logger.debug(f"Pipeline task {self} has finished, cleaning up resources")
|
||||
await self._cancel_tasks()
|
||||
await self._cleanup(cleanup_pipeline)
|
||||
if self._check_dangling_tasks:
|
||||
self._print_dangling_tasks()
|
||||
self._finished = True
|
||||
|
||||
async def queue_frame(self, frame: Frame):
|
||||
"""Queue a single frame to be pushed down the pipeline.
|
||||
@@ -698,6 +719,10 @@ class PipelineTask(BasePipelineTask):
|
||||
Returns:
|
||||
Whether the pipeline task should continue running.
|
||||
"""
|
||||
# If we are cancelling, just exit the task.
|
||||
if self._cancelled:
|
||||
return True
|
||||
|
||||
logger.warning("Idle timeout detected. Last 10 frames received:")
|
||||
for i, frame in enumerate(last_frames, 1):
|
||||
logger.warning(f"Frame {i}: {frame}")
|
||||
|
||||
Reference in New Issue
Block a user