From e18d9f6a111d27ac3009ac9609b3e83d59d9b39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 19 Mar 2025 17:26:38 -0700 Subject: [PATCH] PipelineTask: automatically cancel tasks if pipeline is idle --- CHANGELOG.md | 13 +++++ src/pipecat/pipeline/task.py | 94 +++++++++++++++++++++++++++++++++++- src/pipecat/tests/utils.py | 6 ++- tests/test_pipeline.py | 91 ++++++++++++++++++++++++++++++++-- 4 files changed, 198 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35df891ef..2b0da82a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added support for detecting idle pipelines. By default, if no activity has + been detected during 5 minutes, the `PipelineTask` will be automatically + cancelled. It is possible to override this behavior by passing + `cancel_on_idle_timeout=False`. It is also possible to change the default + timeout with `idle_timeout_secs` or the frames that prevent the pipeline from + being idle with `idle_timeout_frames`. Finally, an `on_idle_timeout` event + handler will be triggered if the idle timeout is reached (whether the pipeline + task is cancelled or not). + - Added a `reconnect_on_error` parameter to websocket-based TTS services as well as a `on_connection_error` event handler. The `reconnect_on_error` indicates whether the TTS service should reconnect on error. The `on_connection_error` @@ -111,6 +120,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- ⚠️ `PipelineTask` will now be automatically cancelled if no bot activity is + happening in the pipeline. There are a few settings to configure this + behavior, see `PipelineTask` documentation for more details. + - All event handlers are now executed in separate tasks in order to prevent blocking the pipeline. 
It is possible that event handlers take some time to execute in which case the pipeline would be blocked waiting for the event diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index e9744fd04..cf62be64f 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -5,6 +5,7 @@ # import asyncio +import time from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type from loguru import logger @@ -13,6 +14,7 @@ from pydantic import BaseModel, ConfigDict from pipecat.clocks.base_clock import BaseClock from pipecat.clocks.system_clock import SystemClock from pipecat.frames.frames import ( + BotSpeakingFrame, CancelFrame, CancelTaskFrame, EndFrame, @@ -20,6 +22,7 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, HeartbeatFrame, + LLMFullResponseEndFrame, MetricsFrame, StartFrame, StopFrame, @@ -133,12 +136,27 @@ class PipelineTask(BaseTask): async def on_frame_reached_downstream(task, frame): ... + It also has an event handler that detects when the pipeline is idle. By + default, a pipeline is idle if no `BotSpeakingFrame` or + `LLMFullResponseEndFrame` are received within `idle_timeout_secs`. + + @task.event_handler("on_idle_timeout") + async def on_idle_timeout(task): + ... + Args: pipeline: The pipeline to execute. params: Configuration parameters for the pipeline. observers: List of observers for monitoring pipeline execution. clock: Clock implementation for timing operations. check_dangling_tasks: Whether to check for processors' tasks finishing properly. + idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or + None. If a pipeline is idle the pipeline task will be cancelled + automatically. + idle_timeout_frames: A tuple with the frames that should trigger an idle + timeout if not received within `idle_timeout_secs`. + cancel_on_idle_timeout: Whether the pipeline task should be cancelled if + the idle timeout is reached. 
""" @@ -151,12 +169,21 @@ class PipelineTask(BaseTask): clock: BaseClock = SystemClock(), task_manager: Optional[BaseTaskManager] = None, check_dangling_tasks: bool = True, + idle_timeout_secs: Optional[float] = 300, + idle_timeout_frames: Tuple[Type[Frame], ...] = ( + BotSpeakingFrame, + LLMFullResponseEndFrame, + ), + cancel_on_idle_timeout: bool = True, ): super().__init__() self._pipeline = pipeline self._clock = clock self._params = params self._check_dangling_tasks = check_dangling_tasks + self._idle_timeout_secs = idle_timeout_secs + self._idle_timeout_frames = idle_timeout_frames + self._cancel_on_idle_timeout = cancel_on_idle_timeout if self._params.observers: import warnings @@ -178,6 +205,10 @@ class PipelineTask(BaseTask): # This is the heartbeat queue. When a heartbeat frame is received in the # down queue we add it to the heartbeat queue for processing. self._heartbeat_queue = asyncio.Queue() + # This is the idle queue. When frames are received downstream they are + # put in the queue. If no frame is received the pipeline is considered + # idle. + self._idle_queue = asyncio.Queue() # This event is used to indicate a finalize frame (e.g. EndFrame, # StopFrame) has been received in the down queue. self._pipeline_end_event = asyncio.Event() @@ -213,6 +244,7 @@ class PipelineTask(BaseTask): self._reached_downstream_types: Tuple[Type[Frame], ...] 
= () self._register_event_handler("on_frame_reached_upstream") self._register_event_handler("on_frame_reached_downstream") + self._register_event_handler("on_idle_timeout") @property def params(self) -> PipelineParams: @@ -328,19 +360,30 @@ class PipelineTask(BaseTask): self._heartbeat_monitor_handler(), f"{self}::_heartbeat_monitor_handler" ) + def _maybe_start_idle_task(self): + if self._idle_timeout_secs: + self._idle_monitor_task = self._task_manager.create_task( + self._idle_monitor_handler(), f"{self}::_idle_monitor_handler" + ) + async def _cancel_tasks(self): - await self._maybe_cancel_heartbeat_tasks() + await self._observer.stop() await self._task_manager.cancel_task(self._process_up_task) await self._task_manager.cancel_task(self._process_down_task) - await self._observer.stop() + await self._maybe_cancel_heartbeat_tasks() + await self._maybe_cancel_idle_task() async def _maybe_cancel_heartbeat_tasks(self): if self._params.enable_heartbeats: await self._task_manager.cancel_task(self._heartbeat_push_task) await self._task_manager.cancel_task(self._heartbeat_monitor_task) + async def _maybe_cancel_idle_task(self): + if self._idle_timeout_secs: + await self._task_manager.cancel_task(self._idle_monitor_task) + def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() data = [] @@ -372,6 +415,7 @@ class PipelineTask(BaseTask): self._clock.start() self._maybe_start_heartbeat_tasks() + self._maybe_start_idle_task() start_frame = StartFrame( clock=self._clock, @@ -445,6 +489,10 @@ class PipelineTask(BaseTask): while True: frame = await self._down_queue.get() + # Queue received frame to the idle queue so we can monitor idle + # pipelines. 
+ await self._idle_queue.put(frame) + if isinstance(frame, self._reached_downstream_types): await self._call_event_handler("on_frame_reached_downstream", frame) @@ -482,6 +530,48 @@ class PipelineTask(BaseTask): f"{self}: heartbeat frame not received for more than {wait_time} seconds" ) + async def _idle_monitor_handler(self): + """This task monitors activity in the pipeline. If no frames are + received (heartbeats don't count) the pipeline is considered idle. + + """ + running = True + last_frame_time = 0 + while running: + try: + frame = await asyncio.wait_for( + self._idle_queue.get(), timeout=self._idle_timeout_secs + ) + + if isinstance(frame, StartFrame) or isinstance(frame, self._idle_timeout_frames): + # If we find a StartFrame or one of the frames that prevents a + # time out we update the time. + last_frame_time = time.time() + else: + # If we find any other frame we check if the pipeline is + # idle by checking the last time we received one of the + # valid frames. + diff_time = time.time() - last_frame_time + if diff_time >= self._idle_timeout_secs: + running = await self._idle_timeout_detected() + + self._idle_queue.task_done() + except asyncio.TimeoutError: + running = await self._idle_timeout_detected() + + async def _idle_timeout_detected(self) -> bool: + """Logic for when the pipeline is idle. + + Returns: + bool: False if the pipeline task is being cancelled, True otherwise. 
+ """ + await self._call_event_handler("on_idle_timeout") + if self._cancel_on_idle_timeout: + logger.warning(f"Idle pipeline detected, cancelling pipeline task...") + await self.cancel() + return False + return True + def _print_dangling_tasks(self): tasks = [t.get_name() for t in self._task_manager.current_tasks()] if tasks: diff --git a/src/pipecat/tests/utils.py b/src/pipecat/tests/utils.py index 7fcecc3ff..d68c87d88 100644 --- a/src/pipecat/tests/utils.py +++ b/src/pipecat/tests/utils.py @@ -101,7 +101,11 @@ async def run_test( pipeline = Pipeline([source, processor, sink]) - task = PipelineTask(pipeline, params=PipelineParams(start_metadata=start_metadata)) + task = PipelineTask( + pipeline, + params=PipelineParams(start_metadata=start_metadata), + cancel_on_idle_timeout=False, + ) async def push_frames(): # Just give a little head start to the runner. diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c3811a672..46114b5b2 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -5,6 +5,7 @@ # import asyncio +import time import unittest from pipecat.frames.frames import EndFrame, HeartbeatFrame, StartFrame, TextFrame @@ -100,7 +101,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): identity = IdentityFilter() pipeline = Pipeline([identity]) - task = PipelineTask(pipeline) + task = PipelineTask(pipeline, cancel_on_idle_timeout=False) task.set_event_loop(asyncio.get_event_loop()) task.set_reached_upstream_filter((TextFrame,)) task.set_reached_downstream_filter((TextFrame,)) @@ -123,7 +124,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): await task.queue_frame(TextFrame(text="Hello Downstream!")) try: - await asyncio.wait_for(task.run(), timeout=1.0) + await asyncio.wait_for(asyncio.shield(task.run()), timeout=1.0) except asyncio.TimeoutError: pass @@ -149,6 +150,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): heartbeats_period_secs=0.2, ), observers=[heartbeats_observer], + 
cancel_on_idle_timeout=False, ) task.set_event_loop(asyncio.get_event_loop()) @@ -156,7 +158,90 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): await task.queue_frame(TextFrame(text="Hello!")) try: - await asyncio.wait_for(task.run(), timeout=1.0) + await asyncio.wait_for(asyncio.shield(task.run()), timeout=1.0) except asyncio.TimeoutError: pass assert heartbeats_counter == expected_heartbeats + + async def test_idle_task(self): + identity = IdentityFilter() + pipeline = Pipeline([identity]) + task = PipelineTask(pipeline, idle_timeout_secs=0.2) + task.set_event_loop(asyncio.get_event_loop()) + await task.run() + assert True + + async def test_no_idle_task(self): + identity = IdentityFilter() + pipeline = Pipeline([identity]) + task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False) + task.set_event_loop(asyncio.get_event_loop()) + try: + await asyncio.wait_for(asyncio.shield(task.run()), timeout=0.3) + except asyncio.TimeoutError: + assert True + else: + assert False + + async def test_idle_task_heartbeats(self): + identity = IdentityFilter() + pipeline = Pipeline([identity]) + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_heartbeats=True, + heartbeats_period_secs=0.1, + ), + idle_timeout_secs=0.3, + ) + task.set_event_loop(asyncio.get_event_loop()) + await task.run() + assert True + + async def test_idle_task_event_handler(self): + identity = IdentityFilter() + pipeline = Pipeline([identity]) + task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False) + task.set_event_loop(asyncio.get_event_loop()) + + idle_timeout = False + + @task.event_handler("on_idle_timeout") + async def on_idle_timeout(task: PipelineTask): + nonlocal idle_timeout + idle_timeout = True + await task.cancel() + + await task.run() + assert True + + async def test_idle_task_frames(self): + idle_timeout_secs = 0.2 + sleep_time_secs = idle_timeout_secs / 2 + + identity = IdentityFilter() + pipeline = 
Pipeline([identity]) + task = PipelineTask( + pipeline, + idle_timeout_secs=idle_timeout_secs, + idle_timeout_frames=(TextFrame,), + ) + task.set_event_loop(asyncio.get_event_loop()) + + async def delayed_frames(): + await asyncio.sleep(sleep_time_secs) + await task.queue_frame(TextFrame("Hello Pipecat!")) + await asyncio.sleep(sleep_time_secs) + await task.queue_frame(TextFrame("Hello Pipecat!")) + await asyncio.sleep(sleep_time_secs) + await task.queue_frame(TextFrame("Hello Pipecat!")) + + start_time = time.time() + + tasks = {asyncio.create_task(task.run()), asyncio.create_task(delayed_frames())} + + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + diff_time = time.time() - start_time + + self.assertGreater(diff_time, sleep_time_secs * 3)