Merge pull request #1023 from pipecat-ai/aleix/introduce-heartbeat-frames
introduce heartbeat frames
This commit is contained in:
14
CHANGELOG.md
14
CHANGELOG.md
@@ -12,10 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Introduced pipeline frame observers. Observers can view all the frames that go
|
||||
through the pipeline without the need to inject processors in the
|
||||
pipeline. This can be useful, for example, to implement frame loggers or
|
||||
debuggers among other things.
|
||||
debuggers among other things. The example
|
||||
`examples/foundational/30-observer.py` shows how to add an observer to a
|
||||
pipeline for debugging.
|
||||
|
||||
- Added `30-observer.py` to show how to add an Observer to a pipeline for
|
||||
debugging.
|
||||
- Introduced heartbeat frames. The pipeline task can now push periodic
|
||||
heartbeats down the pipeline when `enable_heartbeats=True`. Heartbeats are
|
||||
system frames that are supposed to make it all the way to the end of the
|
||||
pipeline. When a heartbeat frame is received the traversing time (i.e. the
|
||||
time it took to go through the whole pipeline) will be displayed (with TRACE
|
||||
logging) otherwise a warning will be shown. The example
|
||||
`examples/foundational/31-heartbeats.py` shows how to enable heartbeats and
|
||||
forces warnings to be displayed.
|
||||
|
||||
- Added `OpenRouter` for OpenRouter integration with an OpenAI-compatible
|
||||
interface. Added foundational example `14m-function-calling-openrouter.py`.
|
||||
|
||||
43
examples/foundational/31-heartbeats.py
Normal file
43
examples/foundational/31-heartbeats.py
Normal file
@@ -0,0 +1,43 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class NullProcessor(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
|
||||
async def main():
|
||||
"""This test shows heartbeat monitoring by displaying a warning when
|
||||
heartbeats are not received.
|
||||
|
||||
"""
|
||||
|
||||
pipeline = Pipeline([NullProcessor()])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True))
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -424,6 +424,16 @@ class FatalErrorFrame(ErrorFrame):
|
||||
fatal: bool = field(default=True, init=False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HeartbeatFrame(SystemFrame):
|
||||
"""This frame is used by the pipeline task as a mechanism to know if the
|
||||
pipeline is running properly.
|
||||
|
||||
"""
|
||||
|
||||
timestamp: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class EndTaskFrame(SystemFrame):
|
||||
"""This is used to notify the pipeline task that the pipeline should be
|
||||
|
||||
@@ -19,6 +19,7 @@ from pipecat.frames.frames import (
|
||||
EndTaskFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
HeartbeatFrame,
|
||||
MetricsFrame,
|
||||
StartFrame,
|
||||
StopTaskFrame,
|
||||
@@ -29,11 +30,15 @@ from pipecat.pipeline.base_pipeline import BasePipeline
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
HEARTBEAT_SECONDS = 1.0
|
||||
HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5
|
||||
|
||||
|
||||
class PipelineParams(BaseModel):
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
allow_interruptions: bool = False
|
||||
enable_heartbeats: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
send_initial_empty_metrics: bool = True
|
||||
@@ -58,25 +63,10 @@ class Source(FrameProcessor):
|
||||
|
||||
match direction:
|
||||
case FrameDirection.UPSTREAM:
|
||||
await self._handle_upstream_frame(frame)
|
||||
await self._up_queue.put(frame)
|
||||
case FrameDirection.DOWNSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _handle_upstream_frame(self, frame: Frame):
|
||||
if isinstance(frame, EndTaskFrame):
|
||||
# Tell the task we should end nicely.
|
||||
await self._up_queue.put(EndTaskFrame())
|
||||
elif isinstance(frame, CancelTaskFrame):
|
||||
# Tell the task we should end right away.
|
||||
await self._up_queue.put(CancelTaskFrame())
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
logger.error(f"Error running app: {frame}")
|
||||
if frame.fatal:
|
||||
# Cancel all tasks downstream.
|
||||
await self.push_frame(CancelFrame())
|
||||
# Tell the task we should stop.
|
||||
await self._up_queue.put(StopTaskFrame())
|
||||
|
||||
|
||||
class Sink(FrameProcessor):
|
||||
"""This is the sink processor that is linked at the end of the pipeline
|
||||
@@ -91,10 +81,7 @@ class Sink(FrameProcessor):
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# We really just want to know when the EndFrame reached the sink.
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._down_queue.put(frame)
|
||||
await self._down_queue.put(frame)
|
||||
|
||||
|
||||
class Observer(BaseObserver):
|
||||
@@ -135,9 +122,18 @@ class PipelineTask:
|
||||
self._params = params
|
||||
self._finished = False
|
||||
|
||||
# This queue receives frames coming from the pipeline upstream.
|
||||
self._up_queue = asyncio.Queue()
|
||||
# This queue receives frames coming from the pipeline downstream.
|
||||
self._down_queue = asyncio.Queue()
|
||||
# This queue is the queue used to push frames to the pipeline.
|
||||
self._push_queue = asyncio.Queue()
|
||||
# This is the heartbeat queue. When a heartbeat frame is received in the
|
||||
# down queue we add it to the heartbeat queue for processing.
|
||||
self._heartbeat_queue = asyncio.Queue()
|
||||
# This event is used to indicate an EndFrame has been received in the
|
||||
# down queue.
|
||||
self._endframe_event = asyncio.Event()
|
||||
|
||||
self._source = Source(self._up_queue)
|
||||
self._source.link(pipeline)
|
||||
@@ -148,33 +144,49 @@ class PipelineTask:
|
||||
self._observer = Observer(params.observers)
|
||||
|
||||
def has_finished(self):
|
||||
"""Indicates whether the tasks has finished. That is, all processors
|
||||
have stopped.
|
||||
|
||||
"""
|
||||
return self._finished
|
||||
|
||||
async def stop_when_done(self):
|
||||
"""This is a helper function that sends an EndFrame to the pipeline in
|
||||
order to stop the task after everything in it has been processed.
|
||||
|
||||
"""
|
||||
logger.debug(f"Task {self} scheduled to stop when done")
|
||||
await self.queue_frame(EndFrame())
|
||||
|
||||
async def cancel(self):
|
||||
"""
|
||||
Stops the running pipeline immediately.
|
||||
"""
|
||||
logger.debug(f"Canceling pipeline task {self}")
|
||||
# Make sure everything is cleaned up downstream. This is sent
|
||||
# out-of-band from the main streaming task which is what we want since
|
||||
# we want to cancel right away.
|
||||
await self._source.push_frame(CancelFrame())
|
||||
self._process_push_task.cancel()
|
||||
self._process_up_task.cancel()
|
||||
await self._process_push_task
|
||||
await self._process_up_task
|
||||
await self._cancel_tasks(True)
|
||||
|
||||
async def run(self):
|
||||
self._process_up_task = asyncio.create_task(self._process_up_queue())
|
||||
self._process_push_task = asyncio.create_task(self._process_push_queue())
|
||||
await asyncio.gather(self._process_up_task, self._process_push_task)
|
||||
"""
|
||||
Starts running the given pipeline.
|
||||
"""
|
||||
tasks = self._create_tasks()
|
||||
await asyncio.gather(*tasks)
|
||||
self._finished = True
|
||||
|
||||
async def queue_frame(self, frame: Frame):
|
||||
"""
|
||||
Queue a frame to be pushed down the pipeline.
|
||||
"""
|
||||
await self._push_queue.put(frame)
|
||||
|
||||
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
|
||||
"""
|
||||
Queues multiple frames to be pushed down the pipeline.
|
||||
"""
|
||||
if isinstance(frames, AsyncIterable):
|
||||
async for frame in frames:
|
||||
await self.queue_frame(frame)
|
||||
@@ -182,6 +194,41 @@ class PipelineTask:
|
||||
for frame in frames:
|
||||
await self.queue_frame(frame)
|
||||
|
||||
def _create_tasks(self):
|
||||
tasks = []
|
||||
self._process_up_task = asyncio.create_task(self._process_up_queue())
|
||||
self._process_down_task = asyncio.create_task(self._process_down_queue())
|
||||
self._process_push_task = asyncio.create_task(self._process_push_queue())
|
||||
|
||||
tasks = [self._process_up_task, self._process_down_task, self._process_push_task]
|
||||
|
||||
return tasks
|
||||
|
||||
def _maybe_start_heartbeat_tasks(self):
|
||||
if self._params.enable_heartbeats:
|
||||
self._heartbeat_push_task = asyncio.create_task(self._heartbeat_push_handler())
|
||||
self._heartbeat_monitor_task = asyncio.create_task(self._heartbeat_monitor_handler())
|
||||
|
||||
async def _cancel_tasks(self, cancel_push: bool):
|
||||
await self._maybe_cancel_heartbeat_tasks()
|
||||
|
||||
if cancel_push:
|
||||
self._process_push_task.cancel()
|
||||
await self._process_push_task
|
||||
|
||||
self._process_up_task.cancel()
|
||||
await self._process_up_task
|
||||
|
||||
self._process_down_task.cancel()
|
||||
await self._process_down_task
|
||||
|
||||
async def _maybe_cancel_heartbeat_tasks(self):
|
||||
if self._params.enable_heartbeats:
|
||||
self._heartbeat_push_task.cancel()
|
||||
await self._heartbeat_push_task
|
||||
self._heartbeat_monitor_task.cancel()
|
||||
await self._heartbeat_monitor_task
|
||||
|
||||
def _initial_metrics_frame(self) -> MetricsFrame:
|
||||
processors = self._pipeline.processors_with_metrics()
|
||||
data = []
|
||||
@@ -190,9 +237,20 @@ class PipelineTask:
|
||||
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
|
||||
return MetricsFrame(data=data)
|
||||
|
||||
async def _wait_for_endframe(self):
|
||||
await self._endframe_event.wait()
|
||||
self._endframe_event.clear()
|
||||
|
||||
async def _process_push_queue(self):
|
||||
"""This is the task that runs the pipeline for the first time by sending
|
||||
a StartFrame and by pushing any other frames queued by the user. It runs
|
||||
until the tasks is canceled or stopped (e.g. with an EndFrame).
|
||||
|
||||
"""
|
||||
self._clock.start()
|
||||
|
||||
self._maybe_start_heartbeat_tasks()
|
||||
|
||||
start_frame = StartFrame(
|
||||
allow_interruptions=self._params.allow_interruptions,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
@@ -224,29 +282,91 @@ class PipelineTask:
|
||||
await self._source.cleanup()
|
||||
await self._pipeline.cleanup()
|
||||
await self._sink.cleanup()
|
||||
# We just enqueue None to terminate the task gracefully.
|
||||
self._process_up_task.cancel()
|
||||
await self._process_up_task
|
||||
|
||||
async def _wait_for_endframe(self):
|
||||
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
|
||||
# so just wait for it. In the future we might do something else here,
|
||||
# but for now this is fine.
|
||||
await self._down_queue.get()
|
||||
# Finally, cancel internal tasks. We don't cancel the push tasks because
|
||||
# that's us.
|
||||
await self._cancel_tasks(False)
|
||||
|
||||
async def _process_up_queue(self):
|
||||
"""This is the task that processes frames coming upstream from the
|
||||
pipeline. These frames might indicate, for example, that we want the
|
||||
pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
|
||||
an EndFrame down the pipeline.
|
||||
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
frame = await self._up_queue.get()
|
||||
if isinstance(frame, EndTaskFrame):
|
||||
# Tell the task we should end nicely.
|
||||
await self.queue_frame(EndFrame())
|
||||
elif isinstance(frame, CancelTaskFrame):
|
||||
# Tell the task we should end right away.
|
||||
await self.queue_frame(CancelFrame())
|
||||
elif isinstance(frame, StopTaskFrame):
|
||||
await self.queue_frame(StopTaskFrame())
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
logger.error(f"Error running app: {frame}")
|
||||
if frame.fatal:
|
||||
# Cancel all tasks downstream.
|
||||
await self.queue_frame(CancelFrame())
|
||||
# Tell the task we should stop.
|
||||
await self.queue_frame(StopTaskFrame())
|
||||
self._up_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
async def _process_down_queue(self):
|
||||
"""This tasks process frames coming downstream from the pipeline. For
|
||||
example, heartbeat frames or an EndFrame which would indicate all
|
||||
processors have handled the EndFrame and therefore we can exit the task
|
||||
cleanly.
|
||||
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
frame = await self._down_queue.get()
|
||||
if isinstance(frame, EndFrame):
|
||||
self._endframe_event.set()
|
||||
elif isinstance(frame, HeartbeatFrame):
|
||||
await self._heartbeat_queue.put(frame)
|
||||
self._down_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
async def _heartbeat_push_handler(self):
|
||||
"""
|
||||
This tasks pushes a heartbeat frame every HEARTBEAT_SECONDS.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
# Don't use `queue_frame()` because if an EndFrame is queued the
|
||||
# task will just stop waiting for the pipeline to finish not
|
||||
# allowing more frames to be pushed.
|
||||
await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
|
||||
await asyncio.sleep(HEARTBEAT_SECONDS)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
async def _heartbeat_monitor_handler(self):
|
||||
"""This tasks monitors heartbeat frames. If a heartbeat frame has not
|
||||
been received for a long period a warning will be logged. It also logs
|
||||
the time that a heartbeat frame takes to processes, that is how long it
|
||||
takes for the heartbeat frame to traverse all the pipeline.
|
||||
|
||||
"""
|
||||
wait_time = HEARTBEAT_MONITOR_SECONDS
|
||||
while True:
|
||||
try:
|
||||
frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time)
|
||||
process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000
|
||||
logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds")
|
||||
self._heartbeat_queue.task_done()
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f"{self}: heartbeat frame not received for more than {wait_time} seconds"
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
@@ -260,7 +260,7 @@ class FrameProcessor:
|
||||
|
||||
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
|
||||
try:
|
||||
timestamp = self._clock.get_time()
|
||||
timestamp = self._clock.get_time() if self._clock else 0
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
logger.trace(f"Pushing {frame} from {self} to {self._next}")
|
||||
if self._observer:
|
||||
|
||||
Reference in New Issue
Block a user