Merge pull request #1283 from pipecat-ai/aleix/check-dangling-tasks

PipelineTask: add check_dangling_tasks parameter
This commit is contained in:
Aleix Conchillo Flaqué
2025-02-24 21:26:32 -08:00
committed by GitHub
2 changed files with 9 additions and 1 deletions

View File

@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or
disable checking for frame processors' dangling tasks when the Pipeline
finishes running.
- Added new `on_completion_timeout` event for LLM services (all OpenAI-based
services, Anthropic and Google). Note that this event will only get triggered
if LLM timeouts are setup and if the timeout was reached. It can be useful to

View File

@@ -123,6 +123,7 @@ class PipelineTask(BaseTask):
pipeline: The pipeline to execute.
params: Configuration parameters for the pipeline.
clock: Clock implementation for timing operations.
check_dangling_tasks: Whether to check for processors' tasks finishing properly.
"""
def __init__(
@@ -130,6 +131,7 @@ class PipelineTask(BaseTask):
pipeline: BasePipeline,
params: PipelineParams = PipelineParams(),
clock: BaseClock = SystemClock(),
check_dangling_tasks: bool = True,
):
self._id: int = obj_id()
self._name: str = f"{self.__class__.__name__}#{obj_count(self)}"
@@ -137,6 +139,7 @@ class PipelineTask(BaseTask):
self._pipeline = pipeline
self._clock = clock
self._params = params
self._check_dangling_tasks = check_dangling_tasks
self._finished = False
# This queue receives frames coming from the pipeline upstream.
@@ -220,7 +223,8 @@ class PipelineTask(BaseTask):
pass
await self._cancel_tasks()
await self._cleanup()
self._print_dangling_tasks()
if self._check_dangling_tasks:
self._print_dangling_tasks()
self._finished = True
async def queue_frame(self, frame: Frame):