Move create_task and cancel_task from FrameProcessor to BaseObject

Lift the task manager wiring (`_task_manager`, `task_manager` property,
`create_task`, `cancel_task`, and `setup(task_manager)`) up to
`BaseObject`. Owners propagate the task manager to their child
`BaseObject`s via `await child.setup(task_manager)`, matching the
existing convention.

Removes duplicated `_task_manager` / `task_manager` property / setup
implementations from `FrameProcessor`, `FrameProcessorMetrics`,
`UserIdleController`, `UserTurnController`,
`BaseUserTurnStartStrategy`, and `BaseUserTurnStopStrategy`.
This commit is contained in:
Aleix Conchillo Flaqué
2026-05-08 13:37:42 -07:00
parent 94a94ee28c
commit df1b071a13
7 changed files with 74 additions and 145 deletions

View File

@@ -17,7 +17,7 @@ import asyncio
import dataclasses
import traceback
import warnings
from collections.abc import Awaitable, Callable, Coroutine
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum
from typing import (
@@ -217,9 +217,6 @@ class FrameProcessor(BaseObject):
# Clock
self._clock: BaseClock | None = None
# Task Manager
self._task_manager: BaseTaskManager | None = None
# Observer
self._observer: BaseObserver | None = None
@@ -368,20 +365,6 @@ class FrameProcessor(BaseObject):
"""
return self._report_only_initial_ttfb
@property
def task_manager(self) -> BaseTaskManager:
"""Get the task manager for this processor.
Returns:
The task manager instance.
Raises:
Exception: If the task manager is not initialized.
"""
if not self._task_manager:
raise Exception(f"{self} TaskManager is still not initialized.")
return self._task_manager
@property
def pipeline_task(self) -> PipelineTask | None:
"""Get the :class:`PipelineTask` this processor is running in.
@@ -511,43 +494,14 @@ class FrameProcessor(BaseObject):
await self.stop_processing_metrics()
await self.stop_text_aggregation_metrics()
def create_task(self, coroutine: Coroutine, name: str | None = None) -> asyncio.Task:
"""Create a new task managed by this processor.
Args:
coroutine: The coroutine to run in the task.
name: Optional name for the task.
Returns:
The created asyncio task.
"""
if name:
name = f"{self}::{name}"
else:
name = f"{self}::{coroutine.cr_code.co_name}"
return self.task_manager.create_task(coroutine, name)
async def cancel_task(self, task: asyncio.Task, timeout: float | None = 1.0):
"""Cancel a task managed by this processor.
A default timeout if 1 second is used in order to avoid potential
freezes caused by certain libraries that swallow
`asyncio.CancelledError`.
Args:
task: The task to cancel.
timeout: Optional timeout for task cancellation.
"""
await self.task_manager.cancel_task(task, timeout)
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup.task_manager)
self._clock = setup.clock
self._task_manager = setup.task_manager
self._observer = setup.observer
self._pipeline_task = setup.pipeline_task
@@ -555,7 +509,7 @@ class FrameProcessor(BaseObject):
self.__create_input_task()
if self._metrics is not None:
await self._metrics.setup(self._task_manager)
await self._metrics.setup(self.task_manager)
async def cleanup(self):
"""Clean up processor resources."""

View File

@@ -20,7 +20,6 @@ from pipecat.metrics.metrics import (
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -40,36 +39,12 @@ class FrameProcessorMetrics(BaseObject):
processing times, and usage statistics.
"""
super().__init__()
self._task_manager = None
self._start_ttfb_time = 0
self._start_processing_time = 0
self._start_text_aggregation_time = 0
self._last_ttfb_time = 0
self._should_report_ttfb = True
async def setup(self, task_manager: BaseTaskManager):
"""Set up the metrics collector with a task manager.
Args:
task_manager: The task manager for handling async operations.
"""
self._task_manager = task_manager
async def cleanup(self):
"""Clean up metrics collection resources."""
await super().cleanup()
@property
def task_manager(self) -> BaseTaskManager:
"""Get the associated task manager.
Returns:
The task manager instance for async operations.
"""
if self._task_manager is None:
raise RuntimeError("task_manager not set; call setup() first")
return self._task_manager
@property
def ttfb(self) -> float | None:
"""Get the current TTFB value in seconds.

View File

@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -63,29 +62,12 @@ class UserIdleController(BaseObject):
self._user_idle_timeout = user_idle_timeout
self._task_manager: BaseTaskManager | None = None
self._user_turn_in_progress: bool = False
self._function_calls_in_progress: int = 0
self._idle_timer_task: asyncio.Task | None = None
self._register_event_handler("on_user_turn_idle", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
"""Returns the configured task manager."""
if not self._task_manager:
raise RuntimeError(f"{self} user idle controller was not properly setup")
return self._task_manager
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the controller with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
self._task_manager = task_manager
async def cleanup(self):
"""Cleanup the controller."""
await super().cleanup()
@@ -138,17 +120,14 @@ class UserIdleController(BaseObject):
if self._user_idle_timeout <= 0:
return
await self._cancel_idle_timer()
self._idle_timer_task = self.task_manager.create_task(
self._idle_timer_expired(),
f"{self}::idle_timer",
)
self._idle_timer_task = self.create_task(self._idle_timer_expired())
# Make sure the task is scheduled.
await asyncio.sleep(0)
async def _cancel_idle_timer(self):
"""Cancel the idle timer if running."""
if self._idle_timer_task:
await self.task_manager.cancel_task(self._idle_timer_task)
await self.cancel_task(self._idle_timer_task)
self._idle_timer_task = None
async def _idle_timer_expired(self):

View File

@@ -11,7 +11,6 @@ from dataclasses import dataclass
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.turns.types import ProcessFrameResult
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -72,27 +71,11 @@ class BaseUserTurnStartStrategy(BaseObject):
super().__init__(**kwargs)
self._enable_interruptions = enable_interruptions
self._enable_user_speaking_frames = enable_user_speaking_frames
self._task_manager: BaseTaskManager | None = None
self._register_event_handler("on_push_frame", sync=True)
self._register_event_handler("on_broadcast_frame", sync=True)
self._register_event_handler("on_user_turn_started", sync=True)
self._register_event_handler("on_reset_aggregation", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
"""Returns the configured task manager."""
if not self._task_manager:
raise RuntimeError(f"{self} user turn start strategy was not properly setup")
return self._task_manager
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
self._task_manager = task_manager
async def cleanup(self):
"""Cleanup the strategy."""
pass

View File

@@ -11,7 +11,6 @@ from dataclasses import dataclass
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.turns.types import ProcessFrameResult
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -70,27 +69,11 @@ class BaseUserTurnStopStrategy(BaseObject):
"""
super().__init__(**kwargs)
self._enable_user_speaking_frames = enable_user_speaking_frames
self._task_manager: BaseTaskManager | None = None
self._register_event_handler("on_push_frame", sync=True)
self._register_event_handler("on_broadcast_frame", sync=True)
self._register_event_handler("on_user_turn_inference_triggered", sync=True)
self._register_event_handler("on_user_turn_stopped", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
"""Returns the configured task manager."""
if not self._task_manager:
raise RuntimeError(f"{self} user turn stop strategy was not properly setup")
return self._task_manager
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
self._task_manager = task_manager
async def cleanup(self):
"""Cleanup the strategy."""
pass

View File

@@ -92,8 +92,6 @@ class UserTurnController(BaseObject):
self._user_turn_strategies = user_turn_strategies
self._user_turn_stop_timeout = user_turn_stop_timeout
self._task_manager: BaseTaskManager | None = None
self._user_speaking = False
self._user_turn = False
@@ -108,25 +106,17 @@ class UserTurnController(BaseObject):
self._register_event_handler("on_user_turn_stop_timeout", sync=True)
self._register_event_handler("on_reset_aggregation", sync=True)
@property
def task_manager(self) -> BaseTaskManager:
"""Returns the configured task manager."""
if not self._task_manager:
raise RuntimeError(f"{self} user turn controller was not properly setup")
return self._task_manager
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the controller with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
self._task_manager = task_manager
await super().setup(task_manager)
if not self._user_turn_stop_timeout_task:
self._user_turn_stop_timeout_task = self.task_manager.create_task(
self._user_turn_stop_timeout_task_handler(),
f"{self}::_user_turn_stop_timeout_task_handler",
self._user_turn_stop_timeout_task = self.create_task(
self._user_turn_stop_timeout_task_handler()
)
await self._setup_strategies()
@@ -136,7 +126,7 @@ class UserTurnController(BaseObject):
await super().cleanup()
if self._user_turn_stop_timeout_task:
await self.task_manager.cancel_task(self._user_turn_stop_timeout_task)
await self.cancel_task(self._user_turn_stop_timeout_task)
self._user_turn_stop_timeout_task = None
await self._cleanup_strategies()

View File

@@ -15,11 +15,13 @@ import asyncio
import inspect
import traceback
from abc import ABC
from collections.abc import Coroutine
from dataclasses import dataclass
from typing import Any
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.utils import obj_count, obj_id
@@ -69,6 +71,10 @@ class BaseObject(ABC):
# event tasks still being executed.
self._event_tasks = set()
# Task manager. Populated by setup(); accessing the task_manager
# property before setup raises.
self._task_manager: BaseTaskManager | None = None
@property
def id(self) -> int:
"""Get the unique identifier for this object.
@@ -87,6 +93,65 @@ class BaseObject(ABC):
"""
return self._name
@property
def task_manager(self) -> BaseTaskManager:
"""Get the task manager for this object.
Returns:
The task manager instance.
Raises:
Exception: If the task manager is not initialized.
"""
if not self._task_manager:
raise Exception(f"{self}: TaskManager is not initialized.")
return self._task_manager
async def setup(self, task_manager: BaseTaskManager):
"""Wire the object up with a task manager.
Owners of a :class:`BaseObject` should call this on their child objects
to propagate the task manager down. Subclasses that own other
:class:`BaseObject` instances should override and forward::
async def setup(self, task_manager):
await super().setup(task_manager)
await self._child.setup(task_manager)
Args:
task_manager: The task manager to associate with this instance.
"""
self._task_manager = task_manager
def create_task(self, coroutine: Coroutine, name: str | None = None) -> asyncio.Task:
"""Create a new task managed by this object's task manager.
Args:
coroutine: The coroutine to run in the task.
name: Optional name for the task.
Returns:
The created asyncio task.
"""
if name:
name = f"{self}::{name}"
else:
name = f"{self}::{coroutine.cr_code.co_name}"
return self.task_manager.create_task(coroutine, name)
async def cancel_task(self, task: asyncio.Task, timeout: float | None = 1.0):
"""Cancel a task managed by this object's task manager.
A default timeout of 1 second is used in order to avoid potential
freezes caused by certain libraries that swallow
:class:`asyncio.CancelledError`.
Args:
task: The task to cancel.
timeout: Optional timeout for task cancellation.
"""
await self.task_manager.cancel_task(task, timeout)
async def cleanup(self):
"""Clean up resources and wait for running event handlers to complete.