diff --git a/CHANGELOG.md b/CHANGELOG.md index 73285e07c..908197dca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed +- Watchdog timers have been removed. They were introduced in 0.0.72 to help + diagnose pipeline freezes. Unfortunately, they proved ineffective since they + required developers to use Pipecat-specific queues, iterators, and events to + correctly reset the timer, which limited their usefulness and added friction. + - Removed unused `FrameProcessor.set_parent()` and `FrameProcessor.get_parent()`. diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index b09207a53..41a07a632 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ControlFrame, EndFrame, Frame, SystemFrame from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue @dataclass @@ -128,11 +127,35 @@ class SyncParallelPipeline(BasePipeline): if len(args) == 0: raise Exception(f"SyncParallelPipeline needs at least one argument") - self._args = args self._sinks = [] self._sources = [] self._pipelines = [] + self._up_queue = asyncio.Queue() + self._down_queue = asyncio.Queue() + + logger.debug(f"Creating {self} pipelines") + for processors in args: + if not isinstance(processors, list): + raise TypeError(f"SyncParallelPipeline argument {processors} is not a list") + + # We add a source at the beginning of the pipeline and a sink at the end. + up_queue = asyncio.Queue() + down_queue = asyncio.Queue() + source = SyncParallelPipelineSource(up_queue) + sink = SyncParallelPipelineSink(down_queue) + + # Keep track of sources and sinks. We also keep the output queue of + # the source and the sinks so we can use it later. + self._sources.append({"processor": source, "queue": down_queue}) + self._sinks.append({"processor": sink, "queue": up_queue}) + + # Create pipeline + pipeline = Pipeline(processors, source=source, sink=sink) + self._pipelines.append(pipeline) + + logger.debug(f"Finished creating {self} pipelines") + # # Frame processor # @@ -178,32 +201,6 @@ class SyncParallelPipeline(BasePipeline): setup: Configuration for frame processor setup. """ await super().setup(setup) - - self._up_queue = WatchdogQueue(setup.task_manager) - self._down_queue = WatchdogQueue(setup.task_manager) - - logger.debug(f"Creating {self} pipelines") - for processors in self._args: - if not isinstance(processors, list): - raise TypeError(f"SyncParallelPipeline argument {processors} is not a list") - - # We add a source at the beginning of the pipeline and a sink at the end. - up_queue = asyncio.Queue() - down_queue = asyncio.Queue() - source = SyncParallelPipelineSource(up_queue) - sink = SyncParallelPipelineSink(down_queue) - - # Keep track of sources and sinks. We also keep the output queue of - # the source and the sinks so we can use it later. - self._sources.append({"processor": source, "queue": down_queue}) - self._sinks.append({"processor": sink, "queue": up_queue}) - - # Create pipeline - pipeline = Pipeline(processors, source=source, sink=sink) - self._pipelines.append(pipeline) - - logger.debug(f"Finished creating {self} pipelines") - await asyncio.gather(*[p.setup(setup) for p in self._pipelines]) async def cleanup(self): diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 9b3586ea8..27e1fcd9a 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -49,13 +49,7 @@ from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource from pipecat.pipeline.task_observer import TaskObserver from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup -from pipecat.utils.asyncio.task_manager import ( - WATCHDOG_TIMEOUT, - BaseTaskManager, - TaskManager, - TaskManagerParams, -) -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue +from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams from pipecat.utils.tracing.setup import is_tracing_available from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver @@ -146,8 +140,6 @@ class PipelineTask(BasePipelineTask): conversation_id: Optional[str] = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, - enable_watchdog_logging: bool = False, - enable_watchdog_timers: bool = False, idle_timeout_frames: Tuple[Type[Frame], ...] = ( BotSpeakingFrame, InterimTranscriptionFrame, @@ -159,7 +151,6 @@ class PipelineTask(BasePipelineTask): idle_timeout_secs: Optional[float] = 300, observers: Optional[List[BaseObserver]] = None, task_manager: Optional[BaseTaskManager] = None, - watchdog_timeout_secs: float = WATCHDOG_TIMEOUT, ): """Initialize the PipelineTask. @@ -175,8 +166,6 @@ class PipelineTask(BasePipelineTask): conversation_id: Optional custom ID for the conversation. enable_tracing: Whether to enable tracing. enable_turn_tracking: Whether to enable turn tracking. - enable_watchdog_logging: Whether to print task processing times. - enable_watchdog_timers: Whether to enable task watchdog timers. idle_timeout_frames: A tuple with the frames that should trigger an idle timeout if not received within `idle_timeout_seconds`. idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or @@ -184,8 +173,6 @@ class PipelineTask(BasePipelineTask): automatically. observers: List of observers for monitoring pipeline execution. task_manager: Optional task manager for handling asyncio tasks. - watchdog_timeout_secs: Watchdog timer timeout (in seconds). A warning - will be logged if the watchdog timer is not reset before this timeout. """ super().__init__() self._params = params or PipelineParams() @@ -196,11 +183,8 @@ class PipelineTask(BasePipelineTask): self._conversation_id = conversation_id self._enable_tracing = enable_tracing and is_tracing_available() self._enable_turn_tracking = enable_turn_tracking - self._enable_watchdog_logging = enable_watchdog_logging - self._enable_watchdog_timers = enable_watchdog_timers self._idle_timeout_frames = idle_timeout_frames self._idle_timeout_secs = idle_timeout_secs - self._watchdog_timeout_secs = watchdog_timeout_secs if self._params.observers: import warnings @@ -232,17 +216,17 @@ class PipelineTask(BasePipelineTask): self._task_manager = task_manager or TaskManager() # This queue is the queue used to push frames to the pipeline. - self._push_queue = WatchdogQueue(self._task_manager) + self._push_queue = asyncio.Queue() self._process_push_task: Optional[asyncio.Task] = None # This is the heartbeat queue. When a heartbeat frame is received in the # down queue we add it to the heartbeat queue for processing. - self._heartbeat_queue = WatchdogQueue(self._task_manager) + self._heartbeat_queue = asyncio.Queue() self._heartbeat_push_task: Optional[asyncio.Task] = None self._heartbeat_monitor_task: Optional[asyncio.Task] = None # This is the idle queue. When frames are received downstream they are # put in the queue. If no frame is received the pipeline is considered # idle. - self._idle_queue = WatchdogQueue(self._task_manager) + self._idle_queue = asyncio.Queue() self._idle_monitor_task: Optional[asyncio.Task] = None # This event is used to indicate a finalize frame (e.g. EndFrame, # StopFrame) has been received in the down queue. @@ -491,7 +475,6 @@ class PipelineTask(BasePipelineTask): async def _maybe_cancel_idle_task(self): """Cancel idle monitoring task if it is running.""" if self._idle_timeout_secs and self._idle_monitor_task: - self._idle_queue.cancel() await self._task_manager.cancel_task(self._idle_monitor_task) self._idle_monitor_task = None @@ -511,19 +494,13 @@ class PipelineTask(BasePipelineTask): async def _setup(self, params: PipelineTaskParams): """Set up the pipeline task and all processors.""" - mgr_params = TaskManagerParams( - loop=params.loop, - enable_watchdog_logging=self._enable_watchdog_logging, - enable_watchdog_timers=self._enable_watchdog_timers, - watchdog_timeout=self._watchdog_timeout_secs, - ) + mgr_params = TaskManagerParams(loop=params.loop) self._task_manager.setup(mgr_params) setup = FrameProcessorSetup( clock=self._clock, task_manager=self._task_manager, observer=self._observer, - watchdog_timers_enabled=self._enable_watchdog_timers, ) await self._pipeline.setup(setup) diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index 6e4e44742..6f266a0db 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -19,7 +19,6 @@ from attr import dataclass from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue @dataclass @@ -86,7 +85,7 @@ class TaskObserver(BaseObserver): # If we already started, create a new proxy for the observer. # Otherwise, it will be created in start(). - if self._started(): + if self._proxies: proxy = self._create_proxy(observer) self._proxies[observer] = proxy @@ -97,7 +96,7 @@ class TaskObserver(BaseObserver): observer: The observer to remove. """ # If the observer has a proxy, remove it. - if observer in self._proxies: + if self._proxies and observer in self._proxies: proxy = self._proxies[observer] # Remove the proxy so it doesn't get called anymore. del self._proxies[observer] @@ -136,13 +135,9 @@ class TaskObserver(BaseObserver): """ await self._send_to_proxy(data) - def _started(self) -> bool: - """Check if the task observer has been started.""" - return self._proxies is not None - def _create_proxy(self, observer: BaseObserver) -> Proxy: """Create a proxy for a single observer.""" - queue = WatchdogQueue(self._task_manager) + queue = asyncio.Queue() task = self._task_manager.create_task( self._proxy_task_handler(queue, observer), f"TaskObserver::{observer}::_proxy_task_handler", diff --git a/src/pipecat/processors/aggregators/dtmf_aggregator.py b/src/pipecat/processors/aggregators/dtmf_aggregator.py index 75cded7fb..24ef2a1e1 100644 --- a/src/pipecat/processors/aggregators/dtmf_aggregator.py +++ b/src/pipecat/processors/aggregators/dtmf_aggregator.py @@ -25,7 +25,6 @@ from pipecat.frames.frames import ( TranscriptionFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup -from pipecat.utils.asyncio.watchdog_event import WatchdogEvent from pipecat.utils.time import time_now_iso8601 @@ -63,13 +62,9 @@ class DTMFAggregator(FrameProcessor): self._termination_digit = termination_digit self._prefix = prefix + self._digit_event = asyncio.Event() self._aggregation_task: Optional[asyncio.Task] = None - async def setup(self, setup: FrameProcessorSetup): - """Setup resources.""" - await super().setup(setup) - self._digit_event = WatchdogEvent(setup.task_manager) - async def cleanup(self) -> None: """Clean up resources.""" await super().cleanup() @@ -137,7 +132,6 @@ class DTMFAggregator(FrameProcessor): await asyncio.wait_for(self._digit_event.wait(), timeout=self._idle_timeout) self._digit_event.clear() except asyncio.TimeoutError: - self.reset_watchdog() if self._aggregation: await self._flush_aggregation() diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index a5b746467..7fed35f1c 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -684,7 +684,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): ) self._emulating_vad = False finally: - self.reset_watchdog() self._aggregation_event.clear() async def _maybe_emulate_user_speaking(self): diff --git a/src/pipecat/processors/consumer_processor.py b/src/pipecat/processors/consumer_processor.py index 7812bbbd3..5445b492d 100644 --- a/src/pipecat/processors/consumer_processor.py +++ b/src/pipecat/processors/consumer_processor.py @@ -12,7 +12,6 @@ from typing import Awaitable, Callable, Optional from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue class ConsumerProcessor(FrameProcessor): @@ -66,7 +65,7 @@ class ConsumerProcessor(FrameProcessor): async def _start(self, _: StartFrame): """Start the consumer task and register with the producer.""" if not self._consumer_task: - self._queue: WatchdogQueue = self._producer.add_consumer() + self._queue = self._producer.add_consumer() self._consumer_task = self.create_task(self._consumer_task_handler()) async def _stop(self, _: EndFrame): @@ -77,7 +76,6 @@ class ConsumerProcessor(FrameProcessor): async def _cancel(self, _: CancelFrame): """Cancel the consumer task.""" if self._consumer_task: - self._queue.cancel() await self.cancel_task(self._consumer_task) async def _consumer_task_handler(self): diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 95b300fce..fe36804fe 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -37,9 +37,6 @@ from pipecat.metrics.metrics import LLMTokenUsage, MetricsData from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.watchdog_event import WatchdogEvent -from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue from pipecat.utils.base_object import BaseObject @@ -66,16 +63,14 @@ class FrameProcessorSetup: clock: The clock instance for timing operations. task_manager: The task manager for handling async operations. observer: Optional observer for monitoring frame processing events. - watchdog_timers_enabled: Whether to enable watchdog timers by default. """ clock: BaseClock task_manager: BaseTaskManager observer: Optional[BaseObserver] = None - watchdog_timers_enabled: bool = False -class FrameProcessorQueue(WatchdogPriorityQueue): +class FrameProcessorQueue(asyncio.PriorityQueue): """A priority queue for systems frames and other frames. This is a specialized queue for frame processors that separates and @@ -87,14 +82,14 @@ class FrameProcessorQueue(WatchdogPriorityQueue): HIGH_PRIORITY = 1 LOW_PRIORITY = 2 - def __init__(self, manager: BaseTaskManager): + def __init__(self): """Initialize the FrameProcessorQueue. Args: manager (BaseTaskManager): The task manager used by the internal watchdog queues. """ - super().__init__(manager, tuple_size=3) + super().__init__() self.__high_counter = 0 self.__low_counter = 0 @@ -148,10 +143,7 @@ class FrameProcessor(BaseObject): *, name: Optional[str] = None, enable_direct_mode: bool = False, - enable_watchdog_logging: Optional[bool] = None, - enable_watchdog_timers: Optional[bool] = None, metrics: Optional[FrameProcessorMetrics] = None, - watchdog_timeout_secs: Optional[float] = None, **kwargs, ): """Initialize the frame processor. @@ -159,10 +151,7 @@ class FrameProcessor(BaseObject): Args: name: Optional name for this processor instance. enable_direct_mode: Whether to process frames immediately or use internal queues. - enable_watchdog_logging: Whether to enable watchdog logging for tasks. - enable_watchdog_timers: Whether to enable watchdog timers for tasks. metrics: Optional metrics collector for this processor. - watchdog_timeout_secs: Timeout in seconds for watchdog operations. **kwargs: Additional arguments passed to parent class. """ super().__init__(name=name, **kwargs) @@ -172,15 +161,6 @@ class FrameProcessor(BaseObject): # Enable direct mode to skip queues and process frames right away. self._enable_direct_mode = enable_direct_mode - # Enable watchdog timers for all tasks created by this frame processor. - self._enable_watchdog_timers = enable_watchdog_timers - - # Enable watchdog logging for all tasks created by this frame processor. - self._enable_watchdog_logging = enable_watchdog_logging - - # Allow this frame processor to control their tasks timeout. - self._watchdog_timeout_secs = watchdog_timeout_secs - # Clock self._clock: Optional[BaseClock] = None @@ -434,23 +414,12 @@ class FrameProcessor(BaseObject): await self.stop_ttfb_metrics() await self.stop_processing_metrics() - def create_task( - self, - coroutine: Coroutine, - name: Optional[str] = None, - *, - enable_watchdog_logging: Optional[bool] = None, - enable_watchdog_timers: Optional[bool] = None, - watchdog_timeout_secs: Optional[float] = None, - ) -> asyncio.Task: + def create_task(self, coroutine: Coroutine, name: Optional[str] = None) -> asyncio.Task: """Create a new task managed by this processor. Args: coroutine: The coroutine to run in the task. name: Optional name for the task. - enable_watchdog_logging: Whether to enable watchdog logging. - enable_watchdog_timers: Whether to enable watchdog timers. - watchdog_timeout_secs: Timeout in seconds for watchdog operations. Returns: The created asyncio task. @@ -459,21 +428,7 @@ class FrameProcessor(BaseObject): name = f"{self}::{name}" else: name = f"{self}::{coroutine.cr_code.co_name}" - return self.task_manager.create_task( - coroutine, - name, - enable_watchdog_logging=( - enable_watchdog_logging - if enable_watchdog_logging - else self._enable_watchdog_logging - ), - enable_watchdog_timers=( - enable_watchdog_timers if enable_watchdog_timers else self._enable_watchdog_timers - ), - watchdog_timeout=( - watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout_secs - ), - ) + return self.task_manager.create_task(coroutine, name) async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None): """Cancel a task managed by this processor. @@ -493,10 +448,6 @@ class FrameProcessor(BaseObject): """ await self.task_manager.wait_for_task(task, timeout) - def reset_watchdog(self): - """Reset the watchdog timer for the current task.""" - self.task_manager.task_reset_watchdog() - async def setup(self, setup: FrameProcessorSetup): """Set up the processor with required components. @@ -506,11 +457,6 @@ class FrameProcessor(BaseObject): self._clock = setup.clock self._task_manager = setup.task_manager self._observer = setup.observer - self._watchdog_timers_enabled = ( - self._enable_watchdog_timers - if self._enable_watchdog_timers - else setup.watchdog_timers_enabled - ) # Create processing tasks. self.__create_input_task() @@ -774,14 +720,13 @@ class FrameProcessor(BaseObject): return if not self.__input_frame_task: - self.__input_event = WatchdogEvent(self.task_manager) - self.__input_queue = FrameProcessorQueue(self.task_manager) + self.__input_event = asyncio.Event() + self.__input_queue = FrameProcessorQueue() self.__input_frame_task = self.create_task(self.__input_frame_task_handler()) async def __cancel_input_task(self): """Cancel the frame input processing task.""" if self.__input_frame_task: - self.__input_queue.cancel() await self.cancel_task(self.__input_frame_task) self.__input_frame_task = None @@ -792,14 +737,13 @@ class FrameProcessor(BaseObject): if not self.__process_frame_task: self.__should_block_frames = False - self.__process_event = WatchdogEvent(self.task_manager) - self.__process_queue = WatchdogQueue(self.task_manager) + self.__process_event = asyncio.Event() + self.__process_queue = asyncio.Queue() self.__process_frame_task = self.create_task(self.__process_frame_task_handler()) async def __cancel_process_task(self): """Cancel the non-system frame processing task.""" if self.__process_frame_task: - self.__process_queue.cancel() await self.cancel_task(self.__process_frame_task) self.__process_frame_task = None diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index d72436a97..cd65e27ab 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -72,11 +72,9 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.llm_service import ( FunctionCallParams, # TODO(aleix): we shouldn't import `services` from `processors` ) -from pipecat.services.openai.llm import OpenAIContextAggregatorPair from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue from pipecat.utils.string import match_endofsentence RTVI_PROTOCOL_VERSION = "1.0.0" @@ -1315,10 +1313,10 @@ class RTVIProcessor(FrameProcessor): async def _start(self, frame: StartFrame): """Start the RTVI processor tasks.""" if not self._action_task: - self._action_queue = WatchdogQueue(self.task_manager) + self._action_queue = asyncio.Queue() self._action_task = self.create_task(self._action_task_handler()) if not self._message_task: - self._message_queue = WatchdogQueue(self.task_manager) + self._message_queue = asyncio.Queue() self._message_task = self.create_task(self._message_task_handler()) await self._call_event_handler("on_bot_started") @@ -1333,12 +1331,10 @@ class RTVIProcessor(FrameProcessor): async def _cancel_tasks(self): """Cancel all running tasks.""" if self._action_task: - self._action_queue.cancel() await self.cancel_task(self._action_task) self._action_task = None if self._message_task: - self._message_queue.cancel() await self.cancel_task(self._message_task) self._message_task = None diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index b8839124a..672027491 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List, Optional from pipecat.frames.frames import Frame, StartFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.asyncio.watchdog_event import WatchdogEvent class IdleFrameProcessor(FrameProcessor): @@ -78,7 +77,7 @@ class IdleFrameProcessor(FrameProcessor): def _create_idle_task(self): """Create and start the idle monitoring task.""" if not self._idle_task: - self._idle_event = WatchdogEvent(self.task_manager) + self._idle_event = asyncio.Event() self._idle_task = self.create_task(self._idle_task_handler()) async def _idle_task_handler(self): diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py index 755d5cd5e..75bd63c48 100644 --- a/src/pipecat/processors/metrics/sentry.py +++ b/src/pipecat/processors/metrics/sentry.py @@ -9,7 +9,6 @@ from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue try: import sentry_sdk @@ -51,7 +50,7 @@ class SentryMetrics(FrameProcessorMetrics): """ await super().setup(task_manager) if self._sentry_available: - self._sentry_queue = WatchdogQueue(task_manager) + self._sentry_queue = asyncio.Queue() self._sentry_task = self.task_manager.create_task( self._sentry_task_handler(), name=f"{self}::_sentry_task_handler" ) diff --git a/src/pipecat/processors/producer_processor.py b/src/pipecat/processors/producer_processor.py index 8de9d66bb..1ab839e06 100644 --- a/src/pipecat/processors/producer_processor.py +++ b/src/pipecat/processors/producer_processor.py @@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List from pipecat.frames.frames import Frame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue async def identity_transformer(frame: Frame): @@ -64,7 +63,7 @@ class ProducerProcessor(FrameProcessor): Returns: asyncio.Queue: The queue for the newly added consumer. """ - queue = WatchdogQueue(self.task_manager) + queue = asyncio.Queue() self._consumers.append(queue) return queue diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 5f6b25b95..406ccc885 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.asyncio.watchdog_event import WatchdogEvent class UserIdleProcessor(FrameProcessor): @@ -78,7 +77,7 @@ class UserIdleProcessor(FrameProcessor): self._interrupted = False self._conversation_started = False self._idle_task = None - self._idle_event = None + self._idle_event = asyncio.Event() def _wrap_callback( self, @@ -138,9 +137,6 @@ class UserIdleProcessor(FrameProcessor): """ await super().process_frame(frame, direction) - if isinstance(frame, StartFrame): - self._idle_event = WatchdogEvent(self.task_manager) - # Check for end frames before processing if isinstance(frame, (EndFrame, CancelFrame)): # Stop the idle task, if it exists diff --git a/src/pipecat/services/anthropic/llm.py b/src/pipecat/services/anthropic/llm.py index e351ecab2..3a26519f6 100644 --- a/src/pipecat/services/anthropic/llm.py +++ b/src/pipecat/services/anthropic/llm.py @@ -53,7 +53,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_llm try: @@ -290,7 +289,7 @@ class AnthropicLLMService(LLMService): json_accumulator = "" function_calls = [] - async for event in WatchdogAsyncIterator(response, manager=self.task_manager): + async for event in response: # Aggregate streaming content, create frames, trigger events if event.type == "content_block_delta": diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index f25a41661..aa2fc36bc 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -244,11 +244,9 @@ class AssemblyAISTTService(STTService): try: while self._connected: try: - message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0) + message = await self._websocket.recv() data = json.loads(message) await self._handle_message(data) - except asyncio.TimeoutError: - self.reset_watchdog() except websockets.exceptions.ConnectionClosedOK: break except Exception as e: diff --git a/src/pipecat/services/asyncai/tts.py b/src/pipecat/services/asyncai/tts.py index aa18183fd..4377162ef 100644 --- a/src/pipecat/services/asyncai/tts.py +++ b/src/pipecat/services/asyncai/tts.py @@ -29,7 +29,6 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.tts_service import InterruptibleTTSService, TTSService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -276,9 +275,7 @@ class AsyncAITTSService(InterruptibleTTSService): self._started = False async def _receive_messages(self): - async for message in WatchdogAsyncIterator( - self._get_websocket(), manager=self.task_manager - ): + async for message in self._get_websocket(): msg = json.loads(message) if not msg: continue @@ -301,9 +298,8 @@ class AsyncAITTSService(InterruptibleTTSService): async def _keepalive_task_handler(self): """Send periodic keepalive messages to maintain WebSocket connection.""" - KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3 + KEEPALIVE_SLEEP = 3 while True: - self.reset_watchdog() await asyncio.sleep(KEEPALIVE_SLEEP) try: if self._websocket and self._websocket.state is State.OPEN: diff --git a/src/pipecat/services/aws/llm.py b/src/pipecat/services/aws/llm.py index c935c141d..a4e1366c2 100644 --- a/src/pipecat/services/aws/llm.py +++ b/src/pipecat/services/aws/llm.py @@ -954,8 +954,6 @@ class AWSBedrockLLMService(LLMService): function_calls = [] async for event in response["stream"]: - self.reset_watchdog() - # Handle text content if "contentBlockDelta" in event: delta = event["contentBlockDelta"]["delta"] diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index f67f0979f..0267ea0e5 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -480,7 +480,7 @@ class AWSTranscribeSTTService(STTService): break try: - response = await asyncio.wait_for(self._ws_client.recv(), timeout=1.0) + response = await self._ws_client.recv() headers, payload = decode_event(response) @@ -531,8 +531,6 @@ class AWSTranscribeSTTService(STTService): else: logger.debug(f"{self} Other message type received: {headers}") logger.debug(f"{self} Payload: {payload}") - except asyncio.TimeoutError: - self.reset_watchdog() except websockets.exceptions.ConnectionClosed as e: logger.error( f"{self} WebSocket connection closed in receive loop with code {e.code}: {e.reason}" diff --git a/src/pipecat/services/aws_nova_sonic/aws.py b/src/pipecat/services/aws_nova_sonic/aws.py index 7c8eb112c..6ca6c9f61 100644 --- a/src/pipecat/services/aws_nova_sonic/aws.py +++ b/src/pipecat/services/aws_nova_sonic/aws.py @@ -62,7 +62,6 @@ from pipecat.services.aws_nova_sonic.context import ( ) from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame from pipecat.services.llm_service import LLMService -from pipecat.utils.asyncio.watchdog_coroutine import watchdog_coroutine from pipecat.utils.time import time_now_iso8601 try: @@ -795,7 +794,7 @@ class AWSNovaSonicLLMService(LLMService): try: while self._stream and not self._disconnecting: output = await self._stream.await_output() - result = await watchdog_coroutine(output[1].receive(), manager=self.task_manager) + result = await output[1].receive() if result.value and result.value.bytes_: response_data = result.value.bytes_.decode("utf-8") diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 1d136feef..a2b013f48 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -29,7 +29,6 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.tts_service import AudioContextWordTTSService, TTSService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.text.base_text_aggregator import BaseTextAggregator from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator from pipecat.utils.tracing.service_decorators import traced_tts @@ -388,9 +387,7 @@ class CartesiaTTSService(AudioContextWordTTSService): self._context_id = None async def _receive_messages(self): - async for message in WatchdogAsyncIterator( - self._get_websocket(), manager=self.task_manager - ): + async for message in self._get_websocket(): msg = json.loads(message) if not msg or not self.audio_context_available(msg["context_id"]): continue diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 9f4f3415c..8551fc5de 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -38,7 +38,6 @@ from pipecat.services.tts_service import ( WordTTSService, ) from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_tts # See .env.example for ElevenLabs configuration needed @@ -574,9 +573,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): async def _receive_messages(self): """Handle incoming WebSocket messages from ElevenLabs.""" - async for message in WatchdogAsyncIterator( - self._get_websocket(), manager=self.task_manager - ): + async for message in self._get_websocket(): msg = json.loads(message) received_ctx_id = msg.get("contextId") @@ -635,9 +632,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService): async def _keepalive_task_handler(self): """Send periodic keepalive messages to maintain WebSocket connection.""" - KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3 + KEEPALIVE_SLEEP = 10 while True: - self.reset_watchdog() await asyncio.sleep(KEEPALIVE_SLEEP) try: if self._websocket and self._websocket.state is State.OPEN: diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 04e6853eb..da14c3275 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -67,7 +67,6 @@ from pipecat.services.openai.llm import ( OpenAIUserContextAggregator, ) from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.string import match_endofsentence from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt @@ -929,7 +928,7 @@ class GeminiMultimodalLiveLLMService(LLMService): async def _receive_task_handler(self): """Handle incoming messages from the WebSocket connection.""" - async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): + async for message in self._websocket: evt = events.parse_server_event(message) # logger.debug(f"Received event: {message[:500]}") # logger.debug(f"Received event: {evt}") diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index 7bec1a91f..c774c4257 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -32,7 +32,6 @@ from pipecat.frames.frames import ( from pipecat.services.gladia.config import GladiaInputParams from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_stt @@ -536,9 +535,8 @@ class GladiaSTTService(STTService): async def _keepalive_task_handler(self): """Send periodic empty audio chunks to keep the connection alive.""" try: - KEEPALIVE_SLEEP = 20 if self.task_manager.task_watchdog_enabled else 3 + KEEPALIVE_SLEEP = 20 while self._connection_active: - self.reset_watchdog() # Send keepalive (Gladia times out after 30 seconds) await asyncio.sleep(KEEPALIVE_SLEEP) if self._websocket and self._websocket.state is State.OPEN: @@ -555,7 +553,7 @@ class GladiaSTTService(STTService): async def _receive_task_handler(self): try: - async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): + async for message in self._websocket: content = json.loads(message) # Handle audio chunk acknowledgments @@ -613,8 +611,6 @@ class GladiaSTTService(STTService): translation, "", time_now_iso8601(), translated_language ) ) - - self.reset_watchdog() except websockets.exceptions.ConnectionClosed: # Expected when closing the connection pass diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index 7e0e59840..88664c9d8 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -53,7 +53,6 @@ from pipecat.services.openai.llm import ( OpenAIAssistantContextAggregator, OpenAIUserContextAggregator, ) -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_llm # Suppress gRPC fork warnings @@ -807,7 +806,7 @@ class GoogleLLMService(LLMService): ) function_calls = [] - async for chunk in WatchdogAsyncIterator(response, manager=self.task_manager): + async for chunk in response: # Stop TTFB metrics after the first chunk await self.stop_ttfb_metrics() if chunk.usage_metadata: diff --git a/src/pipecat/services/google/llm_openai.py b/src/pipecat/services/google/llm_openai.py index 02c50f885..bcd350380 100644 --- a/src/pipecat/services/google/llm_openai.py +++ b/src/pipecat/services/google/llm_openai.py @@ -17,7 +17,6 @@ from openai import AsyncStream from openai.types.chat import ChatCompletionChunk from pipecat.services.llm_service import FunctionCallFromLLM -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator # Suppress gRPC fork warnings os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false" @@ -77,7 +76,7 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService): context ) - async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager): + async for chunk in chunk_stream: if chunk.usage: tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, diff --git a/src/pipecat/services/google/stt.py b/src/pipecat/services/google/stt.py index 5e3e36e62..31ae597f7 100644 --- a/src/pipecat/services/google/stt.py +++ b/src/pipecat/services/google/stt.py @@ -16,7 +16,6 @@ import json import os import time -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_stt # Suppress gRPC fork warnings @@ -781,7 +780,6 @@ class GoogleSTTService(STTService): if self._request_queue.empty(): # wait for 10ms in case we don't have audio await asyncio.sleep(0.01) - self.reset_watchdog() continue # Start bi-directional streaming @@ -836,9 +834,7 @@ class GoogleSTTService(STTService): async def _process_responses(self, streaming_recognize): """Process streaming recognition responses.""" try: - async for response in WatchdogAsyncIterator( - streaming_recognize, manager=self.task_manager - ): + async for response in streaming_recognize: # Check streaming limit if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT: logger.debug("Stream timeout reached in response processing") diff --git a/src/pipecat/services/heygen/client.py b/src/pipecat/services/heygen/client.py index 2a464f949..07da6feb1 100644 --- a/src/pipecat/services/heygen/client.py +++ b/src/pipecat/services/heygen/client.py @@ -31,7 +31,6 @@ from pipecat.processors.frame_processor import FrameProcessorSetup from pipecat.services.heygen.api import HeyGenApi, HeyGenSession, NewSessionRequest from pipecat.transports.base_transport import TransportParams from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue try: from livekit import rtc @@ -104,7 +103,7 @@ class HeyGenClient: self._connected = False self._session_request = session_request self._callbacks = callbacks - self._event_queue: Optional[WatchdogQueue] = None + self._event_queue: Optional[asyncio.Queue] = None self._event_task = None # Currently supporting to capture the audio and video from a single participant self._video_task = None @@ -149,7 +148,7 @@ class HeyGenClient: try: await self._initialize() - self._event_queue = WatchdogQueue(self._task_manager) + self._event_queue = asyncio.Queue() self._event_task = self._task_manager.create_task( self._callback_task_handler(self._event_queue), f"{self}::event_callback_task", @@ -170,7 +169,6 @@ class HeyGenClient: self._connected = False if self._event_task and self._task_manager: - self._event_queue.cancel() await self._task_manager.cancel_task(self._event_task) self._event_task = None except Exception as e: @@ -231,11 +229,9 @@ class HeyGenClient: """Handle incoming WebSocket messages.""" while self._connected: try: - message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0) + message = await self._websocket.recv() parsed_message = json.loads(message) await self._handle_ws_server_event(parsed_message) - except asyncio.TimeoutError: - self._task_manager.task_reset_watchdog() except ConnectionClosedOK: break except Exception as e: diff --git a/src/pipecat/services/heygen/video.py b/src/pipecat/services/heygen/video.py index 96c684641..26a6f120d 100644 --- a/src/pipecat/services/heygen/video.py +++ b/src/pipecat/services/heygen/video.py @@ -40,7 +40,6 @@ from pipecat.services.ai_service import AIService from pipecat.services.heygen.api import NewSessionRequest from pipecat.services.heygen.client import HEY_GEN_SAMPLE_RATE, HeyGenCallbacks, HeyGenClient from pipecat.transports.base_transport import TransportParams -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue # Using the same values that we do in the BaseOutputTransport AVATAR_VAD_STOP_SECS = 0.35 @@ -278,21 +277,14 @@ class HeyGenVideoService(AIService): await self._client.stop() async def _create_send_task(self): - """Create the audio sending task if it doesn't exist. - - Initializes a new WatchdogQueue and creates a task for handling audio sending. - """ + """Create the audio sending task if it doesn't exist.""" if not self._send_task: - self._queue = WatchdogQueue(self.task_manager) + self._queue = asyncio.Queue() self._send_task = self.create_task(self._send_task_handler()) async def _cancel_send_task(self): - """Cancel the audio sending task if it exists. - - Cancels and cleans up the audio sending task and associated queue. - """ + """Cancel the audio sending task if it exists.""" if self._send_task: - self._queue.cancel() await self.cancel_task(self._send_task) self._send_task = None diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index fc0117712..f4777b8d9 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -36,7 +36,6 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.tts_service import InterruptibleTTSService, TTSService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -315,7 +314,7 @@ class NeuphonicTTSService(InterruptibleTTSService): async def _receive_messages(self): """Receive and process messages from Neuphonic WebSocket.""" - async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): + async for message in self._websocket: if isinstance(message, str): msg = json.loads(message) if msg.get("data") and msg["data"].get("audio"): @@ -327,9 +326,8 @@ class NeuphonicTTSService(InterruptibleTTSService): async def _keepalive_task_handler(self): """Handle keepalive messages to maintain WebSocket connection.""" - KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3 + KEEPALIVE_SLEEP = 10 while True: - self.reset_watchdog() await asyncio.sleep(KEEPALIVE_SLEEP) await self._send_keepalive() diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index ecde66eec..1dec6e91b 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -39,7 +39,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_llm @@ -284,7 +283,7 @@ class BaseOpenAILLMService(LLMService): context ) - async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager): + async for chunk in chunk_stream: if chunk.usage: tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index 5c7df7597..bc7af9a46 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -53,7 +53,6 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService from pipecat.services.openai.llm import OpenAIContextAggregatorPair from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt @@ -456,7 +455,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): # async def _receive_task_handler(self): - async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): + async for message in self._websocket: evt = events.parse_server_event(message) if evt.type == "session.created": await self._handle_evt_session_created(evt) diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index be90e8732..d00eb4f42 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -24,7 +24,6 @@ from pipecat.frames.frames import ( ) from pipecat.services.stt_service import SegmentedSTTService, STTService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_stt @@ -239,13 +238,13 @@ class RivaSTTService(STTService): riva.client.add_custom_configuration_to_config(config, self._custom_configuration) self._config = config - self._queue = WatchdogQueue(self.task_manager) + self._queue = asyncio.Queue() if not self._thread_task: self._thread_task = self.create_task(self._thread_task_handler()) if not self._response_task: - self._response_queue = WatchdogQueue(self.task_manager) + self._response_queue = asyncio.Queue() self._response_task = self.create_task(self._response_task_handler()) async def stop(self, frame: EndFrame): diff --git a/src/pipecat/services/sambanova/llm.py b/src/pipecat/services/sambanova/llm.py index 815d1c8be..f2ee082ed 100644 --- a/src/pipecat/services/sambanova/llm.py +++ b/src/pipecat/services/sambanova/llm.py @@ -20,7 +20,6 @@ from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.llm_service import FunctionCallFromLLM from pipecat.services.openai.llm import OpenAILLMService -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_llm @@ -127,7 +126,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore context ) - async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager): + async for chunk in chunk_stream: if chunk.usage: tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, diff --git a/src/pipecat/services/simli/video.py b/src/pipecat/services/simli/video.py index d398a87f3..e35dad4c6 100644 --- a/src/pipecat/services/simli/video.py +++ b/src/pipecat/services/simli/video.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( UserStartedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator try: from av.audio.frame import AudioFrame @@ -96,7 +95,7 @@ class SimliVideoService(FrameProcessor): """Consume audio frames from Simli and push them downstream.""" await self._pipecat_resampler_event.wait() audio_iterator = self._simli_client.getAudioStreamIterator() - async for audio_frame in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager): + async for audio_frame in audio_iterator: resampled_frames = self._pipecat_resampler.resample(audio_frame) for resampled_frame in resampled_frames: audio_array = resampled_frame.to_ndarray() @@ -114,7 +113,7 @@ class SimliVideoService(FrameProcessor): """Consume video frames from Simli and convert them to output frames.""" await self._pipecat_resampler_event.wait() video_iterator = self._simli_client.getVideoStreamIterator(targetFormat="rgb24") - async for video_frame in WatchdogAsyncIterator(video_iterator, manager=self.task_manager): + async for video_frame in video_iterator: # Process the video frame convertedFrame: OutputImageRawFrame = OutputImageRawFrame( image=video_frame.to_rgb().to_image().tobytes(), diff --git a/src/pipecat/services/tavus/video.py b/src/pipecat/services/tavus/video.py index e3c83bb89..1ec76c02c 100644 --- a/src/pipecat/services/tavus/video.py +++ b/src/pipecat/services/tavus/video.py @@ -35,7 +35,6 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup from pipecat.services.ai_service import AIService from pipecat.transports.services.tavus import TavusCallbacks, TavusParams, TavusTransportClient -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue class TavusVideoService(AIService): @@ -255,13 +254,12 @@ class TavusVideoService(AIService): async def _create_send_task(self): """Create the audio sending task if it doesn't exist.""" if not self._send_task: - self._queue = WatchdogQueue(self.task_manager) + self._queue = asyncio.Queue() self._send_task = self.create_task(self._send_task_handler()) async def _cancel_send_task(self): """Cancel the audio sending task if it exists.""" if self._send_task: - self._queue.cancel() await self.cancel_task(self._send_task) self._send_task = None diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 208abaef9..90eb1646d 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -37,7 +37,6 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService from pipecat.services.websocket_service import WebsocketService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue from pipecat.utils.text.base_text_aggregator import BaseTextAggregator from pipecat.utils.text.base_text_filter import BaseTextFilter from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator @@ -438,8 +437,6 @@ class TTSService(AIService): if has_started: await self.push_frame(TTSStoppedFrame()) has_started = False - finally: - self.reset_watchdog() class WordTTSService(TTSService): @@ -526,7 +523,7 @@ class WordTTSService(TTSService): def _create_words_task(self): if not self._words_task: - self._words_queue = WatchdogQueue(self.task_manager) + self._words_queue = asyncio.Queue() self._words_task = self.create_task(self._words_task_handler()) async def _stop_words_task(self): @@ -816,13 +813,12 @@ class AudioContextWordTTSService(WebsocketWordTTSService): def _create_audio_context_task(self): if not self._audio_context_task: - self._contexts_queue = WatchdogQueue(self.task_manager) + self._contexts_queue = asyncio.Queue() self._contexts: Dict[str, asyncio.Queue] = {} self._audio_context_task = self.create_task(self._audio_context_task_handler()) async def _stop_audio_context_task(self): if self._audio_context_task: - self._contexts_queue.cancel() await self.cancel_task(self._audio_context_task) self._audio_context_task = None @@ -859,12 +855,10 @@ class AudioContextWordTTSService(WebsocketWordTTSService): while running: try: frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT) - self.reset_watchdog() if frame: await self.push_frame(frame) running = frame is not None except asyncio.TimeoutError: - self.reset_watchdog() # We didn't get audio, so let's consider this context finished. logger.trace(f"{self} time out on audio context {context_id}") break diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 6891bef00..aad2fb842 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -49,7 +49,6 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import MetricsData from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue AUDIO_INPUT_TIMEOUT_SECS = 0.5 @@ -396,7 +395,7 @@ class BaseInputTransport(FrameProcessor): def _create_audio_task(self): """Create the audio processing task if audio input is enabled.""" if not self._audio_task and self._params.audio_in_enabled: - self._audio_in_queue = WatchdogQueue(self.task_manager) + self._audio_in_queue = asyncio.Queue() self._audio_task = self.create_task(self._audio_task_handler()) async def _cancel_audio_task(self): @@ -506,8 +505,6 @@ class BaseInputTransport(FrameProcessor): if self._params.turn_analyzer: self._params.turn_analyzer.clear() await self._handle_user_interruption(UserStoppedSpeakingFrame()) - finally: - self.reset_watchdog() async def _handle_prediction_result(self, result: MetricsData): """Handle a prediction result event from the turn analyzer.""" diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index afb5abb0b..0a20e07b3 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -46,7 +46,6 @@ from pipecat.frames.frames import ( ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams -from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue from pipecat.utils.time import nanoseconds_to_seconds BOT_VAD_STOP_SECS = 0.35 @@ -626,10 +625,8 @@ class BaseOutputTransport(FrameProcessor): frame = await asyncio.wait_for( self._audio_queue.get(), timeout=vad_stop_secs ) - self._transport.reset_watchdog() yield frame except asyncio.TimeoutError: - self._transport.reset_watchdog() # Notify the bot stopped speaking upstream if necessary. await self._bot_stopped_speaking() @@ -639,13 +636,11 @@ class BaseOutputTransport(FrameProcessor): while True: try: frame = self._audio_queue.get_nowait() - self._transport.reset_watchdog() if isinstance(frame, OutputAudioRawFrame): frame.audio = await self._mixer.mix(frame.audio) last_frame_time = time.time() yield frame except asyncio.QueueEmpty: - self._transport.reset_watchdog() # Notify the bot stopped speaking upstream if necessary. diff_time = time.time() - last_frame_time if diff_time > vad_stop_secs: @@ -827,15 +822,12 @@ class BaseOutputTransport(FrameProcessor): def _create_clock_task(self): """Create the clock/timing processing task.""" if not self._clock_task: - self._clock_queue = WatchdogPriorityQueue( - self._transport.task_manager, tuple_size=3 - ) + self._clock_queue = asyncio.PriorityQueue() self._clock_task = self._transport.create_task(self._clock_task_handler()) async def _cancel_clock_task(self): """Cancel and cleanup the clock processing task.""" if self._clock_task: - self._clock_queue.cancel() await self._transport.cancel_task(self._clock_task) self._clock_task = None diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index d3e3362b4..8287783c2 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -37,7 +37,6 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator try: from fastapi import WebSocket @@ -283,9 +282,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport): async def _receive_messages(self): """Main message receiving loop for WebSocket messages.""" try: - async for message in WatchdogAsyncIterator( - self._client.receive(), manager=self.task_manager - ): + async for message in self._client.receive(): if not self._params.serializer: continue diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index ddb8339c9..d690e6138 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -40,7 +40,6 @@ from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator try: import cv2 @@ -586,9 +585,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): """Background task for receiving audio frames from WebRTC.""" try: audio_iterator = self._client.read_audio_frame() - async for audio_frame in WatchdogAsyncIterator( - audio_iterator, manager=self.task_manager - ): + async for audio_frame in audio_iterator: if audio_frame: await self.push_audio_frame(audio_frame) @@ -599,9 +596,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): """Background task for receiving video frames from WebRTC.""" try: video_iterator = self._client.read_video_frame() - async for video_frame in WatchdogAsyncIterator( - video_iterator, manager=self.task_manager - ): + async for video_frame in video_iterator: if video_frame: await self.push_video_frame(video_frame) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 77f7f5e29..66e17905a 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -49,7 +49,6 @@ from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue try: from daily import ( @@ -358,7 +357,6 @@ class DailyTransportClient(EventHandler): self._leave_counter = 0 self._task_manager: Optional[BaseTaskManager] = None - self._watchdog_timers_enabled = False # We use the executor to cleanup the client. We just do it from one # place, so only one thread is really needed. @@ -527,9 +525,8 @@ class DailyTransportClient(EventHandler): return self._task_manager = setup.task_manager - self._watchdog_timers_enabled = setup.watchdog_timers_enabled - self._event_queue = WatchdogQueue(self._task_manager) + self._event_queue = asyncio.Queue() self._event_task = self._task_manager.create_task( self._callback_task_handler(self._event_queue), f"{self}::event_callback_task", @@ -561,7 +558,7 @@ class DailyTransportClient(EventHandler): if self._params.audio_in_enabled: if self._params.audio_in_user_tracks and not self._audio_task and self._task_manager: - self._audio_queue = WatchdogQueue(self._task_manager) + self._audio_queue = asyncio.Queue() self._audio_task = self._task_manager.create_task( self._callback_task_handler(self._audio_queue), f"{self}::audio_callback_task", @@ -576,7 +573,7 @@ class DailyTransportClient(EventHandler): Daily.select_speaker_device(self._speaker_name()) if self._params.video_in_enabled and not self._video_task and self._task_manager: - self._video_queue = WatchdogQueue(self._task_manager) + self._video_queue = asyncio.Queue() self._video_task = self._task_manager.create_task( self._callback_task_handler(self._video_queue), f"{self}::video_callback_task", diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 24c0b3cf0..f21775cf6 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -35,7 +35,6 @@ from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator try: from livekit import rtc @@ -599,7 +598,7 @@ class LiveKitInputTransport(BaseInputTransport): """Handle incoming audio frames from participants.""" logger.info("Audio input task started") audio_iterator = self._client.get_next_audio_frame() - async for audio_data in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager): + async for audio_data in audio_iterator: if audio_data: audio_frame_event, participant_id = audio_data pipecat_audio_frame = await self._convert_livekit_audio_to_pipecat( diff --git a/src/pipecat/utils/asyncio/task_manager.py b/src/pipecat/utils/asyncio/task_manager.py index 3bc3453ac..386826bcb 100644 --- a/src/pipecat/utils/asyncio/task_manager.py +++ b/src/pipecat/utils/asyncio/task_manager.py @@ -4,24 +4,20 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Asyncio task management with watchdog monitoring capabilities. +"""Asyncio task management. -This module provides task management functionality with optional watchdog timers -to monitor task execution and prevent hanging operations. Includes both abstract -base classes and concrete implementations for managing asyncio tasks with +This module provides task management functionality. Includes both abstract base +classes and concrete implementations for managing asyncio tasks with comprehensive monitoring and cleanup capabilities. """ import asyncio -import time from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Coroutine, Dict, Optional, Sequence from loguru import logger -WATCHDOG_TIMEOUT = 5.0 - @dataclass class TaskManagerParams: @@ -29,22 +25,15 @@ class TaskManagerParams: Parameters: loop: The asyncio event loop to use for task management. - enable_watchdog_timers: Whether to enable watchdog timers for tasks. - enable_watchdog_logging: Whether to log watchdog timing information. - watchdog_timeout: Default timeout in seconds for watchdog timers. """ loop: asyncio.AbstractEventLoop - enable_watchdog_timers: bool = False - enable_watchdog_logging: bool = False - watchdog_timeout: float = WATCHDOG_TIMEOUT class BaseTaskManager(ABC): - """Abstract base class for asyncio task management with watchdog support. + """Abstract base class for asyncio task management. - Provides the interface for creating, monitoring, and managing asyncio tasks - with optional watchdog timer functionality to detect stalled operations. + Provides the interface for creating, monitoring, and managing asyncio tasks. """ @abstractmethod @@ -66,15 +55,7 @@ class BaseTaskManager(ABC): pass @abstractmethod - def create_task( - self, - coroutine: Coroutine, - name: str, - *, - enable_watchdog_logging: Optional[bool] = None, - enable_watchdog_timers: Optional[bool] = None, - watchdog_timeout: Optional[float] = None, - ) -> asyncio.Task: + def create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task: """Creates and schedules a new asyncio Task that runs the given coroutine. The task is added to a global set of created tasks. @@ -82,9 +63,6 @@ class BaseTaskManager(ABC): Args: coroutine: The coroutine to be executed within the task. name: The name to assign to the task for identification. - enable_watchdog_logging: Whether this task should log watchdog processing times. - enable_watchdog_timers: Whether this task should have a watchdog timer. - watchdog_timeout: Watchdog timer timeout for this task. Returns: The created task object. @@ -128,53 +106,24 @@ class BaseTaskManager(ABC): """ pass - @abstractmethod - def task_reset_watchdog(self): - """Task reset watchdog timer. - - Resets the running task watchdog timer. If not reset, a warning will - be logged indicating the task is stalling. - """ - pass - - @property - @abstractmethod - def task_watchdog_enabled(self) -> bool: - """Whether the current running task has a watchdog timer enabled. - - Returns: - True if the current task has watchdog monitoring active. - """ - pass - @dataclass class TaskData: - """Internal data structure for tracking task metadata and watchdog state. + """Internal data structure for tracking task metadata. Parameters: task: The asyncio Task being managed. - watchdog_timer: Event used to reset the watchdog timer. - enable_watchdog_logging: Whether to log watchdog timing information. - enable_watchdog_timers: Whether watchdog timers are enabled for this task. - watchdog_timeout: Timeout in seconds for watchdog warnings. - watchdog_task: Optional background task monitoring the watchdog timer. """ task: asyncio.Task - watchdog_timer: asyncio.Event - enable_watchdog_logging: bool - enable_watchdog_timers: bool - watchdog_timeout: float - watchdog_task: Optional[asyncio.Task] class TaskManager(BaseTaskManager): - """Concrete implementation of BaseTaskManager with full watchdog support. + """Concrete implementation of BaseTaskManager. + + Manages asyncio tasks. Provides comprehensive task lifecycle management + including creation, monitoring, cancellation, and cleanup. - Manages asyncio tasks with optional watchdog monitoring to detect stalled - operations. Provides comprehensive task lifecycle management including - creation, monitoring, cancellation, and cleanup. """ def __init__(self) -> None: @@ -204,15 +153,7 @@ class TaskManager(BaseTaskManager): raise Exception("TaskManager is not setup: unable to get event loop") return self._params.loop - def create_task( - self, - coroutine: Coroutine, - name: str, - *, - enable_watchdog_logging: Optional[bool] = None, - enable_watchdog_timers: Optional[bool] = None, - watchdog_timeout: Optional[float] = None, - ) -> asyncio.Task: + def create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task: """Creates and schedules a new asyncio Task that runs the given coroutine. The task is added to a global set of created tasks. @@ -220,9 +161,6 @@ class TaskManager(BaseTaskManager): Args: coroutine: The coroutine to be executed within the task. name: The name to assign to the task for identification. - enable_watchdog_logging: Whether this task should log watchdog processing time. - enable_watchdog_timers: Whether this task should have a watchdog timer. - watchdog_timeout: Watchdog timer timeout for this task. Returns: The created task object. @@ -247,26 +185,7 @@ class TaskManager(BaseTaskManager): task = self._params.loop.create_task(run_coroutine()) task.set_name(name) task.add_done_callback(self._task_done_handler) - self._add_task( - TaskData( - task=task, - watchdog_timer=asyncio.Event(), - enable_watchdog_logging=( - enable_watchdog_logging - if enable_watchdog_logging - else self._params.enable_watchdog_logging - ), - enable_watchdog_timers=( - enable_watchdog_timers - if enable_watchdog_timers - else self._params.enable_watchdog_timers - ), - watchdog_timeout=( - watchdog_timeout if watchdog_timeout else self._params.watchdog_timeout - ), - watchdog_task=None, - ), - ) + self._add_task(TaskData(task=task)) logger.trace(f"{name}: task created") return task @@ -312,8 +231,6 @@ class TaskManager(BaseTaskManager): name = task.get_name() task.cancel() try: - # Make sure to reset watchdog if a task is cancelled. - self.reset_watchdog(task) if timeout: await asyncio.wait_for(task, timeout=timeout) else: @@ -329,16 +246,6 @@ class TaskManager(BaseTaskManager): logger.critical(f"{name}: fatal base exception while cancelling task: {e}") raise - def reset_watchdog(self, task: asyncio.Task): - """Reset the watchdog timer for a specific task. - - Args: - task: The task whose watchdog timer should be reset. - """ - name = task.get_name() - if name in self._tasks and self._tasks[name].enable_watchdog_timers: - self._tasks[name].watchdog_timer.set() - def current_tasks(self) -> Sequence[asyncio.Task]: """Returns the list of currently created/registered tasks. @@ -347,99 +254,23 @@ class TaskManager(BaseTaskManager): """ return [data.task for data in self._tasks.values()] - def task_reset_watchdog(self): - """Task reset watchdog timer. - - Resets the running task watchdog timer. If not reset on time, a warning - will be logged indicating the task is stalling. - """ - task = asyncio.current_task() - if task: - self.reset_watchdog(task) - - @property - def task_watchdog_enabled(self) -> bool: - """Whether the current running task has a watchdog timer enabled. - - Returns: - True if the current task has watchdog monitoring active. - """ - task = asyncio.current_task() - if not task: - return False - name = task.get_name() - return name in self._tasks and self._tasks[name].enable_watchdog_timers - def _add_task(self, task_data: TaskData): - """Add a task to the internal registry and start watchdog if enabled. + """Add a task to the internal registry. Args: - task_data: The task data containing task and watchdog configuration. + task_data: The task metadata. """ name = task_data.task.get_name() self._tasks[name] = task_data - if self._params and task_data.enable_watchdog_timers: - watchdog_task = self.get_event_loop().create_task( - self._watchdog_task_handler(task_data) - ) - task_data.watchdog_task = watchdog_task - - async def _watchdog_task_handler(self, task_data: TaskData): - """Background task that monitors watchdog timer for a specific task. - - Args: - task_data: The task data containing watchdog configuration. - """ - name = task_data.task.get_name() - timer = task_data.watchdog_timer - enable_watchdog_logging = task_data.enable_watchdog_logging - watchdog_timeout = task_data.watchdog_timeout - - while True: - try: - if task_data.task.done(): - logger.debug(f"{name}: task is already done, cancelling watchdog task.") - break - - start_time = time.time() - await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout) - total_time = time.time() - start_time - if enable_watchdog_logging: - logger.debug(f"{name} time between watchdog timer resets: {total_time:.20f}") - except asyncio.TimeoutError: - logger.warning( - f"{name}: task is taking too long {WATCHDOG_TIMEOUT} second(s) (forgot to reset watchdog?)" - ) - finally: - timer.clear() def _task_done_handler(self, task: asyncio.Task): - """Handle task completion by cleaning up watchdog and removing from registry. + """Handle task completion by removing the task from the registry. Args: task: The completed asyncio task. """ name = task.get_name() try: - task_data = self._tasks[name] - if task_data.watchdog_task: - task_data.watchdog_task.cancel() - # In Python 3.10, simply calling task.cancel() looks like is not enough. - # Without this, some tasks appear that are never canceled. - # Python 3.12 handles this more gracefully, but we keep this for compatibility - # and to avoid "Task exception was never retrieved" warnings. - self.get_event_loop().create_task( - self._cleanup_watchdog(name, task_data.watchdog_task) - ) - task_data.watchdog_task = None del self._tasks[name] except KeyError as e: logger.trace(f"{name}: unable to remove task data (already removed?): {e}") - - async def _cleanup_watchdog(self, name: str, watchdog_task: asyncio.Task): - try: - await watchdog_task - except asyncio.CancelledError: - pass - except Exception as e: - logger.warning(f"{name}: watchdog task raised exception: {e}") diff --git a/src/pipecat/utils/asyncio/watchdog_async_iterator.py b/src/pipecat/utils/asyncio/watchdog_async_iterator.py deleted file mode 100644 index 410584f49..000000000 --- a/src/pipecat/utils/asyncio/watchdog_async_iterator.py +++ /dev/null @@ -1,102 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -"""Watchdog-enabled async iterator wrapper for task monitoring. - -This module provides an async iterator wrapper that automatically resets -watchdog timers while waiting for iterator items, preventing false positive -watchdog timeouts during legitimate waiting periods. -""" - -import asyncio -from typing import AsyncIterator, Optional - -from pipecat.utils.asyncio.task_manager import BaseTaskManager - - -class WatchdogAsyncIterator: - """Watchdog async iterator wrapper. - - An asynchronous iterator that monitors activity and resets the current - task watchdog timer. This is necessary to avoid task watchdog timers to - expire while we are waiting to get an item from the iterator. - """ - - def __init__( - self, - async_iterable, - *, - manager: BaseTaskManager, - timeout: float = 2.0, - ): - """Initialize the watchdog async iterator. - - Args: - async_iterable: The async iterable to wrap with watchdog monitoring. - manager: The task manager for watchdog timer control. - timeout: Timeout in seconds between watchdog resets while waiting. - """ - self._async_iterable = async_iterable - self._manager = manager - self._timeout = timeout - self._iter: Optional[AsyncIterator] = None - self._current_anext_task: Optional[asyncio.Task] = None - - def __aiter__(self): - """Return self as the async iterator. - - Returns: - This iterator instance. - """ - return self - - async def __anext__(self): - """Get the next item from the iterator with watchdog monitoring. - - Returns: - The next item from the wrapped async iterator. - - Raises: - StopAsyncIteration: When the iterator is exhausted. - """ - if not self._iter: - self._iter = await self._ensure_async_iterator(self._async_iterable) - - if self._manager.task_watchdog_enabled: - return await self._watchdog_anext() - else: - return await self._iter.__anext__() - - async def _watchdog_anext(self): - """Get next item while periodically resetting watchdog timer.""" - while True: - try: - if not self._current_anext_task: - self._current_anext_task = asyncio.create_task(self._iter.__anext__()) - - item = await asyncio.wait_for( - asyncio.shield(self._current_anext_task), timeout=self._timeout - ) - - self._manager.task_reset_watchdog() - - # The task has finished, so we will create a new one for the - # next item. - self._current_anext_task = None - - return item - except asyncio.TimeoutError: - self._manager.task_reset_watchdog() - except StopAsyncIteration: - self._current_anext_task = None - raise - - async def _ensure_async_iterator(self, obj) -> AsyncIterator: - """Ensure the object is an async iterator, awaiting if necessary.""" - aiter = obj.__aiter__() - if asyncio.iscoroutine(aiter): - aiter = await aiter - return aiter diff --git a/src/pipecat/utils/asyncio/watchdog_coroutine.py b/src/pipecat/utils/asyncio/watchdog_coroutine.py deleted file mode 100644 index 99a5f8974..000000000 --- a/src/pipecat/utils/asyncio/watchdog_coroutine.py +++ /dev/null @@ -1,87 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -"""Watchdog-enabled coroutine wrapper for task monitoring. - -This module provides a coroutine wrapper that automatically resets watchdog -timers while waiting for coroutine completion, preventing false positive -watchdog timeouts during legitimate operations. -""" - -import asyncio -from typing import Optional - -from pipecat.utils.asyncio.task_manager import BaseTaskManager - - -class WatchdogCoroutine: - """Watchdog-enabled coroutine wrapper. - - An asynchronous iterator that monitors activity and resets the current - task watchdog timer. This is necessary to avoid task watchdog timers to - expire while we are waiting to get an item from the iterator. - """ - - def __init__( - self, - coroutine, - *, - manager: BaseTaskManager, - timeout: float = 2.0, - ): - """Initialize the watchdog coroutine wrapper. - - Args: - coroutine: The coroutine to wrap with watchdog monitoring. - manager: The task manager for watchdog timer control. - timeout: Timeout in seconds between watchdog resets while waiting. - """ - self._coroutine = coroutine - self._manager = manager - self._timeout = timeout - self._current_coro_task: Optional[asyncio.Task] = None - - async def __call__(self): - """Execute the wrapped coroutine with watchdog monitoring.""" - if self._manager.task_watchdog_enabled: - return await self._watchdog_call() - else: - return await self._coroutine - - async def _watchdog_call(self): - """Execute coroutine while periodically resetting watchdog timer.""" - while True: - try: - if not self._current_coro_task: - self._current_coro_task = asyncio.create_task(self._coroutine) - - result = await asyncio.wait_for( - asyncio.shield(self._current_coro_task), timeout=self._timeout - ) - - self._manager.task_reset_watchdog() - - # The task has finished. - self._current_coro_task = None - - return result - except asyncio.TimeoutError: - self._manager.task_reset_watchdog() - - -async def watchdog_coroutine(coroutine, *, manager: BaseTaskManager, timeout: float = 2.0): - """Execute a coroutine with watchdog monitoring support. - - Args: - coroutine: The coroutine to execute with watchdog monitoring. - manager: The task manager for watchdog timer control. - timeout: Timeout in seconds between watchdog resets while waiting. - - Returns: - The result of the coroutine execution. - """ - watchdog_coro = WatchdogCoroutine(coroutine, manager=manager, timeout=timeout) - return await watchdog_coro() diff --git a/src/pipecat/utils/asyncio/watchdog_event.py b/src/pipecat/utils/asyncio/watchdog_event.py deleted file mode 100644 index c48356c50..000000000 --- a/src/pipecat/utils/asyncio/watchdog_event.py +++ /dev/null @@ -1,68 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -"""Watchdog-enabled asyncio Event for task monitoring. - -This module provides an asyncio Event subclass that automatically resets -watchdog timers while waiting for the event, preventing false positive -watchdog timeouts during legitimate waiting periods. -""" - -import asyncio - -from pipecat.utils.asyncio.task_manager import BaseTaskManager - - -class WatchdogEvent(asyncio.Event): - """Watchdog-enabled asyncio Event. - - An asynchronous event that resets the current task watchdog timer. This - is necessary to avoid task watchdog timers to expire while we are waiting on - the event. - """ - - def __init__( - self, - manager: BaseTaskManager, - *, - timeout: float = 2.0, - ) -> None: - """Initialize the watchdog event. - - Args: - manager: The task manager for watchdog timer control. - timeout: Timeout in seconds between watchdog resets while waiting. - """ - super().__init__() - self._manager = manager - self._timeout = timeout - - async def wait(self): - """Wait for the event to be set with watchdog monitoring. - - Returns: - True when the event is set. - """ - if self._manager.task_watchdog_enabled: - return await self._watchdog_wait() - else: - return await super().wait() - - async def _watchdog_wait(self): - """Wait for event while periodically resetting watchdog timer.""" - while True: - try: - await asyncio.wait_for(super().wait(), timeout=self._timeout) - self._manager.task_reset_watchdog() - return True - except asyncio.TimeoutError: - self._manager.task_reset_watchdog() - - def clear(self): - """Clear the event while resetting watchdog timer.""" - if self._manager.task_watchdog_enabled: - self._manager.task_reset_watchdog() - super().clear() diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py deleted file mode 100644 index c9b8fe171..000000000 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ /dev/null @@ -1,131 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -"""Watchdog-enabled asyncio PriorityQueue for task monitoring. - -This module provides an asyncio PriorityQueue subclass that automatically resets -watchdog timers while waiting for items, preventing false positive watchdog -timeouts during legitimate queue operations. -""" - -import asyncio -from dataclasses import dataclass - -from loguru import logger - -from pipecat.utils.asyncio.task_manager import BaseTaskManager - - -@dataclass -class WatchdogPriorityCancelSentinel: - """Sentinel object used in priority queues to force cancellation. - - An instance of this class is typically inserted into a - `WatchdogPriorityQueue` to act as a high-priority marker asyncio task - cancellation. - - """ - - pass - - -class WatchdogPriorityQueue(asyncio.PriorityQueue): - """Class for watchdog-enabled asyncio PriorityQueue. - - An asynchronous priority queue that resets the current task watchdog - timer. This is necessary to avoid task watchdog timers to expire while we - are waiting to get an item from the queue. - - This queue expects items to be tuples, with the actual payload stored - in the last element. All preceding elements are treated as numeric - priority fields. For example: - - (0, 1, "foo") - - The tuple length must be specified at creation time so the queue can - correctly construct special items, such as the watchdog cancel sentinel, - with the proper tuple structure. - - """ - - def __init__( - self, - manager: BaseTaskManager, - *, - tuple_size: int, - maxsize: int = 0, - timeout: float = 2.0, - ) -> None: - """Initialize the watchdog priority queue. - - Args: - manager: The task manager for watchdog timer control. - tuple_size: The number of values in each inserted tuple. - maxsize: Maximum queue size. 0 means unlimited. - timeout: Timeout in seconds between watchdog resets while waiting. - """ - super().__init__(maxsize) - self._manager = manager - self._timeout = timeout - self._tuple_size = tuple_size - - async def get(self): - """Get an item from the queue with watchdog monitoring. - - Returns: - The next item from the priority queue. - """ - if self._manager.task_watchdog_enabled: - get_result = await self._watchdog_get() - else: - get_result = await super().get() - - # Value is always at the end of the tuple. - item = get_result[-1] - - if isinstance(item, WatchdogPriorityCancelSentinel): - logger.trace( - "Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling" - ) - raise asyncio.CancelledError("Cancelling watchdog queue get() call.") - else: - return get_result - - def task_done(self): - """Mark a task as done and reset watchdog if enabled. - - Should be called after processing each item retrieved from the queue. - """ - if self._manager.task_watchdog_enabled: - self._manager.task_reset_watchdog() - super().task_done() - - def cancel(self): - """Ensures reliable task cancellation by preventing a common race condition. - - The race condition occurs in Python 3.10+ when: - 1. A value is put in the queue just before task cancellation - 2. queue.get() completes before the cancellation signal is delivered - 3. The task misses the CancelledError and continues running indefinitely - - This method prevents the issue by injecting a special sentinel value that - forces the task to raise CancelledError when consumed, ensuring proper - task termination. - """ - item = [float("-inf")] * self._tuple_size - # Values go always at the end. - item[-1] = WatchdogPriorityCancelSentinel() - super().put_nowait(tuple(item)) - - async def _watchdog_get(self): - """Get item from queue while periodically resetting watchdog timer.""" - while True: - try: - item = await asyncio.wait_for(super().get(), timeout=self._timeout) - self._manager.task_reset_watchdog() - return item - except asyncio.TimeoutError: - self._manager.task_reset_watchdog() diff --git a/src/pipecat/utils/asyncio/watchdog_queue.py b/src/pipecat/utils/asyncio/watchdog_queue.py deleted file mode 100644 index 45bcaff66..000000000 --- a/src/pipecat/utils/asyncio/watchdog_queue.py +++ /dev/null @@ -1,110 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -"""Watchdog-enabled asyncio Queue for task monitoring. - -This module provides an asyncio Queue subclass that automatically resets -watchdog timers while waiting for items, preventing false positive watchdog -timeouts during legitimate queue operations. -""" - -import asyncio -from dataclasses import dataclass - -from loguru import logger - -from pipecat.utils.asyncio.task_manager import BaseTaskManager - - -@dataclass -class WatchdogQueueCancelSentinel: - """Sentinel object used in queues to force cancellation. - - An instance of this class is typically inserted into a `WatchdogQueue` to - act as a marker for asyncio task cancellation. - - """ - - pass - - -class WatchdogQueue(asyncio.Queue): - """Watchdog-enabled asyncio Queue. - - An asynchronous queue that resets the current task watchdog timer. This - is necessary to avoid task watchdog timers to expire while we are waiting to - get an item from the queue. - """ - - def __init__( - self, - manager: BaseTaskManager, - *, - maxsize: int = 0, - timeout: float = 2.0, - ) -> None: - """Initialize the watchdog queue. - - Args: - manager: The task manager for watchdog timer control. - maxsize: Maximum queue size. 0 means unlimited. - timeout: Timeout in seconds between watchdog resets while waiting. - """ - super().__init__(maxsize) - self._manager = manager - self._timeout = timeout - - async def get(self): - """Get an item from the queue with watchdog monitoring. - - Returns: - The next item from the queue. - """ - if self._manager.task_watchdog_enabled: - get_result = await self._watchdog_get() - else: - get_result = await super().get() - - if isinstance(get_result, WatchdogQueueCancelSentinel): - logger.trace( - "Received WatchdogQueueCancelFrame, throwing CancelledError to force cancelling" - ) - raise asyncio.CancelledError("Cancelling watchdog queue get() call.") - else: - return get_result - - def task_done(self): - """Mark a task as done and reset watchdog if enabled. - - Should be called after processing each item retrieved from the queue. - """ - if self._manager.task_watchdog_enabled: - self._manager.task_reset_watchdog() - super().task_done() - - def cancel(self): - """Ensures reliable task cancellation by preventing a common race condition. - - The race condition occurs in Python 3.10+ when: - 1. A value is put in the queue just before task cancellation - 2. queue.get() completes before the cancellation signal is delivered - 3. The task misses the CancelledError and continues running indefinitely - - This method prevents the issue by injecting a special sentinel value that - forces the task to raise CancelledError when consumed, ensuring proper - task termination. - """ - super().put_nowait(WatchdogQueueCancelSentinel()) - - async def _watchdog_get(self): - """Get item from queue while periodically resetting watchdog timer.""" - while True: - try: - item = await asyncio.wait_for(super().get(), timeout=self._timeout) - self._manager.task_reset_watchdog() - return item - except asyncio.TimeoutError: - self._manager.task_reset_watchdog() diff --git a/tests/test_watchdog_queue.py b/tests/test_watchdog_queue.py deleted file mode 100644 index 507f9cf02..000000000 --- a/tests/test_watchdog_queue.py +++ /dev/null @@ -1,65 +0,0 @@ -# -# Copyright (c) 2024-2025 Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import unittest - -from pipecat.utils.asyncio.task_manager import TaskManager -from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue -from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue - - -class TestWatchdogQueue(unittest.IsolatedAsyncioTestCase): - async def test_simple_item(self): - queue = WatchdogQueue(TaskManager()) - await queue.put(1) - await queue.put(2) - await queue.put(3) - self.assertEqual(await queue.get(), 1) - queue.task_done() - self.assertEqual(await queue.get(), 2) - queue.task_done() - self.assertEqual(await queue.get(), 3) - queue.task_done() - - async def test_watchdog_sentinel(self): - queue = WatchdogQueue(TaskManager()) - await queue.put(1) - self.assertEqual(await queue.get(), 1) - queue.task_done() - # The get should throw an exception. - queue.cancel() - try: - await queue.get() - assert False - except asyncio.CancelledError: - assert True - - -class TestWatchdogPriorityQueue(unittest.IsolatedAsyncioTestCase): - async def test_simple_item(self): - queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2) - await queue.put((3, 1)) - await queue.put((2, 1)) - await queue.put((1, 1)) - self.assertEqual(await queue.get(), (1, 1)) - queue.task_done() - self.assertEqual(await queue.get(), (2, 1)) - queue.task_done() - self.assertEqual(await queue.get(), (3, 1)) - queue.task_done() - - async def test_watchdog_sentinel(self): - queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2) - await queue.put((0, 1)) - # The get should throw an exception because the watchdog sentinel has - # higher priority. - queue.cancel() - try: - await queue.get() - assert False - except asyncio.CancelledError: - assert True