From 3064326834737e4384c2f14538bab9c5754db956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 27 Jun 2025 10:23:16 -0700 Subject: [PATCH 1/2] utils.asyncio: added watchdog_coroutine() --- CHANGELOG.md | 9 +++ .../utils/asyncio/watchdog_async_iterator.py | 3 +- .../utils/asyncio/watchdog_coroutine.py | 61 +++++++++++++++++++ 3 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 src/pipecat/utils/asyncio/watchdog_coroutine.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 019cafd58..2d1d5a72e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Added `watchdog_coroutine()`. This is a watchdog helper for couroutines. So, + if you have a coroutine that is waiting for a result and that takes a long + time, you will need to wrap it with `watchdog_coroutine()` so the watchdog + timers are reset regularly. + ## [0.0.73] - 2025-06-26 ### Fixed diff --git a/src/pipecat/utils/asyncio/watchdog_async_iterator.py b/src/pipecat/utils/asyncio/watchdog_async_iterator.py index e71b37ae3..d9d3e2f79 100644 --- a/src/pipecat/utils/asyncio/watchdog_async_iterator.py +++ b/src/pipecat/utils/asyncio/watchdog_async_iterator.py @@ -55,7 +55,8 @@ class WatchdogAsyncIterator: self._manager.task_reset_watchdog() - # The task has finish, so we will create a new one for th next item. + # The task has finished, so we will create a new one for the + # next item. self._current_anext_task = None return item diff --git a/src/pipecat/utils/asyncio/watchdog_coroutine.py b/src/pipecat/utils/asyncio/watchdog_coroutine.py new file mode 100644 index 000000000..84855c3e6 --- /dev/null +++ b/src/pipecat/utils/asyncio/watchdog_coroutine.py @@ -0,0 +1,61 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +from typing import Optional + +from pipecat.utils.asyncio.task_manager import BaseTaskManager + + +class WatchdogCoroutine: + """An asynchronous iterator that monitors activity and resets the current + task watchdog timer. This is necessary to avoid task watchdog timers to + expire while we are waiting to get an item from the iterator. + + """ + + def __init__( + self, + coroutine, + *, + manager: BaseTaskManager, + timeout: float = 2.0, + ): + self._coroutine = coroutine + self._manager = manager + self._timeout = timeout + self._current_coro_task: Optional[asyncio.Task] = None + + async def __call__(self): + if self._manager.task_watchdog_enabled: + return await self._watchdog_call() + else: + return await self._coroutine + + async def _watchdog_call(self): + while True: + try: + if not self._current_coro_task: + self._current_coro_task = asyncio.create_task(self._coroutine) + + result = await asyncio.wait_for( + asyncio.shield(self._current_coro_task), + timeout=self._timeout, + ) + + self._manager.task_reset_watchdog() + + # The task has finished. + self._current_coro_task = None + + return result + except asyncio.TimeoutError: + self._manager.task_reset_watchdog() + + +async def watchdog_coroutine(coroutine, *, manager: BaseTaskManager, timeout: float = 2.0): + watchdog_coro = WatchdogCoroutine(coroutine, manager=manager, timeout=timeout) + return await watchdog_coro() From b0c773189f59aa4c130301757751bd73fe0e465c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 27 Jun 2025 10:24:06 -0700 Subject: [PATCH 2/2] AWSNovaSonicLLMService: fix error with watchdog_coroutine() --- CHANGELOG.md | 4 ++++ src/pipecat/services/aws_nova_sonic/aws.py | 7 ++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d1d5a72e..4d6236f7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 time, you will need to wrap it with `watchdog_coroutine()` so the watchdog timers are reset regularly. +### Fixed + +- Fixed a `AWSNovaSonicLLMService` issue introduced in 0.0.72. + ## [0.0.73] - 2025-06-26 ### Fixed diff --git a/src/pipecat/services/aws_nova_sonic/aws.py b/src/pipecat/services/aws_nova_sonic/aws.py index c6ee1d6c7..c28d218c8 100644 --- a/src/pipecat/services/aws_nova_sonic/aws.py +++ b/src/pipecat/services/aws_nova_sonic/aws.py @@ -62,6 +62,7 @@ from pipecat.services.aws_nova_sonic.context import ( ) from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame from pipecat.services.llm_service import LLMService +from pipecat.utils.asyncio.watchdog_coroutine import watchdog_coroutine from pipecat.utils.time import time_now_iso8601 try: @@ -776,9 +777,7 @@ class AWSNovaSonicLLMService(LLMService): try: while self._stream and not self._disconnecting: output = await self._stream.await_output() - result = await asyncio.wait_for(output[1].receive(), timeout=1.0) - - self.reset_watchdog() + result = await watchdog_coroutine(output[1].receive(), manager=self.task_manager) if result.value and result.value.bytes_: response_data = result.value.bytes_.decode("utf-8") @@ -807,8 +806,6 @@ class AWSNovaSonicLLMService(LLMService): elif "completionEnd" in event_json: # Handle the LLM completion ending await self._handle_completion_end_event(event_json) - except asyncio.TimeoutError: - self.reset_watchdog() except Exception as e: logger.error(f"{self} error processing responses: {e}") if self._wants_connection: