From 3e4020cdbaae3f3f8d8e7bd1efec65cbe5119342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 17 Jan 2025 11:13:33 -0800 Subject: [PATCH] task: add TaskObserver and avoid pipeline blocking Observers now process frames in separate tasks. This avoids blocking the pipeline while the observer is processing the frame. --- src/pipecat/pipeline/task.py | 28 ++------ src/pipecat/pipeline/task_observer.py | 97 +++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 24 deletions(-) create mode 100644 src/pipecat/pipeline/task_observer.py diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 2fb13f627..9263352e4 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -27,6 +27,7 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData from pipecat.observers.base_observer import BaseObserver from pipecat.pipeline.base_pipeline import BasePipeline +from pipecat.pipeline.task_observer import TaskObserver from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.utils.utils import obj_count, obj_id @@ -84,29 +85,6 @@ class Sink(FrameProcessor): await self._down_queue.put(frame) -class Observer(BaseObserver): - """This is a pipeline frame observer that is used as a proxy to the user - provided observers. That is, this is the only observer passed to the frame - processors. Then, every time a frame is pushed this observer will call all - the observers registered to the pipeline task. - - """ - - def __init__(self, observers: List[BaseObserver] = []): - self._observers = observers - - async def on_push_frame( - self, - src: FrameProcessor, - dst: FrameProcessor, - frame: Frame, - direction: FrameDirection, - timestamp: int, - ): - for observer in self._observers: - await observer.on_push_frame(src, dst, frame, direction, timestamp) - - class PipelineTask: def __init__( self, @@ -141,7 +119,7 @@ class PipelineTask: self._sink = Sink(self._down_queue) pipeline.link(self._sink) - self._observer = Observer(params.observers) + self._observer = TaskObserver(params.observers) def has_finished(self): """Indicates whether the tasks has finished. That is, all processors @@ -222,6 +200,8 @@ class PipelineTask: self._process_down_task.cancel() await self._process_down_task + await self._observer.stop() + async def _maybe_cancel_heartbeat_tasks(self): if self._params.enable_heartbeats: self._heartbeat_push_task.cancel() diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py new file mode 100644 index 000000000..2fd13f517 --- /dev/null +++ b/src/pipecat/pipeline/task_observer.py @@ -0,0 +1,97 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +from typing import List + +from attr import dataclass + +from pipecat.frames.frames import Frame +from pipecat.observers.base_observer import BaseObserver +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +@dataclass +class Proxy: + """This is the data we receive from the main observer and that we put into + a queue for later processing. + + """ + + queue: asyncio.Queue + task: asyncio.Task + observer: BaseObserver + + +@dataclass +class ObserverData: + """This is the data we receive from the main observer and that we put into a + proxy queue for later processing. + + """ + + src: FrameProcessor + dst: FrameProcessor + frame: Frame + direction: FrameDirection + timestamp: int + + +class TaskObserver(BaseObserver): + """This is a pipeline frame observer that is meant to be used as a proxy to + the user provided observers. That is, this is the observer that should be + passed to the frame processors. Then, every time a frame is pushed this + observer will call all the observers registered to the pipeline task. + + This observer makes sure that passing frames to observers doesn't block the + pipeline by creating a queue and a task for each user observer. When a frame + is received, it will be put in a queue for efficiency and later processed by + each task. + + """ + + def __init__(self, observers: List[BaseObserver] = []): + self._proxies: List[Proxy] = self._create_proxies(observers) + + async def stop(self): + """Stops all proxy observer tasks.""" + for proxy in self._proxies: + proxy.task.cancel() + await proxy.task + + async def on_push_frame( + self, + src: FrameProcessor, + dst: FrameProcessor, + frame: Frame, + direction: FrameDirection, + timestamp: int, + ): + for proxy in self._proxies: + await proxy.queue.put( + ObserverData( + src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp + ) + ) + + def _create_proxies(self, observers) -> List[Proxy]: + proxies = [] + for observer in observers: + queue = asyncio.Queue() + task = asyncio.create_task(self._proxy_task_handler(queue, observer)) + proxy = Proxy(queue=queue, task=task, observer=observer) + proxies.append(proxy) + return proxies + + async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver): + while True: + try: + data = await queue.get() + await observer.on_push_frame( + data.src, data.dst, data.frame, data.direction, data.timestamp + ) + except asyncio.CancelledError: + break