remove watchdog timers and specific asyncio implementations

Watchdog timers have been removed. They were introduced in 0.0.72 to help
diagnose pipeline freezes. Unfortunately, they proved ineffective since they
required developers to use Pipecat-specific queues, iterators, and events to
correctly reset the timer, which limited their usefulness and added friction.
This commit is contained in:
Aleix Conchillo Flaqué
2025-08-21 15:05:24 -07:00
parent ddab95835b
commit 24a628c85e
49 changed files with 113 additions and 1027 deletions

View File

@@ -46,6 +46,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Removed
- Watchdog timers have been removed. They were introduced in 0.0.72 to help
diagnose pipeline freezes. Unfortunately, they proved ineffective since they
required developers to use Pipecat-specific queues, iterators, and events to
correctly reset the timer, which limited their usefulness and added friction.
- Removed unused `FrameProcessor.set_parent()` and
`FrameProcessor.get_parent()`.

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import ControlFrame, EndFrame, Frame, SystemFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
@dataclass
@@ -128,11 +127,35 @@ class SyncParallelPipeline(BasePipeline):
if len(args) == 0:
raise Exception(f"SyncParallelPipeline needs at least one argument")
self._args = args
self._sinks = []
self._sources = []
self._pipelines = []
self._up_queue = asyncio.Queue()
self._down_queue = asyncio.Queue()
logger.debug(f"Creating {self} pipelines")
for processors in args:
if not isinstance(processors, list):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
# We add a source at the beginning of the pipeline and a sink at the end.
up_queue = asyncio.Queue()
down_queue = asyncio.Queue()
source = SyncParallelPipelineSource(up_queue)
sink = SyncParallelPipelineSink(down_queue)
# Keep track of sources and sinks. We also keep the output queue of
# the source and the sinks so we can use it later.
self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})
# Create pipeline
pipeline = Pipeline(processors, source=source, sink=sink)
self._pipelines.append(pipeline)
logger.debug(f"Finished creating {self} pipelines")
#
# Frame processor
#
@@ -178,32 +201,6 @@ class SyncParallelPipeline(BasePipeline):
setup: Configuration for frame processor setup.
"""
await super().setup(setup)
self._up_queue = WatchdogQueue(setup.task_manager)
self._down_queue = WatchdogQueue(setup.task_manager)
logger.debug(f"Creating {self} pipelines")
for processors in self._args:
if not isinstance(processors, list):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
# We add a source at the beginning of the pipeline and a sink at the end.
up_queue = asyncio.Queue()
down_queue = asyncio.Queue()
source = SyncParallelPipelineSource(up_queue)
sink = SyncParallelPipelineSink(down_queue)
# Keep track of sources and sinks. We also keep the output queue of
# the source and the sinks so we can use it later.
self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})
# Create pipeline
pipeline = Pipeline(processors, source=source, sink=sink)
self._pipelines.append(pipeline)
logger.debug(f"Finished creating {self} pipelines")
await asyncio.gather(*[p.setup(setup) for p in self._pipelines])
async def cleanup(self):

View File

@@ -49,13 +49,7 @@ from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import (
WATCHDOG_TIMEOUT,
BaseTaskManager,
TaskManager,
TaskManagerParams,
)
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -146,8 +140,6 @@ class PipelineTask(BasePipelineTask):
conversation_id: Optional[str] = None,
enable_tracing: bool = False,
enable_turn_tracking: bool = True,
enable_watchdog_logging: bool = False,
enable_watchdog_timers: bool = False,
idle_timeout_frames: Tuple[Type[Frame], ...] = (
BotSpeakingFrame,
InterimTranscriptionFrame,
@@ -159,7 +151,6 @@ class PipelineTask(BasePipelineTask):
idle_timeout_secs: Optional[float] = 300,
observers: Optional[List[BaseObserver]] = None,
task_manager: Optional[BaseTaskManager] = None,
watchdog_timeout_secs: float = WATCHDOG_TIMEOUT,
):
"""Initialize the PipelineTask.
@@ -175,8 +166,6 @@ class PipelineTask(BasePipelineTask):
conversation_id: Optional custom ID for the conversation.
enable_tracing: Whether to enable tracing.
enable_turn_tracking: Whether to enable turn tracking.
enable_watchdog_logging: Whether to print task processing times.
enable_watchdog_timers: Whether to enable task watchdog timers.
idle_timeout_frames: A tuple with the frames that should trigger an idle
timeout if not received within `idle_timeout_seconds`.
idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or
@@ -184,8 +173,6 @@ class PipelineTask(BasePipelineTask):
automatically.
observers: List of observers for monitoring pipeline execution.
task_manager: Optional task manager for handling asyncio tasks.
watchdog_timeout_secs: Watchdog timer timeout (in seconds). A warning
will be logged if the watchdog timer is not reset before this timeout.
"""
super().__init__()
self._params = params or PipelineParams()
@@ -196,11 +183,8 @@ class PipelineTask(BasePipelineTask):
self._conversation_id = conversation_id
self._enable_tracing = enable_tracing and is_tracing_available()
self._enable_turn_tracking = enable_turn_tracking
self._enable_watchdog_logging = enable_watchdog_logging
self._enable_watchdog_timers = enable_watchdog_timers
self._idle_timeout_frames = idle_timeout_frames
self._idle_timeout_secs = idle_timeout_secs
self._watchdog_timeout_secs = watchdog_timeout_secs
if self._params.observers:
import warnings
@@ -232,17 +216,17 @@ class PipelineTask(BasePipelineTask):
self._task_manager = task_manager or TaskManager()
# This queue is the queue used to push frames to the pipeline.
self._push_queue = WatchdogQueue(self._task_manager)
self._push_queue = asyncio.Queue()
self._process_push_task: Optional[asyncio.Task] = None
# This is the heartbeat queue. When a heartbeat frame is received in the
# down queue we add it to the heartbeat queue for processing.
self._heartbeat_queue = WatchdogQueue(self._task_manager)
self._heartbeat_queue = asyncio.Queue()
self._heartbeat_push_task: Optional[asyncio.Task] = None
self._heartbeat_monitor_task: Optional[asyncio.Task] = None
# This is the idle queue. When frames are received downstream they are
# put in the queue. If no frame is received the pipeline is considered
# idle.
self._idle_queue = WatchdogQueue(self._task_manager)
self._idle_queue = asyncio.Queue()
self._idle_monitor_task: Optional[asyncio.Task] = None
# This event is used to indicate a finalize frame (e.g. EndFrame,
# StopFrame) has been received in the down queue.
@@ -491,7 +475,6 @@ class PipelineTask(BasePipelineTask):
async def _maybe_cancel_idle_task(self):
"""Cancel idle monitoring task if it is running."""
if self._idle_timeout_secs and self._idle_monitor_task:
self._idle_queue.cancel()
await self._task_manager.cancel_task(self._idle_monitor_task)
self._idle_monitor_task = None
@@ -511,19 +494,13 @@ class PipelineTask(BasePipelineTask):
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
mgr_params = TaskManagerParams(
loop=params.loop,
enable_watchdog_logging=self._enable_watchdog_logging,
enable_watchdog_timers=self._enable_watchdog_timers,
watchdog_timeout=self._watchdog_timeout_secs,
)
mgr_params = TaskManagerParams(loop=params.loop)
self._task_manager.setup(mgr_params)
setup = FrameProcessorSetup(
clock=self._clock,
task_manager=self._task_manager,
observer=self._observer,
watchdog_timers_enabled=self._enable_watchdog_timers,
)
await self._pipeline.setup(setup)

View File

@@ -19,7 +19,6 @@ from attr import dataclass
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
@dataclass
@@ -86,7 +85,7 @@ class TaskObserver(BaseObserver):
# If we already started, create a new proxy for the observer.
# Otherwise, it will be created in start().
if self._started():
if self._proxies:
proxy = self._create_proxy(observer)
self._proxies[observer] = proxy
@@ -97,7 +96,7 @@ class TaskObserver(BaseObserver):
observer: The observer to remove.
"""
# If the observer has a proxy, remove it.
if observer in self._proxies:
if self._proxies and observer in self._proxies:
proxy = self._proxies[observer]
# Remove the proxy so it doesn't get called anymore.
del self._proxies[observer]
@@ -136,13 +135,9 @@ class TaskObserver(BaseObserver):
"""
await self._send_to_proxy(data)
def _started(self) -> bool:
"""Check if the task observer has been started."""
return self._proxies is not None
def _create_proxy(self, observer: BaseObserver) -> Proxy:
"""Create a proxy for a single observer."""
queue = WatchdogQueue(self._task_manager)
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer}::_proxy_task_handler",

View File

@@ -25,7 +25,6 @@ from pipecat.frames.frames import (
TranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
from pipecat.utils.time import time_now_iso8601
@@ -63,13 +62,9 @@ class DTMFAggregator(FrameProcessor):
self._termination_digit = termination_digit
self._prefix = prefix
self._digit_event = asyncio.Event()
self._aggregation_task: Optional[asyncio.Task] = None
async def setup(self, setup: FrameProcessorSetup):
"""Setup resources."""
await super().setup(setup)
self._digit_event = WatchdogEvent(setup.task_manager)
async def cleanup(self) -> None:
"""Clean up resources."""
await super().cleanup()
@@ -137,7 +132,6 @@ class DTMFAggregator(FrameProcessor):
await asyncio.wait_for(self._digit_event.wait(), timeout=self._idle_timeout)
self._digit_event.clear()
except asyncio.TimeoutError:
self.reset_watchdog()
if self._aggregation:
await self._flush_aggregation()

View File

@@ -684,7 +684,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
)
self._emulating_vad = False
finally:
self.reset_watchdog()
self._aggregation_event.clear()
async def _maybe_emulate_user_speaking(self):

View File

@@ -12,7 +12,6 @@ from typing import Awaitable, Callable, Optional
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
class ConsumerProcessor(FrameProcessor):
@@ -66,7 +65,7 @@ class ConsumerProcessor(FrameProcessor):
async def _start(self, _: StartFrame):
"""Start the consumer task and register with the producer."""
if not self._consumer_task:
self._queue: WatchdogQueue = self._producer.add_consumer()
self._queue = self._producer.add_consumer()
self._consumer_task = self.create_task(self._consumer_task_handler())
async def _stop(self, _: EndFrame):
@@ -77,7 +76,6 @@ class ConsumerProcessor(FrameProcessor):
async def _cancel(self, _: CancelFrame):
"""Cancel the consumer task."""
if self._consumer_task:
self._queue.cancel()
await self.cancel_task(self._consumer_task)
async def _consumer_task_handler(self):

View File

@@ -37,9 +37,6 @@ from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.base_object import BaseObject
@@ -66,16 +63,14 @@ class FrameProcessorSetup:
clock: The clock instance for timing operations.
task_manager: The task manager for handling async operations.
observer: Optional observer for monitoring frame processing events.
watchdog_timers_enabled: Whether to enable watchdog timers by default.
"""
clock: BaseClock
task_manager: BaseTaskManager
observer: Optional[BaseObserver] = None
watchdog_timers_enabled: bool = False
class FrameProcessorQueue(WatchdogPriorityQueue):
class FrameProcessorQueue(asyncio.PriorityQueue):
"""A priority queue for systems frames and other frames.
This is a specialized queue for frame processors that separates and
@@ -87,14 +82,14 @@ class FrameProcessorQueue(WatchdogPriorityQueue):
HIGH_PRIORITY = 1
LOW_PRIORITY = 2
def __init__(self, manager: BaseTaskManager):
def __init__(self):
"""Initialize the FrameProcessorQueue.
Args:
manager (BaseTaskManager): The task manager used by the internal watchdog queues.
"""
super().__init__(manager, tuple_size=3)
super().__init__()
self.__high_counter = 0
self.__low_counter = 0
@@ -148,10 +143,7 @@ class FrameProcessor(BaseObject):
*,
name: Optional[str] = None,
enable_direct_mode: bool = False,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
metrics: Optional[FrameProcessorMetrics] = None,
watchdog_timeout_secs: Optional[float] = None,
**kwargs,
):
"""Initialize the frame processor.
@@ -159,10 +151,7 @@ class FrameProcessor(BaseObject):
Args:
name: Optional name for this processor instance.
enable_direct_mode: Whether to process frames immediately or use internal queues.
enable_watchdog_logging: Whether to enable watchdog logging for tasks.
enable_watchdog_timers: Whether to enable watchdog timers for tasks.
metrics: Optional metrics collector for this processor.
watchdog_timeout_secs: Timeout in seconds for watchdog operations.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(name=name, **kwargs)
@@ -172,15 +161,6 @@ class FrameProcessor(BaseObject):
# Enable direct mode to skip queues and process frames right away.
self._enable_direct_mode = enable_direct_mode
# Enable watchdog timers for all tasks created by this frame processor.
self._enable_watchdog_timers = enable_watchdog_timers
# Enable watchdog logging for all tasks created by this frame processor.
self._enable_watchdog_logging = enable_watchdog_logging
# Allow this frame processor to control their tasks timeout.
self._watchdog_timeout_secs = watchdog_timeout_secs
# Clock
self._clock: Optional[BaseClock] = None
@@ -434,23 +414,12 @@ class FrameProcessor(BaseObject):
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
def create_task(
self,
coroutine: Coroutine,
name: Optional[str] = None,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout_secs: Optional[float] = None,
) -> asyncio.Task:
def create_task(self, coroutine: Coroutine, name: Optional[str] = None) -> asyncio.Task:
"""Create a new task managed by this processor.
Args:
coroutine: The coroutine to run in the task.
name: Optional name for the task.
enable_watchdog_logging: Whether to enable watchdog logging.
enable_watchdog_timers: Whether to enable watchdog timers.
watchdog_timeout_secs: Timeout in seconds for watchdog operations.
Returns:
The created asyncio task.
@@ -459,21 +428,7 @@ class FrameProcessor(BaseObject):
name = f"{self}::{name}"
else:
name = f"{self}::{coroutine.cr_code.co_name}"
return self.task_manager.create_task(
coroutine,
name,
enable_watchdog_logging=(
enable_watchdog_logging
if enable_watchdog_logging
else self._enable_watchdog_logging
),
enable_watchdog_timers=(
enable_watchdog_timers if enable_watchdog_timers else self._enable_watchdog_timers
),
watchdog_timeout=(
watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout_secs
),
)
return self.task_manager.create_task(coroutine, name)
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
"""Cancel a task managed by this processor.
@@ -493,10 +448,6 @@ class FrameProcessor(BaseObject):
"""
await self.task_manager.wait_for_task(task, timeout)
def reset_watchdog(self):
"""Reset the watchdog timer for the current task."""
self.task_manager.task_reset_watchdog()
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
@@ -506,11 +457,6 @@ class FrameProcessor(BaseObject):
self._clock = setup.clock
self._task_manager = setup.task_manager
self._observer = setup.observer
self._watchdog_timers_enabled = (
self._enable_watchdog_timers
if self._enable_watchdog_timers
else setup.watchdog_timers_enabled
)
# Create processing tasks.
self.__create_input_task()
@@ -774,14 +720,13 @@ class FrameProcessor(BaseObject):
return
if not self.__input_frame_task:
self.__input_event = WatchdogEvent(self.task_manager)
self.__input_queue = FrameProcessorQueue(self.task_manager)
self.__input_event = asyncio.Event()
self.__input_queue = FrameProcessorQueue()
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
async def __cancel_input_task(self):
"""Cancel the frame input processing task."""
if self.__input_frame_task:
self.__input_queue.cancel()
await self.cancel_task(self.__input_frame_task)
self.__input_frame_task = None
@@ -792,14 +737,13 @@ class FrameProcessor(BaseObject):
if not self.__process_frame_task:
self.__should_block_frames = False
self.__process_event = WatchdogEvent(self.task_manager)
self.__process_queue = WatchdogQueue(self.task_manager)
self.__process_event = asyncio.Event()
self.__process_queue = asyncio.Queue()
self.__process_frame_task = self.create_task(self.__process_frame_task_handler())
async def __cancel_process_task(self):
"""Cancel the non-system frame processing task."""
if self.__process_frame_task:
self.__process_queue.cancel()
await self.cancel_task(self.__process_frame_task)
self.__process_frame_task = None

View File

@@ -72,11 +72,9 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import (
FunctionCallParams, # TODO(aleix): we shouldn't import `services` from `processors`
)
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.string import match_endofsentence
RTVI_PROTOCOL_VERSION = "1.0.0"
@@ -1315,10 +1313,10 @@ class RTVIProcessor(FrameProcessor):
async def _start(self, frame: StartFrame):
"""Start the RTVI processor tasks."""
if not self._action_task:
self._action_queue = WatchdogQueue(self.task_manager)
self._action_queue = asyncio.Queue()
self._action_task = self.create_task(self._action_task_handler())
if not self._message_task:
self._message_queue = WatchdogQueue(self.task_manager)
self._message_queue = asyncio.Queue()
self._message_task = self.create_task(self._message_task_handler())
await self._call_event_handler("on_bot_started")
@@ -1333,12 +1331,10 @@ class RTVIProcessor(FrameProcessor):
async def _cancel_tasks(self):
"""Cancel all running tasks."""
if self._action_task:
self._action_queue.cancel()
await self.cancel_task(self._action_task)
self._action_task = None
if self._message_task:
self._message_queue.cancel()
await self.cancel_task(self._message_task)
self._message_task = None

View File

@@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List, Optional
from pipecat.frames.frames import Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
class IdleFrameProcessor(FrameProcessor):
@@ -78,7 +77,7 @@ class IdleFrameProcessor(FrameProcessor):
def _create_idle_task(self):
"""Create and start the idle monitoring task."""
if not self._idle_task:
self._idle_event = WatchdogEvent(self.task_manager)
self._idle_event = asyncio.Event()
self._idle_task = self.create_task(self._idle_task_handler())
async def _idle_task_handler(self):

View File

@@ -9,7 +9,6 @@
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
import sentry_sdk
@@ -51,7 +50,7 @@ class SentryMetrics(FrameProcessorMetrics):
"""
await super().setup(task_manager)
if self._sentry_available:
self._sentry_queue = WatchdogQueue(task_manager)
self._sentry_queue = asyncio.Queue()
self._sentry_task = self.task_manager.create_task(
self._sentry_task_handler(), name=f"{self}::_sentry_task_handler"
)

View File

@@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
async def identity_transformer(frame: Frame):
@@ -64,7 +63,7 @@ class ProducerProcessor(FrameProcessor):
Returns:
asyncio.Queue: The queue for the newly added consumer.
"""
queue = WatchdogQueue(self.task_manager)
queue = asyncio.Queue()
self._consumers.append(queue)
return queue

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
class UserIdleProcessor(FrameProcessor):
@@ -78,7 +77,7 @@ class UserIdleProcessor(FrameProcessor):
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = None
self._idle_event = asyncio.Event()
def _wrap_callback(
self,
@@ -138,9 +137,6 @@ class UserIdleProcessor(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._idle_event = WatchdogEvent(self.task_manager)
# Check for end frames before processing
if isinstance(frame, (EndFrame, CancelFrame)):
# Stop the idle task, if it exists

View File

@@ -53,7 +53,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
try:
@@ -290,7 +289,7 @@ class AnthropicLLMService(LLMService):
json_accumulator = ""
function_calls = []
async for event in WatchdogAsyncIterator(response, manager=self.task_manager):
async for event in response:
# Aggregate streaming content, create frames, trigger events
if event.type == "content_block_delta":

View File

@@ -244,11 +244,9 @@ class AssemblyAISTTService(STTService):
try:
while self._connected:
try:
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
message = await self._websocket.recv()
data = json.loads(message)
await self._handle_message(data)
except asyncio.TimeoutError:
self.reset_watchdog()
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:

View File

@@ -29,7 +29,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -276,9 +275,7 @@ class AsyncAITTSService(InterruptibleTTSService):
self._started = False
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
async for message in self._get_websocket():
msg = json.loads(message)
if not msg:
continue
@@ -301,9 +298,8 @@ class AsyncAITTSService(InterruptibleTTSService):
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 3
while True:
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.state is State.OPEN:

View File

@@ -954,8 +954,6 @@ class AWSBedrockLLMService(LLMService):
function_calls = []
async for event in response["stream"]:
self.reset_watchdog()
# Handle text content
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]

View File

@@ -480,7 +480,7 @@ class AWSTranscribeSTTService(STTService):
break
try:
response = await asyncio.wait_for(self._ws_client.recv(), timeout=1.0)
response = await self._ws_client.recv()
headers, payload = decode_event(response)
@@ -531,8 +531,6 @@ class AWSTranscribeSTTService(STTService):
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
except asyncio.TimeoutError:
self.reset_watchdog()
except websockets.exceptions.ConnectionClosed as e:
logger.error(
f"{self} WebSocket connection closed in receive loop with code {e.code}: {e.reason}"

View File

@@ -62,7 +62,6 @@ from pipecat.services.aws_nova_sonic.context import (
)
from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.llm_service import LLMService
from pipecat.utils.asyncio.watchdog_coroutine import watchdog_coroutine
from pipecat.utils.time import time_now_iso8601
try:
@@ -795,7 +794,7 @@ class AWSNovaSonicLLMService(LLMService):
try:
while self._stream and not self._disconnecting:
output = await self._stream.await_output()
result = await watchdog_coroutine(output[1].receive(), manager=self.task_manager)
result = await output[1].receive()
if result.value and result.value.bytes_:
response_data = result.value.bytes_.decode("utf-8")

View File

@@ -29,7 +29,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import AudioContextWordTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -388,9 +387,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
self._context_id = None
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
async for message in self._get_websocket():
msg = json.loads(message)
if not msg or not self.audio_context_available(msg["context_id"]):
continue

View File

@@ -38,7 +38,6 @@ from pipecat.services.tts_service import (
WordTTSService,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for ElevenLabs configuration needed
@@ -574,9 +573,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async def _receive_messages(self):
"""Handle incoming WebSocket messages from ElevenLabs."""
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
async for message in self._get_websocket():
msg = json.loads(message)
received_ctx_id = msg.get("contextId")
@@ -635,9 +632,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 10
while True:
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.state is State.OPEN:

View File

@@ -67,7 +67,6 @@ from pipecat.services.openai.llm import (
OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt
@@ -929,7 +928,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
async def _receive_task_handler(self):
"""Handle incoming messages from the WebSocket connection."""
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
evt = events.parse_server_event(message)
# logger.debug(f"Received event: {message[:500]}")
# logger.debug(f"Received event: {evt}")

View File

@@ -32,7 +32,6 @@ from pipecat.frames.frames import (
from pipecat.services.gladia.config import GladiaInputParams
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
@@ -536,9 +535,8 @@ class GladiaSTTService(STTService):
async def _keepalive_task_handler(self):
"""Send periodic empty audio chunks to keep the connection alive."""
try:
KEEPALIVE_SLEEP = 20 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 20
while self._connection_active:
self.reset_watchdog()
# Send keepalive (Gladia times out after 30 seconds)
await asyncio.sleep(KEEPALIVE_SLEEP)
if self._websocket and self._websocket.state is State.OPEN:
@@ -555,7 +553,7 @@ class GladiaSTTService(STTService):
async def _receive_task_handler(self):
try:
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
content = json.loads(message)
# Handle audio chunk acknowledgments
@@ -613,8 +611,6 @@ class GladiaSTTService(STTService):
translation, "", time_now_iso8601(), translated_language
)
)
self.reset_watchdog()
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection
pass

View File

@@ -53,7 +53,6 @@ from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
# Suppress gRPC fork warnings
@@ -807,7 +806,7 @@ class GoogleLLMService(LLMService):
)
function_calls = []
async for chunk in WatchdogAsyncIterator(response, manager=self.task_manager):
async for chunk in response:
# Stop TTFB metrics after the first chunk
await self.stop_ttfb_metrics()
if chunk.usage_metadata:

View File

@@ -17,7 +17,6 @@ from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
# Suppress gRPC fork warnings
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"
@@ -77,7 +76,7 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
context
)
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -16,7 +16,6 @@ import json
import os
import time
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_stt
# Suppress gRPC fork warnings
@@ -781,7 +780,6 @@ class GoogleSTTService(STTService):
if self._request_queue.empty():
# wait for 10ms in case we don't have audio
await asyncio.sleep(0.01)
self.reset_watchdog()
continue
# Start bi-directional streaming
@@ -836,9 +834,7 @@ class GoogleSTTService(STTService):
async def _process_responses(self, streaming_recognize):
"""Process streaming recognition responses."""
try:
async for response in WatchdogAsyncIterator(
streaming_recognize, manager=self.task_manager
):
async for response in streaming_recognize:
# Check streaming limit
if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT:
logger.debug("Stream timeout reached in response processing")

View File

@@ -31,7 +31,6 @@ from pipecat.processors.frame_processor import FrameProcessorSetup
from pipecat.services.heygen.api import HeyGenApi, HeyGenSession, NewSessionRequest
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
from livekit import rtc
@@ -104,7 +103,7 @@ class HeyGenClient:
self._connected = False
self._session_request = session_request
self._callbacks = callbacks
self._event_queue: Optional[WatchdogQueue] = None
self._event_queue: Optional[asyncio.Queue] = None
self._event_task = None
# Currently supporting to capture the audio and video from a single participant
self._video_task = None
@@ -149,7 +148,7 @@ class HeyGenClient:
try:
await self._initialize()
self._event_queue = WatchdogQueue(self._task_manager)
self._event_queue = asyncio.Queue()
self._event_task = self._task_manager.create_task(
self._callback_task_handler(self._event_queue),
f"{self}::event_callback_task",
@@ -170,7 +169,6 @@ class HeyGenClient:
self._connected = False
if self._event_task and self._task_manager:
self._event_queue.cancel()
await self._task_manager.cancel_task(self._event_task)
self._event_task = None
except Exception as e:
@@ -231,11 +229,9 @@ class HeyGenClient:
"""Handle incoming WebSocket messages."""
while self._connected:
try:
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
message = await self._websocket.recv()
parsed_message = json.loads(message)
await self._handle_ws_server_event(parsed_message)
except asyncio.TimeoutError:
self._task_manager.task_reset_watchdog()
except ConnectionClosedOK:
break
except Exception as e:

View File

@@ -40,7 +40,6 @@ from pipecat.services.ai_service import AIService
from pipecat.services.heygen.api import NewSessionRequest
from pipecat.services.heygen.client import HEY_GEN_SAMPLE_RATE, HeyGenCallbacks, HeyGenClient
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
# Using the same values that we do in the BaseOutputTransport
AVATAR_VAD_STOP_SECS = 0.35
@@ -278,21 +277,14 @@ class HeyGenVideoService(AIService):
await self._client.stop()
async def _create_send_task(self):
"""Create the audio sending task if it doesn't exist.
Initializes a new WatchdogQueue and creates a task for handling audio sending.
"""
"""Create the audio sending task if it doesn't exist."""
if not self._send_task:
self._queue = WatchdogQueue(self.task_manager)
self._queue = asyncio.Queue()
self._send_task = self.create_task(self._send_task_handler())
async def _cancel_send_task(self):
"""Cancel the audio sending task if it exists.
Cancels and cleans up the audio sending task and associated queue.
"""
"""Cancel the audio sending task if it exists."""
if self._send_task:
self._queue.cancel()
await self.cancel_task(self._send_task)
self._send_task = None

View File

@@ -36,7 +36,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleTTSService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -315,7 +314,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _receive_messages(self):
"""Receive and process messages from Neuphonic WebSocket."""
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
if isinstance(message, str):
msg = json.loads(message)
if msg.get("data") and msg["data"].get("audio"):
@@ -327,9 +326,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _keepalive_task_handler(self):
"""Handle keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10 if self.task_manager.task_watchdog_enabled else 3
KEEPALIVE_SLEEP = 10
while True:
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
await self._send_keepalive()

View File

@@ -39,7 +39,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -284,7 +283,7 @@ class BaseOpenAILLMService(LLMService):
context
)
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -53,7 +53,6 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
@@ -456,7 +455,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
#
async def _receive_task_handler(self):
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
async for message in self._websocket:
evt = events.parse_server_event(message)
if evt.type == "session.created":
await self._handle_evt_session_created(evt)

View File

@@ -24,7 +24,6 @@ from pipecat.frames.frames import (
)
from pipecat.services.stt_service import SegmentedSTTService, STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
@@ -239,13 +238,13 @@ class RivaSTTService(STTService):
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
self._config = config
self._queue = WatchdogQueue(self.task_manager)
self._queue = asyncio.Queue()
if not self._thread_task:
self._thread_task = self.create_task(self._thread_task_handler())
if not self._response_task:
self._response_queue = WatchdogQueue(self.task_manager)
self._response_queue = asyncio.Queue()
self._response_task = self.create_task(self._response_task_handler())
async def stop(self, frame: EndFrame):

View File

@@ -20,7 +20,6 @@ from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -127,7 +126,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
context
)
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
from av.audio.frame import AudioFrame
@@ -96,7 +95,7 @@ class SimliVideoService(FrameProcessor):
"""Consume audio frames from Simli and push them downstream."""
await self._pipecat_resampler_event.wait()
audio_iterator = self._simli_client.getAudioStreamIterator()
async for audio_frame in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager):
async for audio_frame in audio_iterator:
resampled_frames = self._pipecat_resampler.resample(audio_frame)
for resampled_frame in resampled_frames:
audio_array = resampled_frame.to_ndarray()
@@ -114,7 +113,7 @@ class SimliVideoService(FrameProcessor):
"""Consume video frames from Simli and convert them to output frames."""
await self._pipecat_resampler_event.wait()
video_iterator = self._simli_client.getVideoStreamIterator(targetFormat="rgb24")
async for video_frame in WatchdogAsyncIterator(video_iterator, manager=self.task_manager):
async for video_frame in video_iterator:
# Process the video frame
convertedFrame: OutputImageRawFrame = OutputImageRawFrame(
image=video_frame.to_rgb().to_image().tobytes(),

View File

@@ -35,7 +35,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.services.ai_service import AIService
from pipecat.transports.services.tavus import TavusCallbacks, TavusParams, TavusTransportClient
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
class TavusVideoService(AIService):
@@ -255,13 +254,12 @@ class TavusVideoService(AIService):
async def _create_send_task(self):
"""Create the audio sending task if it doesn't exist."""
if not self._send_task:
self._queue = WatchdogQueue(self.task_manager)
self._queue = asyncio.Queue()
self._send_task = self.create_task(self._send_task_handler())
async def _cancel_send_task(self):
"""Cancel the audio sending task if it exists."""
if self._send_task:
self._queue.cancel()
await self.cancel_task(self._send_task)
self._send_task = None

View File

@@ -37,7 +37,6 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.base_text_filter import BaseTextFilter
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator
@@ -438,8 +437,6 @@ class TTSService(AIService):
if has_started:
await self.push_frame(TTSStoppedFrame())
has_started = False
finally:
self.reset_watchdog()
class WordTTSService(TTSService):
@@ -526,7 +523,7 @@ class WordTTSService(TTSService):
def _create_words_task(self):
if not self._words_task:
self._words_queue = WatchdogQueue(self.task_manager)
self._words_queue = asyncio.Queue()
self._words_task = self.create_task(self._words_task_handler())
async def _stop_words_task(self):
@@ -816,13 +813,12 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
def _create_audio_context_task(self):
if not self._audio_context_task:
self._contexts_queue = WatchdogQueue(self.task_manager)
self._contexts_queue = asyncio.Queue()
self._contexts: Dict[str, asyncio.Queue] = {}
self._audio_context_task = self.create_task(self._audio_context_task_handler())
async def _stop_audio_context_task(self):
if self._audio_context_task:
self._contexts_queue.cancel()
await self.cancel_task(self._audio_context_task)
self._audio_context_task = None
@@ -859,12 +855,10 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
while running:
try:
frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT)
self.reset_watchdog()
if frame:
await self.push_frame(frame)
running = frame is not None
except asyncio.TimeoutError:
self.reset_watchdog()
# We didn't get audio, so let's consider this context finished.
logger.trace(f"{self} time out on audio context {context_id}")
break

View File

@@ -49,7 +49,6 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import MetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
AUDIO_INPUT_TIMEOUT_SECS = 0.5
@@ -396,7 +395,7 @@ class BaseInputTransport(FrameProcessor):
def _create_audio_task(self):
"""Create the audio processing task if audio input is enabled."""
if not self._audio_task and self._params.audio_in_enabled:
self._audio_in_queue = WatchdogQueue(self.task_manager)
self._audio_in_queue = asyncio.Queue()
self._audio_task = self.create_task(self._audio_task_handler())
async def _cancel_audio_task(self):
@@ -506,8 +505,6 @@ class BaseInputTransport(FrameProcessor):
if self._params.turn_analyzer:
self._params.turn_analyzer.clear()
await self._handle_user_interruption(UserStoppedSpeakingFrame())
finally:
self.reset_watchdog()
async def _handle_prediction_result(self, result: MetricsData):
"""Handle a prediction result event from the turn analyzer."""

View File

@@ -46,7 +46,6 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue
from pipecat.utils.time import nanoseconds_to_seconds
BOT_VAD_STOP_SECS = 0.35
@@ -626,10 +625,8 @@ class BaseOutputTransport(FrameProcessor):
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
self._transport.reset_watchdog()
yield frame
except asyncio.TimeoutError:
self._transport.reset_watchdog()
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
@@ -639,13 +636,11 @@ class BaseOutputTransport(FrameProcessor):
while True:
try:
frame = self._audio_queue.get_nowait()
self._transport.reset_watchdog()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
self._transport.reset_watchdog()
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
@@ -827,15 +822,12 @@ class BaseOutputTransport(FrameProcessor):
def _create_clock_task(self):
"""Create the clock/timing processing task."""
if not self._clock_task:
self._clock_queue = WatchdogPriorityQueue(
self._transport.task_manager, tuple_size=3
)
self._clock_queue = asyncio.PriorityQueue()
self._clock_task = self._transport.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):
"""Cancel and cleanup the clock processing task."""
if self._clock_task:
self._clock_queue.cancel()
await self._transport.cancel_task(self._clock_task)
self._clock_task = None

View File

@@ -37,7 +37,6 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
from fastapi import WebSocket
@@ -283,9 +282,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
async def _receive_messages(self):
"""Main message receiving loop for WebSocket messages."""
try:
async for message in WatchdogAsyncIterator(
self._client.receive(), manager=self.task_manager
):
async for message in self._client.receive():
if not self._params.serializer:
continue

View File

@@ -40,7 +40,6 @@ from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
import cv2
@@ -586,9 +585,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
"""Background task for receiving audio frames from WebRTC."""
try:
audio_iterator = self._client.read_audio_frame()
async for audio_frame in WatchdogAsyncIterator(
audio_iterator, manager=self.task_manager
):
async for audio_frame in audio_iterator:
if audio_frame:
await self.push_audio_frame(audio_frame)
@@ -599,9 +596,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
"""Background task for receiving video frames from WebRTC."""
try:
video_iterator = self._client.read_video_frame()
async for video_frame in WatchdogAsyncIterator(
video_iterator, manager=self.task_manager
):
async for video_frame in video_iterator:
if video_frame:
await self.push_video_frame(video_frame)

View File

@@ -49,7 +49,6 @@ from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
from daily import (
@@ -358,7 +357,6 @@ class DailyTransportClient(EventHandler):
self._leave_counter = 0
self._task_manager: Optional[BaseTaskManager] = None
self._watchdog_timers_enabled = False
# We use the executor to cleanup the client. We just do it from one
# place, so only one thread is really needed.
@@ -527,9 +525,8 @@ class DailyTransportClient(EventHandler):
return
self._task_manager = setup.task_manager
self._watchdog_timers_enabled = setup.watchdog_timers_enabled
self._event_queue = WatchdogQueue(self._task_manager)
self._event_queue = asyncio.Queue()
self._event_task = self._task_manager.create_task(
self._callback_task_handler(self._event_queue),
f"{self}::event_callback_task",
@@ -561,7 +558,7 @@ class DailyTransportClient(EventHandler):
if self._params.audio_in_enabled:
if self._params.audio_in_user_tracks and not self._audio_task and self._task_manager:
self._audio_queue = WatchdogQueue(self._task_manager)
self._audio_queue = asyncio.Queue()
self._audio_task = self._task_manager.create_task(
self._callback_task_handler(self._audio_queue),
f"{self}::audio_callback_task",
@@ -576,7 +573,7 @@ class DailyTransportClient(EventHandler):
Daily.select_speaker_device(self._speaker_name())
if self._params.video_in_enabled and not self._video_task and self._task_manager:
self._video_queue = WatchdogQueue(self._task_manager)
self._video_queue = asyncio.Queue()
self._video_task = self._task_manager.create_task(
self._callback_task_handler(self._video_queue),
f"{self}::video_callback_task",

View File

@@ -35,7 +35,6 @@ from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
from livekit import rtc
@@ -599,7 +598,7 @@ class LiveKitInputTransport(BaseInputTransport):
"""Handle incoming audio frames from participants."""
logger.info("Audio input task started")
audio_iterator = self._client.get_next_audio_frame()
async for audio_data in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager):
async for audio_data in audio_iterator:
if audio_data:
audio_frame_event, participant_id = audio_data
pipecat_audio_frame = await self._convert_livekit_audio_to_pipecat(

View File

@@ -4,24 +4,20 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Asyncio task management with watchdog monitoring capabilities.
"""Asyncio task management.
This module provides task management functionality with optional watchdog timers
to monitor task execution and prevent hanging operations. Includes both abstract
base classes and concrete implementations for managing asyncio tasks with
This module provides task management functionality. Includes both abstract base
classes and concrete implementations for managing asyncio tasks with
comprehensive monitoring and cleanup capabilities.
"""
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Coroutine, Dict, Optional, Sequence
from loguru import logger
WATCHDOG_TIMEOUT = 5.0
@dataclass
class TaskManagerParams:
@@ -29,22 +25,15 @@ class TaskManagerParams:
Parameters:
loop: The asyncio event loop to use for task management.
enable_watchdog_timers: Whether to enable watchdog timers for tasks.
enable_watchdog_logging: Whether to log watchdog timing information.
watchdog_timeout: Default timeout in seconds for watchdog timers.
"""
loop: asyncio.AbstractEventLoop
enable_watchdog_timers: bool = False
enable_watchdog_logging: bool = False
watchdog_timeout: float = WATCHDOG_TIMEOUT
class BaseTaskManager(ABC):
"""Abstract base class for asyncio task management with watchdog support.
"""Abstract base class for asyncio task management.
Provides the interface for creating, monitoring, and managing asyncio tasks
with optional watchdog timer functionality to detect stalled operations.
Provides the interface for creating, monitoring, and managing asyncio tasks.
"""
@abstractmethod
@@ -66,15 +55,7 @@ class BaseTaskManager(ABC):
pass
@abstractmethod
def create_task(
self,
coroutine: Coroutine,
name: str,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout: Optional[float] = None,
) -> asyncio.Task:
def create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task:
"""Creates and schedules a new asyncio Task that runs the given coroutine.
The task is added to a global set of created tasks.
@@ -82,9 +63,6 @@ class BaseTaskManager(ABC):
Args:
coroutine: The coroutine to be executed within the task.
name: The name to assign to the task for identification.
enable_watchdog_logging: Whether this task should log watchdog processing times.
enable_watchdog_timers: Whether this task should have a watchdog timer.
watchdog_timeout: Watchdog timer timeout for this task.
Returns:
The created task object.
@@ -128,53 +106,24 @@ class BaseTaskManager(ABC):
"""
pass
@abstractmethod
def task_reset_watchdog(self):
"""Task reset watchdog timer.
Resets the running task watchdog timer. If not reset, a warning will
be logged indicating the task is stalling.
"""
pass
@property
@abstractmethod
def task_watchdog_enabled(self) -> bool:
"""Whether the current running task has a watchdog timer enabled.
Returns:
True if the current task has watchdog monitoring active.
"""
pass
@dataclass
class TaskData:
"""Internal data structure for tracking task metadata and watchdog state.
"""Internal data structure for tracking task metadata.
Parameters:
task: The asyncio Task being managed.
watchdog_timer: Event used to reset the watchdog timer.
enable_watchdog_logging: Whether to log watchdog timing information.
enable_watchdog_timers: Whether watchdog timers are enabled for this task.
watchdog_timeout: Timeout in seconds for watchdog warnings.
watchdog_task: Optional background task monitoring the watchdog timer.
"""
task: asyncio.Task
watchdog_timer: asyncio.Event
enable_watchdog_logging: bool
enable_watchdog_timers: bool
watchdog_timeout: float
watchdog_task: Optional[asyncio.Task]
class TaskManager(BaseTaskManager):
"""Concrete implementation of BaseTaskManager with full watchdog support.
"""Concrete implementation of BaseTaskManager.
Manages asyncio tasks. Provides comprehensive task lifecycle management
including creation, monitoring, cancellation, and cleanup.
Manages asyncio tasks with optional watchdog monitoring to detect stalled
operations. Provides comprehensive task lifecycle management including
creation, monitoring, cancellation, and cleanup.
"""
def __init__(self) -> None:
@@ -204,15 +153,7 @@ class TaskManager(BaseTaskManager):
raise Exception("TaskManager is not setup: unable to get event loop")
return self._params.loop
def create_task(
self,
coroutine: Coroutine,
name: str,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout: Optional[float] = None,
) -> asyncio.Task:
def create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task:
"""Creates and schedules a new asyncio Task that runs the given coroutine.
The task is added to a global set of created tasks.
@@ -220,9 +161,6 @@ class TaskManager(BaseTaskManager):
Args:
coroutine: The coroutine to be executed within the task.
name: The name to assign to the task for identification.
enable_watchdog_logging: Whether this task should log watchdog processing time.
enable_watchdog_timers: Whether this task should have a watchdog timer.
watchdog_timeout: Watchdog timer timeout for this task.
Returns:
The created task object.
@@ -247,26 +185,7 @@ class TaskManager(BaseTaskManager):
task = self._params.loop.create_task(run_coroutine())
task.set_name(name)
task.add_done_callback(self._task_done_handler)
self._add_task(
TaskData(
task=task,
watchdog_timer=asyncio.Event(),
enable_watchdog_logging=(
enable_watchdog_logging
if enable_watchdog_logging
else self._params.enable_watchdog_logging
),
enable_watchdog_timers=(
enable_watchdog_timers
if enable_watchdog_timers
else self._params.enable_watchdog_timers
),
watchdog_timeout=(
watchdog_timeout if watchdog_timeout else self._params.watchdog_timeout
),
watchdog_task=None,
),
)
self._add_task(TaskData(task=task))
logger.trace(f"{name}: task created")
return task
@@ -312,8 +231,6 @@ class TaskManager(BaseTaskManager):
name = task.get_name()
task.cancel()
try:
# Make sure to reset watchdog if a task is cancelled.
self.reset_watchdog(task)
if timeout:
await asyncio.wait_for(task, timeout=timeout)
else:
@@ -329,16 +246,6 @@ class TaskManager(BaseTaskManager):
logger.critical(f"{name}: fatal base exception while cancelling task: {e}")
raise
def reset_watchdog(self, task: asyncio.Task):
"""Reset the watchdog timer for a specific task.
Args:
task: The task whose watchdog timer should be reset.
"""
name = task.get_name()
if name in self._tasks and self._tasks[name].enable_watchdog_timers:
self._tasks[name].watchdog_timer.set()
def current_tasks(self) -> Sequence[asyncio.Task]:
"""Returns the list of currently created/registered tasks.
@@ -347,99 +254,23 @@ class TaskManager(BaseTaskManager):
"""
return [data.task for data in self._tasks.values()]
def task_reset_watchdog(self):
"""Task reset watchdog timer.
Resets the running task watchdog timer. If not reset on time, a warning
will be logged indicating the task is stalling.
"""
task = asyncio.current_task()
if task:
self.reset_watchdog(task)
@property
def task_watchdog_enabled(self) -> bool:
"""Whether the current running task has a watchdog timer enabled.
Returns:
True if the current task has watchdog monitoring active.
"""
task = asyncio.current_task()
if not task:
return False
name = task.get_name()
return name in self._tasks and self._tasks[name].enable_watchdog_timers
def _add_task(self, task_data: TaskData):
"""Add a task to the internal registry and start watchdog if enabled.
"""Add a task to the internal registry.
Args:
task_data: The task data containing task and watchdog configuration.
task_data: The task metadata.
"""
name = task_data.task.get_name()
self._tasks[name] = task_data
if self._params and task_data.enable_watchdog_timers:
watchdog_task = self.get_event_loop().create_task(
self._watchdog_task_handler(task_data)
)
task_data.watchdog_task = watchdog_task
async def _watchdog_task_handler(self, task_data: TaskData):
"""Background task that monitors watchdog timer for a specific task.
Args:
task_data: The task data containing watchdog configuration.
"""
name = task_data.task.get_name()
timer = task_data.watchdog_timer
enable_watchdog_logging = task_data.enable_watchdog_logging
watchdog_timeout = task_data.watchdog_timeout
while True:
try:
if task_data.task.done():
logger.debug(f"{name}: task is already done, cancelling watchdog task.")
break
start_time = time.time()
await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout)
total_time = time.time() - start_time
if enable_watchdog_logging:
logger.debug(f"{name} time between watchdog timer resets: {total_time:.20f}")
except asyncio.TimeoutError:
logger.warning(
f"{name}: task is taking too long {WATCHDOG_TIMEOUT} second(s) (forgot to reset watchdog?)"
)
finally:
timer.clear()
def _task_done_handler(self, task: asyncio.Task):
"""Handle task completion by cleaning up watchdog and removing from registry.
"""Handle task completion by removing the task from the registry.
Args:
task: The completed asyncio task.
"""
name = task.get_name()
try:
task_data = self._tasks[name]
if task_data.watchdog_task:
task_data.watchdog_task.cancel()
# In Python 3.10, simply calling task.cancel() looks like is not enough.
# Without this, some tasks appear that are never canceled.
# Python 3.12 handles this more gracefully, but we keep this for compatibility
# and to avoid "Task exception was never retrieved" warnings.
self.get_event_loop().create_task(
self._cleanup_watchdog(name, task_data.watchdog_task)
)
task_data.watchdog_task = None
del self._tasks[name]
except KeyError as e:
logger.trace(f"{name}: unable to remove task data (already removed?): {e}")
async def _cleanup_watchdog(self, name: str, watchdog_task: asyncio.Task):
try:
await watchdog_task
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning(f"{name}: watchdog task raised exception: {e}")

View File

@@ -1,102 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Watchdog-enabled async iterator wrapper for task monitoring.
This module provides an async iterator wrapper that automatically resets
watchdog timers while waiting for iterator items, preventing false positive
watchdog timeouts during legitimate waiting periods.
"""
import asyncio
from typing import AsyncIterator, Optional
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogAsyncIterator:
"""Watchdog async iterator wrapper.
An asynchronous iterator that monitors activity and resets the current
task watchdog timer. This is necessary to avoid task watchdog timers to
expire while we are waiting to get an item from the iterator.
"""
def __init__(
self,
async_iterable,
*,
manager: BaseTaskManager,
timeout: float = 2.0,
):
"""Initialize the watchdog async iterator.
Args:
async_iterable: The async iterable to wrap with watchdog monitoring.
manager: The task manager for watchdog timer control.
timeout: Timeout in seconds between watchdog resets while waiting.
"""
self._async_iterable = async_iterable
self._manager = manager
self._timeout = timeout
self._iter: Optional[AsyncIterator] = None
self._current_anext_task: Optional[asyncio.Task] = None
def __aiter__(self):
"""Return self as the async iterator.
Returns:
This iterator instance.
"""
return self
async def __anext__(self):
"""Get the next item from the iterator with watchdog monitoring.
Returns:
The next item from the wrapped async iterator.
Raises:
StopAsyncIteration: When the iterator is exhausted.
"""
if not self._iter:
self._iter = await self._ensure_async_iterator(self._async_iterable)
if self._manager.task_watchdog_enabled:
return await self._watchdog_anext()
else:
return await self._iter.__anext__()
async def _watchdog_anext(self):
"""Get next item while periodically resetting watchdog timer."""
while True:
try:
if not self._current_anext_task:
self._current_anext_task = asyncio.create_task(self._iter.__anext__())
item = await asyncio.wait_for(
asyncio.shield(self._current_anext_task), timeout=self._timeout
)
self._manager.task_reset_watchdog()
# The task has finished, so we will create a new one for the
# next item.
self._current_anext_task = None
return item
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()
except StopAsyncIteration:
self._current_anext_task = None
raise
async def _ensure_async_iterator(self, obj) -> AsyncIterator:
"""Ensure the object is an async iterator, awaiting if necessary."""
aiter = obj.__aiter__()
if asyncio.iscoroutine(aiter):
aiter = await aiter
return aiter

View File

@@ -1,87 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Watchdog-enabled coroutine wrapper for task monitoring.
This module provides a coroutine wrapper that automatically resets watchdog
timers while waiting for coroutine completion, preventing false positive
watchdog timeouts during legitimate operations.
"""
import asyncio
from typing import Optional
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogCoroutine:
"""Watchdog-enabled coroutine wrapper.
An asynchronous iterator that monitors activity and resets the current
task watchdog timer. This is necessary to avoid task watchdog timers to
expire while we are waiting to get an item from the iterator.
"""
def __init__(
self,
coroutine,
*,
manager: BaseTaskManager,
timeout: float = 2.0,
):
"""Initialize the watchdog coroutine wrapper.
Args:
coroutine: The coroutine to wrap with watchdog monitoring.
manager: The task manager for watchdog timer control.
timeout: Timeout in seconds between watchdog resets while waiting.
"""
self._coroutine = coroutine
self._manager = manager
self._timeout = timeout
self._current_coro_task: Optional[asyncio.Task] = None
async def __call__(self):
"""Execute the wrapped coroutine with watchdog monitoring."""
if self._manager.task_watchdog_enabled:
return await self._watchdog_call()
else:
return await self._coroutine
async def _watchdog_call(self):
"""Execute coroutine while periodically resetting watchdog timer."""
while True:
try:
if not self._current_coro_task:
self._current_coro_task = asyncio.create_task(self._coroutine)
result = await asyncio.wait_for(
asyncio.shield(self._current_coro_task), timeout=self._timeout
)
self._manager.task_reset_watchdog()
# The task has finished.
self._current_coro_task = None
return result
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()
async def watchdog_coroutine(coroutine, *, manager: BaseTaskManager, timeout: float = 2.0):
"""Execute a coroutine with watchdog monitoring support.
Args:
coroutine: The coroutine to execute with watchdog monitoring.
manager: The task manager for watchdog timer control.
timeout: Timeout in seconds between watchdog resets while waiting.
Returns:
The result of the coroutine execution.
"""
watchdog_coro = WatchdogCoroutine(coroutine, manager=manager, timeout=timeout)
return await watchdog_coro()

View File

@@ -1,68 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Watchdog-enabled asyncio Event for task monitoring.
This module provides an asyncio Event subclass that automatically resets
watchdog timers while waiting for the event, preventing false positive
watchdog timeouts during legitimate waiting periods.
"""
import asyncio
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogEvent(asyncio.Event):
"""Watchdog-enabled asyncio Event.
An asynchronous event that resets the current task watchdog timer. This
is necessary to avoid task watchdog timers to expire while we are waiting on
the event.
"""
def __init__(
self,
manager: BaseTaskManager,
*,
timeout: float = 2.0,
) -> None:
"""Initialize the watchdog event.
Args:
manager: The task manager for watchdog timer control.
timeout: Timeout in seconds between watchdog resets while waiting.
"""
super().__init__()
self._manager = manager
self._timeout = timeout
async def wait(self):
"""Wait for the event to be set with watchdog monitoring.
Returns:
True when the event is set.
"""
if self._manager.task_watchdog_enabled:
return await self._watchdog_wait()
else:
return await super().wait()
async def _watchdog_wait(self):
"""Wait for event while periodically resetting watchdog timer."""
while True:
try:
await asyncio.wait_for(super().wait(), timeout=self._timeout)
self._manager.task_reset_watchdog()
return True
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()
def clear(self):
"""Clear the event while resetting watchdog timer."""
if self._manager.task_watchdog_enabled:
self._manager.task_reset_watchdog()
super().clear()

View File

@@ -1,131 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Watchdog-enabled asyncio PriorityQueue for task monitoring.
This module provides an asyncio PriorityQueue subclass that automatically resets
watchdog timers while waiting for items, preventing false positive watchdog
timeouts during legitimate queue operations.
"""
import asyncio
from dataclasses import dataclass
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@dataclass
class WatchdogPriorityCancelSentinel:
"""Sentinel object used in priority queues to force cancellation.
An instance of this class is typically inserted into a
`WatchdogPriorityQueue` to act as a high-priority marker asyncio task
cancellation.
"""
pass
class WatchdogPriorityQueue(asyncio.PriorityQueue):
"""Class for watchdog-enabled asyncio PriorityQueue.
An asynchronous priority queue that resets the current task watchdog
timer. This is necessary to avoid task watchdog timers to expire while we
are waiting to get an item from the queue.
This queue expects items to be tuples, with the actual payload stored
in the last element. All preceding elements are treated as numeric
priority fields. For example:
(0, 1, "foo")
The tuple length must be specified at creation time so the queue can
correctly construct special items, such as the watchdog cancel sentinel,
with the proper tuple structure.
"""
def __init__(
self,
manager: BaseTaskManager,
*,
tuple_size: int,
maxsize: int = 0,
timeout: float = 2.0,
) -> None:
"""Initialize the watchdog priority queue.
Args:
manager: The task manager for watchdog timer control.
tuple_size: The number of values in each inserted tuple.
maxsize: Maximum queue size. 0 means unlimited.
timeout: Timeout in seconds between watchdog resets while waiting.
"""
super().__init__(maxsize)
self._manager = manager
self._timeout = timeout
self._tuple_size = tuple_size
async def get(self):
"""Get an item from the queue with watchdog monitoring.
Returns:
The next item from the priority queue.
"""
if self._manager.task_watchdog_enabled:
get_result = await self._watchdog_get()
else:
get_result = await super().get()
# Value is always at the end of the tuple.
item = get_result[-1]
if isinstance(item, WatchdogPriorityCancelSentinel):
logger.trace(
"Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling"
)
raise asyncio.CancelledError("Cancelling watchdog queue get() call.")
else:
return get_result
def task_done(self):
"""Mark a task as done and reset watchdog if enabled.
Should be called after processing each item retrieved from the queue.
"""
if self._manager.task_watchdog_enabled:
self._manager.task_reset_watchdog()
super().task_done()
def cancel(self):
"""Ensures reliable task cancellation by preventing a common race condition.
The race condition occurs in Python 3.10+ when:
1. A value is put in the queue just before task cancellation
2. queue.get() completes before the cancellation signal is delivered
3. The task misses the CancelledError and continues running indefinitely
This method prevents the issue by injecting a special sentinel value that
forces the task to raise CancelledError when consumed, ensuring proper
task termination.
"""
item = [float("-inf")] * self._tuple_size
# Values go always at the end.
item[-1] = WatchdogPriorityCancelSentinel()
super().put_nowait(tuple(item))
async def _watchdog_get(self):
"""Get item from queue while periodically resetting watchdog timer."""
while True:
try:
item = await asyncio.wait_for(super().get(), timeout=self._timeout)
self._manager.task_reset_watchdog()
return item
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()

View File

@@ -1,110 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Watchdog-enabled asyncio Queue for task monitoring.
This module provides an asyncio Queue subclass that automatically resets
watchdog timers while waiting for items, preventing false positive watchdog
timeouts during legitimate queue operations.
"""
import asyncio
from dataclasses import dataclass
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@dataclass
class WatchdogQueueCancelSentinel:
"""Sentinel object used in queues to force cancellation.
An instance of this class is typically inserted into a `WatchdogQueue` to
act as a marker for asyncio task cancellation.
"""
pass
class WatchdogQueue(asyncio.Queue):
"""Watchdog-enabled asyncio Queue.
An asynchronous queue that resets the current task watchdog timer. This
is necessary to avoid task watchdog timers to expire while we are waiting to
get an item from the queue.
"""
def __init__(
self,
manager: BaseTaskManager,
*,
maxsize: int = 0,
timeout: float = 2.0,
) -> None:
"""Initialize the watchdog queue.
Args:
manager: The task manager for watchdog timer control.
maxsize: Maximum queue size. 0 means unlimited.
timeout: Timeout in seconds between watchdog resets while waiting.
"""
super().__init__(maxsize)
self._manager = manager
self._timeout = timeout
async def get(self):
"""Get an item from the queue with watchdog monitoring.
Returns:
The next item from the queue.
"""
if self._manager.task_watchdog_enabled:
get_result = await self._watchdog_get()
else:
get_result = await super().get()
if isinstance(get_result, WatchdogQueueCancelSentinel):
logger.trace(
"Received WatchdogQueueCancelFrame, throwing CancelledError to force cancelling"
)
raise asyncio.CancelledError("Cancelling watchdog queue get() call.")
else:
return get_result
def task_done(self):
"""Mark a task as done and reset watchdog if enabled.
Should be called after processing each item retrieved from the queue.
"""
if self._manager.task_watchdog_enabled:
self._manager.task_reset_watchdog()
super().task_done()
def cancel(self):
"""Ensures reliable task cancellation by preventing a common race condition.
The race condition occurs in Python 3.10+ when:
1. A value is put in the queue just before task cancellation
2. queue.get() completes before the cancellation signal is delivered
3. The task misses the CancelledError and continues running indefinitely
This method prevents the issue by injecting a special sentinel value that
forces the task to raise CancelledError when consumed, ensuring proper
task termination.
"""
super().put_nowait(WatchdogQueueCancelSentinel())
async def _watchdog_get(self):
"""Get item from queue while periodically resetting watchdog timer."""
while True:
try:
item = await asyncio.wait_for(super().get(), timeout=self._timeout)
self._manager.task_reset_watchdog()
return item
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()

View File

@@ -1,65 +0,0 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.utils.asyncio.task_manager import TaskManager
from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
class TestWatchdogQueue(unittest.IsolatedAsyncioTestCase):
async def test_simple_item(self):
queue = WatchdogQueue(TaskManager())
await queue.put(1)
await queue.put(2)
await queue.put(3)
self.assertEqual(await queue.get(), 1)
queue.task_done()
self.assertEqual(await queue.get(), 2)
queue.task_done()
self.assertEqual(await queue.get(), 3)
queue.task_done()
async def test_watchdog_sentinel(self):
queue = WatchdogQueue(TaskManager())
await queue.put(1)
self.assertEqual(await queue.get(), 1)
queue.task_done()
# The get should throw an exception.
queue.cancel()
try:
await queue.get()
assert False
except asyncio.CancelledError:
assert True
class TestWatchdogPriorityQueue(unittest.IsolatedAsyncioTestCase):
async def test_simple_item(self):
queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2)
await queue.put((3, 1))
await queue.put((2, 1))
await queue.put((1, 1))
self.assertEqual(await queue.get(), (1, 1))
queue.task_done()
self.assertEqual(await queue.get(), (2, 1))
queue.task_done()
self.assertEqual(await queue.get(), (3, 1))
queue.task_done()
async def test_watchdog_sentinel(self):
queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2)
await queue.put((0, 1))
# The get should throw an exception because the watchdog sentinel has
# higher priority.
queue.cancel()
try:
await queue.get()
assert False
except asyncio.CancelledError:
assert True