diff --git a/CHANGELOG.md b/CHANGELOG.md index c6a93dff9..b80f47702 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `WatchdogPriorityQueue` now requires the items to be inserted to always be + tuples and the size of the tuple needs to be specified in the constructor when + creating the queue with the `tuple_size` argument. + - Updated Moondream to revision `2025-01-09`. - Updated `PlayHTHttpTTSService` to no longer use the `pyht` client to remove @@ -60,6 +64,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `WatchdogPriorityQueue` issue that could cause an exception when + compating watchdog cancel sentinel items with other items in the queue. + - Fixed an issue that would cause system frames to not be processed with higher priority than other frames. This could cause slower interruption times. diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 98937216b..fbde70840 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -97,13 +97,11 @@ class FrameProcessorQueue(WatchdogPriorityQueue): manager (BaseTaskManager): The task manager used by the internal watchdog queues. """ - super().__init__(manager) + super().__init__(manager, tuple_size=3) self.__high_counter = 0 self.__low_counter = 0 - async def put( - self, item: WatchdogPriorityCancelSentinel | Tuple[Frame, FrameDirection, FrameCallback] - ): + async def put(self, item: Tuple[Frame, FrameDirection, FrameCallback]): """Put an item into the priority queue. System frames (`SystemFrame`) have higher priority than any other @@ -114,10 +112,6 @@ class FrameProcessorQueue(WatchdogPriorityQueue): item (Any): The item to enqueue. """ - if isinstance(item, WatchdogPriorityCancelSentinel): - await super().put(item) - return - frame, _, _ = item if isinstance(frame, SystemFrame): self.__high_counter += 1 diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 9c2da1d22..afb5abb0b 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -12,7 +12,6 @@ output processing, including frame buffering, mixing, timing, and media streamin import asyncio import itertools -import sys import time from concurrent.futures import ThreadPoolExecutor from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional @@ -429,7 +428,7 @@ class BaseOutputTransport(FrameProcessor): frame: The end frame signaling sender shutdown. """ # Let the sink tasks process the queue until they reach this EndFrame. - await self._clock_queue.put((sys.maxsize, frame.id, frame)) + await self._clock_queue.put((float("inf"), frame.id, frame)) await self._audio_queue.put(frame) # At this point we have enqueued an EndFrame and we need to wait for @@ -828,7 +827,9 @@ class BaseOutputTransport(FrameProcessor): def _create_clock_task(self): """Create the clock/timing processing task.""" if not self._clock_task: - self._clock_queue = WatchdogPriorityQueue(self._transport.task_manager) + self._clock_queue = WatchdogPriorityQueue( + self._transport.task_manager, tuple_size=3 + ) self._clock_task = self._transport.create_task(self._clock_task_handler()) async def _cancel_clock_task(self): diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py index f31b3b90a..c9b8fe171 100644 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_priority_queue.py @@ -25,28 +25,37 @@ class WatchdogPriorityCancelSentinel: An instance of this class is typically inserted into a `WatchdogPriorityQueue` to act as a high-priority marker asyncio task - cancellation. The `__lt__` method always returns `True`, ensuring that the - sentinel is considered "less than" any other item in the queue, and - therefore processed before anything else. + cancellation. """ - def __lt__(self, other): - return True + pass class WatchdogPriorityQueue(asyncio.PriorityQueue): - """Watchdog-enabled asyncio PriorityQueue. + """Class for watchdog-enabled asyncio PriorityQueue. An asynchronous priority queue that resets the current task watchdog timer. This is necessary to avoid task watchdog timers to expire while we are waiting to get an item from the queue. + + This queue expects items to be tuples, with the actual payload stored + in the last element. All preceding elements are treated as numeric + priority fields. For example: + + (0, 1, "foo") + + The tuple length must be specified at creation time so the queue can + correctly construct special items, such as the watchdog cancel sentinel, + with the proper tuple structure. + """ def __init__( self, manager: BaseTaskManager, *, + tuple_size: int, maxsize: int = 0, timeout: float = 2.0, ) -> None: @@ -54,12 +63,14 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): Args: manager: The task manager for watchdog timer control. + tuple_size: The number of values in each inserted tuple. maxsize: Maximum queue size. 0 means unlimited. timeout: Timeout in seconds between watchdog resets while waiting. """ super().__init__(maxsize) self._manager = manager self._timeout = timeout + self._tuple_size = tuple_size async def get(self): """Get an item from the queue with watchdog monitoring. @@ -72,7 +83,10 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): else: get_result = await super().get() - if isinstance(get_result, WatchdogPriorityCancelSentinel): + # Value is always at the end of the tuple. + item = get_result[-1] + + if isinstance(item, WatchdogPriorityCancelSentinel): logger.trace( "Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling" ) @@ -101,7 +115,10 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): forces the task to raise CancelledError when consumed, ensuring proper task termination. """ - super().put_nowait(WatchdogPriorityCancelSentinel()) + item = [float("-inf")] * self._tuple_size + # Values go always at the end. + item[-1] = WatchdogPriorityCancelSentinel() + super().put_nowait(tuple(item)) async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" diff --git a/tests/test_watchdog_queue.py b/tests/test_watchdog_queue.py index b25ff2aca..507f9cf02 100644 --- a/tests/test_watchdog_queue.py +++ b/tests/test_watchdog_queue.py @@ -41,7 +41,7 @@ class TestWatchdogQueue(unittest.IsolatedAsyncioTestCase): class TestWatchdogPriorityQueue(unittest.IsolatedAsyncioTestCase): async def test_simple_item(self): - queue = WatchdogPriorityQueue(TaskManager()) + queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2) await queue.put((3, 1)) await queue.put((2, 1)) await queue.put((1, 1)) @@ -53,7 +53,7 @@ class TestWatchdogPriorityQueue(unittest.IsolatedAsyncioTestCase): queue.task_done() async def test_watchdog_sentinel(self): - queue = WatchdogPriorityQueue(TaskManager()) + queue = WatchdogPriorityQueue(TaskManager(), tuple_size=2) await queue.put((0, 1)) # The get should throw an exception because the watchdog sentinel has # higher priority.