Deprecate UserBotLatencyLogObserver, update 29 example

This commit is contained in:
Mark Backman
2026-02-05 10:58:05 -05:00
parent 8791559351
commit 56d8ef2bf4
3 changed files with 66 additions and 38 deletions

View File

@@ -0,0 +1 @@
- Deprecated `UserBotLatencyLogObserver`. Use `UserBotLatencyObserver` directly with its `on_latency_measured` event handler instead.

View File

@@ -14,7 +14,6 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -97,9 +96,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
)
# Create latency tracking observers
latency_tracker = UserBotLatencyObserver()
latency_log_observer = UserBotLatencyLogObserver(latency_tracker)
# Create latency tracking observer
latency_observer = UserBotLatencyObserver()
task = PipelineTask(
pipeline,
@@ -108,9 +106,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[latency_tracker, latency_log_observer],
observers=[latency_observer],
)
# Log latency measurements using the event handler
@latency_observer.event_handler("on_latency_measured")
async def on_latency_measured(observer, latency_seconds):
logger.info(f"⏱️ User-to-bot latency: {latency_seconds:.3f}s")
turn_observer = task.turn_tracking_observer
if turn_observer:

View File

@@ -4,65 +4,89 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Observer for measuring user-to-bot response latency."""
"""Observer for measuring user-to-bot response latency.
.. deprecated:: 0.0.102
This module is deprecated. Use :class:`UserBotLatencyObserver` directly
with its ``on_latency_measured`` event handler instead.
"""
import time
import warnings
from statistics import mean
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
CancelFrame,
EndFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.processors.frame_processor import FrameDirection
class UserBotLatencyLogObserver(BaseObserver):
"""Observer that logs user-to-bot response latency.
"""Observer that measures time between user stopping speech and bot starting speech.
Uses UserBotLatencyObserver to track latency measurements and provides
logging and statistics. Logs individual latencies and a summary with
average, min, and max values when the pipeline ends.
This helps measure how quickly the AI services respond by tracking
conversation turn timing and logging latency metrics.
.. deprecated:: 0.0.102
This class is deprecated. Use :class:`UserBotLatencyObserver` directly
with its ``on_latency_measured`` event handler for custom logging.
"""
def __init__(self, latency_tracker: UserBotLatencyObserver, **kwargs):
"""Initialize the latency log observer.
def __init__(self):
"""Initialize the latency observer.
Args:
latency_tracker: The latency tracking observer to monitor.
**kwargs: Additional arguments passed to parent class.
Sets up tracking for processed frames and user speech timing
to calculate response latencies.
.. deprecated:: 0.0.102
This class is deprecated. Use :class:`UserBotLatencyObserver`
directly with its ``on_latency_measured`` event handler.
"""
super().__init__(**kwargs)
self._latency_tracker = latency_tracker
warnings.warn(
"UserBotLatencyLogObserver is deprecated and will be removed in a future version. "
"Use UserBotLatencyObserver directly with its on_latency_measured event handler instead.",
DeprecationWarning,
stacklevel=2,
)
super().__init__()
self._user_bot_latency_processed_frames = set()
self._user_stopped_time = 0
self._latencies = []
if latency_tracker:
@latency_tracker.event_handler("on_latency_measured")
async def on_latency_measured(tracker, latency_seconds):
await self._handle_latency_measured(latency_seconds)
async def on_push_frame(self, data: FramePushed):
"""Process frames to handle pipeline end events.
"""Process frames to track speech timing and calculate latency.
Args:
data: Frame push event containing the frame and direction information.
"""
if isinstance(data.frame, (EndFrame, CancelFrame)):
# Only process downstream frames
if data.direction != FrameDirection.DOWNSTREAM:
return
# Skip already processed frames
if data.frame.id in self._user_bot_latency_processed_frames:
return
self._user_bot_latency_processed_frames.add(data.frame.id)
if isinstance(data.frame, VADUserStartedSpeakingFrame):
self._user_stopped_time = 0
elif isinstance(data.frame, VADUserStoppedSpeakingFrame):
self._user_stopped_time = time.time()
elif isinstance(data.frame, (EndFrame, CancelFrame)):
self._log_summary()
async def _handle_latency_measured(self, latency_seconds: float):
"""Handle latency measurement events.
Called when the latency tracker measures user-to-bot latency.
Stores the latency and logs it.
Args:
latency_seconds: The measured latency in seconds.
"""
self._latencies.append(latency_seconds)
self._log_latency(latency_seconds)
elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time:
latency = time.time() - self._user_stopped_time
self._user_stopped_time = 0
self._latencies.append(latency)
self._log_latency(latency)
def _log_summary(self):
if not self._latencies: