diff --git a/CHANGELOG.md b/CHANGELOG.md index 07696a739..57104786a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Introduced pipeline frame observers. Observers can view all the frames that go through the pipeline without the need to inject processors in the pipeline. This can be useful, for example, to implement frame loggers or - debuggers among other things. + debuggers among other things. The example + `examples/foundational/30-observer.py` shows how to add an observer to a + pipeline for debugging. -- Added `30-observer.py` to show how to add an Observer to a pipeline for - debugging. +- Introduced heartbeat frames. The pipeline task can now push periodic + heartbeats down the pipeline when `enable_heartbeats=True`. Heartbeats are + system frames that are supposed to make it all the way to the end of the + pipeline. When a heartbeat frame is received the traversing time (i.e. the + time it took to go through the whole pipeline) will be displayed (with TRACE + logging); otherwise a warning will be shown. The example + `examples/foundational/31-heartbeats.py` shows how to enable heartbeats and + forces warnings to be displayed. - Added `OpenRouter` for OpenRouter integration with an OpenAI-compatible interface. Added foundational example `14m-function-calling-openrouter.py`.
diff --git a/examples/foundational/31-heartbeats.py b/examples/foundational/31-heartbeats.py new file mode 100644 index 000000000..fbb959519 --- /dev/null +++ b/examples/foundational/31-heartbeats.py @@ -0,0 +1,43 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import sys + +from loguru import logger + +from pipecat.frames.frames import Frame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class NullProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + +async def main(): + """This example shows heartbeat monitoring by displaying a warning when + heartbeats are not received. + + """ + + pipeline = Pipeline([NullProcessor()]) + + task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True)) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index e2ac25e7a..3df7d0a33 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -424,6 +424,16 @@ class FatalErrorFrame(ErrorFrame): fatal: bool = field(default=True, init=False) +@dataclass +class HeartbeatFrame(SystemFrame): + """This frame is used by the pipeline task as a mechanism to know if the + pipeline is running properly. 
+ + """ + + timestamp: int + + @dataclass class EndTaskFrame(SystemFrame): """This is used to notify the pipeline task that the pipeline should be diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index fc404f75b..2fb13f627 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -19,6 +19,7 @@ from pipecat.frames.frames import ( EndTaskFrame, ErrorFrame, Frame, + HeartbeatFrame, MetricsFrame, StartFrame, StopTaskFrame, @@ -29,11 +30,15 @@ from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id +HEARTBEAT_SECONDS = 1.0 +HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5 + class PipelineParams(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) allow_interruptions: bool = False + enable_heartbeats: bool = False enable_metrics: bool = False enable_usage_metrics: bool = False send_initial_empty_metrics: bool = True @@ -58,25 +63,10 @@ class Source(FrameProcessor): match direction: case FrameDirection.UPSTREAM: - await self._handle_upstream_frame(frame) + await self._up_queue.put(frame) case FrameDirection.DOWNSTREAM: await self.push_frame(frame, direction) - async def _handle_upstream_frame(self, frame: Frame): - if isinstance(frame, EndTaskFrame): - # Tell the task we should end nicely. - await self._up_queue.put(EndTaskFrame()) - elif isinstance(frame, CancelTaskFrame): - # Tell the task we should end right away. - await self._up_queue.put(CancelTaskFrame()) - elif isinstance(frame, ErrorFrame): - logger.error(f"Error running app: {frame}") - if frame.fatal: - # Cancel all tasks downstream. - await self.push_frame(CancelFrame()) - # Tell the task we should stop. 
- await self._up_queue.put(StopTaskFrame()) - class Sink(FrameProcessor): """This is the sink processor that is linked at the end of the pipeline @@ -91,10 +81,7 @@ class Sink(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - - # We really just want to know when the EndFrame reached the sink. - if isinstance(frame, EndFrame): - await self._down_queue.put(frame) + await self._down_queue.put(frame) class Observer(BaseObserver): @@ -135,9 +122,18 @@ class PipelineTask: self._params = params self._finished = False + # This queue receives frames coming from the pipeline upstream. self._up_queue = asyncio.Queue() + # This queue receives frames coming from the pipeline downstream. self._down_queue = asyncio.Queue() + # This queue is the queue used to push frames to the pipeline. self._push_queue = asyncio.Queue() + # This is the heartbeat queue. When a heartbeat frame is received in the + # down queue we add it to the heartbeat queue for processing. + self._heartbeat_queue = asyncio.Queue() + # This event is used to indicate an EndFrame has been received in the + # down queue. + self._endframe_event = asyncio.Event() self._source = Source(self._up_queue) self._source.link(pipeline) @@ -148,33 +144,49 @@ class PipelineTask: self._observer = Observer(params.observers) def has_finished(self): + """Indicates whether the task has finished. That is, all processors + have stopped. + + """ return self._finished async def stop_when_done(self): + """This is a helper function that sends an EndFrame to the pipeline in + order to stop the task after everything in it has been processed. + + """ logger.debug(f"Task {self} scheduled to stop when done") await self.queue_frame(EndFrame()) async def cancel(self): + """ + Stops the running pipeline immediately. + """ logger.debug(f"Canceling pipeline task {self}") # Make sure everything is cleaned up downstream. 
This is sent # out-of-band from the main streaming task which is what we want since # we want to cancel right away. await self._source.push_frame(CancelFrame()) - self._process_push_task.cancel() - self._process_up_task.cancel() - await self._process_push_task - await self._process_up_task + await self._cancel_tasks(True) async def run(self): - self._process_up_task = asyncio.create_task(self._process_up_queue()) - self._process_push_task = asyncio.create_task(self._process_push_queue()) - await asyncio.gather(self._process_up_task, self._process_push_task) + """ + Starts running the given pipeline. + """ + tasks = self._create_tasks() + await asyncio.gather(*tasks) self._finished = True async def queue_frame(self, frame: Frame): + """ + Queues a frame to be pushed down the pipeline. + """ await self._push_queue.put(frame) async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): + """ + Queues multiple frames to be pushed down the pipeline. + """ if isinstance(frames, AsyncIterable): async for frame in frames: await self.queue_frame(frame) @@ -182,6 +194,41 @@ class PipelineTask: for frame in frames: await self.queue_frame(frame) + def _create_tasks(self): + tasks = [] + self._process_up_task = asyncio.create_task(self._process_up_queue()) + self._process_down_task = asyncio.create_task(self._process_down_queue()) + self._process_push_task = asyncio.create_task(self._process_push_queue()) + + tasks = [self._process_up_task, self._process_down_task, self._process_push_task] + + return tasks + + def _maybe_start_heartbeat_tasks(self): + if self._params.enable_heartbeats: + self._heartbeat_push_task = asyncio.create_task(self._heartbeat_push_handler()) + self._heartbeat_monitor_task = asyncio.create_task(self._heartbeat_monitor_handler()) + + async def _cancel_tasks(self, cancel_push: bool): + await self._maybe_cancel_heartbeat_tasks() + + if cancel_push: + self._process_push_task.cancel() + await self._process_push_task + + 
self._process_up_task.cancel() + await self._process_up_task + + self._process_down_task.cancel() + await self._process_down_task + + async def _maybe_cancel_heartbeat_tasks(self): + if self._params.enable_heartbeats: + self._heartbeat_push_task.cancel() + await self._heartbeat_push_task + self._heartbeat_monitor_task.cancel() + await self._heartbeat_monitor_task + def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() data = [] @@ -190,9 +237,20 @@ class PipelineTask: data.append(ProcessingMetricsData(processor=p.name, value=0.0)) return MetricsFrame(data=data) + async def _wait_for_endframe(self): + await self._endframe_event.wait() + self._endframe_event.clear() + async def _process_push_queue(self): + """This is the task that runs the pipeline for the first time by sending + a StartFrame and by pushing any other frames queued by the user. It runs + until the task is canceled or stopped (e.g. with an EndFrame). + + """ self._clock.start() + self._maybe_start_heartbeat_tasks() + start_frame = StartFrame( allow_interruptions=self._params.allow_interruptions, enable_metrics=self._params.enable_metrics, @@ -224,29 +282,91 @@ class PipelineTask: await self._source.cleanup() await self._pipeline.cleanup() await self._sink.cleanup() - # We just enqueue None to terminate the task gracefully. - self._process_up_task.cancel() - await self._process_up_task - - async def _wait_for_endframe(self): - # NOTE(aleix): the Sink element just pushes EndFrames to the down queue, - # so just wait for it. In the future we might do something else here, - # but for now this is fine. - await self._down_queue.get() + # Finally, cancel internal tasks. We don't cancel the push task because + # that's us. + await self._cancel_tasks(False) async def _process_up_queue(self): + """This is the task that processes frames coming upstream from the + pipeline. These frames might indicate, for example, that we want the + pipeline to be stopped (e.g. 
EndTaskFrame) in which case we would send + an EndFrame down the pipeline. + + """ while True: try: frame = await self._up_queue.get() if isinstance(frame, EndTaskFrame): + # Tell the task we should end nicely. await self.queue_frame(EndFrame()) elif isinstance(frame, CancelTaskFrame): + # Tell the task we should end right away. await self.queue_frame(CancelFrame()) elif isinstance(frame, StopTaskFrame): await self.queue_frame(StopTaskFrame()) + elif isinstance(frame, ErrorFrame): + logger.error(f"Error running app: {frame}") + if frame.fatal: + # Cancel all tasks downstream. + await self.queue_frame(CancelFrame()) + # Tell the task we should stop. + await self.queue_frame(StopTaskFrame()) self._up_queue.task_done() except asyncio.CancelledError: break + async def _process_down_queue(self): + """This task processes frames coming downstream from the pipeline. For + example, heartbeat frames or an EndFrame which would indicate all + processors have handled the EndFrame and therefore we can exit the task + cleanly. + + """ + while True: + try: + frame = await self._down_queue.get() + if isinstance(frame, EndFrame): + self._endframe_event.set() + elif isinstance(frame, HeartbeatFrame): + await self._heartbeat_queue.put(frame) + self._down_queue.task_done() + except asyncio.CancelledError: + break + + async def _heartbeat_push_handler(self): + """ + This task pushes a heartbeat frame every HEARTBEAT_SECONDS. + """ + while True: + try: + # Don't use `queue_frame()` because if an EndFrame is queued the + # task will just stop waiting for the pipeline to finish not + # allowing more frames to be pushed. + await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time())) + await asyncio.sleep(HEARTBEAT_SECONDS) + except asyncio.CancelledError: + break + + async def _heartbeat_monitor_handler(self): + """This task monitors heartbeat frames. If a heartbeat frame has not + been received for a long period a warning will be logged. 
It also logs + the time that a heartbeat frame takes to process, that is how long it + takes for the heartbeat frame to traverse all the pipeline. + + """ + wait_time = HEARTBEAT_MONITOR_SECONDS + while True: + try: + frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time) + process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000 + logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds") + self._heartbeat_queue.task_done() + except asyncio.TimeoutError: + logger.warning( + f"{self}: heartbeat frame not received for more than {wait_time} seconds" + ) + except asyncio.CancelledError: + break + def __str__(self): return self.name diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 47863017e..ae830c52c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -260,7 +260,7 @@ class FrameProcessor: async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: - timestamp = self._clock.get_time() + timestamp = self._clock.get_time() if self._clock else 0 if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") if self._observer: