diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 7068ed86d..300492cc5 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -102,8 +102,8 @@ class ParallelPipeline(BasePipeline): async def setup(self, setup: FrameProcessorSetup): await super().setup(setup) - self._up_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled) - self._down_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled) + self._up_queue = WatchdogQueue(setup.task_manager) + self._down_queue = WatchdogQueue(setup.task_manager) logger.debug(f"Creating {self} pipelines") for processors in self._args: diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 6b178bc1c..006290710 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -81,8 +81,8 @@ class SyncParallelPipeline(BasePipeline): async def setup(self, setup: FrameProcessorSetup): await super().setup(setup) - self._up_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled) - self._down_queue = WatchdogQueue(self, watchdog_enabled=setup.watchdog_timers_enabled) + self._up_queue = WatchdogQueue(setup.task_manager) + self._down_queue = WatchdogQueue(setup.task_manager) logger.debug(f"Creating {self} pipelines") for processors in self._args: diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 8b30d1a4e..3e8a36b6c 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -45,7 +45,6 @@ from pipecat.utils.asyncio.task_manager import ( TaskManagerParams, ) from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter from pipecat.utils.tracing.setup import is_tracing_available from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver @@ -138,7 +137,7 @@ class PipelineTaskSink(FrameProcessor): await self._down_queue.put(frame) -class PipelineTask(WatchdogReseter, BasePipelineTask): +class PipelineTask(BasePipelineTask): """Manages the execution of a pipeline, handling frame processing and task lifecycle. It has a couple of event handlers `on_frame_reached_upstream` and @@ -270,24 +269,28 @@ class PipelineTask(WatchdogReseter, BasePipelineTask): self._finished = False self._cancelled = False + # This task maneger will handle all the asyncio tasks created by this + # PipelineTask and its frame processors. + self._task_manager = task_manager or TaskManager() + # This queue receives frames coming from the pipeline upstream. - self._up_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers) + self._up_queue = WatchdogQueue(self._task_manager) self._process_up_task: Optional[asyncio.Task] = None # This queue receives frames coming from the pipeline downstream. - self._down_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers) + self._down_queue = WatchdogQueue(self._task_manager) self._process_down_task: Optional[asyncio.Task] = None # This queue is the queue used to push frames to the pipeline. - self._push_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers) + self._push_queue = WatchdogQueue(self._task_manager) self._process_push_task: Optional[asyncio.Task] = None # This is the heartbeat queue. When a heartbeat frame is received in the # down queue we add it to the heartbeat queue for processing. - self._heartbeat_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers) + self._heartbeat_queue = WatchdogQueue(self._task_manager) self._heartbeat_push_task: Optional[asyncio.Task] = None self._heartbeat_monitor_task: Optional[asyncio.Task] = None # This is the idle queue. When frames are received downstream they are # put in the queue. If no frame is received the pipeline is considered # idle. - self._idle_queue = WatchdogQueue(self, watchdog_enabled=enable_watchdog_timers) + self._idle_queue = WatchdogQueue(self._task_manager) self._idle_monitor_task: Optional[asyncio.Task] = None # This event is used to indicate a finalize frame (e.g. EndFrame, # StopFrame) has been received in the down queue. @@ -305,10 +308,6 @@ class PipelineTask(WatchdogReseter, BasePipelineTask): self._sink = PipelineTaskSink(self._down_queue) pipeline.link(self._sink) - # This task maneger will handle all the asyncio tasks created by this - # PipelineTask and its frame processors. - self._task_manager = task_manager or TaskManager() - # The task observer acts as a proxy to the provided observers. This way, # we only need to pass a single observer (using the StartFrame) which # then just acts as a proxy. @@ -440,9 +439,6 @@ class PipelineTask(WatchdogReseter, BasePipelineTask): for frame in frames: await self.queue_frame(frame) - def reset_watchdog(self): - self._task_manager.reset_watchdog(asyncio.current_task()) - async def _cancel(self): if not self._cancelled: logger.debug(f"Canceling pipeline task {self}") diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index ae4decbce..db03a73d1 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -13,7 +13,6 @@ from attr import dataclass from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter @dataclass @@ -28,7 +27,7 @@ class Proxy: observer: BaseObserver -class TaskObserver(WatchdogReseter, BaseObserver): +class TaskObserver(BaseObserver): """This is a pipeline frame observer that is meant to be used as a proxy to the user provided observers. That is, this is the observer that should be passed to the frame processors. Then, every time a frame is pushed this @@ -54,7 +53,6 @@ class TaskObserver(WatchdogReseter, BaseObserver): self._proxies: Optional[Dict[BaseObserver, Proxy]] = ( None # Becomes a dict after start() is called ) - self._watchdog_timers_enabled = False def add_observer(self, observer: BaseObserver): # Add the observer to the list. @@ -81,7 +79,6 @@ class TaskObserver(WatchdogReseter, BaseObserver): async def start(self, watchdog_timers_enabled: bool = False): """Starts all proxy observer tasks.""" - self._watchdog_timers_enabled = watchdog_timers_enabled self._proxies = self._create_proxies(self._observers) async def stop(self): @@ -96,14 +93,11 @@ class TaskObserver(WatchdogReseter, BaseObserver): for proxy in self._proxies.values(): await proxy.queue.put(data) - def reset_watchdog(self): - self._task_manager.reset_watchdog(asyncio.current_task()) - def _started(self) -> bool: return self._proxies is not None def _create_proxy(self, observer: BaseObserver) -> Proxy: - queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled) + queue = WatchdogQueue(self._task_manager) task = self._task_manager.create_task( self._proxy_task_handler(queue, observer), f"TaskObserver::{observer}::_proxy_task_handler", diff --git a/src/pipecat/processors/consumer_processor.py b/src/pipecat/processors/consumer_processor.py index 0440a74e1..977450181 100644 --- a/src/pipecat/processors/consumer_processor.py +++ b/src/pipecat/processors/consumer_processor.py @@ -32,7 +32,7 @@ class ConsumerProcessor(FrameProcessor): super().__init__(**kwargs) self._transformer = transformer self._direction = direction - self._queue: WatchdogQueue = producer.add_consumer(self) + self._producer = producer self._consumer_task: Optional[asyncio.Task] = None async def process_frame(self, frame: Frame, direction: FrameDirection): @@ -49,6 +49,7 @@ class ConsumerProcessor(FrameProcessor): async def _start(self, _: StartFrame): if not self._consumer_task: + self._queue: WatchdogQueue = self._producer.add_consumer() self._consumer_task = self.create_task(self._consumer_task_handler()) async def _stop(self, _: EndFrame): diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 260e549fa..1935aeb2b 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -32,7 +32,6 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMet from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.asyncio.watchdog_event import WatchdogEvent from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter from pipecat.utils.base_object import BaseObject @@ -49,7 +48,7 @@ class FrameProcessorSetup: watchdog_timers_enabled: bool = False -class FrameProcessor(WatchdogReseter, BaseObject): +class FrameProcessor(BaseObject): def __init__( self, *, @@ -89,7 +88,6 @@ class FrameProcessor(WatchdogReseter, BaseObject): self._enable_usage_metrics = False self._report_only_initial_ttfb = False self._interruption_strategies: List[BaseInterruptionStrategy] = [] - self._watchdog_timers_enabled = False # Indicates whether we have received the StartFrame. self.__started = False @@ -147,8 +145,10 @@ class FrameProcessor(WatchdogReseter, BaseObject): return self._interruption_strategies @property - def watchdog_timers_enabled(self): - return self._watchdog_timers_enabled + def task_manager(self) -> BaseTaskManager: + if not self._task_manager: + raise Exception(f"{self} TaskManager is still not initialized.") + return self._task_manager def can_generate_metrics(self) -> bool: return False @@ -205,7 +205,7 @@ class FrameProcessor(WatchdogReseter, BaseObject): name = f"{self}::{name}" else: name = f"{self}::{coroutine.cr_code.co_name}" - return self.get_task_manager().create_task( + return self.task_manager.create_task( coroutine, name, enable_watchdog_logging=( @@ -214,7 +214,7 @@ class FrameProcessor(WatchdogReseter, BaseObject): else self._enable_watchdog_logging ), enable_watchdog_timers=( - enable_watchdog_timers if enable_watchdog_timers else self.watchdog_timers_enabled + enable_watchdog_timers if enable_watchdog_timers else self._enable_watchdog_timers ), watchdog_timeout=( watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout_secs @@ -222,13 +222,13 @@ class FrameProcessor(WatchdogReseter, BaseObject): ) async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None): - await self.get_task_manager().cancel_task(task, timeout) + await self.task_manager.cancel_task(task, timeout) async def wait_for_task(self, task: asyncio.Task, timeout: Optional[float] = None): - await self.get_task_manager().wait_for_task(task, timeout) + await self.task_manager.wait_for_task(task, timeout) def reset_watchdog(self): - self.get_task_manager().reset_watchdog(asyncio.current_task()) + self.task_manager.task_reset_watchdog() async def setup(self, setup: FrameProcessorSetup): self._clock = setup.clock @@ -240,7 +240,7 @@ class FrameProcessor(WatchdogReseter, BaseObject): else setup.watchdog_timers_enabled ) if self._metrics is not None: - await self._metrics.setup(self._task_manager, self._watchdog_timers_enabled) + await self._metrics.setup(self._task_manager) async def cleanup(self): await super().cleanup() @@ -255,7 +255,7 @@ class FrameProcessor(WatchdogReseter, BaseObject): logger.debug(f"Linking {self} -> {self._next}") def get_event_loop(self) -> asyncio.AbstractEventLoop: - return self.get_task_manager().get_event_loop() + return self.task_manager.get_event_loop() def set_parent(self, parent: "FrameProcessor"): self._parent = parent @@ -268,11 +268,6 @@ class FrameProcessor(WatchdogReseter, BaseObject): raise Exception(f"{self} Clock is still not initialized.") return self._clock - def get_task_manager(self) -> BaseTaskManager: - if not self._task_manager: - raise Exception(f"{self} TaskManager is still not initialized.") - return self._task_manager - async def queue_frame( self, frame: Frame, @@ -417,11 +412,9 @@ class FrameProcessor(WatchdogReseter, BaseObject): if not self.__input_frame_task: self.__should_block_frames = False if not self.__input_event: - self.__input_event = WatchdogEvent( - self, watchdog_enabled=self.watchdog_timers_enabled - ) + self.__input_event = WatchdogEvent(self.task_manager) self.__input_event.clear() - self.__input_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled) + self.__input_queue = WatchdogQueue(self.task_manager) self.__input_frame_task = self.create_task(self.__input_frame_task_handler()) async def __cancel_input_task(self): @@ -453,7 +446,7 @@ class FrameProcessor(WatchdogReseter, BaseObject): def __create_push_task(self): if not self.__push_frame_task: - self.__push_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled) + self.__push_queue = WatchdogQueue(self.task_manager) self.__push_frame_task = self.create_task(self.__push_frame_task_handler()) async def __cancel_push_task(self): diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index b379e522a..09291c422 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -755,10 +755,10 @@ class RTVIProcessor(FrameProcessor): async def _start(self, frame: StartFrame): if not self._action_task: - self._action_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled) + self._action_queue = WatchdogQueue(self.task_manager) self._action_task = self.create_task(self._action_task_handler()) if not self._message_task: - self._message_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled) + self._message_queue = WatchdogQueue(self.task_manager) self._message_task = self.create_task(self._message_task_handler()) await self._call_event_handler("on_bot_started") diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py index ec1501122..cf08f85f6 100644 --- a/src/pipecat/processors/metrics/frame_processor_metrics.py +++ b/src/pipecat/processors/metrics/frame_processor_metrics.py @@ -18,7 +18,7 @@ from pipecat.metrics.metrics import ( TTFBMetricsData, TTSUsageMetricsData, ) -from pipecat.utils.asyncio.task_manager import TaskManager +from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject @@ -31,14 +31,14 @@ class FrameProcessorMetrics(BaseObject): self._last_ttfb_time = 0 self._should_report_ttfb = True - async def setup(self, task_manager: TaskManager, watchdog_timers_enabled: bool = False): + async def setup(self, task_manager: BaseTaskManager): self._task_manager = task_manager async def cleanup(self): await super().cleanup() @property - def task_manager(self) -> TaskManager: + def task_manager(self) -> BaseTaskManager: return self._task_manager @property diff --git a/src/pipecat/processors/metrics/sentry.py b/src/pipecat/processors/metrics/sentry.py index b19e9aa04..32b04a59b 100644 --- a/src/pipecat/processors/metrics/sentry.py +++ b/src/pipecat/processors/metrics/sentry.py @@ -8,9 +8,8 @@ import asyncio from loguru import logger -from pipecat.utils.asyncio.task_manager import TaskManager +from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter try: import sentry_sdk @@ -22,7 +21,7 @@ except ModuleNotFoundError as e: from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics -class SentryMetrics(WatchdogReseter, FrameProcessorMetrics): +class SentryMetrics(FrameProcessorMetrics): def __init__(self): super().__init__() self._ttfb_metrics_tx = None @@ -32,10 +31,10 @@ class SentryMetrics(WatchdogReseter, FrameProcessorMetrics): logger.warning("Sentry SDK not initialized. Sentry features will be disabled.") self._sentry_task = None - async def setup(self, task_manager: TaskManager, watchdog_timers_enabled: bool = False): - await super().setup(task_manager, watchdog_timers_enabled) + async def setup(self, task_manager: BaseTaskManager): + await super().setup(task_manager) if self._sentry_available: - self._sentry_queue = WatchdogQueue(self, watchdog_enabled=watchdog_timers_enabled) + self._sentry_queue = WatchdogQueue(task_manager) self._sentry_task = self.task_manager.create_task( self._sentry_task_handler(), name=f"{self}::_sentry_task_handler" ) @@ -49,10 +48,6 @@ class SentryMetrics(WatchdogReseter, FrameProcessorMetrics): logger.trace(f"{self} Flushing Sentry metrics") sentry_sdk.flush(timeout=5.0) - def reset_watchdog(self): - if self._task_manager: - self._task_manager.reset_watchdog(asyncio.current_task()) - async def start_ttfb_metrics(self, report_only_initial_ttfb): await super().start_ttfb_metrics(report_only_initial_ttfb) diff --git a/src/pipecat/processors/producer_processor.py b/src/pipecat/processors/producer_processor.py index e00ac6e84..0a41269fb 100644 --- a/src/pipecat/processors/producer_processor.py +++ b/src/pipecat/processors/producer_processor.py @@ -37,14 +37,14 @@ class ProducerProcessor(FrameProcessor): self._passthrough = passthrough self._consumers: List[asyncio.Queue] = [] - def add_consumer(self, consumer: FrameProcessor): + def add_consumer(self): """ Adds a new consumer and returns its associated queue. Returns: asyncio.Queue: The queue for the newly added consumer. """ - queue = WatchdogQueue(consumer, watchdog_enabled=self.watchdog_timers_enabled) + queue = WatchdogQueue(self.task_manager) self._consumers.append(queue) return queue diff --git a/src/pipecat/services/anthropic/llm.py b/src/pipecat/services/anthropic/llm.py index 246efcf26..657903b90 100644 --- a/src/pipecat/services/anthropic/llm.py +++ b/src/pipecat/services/anthropic/llm.py @@ -204,9 +204,7 @@ class AnthropicLLMService(LLMService): json_accumulator = "" function_calls = [] - async for event in WatchdogAsyncIterator( - response, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for event in WatchdogAsyncIterator(response, manager=self.task_manager): # Aggregate streaming content, create frames, trigger events if event.type == "content_block_delta": diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 2309c9d8c..0edbb1f11 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -329,7 +329,7 @@ class CartesiaTTSService(AudioContextWordTTSService): async def _receive_messages(self): async for message in WatchdogAsyncIterator( - self._get_websocket(), reseter=self, watchdog_enabled=self.watchdog_timers_enabled + self._get_websocket(), manager=self.task_manager ): msg = json.loads(message) if not msg or not self.audio_context_available(msg["context_id"]): diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index a8c4ae0c9..3b1a3a20c 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -396,7 +396,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): async def _receive_messages(self): async for message in WatchdogAsyncIterator( - self._get_websocket(), reseter=self, watchdog_enabled=self.watchdog_timers_enabled + self._get_websocket(), manager=self.task_manager ): msg = json.loads(message) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 8424a8625..6c25a97ad 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -687,9 +687,7 @@ class GeminiMultimodalLiveLLMService(LLMService): # async def _receive_task_handler(self): - async for message in WatchdogAsyncIterator( - self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): evt = events.parse_server_event(message) # logger.debug(f"Received event: {message[:500]}") # logger.debug(f"Received event: {evt}") diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index b0ba81470..27b6ff1d9 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -504,9 +504,7 @@ class GladiaSTTService(STTService): async def _receive_task_handler(self): try: - async for message in WatchdogAsyncIterator( - self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): content = json.loads(message) # Handle audio chunk acknowledgments diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index a7d9b018d..75d0bd0ad 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -558,9 +558,7 @@ class GoogleLLMService(LLMService): ) function_calls = [] - async for chunk in WatchdogAsyncIterator( - response, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for chunk in WatchdogAsyncIterator(response, manager=self.task_manager): # Stop TTFB metrics after the first chunk await self.stop_ttfb_metrics() if chunk.usage_metadata: diff --git a/src/pipecat/services/google/llm_openai.py b/src/pipecat/services/google/llm_openai.py index af49fa6e0..8677179c9 100644 --- a/src/pipecat/services/google/llm_openai.py +++ b/src/pipecat/services/google/llm_openai.py @@ -54,9 +54,7 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService): context ) - async for chunk in WatchdogAsyncIterator( - chunk_stream, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager): if chunk.usage: tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, diff --git a/src/pipecat/services/google/stt.py b/src/pipecat/services/google/stt.py index 157810dd7..274aba2fa 100644 --- a/src/pipecat/services/google/stt.py +++ b/src/pipecat/services/google/stt.py @@ -785,7 +785,7 @@ class GoogleSTTService(STTService): """Process streaming recognition responses.""" try: async for response in WatchdogAsyncIterator( - streaming_recognize, reseter=self, watchdog_enabled=self.watchdog_timers_enabled + streaming_recognize, manager=self.task_manager ): # Check streaming limit if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT: diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index 6d9a47a03..079a29420 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -222,9 +222,7 @@ class NeuphonicTTSService(InterruptibleTTSService): self._websocket = None async def _receive_messages(self): - async for message in WatchdogAsyncIterator( - self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): if isinstance(message, str): msg = json.loads(message) if msg.get("data", {}).get("audio") is not None: diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index f3e6b0bb3..b307b3e21 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -245,9 +245,7 @@ class BaseOpenAILLMService(LLMService): context ) - async for chunk in WatchdogAsyncIterator( - chunk_stream, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager): if chunk.usage: tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index da70b6118..d6ff23111 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -370,9 +370,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): # async def _receive_task_handler(self): - async for message in WatchdogAsyncIterator( - self._websocket, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for message in WatchdogAsyncIterator(self._websocket, manager=self.task_manager): evt = events.parse_server_event(message) if evt.type == "session.created": await self._handle_evt_session_created(evt) diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index 22fde1f54..0d2330bef 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -199,9 +199,7 @@ class RivaSTTService(STTService): self._thread_task = self.create_task(self._thread_task_handler()) if not self._response_task: - self._response_queue = WatchdogQueue( - self, watchdog_enabled=self.watchdog_timers_enabled - ) + self._response_queue = WatchdogQueue(self.task_manager) self._response_task = self.create_task(self._response_task_handler()) async def stop(self, frame: EndFrame): diff --git a/src/pipecat/services/sambanova/llm.py b/src/pipecat/services/sambanova/llm.py index d70860a7e..19382819c 100644 --- a/src/pipecat/services/sambanova/llm.py +++ b/src/pipecat/services/sambanova/llm.py @@ -95,9 +95,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore context ) - async for chunk in WatchdogAsyncIterator( - chunk_stream, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for chunk in WatchdogAsyncIterator(chunk_stream, manager=self.task_manager): if chunk.usage: tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, diff --git a/src/pipecat/services/simli/video.py b/src/pipecat/services/simli/video.py index aef4d8022..5ddcbcba8 100644 --- a/src/pipecat/services/simli/video.py +++ b/src/pipecat/services/simli/video.py @@ -63,9 +63,7 @@ class SimliVideoService(FrameProcessor): async def _consume_and_process_audio(self): await self._pipecat_resampler_event.wait() audio_iterator = self._simli_client.getAudioStreamIterator() - async for audio_frame in WatchdogAsyncIterator( - audio_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for audio_frame in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager): resampled_frames = self._pipecat_resampler.resample(audio_frame) for resampled_frame in resampled_frames: audio_array = resampled_frame.to_ndarray() @@ -82,9 +80,7 @@ class SimliVideoService(FrameProcessor): async def _consume_and_process_video(self): await self._pipecat_resampler_event.wait() video_iterator = self._simli_client.getVideoStreamIterator(targetFormat="rgb24") - async for video_frame in WatchdogAsyncIterator( - video_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for video_frame in WatchdogAsyncIterator(video_iterator, manager=self.task_manager): # Process the video frame convertedFrame: OutputImageRawFrame = OutputImageRawFrame( image=video_frame.to_rgb().to_image().tobytes(), diff --git a/src/pipecat/services/tavus/video.py b/src/pipecat/services/tavus/video.py index c71a5159c..e97da71b9 100644 --- a/src/pipecat/services/tavus/video.py +++ b/src/pipecat/services/tavus/video.py @@ -188,7 +188,7 @@ class TavusVideoService(AIService): async def _create_send_task(self): if not self._send_task: - self._queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled) + self._queue = WatchdogQueue(self.task_manager) self._send_task = self.create_task(self._send_task_handler()) async def _cancel_send_task(self): diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 6fb0f4988..e50244986 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -505,7 +505,7 @@ class WordTTSService(TTSService): def _create_words_task(self): if not self._words_task: - self._words_queue = WatchdogQueue(self, watchdog_enabled=self.watchdog_timers_enabled) + self._words_queue = WatchdogQueue(self.task_manager) self._words_task = self.create_task(self._words_task_handler()) async def _stop_words_task(self): @@ -787,9 +787,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService): def _create_audio_context_task(self): if not self._audio_context_task: - self._contexts_queue = WatchdogQueue( - self, watchdog_enabled=self.watchdog_timers_enabled - ) + self._contexts_queue = WatchdogQueue(self.task_manager) self._contexts: Dict[str, asyncio.Queue] = {} self._audio_context_task = self.create_task(self._audio_context_task_handler()) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 95aba7320..36d0536d7 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -602,9 +602,7 @@ class BaseOutputTransport(FrameProcessor): def _create_clock_task(self): if not self._clock_task: - self._clock_queue = WatchdogPriorityQueue( - self._transport, watchdog_enabled=self._transport.watchdog_timers_enabled - ) + self._clock_queue = WatchdogPriorityQueue(self._transport.task_manager) self._clock_task = self._transport.create_task(self._clock_task_handler()) async def _cancel_clock_task(self): diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 790f5f99a..5ddaacff7 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -180,7 +180,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport): async def _receive_messages(self): try: async for message in WatchdogAsyncIterator( - self._client.receive(), reseter=self, watchdog_enabled=self.watchdog_timers_enabled + self._client.receive(), manager=self.task_manager ): if not self._params.serializer: continue diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index 89895b8ab..9eedd7d95 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -425,7 +425,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): try: audio_iterator = self._client.read_audio_frame() async for audio_frame in WatchdogAsyncIterator( - audio_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled + audio_iterator, manager=self.task_manager ): if audio_frame: await self.push_audio_frame(audio_frame) @@ -437,7 +437,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): try: video_iterator = self._client.read_video_frame() async for video_frame in WatchdogAsyncIterator( - video_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled + video_iterator, manager=self.task_manager ): if video_frame: await self.push_video_frame(video_frame) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 42c126eb4..4c00fa44c 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -41,7 +41,6 @@ from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter try: from daily import ( @@ -252,7 +251,7 @@ class DailyAudioTrack: track: CustomAudioTrack -class DailyTransportClient(WatchdogReseter, EventHandler): +class DailyTransportClient(EventHandler): """Core client for interacting with Daily's API. Manages the connection to Daily rooms and handles all low-level API interactions. @@ -395,10 +394,6 @@ class DailyTransportClient(WatchdogReseter, EventHandler): if not frame.transport_destination and self._camera: self._camera.write_frame(frame.image) - def reset_watchdog(self): - if self._task_manager: - self._task_manager.reset_watchdog(asyncio.current_task()) - async def setup(self, setup: FrameProcessorSetup): if self._task_manager: return @@ -406,7 +401,7 @@ class DailyTransportClient(WatchdogReseter, EventHandler): self._task_manager = setup.task_manager self._watchdog_timers_enabled = setup.watchdog_timers_enabled - self._event_queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled) + self._event_queue = WatchdogQueue(self._task_manager) self._event_task = self._task_manager.create_task( self._callback_task_handler(self._event_queue), f"{self}::event_callback_task", @@ -431,14 +426,14 @@ class DailyTransportClient(WatchdogReseter, EventHandler): self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate if self._params.audio_in_enabled and not self._audio_task and self._task_manager: - self._audio_queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled) + self._audio_queue = WatchdogQueue(self._task_manager) self._audio_task = self._task_manager.create_task( self._callback_task_handler(self._audio_queue), f"{self}::audio_callback_task", ) if self._params.video_in_enabled and not self._video_task and self._task_manager: - self._video_queue = WatchdogQueue(self, watchdog_enabled=self._watchdog_timers_enabled) + self._video_queue = WatchdogQueue(self._task_manager) self._video_task = self._task_manager.create_task( self._callback_task_handler(self._video_queue), f"{self}::video_callback_task", diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 9524bb9e3..53dd091ef 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -416,9 +416,7 @@ class LiveKitInputTransport(BaseInputTransport): async def _audio_in_task_handler(self): logger.info("Audio input task started") audio_iterator = self._client.get_next_audio_frame() - async for audio_data in WatchdogAsyncIterator( - audio_iterator, reseter=self, watchdog_enabled=self.watchdog_timers_enabled - ): + async for audio_data in WatchdogAsyncIterator(audio_iterator, manager=self.task_manager): if audio_data: audio_frame_event, participant_id = audio_data pipecat_audio_frame = await self._convert_livekit_audio_to_pipecat( diff --git a/src/pipecat/utils/asyncio/task_manager.py b/src/pipecat/utils/asyncio/task_manager.py index a6f2f1a7f..844536186 100644 --- a/src/pipecat/utils/asyncio/task_manager.py +++ b/src/pipecat/utils/asyncio/task_manager.py @@ -97,13 +97,19 @@ class BaseTaskManager(ABC): pass @abstractmethod - def reset_watchdog(self, task: asyncio.Task): - """Resets the given task watchdog timer. If not reset, a warning will be - logged indicating the task is stalling. + def task_reset_watchdog(self): + """Resets the running task watchdog timer. If not reset, a warning will + be logged indicating the task is stalling. """ pass + @property + @abstractmethod + def task_watchdog_enabled(self) -> bool: + """Whether the current running task has a watchdog timer enabled.""" + pass + @dataclass class TaskData: @@ -253,18 +259,31 @@ class TaskManager(BaseTaskManager): logger.critical(f"{name}: fatal base exception while cancelling task: {e}") raise + def reset_watchdog(self, task: asyncio.Task): + name = task.get_name() + if name in self._tasks and self._tasks[name].enable_watchdog_timers: + self._tasks[name].watchdog_timer.set() + def current_tasks(self) -> Sequence[asyncio.Task]: """Returns the list of currently created/registered tasks.""" return [data.task for data in self._tasks.values()] - def reset_watchdog(self, task: asyncio.Task): - """Resets the given task watchdog timer. If not reset on time, a warning + def task_reset_watchdog(self): + """Resets the running task watchdog timer. If not reset on time, a warning will be logged indicating the task is stalling. """ + task = asyncio.current_task() + if task: + self.reset_watchdog(task) + + @property + def task_watchdog_enabled(self) -> bool: + task = asyncio.current_task() + if not task: + return False name = task.get_name() - if name in self._tasks and self._tasks[name].enable_watchdog_timers: - self._tasks[name].watchdog_timer.set() + return name in self._tasks and self._tasks[name].enable_watchdog_timers def _add_task(self, task_data: TaskData): name = task_data.task.get_name() diff --git a/src/pipecat/utils/asyncio/watchdog_async_iterator.py b/src/pipecat/utils/asyncio/watchdog_async_iterator.py index e35d3d54c..e71b37ae3 100644 --- a/src/pipecat/utils/asyncio/watchdog_async_iterator.py +++ b/src/pipecat/utils/asyncio/watchdog_async_iterator.py @@ -7,7 +7,7 @@ import asyncio from typing import AsyncIterator, Optional -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter +from pipecat.utils.asyncio.task_manager import BaseTaskManager class WatchdogAsyncIterator: @@ -21,16 +21,14 @@ class WatchdogAsyncIterator: self, async_iterable, *, - reseter: WatchdogReseter, + manager: BaseTaskManager, timeout: float = 2.0, - watchdog_enabled: bool = False, ): self._async_iterable = async_iterable - self._reseter = reseter + self._manager = manager self._timeout = timeout self._iter: Optional[AsyncIterator] = None self._current_anext_task: Optional[asyncio.Task] = None - self._watchdog_enabled = watchdog_enabled def __aiter__(self): return self @@ -39,7 +37,7 @@ class WatchdogAsyncIterator: if not self._iter: self._iter = await self._ensure_async_iterator(self._async_iterable) - if self._watchdog_enabled: + if self._manager.task_watchdog_enabled: return await self._watchdog_anext() else: return await self._iter.__anext__() @@ -55,14 +53,14 @@ class WatchdogAsyncIterator: timeout=self._timeout, ) - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() # The task has finish, so we will create a new one for th next item. self._current_anext_task = None return item except asyncio.TimeoutError: - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() except StopAsyncIteration: self._current_anext_task = None raise diff --git a/src/pipecat/utils/asyncio/watchdog_event.py b/src/pipecat/utils/asyncio/watchdog_event.py index 823c0db82..65453f6ec 100644 --- a/src/pipecat/utils/asyncio/watchdog_event.py +++ b/src/pipecat/utils/asyncio/watchdog_event.py @@ -6,7 +6,7 @@ import asyncio -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter +from pipecat.utils.asyncio.task_manager import BaseTaskManager class WatchdogEvent(asyncio.Event): @@ -18,18 +18,16 @@ class WatchdogEvent(asyncio.Event): def __init__( self, - reseter: WatchdogReseter, + manager: BaseTaskManager, *, timeout: float = 2.0, - watchdog_enabled: bool = False, ) -> None: super().__init__() - self._reseter = reseter + self._manager = manager self._timeout = timeout - self._watchdog_enabled = watchdog_enabled async def wait(self): - if self._watchdog_enabled: + if self._manager.task_watchdog_enabled: return await self._watchdog_wait() else: return await super().wait() @@ -38,7 +36,7 @@ class WatchdogEvent(asyncio.Event): while True: try: await asyncio.wait_for(super().wait(), timeout=self._timeout) - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() return True except asyncio.TimeoutError: - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py index fb1071c58..31d358fc7 100644 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_priority_queue.py @@ -6,7 +6,7 @@ import asyncio -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter +from pipecat.utils.asyncio.task_manager import BaseTaskManager class WatchdogPriorityQueue(asyncio.PriorityQueue): @@ -18,33 +18,31 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): def __init__( self, - reseter: WatchdogReseter, + manager: BaseTaskManager, *, maxsize: int = 0, timeout: float = 2.0, - watchdog_enabled: bool = False, ) -> None: super().__init__(maxsize) - self._reseter = reseter + self._manager = manager self._timeout = timeout - self._watchdog_enabled = watchdog_enabled async def get(self): - if self._watchdog_enabled: + if self._manager.task_watchdog_enabled: return await self._watchdog_get() else: return await super().get() def task_done(self): - if self._watchdog_enabled: - self._reseter.reset_watchdog() + if self._manager.task_watchdog_enabled: + self._manager.task_reset_watchdog() super().task_done() async def _watchdog_get(self): while True: try: item = await asyncio.wait_for(super().get(), timeout=self._timeout) - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() return item except asyncio.TimeoutError: - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() diff --git a/src/pipecat/utils/asyncio/watchdog_queue.py b/src/pipecat/utils/asyncio/watchdog_queue.py index 5a9d86cb6..961324b7b 100644 --- a/src/pipecat/utils/asyncio/watchdog_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_queue.py @@ -6,7 +6,7 @@ import asyncio -from pipecat.utils.asyncio.watchdog_reseter import WatchdogReseter +from pipecat.utils.asyncio.task_manager import BaseTaskManager class WatchdogQueue(asyncio.Queue): @@ -18,33 +18,31 @@ class WatchdogQueue(asyncio.Queue): def __init__( self, - reseter: WatchdogReseter, + manager: BaseTaskManager, *, maxsize: int = 0, timeout: float = 2.0, - watchdog_enabled: bool = False, ) -> None: super().__init__(maxsize) - self._reseter = reseter + self._manager = manager self._timeout = timeout - self._watchdog_enabled = watchdog_enabled async def get(self): - if self._watchdog_enabled: + if self._manager.task_watchdog_enabled: return await self._watchdog_get() else: return await super().get() def task_done(self): - if self._watchdog_enabled: - self._reseter.reset_watchdog() + if self._manager.task_watchdog_enabled: + self._manager.task_reset_watchdog() super().task_done() async def _watchdog_get(self): while True: try: item = await asyncio.wait_for(super().get(), timeout=self._timeout) - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() return item except asyncio.TimeoutError: - self._reseter.reset_watchdog() + self._manager.task_reset_watchdog() diff --git a/src/pipecat/utils/asyncio/watchdog_reseter.py b/src/pipecat/utils/asyncio/watchdog_reseter.py deleted file mode 100644 index ee70207b3..000000000 --- a/src/pipecat/utils/asyncio/watchdog_reseter.py +++ /dev/null @@ -1,13 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -from abc import ABC, abstractmethod - - -class WatchdogReseter(ABC): - @abstractmethod - def reset_watchdog(self): - pass