PipelineTask: fix repeated on_idle_timeout

This commit is contained in:
Aleix Conchillo Flaqué
2025-06-18 23:40:34 -07:00
parent ebb23a5a8c
commit 028f7b2d65
3 changed files with 49 additions and 3 deletions

View File

@@ -37,6 +37,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue that would cause multiple `PipelineTask.on_idle_timeout`
events to be triggered repeatedly.
- Fixed an issue that was causing user and bot speech to not be synchronized
during recordings.

View File

@@ -663,6 +663,11 @@ class PipelineTask(BaseTask):
diff_time = time.time() - last_frame_time
if diff_time >= self._idle_timeout_secs:
running = await self._idle_timeout_detected()
# Reset `last_frame_time` so we don't trigger another
# immediate idle timeout if we are not cancelling. For
# example, we might want to force the bot to say goodbye
# and then clean nicely with an `EndFrame`.
last_frame_time = time.time()
self._idle_queue.task_done()
except asyncio.TimeoutError:

View File

@@ -8,7 +8,14 @@ import asyncio
import time
import unittest
from pipecat.frames.frames import EndFrame, HeartbeatFrame, StartFrame, StopFrame, TextFrame
from pipecat.frames.frames import (
EndFrame,
HeartbeatFrame,
InputAudioRawFrame,
StartFrame,
StopFrame,
TextFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
@@ -321,7 +328,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.run()
assert True
async def test_idle_task_event_handler(self):
async def test_idle_task_event_handler_no_frames(self):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
@@ -336,7 +343,38 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.cancel()
await task.run()
assert True
assert idle_timeout
async def test_idle_task_event_handler_quiet_user(self):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
task.set_event_loop(asyncio.get_event_loop())
idle_timeout = 0
@task.event_handler("on_idle_timeout")
async def on_idle_timeout(task: PipelineTask):
nonlocal idle_timeout
idle_timeout += 1
# Stay a bit longer here while user audio frames are still being
# pushed. We do this to make sure this function is only called once.
await asyncio.sleep(0.1)
await task.queue_frame(EndFrame())
async def send_audio():
# We send audio during and after the 0.2 seconds of idle
# timeout. Inside `on_idle_timeout` we are waiting a little bit
# simulating the pipeline finishing (e.g. goodbye message from bot
# flushing).
for i in range(30):
await task.queue_frame(
InputAudioRawFrame(audio=b"\x00", sample_rate=16000, num_channels=1)
)
await asyncio.sleep(0.01)
await asyncio.gather(send_audio(), task.run())
assert idle_timeout == 1
async def test_idle_task_frames(self):
idle_timeout_secs = 0.2