Merge pull request #2085 from pipecat-ai/filipi/freeze-test-python-3.10

Fixing pipeline freeze when using Python 3.10
This commit is contained in:
Filipi da Silva Fuchter
2025-07-01 17:17:38 -03:00
committed by GitHub
18 changed files with 174 additions and 11 deletions

View File

@@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a race condition that occurs in Python 3.10+ where the task could miss
the `CancelledError` and continue running indefinitely, freezing the pipeline.
- Fixed an `AWSNovaSonicLLMService` issue introduced in 0.0.72.
## [0.0.73] - 2025-06-26

View File

@@ -191,7 +191,17 @@ class WebsocketClientApp {
const startTime = Date.now();
this.recordingSerializer = new RecordingSerializer()
const transport = this.ENABLE_RECORDING_MODE ? new WebSocketTransport({serializer: this.recordingSerializer}) : new WebSocketTransport();
const transport = this.ENABLE_RECORDING_MODE ?
new WebSocketTransport({
serializer: this.recordingSerializer,
recorderSampleRate: 8000,
playerSampleRate:8000
}) :
new WebSocketTransport({
serializer: new ProtobufFrameSerializer(),
recorderSampleRate: 8000,
playerSampleRate:8000
});
this.websocketTransport = transport
const RTVIConfig: RTVIClientOptions = {

View File

@@ -0,0 +1,4 @@
SENTRY_DSN=
DEEPGRAM_API_KEY=
CARTESIA_API_KEY=
OPENAI_API_KEY=

View File

@@ -18,7 +18,6 @@ from fastapi import FastAPI, Request, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
@@ -27,11 +26,13 @@ from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopFrame,
StopInterruptionFrame,
TranscriptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -47,6 +48,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIProcessor
from pipecat.processors.metrics.sentry import SentryMetrics
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -78,9 +80,6 @@ app.add_middleware(
allow_headers=["*"],
)
# Mount the frontend at /
app.mount("/client", SmallWebRTCPrebuiltUI)
class SimulateFreezeInput(FrameProcessor):
def __init__(
@@ -188,6 +187,37 @@ async def run_example(websocket_client):
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
    """Escalate the reaction each time the idle callback is invoked.

    Args:
        user_idle: Processor through which the follow-up frames are pushed.
        retry_count: Attempt number handed to the callback.

    Returns:
        True after the first and second attempts; False once the goodbye
        has been pushed and an EndFrame has been queued on the task.
    """
    # Attempts 1 and 2 differ only in the system prompt appended for the LLM.
    nudges = {
        1: "The user has been quiet. Politely and briefly ask if they're still there.",
        2: "The user is still inactive. Ask if they'd like to continue our conversation.",
    }
    nudge = nudges.get(retry_count)
    if nudge is None:
        # Any other attempt: say goodbye and end the conversation.
        await user_idle.push_frame(
            TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
        )
        await task.queue_frame(EndFrame())
        return False

    messages.append({"role": "system", "content": nudge})
    await user_idle.push_frame(LLMMessagesFrame(messages))
    return True
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=10.0)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
@@ -222,6 +252,7 @@ async def run_example(websocket_client):
stt,
],
),
user_idle,
rtvi,
context_aggregator.user(), # User responses
llm, # LLM
@@ -238,6 +269,8 @@ async def run_example(websocket_client):
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
),
idle_timeout_secs=120,
observers=[
@@ -249,6 +282,10 @@ async def run_example(websocket_client):
# LLMTextFrame: None,
OpenAILLMContextFrame: None,
LLMFullResponseEndFrame: None,
UserStartedSpeakingFrame: None,
UserStoppedSpeakingFrame: None,
StartInterruptionFrame: None,
StopInterruptionFrame: None,
},
exclude_fields={
"result",

View File

@@ -0,0 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[silero,websocket,openai, deepgram, cartesia, sentry]

View File

@@ -258,9 +258,11 @@ class ParallelPipeline(BasePipeline):
async def _cancel(self):
"""Cancel all parallel pipeline processing tasks.

For each of ``_up_task`` and ``_down_task`` that is set, the matching
queue's ``cancel()`` is called first, the task is then cancelled through
``cancel_task`` and its reference is reset to ``None``.
"""
if self._up_task:
# Queue first, task second.
# NOTE(review): presumably the queue is a watchdog queue whose cancel()
# injects a cancel sentinel -- confirm against the queue's type.
self._up_queue.cancel()
await self.cancel_task(self._up_task)
self._up_task = None
if self._down_task:
# Same sequence for the second task.
self._down_queue.cancel()
await self.cancel_task(self._down_task)
self._down_task = None

View File

@@ -77,6 +77,7 @@ class ConsumerProcessor(FrameProcessor):
async def _cancel(self, _: CancelFrame):
    """Cancel the consumer task, if there is one.

    The queue's ``cancel()`` is called before the task itself is cancelled.
    The incoming frame is not used.
    """
    if not self._consumer_task:
        return
    self._queue.cancel()
    await self.cancel_task(self._consumer_task)
async def _consumer_task_handler(self):

View File

@@ -651,6 +651,7 @@ class FrameProcessor(BaseObject):
async def __cancel_input_task(self):
    """Cancel the input processing task, if there is one.

    The input queue's ``cancel()`` is called before the task is cancelled;
    the task reference is cleared afterwards.
    """
    if not self.__input_frame_task:
        return
    self.__input_queue.cancel()
    await self.cancel_task(self.__input_frame_task)
    self.__input_frame_task = None
@@ -686,6 +687,7 @@ class FrameProcessor(BaseObject):
async def __cancel_push_task(self):
    """Cancel the frame pushing task, if there is one.

    The push queue's ``cancel()`` is called before the task is cancelled;
    the task reference is cleared afterwards.
    """
    if not self.__push_frame_task:
        return
    self.__push_queue.cancel()
    await self.cancel_task(self.__push_frame_task)
    self.__push_frame_task = None

View File

@@ -1086,10 +1086,12 @@ class RTVIProcessor(FrameProcessor):
async def _cancel_tasks(self):
"""Cancel all running tasks.

For each of ``_action_task`` and ``_message_task`` that is set, the
matching queue's ``cancel()`` is called first, the task is then cancelled
through ``cancel_task`` and its reference is reset to ``None``.
"""
if self._action_task:
# Queue first, task second.
# NOTE(review): presumably the queue is a watchdog queue whose cancel()
# injects a cancel sentinel -- confirm against the queue's type.
self._action_queue.cancel()
await self.cancel_task(self._action_task)
self._action_task = None
if self._message_task:
# Same sequence for the message task.
self._message_queue.cancel()
await self.cancel_task(self._message_task)
self._message_task = None

View File

@@ -11,6 +11,7 @@ from typing import Awaitable, Callable, List, Optional
from pipecat.frames.frames import Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
class IdleFrameProcessor(FrameProcessor):
@@ -77,7 +78,7 @@ class IdleFrameProcessor(FrameProcessor):
def _create_idle_task(self):
"""Create and start the idle monitoring task."""
if not self._idle_task:
self._idle_event = asyncio.Event()
self._idle_event = WatchdogEvent(self.task_manager)
self._idle_task = self.create_task(self._idle_task_handler())
async def _idle_task_handler(self):

View File

@@ -15,10 +15,12 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
class UserIdleProcessor(FrameProcessor):
@@ -74,7 +76,7 @@ class UserIdleProcessor(FrameProcessor):
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = asyncio.Event()
self._idle_event = None
def _wrap_callback(
self,
@@ -134,6 +136,9 @@ class UserIdleProcessor(FrameProcessor):
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._idle_event = WatchdogEvent(self.task_manager)
# Check for end frames before processing
if isinstance(frame, (EndFrame, CancelFrame)):
# Stop the idle task, if it exists

View File

@@ -244,6 +244,7 @@ class TavusVideoService(AIService):
async def _cancel_send_task(self):
    """Cancel the audio sending task, if there is one.

    The queue's ``cancel()`` is called before the task is cancelled; the
    task reference is cleared afterwards.
    """
    if not self._send_task:
        return
    self._queue.cancel()
    await self.cancel_task(self._send_task)
    self._send_task = None

View File

@@ -805,6 +805,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
async def _stop_audio_context_task(self):
    """Cancel the audio context task, if there is one.

    The contexts queue's ``cancel()`` is called before the task is
    cancelled; the task reference is cleared afterwards.
    """
    if not self._audio_context_task:
        return
    self._contexts_queue.cancel()
    await self.cancel_task(self._audio_context_task)
    self._audio_context_task = None

View File

@@ -810,6 +810,7 @@ class BaseOutputTransport(FrameProcessor):
async def _cancel_clock_task(self):
    """Cancel and clean up the clock processing task, if there is one.

    The clock queue's ``cancel()`` is called first; the task is then
    cancelled through the transport's ``cancel_task`` and the reference is
    cleared.
    """
    if not self._clock_task:
        return
    self._clock_queue.cancel()
    await self._transport.cancel_task(self._clock_task)
    self._clock_task = None

View File

@@ -295,6 +295,9 @@ class TaskManager(BaseTaskManager):
raise
except Exception as e:
logger.exception(f"{name}: unexpected exception while stopping task: {e}")
except BaseException as e:
logger.critical(f"{name}: fatal base exception while stopping task: {e}")
raise
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
"""Cancels the given asyncio Task and awaits its completion with an optional timeout.
@@ -394,6 +397,10 @@ class TaskManager(BaseTaskManager):
while True:
try:
if task_data.task.done():
logger.debug(f"{name}: task is already done, cancelling watchdog task.")
break
start_time = time.time()
await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout)
total_time = time.time() - start_time
@@ -417,7 +424,22 @@ class TaskManager(BaseTaskManager):
task_data = self._tasks[name]
if task_data.watchdog_task:
task_data.watchdog_task.cancel()
# In Python 3.10, calling task.cancel() alone does not appear to be enough:
# without the cleanup task created below, some tasks are never cancelled.
# Python 3.12 handles this more gracefully, but we keep this for compatibility
# and to avoid "Task exception was never retrieved" warnings.
self.get_event_loop().create_task(
self._cleanup_watchdog(name, task_data.watchdog_task)
)
task_data.watchdog_task = None
del self._tasks[name]
except KeyError as e:
logger.trace(f"{name}: unable to remove task data (already removed?): {e}")
async def _cleanup_watchdog(self, name: str, watchdog_task: asyncio.Task):
"""Await a watchdog task so that its outcome is retrieved.

Args:
name: Task name, used only as the prefix of the warning message.
watchdog_task: The watchdog task to await.
"""
try:
await watchdog_task
except asyncio.CancelledError:
# Cancellation is not reported.
pass
except Exception as e:
# Any other exception raised by the watchdog task is logged, not re-raised.
logger.warning(f"{name}: watchdog task raised exception: {e}")

View File

@@ -60,3 +60,9 @@ class WatchdogEvent(asyncio.Event):
return True
except asyncio.TimeoutError:
self._manager.task_reset_watchdog()
def clear(self):
    """Reset the watchdog timer (when the watchdog is enabled), then clear the event."""
    manager = self._manager
    if manager.task_watchdog_enabled:
        manager.task_reset_watchdog()
    super().clear()

View File

@@ -12,10 +12,19 @@ timeouts during legitimate queue operations.
"""
import asyncio
from dataclasses import dataclass
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@dataclass
class _WatchdogPriorityCancelSentinel:
    """Marker item queued by ``cancel()`` that sorts ahead of every other item.

    A priority queue orders its items with ``<``, and the sentinel can sit on
    either side of that comparison. ``__lt__`` covers ``sentinel < item``.
    ``__gt__`` covers ``item < sentinel``: when the item's own ``__lt__``
    returns ``NotImplemented`` (e.g. a tuple), Python falls back to the
    reflected ``sentinel.__gt__(item)``. Without ``__gt__`` that fallback
    raises ``TypeError`` if an item is queued while the sentinel is pending.
    """

    def __lt__(self, other):
        # The sentinel is smaller than anything it is compared with.
        return True

    def __gt__(self, other):
        # Reflected form of ``other < sentinel``: nothing sorts before the sentinel.
        return False
class WatchdogPriorityQueue(asyncio.PriorityQueue):
"""Watchdog-enabled asyncio PriorityQueue.
@@ -49,9 +58,17 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
The next item from the priority queue.
"""
if self._manager.task_watchdog_enabled:
return await self._watchdog_get()
get_result = await self._watchdog_get()
else:
return await super().get()
get_result = await super().get()
if isinstance(get_result, _WatchdogPriorityCancelSentinel):
logger.debug(
"Received WatchdogPriorityCancelSentinel, throwing CancelledError to force cancelling"
)
raise asyncio.CancelledError("Cancelling watchdog queue get() call.")
else:
return get_result
def task_done(self):
"""Mark a task as done and reset watchdog if enabled.
@@ -62,6 +79,20 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
self._manager.task_reset_watchdog()
super().task_done()
def cancel(self):
    """Queue a cancel sentinel so a pending or future ``get()`` raises CancelledError.

    This guards against a race condition seen in Python 3.10+:

    1. A value is put in the queue just before the task is cancelled.
    2. ``queue.get()`` completes before the cancellation signal is delivered.
    3. The task misses the ``CancelledError`` and keeps running indefinitely.

    Consuming the sentinel forces the task to raise ``CancelledError``, so
    the task terminates properly.
    """
    # NOTE(review): put_nowait raises asyncio.QueueFull on a full bounded
    # queue -- confirm these queues are created unbounded.
    sentinel = _WatchdogPriorityCancelSentinel()
    super().put_nowait(sentinel)
async def _watchdog_get(self):
"""Get item from queue while periodically resetting watchdog timer."""
while True:

View File

@@ -12,10 +12,18 @@ timeouts during legitimate queue operations.
"""
import asyncio
from dataclasses import dataclass
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@dataclass
class _WatchdogQueueCancelSentinel:
    """Marker item queued by ``cancel()``; ``get()`` raises CancelledError on receiving it."""
class WatchdogQueue(asyncio.Queue):
"""Watchdog-enabled asyncio Queue.
@@ -49,9 +57,17 @@ class WatchdogQueue(asyncio.Queue):
The next item from the queue.
"""
if self._manager.task_watchdog_enabled:
return await self._watchdog_get()
get_result = await self._watchdog_get()
else:
return await super().get()
get_result = await super().get()
if isinstance(get_result, _WatchdogQueueCancelSentinel):
logger.debug(
"Received WatchdogQueueCancelFrame, throwing CancelledError to force cancelling"
)
raise asyncio.CancelledError("Cancelling watchdog queue get() call.")
else:
return get_result
def task_done(self):
"""Mark a task as done and reset watchdog if enabled.
@@ -62,6 +78,20 @@ class WatchdogQueue(asyncio.Queue):
self._manager.task_reset_watchdog()
super().task_done()
def cancel(self):
    """Queue a cancel sentinel so a pending or future ``get()`` raises CancelledError.

    This guards against a race condition seen in Python 3.10+:

    1. A value is put in the queue just before the task is cancelled.
    2. ``queue.get()`` completes before the cancellation signal is delivered.
    3. The task misses the ``CancelledError`` and keeps running indefinitely.

    Consuming the sentinel forces the task to raise ``CancelledError``, so
    the task terminates properly.
    """
    # NOTE(review): put_nowait raises asyncio.QueueFull on a full bounded
    # queue -- confirm these queues are created unbounded.
    sentinel = _WatchdogQueueCancelSentinel()
    super().put_nowait(sentinel)
async def _watchdog_get(self):
"""Get item from queue while periodically resetting watchdog timer."""
while True: