From ff8aa6894238b7e5a1d3ec8c98237f7b6eb10061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 16 Jan 2025 18:56:01 -0800 Subject: [PATCH 1/7] introduce heartbeat frames --- CHANGELOG.md | 7 ++ src/pipecat/frames/frames.py | 10 ++ src/pipecat/pipeline/task.py | 184 ++++++++++++++++++++++++++++------- 3 files changed, 165 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a47979a85..5246be593 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 pipeline. This can be useful, for example, to implement frame loggers or debuggers among other things. +- Introduced heartbeat frames. The pipeline task can now push periodic + heartbeats down the pipeline when `enable_heartbeats=True`. Heartbeats are + system frames that are supposed to make it all the way to the end of the + pipeline. When a heartbeat frame is received the traversing time (i.e. the time + it took to go through the whole pipeline) will be displayed (with TRACE + logging) otherwise a warning will be shown. + - Added `30-observer.py` to show how to add an Observer to a pipeline for debugging. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index e2ac25e7a..3df7d0a33 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -424,6 +424,16 @@ class FatalErrorFrame(ErrorFrame): fatal: bool = field(default=True, init=False) +@dataclass +class HeartbeatFrame(SystemFrame): + """This frame is used by the pipeline task as a mechanism to know if the + pipeline is running properly. 
+ + """ + + timestamp: int + + @dataclass class EndTaskFrame(SystemFrame): """This is used to notify the pipeline task that the pipeline should be diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index fc404f75b..e04e0d285 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -19,6 +19,7 @@ from pipecat.frames.frames import ( EndTaskFrame, ErrorFrame, Frame, + HeartbeatFrame, MetricsFrame, StartFrame, StopTaskFrame, @@ -29,11 +30,14 @@ from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id +HEARTBEAT_SECONDS = 1.0 + class PipelineParams(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) allow_interruptions: bool = False + enable_heartbeats: bool = False enable_metrics: bool = False enable_usage_metrics: bool = False send_initial_empty_metrics: bool = True @@ -58,25 +62,10 @@ class Source(FrameProcessor): match direction: case FrameDirection.UPSTREAM: - await self._handle_upstream_frame(frame) + await self._up_queue.put(frame) case FrameDirection.DOWNSTREAM: await self.push_frame(frame, direction) - async def _handle_upstream_frame(self, frame: Frame): - if isinstance(frame, EndTaskFrame): - # Tell the task we should end nicely. - await self._up_queue.put(EndTaskFrame()) - elif isinstance(frame, CancelTaskFrame): - # Tell the task we should end right away. - await self._up_queue.put(CancelTaskFrame()) - elif isinstance(frame, ErrorFrame): - logger.error(f"Error running app: {frame}") - if frame.fatal: - # Cancel all tasks downstream. - await self.push_frame(CancelFrame()) - # Tell the task we should stop. 
- await self._up_queue.put(StopTaskFrame()) - class Sink(FrameProcessor): """This is the sink processor that is linked at the end of the pipeline @@ -91,10 +80,7 @@ class Sink(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - - # We really just want to know when the EndFrame reached the sink. - if isinstance(frame, EndFrame): - await self._down_queue.put(frame) + await self._down_queue.put(frame) class Observer(BaseObserver): @@ -135,9 +121,18 @@ class PipelineTask: self._params = params self._finished = False + # This queue receives frames coming from the pipeline upstream. self._up_queue = asyncio.Queue() + # This queue receives frames coming from the pipeline downstream. self._down_queue = asyncio.Queue() + # This queue is the queue used to push frames to the pipeline. self._push_queue = asyncio.Queue() + # This is the heartbeat queue. When a heartbeat frame is received in the + # down queue we add it to the heartbeat queue for processing. + self._heartbeat_queue = asyncio.Queue() + # This event is used to indicate an EndFrame has been received in the + # down queue. + self._endframe_event = asyncio.Event() self._source = Source(self._up_queue) self._source.link(pipeline) @@ -148,33 +143,49 @@ class PipelineTask: self._observer = Observer(params.observers) def has_finished(self): + """Indicates whether the task has finished. That is, all processors + have stopped. + + """ return self._finished async def stop_when_done(self): + """This is a helper function that sends an EndFrame to the pipeline in + order to stop the task after everything in it has been processed. + + """ logger.debug(f"Task {self} scheduled to stop when done") await self.queue_frame(EndFrame()) async def cancel(self): + """ + Stops the running pipeline immediately. + """ logger.debug(f"Canceling pipeline task {self}") # Make sure everything is cleaned up downstream. 
This is sent # out-of-band from the main streaming task which is what we want since # we want to cancel right away. await self._source.push_frame(CancelFrame()) - self._process_push_task.cancel() - self._process_up_task.cancel() - await self._process_push_task - await self._process_up_task + await self._cancel_tasks(True) async def run(self): - self._process_up_task = asyncio.create_task(self._process_up_queue()) - self._process_push_task = asyncio.create_task(self._process_push_queue()) - await asyncio.gather(self._process_up_task, self._process_push_task) + """ + Starts running the given pipeline. + """ + tasks = self._create_tasks() + await asyncio.gather(*tasks) self._finished = True async def queue_frame(self, frame: Frame): + """ + Queue a frame to be pushed down the pipeline. + """ await self._push_queue.put(frame) async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): + """ + Queues multiple frames to be pushed down the pipeline. + """ if isinstance(frames, AsyncIterable): async for frame in frames: await self.queue_frame(frame) @@ -182,6 +193,39 @@ class PipelineTask: for frame in frames: await self.queue_frame(frame) + def _create_tasks(self): + tasks = [] + self._process_up_task = asyncio.create_task(self._process_up_queue()) + self._process_down_task = asyncio.create_task(self._process_down_queue()) + self._process_push_task = asyncio.create_task(self._process_push_queue()) + + tasks = [self._process_up_task, self._process_down_task, self._process_push_task] + + if self._params.enable_heartbeats: + self._heartbeat_push_task = asyncio.create_task(self._heartbeat_push_handler()) + self._heartbeat_monitor_task = asyncio.create_task(self._heartbeat_monitor_handler()) + tasks.append(self._heartbeat_push_task) + tasks.append(self._heartbeat_monitor_task) + + return tasks + + async def _cancel_tasks(self, cancel_push: bool): + if cancel_push: + self._process_push_task.cancel() + await self._process_push_task + + 
self._process_up_task.cancel() + await self._process_up_task + + self._process_down_task.cancel() + await self._process_down_task + + if self._params.enable_heartbeats: + self._heartbeat_push_task.cancel() + await self._heartbeat_push_task + self._heartbeat_monitor_task.cancel() + await self._heartbeat_monitor_task + def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() data = [] @@ -190,7 +234,16 @@ class PipelineTask: data.append(ProcessingMetricsData(processor=p.name, value=0.0)) return MetricsFrame(data=data) + async def _wait_for_endframe(self): + await self._endframe_event.wait() + self._endframe_event.clear() + async def _process_push_queue(self): + """This is the task that runs the pipeline for the first time by sending + a StartFrame and by pushing any other frames queued by the user. It runs + until the task is canceled or stopped (e.g. with an EndFrame). + + """ self._clock.start() start_frame = StartFrame( @@ -224,29 +277,88 @@ class PipelineTask: await self._source.cleanup() await self._pipeline.cleanup() await self._sink.cleanup() - # We just enqueue None to terminate the task gracefully. - self._process_up_task.cancel() - await self._process_up_task - - async def _wait_for_endframe(self): - # NOTE(aleix): the Sink element just pushes EndFrames to the down queue, - # so just wait for it. In the future we might do something else here, - # but for now this is fine. - await self._down_queue.get() + # Finally, cancel internal tasks. We don't cancel the push task because + # that's us. + await self._cancel_tasks(False) async def _process_up_queue(self): + """This is the task that processes frames coming upstream from the + pipeline. These frames might indicate, for example, that we want the + pipeline to be stopped (e.g. EndTaskFrame) in which case we would send + an EndFrame down the pipeline. 
+ + """ while True: try: frame = await self._up_queue.get() if isinstance(frame, EndTaskFrame): + # Tell the task we should end nicely. await self.queue_frame(EndFrame()) elif isinstance(frame, CancelTaskFrame): + # Tell the task we should end right away. await self.queue_frame(CancelFrame()) elif isinstance(frame, StopTaskFrame): await self.queue_frame(StopTaskFrame()) + elif isinstance(frame, ErrorFrame): + logger.error(f"Error running app: {frame}") + if frame.fatal: + # Cancel all tasks downstream. + await self.queue_frame(CancelFrame()) + # Tell the task we should stop. + await self.queue_frame(StopTaskFrame()) self._up_queue.task_done() except asyncio.CancelledError: break + async def _process_down_queue(self): + """This task processes frames coming downstream from the pipeline. For + example, heartbeat frames or an EndFrame which would indicate all + processors have handled the EndFrame and therefore we can exit the task + cleanly. + + """ + while True: + try: + frame = await self._down_queue.get() + if isinstance(frame, EndFrame): + self._endframe_event.set() + elif isinstance(frame, HeartbeatFrame): + await self._heartbeat_queue.put(frame) + self._down_queue.task_done() + except asyncio.CancelledError: + break + + async def _heartbeat_push_handler(self): + """ + This task pushes a heartbeat frame every HEARTBEAT_SECONDS. + """ + while True: + try: + await self.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time())) + await asyncio.sleep(HEARTBEAT_SECONDS) + except asyncio.CancelledError: + break + + async def _heartbeat_monitor_handler(self): + """This task monitors heartbeat frames. If a heartbeat frame has not + been received for a long period a warning will be logged. It also logs + the time that a heartbeat frame takes to process, that is, how long it + takes for the heartbeat frame to traverse all the pipeline. 
+ + """ + wait_time = HEARTBEAT_SECONDS * 2 + while True: + try: + frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time) + process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000 + logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds") + self._heartbeat_queue.task_done() + except asyncio.TimeoutError: + logger.warning( + f"{self}: heartbeat frame not received for more than {wait_time} seconds" + ) + except asyncio.CancelledError: + break + def __str__(self): return self.name From 2503f761078fedc14dd1b655f44efb918440db26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 16 Jan 2025 18:56:21 -0800 Subject: [PATCH 2/7] examples: add 31-heartbeats.py --- CHANGELOG.md | 15 ++++----- examples/foundational/31-heartbeats.py | 43 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) create mode 100644 examples/foundational/31-heartbeats.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5246be593..27a620e5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,17 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Introduced pipeline frame observers. Observers can view all the frames that go through the pipeline without the need to inject processors in the pipeline. This can be useful, for example, to implement frame loggers or - debuggers among other things. + debuggers among other things. The example + `examples/foundational/30-observer.py` shows how to add an observer to a + pipeline for debugging. - Introduced heartbeat frames. The pipeline task can now push periodic heartbeats down the pipeline when `enable_heartbeats=True`. Heartbeats are system frames that are supposed to make it all the way to the end of the - pipeline. When a heartbeat frame is received the traversing time (i.e. the time - it took to go through the whole pipeline) will be displayed (with TRACE - logging) otherwise a warning will be shown. 
- -- Added `30-observer.py` to show how to add an Observer to a pipeline for - debugging. + pipeline. When a heartbeat frame is received the traversing time (i.e. the + time it took to go through the whole pipeline) will be displayed (with TRACE + logging) otherwise a warning will be shown. The example + `examples/foundational/31-heartbeats.py` shows how to enable heartbeats and + forces warnings to be displayed. - Added `OpenRouter` for OpenRouter integration with an OpenAI-compatible interface. Added foundational example `14m-function-calling-openrouter.py`. diff --git a/examples/foundational/31-heartbeats.py b/examples/foundational/31-heartbeats.py new file mode 100644 index 000000000..fbb959519 --- /dev/null +++ b/examples/foundational/31-heartbeats.py @@ -0,0 +1,43 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import sys + +from loguru import logger + +from pipecat.frames.frames import Frame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class NullProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + +async def main(): + """This test shows heartbeat monitoring by displaying a warning when + heartbeats are not received. 
+ + """ + + pipeline = Pipeline([NullProcessor()]) + + task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True)) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) From f22a00570d96480146093203d12a526e859a99db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 17 Jan 2025 10:03:13 -0800 Subject: [PATCH 3/7] task: start heartbeats task when push task starts --- src/pipecat/pipeline/task.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index e04e0d285..f0e2089e4 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -201,15 +201,16 @@ class PipelineTask: tasks = [self._process_up_task, self._process_down_task, self._process_push_task] + return tasks + + def _maybe_start_heartbeat_tasks(self): if self._params.enable_heartbeats: self._heartbeat_push_task = asyncio.create_task(self._heartbeat_push_handler()) self._heartbeat_monitor_task = asyncio.create_task(self._heartbeat_monitor_handler()) - tasks.append(self._heartbeat_push_task) - tasks.append(self._heartbeat_monitor_task) - - return tasks async def _cancel_tasks(self, cancel_push: bool): + await self._maybe_cancel_heartbeat_tasks() + if cancel_push: self._process_push_task.cancel() await self._process_push_task @@ -220,6 +221,7 @@ class PipelineTask: self._process_down_task.cancel() await self._process_down_task + async def _maybe_cancel_heartbeat_tasks(self): if self._params.enable_heartbeats: self._heartbeat_push_task.cancel() await self._heartbeat_push_task @@ -246,6 +248,8 @@ class PipelineTask: """ self._clock.start() + self._maybe_start_heartbeat_tasks() + start_frame = StartFrame( allow_interruptions=self._params.allow_interruptions, enable_metrics=self._params.enable_metrics, From da0c4cfd99584269bde678e470b3f0abed757382 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 17 Jan 2025 10:04:05 -0800 Subject: [PATCH 4/7] task: increase heartbeat monitoring to 5 seconds --- src/pipecat/pipeline/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index f0e2089e4..a77997d08 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -350,7 +350,7 @@ class PipelineTask: takes for the heartbeat frame to traverse all the pipeline. """ - wait_time = HEARTBEAT_SECONDS * 2 + wait_time = HEARTBEAT_SECONDS * 5 while True: try: frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time) From 4b3c776f582ce708ddb2e2b93fb772a499649dd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 17 Jan 2025 10:04:24 -0800 Subject: [PATCH 5/7] task: don't use push queue to send a heartbeat This is because we might be waiting for the EndFrame. Currently, if we push an EndFrame to the task, the task will block until the EndFrame traverses all the pipeline. --- src/pipecat/pipeline/task.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index a77997d08..9a6d344ea 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -338,7 +338,10 @@ class PipelineTask: """ while True: try: - await self.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time())) + # Don't use `queue_frame()` because if an EndFrame is queued the + # task will just stop waiting for the pipeline to finish not + # allowing more frames to be pushed. 
+ await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time())) await asyncio.sleep(HEARTBEAT_SECONDS) except asyncio.CancelledError: break From 477d0d154b0ef7a7e29918212c04f7877eca95b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 17 Jan 2025 10:05:23 -0800 Subject: [PATCH 6/7] frame_processor: make sure clock is initialized --- src/pipecat/processors/frame_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 47863017e..ae830c52c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -260,7 +260,7 @@ class FrameProcessor: async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: - timestamp = self._clock.get_time() + timestamp = self._clock.get_time() if self._clock else 0 if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") if self._observer: From 45cbad5b3e2238aae51fc859bd0a23d686de917a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 17 Jan 2025 10:11:28 -0800 Subject: [PATCH 7/7] task: add HEARTBEAT_MONITOR_SECONDS --- src/pipecat/pipeline/task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 9a6d344ea..2fb13f627 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -31,6 +31,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id HEARTBEAT_SECONDS = 1.0 +HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5 class PipelineParams(BaseModel): @@ -353,7 +354,7 @@ class PipelineTask: takes for the heartbeat frame to traverse all the pipeline. 
""" - wait_time = HEARTBEAT_SECONDS * 5 + wait_time = HEARTBEAT_MONITOR_SECONDS while True: try: frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time)