diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 05abfceb4..527b19dd0 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -17,7 +17,7 @@ import asyncio import dataclasses import traceback import warnings -from collections.abc import Awaitable, Callable, Coroutine +from collections.abc import Awaitable, Callable from dataclasses import dataclass from enum import Enum from typing import ( @@ -217,9 +217,6 @@ class FrameProcessor(BaseObject): # Clock self._clock: BaseClock | None = None - # Task Manager - self._task_manager: BaseTaskManager | None = None - # Observer self._observer: BaseObserver | None = None @@ -368,20 +365,6 @@ class FrameProcessor(BaseObject): """ return self._report_only_initial_ttfb - @property - def task_manager(self) -> BaseTaskManager: - """Get the task manager for this processor. - - Returns: - The task manager instance. - - Raises: - Exception: If the task manager is not initialized. - """ - if not self._task_manager: - raise Exception(f"{self} TaskManager is still not initialized.") - return self._task_manager - @property def pipeline_task(self) -> PipelineTask | None: """Get the :class:`PipelineTask` this processor is running in. @@ -511,43 +494,14 @@ class FrameProcessor(BaseObject): await self.stop_processing_metrics() await self.stop_text_aggregation_metrics() - def create_task(self, coroutine: Coroutine, name: str | None = None) -> asyncio.Task: - """Create a new task managed by this processor. - - Args: - coroutine: The coroutine to run in the task. - name: Optional name for the task. - - Returns: - The created asyncio task. - """ - if name: - name = f"{self}::{name}" - else: - name = f"{self}::{coroutine.cr_code.co_name}" - return self.task_manager.create_task(coroutine, name) - - async def cancel_task(self, task: asyncio.Task, timeout: float | None = 1.0): - """Cancel a task managed by this processor. - - A default timeout if 1 second is used in order to avoid potential - freezes caused by certain libraries that swallow - `asyncio.CancelledError`. - - Args: - task: The task to cancel. - timeout: Optional timeout for task cancellation. - """ - await self.task_manager.cancel_task(task, timeout) - async def setup(self, setup: FrameProcessorSetup): """Set up the processor with required components. Args: setup: Configuration object containing setup parameters. """ + await super().setup(setup.task_manager) self._clock = setup.clock - self._task_manager = setup.task_manager self._observer = setup.observer self._pipeline_task = setup.pipeline_task @@ -555,7 +509,7 @@ class FrameProcessor(BaseObject): self.__create_input_task() if self._metrics is not None: - await self._metrics.setup(self._task_manager) + await self._metrics.setup(self.task_manager) async def cleanup(self): """Clean up processor resources.""" diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py index 97098ffed..9e49cb122 100644 --- a/src/pipecat/processors/metrics/frame_processor_metrics.py +++ b/src/pipecat/processors/metrics/frame_processor_metrics.py @@ -20,7 +20,6 @@ from pipecat.metrics.metrics import ( TTFBMetricsData, TTSUsageMetricsData, ) -from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject @@ -40,36 +39,12 @@ class FrameProcessorMetrics(BaseObject): processing times, and usage statistics. """ super().__init__() - self._task_manager = None self._start_ttfb_time = 0 self._start_processing_time = 0 self._start_text_aggregation_time = 0 self._last_ttfb_time = 0 self._should_report_ttfb = True - async def setup(self, task_manager: BaseTaskManager): - """Set up the metrics collector with a task manager. - - Args: - task_manager: The task manager for handling async operations. - """ - self._task_manager = task_manager - - async def cleanup(self): - """Clean up metrics collection resources.""" - await super().cleanup() - - @property - def task_manager(self) -> BaseTaskManager: - """Get the associated task manager. - - Returns: - The task manager instance for async operations. - """ - if self._task_manager is None: - raise RuntimeError("task_manager not set; call setup() first") - return self._task_manager - @property def ttfb(self) -> float | None: """Get the current TTFB value in seconds. diff --git a/src/pipecat/turns/user_idle_controller.py b/src/pipecat/turns/user_idle_controller.py index 0fa0053bb..24f358bb8 100644 --- a/src/pipecat/turns/user_idle_controller.py +++ b/src/pipecat/turns/user_idle_controller.py @@ -19,7 +19,6 @@ from pipecat.frames.frames import ( UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) -from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject @@ -63,29 +62,12 @@ class UserIdleController(BaseObject): self._user_idle_timeout = user_idle_timeout - self._task_manager: BaseTaskManager | None = None - self._user_turn_in_progress: bool = False self._function_calls_in_progress: int = 0 self._idle_timer_task: asyncio.Task | None = None self._register_event_handler("on_user_turn_idle", sync=True) - @property - def task_manager(self) -> BaseTaskManager: - """Returns the configured task manager.""" - if not self._task_manager: - raise RuntimeError(f"{self} user idle controller was not properly setup") - return self._task_manager - - async def setup(self, task_manager: BaseTaskManager): - """Initialize the controller with the given task manager. - - Args: - task_manager: The task manager to be associated with this instance. - """ - self._task_manager = task_manager - async def cleanup(self): """Cleanup the controller.""" await super().cleanup() @@ -138,17 +120,14 @@ class UserIdleController(BaseObject): if self._user_idle_timeout <= 0: return await self._cancel_idle_timer() - self._idle_timer_task = self.task_manager.create_task( - self._idle_timer_expired(), - f"{self}::idle_timer", - ) + self._idle_timer_task = self.create_task(self._idle_timer_expired()) # Make sure the task is scheduled. await asyncio.sleep(0) async def _cancel_idle_timer(self): """Cancel the idle timer if running.""" if self._idle_timer_task: - await self.task_manager.cancel_task(self._idle_timer_task) + await self.cancel_task(self._idle_timer_task) self._idle_timer_task = None async def _idle_timer_expired(self): diff --git a/src/pipecat/turns/user_start/base_user_turn_start_strategy.py b/src/pipecat/turns/user_start/base_user_turn_start_strategy.py index b424d7a9e..43100382a 100644 --- a/src/pipecat/turns/user_start/base_user_turn_start_strategy.py +++ b/src/pipecat/turns/user_start/base_user_turn_start_strategy.py @@ -11,7 +11,6 @@ from dataclasses import dataclass from pipecat.frames.frames import Frame from pipecat.processors.frame_processor import FrameDirection from pipecat.turns.types import ProcessFrameResult -from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject @@ -72,27 +71,11 @@ class BaseUserTurnStartStrategy(BaseObject): super().__init__(**kwargs) self._enable_interruptions = enable_interruptions self._enable_user_speaking_frames = enable_user_speaking_frames - self._task_manager: BaseTaskManager | None = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) self._register_event_handler("on_user_turn_started", sync=True) self._register_event_handler("on_reset_aggregation", sync=True) - @property - def task_manager(self) -> BaseTaskManager: - """Returns the configured task manager.""" - if not self._task_manager: - raise RuntimeError(f"{self} user turn start strategy was not properly setup") - return self._task_manager - - async def setup(self, task_manager: BaseTaskManager): - """Initialize the strategy with the given task manager. - - Args: - task_manager: The task manager to be associated with this instance. - """ - self._task_manager = task_manager - async def cleanup(self): """Cleanup the strategy.""" pass diff --git a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py index 6f303282c..4bd9709fc 100644 --- a/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/base_user_turn_stop_strategy.py @@ -11,7 +11,6 @@ from dataclasses import dataclass from pipecat.frames.frames import Frame from pipecat.processors.frame_processor import FrameDirection from pipecat.turns.types import ProcessFrameResult -from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject @@ -70,27 +69,11 @@ class BaseUserTurnStopStrategy(BaseObject): """ super().__init__(**kwargs) self._enable_user_speaking_frames = enable_user_speaking_frames - self._task_manager: BaseTaskManager | None = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) self._register_event_handler("on_user_turn_inference_triggered", sync=True) self._register_event_handler("on_user_turn_stopped", sync=True) - @property - def task_manager(self) -> BaseTaskManager: - """Returns the configured task manager.""" - if not self._task_manager: - raise RuntimeError(f"{self} user turn stop strategy was not properly setup") - return self._task_manager - - async def setup(self, task_manager: BaseTaskManager): - """Initialize the strategy with the given task manager. - - Args: - task_manager: The task manager to be associated with this instance. - """ - self._task_manager = task_manager - async def cleanup(self): """Cleanup the strategy.""" pass diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index 974a8ff0f..51823e368 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -92,8 +92,6 @@ class UserTurnController(BaseObject): self._user_turn_strategies = user_turn_strategies self._user_turn_stop_timeout = user_turn_stop_timeout - self._task_manager: BaseTaskManager | None = None - self._user_speaking = False self._user_turn = False @@ -108,25 +106,17 @@ class UserTurnController(BaseObject): self._register_event_handler("on_user_turn_stop_timeout", sync=True) self._register_event_handler("on_reset_aggregation", sync=True) - @property - def task_manager(self) -> BaseTaskManager: - """Returns the configured task manager.""" - if not self._task_manager: - raise RuntimeError(f"{self} user turn controller was not properly setup") - return self._task_manager - async def setup(self, task_manager: BaseTaskManager): """Initialize the controller with the given task manager. Args: task_manager: The task manager to be associated with this instance. """ - self._task_manager = task_manager + await super().setup(task_manager) if not self._user_turn_stop_timeout_task: - self._user_turn_stop_timeout_task = self.task_manager.create_task( - self._user_turn_stop_timeout_task_handler(), - f"{self}::_user_turn_stop_timeout_task_handler", + self._user_turn_stop_timeout_task = self.create_task( + self._user_turn_stop_timeout_task_handler() ) await self._setup_strategies() @@ -136,7 +126,7 @@ class UserTurnController(BaseObject): await super().cleanup() if self._user_turn_stop_timeout_task: - await self.task_manager.cancel_task(self._user_turn_stop_timeout_task) + await self.cancel_task(self._user_turn_stop_timeout_task) self._user_turn_stop_timeout_task = None await self._cleanup_strategies() diff --git a/src/pipecat/utils/base_object.py b/src/pipecat/utils/base_object.py index 896c62b6a..6733d9106 100644 --- a/src/pipecat/utils/base_object.py +++ b/src/pipecat/utils/base_object.py @@ -15,11 +15,13 @@ import asyncio import inspect import traceback from abc import ABC +from collections.abc import Coroutine from dataclasses import dataclass from typing import Any from loguru import logger +from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.utils import obj_count, obj_id @@ -69,6 +71,10 @@ class BaseObject(ABC): # event tasks still being executed. self._event_tasks = set() + # Task manager. Populated by setup(); accessing the task_manager + # property before setup raises. + self._task_manager: BaseTaskManager | None = None + @property def id(self) -> int: """Get the unique identifier for this object. @@ -87,6 +93,65 @@ class BaseObject(ABC): """ return self._name + @property + def task_manager(self) -> BaseTaskManager: + """Get the task manager for this object. + + Returns: + The task manager instance. + + Raises: + Exception: If the task manager is not initialized. + """ + if not self._task_manager: + raise Exception(f"{self}: TaskManager is not initialized.") + return self._task_manager + + async def setup(self, task_manager: BaseTaskManager): + """Wire the object up with a task manager. + + Owners of a :class:`BaseObject` should call this on their child objects + to propagate the task manager down. Subclasses that own other + :class:`BaseObject` instances should override and forward:: + + async def setup(self, task_manager): + await super().setup(task_manager) + await self._child.setup(task_manager) + + Args: + task_manager: The task manager to associate with this instance. + """ + self._task_manager = task_manager + + def create_task(self, coroutine: Coroutine, name: str | None = None) -> asyncio.Task: + """Create a new task managed by this object's task manager. + + Args: + coroutine: The coroutine to run in the task. + name: Optional name for the task. + + Returns: + The created asyncio task. + """ + if name: + name = f"{self}::{name}" + else: + name = f"{self}::{coroutine.cr_code.co_name}" + return self.task_manager.create_task(coroutine, name) + + async def cancel_task(self, task: asyncio.Task, timeout: float | None = 1.0): + """Cancel a task managed by this object's task manager. + + A default timeout of 1 second is used in order to avoid potential + freezes caused by certain libraries that swallow + :class:`asyncio.CancelledError`. + + Args: + task: The task to cancel. + timeout: Optional timeout for task cancellation. + """ + await self.task_manager.cancel_task(task, timeout) + async def cleanup(self): """Clean up resources and wait for running event handlers to complete.