From 5310d903eca69062932d8ff23954ffc16fbb2799 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Tue, 1 Jul 2025 17:04:27 -0300 Subject: [PATCH 1/4] Adding the requirements and needed variables for the freeze-test example. --- examples/freeze-test/client/src/app.ts | 12 ++++++- examples/freeze-test/env.example | 4 +++ examples/freeze-test/freeze_test_bot.py | 45 ++++++++++++++++++++++--- examples/freeze-test/requirements.txt | 4 +++ 4 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 examples/freeze-test/env.example create mode 100644 examples/freeze-test/requirements.txt diff --git a/examples/freeze-test/client/src/app.ts b/examples/freeze-test/client/src/app.ts index 2e8315539..7b565ea71 100644 --- a/examples/freeze-test/client/src/app.ts +++ b/examples/freeze-test/client/src/app.ts @@ -191,7 +191,17 @@ class WebsocketClientApp { const startTime = Date.now(); this.recordingSerializer = new RecordingSerializer() - const transport = this.ENABLE_RECORDING_MODE ? new WebSocketTransport({serializer: this.recordingSerializer}) : new WebSocketTransport(); + const transport = this.ENABLE_RECORDING_MODE ? + new WebSocketTransport({ + serializer: this.recordingSerializer, + recorderSampleRate: 8000, + playerSampleRate:8000 + }) : + new WebSocketTransport({ + serializer: new ProtobufFrameSerializer(), + recorderSampleRate: 8000, + playerSampleRate:8000 + }); this.websocketTransport = transport const RTVIConfig: RTVIClientOptions = { diff --git a/examples/freeze-test/env.example b/examples/freeze-test/env.example new file mode 100644 index 000000000..7a94b2eee --- /dev/null +++ b/examples/freeze-test/env.example @@ -0,0 +1,4 @@ +SENTRY_DSN= +DEEPGRAM_API_KEY= +CARTESIA_API_KEY= +OPENAI_API_KEY= \ No newline at end of file diff --git a/examples/freeze-test/freeze_test_bot.py b/examples/freeze-test/freeze_test_bot.py index 52ef5fc89..8ce3df17a 100644 --- a/examples/freeze-test/freeze_test_bot.py +++ b/examples/freeze-test/freeze_test_bot.py @@ -18,7 +18,6 @@ from fastapi import FastAPI, Request, WebSocket from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse from loguru import logger -from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( @@ -27,11 +26,13 @@ from pipecat.frames.frames import ( Frame, InterimTranscriptionFrame, LLMFullResponseEndFrame, + LLMMessagesFrame, StartFrame, StartInterruptionFrame, StopFrame, StopInterruptionFrame, TranscriptionFrame, + TTSSpeakFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -47,6 +48,7 @@ from pipecat.processors.aggregators.openai_llm_context import ( from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIProcessor from pipecat.processors.metrics.sentry import SentryMetrics +from pipecat.processors.user_idle_processor import UserIdleProcessor from pipecat.serializers.protobuf import ProtobufFrameSerializer from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService @@ -78,9 +80,6 @@ app.add_middleware( allow_headers=["*"], ) -# Mount the frontend at / -app.mount("/client", SmallWebRTCPrebuiltUI) - class SimulateFreezeInput(FrameProcessor): def __init__( @@ -188,6 +187,37 @@ async def run_example(websocket_client): stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool: + if retry_count == 1: + # First attempt: Add a gentle prompt to the conversation + messages.append( + { + "role": "system", + "content": "The user has been quiet. Politely and briefly ask if they're still there.", + } + ) + await user_idle.push_frame(LLMMessagesFrame(messages)) + return True + elif retry_count == 2: + # Second attempt: More direct prompt + messages.append( + { + "role": "system", + "content": "The user is still inactive. Ask if they'd like to continue our conversation.", + } + ) + await user_idle.push_frame(LLMMessagesFrame(messages)) + return True + else: + # Third attempt: End the conversation + await user_idle.push_frame( + TTSSpeakFrame("It seems like you're busy right now. Have a nice day!") + ) + await task.queue_frame(EndFrame()) + return False + + user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=10.0) + tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady @@ -222,6 +252,7 @@ async def run_example(websocket_client): stt, ], ), + user_idle, rtvi, context_aggregator.user(), # User responses llm, # LLM @@ -238,6 +269,8 @@ async def run_example(websocket_client): enable_metrics=True, enable_usage_metrics=True, report_only_initial_ttfb=True, + audio_in_sample_rate=8000, + audio_out_sample_rate=8000, ), idle_timeout_secs=120, observers=[ @@ -249,6 +282,10 @@ async def run_example(websocket_client): # LLMTextFrame: None, OpenAILLMContextFrame: None, LLMFullResponseEndFrame: None, + UserStartedSpeakingFrame: None, + UserStoppedSpeakingFrame: None, + StartInterruptionFrame: None, + StopInterruptionFrame: None, }, exclude_fields={ "result", diff --git a/examples/freeze-test/requirements.txt b/examples/freeze-test/requirements.txt new file mode 100644 index 000000000..8e3d7f12f --- /dev/null +++ b/examples/freeze-test/requirements.txt @@ -0,0 +1,4 @@ +python-dotenv +fastapi[all] +uvicorn +pipecat-ai[silero,websocket,openai, deepgram, cartesia, sentry] From fccd48bfff23510e1d87f110ad7dfc844e0d0513 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Tue, 1 Jul 2025 17:05:18 -0300 Subject: [PATCH 2/4] Fixing pipeline freeze when using Python 3.10 --- CHANGELOG.md | 3 ++ src/pipecat/pipeline/parallel_pipeline.py | 2 ++ src/pipecat/processors/consumer_processor.py | 1 + src/pipecat/processors/frame_processor.py | 2 ++ src/pipecat/processors/frameworks/rtvi.py | 2 ++ .../processors/idle_frame_processor.py | 3 +- src/pipecat/processors/user_idle_processor.py | 7 +++- src/pipecat/services/tavus/video.py | 1 + src/pipecat/services/tts_service.py | 1 + src/pipecat/transports/base_output.py | 1 + src/pipecat/utils/asyncio/task_manager.py | 22 ++++++++++++ src/pipecat/utils/asyncio/watchdog_event.py | 5 +++ .../utils/asyncio/watchdog_priority_queue.py | 35 +++++++++++++++++-- src/pipecat/utils/asyncio/watchdog_queue.py | 34 ++++++++++++++++-- 14 files changed, 113 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 834faaa94..9a12496d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a race condition that occurs in Python 3.10+ where the task could miss + the `CancelledError` and continue running indefinitely, freezing the pipeline. + - Fixed a `AWSNovaSonicLLMService` issue introduced in 0.0.72. ## [0.0.73] - 2025-06-26 diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 57b3a82c3..250a3e989 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -258,9 +258,11 @@ class ParallelPipeline(BasePipeline): async def _cancel(self): """Cancel all parallel pipeline processing tasks.""" if self._up_task: + self._up_queue.cancel() await self.cancel_task(self._up_task) self._up_task = None if self._down_task: + self._down_queue.cancel() await self.cancel_task(self._down_task) self._down_task = None diff --git a/src/pipecat/processors/consumer_processor.py b/src/pipecat/processors/consumer_processor.py index 277cef2cd..7812bbbd3 100644 --- a/src/pipecat/processors/consumer_processor.py +++ b/src/pipecat/processors/consumer_processor.py @@ -77,6 +77,7 @@ class ConsumerProcessor(FrameProcessor): async def _cancel(self, _: CancelFrame): """Cancel the consumer task.""" if self._consumer_task: + self._queue.cancel() await self.cancel_task(self._consumer_task) async def _consumer_task_handler(self): diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 4105f4179..0d5a6db62 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -651,6 +651,7 @@ class FrameProcessor(BaseObject): async def __cancel_input_task(self): """Cancel the input processing task.""" if self.__input_frame_task: + self.__input_queue.cancel() await self.cancel_task(self.__input_frame_task) self.__input_frame_task = None @@ -686,6 +687,7 @@ class FrameProcessor(BaseObject): async def __cancel_push_task(self): """Cancel the frame pushing task.""" if self.__push_frame_task: + self.__push_queue.cancel() await self.cancel_task(self.__push_frame_task) self.__push_frame_task = None diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 22d5370f2..7c03561a9 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1086,10 +1086,12 @@ class RTVIProcessor(FrameProcessor): async def _cancel_tasks(self): """Cancel all running tasks.""" if self._action_task: + self._action_queue.cancel() await self.cancel_task(self._action_task) self._action_task = None if self._message_task: + self._message_queue.cancel() await self.cancel_task(self._message_task) self._message_task = None diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 672027491..b8839124a 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -11,6 +11,7 @@ from typing import Awaitable, Callable, List, Optional from pipecat.frames.frames import Frame, StartFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.asyncio.watchdog_event import WatchdogEvent class IdleFrameProcessor(FrameProcessor): @@ -77,7 +78,7 @@ class IdleFrameProcessor(FrameProcessor): def _create_idle_task(self): """Create and start the idle monitoring task.""" if not self._idle_task: - self._idle_event = asyncio.Event() + self._idle_event = WatchdogEvent(self.task_manager) self._idle_task = self.create_task(self._idle_task_handler()) async def _idle_task_handler(self): diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 1a442fd8a..e98320b8b 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -15,10 +15,12 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, Frame, + StartFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.asyncio.watchdog_event import WatchdogEvent class UserIdleProcessor(FrameProcessor): @@ -74,7 +76,7 @@ class UserIdleProcessor(FrameProcessor): self._interrupted = False self._conversation_started = False self._idle_task = None - self._idle_event = asyncio.Event() + self._idle_event = None def _wrap_callback( self, @@ -134,6 +136,9 @@ class UserIdleProcessor(FrameProcessor): """ await super().process_frame(frame, direction) + if isinstance(frame, StartFrame): + self._idle_event = WatchdogEvent(self.task_manager) + # Check for end frames before processing if isinstance(frame, (EndFrame, CancelFrame)): # Stop the idle task, if it exists diff --git a/src/pipecat/services/tavus/video.py b/src/pipecat/services/tavus/video.py index 9f40b709d..d633329bb 100644 --- a/src/pipecat/services/tavus/video.py +++ b/src/pipecat/services/tavus/video.py @@ -244,6 +244,7 @@ class TavusVideoService(AIService): async def _cancel_send_task(self): """Cancel the audio sending task if it exists.""" if self._send_task: + self._queue.cancel() await self.cancel_task(self._send_task) self._send_task = None diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 1d97045fe..43ab5634f 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -805,6 +805,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService): async def _stop_audio_context_task(self): if self._audio_context_task: + self._contexts_queue.cancel() await self.cancel_task(self._audio_context_task) self._audio_context_task = None diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index f90b4c553..93a11f5a3 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -810,6 +810,7 @@ class BaseOutputTransport(FrameProcessor): async def _cancel_clock_task(self): """Cancel and cleanup the clock processing task.""" if self._clock_task: + self._clock_queue.cancel() await self._transport.cancel_task(self._clock_task) self._clock_task = None diff --git a/src/pipecat/utils/asyncio/task_manager.py b/src/pipecat/utils/asyncio/task_manager.py index aaa340399..3bc3453ac 100644 --- a/src/pipecat/utils/asyncio/task_manager.py +++ b/src/pipecat/utils/asyncio/task_manager.py @@ -295,6 +295,9 @@ class TaskManager(BaseTaskManager): raise except Exception as e: logger.exception(f"{name}: unexpected exception while stopping task: {e}") + except BaseException as e: + logger.critical(f"{name}: fatal base exception while stopping task: {e}") + raise async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None): """Cancels the given asyncio Task and awaits its completion with an optional timeout. @@ -394,6 +397,10 @@ class TaskManager(BaseTaskManager): while True: try: + if task_data.task.done(): + logger.debug(f"{name}: task is already done, cancelling watchdog task.") + break + start_time = time.time() await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout) total_time = time.time() - start_time @@ -417,7 +424,22 @@ class TaskManager(BaseTaskManager): task_data = self._tasks[name] if task_data.watchdog_task: task_data.watchdog_task.cancel() + # In Python 3.10, simply calling task.cancel() looks like is not enough. + # Without this, some tasks appear that are never canceled. + # Python 3.12 handles this more gracefully, but we keep this for compatibility + # and to avoid "Task exception was never retrieved" warnings. + self.get_event_loop().create_task( + self._cleanup_watchdog(name, task_data.watchdog_task) + ) task_data.watchdog_task = None del self._tasks[name] except KeyError as e: logger.trace(f"{name}: unable to remove task data (already removed?): {e}") + + async def _cleanup_watchdog(self, name: str, watchdog_task: asyncio.Task): + try: + await watchdog_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.warning(f"{name}: watchdog task raised exception: {e}") diff --git a/src/pipecat/utils/asyncio/watchdog_event.py b/src/pipecat/utils/asyncio/watchdog_event.py index b2b306618..73a49ad3e 100644 --- a/src/pipecat/utils/asyncio/watchdog_event.py +++ b/src/pipecat/utils/asyncio/watchdog_event.py @@ -60,3 +60,8 @@ class WatchdogEvent(asyncio.Event): return True except asyncio.TimeoutError: self._manager.task_reset_watchdog() + + def clear(self): + if self._manager.task_watchdog_enabled: + self._manager.task_reset_watchdog() + super().clear() diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py index 46c6adf3d..2bd630fba 100644 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_priority_queue.py @@ -12,10 +12,19 @@ timeouts during legitimate queue operations. """ import asyncio +from dataclasses import dataclass + +from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager +@dataclass +class WatchdogPriorityCancelSentinel: + def __lt__(self, other): + return True + + class WatchdogPriorityQueue(asyncio.PriorityQueue): """Watchdog-enabled asyncio PriorityQueue. @@ -49,9 +58,17 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): The next item from the priority queue. """ if self._manager.task_watchdog_enabled: - return await self._watchdog_get() + get_result = await self._watchdog_get() else: - return await super().get() + get_result = await super().get() + + if isinstance(get_result, WatchdogPriorityCancelSentinel): + logger.debug( + "Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling" + ) + raise asyncio.CancelledError("Cancelling watchdog queue get() call.") + else: + return get_result def task_done(self): """Mark a task as done and reset watchdog if enabled. @@ -62,6 +79,20 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): self._manager.task_reset_watchdog() super().task_done() + def cancel(self): + """Ensures reliable task cancellation by preventing a common race condition. + + The race condition occurs in Python 3.10+ when: + 1. A value is put in the queue just before task cancellation + 2. queue.get() completes before the cancellation signal is delivered + 3. The task misses the CancelledError and continues running indefinitely + + This method prevents the issue by injecting a special sentinel value that + forces the task to raise CancelledError when consumed, ensuring proper + task termination. + """ + super().put_nowait(WatchdogPriorityCancelSentinel()) + async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" while True: diff --git a/src/pipecat/utils/asyncio/watchdog_queue.py b/src/pipecat/utils/asyncio/watchdog_queue.py index 4a92497f4..e3e379f3d 100644 --- a/src/pipecat/utils/asyncio/watchdog_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_queue.py @@ -12,10 +12,18 @@ timeouts during legitimate queue operations. """ import asyncio +from dataclasses import dataclass + +from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager +@dataclass +class WatchdogQueueCancelSentinel: + pass + + class WatchdogQueue(asyncio.Queue): """Watchdog-enabled asyncio Queue. @@ -49,9 +57,17 @@ class WatchdogQueue(asyncio.Queue): The next item from the queue. """ if self._manager.task_watchdog_enabled: - return await self._watchdog_get() + get_result = await self._watchdog_get() else: - return await super().get() + get_result = await super().get() + + if isinstance(get_result, WatchdogQueueCancelSentinel): + logger.debug( + "Received WatchdogQueueCancelFrame, throwing CancelledError to force cancelling" + ) + raise asyncio.CancelledError("Cancelling watchdog queue get() call.") + else: + return get_result def task_done(self): """Mark a task as done and reset watchdog if enabled. @@ -62,6 +78,20 @@ class WatchdogQueue(asyncio.Queue): self._manager.task_reset_watchdog() super().task_done() + def cancel(self): + """Ensures reliable task cancellation by preventing a common race condition. + + The race condition occurs in Python 3.10+ when: + 1. A value is put in the queue just before task cancellation + 2. queue.get() completes before the cancellation signal is delivered + 3. The task misses the CancelledError and continues running indefinitely + + This method prevents the issue by injecting a special sentinel value that + forces the task to raise CancelledError when consumed, ensuring proper + task termination. + """ + super().put_nowait(WatchdogQueueCancelSentinel()) + async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" while True: From 721f662bbefb0b3b54ac02fb12866f2971658526 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Tue, 1 Jul 2025 17:09:05 -0300 Subject: [PATCH 3/4] Making cancel sentinel classes private --- src/pipecat/utils/asyncio/watchdog_priority_queue.py | 6 +++--- src/pipecat/utils/asyncio/watchdog_queue.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py index 2bd630fba..98d7f9172 100644 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_priority_queue.py @@ -20,7 +20,7 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager @dataclass -class WatchdogPriorityCancelSentinel: +class _WatchdogPriorityCancelSentinel: def __lt__(self, other): return True @@ -62,7 +62,7 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): else: get_result = await super().get() - if isinstance(get_result, WatchdogPriorityCancelSentinel): + if isinstance(get_result, _WatchdogPriorityCancelSentinel): logger.debug( "Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling" ) @@ -91,7 +91,7 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): forces the task to raise CancelledError when consumed, ensuring proper task termination. """ - super().put_nowait(WatchdogPriorityCancelSentinel()) + super().put_nowait(_WatchdogPriorityCancelSentinel()) async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" diff --git a/src/pipecat/utils/asyncio/watchdog_queue.py b/src/pipecat/utils/asyncio/watchdog_queue.py index e3e379f3d..53b04c534 100644 --- a/src/pipecat/utils/asyncio/watchdog_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_queue.py @@ -20,7 +20,7 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager @dataclass -class WatchdogQueueCancelSentinel: +class _WatchdogQueueCancelSentinel: pass @@ -61,7 +61,7 @@ class WatchdogQueue(asyncio.Queue): else: get_result = await super().get() - if isinstance(get_result, WatchdogQueueCancelSentinel): + if isinstance(get_result, _WatchdogQueueCancelSentinel): logger.debug( "Received WatchdogQueueCancelFrame, throwing CancelledError to force cancelling" ) @@ -90,7 +90,7 @@ class WatchdogQueue(asyncio.Queue): forces the task to raise CancelledError when consumed, ensuring proper task termination. """ - super().put_nowait(WatchdogQueueCancelSentinel()) + super().put_nowait(_WatchdogQueueCancelSentinel()) async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" From b87c57c951d42205cc572e3b06d96007b11d96dd Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Tue, 1 Jul 2025 17:12:18 -0300 Subject: [PATCH 4/4] Adding missing docstring to the watchdog event --- src/pipecat/utils/asyncio/watchdog_event.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipecat/utils/asyncio/watchdog_event.py b/src/pipecat/utils/asyncio/watchdog_event.py index 73a49ad3e..c48356c50 100644 --- a/src/pipecat/utils/asyncio/watchdog_event.py +++ b/src/pipecat/utils/asyncio/watchdog_event.py @@ -62,6 +62,7 @@ class WatchdogEvent(asyncio.Event): self._manager.task_reset_watchdog() def clear(self): + """Clear the event while resetting watchdog timer.""" if self._manager.task_watchdog_enabled: self._manager.task_reset_watchdog() super().clear()