Merge pull request #2086 from pipecat-ai/aleix/watchdog-coroutine-helper

add watchdog coroutine helper
This commit is contained in:
Aleix Conchillo Flaqué
2025-06-27 11:10:10 -07:00
committed by GitHub
4 changed files with 78 additions and 6 deletions

View File

@@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added `watchdog_coroutine()`. This is a watchdog helper for couroutines. So,
if you have a coroutine that is waiting for a result and that takes a long
time, you will need to wrap it with `watchdog_coroutine()` so the watchdog
timers are reset regularly.
### Fixed
- Fixed a `AWSNovaSonicLLMService` issue introduced in 0.0.72.
## [0.0.73] - 2025-06-26
### Fixed

View File

@@ -62,6 +62,7 @@ from pipecat.services.aws_nova_sonic.context import (
)
from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.llm_service import LLMService
from pipecat.utils.asyncio.watchdog_coroutine import watchdog_coroutine
from pipecat.utils.time import time_now_iso8601
try:
@@ -776,9 +777,7 @@ class AWSNovaSonicLLMService(LLMService):
try:
while self._stream and not self._disconnecting:
output = await self._stream.await_output()
result = await asyncio.wait_for(output[1].receive(), timeout=1.0)
self.reset_watchdog()
result = await watchdog_coroutine(output[1].receive(), manager=self.task_manager)
if result.value and result.value.bytes_:
response_data = result.value.bytes_.decode("utf-8")
@@ -807,8 +806,6 @@ class AWSNovaSonicLLMService(LLMService):
elif "completionEnd" in event_json:
# Handle the LLM completion ending
await self._handle_completion_end_event(event_json)
except asyncio.TimeoutError:
self.reset_watchdog()
except Exception as e:
logger.error(f"{self} error processing responses: {e}")
if self._wants_connection:

View File

@@ -55,7 +55,8 @@ class WatchdogAsyncIterator:
self._manager.task_reset_watchdog()
# The task has finish, so we will create a new one for th next item.
# The task has finished, so we will create a new one for the
# next item.
self._current_anext_task = None
return item

View File

@@ -0,0 +1,61 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Optional
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class WatchdogCoroutine:
"""An asynchronous iterator that monitors activity and resets the current
task watchdog timer. This is necessary to avoid task watchdog timers to
expire while we are waiting to get an item from the iterator.
"""
def __init__(
self,
coroutine,
*,
manager: BaseTaskManager,
timeout: float = 2.0,
):
self._coroutine = coroutine
self._manager = manager
self._timeout = timeout
self._current_coro_task: Optional[asyncio.Task] = None
async def __call__(self):
if self._manager.task_watchdog_enabled:
return await self._watchdog_call()
else:
return await self._coroutine
async def _watchdog_call(self):
while True:
try:
if not self._current_coro_task:
self._current_coro_task = asyncio.create_task(self._coroutine)
result = await asyncio.wait_for(
asyncio.shield(self._current_coro_task),
timeout=self._timeout,
)
self._manager.task_reset_watchdog()
# The task has finished.
self._current_coro_task = None
return result
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()
async def watchdog_coroutine(coroutine, *, manager: BaseTaskManager, timeout: float = 2.0):
watchdog_coro = WatchdogCoroutine(coroutine, manager=manager, timeout=timeout)
return await watchdog_coro()