From fccd48bfff23510e1d87f110ad7dfc844e0d0513 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Tue, 1 Jul 2025 17:05:18 -0300 Subject: [PATCH] Fixing pipeline freeze when using Python 3.10 --- CHANGELOG.md | 3 ++ src/pipecat/pipeline/parallel_pipeline.py | 2 ++ src/pipecat/processors/consumer_processor.py | 1 + src/pipecat/processors/frame_processor.py | 2 ++ src/pipecat/processors/frameworks/rtvi.py | 2 ++ .../processors/idle_frame_processor.py | 3 +- src/pipecat/processors/user_idle_processor.py | 7 +++- src/pipecat/services/tavus/video.py | 1 + src/pipecat/services/tts_service.py | 1 + src/pipecat/transports/base_output.py | 1 + src/pipecat/utils/asyncio/task_manager.py | 22 ++++++++++++ src/pipecat/utils/asyncio/watchdog_event.py | 5 +++ .../utils/asyncio/watchdog_priority_queue.py | 35 +++++++++++++++++-- src/pipecat/utils/asyncio/watchdog_queue.py | 34 ++++++++++++++++-- 14 files changed, 113 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 834faaa94..9a12496d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a race condition that occurs in Python 3.10+ where the task could miss + the `CancelledError` and continue running indefinitely, freezing the pipeline. + - Fixed a `AWSNovaSonicLLMService` issue introduced in 0.0.72. ## [0.0.73] - 2025-06-26 diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 57b3a82c3..250a3e989 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -258,9 +258,11 @@ class ParallelPipeline(BasePipeline): async def _cancel(self): """Cancel all parallel pipeline processing tasks.""" if self._up_task: + self._up_queue.cancel() await self.cancel_task(self._up_task) self._up_task = None if self._down_task: + self._down_queue.cancel() await self.cancel_task(self._down_task) self._down_task = None diff --git a/src/pipecat/processors/consumer_processor.py b/src/pipecat/processors/consumer_processor.py index 277cef2cd..7812bbbd3 100644 --- a/src/pipecat/processors/consumer_processor.py +++ b/src/pipecat/processors/consumer_processor.py @@ -77,6 +77,7 @@ class ConsumerProcessor(FrameProcessor): async def _cancel(self, _: CancelFrame): """Cancel the consumer task.""" if self._consumer_task: + self._queue.cancel() await self.cancel_task(self._consumer_task) async def _consumer_task_handler(self): diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 4105f4179..0d5a6db62 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -651,6 +651,7 @@ class FrameProcessor(BaseObject): async def __cancel_input_task(self): """Cancel the input processing task.""" if self.__input_frame_task: + self.__input_queue.cancel() await self.cancel_task(self.__input_frame_task) self.__input_frame_task = None @@ -686,6 +687,7 @@ class FrameProcessor(BaseObject): async def __cancel_push_task(self): """Cancel the frame pushing task.""" if self.__push_frame_task: + self.__push_queue.cancel() await self.cancel_task(self.__push_frame_task) self.__push_frame_task = None diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 22d5370f2..7c03561a9 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1086,10 +1086,12 @@ class RTVIProcessor(FrameProcessor): async def _cancel_tasks(self): """Cancel all running tasks.""" if self._action_task: + self._action_queue.cancel() await self.cancel_task(self._action_task) self._action_task = None if self._message_task: + self._message_queue.cancel() await self.cancel_task(self._message_task) self._message_task = None diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 672027491..b8839124a 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -11,6 +11,7 @@ from typing import Awaitable, Callable, List, Optional from pipecat.frames.frames import Frame, StartFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.asyncio.watchdog_event import WatchdogEvent class IdleFrameProcessor(FrameProcessor): @@ -77,7 +78,7 @@ class IdleFrameProcessor(FrameProcessor): def _create_idle_task(self): """Create and start the idle monitoring task.""" if not self._idle_task: - self._idle_event = asyncio.Event() + self._idle_event = WatchdogEvent(self.task_manager) self._idle_task = self.create_task(self._idle_task_handler()) async def _idle_task_handler(self): diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 1a442fd8a..e98320b8b 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -15,10 +15,12 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, Frame, + StartFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.asyncio.watchdog_event import WatchdogEvent class UserIdleProcessor(FrameProcessor): @@ -74,7 +76,7 @@ class UserIdleProcessor(FrameProcessor): self._interrupted = False self._conversation_started = False self._idle_task = None - self._idle_event = asyncio.Event() + self._idle_event = None def _wrap_callback( self, @@ -134,6 +136,9 @@ class UserIdleProcessor(FrameProcessor): """ await super().process_frame(frame, direction) + if isinstance(frame, StartFrame): + self._idle_event = WatchdogEvent(self.task_manager) + # Check for end frames before processing if isinstance(frame, (EndFrame, CancelFrame)): # Stop the idle task, if it exists diff --git a/src/pipecat/services/tavus/video.py b/src/pipecat/services/tavus/video.py index 9f40b709d..d633329bb 100644 --- a/src/pipecat/services/tavus/video.py +++ b/src/pipecat/services/tavus/video.py @@ -244,6 +244,7 @@ class TavusVideoService(AIService): async def _cancel_send_task(self): """Cancel the audio sending task if it exists.""" if self._send_task: + self._queue.cancel() await self.cancel_task(self._send_task) self._send_task = None diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 1d97045fe..43ab5634f 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -805,6 +805,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService): async def _stop_audio_context_task(self): if self._audio_context_task: + self._contexts_queue.cancel() await self.cancel_task(self._audio_context_task) self._audio_context_task = None diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index f90b4c553..93a11f5a3 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -810,6 +810,7 @@ class BaseOutputTransport(FrameProcessor): async def _cancel_clock_task(self): """Cancel and cleanup the clock processing task.""" if self._clock_task: + self._clock_queue.cancel() await self._transport.cancel_task(self._clock_task) self._clock_task = None diff --git a/src/pipecat/utils/asyncio/task_manager.py b/src/pipecat/utils/asyncio/task_manager.py index aaa340399..3bc3453ac 100644 --- a/src/pipecat/utils/asyncio/task_manager.py +++ b/src/pipecat/utils/asyncio/task_manager.py @@ -295,6 +295,9 @@ class TaskManager(BaseTaskManager): raise except Exception as e: logger.exception(f"{name}: unexpected exception while stopping task: {e}") + except BaseException as e: + logger.critical(f"{name}: fatal base exception while stopping task: {e}") + raise async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None): """Cancels the given asyncio Task and awaits its completion with an optional timeout. @@ -394,6 +397,10 @@ class TaskManager(BaseTaskManager): while True: try: + if task_data.task.done(): + logger.debug(f"{name}: task is already done, cancelling watchdog task.") + break + start_time = time.time() await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout) total_time = time.time() - start_time @@ -417,7 +424,22 @@ class TaskManager(BaseTaskManager): task_data = self._tasks[name] if task_data.watchdog_task: task_data.watchdog_task.cancel() + # In Python 3.10, simply calling task.cancel() looks like is not enough. + # Without this, some tasks appear that are never canceled. + # Python 3.12 handles this more gracefully, but we keep this for compatibility + # and to avoid "Task exception was never retrieved" warnings. + self.get_event_loop().create_task( + self._cleanup_watchdog(name, task_data.watchdog_task) + ) task_data.watchdog_task = None del self._tasks[name] except KeyError as e: logger.trace(f"{name}: unable to remove task data (already removed?): {e}") + + async def _cleanup_watchdog(self, name: str, watchdog_task: asyncio.Task): + try: + await watchdog_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.warning(f"{name}: watchdog task raised exception: {e}") diff --git a/src/pipecat/utils/asyncio/watchdog_event.py b/src/pipecat/utils/asyncio/watchdog_event.py index b2b306618..73a49ad3e 100644 --- a/src/pipecat/utils/asyncio/watchdog_event.py +++ b/src/pipecat/utils/asyncio/watchdog_event.py @@ -60,3 +60,8 @@ class WatchdogEvent(asyncio.Event): return True except asyncio.TimeoutError: self._manager.task_reset_watchdog() + + def clear(self): + if self._manager.task_watchdog_enabled: + self._manager.task_reset_watchdog() + super().clear() diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py index 46c6adf3d..2bd630fba 100644 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_priority_queue.py @@ -12,10 +12,19 @@ timeouts during legitimate queue operations. """ import asyncio +from dataclasses import dataclass + +from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager +@dataclass +class WatchdogPriorityCancelSentinel: + def __lt__(self, other): + return True + + class WatchdogPriorityQueue(asyncio.PriorityQueue): """Watchdog-enabled asyncio PriorityQueue. @@ -49,9 +58,17 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): The next item from the priority queue. """ if self._manager.task_watchdog_enabled: - return await self._watchdog_get() + get_result = await self._watchdog_get() else: - return await super().get() + get_result = await super().get() + + if isinstance(get_result, WatchdogPriorityCancelSentinel): + logger.debug( + "Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling" + ) + raise asyncio.CancelledError("Cancelling watchdog queue get() call.") + else: + return get_result def task_done(self): """Mark a task as done and reset watchdog if enabled. @@ -62,6 +79,20 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): self._manager.task_reset_watchdog() super().task_done() + def cancel(self): + """Ensures reliable task cancellation by preventing a common race condition. + + The race condition occurs in Python 3.10+ when: + 1. A value is put in the queue just before task cancellation + 2. queue.get() completes before the cancellation signal is delivered + 3. The task misses the CancelledError and continues running indefinitely + + This method prevents the issue by injecting a special sentinel value that + forces the task to raise CancelledError when consumed, ensuring proper + task termination. + """ + super().put_nowait(WatchdogPriorityCancelSentinel()) + async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" while True: diff --git a/src/pipecat/utils/asyncio/watchdog_queue.py b/src/pipecat/utils/asyncio/watchdog_queue.py index 4a92497f4..e3e379f3d 100644 --- a/src/pipecat/utils/asyncio/watchdog_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_queue.py @@ -12,10 +12,18 @@ timeouts during legitimate queue operations. """ import asyncio +from dataclasses import dataclass + +from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager +@dataclass +class WatchdogQueueCancelSentinel: + pass + + class WatchdogQueue(asyncio.Queue): """Watchdog-enabled asyncio Queue. @@ -49,9 +57,17 @@ class WatchdogQueue(asyncio.Queue): The next item from the queue. """ if self._manager.task_watchdog_enabled: - return await self._watchdog_get() + get_result = await self._watchdog_get() else: - return await super().get() + get_result = await super().get() + + if isinstance(get_result, WatchdogQueueCancelSentinel): + logger.debug( + "Received WatchdogQueueCancelFrame, throwing CancelledError to force cancelling" + ) + raise asyncio.CancelledError("Cancelling watchdog queue get() call.") + else: + return get_result def task_done(self): """Mark a task as done and reset watchdog if enabled. @@ -62,6 +78,20 @@ class WatchdogQueue(asyncio.Queue): self._manager.task_reset_watchdog() super().task_done() + def cancel(self): + """Ensures reliable task cancellation by preventing a common race condition. + + The race condition occurs in Python 3.10+ when: + 1. A value is put in the queue just before task cancellation + 2. queue.get() completes before the cancellation signal is delivered + 3. The task misses the CancelledError and continues running indefinitely + + This method prevents the issue by injecting a special sentinel value that + forces the task to raise CancelledError when consumed, ensuring proper + task termination. + """ + super().put_nowait(WatchdogQueueCancelSentinel()) + async def _watchdog_get(self): """Get item from queue while periodically resetting watchdog timer.""" while True: