Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
0bf61d9e58 move observer proxies to FrameProcessor 2025-05-05 11:31:15 -07:00
12 changed files with 215 additions and 207 deletions

View File

@@ -11,8 +11,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support to `RimeHttpTTSService` for the `arcana` model.
### Changed
- Observers `on_push_frame()` now take a single argument `FramePushed` instead
of multiple arguments.
### Removed
- `StartFrame.observer` has been replaced by `StartFrame.observers`. This allows
a user to pass all observers instead of a single proxy one.
### Deprecated
- Observer `on_push_frame(src, dst, frame, direction, timestamp)` is now
deprecated, use `on_push_frame(data: FramePushed)` instead.
### Fixed
- Fixed a `UltravoxSTTService` issue that would cause the service to generate
all tokens as one word.
- Remove custom audio tracks from `DailyTransport` before leaving.
## [0.0.66] - 2025-05-02

View File

@@ -14,16 +14,15 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
@@ -46,14 +45,13 @@ class DebugObserver(BaseObserver):
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
# Convert timestamp to seconds for readability
time_sec = timestamp / 1_000_000_000

View File

@@ -16,6 +16,7 @@ from typing import (
Literal,
Mapping,
Optional,
Sequence,
Tuple,
)
@@ -449,12 +450,12 @@ class StartFrame(SystemFrame):
clock: BaseClock
task_manager: BaseTaskManager
observers: Sequence["BaseObserver"]
audio_in_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
observer: Optional["BaseObserver"] = None
report_only_initial_ttfb: bool = False

View File

@@ -5,9 +5,38 @@
#
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing_extensions import TYPE_CHECKING
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
if TYPE_CHECKING:
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class FramePushed:
"""Represents an event where a frame is pushed from one processor to another
within the pipeline.
This data structure is typically used by observers to track the flow of
frames through the pipeline for logging, debugging, or analytics purposes.
Attributes:
source (FrameProcessor): The processor sending the frame.
destination (FrameProcessor): The processor receiving the frame.
frame (Frame): The frame being transferred.
direction (FrameDirection): The direction of the transfer (e.g., downstream or upstream).
timestamp (int): The time when the frame was pushed, based on the pipeline clock.
"""
source: "FrameProcessor"
destination: "FrameProcessor"
frame: Frame
direction: "FrameDirection"
timestamp: int
class BaseObserver(ABC):
@@ -19,26 +48,15 @@ class BaseObserver(ABC):
"""
@abstractmethod
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
"""Abstract method to handle the event when a frame is pushed from one
processor to another.
async def on_push_frame(self, data: FramePushed):
"""Handle the event when a frame is pushed from one processor to another.
This method should be implemented by subclasses to define specific
behavior (e.g., logging, monitoring, debugging) when a frame is
transferred through the pipeline.
Args:
src (FrameProcessor): The source frame processor that is sending the frame.
dst (FrameProcessor): The destination frame processor that will receive the frame.
frame (Frame): The frame being transferred between processors.
direction (FrameDirection): The direction of the frame transfer.
timestamp (int): The timestamp when the frame was pushed (based on the pipeline clock).
This method should be implemented by subclasses to define specific behavior
when a frame is pushed.
data (FramePushed): The event data containing details about the frame transfer.
"""
pass

View File

@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
LLMMessagesFrame,
LLMTextFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import LLMService
@@ -38,14 +38,13 @@ class LLMLogObserver(BaseObserver):
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
if not isinstance(src, LLMService) and not isinstance(dst, LLMService):
return

View File

@@ -11,7 +11,7 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
TranscriptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.stt_service import STTService
@@ -29,14 +29,11 @@ class TranscriptionLogObserver(BaseObserver):
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
frame = data.frame
timestamp = data.timestamp
if not isinstance(src, STTService):
return

View File

@@ -32,7 +32,6 @@ from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.base_task import BaseTask
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio import BaseTaskManager, TaskManager
@@ -184,6 +183,7 @@ class PipelineTask(BaseTask):
self._idle_timeout_secs = idle_timeout_secs
self._idle_timeout_frames = idle_timeout_frames
self._cancel_on_idle_timeout = cancel_on_idle_timeout
self._observers = observers
if self._params.observers:
import warnings
@@ -193,7 +193,7 @@ class PipelineTask(BaseTask):
"Field 'observers' is deprecated, use the 'observers' parameter instead.",
DeprecationWarning,
)
observers = self._params.observers
self._observers = self._params.observers
self._finished = False
# This queue receives frames coming from the pipeline upstream.
@@ -229,11 +229,6 @@ class PipelineTask(BaseTask):
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
# then just acts as a proxy.
self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)
# These events can be used to check which frames make it to the source
# or sink processors. Instead of calling the event handlers for every
# frame the user needs to specify which events they are interested
@@ -347,8 +342,6 @@ class PipelineTask(BaseTask):
self._process_push_queue(), f"{self}::_process_push_queue"
)
await self._observer.start()
return self._process_push_task
def _maybe_start_heartbeat_tasks(self):
@@ -367,8 +360,6 @@ class PipelineTask(BaseTask):
)
async def _cancel_tasks(self):
await self._observer.stop()
await self._task_manager.cancel_task(self._process_up_task)
await self._task_manager.cancel_task(self._process_down_task)
@@ -425,7 +416,7 @@ class PipelineTask(BaseTask):
audio_out_sample_rate=self._params.audio_out_sample_rate,
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_usage_metrics,
observer=self._observer,
observers=self._observers,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
)
start_frame.metadata = self._params.start_metadata

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import List
from attr import dataclass
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio import BaseTaskManager
@dataclass
class Proxy:
"""This is the data we receive from the main observer and that we put into
a queue for later processing.
"""
queue: asyncio.Queue
task: asyncio.Task
observer: BaseObserver
@dataclass
class ObserverData:
"""This is the data we receive from the main observer and that we put into a
proxy queue for later processing.
"""
src: FrameProcessor
dst: FrameProcessor
frame: Frame
direction: FrameDirection
timestamp: int
class TaskObserver(BaseObserver):
"""This is a pipeline frame observer that is meant to be used as a proxy to
the user provided observers. That is, this is the observer that should be
passed to the frame processors. Then, every time a frame is pushed this
observer will call all the observers registered to the pipeline task.
This observer makes sure that passing frames to observers doesn't block the
pipeline by creating a queue and a task for each user observer. When a frame
is received, it will be put in a queue for efficiency and later processed by
each task.
"""
def __init__(self, *, observers: List[BaseObserver] = [], task_manager: BaseTaskManager):
self._observers = observers
self._task_manager = task_manager
self._proxies: List[Proxy] = []
async def start(self):
"""Starts all proxy observer tasks."""
self._proxies = self._create_proxies(self._observers)
async def stop(self):
"""Stops all proxy observer tasks."""
for proxy in self._proxies:
await self._task_manager.cancel_task(proxy.task)
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
for proxy in self._proxies:
await proxy.queue.put(
ObserverData(
src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp
)
)
def _create_proxies(self, observers) -> List[Proxy]:
proxies = []
for observer in observers:
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer.__class__.__name__}::_proxy_task_handler",
)
proxy = Proxy(queue=queue, task=task, observer=observer)
proxies.append(proxy)
return proxies
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
while True:
data = await queue.get()
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)

View File

@@ -5,8 +5,9 @@
#
import asyncio
import inspect
from enum import Enum
from typing import Awaitable, Callable, Coroutine, Optional
from typing import Awaitable, Callable, Coroutine, List, Optional, Sequence
from loguru import logger
@@ -21,8 +22,9 @@ from pipecat.frames.frames import (
SystemFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.asyncio import BaseTaskManager
from pipecat.utils.asyncio import BaseTaskManager, TaskManager
from pipecat.utils.base_object import BaseObject
@@ -31,6 +33,52 @@ class FrameDirection(Enum):
UPSTREAM = 2
class FrameObserverProxy:
def __init__(self, task_manager: TaskManager, observer: BaseObserver) -> None:
self._task_manager = task_manager
self._observer = observer
self._queue = asyncio.Queue()
self._task: Optional[asyncio.Task] = None
self._warning_reported = False
def start(self):
if not self._task:
self._task = self._task_manager.create_task(
self._observer_task_handler(), f"ObserverProxy::{self._observer.__class__.__name__}"
)
async def stop(self):
if self._task:
await self._task_manager.cancel_task(self._task)
self._task = None
async def observe(self, data: FramePushed):
await self._queue.put(data)
async def _observer_task_handler(self):
while True:
data = await self._queue.get()
signature = inspect.signature(self._observer.on_push_frame)
if len(signature.parameters) > 1:
if not self._warning_reported:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, use `on_push_frame(data: FramePushed)` instead.",
DeprecationWarning,
)
self._warning_reported = True
await self._observer.on_push_frame(
data.source, data.destination, data.frame, data.direction, data.timestamp
)
else:
await self._observer.on_push_frame(data)
self._queue.task_done()
class FrameProcessor(BaseObject):
def __init__(
self,
@@ -55,7 +103,7 @@ class FrameProcessor(BaseObject):
self._enable_metrics = False
self._enable_usage_metrics = False
self._report_only_initial_ttfb = False
self._observer = None
self._observer_proxies: List[FrameObserverProxy] = []
# Cancellation is done through CancelFrame (a system frame). This could
# cause other events being triggered (e.g. closing a transport) which
@@ -77,10 +125,16 @@ class FrameProcessor(BaseObject):
self.__input_frame_task: Optional[asyncio.Task] = None
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are the
# exception to this rule. This create this task.
# task. This avoids problems like audio overlapping. System frames are
# the exception to this rule.
self.__push_frame_task: Optional[asyncio.Task] = None
# The observers task will push observed frames to the observers'
# proxies. This task avoids the pipeline to block. Then, there is one
# proxy per observer and each proxy has its own task. This way each
# observer is independent to the rest.
self.__observers_task: Optional[asyncio.Task] = None
@property
def id(self) -> int:
return self._id
@@ -170,6 +224,7 @@ class FrameProcessor(BaseObject):
await super().cleanup()
await self.__cancel_input_task()
await self.__cancel_push_task()
await self.__cancel_observers_task()
def link(self, processor: "FrameProcessor"):
self._next = processor
@@ -232,7 +287,9 @@ class FrameProcessor(BaseObject):
self._enable_metrics = frame.enable_metrics
self._enable_usage_metrics = frame.enable_usage_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
self._observer = frame.observer
self._observer_proxies = self._create_observer_proxies(
frame.task_manager, frame.observers
)
await self.__start(frame)
elif isinstance(frame, StartInterruptionFrame):
await self._start_interruption()
@@ -255,6 +312,7 @@ class FrameProcessor(BaseObject):
await self.__push_queue.put((frame, direction))
async def __start(self, frame: StartFrame):
self.__create_observers_task()
self.__create_input_task()
self.__create_push_task()
@@ -262,6 +320,7 @@ class FrameProcessor(BaseObject):
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_push_task()
await self.__cancel_observers_task()
#
# Handle interruptions
@@ -294,17 +353,25 @@ class FrameProcessor(BaseObject):
timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
if self._observer:
await self._observer.on_push_frame(
self, self._next, frame, direction, timestamp
)
data = FramePushed(
source=self,
destination=self._next,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self.__observe_frame(data)
await self._next.queue_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
if self._observer:
await self._observer.on_push_frame(
self, self._prev, frame, direction, timestamp
)
data = FramePushed(
source=self,
destination=self._prev,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self.__observe_frame(data)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
@@ -368,3 +435,36 @@ class FrameProcessor(BaseObject):
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()
def _create_observer_proxies(
self, task_manager: TaskManager, observers: Sequence[BaseObserver]
) -> List[FrameObserverProxy]:
result = []
for observer in observers:
proxy = FrameObserverProxy(task_manager, observer)
proxy.start()
result.append(proxy)
return result
def __create_observers_task(self):
if not self.__observers_task:
self.__observers_queue = asyncio.Queue()
self.__observers_task = self.create_task(self.__observers_task_handler())
async def __cancel_observers_task(self):
if self.__observers_task:
for proxy in self._observer_proxies:
await proxy.stop()
await self.cancel_task(self.__observers_task)
self.__observers_task = None
async def __observe_frame(self, data: FramePushed):
await self.__observers_queue.put(data)
async def __observers_task_handler(self):
while True:
data = await self.__observers_queue.get()
# Proxy observation to all observers.
for observer in self._observer_proxies:
await observer.observe(data)
self.__observers_queue.task_done()

View File

@@ -55,7 +55,7 @@ from pipecat.metrics.metrics import (
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -445,14 +445,7 @@ class RTVIObserver(BaseObserver):
self._frames_seen = set()
rtvi.set_errors_enabled(self._params.errors_enabled)
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
"""Process a frame being pushed through the pipeline.
Args:
@@ -462,6 +455,10 @@ class RTVIObserver(BaseObserver):
direction: Direction of frame flow in pipeline
timestamp: Time when frame was pushed
"""
src = data.source
frame = data.frame
direction = data.direction
# If we have already seen this frame, let's skip it.
if frame.id in self._frames_seen:
return

View File

@@ -9,8 +9,9 @@ from typing import List, Literal, Optional
from pydantic import BaseModel
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
@@ -27,18 +28,13 @@ class RTVIBotLLMSearchResponseMessage(BaseModel):
class GoogleRTVIObserver(RTVIObserver):
def __init__(self, rtvi: FrameProcessor):
def __init__(self, rtvi: RTVIProcessor):
super().__init__(rtvi)
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
await super().on_push_frame(src, dst, frame, direction, timestamp)
async def on_push_frame(self, data: FramePushed):
await super().on_push_frame(data)
frame = data.frame
if isinstance(frame, LLMSearchResponseFrame):
await self._handle_llm_search_response_frame(frame)

View File

@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
StartFrame,
SystemFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -42,14 +42,10 @@ class HeartbeatsObserver(BaseObserver):
self._target = target
self._callback = heartbeat_callback
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
frame = data.frame
if src == self._target and isinstance(frame, HeartbeatFrame):
await self._callback(self._target, frame)