Merge pull request #1406 from pipecat-ai/aleix/pipeline-task-idle-timeouts

PipelineTask: automatically cancel tasks if pipeline is idle
This commit is contained in:
Aleix Conchillo Flaqué
2025-03-20 08:37:39 -07:00
committed by GitHub
4 changed files with 198 additions and 6 deletions

View File

@@ -9,6 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added support for detecting idle pipelines. By default, if no activity has
been detected during 5 minutes, the `PipelineTask` will be automatically
cancelled. It is possible to override this behavior by passing
`cancel_on_idle_timeout=False`. It is also possible to change the default
timeout with `idle_timeout_secs` or the frames that prevent the pipeline from
being idle with `idle_timeout_frames`. Finally, an `on_idle_timeout` event
handler will be triggered if the idle timeout is reached (whether the pipeline
task is cancelled or not).
- Added a `reconnect_on_error` parameter to websocket-based TTS services as well
as a `on_connection_error` event handler. The `reconnect_on_error` indicates
whether the TTS service should reconnect on error. The `on_connection_error`
@@ -111,6 +120,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- ⚠️ `PipelineTask` will now be automatically cancelled if no bot activity is
happening in the pipeline. There are a few settings to configure this
behavior, see `PipelineTask` documentation for more details.
- All event handlers are now executed in separate tasks in order to prevent
blocking the pipeline. It is possible that event handlers take some time to
execute in which case the pipeline would be blocked waiting for the event

View File

@@ -5,6 +5,7 @@
#
import asyncio
import time
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
from loguru import logger
@@ -13,6 +14,7 @@ from pydantic import BaseModel, ConfigDict
from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
BotSpeakingFrame,
CancelFrame,
CancelTaskFrame,
EndFrame,
@@ -20,6 +22,7 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
HeartbeatFrame,
LLMFullResponseEndFrame,
MetricsFrame,
StartFrame,
StopFrame,
@@ -133,12 +136,27 @@ class PipelineTask(BaseTask):
async def on_frame_reached_downstream(task, frame):
...
It also has an event handler that detects when the pipeline is idle. By
default, a pipeline is idle if no `BotSpeakingFrame` or
`LLMFullResponseEndFrame` are received within `idle_timeout_secs`.
@task.event_handler("on_idle_timeout")
async def on_idle_timeout(task):
...
Args:
pipeline: The pipeline to execute.
params: Configuration parameters for the pipeline.
observers: List of observers for monitoring pipeline execution.
clock: Clock implementation for timing operations.
check_dangling_tasks: Whether to check for processors' tasks finishing properly.
idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or
None. If a pipeline is idle the pipeline task will be cancelled
automatically.
idle_timeout_frames: A tuple with the frames that should trigger an idle
timeout if not received withing `idle_timeout_seconds`.
cancel_on_idle_timeout: Whether the pipeline task should be cancelled if
the idle timeout is reached.
"""
@@ -151,12 +169,21 @@ class PipelineTask(BaseTask):
clock: BaseClock = SystemClock(),
task_manager: Optional[BaseTaskManager] = None,
check_dangling_tasks: bool = True,
idle_timeout_secs: Optional[float] = 300,
idle_timeout_frames: Tuple[Type[Frame], ...] = (
BotSpeakingFrame,
LLMFullResponseEndFrame,
),
cancel_on_idle_timeout: bool = True,
):
super().__init__()
self._pipeline = pipeline
self._clock = clock
self._params = params
self._check_dangling_tasks = check_dangling_tasks
self._idle_timeout_secs = idle_timeout_secs
self._idle_timeout_frames = idle_timeout_frames
self._cancel_on_idle_timeout = cancel_on_idle_timeout
if self._params.observers:
import warnings
@@ -178,6 +205,10 @@ class PipelineTask(BaseTask):
# This is the heartbeat queue. When a heartbeat frame is received in the
# down queue we add it to the heartbeat queue for processing.
self._heartbeat_queue = asyncio.Queue()
# This is the idle queue. When frames are received downstream they are
# put in the queue. If no frame is received the pipeline is considered
# idle.
self._idle_queue = asyncio.Queue()
# This event is used to indicate a finalize frame (e.g. EndFrame,
# StopFrame) has been received in the down queue.
self._pipeline_end_event = asyncio.Event()
@@ -213,6 +244,7 @@ class PipelineTask(BaseTask):
self._reached_downstream_types: Tuple[Type[Frame], ...] = ()
self._register_event_handler("on_frame_reached_upstream")
self._register_event_handler("on_frame_reached_downstream")
self._register_event_handler("on_idle_timeout")
@property
def params(self) -> PipelineParams:
@@ -328,19 +360,30 @@ class PipelineTask(BaseTask):
self._heartbeat_monitor_handler(), f"{self}::_heartbeat_monitor_handler"
)
def _maybe_start_idle_task(self):
if self._idle_timeout_secs:
self._idle_monitor_task = self._task_manager.create_task(
self._idle_monitor_handler(), f"{self}::_idle_monitor_handler"
)
async def _cancel_tasks(self):
await self._maybe_cancel_heartbeat_tasks()
await self._observer.stop()
await self._task_manager.cancel_task(self._process_up_task)
await self._task_manager.cancel_task(self._process_down_task)
await self._observer.stop()
await self._maybe_cancel_heartbeat_tasks()
await self._maybe_cancel_idle_task()
async def _maybe_cancel_heartbeat_tasks(self):
if self._params.enable_heartbeats:
await self._task_manager.cancel_task(self._heartbeat_push_task)
await self._task_manager.cancel_task(self._heartbeat_monitor_task)
async def _maybe_cancel_idle_task(self):
if self._idle_timeout_secs:
await self._task_manager.cancel_task(self._idle_monitor_task)
def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
data = []
@@ -372,6 +415,7 @@ class PipelineTask(BaseTask):
self._clock.start()
self._maybe_start_heartbeat_tasks()
self._maybe_start_idle_task()
start_frame = StartFrame(
clock=self._clock,
@@ -445,6 +489,10 @@ class PipelineTask(BaseTask):
while True:
frame = await self._down_queue.get()
# Queue received frame to the idle queue so we can monitor idle
# pipelines.
await self._idle_queue.put(frame)
if isinstance(frame, self._reached_downstream_types):
await self._call_event_handler("on_frame_reached_downstream", frame)
@@ -482,6 +530,48 @@ class PipelineTask(BaseTask):
f"{self}: heartbeat frame not received for more than {wait_time} seconds"
)
async def _idle_monitor_handler(self):
"""This tasks monitors activity in the pipeline. If no frames are
received (heartbeats don't count) the pipeline is considered idle.
"""
running = True
last_frame_time = 0
while running:
try:
frame = await asyncio.wait_for(
self._idle_queue.get(), timeout=self._idle_timeout_secs
)
if isinstance(frame, StartFrame) or isinstance(frame, self._idle_timeout_frames):
# If we find a StartFrame or one of the frames that prevents a
# time out we update the time.
last_frame_time = time.time()
else:
# If we find any other frame we check if the pipeline is
# idle by checking the last time we received one of the
# valid frames.
diff_time = time.time() - last_frame_time
if diff_time >= self._idle_timeout_secs:
running = await self._idle_timeout_detected()
self._idle_queue.task_done()
except asyncio.TimeoutError:
running = await self._idle_timeout_detected()
async def _idle_timeout_detected(self) -> bool:
"""Logic for when the pipeline is idle.
Returns:
bool: Whther the pipeline task is being cancelled or not.
"""
await self._call_event_handler("on_idle_timeout")
if self._cancel_on_idle_timeout:
logger.warning(f"Idle pipeline detected, cancelling pipeline task...")
await self.cancel()
return False
return True
def _print_dangling_tasks(self):
tasks = [t.get_name() for t in self._task_manager.current_tasks()]
if tasks:

View File

@@ -101,7 +101,11 @@ async def run_test(
pipeline = Pipeline([source, processor, sink])
task = PipelineTask(pipeline, params=PipelineParams(start_metadata=start_metadata))
task = PipelineTask(
pipeline,
params=PipelineParams(start_metadata=start_metadata),
cancel_on_idle_timeout=False,
)
async def push_frames():
# Just give a little head start to the runner.

View File

@@ -5,6 +5,7 @@
#
import asyncio
import time
import unittest
from pipecat.frames.frames import EndFrame, HeartbeatFrame, StartFrame, TextFrame
@@ -100,7 +101,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline, cancel_on_idle_timeout=False)
task.set_event_loop(asyncio.get_event_loop())
task.set_reached_upstream_filter((TextFrame,))
task.set_reached_downstream_filter((TextFrame,))
@@ -123,7 +124,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.queue_frame(TextFrame(text="Hello Downstream!"))
try:
await asyncio.wait_for(task.run(), timeout=1.0)
await asyncio.wait_for(asyncio.shield(task.run()), timeout=1.0)
except asyncio.TimeoutError:
pass
@@ -149,6 +150,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
heartbeats_period_secs=0.2,
),
observers=[heartbeats_observer],
cancel_on_idle_timeout=False,
)
task.set_event_loop(asyncio.get_event_loop())
@@ -156,7 +158,90 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.queue_frame(TextFrame(text="Hello!"))
try:
await asyncio.wait_for(task.run(), timeout=1.0)
await asyncio.wait_for(asyncio.shield(task.run()), timeout=1.0)
except asyncio.TimeoutError:
pass
assert heartbeats_counter == expected_heartbeats
async def test_idle_task(self):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2)
task.set_event_loop(asyncio.get_event_loop())
await task.run()
assert True
async def test_no_idle_task(self):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
task.set_event_loop(asyncio.get_event_loop())
try:
await asyncio.wait_for(asyncio.shield(task.run()), timeout=0.3)
except asyncio.TimeoutError:
assert True
else:
assert False
async def test_idle_task_heartbeats(self):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_heartbeats=True,
heartbeats_period_secs=0.1,
),
idle_timeout_secs=0.3,
)
task.set_event_loop(asyncio.get_event_loop())
await task.run()
assert True
async def test_idle_task_event_handler(self):
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
task.set_event_loop(asyncio.get_event_loop())
idle_timeout = False
@task.event_handler("on_idle_timeout")
async def on_idle_timeout(task: PipelineTask):
nonlocal idle_timeout
idle_timeout = True
await task.cancel()
await task.run()
assert True
async def test_idle_task_frames(self):
idle_timeout_secs = 0.2
sleep_time_secs = idle_timeout_secs / 2
identity = IdentityFilter()
pipeline = Pipeline([identity])
task = PipelineTask(
pipeline,
idle_timeout_secs=idle_timeout_secs,
idle_timeout_frames=(TextFrame,),
)
task.set_event_loop(asyncio.get_event_loop())
async def delayed_frames():
await asyncio.sleep(sleep_time_secs)
await task.queue_frame(TextFrame("Hello Pipecat!"))
await asyncio.sleep(sleep_time_secs)
await task.queue_frame(TextFrame("Hello Pipecat!"))
await asyncio.sleep(sleep_time_secs)
await task.queue_frame(TextFrame("Hello Pipecat!"))
start_time = time.time()
tasks = {asyncio.create_task(task.run()), asyncio.create_task(delayed_frames())}
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
diff_time = time.time() - start_time
self.assertGreater(diff_time, sleep_time_secs * 3)