task: add TaskObserver and avoid pipeline blocking

Observers now process frames in separate tasks. This avoids blocking the
pipeline while the observer is processing the frame.
This commit is contained in:
Aleix Conchillo Flaqué
2025-01-17 11:13:33 -08:00
parent 4f883ee31f
commit 3e4020cdba
2 changed files with 101 additions and 24 deletions

View File

@@ -27,6 +27,7 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id
@@ -84,29 +85,6 @@ class Sink(FrameProcessor):
await self._down_queue.put(frame)
class Observer(BaseObserver):
"""This is a pipeline frame observer that is used as a proxy to the user
provided observers. That is, this is the only observer passed to the frame
processors. Then, every time a frame is pushed this observer will call all
the observers registered to the pipeline task.
"""
def __init__(self, observers: List[BaseObserver] = []):
self._observers = observers
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
for observer in self._observers:
await observer.on_push_frame(src, dst, frame, direction, timestamp)
class PipelineTask:
def __init__(
self,
@@ -141,7 +119,7 @@ class PipelineTask:
self._sink = Sink(self._down_queue)
pipeline.link(self._sink)
self._observer = Observer(params.observers)
self._observer = TaskObserver(params.observers)
def has_finished(self):
"""Indicates whether the tasks has finished. That is, all processors
@@ -222,6 +200,8 @@ class PipelineTask:
self._process_down_task.cancel()
await self._process_down_task
await self._observer.stop()
async def _maybe_cancel_heartbeat_tasks(self):
if self._params.enable_heartbeats:
self._heartbeat_push_task.cancel()

View File

@@ -0,0 +1,97 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import List
from attr import dataclass
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class Proxy:
"""This is the data we receive from the main observer and that we put into
a queue for later processing.
"""
queue: asyncio.Queue
task: asyncio.Task
observer: BaseObserver
@dataclass
class ObserverData:
"""This is the data we receive from the main observer and that we put into a
proxy queue for later processing.
"""
src: FrameProcessor
dst: FrameProcessor
frame: Frame
direction: FrameDirection
timestamp: int
class TaskObserver(BaseObserver):
"""This is a pipeline frame observer that is meant to be used as a proxy to
the user provided observers. That is, this is the observer that should be
passed to the frame processors. Then, every time a frame is pushed this
observer will call all the observers registered to the pipeline task.
This observer makes sure that passing frames to observers doesn't block the
pipeline by creating a queue and a task for each user observer. When a frame
is received, it will be put in a queue for efficiency and later processed by
each task.
"""
def __init__(self, observers: List[BaseObserver] = []):
self._proxies: List[Proxy] = self._create_proxies(observers)
async def stop(self):
"""Stops all proxy observer tasks."""
for proxy in self._proxies:
proxy.task.cancel()
await proxy.task
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
for proxy in self._proxies:
await proxy.queue.put(
ObserverData(
src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp
)
)
def _create_proxies(self, observers) -> List[Proxy]:
proxies = []
for observer in observers:
queue = asyncio.Queue()
task = asyncio.create_task(self._proxy_task_handler(queue, observer))
proxy = Proxy(queue=queue, task=task, observer=observer)
proxies.append(proxy)
return proxies
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
while True:
try:
data = await queue.get()
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
except asyncio.CancelledError:
break