diff --git a/changelog/3355.deprecated.md b/changelog/3355.deprecated.md new file mode 100644 index 000000000..7406268a7 --- /dev/null +++ b/changelog/3355.deprecated.md @@ -0,0 +1 @@ +- Deprecated `UserBotLatencyLogObserver`. Use `UserBotLatencyObserver` directly with its `on_latency_measured` event handler instead. diff --git a/examples/foundational/29-turn-tracking-observer.py b/examples/foundational/29-turn-tracking-observer.py index 04151bf26..272254eb6 100644 --- a/examples/foundational/29-turn-tracking-observer.py +++ b/examples/foundational/29-turn-tracking-observer.py @@ -14,7 +14,6 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.frames.frames import LLMRunFrame -from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -97,9 +96,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ] ) - # Create latency tracking observers - latency_tracker = UserBotLatencyObserver() - latency_log_observer = UserBotLatencyLogObserver(latency_tracker) + # Create latency tracking observer + latency_observer = UserBotLatencyObserver() task = PipelineTask( pipeline, @@ -108,9 +106,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - observers=[latency_tracker, latency_log_observer], + observers=[latency_observer], ) + # Log latency measurements using the event handler + @latency_observer.event_handler("on_latency_measured") + async def on_latency_measured(observer, latency_seconds): + logger.info(f"⏱️ User-to-bot latency: {latency_seconds:.3f}s") + turn_observer = task.turn_tracking_observer if turn_observer: diff --git a/src/pipecat/observers/loggers/user_bot_latency_log_observer.py b/src/pipecat/observers/loggers/user_bot_latency_log_observer.py index e46be9541..044d4dea6 100644 --- a/src/pipecat/observers/loggers/user_bot_latency_log_observer.py +++ b/src/pipecat/observers/loggers/user_bot_latency_log_observer.py @@ -4,65 +4,89 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Observer for measuring user-to-bot response latency.""" +"""Observer for measuring user-to-bot response latency. +.. deprecated:: 0.0.102 + This module is deprecated. Use :class:`UserBotLatencyObserver` directly + with its ``on_latency_measured`` event handler instead. +""" + +import time +import warnings from statistics import mean from loguru import logger from pipecat.frames.frames import ( + BotStartedSpeakingFrame, CancelFrame, EndFrame, + VADUserStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, ) from pipecat.observers.base_observer import BaseObserver, FramePushed -from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver +from pipecat.processors.frame_processor import FrameDirection class UserBotLatencyLogObserver(BaseObserver): - """Observer that logs user-to-bot response latency. + """Observer that measures time between user stopping speech and bot starting speech. - Uses UserBotLatencyObserver to track latency measurements and provides - logging and statistics. Logs individual latencies and a summary with - average, min, and max values when the pipeline ends. + This helps measure how quickly the AI services respond by tracking + conversation turn timing and logging latency metrics. + + .. deprecated:: 0.0.102 + This class is deprecated. Use :class:`UserBotLatencyObserver` directly + with its ``on_latency_measured`` event handler for custom logging. """ - def __init__(self, latency_tracker: UserBotLatencyObserver, **kwargs): - """Initialize the latency log observer. + def __init__(self): + """Initialize the latency observer. - Args: - latency_tracker: The latency tracking observer to monitor. - **kwargs: Additional arguments passed to parent class. + Sets up tracking for processed frames and user speech timing + to calculate response latencies. + + .. deprecated:: 0.0.102 + This class is deprecated. Use :class:`UserBotLatencyObserver` + directly with its ``on_latency_measured`` event handler. """ - super().__init__(**kwargs) - self._latency_tracker = latency_tracker + warnings.warn( + "UserBotLatencyLogObserver is deprecated and will be removed in a future version. " + "Use UserBotLatencyObserver directly with its on_latency_measured event handler instead.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__() + self._user_bot_latency_processed_frames = set() + self._user_stopped_time = 0 self._latencies = [] - if latency_tracker: - - @latency_tracker.event_handler("on_latency_measured") - async def on_latency_measured(tracker, latency_seconds): - await self._handle_latency_measured(latency_seconds) - async def on_push_frame(self, data: FramePushed): - """Process frames to handle pipeline end events. + """Process frames to track speech timing and calculate latency. Args: data: Frame push event containing the frame and direction information. """ - if isinstance(data.frame, (EndFrame, CancelFrame)): + # Only process downstream frames + if data.direction != FrameDirection.DOWNSTREAM: + return + + # Skip already processed frames + if data.frame.id in self._user_bot_latency_processed_frames: + return + + self._user_bot_latency_processed_frames.add(data.frame.id) + + if isinstance(data.frame, VADUserStartedSpeakingFrame): + self._user_stopped_time = 0 + elif isinstance(data.frame, VADUserStoppedSpeakingFrame): + self._user_stopped_time = time.time() + elif isinstance(data.frame, (EndFrame, CancelFrame)): self._log_summary() - - async def _handle_latency_measured(self, latency_seconds: float): - """Handle latency measurement events. - - Called when the latency tracker measures user-to-bot latency. - Stores the latency and logs it. - - Args: - latency_seconds: The measured latency in seconds. - """ - self._latencies.append(latency_seconds) - self._log_latency(latency_seconds) + elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time: + latency = time.time() - self._user_stopped_time + self._user_stopped_time = 0 + self._latencies.append(latency) + self._log_latency(latency) def _log_summary(self): if not self._latencies: