utils(asyncio): simplify watchdog helpers

This commit is contained in:
Aleix Conchillo Flaqué
2025-06-26 09:29:33 -07:00
parent 4d34aa7cd6
commit 3de4f22d34
37 changed files with 126 additions and 184 deletions

View File

@@ -102,8 +102,8 @@ class ParallelPipeline(BasePipeline):
async def setup(self, setup: FrameProcessorSetup):
await super().setup(setup)
self._up_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled)
self._down_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled)
self._up_queue = WatchdogQueue(setup.task_manager)
self._down_queue = WatchdogQueue(setup.task_manager)
logger.debug(f"Creating {self} pipelines")
for processors in self._args:

View File

@@ -81,8 +81,8 @@ class SyncParallelPipeline(BasePipeline):
async def setup(self, setup: FrameProcessorSetup):
await super().setup(setup)
self._up_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled)
self._down_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled)
self._up_queue = WatchdogQueue(setup.task_manager)
self._down_queue = WatchdogQueue(setup.task_manager)
logger.debug(f"Creating {self} pipelines")
for processors in self._args:

View File

@@ -45,7 +45,6 @@ from pipecat.utils.asyncio.task_manager import (
TaskManagerParams,
)
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -138,7 +137,7 @@ class PipelineTaskSink(FrameProcessor):
await self._down_queue.put(frame)
class PipelineTask(WatchdogReseter, BasePipelineTask):
class PipelineTask(BasePipelineTask):
"""Manages the execution of a pipeline, handling frame processing and task lifecycle.
It has a couple of event handlers `on_frame_reached_upstream` and
@@ -270,24 +269,28 @@ class PipelineTask(WatchdogReseter, BasePipelineTask):
self._finished = False
self._cancelled = False
# This task maneger will handle all the asyncio tasks created by this
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
# This queue receives frames coming from the pipeline upstream.
self._up_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers)
self._up_queue = WatchdogQueue(self._task_manager)
self._process_up_task: Optional[asyncio.Task] = None
# This queue receives frames coming from the pipeline downstream.
self._down_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers)
self._down_queue = WatchdogQueue(self._task_manager)
self._process_down_task: Optional[asyncio.Task] = None
# This queue is the queue used to push frames to the pipeline.
self._push_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers)
self._push_queue = WatchdogQueue(self._task_manager)
self._process_push_task: Optional[asyncio.Task] = None
# This is the heartbeat queue. When a heartbeat frame is received in the
# down queue we add it to the heartbeat queue for processing.
self._heartbeat_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers)
self._heartbeat_queue = WatchdogQueue(self._task_manager)
self._heartbeat_push_task: Optional[asyncio.Task] = None
self._heartbeat_monitor_task: Optional[asyncio.Task] = None
# This is the idle queue. When frames are received downstream they are
# put in the queue. If no frame is received the pipeline is considered
# idle.
self._idle_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers)
self._idle_queue = WatchdogQueue(self._task_manager)
self._idle_monitor_task: Optional[asyncio.Task] = None
# This event is used to indicate a finalize frame (e.g. EndFrame,
# StopFrame) has been received in the down queue.
@@ -305,10 +308,6 @@ class PipelineTask(WatchdogReseter, BasePipelineTask):
self._sink = PipelineTaskSink(self._down_queue)
pipeline.link(self._sink)
# This task maneger will handle all the asyncio tasks created by this
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
# then just acts as a proxy.
@@ -440,9 +439,6 @@ class PipelineTask(WatchdogReseter, BasePipelineTask):
for frame in frames:
await self.queue_frame(frame)
def reset_watchdog(self):
self._task_manager.reset_watchdog(asyncio.current_task())
async def _cancel(self):
if not self._cancelled:
logger.debug(f"Canceling pipeline task {self}")

View File

@@ -13,7 +13,6 @@ from attr import dataclass
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
@dataclass
@@ -28,7 +27,7 @@ class Proxy:
observer: BaseObserver
class TaskObserver(WatchdogReseter, BaseObserver):
class TaskObserver(BaseObserver):
"""This is a pipeline frame observer that is meant to be used as a proxy to
the user provided observers. That is, this is the observer that should be
passed to the frame processors. Then, every time a frame is pushed this
@@ -54,7 +53,6 @@ class TaskObserver(WatchdogReseter, BaseObserver):
self._proxies: Optional[Dict[BaseObserver, Proxy]] = (
None # Becomes a dict after start() is called
)
self._watchdog_timers_enabled = False
def add_observer(self, observer: BaseObserver):
# Add the observer to the list.
@@ -81,7 +79,6 @@ class TaskObserver(WatchdogReseter, BaseObserver):
async def start(self, watchdog_timers_enabled: bool = False):
"""Starts all proxy observer tasks."""
self._watchdog_timers_enabled = watchdog_timers_enabled
self._proxies = self._create_proxies(self._observers)
async def stop(self):
@@ -96,14 +93,11 @@ class TaskObserver(WatchdogReseter, BaseObserver):
for proxy in self._proxies.values():
await proxy.queue.put(data)
def reset_watchdog(self):
self._task_manager.reset_watchdog(asyncio.current_task())
def _started(self) -> bool:
return self._proxies is not None
def _create_proxy(self, observer: BaseObserver) -> Proxy:
queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled)
queue = WatchdogQueue(self._task_manager)
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer}::_proxy_task_handler",

View File

@@ -32,7 +32,7 @@ class ConsumerProcessor(FrameProcessor):
super().__init__(**kwargs)
self._transformer = transformer
self._direction = direction
self._queue: WatchdogQueue = producer.add_consumer(self)
self._producer = producer
self._consumer_task: Optional[asyncio.Task] = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -49,6 +49,7 @@ class ConsumerProcessor(FrameProcessor):
async def _start(self, _: StartFrame):
if not self._consumer_task:
self._queue: WatchdogQueue = self._producer.add_consumer()
self._consumer_task = self.create_task(self._consumer_task_handler())
async def _stop(self, _: EndFrame):

View File

@@ -32,7 +32,6 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMet
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
from pipecat.utils.base_object import BaseObject
@@ -49,7 +48,7 @@ class FrameProcessorSetup:
watchdog_timers_enabled: bool = False
class FrameProcessor(WatchdogReseter, BaseObject):
class FrameProcessor(BaseObject):
def __init__(
self,
*,
@@ -89,7 +88,6 @@ class FrameProcessor(WatchdogReseter, BaseObject):
self._enable_usage_metrics = False
self._report_only_initial_ttfb = False
self._interruption_strategies: List[BaseInterruptionStrategy] = []
self._watchdog_timers_enabled = False
# Indicates whether we have received the StartFrame.
self.__started = False
@@ -147,8 +145,10 @@ class FrameProcessor(WatchdogReseter, BaseObject):
return self._interruption_strategies
@property
def watchdog_timers_enabled(self):
return self._watchdog_timers_enabled
def task_manager(self) -> BaseTaskManager:
if not self._task_manager:
raise Exception(f"{self} TaskManager is still not initialized.")
return self._task_manager
def can_generate_metrics(self) -> bool:
return False
@@ -205,7 +205,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
name = f"{self}::{name}"
else:
name = f"{self}::{coroutine.cr_code.co_name}"
return self.get_task_manager().create_task(
return self.task_manager.create_task(
coroutine,
name,
enable_watchdog_logging=(
@@ -214,7 +214,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
else self._enable_watchdog_logging
),
enable_watchdog_timers=(
enable_watchdog_timers if enable_watchdog_timers else self.watchdog_timers_enabled
enable_watchdog_timers if enable_watchdog_timers else self._enable_watchdog_timers
),
watchdog_timeout=(
watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout_secs
@@ -222,13 +222,13 @@ class FrameProcessor(WatchdogReseter, BaseObject):
)
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
await self.get_task_manager().cancel_task(task, timeout)
await self.task_manager.cancel_task(task, timeout)
async def wait_for_task(self, task: asyncio.Task, timeout: Optional[float] = None):
await self.get_task_manager().wait_for_task(task, timeout)
await self.task_manager.wait_for_task(task, timeout)
def reset_watchdog(self):
self.get_task_manager().reset_watchdog(asyncio.current_task())
self.task_manager.task_reset_watchdog()
async def setup(self, setup: FrameProcessorSetup):
self._clock = setup.clock
@@ -240,7 +240,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
else setup.watchdog_timers_enabled
)
if self._metrics is not None:
await self._metrics.setup(self._task_manager, self._watchdog_timers_enabled)
await self._metrics.setup(self._task_manager)
async def cleanup(self):
await super().cleanup()
@@ -255,7 +255,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
logger.debug(f"Linking {self} -> {self._next}")
def get_event_loop(self) -> asyncio.AbstractEventLoop:
return self.get_task_manager().get_event_loop()
return self.task_manager.get_event_loop()
def set_parent(self, parent: "FrameProcessor"):
self._parent = parent
@@ -268,11 +268,6 @@ class FrameProcessor(WatchdogReseter, BaseObject):
raise Exception(f"{self} Clock is still not initialized.")
return self._clock
def get_task_manager(self) -> BaseTaskManager:
if not self._task_manager:
raise Exception(f"{self} TaskManager is still not initialized.")
return self._task_manager
async def queue_frame(
self,
frame: Frame,
@@ -417,11 +412,9 @@ class FrameProcessor(WatchdogReseter, BaseObject):
if not self.__input_frame_task:
self.__should_block_frames = False
if not self.__input_event:
self.__input_event = WatchdogEvent(
self, watchdog_enabled=self.watchdog_timers_enabled
)
self.__input_event = WatchdogEvent(self.task_manager)
self.__input_event.clear()
self.__input_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled)
self.__input_queue = WatchdogQueue(self.task_manager)
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
async def __cancel_input_task(self):
@@ -453,7 +446,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
def __create_push_task(self):
if not self.__push_frame_task:
self.__push_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled)
self.__push_queue = WatchdogQueue(self.task_manager)
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):

View File

@@ -755,10 +755,10 @@ class RTVIProcessor(FrameProcessor):
async def _start(self, frame: StartFrame):
if not self._action_task:
self._action_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled)
self._action_queue = WatchdogQueue(self.task_manager)
self._action_task = self.create_task(self._action_task_handler())
if not self._message_task:
self._message_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled)
self._message_queue = WatchdogQueue(self.task_manager)
self._message_task = self.create_task(self._message_task_handler())
await self._call_event_handler("on_bot_started")

View File

@@ -18,7 +18,7 @@ from pipecat.metrics.metrics import (
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.utils.asyncio.task_manager import TaskManager
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -31,14 +31,14 @@ class FrameProcessorMetrics(BaseObject):
self._last_ttfb_time = 0
self._should_report_ttfb = True
async def setup(self, task_manager: TaskManager, watchdog_timers_enabled: bool = False):
async def setup(self, task_manager: BaseTaskManager):
self._task_manager = task_manager
async def cleanup(self):
await super().cleanup()
@property
def task_manager(self) -> TaskManager:
def task_manager(self) -> BaseTaskManager:
return self._task_manager
@property

View File

@@ -8,9 +8,8 @@ import asyncio
from loguru import logger
from pipecat.utils.asyncio.task_manager import TaskManager
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
try:
import sentry_sdk
@@ -22,7 +21,7 @@ except ModuleNotFoundError as e:
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
class SentryMetrics(WatchdogReseter, FrameProcessorMetrics):
class SentryMetrics(FrameProcessorMetrics):
def __init__(self):
super().__init__()
self._ttfb_metrics_tx = None
@@ -32,10 +31,10 @@ class SentryMetrics(WatchdogReseter, FrameProcessorMetrics):
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
self._sentry_task = None
async def setup(self, task_manager: TaskManager, watchdog_timers_enabled: bool = False):
await super().setup(task_manager, watchdog_timers_enabled)
async def setup(self, task_manager: BaseTaskManager):
await super().setup(task_manager)
if self._sentry_available:
self._sentry_queue = WatchdogQueue(self, watchdog_enabled=watchdog_timers_enabled)
self._sentry_queue = WatchdogQueue(task_manager)
self._sentry_task = self.task_manager.create_task(
self._sentry_task_handler(), name=f"{self}::_sentry_task_handler"
)
@@ -49,10 +48,6 @@ class SentryMetrics(WatchdogReseter, FrameProcessorMetrics):
logger.trace(f"{self} Flushing Sentry metrics")
sentry_sdk.flush(timeout=5.0)
def reset_watchdog(self):
if self._task_manager:
self._task_manager.reset_watchdog(asyncio.current_task())
async def start_ttfb_metrics(self, report_only_initial_ttfb):
await super().start_ttfb_metrics(report_only_initial_ttfb)

View File

@@ -37,14 +37,14 @@ class ProducerProcessor(FrameProcessor):
self._passthrough = passthrough
self._consumers: List[asyncio.Queue] = []
def add_consumer(self, consumer: FrameProcessor):
def add_consumer(self):
"""
Adds a new consumer and returns its associated queue.
Returns:
asyncio.Queue: The queue for the newly added consumer.
"""
queue = WatchdogQueue(consumer, watchdog_enabled=self.watchdog_timers_enabled)
queue = WatchdogQueue(self.task_manager)
self._consumers.append(queue)
return queue

View File

@@ -204,9 +204,7 @@ class AnthropicLLMService(LLMService):
json_accumulator = ""
function_calls = []
async for event in WatchdogAsyncIterator(
response, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for event in WatchdogAsyncIterator(response, manager=self.task_manager):
# Aggregate streaming content, create frames, trigger events
if event.type == "content_block_delta":

View File

@@ -329,7 +329,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._get_websocket(), reseter=self, watchdog_enabled=self.watchdog_timers_enabled
self._get_websocket(), manager=self.task_manager
):
msg = json.loads(message)
if not msg or not self.audio_context_available(msg["context_id"]):

View File

@@ -396,7 +396,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._get_websocket(), reseter=self, watchdog_enabled=self.watchdog_timers_enabled
self._get_websocket(), manager=self.task_manager
):
msg = json.loads(message)

View File

@@ -687,9 +687,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
#
async def _receive_task_handler(self):
async for message in WatchdogAsyncIterator(
self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
evt = events.parse_server_event(message)
# logger.debug(f"Received event: {message[:500]}")
# logger.debug(f"Received event: {evt}")

View File

@@ -504,9 +504,7 @@ class GladiaSTTService(STTService):
async def _receive_task_handler(self):
try:
async for message in WatchdogAsyncIterator(
self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
content = json.loads(message)
# Handle audio chunk acknowledgments

View File

@@ -558,9 +558,7 @@ class GoogleLLMService(LLMService):
)
function_calls = []
async for chunk in WatchdogAsyncIterator(
response, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for chunk in WatchdogAsyncIterator(response, manager=self.task_manager):
# Stop TTFB metrics after the first chunk
await self.stop_ttfb_metrics()
if chunk.usage_metadata:

View File

@@ -54,9 +54,7 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
context
)
async for chunk in WatchdogAsyncIterator(
chunk_stream, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -785,7 +785,7 @@ class GoogleSTTService(STTService):
"""Process streaming recognition responses."""
try:
async for response in WatchdogAsyncIterator(
streaming_recognize, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
streaming_recognize, manager=self.task_manager
):
# Check streaming limit
if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT:

View File

@@ -222,9 +222,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
self._websocket = None
async def _receive_messages(self):
async for message in WatchdogAsyncIterator(
self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
if isinstance(message, str):
msg = json.loads(message)
if msg.get("data", {}).get("audio") is not None:

View File

@@ -245,9 +245,7 @@ class BaseOpenAILLMService(LLMService):
context
)
async for chunk in WatchdogAsyncIterator(
chunk_stream, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -370,9 +370,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
#
async def _receive_task_handler(self):
async for message in WatchdogAsyncIterator(
self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager):
evt = events.parse_server_event(message)
if evt.type == "session.created":
await self._handle_evt_session_created(evt)

View File

@@ -199,9 +199,7 @@ class RivaSTTService(STTService):
self._thread_task = self.create_task(self._thread_task_handler())
if not self._response_task:
self._response_queue = WatchdogQueue(
self, watchdog_enabled=self.watchdog_timers_enabled
)
self._response_queue = WatchdogQueue(self.task_manager)
self._response_task = self.create_task(self._response_task_handler())
async def stop(self, frame: EndFrame):

View File

@@ -95,9 +95,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
context
)
async for chunk in WatchdogAsyncIterator(
chunk_stream, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager):
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,

View File

@@ -63,9 +63,7 @@ class SimliVideoService(FrameProcessor):
async def _consume_and_process_audio(self):
await self._pipecat_resampler_event.wait()
audio_iterator = self._simli_client.getAudioStreamIterator()
async for audio_frame in WatchdogAsyncIterator(
audio_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for audio_frame in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager):
resampled_frames = self._pipecat_resampler.resample(audio_frame)
for resampled_frame in resampled_frames:
audio_array = resampled_frame.to_ndarray()
@@ -82,9 +80,7 @@ class SimliVideoService(FrameProcessor):
async def _consume_and_process_video(self):
await self._pipecat_resampler_event.wait()
video_iterator = self._simli_client.getVideoStreamIterator(targetFormat="rgb24")
async for video_frame in WatchdogAsyncIterator(
video_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for video_frame in WatchdogAsyncIterator(video_iterator, manager=self.task_manager):
# Process the video frame
convertedFrame: OutputImageRawFrame = OutputImageRawFrame(
image=video_frame.to_rgb().to_image().tobytes(),

View File

@@ -188,7 +188,7 @@ class TavusVideoService(AIService):
async def _create_send_task(self):
if not self._send_task:
self._queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled)
self._queue = WatchdogQueue(self.task_manager)
self._send_task = self.create_task(self._send_task_handler())
async def _cancel_send_task(self):

View File

@@ -505,7 +505,7 @@ class WordTTSService(TTSService):
def _create_words_task(self):
if not self._words_task:
self._words_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled)
self._words_queue = WatchdogQueue(self.task_manager)
self._words_task = self.create_task(self._words_task_handler())
async def _stop_words_task(self):
@@ -787,9 +787,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
def _create_audio_context_task(self):
if not self._audio_context_task:
self._contexts_queue = WatchdogQueue(
self, watchdog_enabled=self.watchdog_timers_enabled
)
self._contexts_queue = WatchdogQueue(self.task_manager)
self._contexts: Dict[str, asyncio.Queue] = {}
self._audio_context_task = self.create_task(self._audio_context_task_handler())

View File

@@ -602,9 +602,7 @@ class BaseOutputTransport(FrameProcessor):
def _create_clock_task(self):
if not self._clock_task:
self._clock_queue = WatchdogPriorityQueue(
self._transport, watchdog_enabled=self._transport.watchdog_timers_enabled
)
self._clock_queue = WatchdogPriorityQueue(self._transport.task_manager)
self._clock_task = self._transport.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):

View File

@@ -180,7 +180,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
async def _receive_messages(self):
try:
async for message in WatchdogAsyncIterator(
self._client.receive(), reseter=self, watchdog_enabled=self.watchdog_timers_enabled
self._client.receive(), manager=self.task_manager
):
if not self._params.serializer:
continue

View File

@@ -425,7 +425,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
try:
audio_iterator = self._client.read_audio_frame()
async for audio_frame in WatchdogAsyncIterator(
audio_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
audio_iterator, manager=self.task_manager
):
if audio_frame:
await self.push_audio_frame(audio_frame)
@@ -437,7 +437,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
try:
video_iterator = self._client.read_video_frame()
async for video_frame in WatchdogAsyncIterator(
video_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
video_iterator, manager=self.task_manager
):
if video_frame:
await self.push_video_frame(video_frame)

View File

@@ -41,7 +41,6 @@ from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
try:
from daily import (
@@ -252,7 +251,7 @@ class DailyAudioTrack:
track: CustomAudioTrack
class DailyTransportClient(WatchdogReseter, EventHandler):
class DailyTransportClient(EventHandler):
"""Core client for interacting with Daily's API.
Manages the connection to Daily rooms and handles all low-level API interactions.
@@ -395,10 +394,6 @@ class DailyTransportClient(WatchdogReseter, EventHandler):
if not frame.transport_destination and self._camera:
self._camera.write_frame(frame.image)
def reset_watchdog(self):
if self._task_manager:
self._task_manager.reset_watchdog(asyncio.current_task())
async def setup(self, setup: FrameProcessorSetup):
if self._task_manager:
return
@@ -406,7 +401,7 @@ class DailyTransportClient(WatchdogReseter, EventHandler):
self._task_manager = setup.task_manager
self._watchdog_timers_enabled = setup.watchdog_timers_enabled
self._event_queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled)
self._event_queue = WatchdogQueue(self._task_manager)
self._event_task = self._task_manager.create_task(
self._callback_task_handler(self._event_queue),
f"{self}::event_callback_task",
@@ -431,14 +426,14 @@ class DailyTransportClient(WatchdogReseter, EventHandler):
self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
if self._params.audio_in_enabled and not self._audio_task and self._task_manager:
self._audio_queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled)
self._audio_queue = WatchdogQueue(self._task_manager)
self._audio_task = self._task_manager.create_task(
self._callback_task_handler(self._audio_queue),
f"{self}::audio_callback_task",
)
if self._params.video_in_enabled and not self._video_task and self._task_manager:
self._video_queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled)
self._video_queue = WatchdogQueue(self._task_manager)
self._video_task = self._task_manager.create_task(
self._callback_task_handler(self._video_queue),
f"{self}::video_callback_task",

View File

@@ -416,9 +416,7 @@ class LiveKitInputTransport(BaseInputTransport):
async def _audio_in_task_handler(self):
logger.info("Audio input task started")
audio_iterator = self._client.get_next_audio_frame()
async for audio_data in WatchdogAsyncIterator(
audio_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled
):
async for audio_data in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager):
if audio_data:
audio_frame_event, participant_id = audio_data
pipecat_audio_frame = await self._convert_livekit_audio_to_pipecat(

View File

@@ -97,13 +97,19 @@ class BaseTaskManager(ABC):
pass
@abstractmethod
def reset_watchdog(self, task: asyncio.Task):
"""Resets the given task watchdog timer. If not reset, a warning will be
logged indicating the task is stalling.
def task_reset_watchdog(self):
"""Resets the running task watchdog timer. If not reset, a warning will
be logged indicating the task is stalling.
"""
pass
@property
@abstractmethod
def task_watchdog_enabled(self) -> bool:
"""Whether the current running task has a watchdog timer enabled."""
pass
@dataclass
class TaskData:
@@ -253,18 +259,31 @@ class TaskManager(BaseTaskManager):
logger.critical(f"{name}: fatal base exception while cancelling task: {e}")
raise
def reset_watchdog(self, task: asyncio.Task):
name = task.get_name()
if name in self._tasks and self._tasks[name].enable_watchdog_timers:
self._tasks[name].watchdog_timer.set()
def current_tasks(self) -> Sequence[asyncio.Task]:
"""Returns the list of currently created/registered tasks."""
return [data.task for data in self._tasks.values()]
def reset_watchdog(self, task: asyncio.Task):
"""Resets the given task watchdog timer. If not reset on time, a warning
def task_reset_watchdog(self):
"""Resets the running task watchdog timer. If not reset on time, a warning
will be logged indicating the task is stalling.
"""
task = asyncio.current_task()
if task:
self.reset_watchdog(task)
@property
def task_watchdog_enabled(self) -> bool:
task = asyncio.current_task()
if not task:
return False
name = task.get_name()
if name in self._tasks and self._tasks[name].enable_watchdog_timers:
self._tasks[name].watchdog_timer.set()
return name in self._tasks and self._tasks[name].enable_watchdog_timers
def _add_task(self, task_data: TaskData):
name = task_data.task.get_name()

View File

@@ -7,7 +7,7 @@
import asyncio
from typing import AsyncIterator, Optional
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogAsyncIterator:
@@ -21,16 +21,14 @@ class WatchdogAsyncIterator:
self,
async_iterable,
*,
reseter: WatchdogReseter,
manager: BaseTaskManager,
timeout: float = 2.0,
watchdog_enabled: bool = False,
):
self._async_iterable = async_iterable
self._reseter = reseter
self._manager = manager
self._timeout = timeout
self._iter: Optional[AsyncIterator] = None
self._current_anext_task: Optional[asyncio.Task] = None
self._watchdog_enabled = watchdog_enabled
def __aiter__(self):
return self
@@ -39,7 +37,7 @@ class WatchdogAsyncIterator:
if not self._iter:
self._iter = await self._ensure_async_iterator(self._async_iterable)
if self._watchdog_enabled:
if self._manager.task_watchdog_enabled:
return await self._watchdog_anext()
else:
return await self._iter.__anext__()
@@ -55,14 +53,14 @@ class WatchdogAsyncIterator:
timeout=self._timeout,
)
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()
# The task has finish, so we will create a new one for th next item.
self._current_anext_task = None
return item
except asyncio.TimeoutError:
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()
except StopAsyncIteration:
self._current_anext_task = None
raise

View File

@@ -6,7 +6,7 @@
import asyncio
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogEvent(asyncio.Event):
@@ -18,18 +18,16 @@ class WatchdogEvent(asyncio.Event):
def __init__(
self,
reseter: WatchdogReseter,
manager: BaseTaskManager,
*,
timeout: float = 2.0,
watchdog_enabled: bool = False,
) -> None:
super().__init__()
self._reseter = reseter
self._manager = manager
self._timeout = timeout
self._watchdog_enabled = watchdog_enabled
async def wait(self):
if self._watchdog_enabled:
if self._manager.task_watchdog_enabled:
return await self._watchdog_wait()
else:
return await super().wait()
@@ -38,7 +36,7 @@ class WatchdogEvent(asyncio.Event):
while True:
try:
await asyncio.wait_for(super().wait(), timeout=self._timeout)
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()
return True
except asyncio.TimeoutError:
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()

View File

@@ -6,7 +6,7 @@
import asyncio
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogPriorityQueue(asyncio.PriorityQueue):
@@ -18,33 +18,31 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
def __init__(
self,
reseter: WatchdogReseter,
manager: BaseTaskManager,
*,
maxsize: int = 0,
timeout: float = 2.0,
watchdog_enabled: bool = False,
) -> None:
super().__init__(maxsize)
self._reseter = reseter
self._manager = manager
self._timeout = timeout
self._watchdog_enabled = watchdog_enabled
async def get(self):
if self._watchdog_enabled:
if self._manager.task_watchdog_enabled:
return await self._watchdog_get()
else:
return await super().get()
def task_done(self):
if self._watchdog_enabled:
self._reseter.reset_watchdog()
if self._manager.task_watchdog_enabled:
self._manager.task_reset_watchdog()
super().task_done()
async def _watchdog_get(self):
while True:
try:
item = await asyncio.wait_for(super().get(), timeout=self._timeout)
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()
return item
except asyncio.TimeoutError:
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()

View File

@@ -6,7 +6,7 @@
import asyncio
from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogQueue(asyncio.Queue):
@@ -18,33 +18,31 @@ class WatchdogQueue(asyncio.Queue):
def __init__(
self,
reseter: WatchdogReseter,
manager: BaseTaskManager,
*,
maxsize: int = 0,
timeout: float = 2.0,
watchdog_enabled: bool = False,
) -> None:
super().__init__(maxsize)
self._reseter = reseter
self._manager = manager
self._timeout = timeout
self._watchdog_enabled = watchdog_enabled
async def get(self):
if self._watchdog_enabled:
if self._manager.task_watchdog_enabled:
return await self._watchdog_get()
else:
return await super().get()
def task_done(self):
if self._watchdog_enabled:
self._reseter.reset_watchdog()
if self._manager.task_watchdog_enabled:
self._manager.task_reset_watchdog()
super().task_done()
async def _watchdog_get(self):
while True:
try:
item = await asyncio.wait_for(super().get(), timeout=self._timeout)
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()
return item
except asyncio.TimeoutError:
self._reseter.reset_watchdog()
self._manager.task_reset_watchdog()

View File

@@ -1,13 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class WatchdogReseter(ABC):
@abstractmethod
def reset_watchdog(self):
pass