Fix asyncio.Event race conditions in idle processors
Move event.clear() from finally block to success path in IdleFrameProcessor and UserIdleProcessor._idle_task_handler(). The finally block unconditionally cleared signals set during async timeout callbacks, causing false-positive idle detection. Closes #3402
This commit is contained in:
committed by
Aleix Conchillo Flaqué
parent
988c08a5b6
commit
565349d332
1
changelog/3796.fixed.md
Normal file
1
changelog/3796.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `IdleFrameProcessor` where `asyncio.Event` was unconditionally cleared in a `finally` block instead of only on the success path.
|
||||
@@ -85,7 +85,6 @@ class IdleFrameProcessor(FrameProcessor):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
|
||||
self._idle_event.clear()
|
||||
except asyncio.TimeoutError:
|
||||
await self._callback(self)
|
||||
finally:
|
||||
self._idle_event.clear()
|
||||
|
||||
104
tests/test_idle_frame_processor.py
Normal file
104
tests/test_idle_frame_processor.py
Normal file
@@ -0,0 +1,104 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.processors.idle_frame_processor import IdleFrameProcessor
|
||||
from pipecat.tests.utils import SleepFrame, run_test
|
||||
|
||||
|
||||
class TestIdleFrameProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_basic_idle_detection(self):
|
||||
"""Test that idle callback is triggered after timeout."""
|
||||
callback_called = asyncio.Event()
|
||||
|
||||
async def idle_callback(processor):
|
||||
callback_called.set()
|
||||
|
||||
processor = IdleFrameProcessor(callback=idle_callback, timeout=0.1)
|
||||
|
||||
await run_test(
|
||||
processor,
|
||||
frames_to_send=[SleepFrame(sleep=0.2)],
|
||||
expected_down_frames=[],
|
||||
)
|
||||
|
||||
assert callback_called.is_set(), "Idle callback was not called"
|
||||
|
||||
async def test_activity_resets_timer(self):
|
||||
"""Test that receiving frames resets the idle timer."""
|
||||
callback_called = asyncio.Event()
|
||||
|
||||
async def idle_callback(processor):
|
||||
callback_called.set()
|
||||
|
||||
processor = IdleFrameProcessor(callback=idle_callback, timeout=0.2)
|
||||
|
||||
await run_test(
|
||||
processor,
|
||||
frames_to_send=[
|
||||
SleepFrame(sleep=0.1),
|
||||
TextFrame(text="hello"),
|
||||
SleepFrame(sleep=0.1),
|
||||
TextFrame(text="world"),
|
||||
SleepFrame(sleep=0.1),
|
||||
],
|
||||
expected_down_frames=[TextFrame, TextFrame],
|
||||
)
|
||||
|
||||
assert not callback_called.is_set()
|
||||
|
||||
async def test_signal_preserved_during_callback(self):
|
||||
"""Regression: event set during callback must not be cleared."""
|
||||
|
||||
class _Sentinel(Exception):
|
||||
pass
|
||||
|
||||
async def callback(proc):
|
||||
proc._idle_event.set()
|
||||
raise _Sentinel()
|
||||
|
||||
processor = IdleFrameProcessor(callback=callback, timeout=0.01)
|
||||
processor._idle_event = asyncio.Event()
|
||||
|
||||
with self.assertRaises(_Sentinel):
|
||||
await processor._idle_task_handler()
|
||||
|
||||
self.assertTrue(processor._idle_event.is_set())
|
||||
|
||||
async def test_specific_frame_types(self):
|
||||
"""Test that only monitored frame types reset the idle timer."""
|
||||
callback_called = asyncio.Event()
|
||||
|
||||
async def idle_callback(processor):
|
||||
callback_called.set()
|
||||
|
||||
processor = IdleFrameProcessor(
|
||||
callback=idle_callback,
|
||||
timeout=0.15,
|
||||
types=[TranscriptionFrame],
|
||||
)
|
||||
|
||||
await run_test(
|
||||
processor,
|
||||
frames_to_send=[
|
||||
SleepFrame(sleep=0.1),
|
||||
TextFrame(text="this should not reset"),
|
||||
SleepFrame(sleep=0.2),
|
||||
],
|
||||
expected_down_frames=[TextFrame],
|
||||
)
|
||||
|
||||
assert callback_called.is_set()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user