Merge pull request #599 from pipecat-ai/mb/remove-metrics-from-transport

Move metrics from transport to rtvi
This commit is contained in:
Mark Backman
2024-10-16 11:39:58 -04:00
committed by GitHub
5 changed files with 78 additions and 85 deletions

View File

@@ -5,6 +5,12 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Changed
- Metrics messages have moved out from the transport's base output into RTVI.
## [0.0.44] - 2024-10-15
### Added

View File

@@ -6,7 +6,17 @@
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Union
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Mapping,
Optional,
Union,
)
from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
@@ -24,6 +34,7 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
MetricsFrame,
StartFrame,
SystemFrame,
TextFrame,
@@ -35,6 +46,12 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -343,6 +360,12 @@ class RTVIBotStoppedSpeakingMessage(BaseModel):
type: Literal["bot-stopped-speaking"] = "bot-stopped-speaking"
# Pydantic model describing the RTVI "metrics" message.
class RTVIMetricsMessage(BaseModel):
# Fixed label; the Literal type admits only "rtvi-ai".
label: Literal["rtvi-ai"] = "rtvi-ai"
# Fixed message type; the Literal type admits only "metrics".
type: Literal["metrics"] = "metrics"
# Metrics payload: a mapping with string keys and unconstrained (Any) values.
data: Mapping[str, Any]
class RTVIProcessorParams(BaseModel):
send_bot_ready: bool = True
@@ -509,6 +532,42 @@ class RTVIBotTTSProcessor(RTVIFrameProcessor):
await self._push_transport_message_urgent(message)
class RTVIMetricsProcessor(RTVIFrameProcessor):
    """Forward every frame and report metrics frames as RTVI messages.

    Each frame is pushed on unchanged. When the frame is a ``MetricsFrame``
    its entries are additionally grouped into an ``RTVIMetricsMessage`` and
    handed to ``_push_transport_message_urgent``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Pass ``frame`` along, then report it if it carries metrics."""
        await super().process_frame(frame, direction)

        # The frame is always forwarded before any metrics message is built.
        await self.push_frame(frame, direction)

        if not isinstance(frame, MetricsFrame):
            return
        await self._handle_metrics(frame)

    async def _handle_metrics(self, frame: MetricsFrame):
        """Group the entries of ``frame.data`` by kind and send one message.

        Entries are collected under the keys "ttfb", "processing", "tokens"
        and "characters"; entries of any other type are ignored. A message is
        sent even when no entry was recognised (its data is then empty).
        """
        grouped = {}
        for entry in frame.data:
            if isinstance(entry, TTFBMetricsData):
                bucket = "ttfb"
                payload = entry.model_dump(exclude_none=True)
            elif isinstance(entry, ProcessingMetricsData):
                bucket = "processing"
                payload = entry.model_dump(exclude_none=True)
            elif isinstance(entry, LLMUsageMetricsData):
                # Token usage is reported from the nested ``value`` object,
                # not from the metrics entry itself.
                bucket = "tokens"
                payload = entry.value.model_dump(exclude_none=True)
            elif isinstance(entry, TTSUsageMetricsData):
                bucket = "characters"
                payload = entry.model_dump(exclude_none=True)
            else:
                continue
            grouped.setdefault(bucket, []).append(payload)

        await self._push_transport_message_urgent(RTVIMetricsMessage(data=grouped))
class RTVIProcessor(FrameProcessor):
def __init__(
self,

View File

@@ -6,37 +6,34 @@
import asyncio
import itertools
import time
import sys
from PIL import Image
import time
from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from loguru import logger
from PIL import Image
from pipecat.frames.frames import (
BotSpeakingFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
MetricsFrame,
EndFrame,
Frame,
OutputAudioRawFrame,
OutputImageRawFrame,
SpriteFrame,
StartFrame,
EndFrame,
Frame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TTSStartedFrame,
TTSStoppedFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from loguru import logger
from pipecat.utils.time import nanoseconds_to_seconds
@@ -141,9 +138,6 @@ class BaseOutputTransport(FrameProcessor):
# Hook for delivering a transport message frame (regular or urgent).
# This base implementation does nothing; the Daily and LiveKit output
# transports shown further down provide their own versions.
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass
# Hook for delivering the contents of a MetricsFrame over the transport.
# This base implementation does nothing; the Daily and LiveKit output
# transports shown further down provide their own versions.
async def send_metrics(self, frame: MetricsFrame):
pass
# Hook that receives an OutputImageRawFrame; this base implementation does
# nothing. NOTE(review): presumably overridden by transports with video
# output — no override is visible here, confirm against the subclasses.
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
pass
@@ -173,9 +167,6 @@ class BaseOutputTransport(FrameProcessor):
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
await self.push_frame(frame, direction)
await self._handle_interruptions(frame)
elif isinstance(frame, MetricsFrame):
await self.push_frame(frame, direction)
await self.send_metrics(frame)
elif isinstance(frame, TransportMessageUrgentFrame):
await self.send_message(frame)
elif isinstance(frame, SystemFrame):

View File

@@ -28,7 +28,6 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InterimTranscriptionFrame,
MetricsFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
SpriteFrame,
@@ -39,12 +38,6 @@ from pipecat.frames.frames import (
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transcriptions.language import Language
from pipecat.transports.base_input import BaseInputTransport
@@ -759,31 +752,6 @@ class DailyOutputTransport(BaseOutputTransport):
# Enqueue the message frame on this transport's `_messages_queue`; the
# coroutine waits on the queue's `put` and does nothing else.
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._messages_queue.put(frame)
async def send_metrics(self, frame: MetricsFrame):
    """Group the entries of ``frame.data`` by kind and queue one message.

    Entries are collected under the keys "ttfb", "processing", "tokens" and
    "characters"; entries of any other type are ignored. The grouped data is
    wrapped in a ``DailyTransportMessageFrame`` labelled "rtvi-ai" with type
    "metrics" and put on ``_messages_queue``, even when nothing was
    recognised.
    """
    grouped = {}
    for entry in frame.data:
        if isinstance(entry, TTFBMetricsData):
            bucket = "ttfb"
            payload = entry.model_dump(exclude_none=True)
        elif isinstance(entry, ProcessingMetricsData):
            bucket = "processing"
            payload = entry.model_dump(exclude_none=True)
        elif isinstance(entry, LLMUsageMetricsData):
            # Token usage is reported from the nested ``value`` object.
            bucket = "tokens"
            payload = entry.value.model_dump(exclude_none=True)
        elif isinstance(entry, TTSUsageMetricsData):
            bucket = "characters"
            payload = entry.model_dump(exclude_none=True)
        else:
            continue
        grouped.setdefault(bucket, []).append(payload)

    rtvi_message = DailyTransportMessageFrame(
        message={"label": "rtvi-ai", "type": "metrics", "data": grouped}
    )
    await self._messages_queue.put(rtvi_message)
async def write_raw_audio_frames(self, frames: bytes):
await self._client.write_raw_audio_frames(frames)

View File

@@ -10,31 +10,25 @@ from typing import Any, Awaitable, Callable, List
import numpy as np
from loguru import logger
from pydantic import BaseModel
from scipy import signal
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
MetricsFrame,
OutputAudioRawFrame,
StartFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.vad.vad_analyzer import VADAnalyzer
from pydantic import BaseModel
from scipy import signal
try:
from livekit import rtc
@@ -450,31 +444,6 @@ class LiveKitOutputTransport(BaseOutputTransport):
else:
await self._client.send_data(frame.message.encode())
async def send_metrics(self, frame: MetricsFrame):
    """Group the entries of ``frame.data`` by kind and send them as data.

    Entries are collected under the keys "ttfb", "processing", "tokens" and
    "characters"; entries of any other type are ignored. The grouped data is
    wrapped in a ``LiveKitTransportMessageFrame`` of type "pipecat-metrics"
    and passed, encoded, to ``self._client.send_data`` — even when nothing
    was recognised.
    """
    grouped = {}
    for entry in frame.data:
        if isinstance(entry, TTFBMetricsData):
            bucket = "ttfb"
            payload = entry.model_dump(exclude_none=True)
        elif isinstance(entry, ProcessingMetricsData):
            bucket = "processing"
            payload = entry.model_dump(exclude_none=True)
        elif isinstance(entry, LLMUsageMetricsData):
            # Token usage is reported from the nested ``value`` object.
            bucket = "tokens"
            payload = entry.value.model_dump(exclude_none=True)
        elif isinstance(entry, TTSUsageMetricsData):
            bucket = "characters"
            payload = entry.model_dump(exclude_none=True)
        else:
            continue
        grouped.setdefault(bucket, []).append(payload)

    outgoing = LiveKitTransportMessageFrame(
        message={"type": "pipecat-metrics", "metrics": grouped}
    )
    # NOTE(review): str() of a dict produces its Python repr (single-quoted
    # keys), not JSON — confirm this is what receivers expect.
    wire_bytes = str(outgoing.message).encode()
    await self._client.send_data(wire_bytes)
async def write_raw_audio_frames(self, frames: bytes):
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
await self._client.publish_audio(livekit_audio)