Merge pull request #1739 from pipecat-ai/aleix/observers-frame-pushed-class

BaseObserver: add FramePushed class and deprecate multiple arguments
This commit is contained in:
Aleix Conchillo Flaqué
2025-05-06 15:29:05 -07:00
committed by GitHub
10 changed files with 130 additions and 114 deletions

View File

@@ -16,10 +16,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Observers `on_push_frame()` now take a single argument `FramePushed` instead
of multiple arguments.
- Updated the default voice for `DeepgramTTSService` to `aura-2-helena-en`.
### Deprecated
- Observer `on_push_frame(src, dst, frame, direction, timestamp)` is now
deprecated, use `on_push_frame(data: FramePushed)` instead.
### Fixed
- Fixed an `UltravoxSTTService` issue that would cause the service to generate
all tokens as one word.
- Fixed a `PipelineTask` issue that would cause tasks to not be cancelled if
the task was cancelled from outside of Pipecat.

View File

@@ -14,16 +14,15 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
@@ -46,14 +45,13 @@ class DebugObserver(BaseObserver):
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
# Convert timestamp to seconds for readability
time_sec = timestamp / 1_000_000_000

View File

@@ -5,9 +5,38 @@
#
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing_extensions import TYPE_CHECKING
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
if TYPE_CHECKING:
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class FramePushed:
    """Event payload describing one frame moving between two processors.

    An instance bundles everything an observer's `on_push_frame()` receives
    about a single push, so observers take one argument instead of five.

    Attributes:
        source (FrameProcessor): The processor sending the frame.
        destination (FrameProcessor): The processor receiving the frame.
        frame (Frame): The frame being transferred.
        direction (FrameDirection): The direction of the transfer (e.g., downstream or upstream).
        timestamp (int): The time when the frame was pushed, based on the pipeline clock.
    """

    # FrameProcessor / FrameDirection are quoted because they are only
    # imported under TYPE_CHECKING in this module.
    source: "FrameProcessor"
    destination: "FrameProcessor"
    frame: Frame
    direction: "FrameDirection"
    timestamp: int
class BaseObserver(ABC):
@@ -19,26 +48,15 @@ class BaseObserver(ABC):
"""
@abstractmethod
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
"""Abstract method to handle the event when a frame is pushed from one
processor to another.
async def on_push_frame(self, data: FramePushed):
"""Handle the event when a frame is pushed from one processor to another.
This method should be implemented by subclasses to define specific
behavior (e.g., logging, monitoring, debugging) when a frame is
transferred through the pipeline.
Args:
src (FrameProcessor): The source frame processor that is sending the frame.
dst (FrameProcessor): The destination frame processor that will receive the frame.
frame (Frame): The frame being transferred between processors.
direction (FrameDirection): The direction of the frame transfer.
timestamp (int): The timestamp when the frame was pushed (based on the pipeline clock).
This method should be implemented by subclasses to define specific behavior
when a frame is pushed.
data (FramePushed): The event data containing details about the frame transfer.
"""
pass

View File

@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
LLMMessagesFrame,
LLMTextFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import LLMService
@@ -38,14 +38,13 @@ class LLMLogObserver(BaseObserver):
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
if not isinstance(src, LLMService) and not isinstance(dst, LLMService):
return

View File

@@ -11,7 +11,7 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
TranscriptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.stt_service import STTService
@@ -29,14 +29,11 @@ class TranscriptionLogObserver(BaseObserver):
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
frame = data.frame
timestamp = data.timestamp
if not isinstance(src, STTService):
return

View File

@@ -5,13 +5,12 @@
#
import asyncio
import inspect
from typing import List
from attr import dataclass
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.utils.asyncio import BaseTaskManager
@@ -27,20 +26,6 @@ class Proxy:
observer: BaseObserver
@dataclass
class ObserverData:
"""This is the data we receive from the main observer and that we put into a
proxy queue for later processing.
"""
src: FrameProcessor
dst: FrameProcessor
frame: Frame
direction: FrameDirection
timestamp: int
class TaskObserver(BaseObserver):
"""This is a pipeline frame observer that is meant to be used as a proxy to
the user provided observers. That is, this is the observer that should be
@@ -68,20 +53,9 @@ class TaskObserver(BaseObserver):
for proxy in self._proxies:
await self._task_manager.cancel_task(proxy.task)
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
for proxy in self._proxies:
await proxy.queue.put(
ObserverData(
src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp
)
)
await proxy.queue.put(data)
def _create_proxies(self, observers) -> List[Proxy]:
proxies = []
@@ -96,8 +70,26 @@ class TaskObserver(BaseObserver):
return proxies
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
warning_reported = False
while True:
data = await queue.get()
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
signature = inspect.signature(observer.on_push_frame)
if len(signature.parameters) > 1:
if not warning_reported:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, us `on_push_frame(data: FramePushed)` instead.",
DeprecationWarning,
)
warning_reported = True
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
else:
await observer.on_push_frame(data)
queue.task_done()

View File

@@ -21,6 +21,7 @@ from pipecat.frames.frames import (
SystemFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.asyncio import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -294,17 +295,28 @@ class FrameProcessor(BaseObject):
timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
if self._observer:
await self._observer.on_push_frame(
self, self._next, frame, direction, timestamp
data = FramePushed(
source=self,
destination=self._next,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self._observer.on_push_frame(data)
await self._next.queue_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
if self._observer:
await self._observer.on_push_frame(
self, self._prev, frame, direction, timestamp
data = FramePushed(
source=self,
destination=self._prev,
frame=frame,
direction=direction,
timestamp=timestamp,
)
await self._observer.on_push_frame(data)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")

View File

@@ -55,7 +55,7 @@ from pipecat.metrics.metrics import (
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -445,14 +445,7 @@ class RTVIObserver(BaseObserver):
self._frames_seen = set()
rtvi.set_errors_enabled(self._params.errors_enabled)
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
"""Process a frame being pushed through the pipeline.
Args:
@@ -462,6 +455,10 @@ class RTVIObserver(BaseObserver):
direction: Direction of frame flow in pipeline
timestamp: Time when frame was pushed
"""
src = data.source
frame = data.frame
direction = data.direction
# If we have already seen this frame, let's skip it.
if frame.id in self._frames_seen:
return

View File

@@ -9,8 +9,9 @@ from typing import List, Literal, Optional
from pydantic import BaseModel
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
@@ -27,18 +28,13 @@ class RTVIBotLLMSearchResponseMessage(BaseModel):
class GoogleRTVIObserver(RTVIObserver):
def __init__(self, rtvi: FrameProcessor):
def __init__(self, rtvi: RTVIProcessor):
super().__init__(rtvi)
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
await super().on_push_frame(src, dst, frame, direction, timestamp)
async def on_push_frame(self, data: FramePushed):
await super().on_push_frame(data)
frame = data.frame
if isinstance(frame, LLMSearchResponseFrame):
await self._handle_llm_search_response_frame(frame)

View File

@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
StartFrame,
SystemFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -42,14 +42,10 @@ class HeartbeatsObserver(BaseObserver):
self._target = target
self._callback = heartbeat_callback
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
async def on_push_frame(self, data: FramePushed):
src = data.source
frame = data.frame
if src == self._target and isinstance(frame, HeartbeatFrame):
await self._callback(self._target, frame)