diff --git a/CHANGELOG.md b/CHANGELOG.md index e8e93ab8f..410d4faf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that would cause multiple `PipelineTask.on_idle_timeout` + events to be triggered repeatedly. + - Fixed an issue that was causing user and bot speech to not be synchronized during recordings. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 736f98244..851452fbb 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -663,6 +663,11 @@ class PipelineTask(BaseTask): diff_time = time.time() - last_frame_time if diff_time >= self._idle_timeout_secs: running = await self._idle_timeout_detected() + # Reset `last_frame_time` so we don't trigger another + # immediate idle timeout if we are not cancelling. For + # example, we might want to force the bot to say goodbye + # and then clean nicely with an `EndFrame`. + last_frame_time = time.time() self._idle_queue.task_done() except asyncio.TimeoutError: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 84485098d..1146681f9 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -8,7 +8,14 @@ import asyncio import time import unittest -from pipecat.frames.frames import EndFrame, HeartbeatFrame, StartFrame, StopFrame, TextFrame +from pipecat.frames.frames import ( + EndFrame, + HeartbeatFrame, + InputAudioRawFrame, + StartFrame, + StopFrame, + TextFrame, +) from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline @@ -321,7 +328,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): await task.run() assert True - async def test_idle_task_event_handler(self): + async def test_idle_task_event_handler_no_frames(self): identity = IdentityFilter() pipeline = Pipeline([identity]) task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False) @@ -336,7 +343,38 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): await task.cancel() await task.run() - assert True + assert idle_timeout + + async def test_idle_task_event_handler_quiet_user(self): + identity = IdentityFilter() + pipeline = Pipeline([identity]) + task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False) + task.set_event_loop(asyncio.get_event_loop()) + + idle_timeout = 0 + + @task.event_handler("on_idle_timeout") + async def on_idle_timeout(task: PipelineTask): + nonlocal idle_timeout + idle_timeout += 1 + # Stay a bit longer here while user audio frames are still being + # pushed. We do this to make sure this function is only called once. + await asyncio.sleep(0.1) + await task.queue_frame(EndFrame()) + + async def send_audio(): + # We send audio during and after the 0.2 seconds of idle + # timeout. Inside `on_idle_timeout` we are waiting a little bit + # simulating the pipeline finishing (e.g. goodbye message from bot + # flushing). + for i in range(30): + await task.queue_frame( + InputAudioRawFrame(audio=b"\x00", sample_rate=16000, num_channels=1) + ) + await asyncio.sleep(0.01) + + await asyncio.gather(send_audio(), task.run()) + assert idle_timeout == 1 async def test_idle_task_frames(self): idle_timeout_secs = 0.2