diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d7c5d97..73a20e628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `MetricsLogObserver` for logging performance metrics from `MetricsFrame` + instances. Supports filtering via `include_metrics` parameter to control which + metrics types are logged (TTFB, processing time, LLM token usage, TTS usage, + smart turn metrics). + - Added support for loading external observers. You can now register custom pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment variable. This variable should contain a colon-separated list of Python files diff --git a/src/pipecat/observers/loggers/metrics_log_observer.py b/src/pipecat/observers/loggers/metrics_log_observer.py new file mode 100644 index 000000000..dbef805be --- /dev/null +++ b/src/pipecat/observers/loggers/metrics_log_observer.py @@ -0,0 +1,218 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Metrics logging observer for Pipecat. + +This module provides an observer that logs metrics frames to the console, +allowing developers to monitor performance metrics, token usage, and other +statistics in real-time. +""" + +from typing import Optional, Set, Type + +from loguru import logger + +from pipecat.frames.frames import MetricsFrame +from pipecat.metrics.metrics import ( + LLMTokenUsage, + LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + SmartTurnMetricsData, + TTFBMetricsData, + TTSUsageMetricsData, +) +from pipecat.observers.base_observer import BaseObserver, FramePushed + + +class MetricsLogObserver(BaseObserver): + """Observer to log metrics activity to the console. + + Monitors and logs all MetricsFrame instances, including: + + - TTFBMetricsData (Time To First Byte) + - ProcessingMetricsData (General processing time) + - LLMUsageMetricsData (Token usage statistics) + - TTSUsageMetricsData (Text-to-Speech character counts) + - SmartTurnMetricsData (Turn prediction metrics) + + This allows developers to track performance metrics, token usage, + and other statistics throughout the pipeline. + + Examples: + Log all metrics types:: + + observers = [MetricsLogObserver()] + + Log only LLM and TTS metrics:: + + from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData + observers = [ + MetricsLogObserver( + include_metrics={LLMUsageMetricsData, TTSUsageMetricsData} + ) + ] + """ + + def __init__( + self, + include_metrics: Optional[Set[Type[MetricsData]]] = None, + **kwargs, + ): + """Initialize the metrics log observer. + + Args: + include_metrics: Set of metrics types to include. If specified, only these + metrics types will be logged. If None, all metrics are logged. + **kwargs: Additional arguments passed to parent class. + """ + super().__init__(**kwargs) + self._include_metrics = include_metrics + self._frames_seen = set() + + async def on_push_frame(self, data: FramePushed): + """Handle frame push events and log metrics frames. + + Logs MetricsFrame instances with detailed information about the + metrics data, formatted appropriately for each metrics type. + + Args: + data: Frame push event data containing source, frame, and timestamp. + """ + frame = data.frame + timestamp = data.timestamp + + if not isinstance(frame, MetricsFrame): + return + + # Skip frames we've already seen to avoid duplicate logging + if frame.id in self._frames_seen: + return + + self._frames_seen.add(frame.id) + + time_sec = timestamp / 1_000_000_000 + + # Process each metrics data item in the frame + for metrics_data in frame.data: + # Check if this metrics type should be logged + if not self._should_log_metrics(metrics_data): + continue + + self._log_metrics_data(metrics_data, time_sec) + + def _should_log_metrics(self, metrics_data: MetricsData) -> bool: + """Determine if a metrics data item should be logged based on filters. + + Args: + metrics_data: The metrics data to check. + + Returns: + True if the metrics should be logged, False otherwise. + """ + # If include_metrics is specified, only log those types + if self._include_metrics is not None: + return type(metrics_data) in self._include_metrics + + # Otherwise, log all metrics + return True + + def _log_metrics_data(self, metrics_data: MetricsData, time_sec: float): + """Log a single metrics data item. + + Args: + metrics_data: The metrics data to log. + time_sec: Timestamp in seconds. + """ + processor_info = f"[{metrics_data.processor}]" + model_info = f" ({metrics_data.model})" if metrics_data.model else "" + + if isinstance(metrics_data, TTFBMetricsData): + logger.debug( + f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s" + ) + elif isinstance(metrics_data, ProcessingMetricsData): + logger.debug( + f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s" + ) + elif isinstance(metrics_data, LLMUsageMetricsData): + self._log_llm_usage(metrics_data, processor_info, model_info, time_sec) + elif isinstance(metrics_data, TTSUsageMetricsData): + logger.debug( + f"📊 {processor_info} TTS USAGE{model_info}: {metrics_data.value} characters at {time_sec:.3f}s" + ) + elif isinstance(metrics_data, SmartTurnMetricsData): + self._log_smart_turn(metrics_data, processor_info, model_info, time_sec) + else: + # Generic fallback for unknown metrics types + logger.debug( + f"📊 {processor_info} METRICS{model_info}: {metrics_data} at {time_sec:.3f}s" + ) + + def _log_llm_usage( + self, + metrics_data: LLMUsageMetricsData, + processor_info: str, + model_info: str, + time_sec: float, + ): + """Log LLM token usage metrics. + + Args: + metrics_data: The LLM usage metrics data. + processor_info: Formatted processor name string. + model_info: Formatted model name string. + time_sec: Timestamp in seconds. + """ + usage: LLMTokenUsage = metrics_data.value + + # Build usage details + details = [ + f"prompt: {usage.prompt_tokens}", + f"completion: {usage.completion_tokens}", + f"total: {usage.total_tokens}", + ] + + if usage.cache_read_input_tokens is not None: + details.append(f"cache_read: {usage.cache_read_input_tokens}") + + if usage.cache_creation_input_tokens is not None: + details.append(f"cache_creation: {usage.cache_creation_input_tokens}") + + if usage.reasoning_tokens is not None: + details.append(f"reasoning: {usage.reasoning_tokens}") + + usage_str = ", ".join(details) + + logger.debug( + f"📊 {processor_info} LLM TOKEN USAGE{model_info}: {usage_str} at {time_sec:.2f}s" + ) + + def _log_smart_turn( + self, + metrics_data: SmartTurnMetricsData, + processor_info: str, + model_info: str, + time_sec: float, + ): + """Log smart turn prediction metrics. + + Args: + metrics_data: The smart turn metrics data. + processor_info: Formatted processor name string. + model_info: Formatted model name string. + time_sec: Timestamp in seconds. + """ + complete_str = "COMPLETE" if metrics_data.is_complete else "INCOMPLETE" + + logger.debug( + f"📊 {processor_info} SMART TURN{model_info}: {complete_str} " + f"(probability: {metrics_data.probability:.2%}, " + f"inference: {metrics_data.inference_time_ms:.1f}ms, " + f"server: {metrics_data.server_total_time_ms:.1f}ms, " + f"e2e: {metrics_data.e2e_processing_time_ms:.1f}ms) " + f"at {time_sec:.2f}s" + )