WatchdogPriorityQueue: fix watchdog sentinel insertion

We now force each inserted item in the priority queue to be a tuple and the
actual value to be last in the tuple. All the previous values in the tuple also
need to be numeric.
This commit is contained in:
Aleix Conchillo Flaqué
2025-08-12 15:28:03 -07:00
parent 54a4d8a9f8
commit 0508ddddfb
5 changed files with 40 additions and 21 deletions

View File

@@ -30,6 +30,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `WatchdogPriorityQueue` now requires the items to be inserted to always be
tuples and the size of the tuple needs to be specified in the constructor when
creating the queue with the `tuple_size` argument.
- Updated Moondream to revision `2025-01-09`.
- Updated `PlayHTHttpTTSService` to no longer use the `pyht` client to remove
@@ -60,6 +64,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a `WatchdogPriorityQueue` issue that could cause an exception when
compating watchdog cancel sentinel items with other items in the queue.
- Fixed an issue that would cause system frames to not be processed with higher
priority than other frames. This could cause slower interruption times.

View File

@@ -97,13 +97,11 @@ class FrameProcessorQueue(WatchdogPriorityQueue):
manager (BaseTaskManager): The task manager used by the internal watchdog queues.
"""
super().__init__(manager)
super().__init__(manager, tuple_size=3)
self.__high_counter = 0
self.__low_counter = 0
async def put(
self, item: WatchdogPriorityCancelSentinel | Tuple[Frame, FrameDirection, FrameCallback]
):
async def put(self, item: Tuple[Frame, FrameDirection, FrameCallback]):
"""Put an item into the priority queue.
System frames (`SystemFrame`) have higher priority than any other
@@ -114,10 +112,6 @@ class FrameProcessorQueue(WatchdogPriorityQueue):
item (Any): The item to enqueue.
"""
if isinstance(item, WatchdogPriorityCancelSentinel):
await super().put(item)
return
frame, _, _ = item
if isinstance(frame, SystemFrame):
self.__high_counter += 1

View File

@@ -12,7 +12,6 @@ output processing, including frame buffering, mixing, timing, and media streamin
import asyncio
import itertools
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional
@@ -429,7 +428,7 @@ class BaseOutputTransport(FrameProcessor):
frame: The end frame signaling sender shutdown.
"""
# Let the sink tasks process the queue until they reach this EndFrame.
await self._clock_queue.put((sys.maxsize, frame.id, frame))
await self._clock_queue.put((float("inf"), frame.id, frame))
await self._audio_queue.put(frame)
# At this point we have enqueued an EndFrame and we need to wait for
@@ -828,7 +827,9 @@ class BaseOutputTransport(FrameProcessor):
def _create_clock_task(self):
"""Create the clock/timing processing task."""
if not self._clock_task:
self._clock_queue = WatchdogPriorityQueue(self._transport.task_manager)
self._clock_queue = WatchdogPriorityQueue(
self._transport.task_manager, tuple_size=3
)
self._clock_task = self._transport.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):

View File

@@ -25,28 +25,37 @@ class WatchdogPriorityCancelSentinel:
An instance of this class is typically inserted into a
`WatchdogPriorityQueue` to act as a high-priority marker asyncio task
cancellation. The `__lt__` method always returns `True`, ensuring that the
sentinel is considered "less than" any other item in the queue, and
therefore processed before anything else.
cancellation.
"""
def __lt__(self, other):
return True
pass
class WatchdogPriorityQueue(asyncio.PriorityQueue):
"""Watchdog-enabled asyncio PriorityQueue.
"""Class for watchdog-enabled asyncio PriorityQueue.
An asynchronous priority queue that resets the current task watchdog
timer. This is necessary to avoid task watchdog timers to expire while we
are waiting to get an item from the queue.
This queue expects items to be tuples, with the actual payload stored
in the last element. All preceding elements are treated as numeric
priority fields. For example:
(0, 1, "foo")
The tuple length must be specified at creation time so the queue can
correctly construct special items, such as the watchdog cancel sentinel,
with the proper tuple structure.
"""
def __init__(
self,
manager: BaseTaskManager,
*,
tuple_size: int,
maxsize: int = 0,
timeout: float = 2.0,
) -> None:
@@ -54,12 +63,14 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
Args:
manager: The task manager for watchdog timer control.
tuple_size: The number of values in each inserted tuple.
maxsize: Maximum queue size. 0 means unlimited.
timeout: Timeout in seconds between watchdog resets while waiting.
"""
super().__init__(maxsize)
self._manager = manager
self._timeout = timeout
self._tuple_size = tuple_size
async def get(self):
"""Get an item from the queue with watchdog monitoring.
@@ -72,7 +83,10 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
else:
get_result = await super().get()
if isinstance(get_result, WatchdogPriorityCancelSentinel):
# Value is always at the end of the tuple.
item = get_result[-1]
if isinstance(item, WatchdogPriorityCancelSentinel):
logger.trace(
"Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling"
)
@@ -101,7 +115,10 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
forces the task to raise CancelledError when consumed, ensuring proper
task termination.
"""
super().put_nowait(WatchdogPriorityCancelSentinel())
item = [float("-inf")] * self._tuple_size
# Values go always at the end.
item[-1] = WatchdogPriorityCancelSentinel()
super().put_nowait(tuple(item))
async def _watchdog_get(self):
"""Get item from queue while periodically resetting watchdog timer."""

View File

@@ -41,7 +41,7 @@ class TestWatchdogQueue(unittest.IsolatedAsyncioTestCase):
class TestWatchdogPriorityQueue(unittest.IsolatedAsyncioTestCase):
async def test_simple_item(self):
queue = WatchdogPriorityQueue(TaskManager())
queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2)
await queue.put((3, 1))
await queue.put((2, 1))
await queue.put((1, 1))
@@ -53,7 +53,7 @@ class TestWatchdogPriorityQueue(unittest.IsolatedAsyncioTestCase):
queue.task_done()
async def test_watchdog_sentinel(self):
queue = WatchdogPriorityQueue(TaskManager())
queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2)
await queue.put((0, 1))
# The get should throw an exception because the watchdog sentinel has
# higher priority.