diff --git a/CHANGELOG.md b/CHANGELOG.md index 019cafd58..4d6236f7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Added `watchdog_coroutine()`. This is a watchdog helper for couroutines. So, + if you have a coroutine that is waiting for a result and that takes a long + time, you will need to wrap it with `watchdog_coroutine()` so the watchdog + timers are reset regularly. + +### Fixed + +- Fixed a `AWSNovaSonicLLMService` issue introduced in 0.0.72. + ## [0.0.73] - 2025-06-26 ### Fixed diff --git a/src/pipecat/services/aws_nova_sonic/aws.py b/src/pipecat/services/aws_nova_sonic/aws.py index c6ee1d6c7..c28d218c8 100644 --- a/src/pipecat/services/aws_nova_sonic/aws.py +++ b/src/pipecat/services/aws_nova_sonic/aws.py @@ -62,6 +62,7 @@ from pipecat.services.aws_nova_sonic.context import ( ) from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame from pipecat.services.llm_service import LLMService +from pipecat.utils.asyncio.watchdog_coroutine import watchdog_coroutine from pipecat.utils.time import time_now_iso8601 try: @@ -776,9 +777,7 @@ class AWSNovaSonicLLMService(LLMService): try: while self._stream and not self._disconnecting: output = await self._stream.await_output() - result = await asyncio.wait_for(output[1].receive(), timeout=1.0) - - self.reset_watchdog() + result = await watchdog_coroutine(output[1].receive(), manager=self.task_manager) if result.value and result.value.bytes_: response_data = result.value.bytes_.decode("utf-8") @@ -807,8 +806,6 @@ class AWSNovaSonicLLMService(LLMService): elif "completionEnd" in event_json: # Handle the LLM completion ending await self._handle_completion_end_event(event_json) - except asyncio.TimeoutError: - self.reset_watchdog() except Exception as e: logger.error(f"{self} error processing responses: {e}") if self._wants_connection: diff --git a/src/pipecat/utils/asyncio/watchdog_async_iterator.py b/src/pipecat/utils/asyncio/watchdog_async_iterator.py index e71b37ae3..d9d3e2f79 100644 --- a/src/pipecat/utils/asyncio/watchdog_async_iterator.py +++ b/src/pipecat/utils/asyncio/watchdog_async_iterator.py @@ -55,7 +55,8 @@ class WatchdogAsyncIterator: self._manager.task_reset_watchdog() - # The task has finish, so we will create a new one for th next item. + # The task has finished, so we will create a new one for the + # next item. self._current_anext_task = None return item diff --git a/src/pipecat/utils/asyncio/watchdog_coroutine.py b/src/pipecat/utils/asyncio/watchdog_coroutine.py new file mode 100644 index 000000000..84855c3e6 --- /dev/null +++ b/src/pipecat/utils/asyncio/watchdog_coroutine.py @@ -0,0 +1,61 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +from typing import Optional + +from pipecat.utils.asyncio.task_manager import BaseTaskManager + + +class WatchdogCoroutine: + """An asynchronous iterator that monitors activity and resets the current + task watchdog timer. This is necessary to avoid task watchdog timers to + expire while we are waiting to get an item from the iterator. + + """ + + def __init__( + self, + coroutine, + *, + manager: BaseTaskManager, + timeout: float = 2.0, + ): + self._coroutine = coroutine + self._manager = manager + self._timeout = timeout + self._current_coro_task: Optional[asyncio.Task] = None + + async def __call__(self): + if self._manager.task_watchdog_enabled: + return await self._watchdog_call() + else: + return await self._coroutine + + async def _watchdog_call(self): + while True: + try: + if not self._current_coro_task: + self._current_coro_task = asyncio.create_task(self._coroutine) + + result = await asyncio.wait_for( + asyncio.shield(self._current_coro_task), + timeout=self._timeout, + ) + + self._manager.task_reset_watchdog() + + # The task has finished. + self._current_coro_task = None + + return result + except asyncio.TimeoutError: + self._manager.task_reset_watchdog() + + +async def watchdog_coroutine(coroutine, *, manager: BaseTaskManager, timeout: float = 2.0): + watchdog_coro = WatchdogCoroutine(coroutine, manager=manager, timeout=timeout) + return await watchdog_coro()