diff --git a/examples/foundational/29-turn-tracking-observer.py b/examples/foundational/29-turn-tracking-observer.py index b6d8dcd88..04151bf26 100644 --- a/examples/foundational/29-turn-tracking-observer.py +++ b/examples/foundational/29-turn-tracking-observer.py @@ -15,6 +15,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.frames.frames import LLMRunFrame from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver +from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -96,6 +97,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ] ) + # Create latency tracking observers + latency_tracker = UserBotLatencyObserver() + latency_log_observer = UserBotLatencyLogObserver(latency_tracker) + task = PipelineTask( pipeline, params=PipelineParams( @@ -103,7 +108,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - observers=[UserBotLatencyLogObserver()], + observers=[latency_tracker, latency_log_observer], ) turn_observer = task.turn_tracking_observer diff --git a/src/pipecat/observers/loggers/user_bot_latency_log_observer.py b/src/pipecat/observers/loggers/user_bot_latency_log_observer.py index b8dff734e..e46be9541 100644 --- a/src/pipecat/observers/loggers/user_bot_latency_log_observer.py +++ b/src/pipecat/observers/loggers/user_bot_latency_log_observer.py @@ -6,67 +6,63 @@ """Observer for measuring user-to-bot response latency.""" -import time from statistics import mean from loguru import logger from pipecat.frames.frames import ( - BotStartedSpeakingFrame, CancelFrame, EndFrame, - VADUserStartedSpeakingFrame, - VADUserStoppedSpeakingFrame, ) from pipecat.observers.base_observer import BaseObserver, FramePushed -from pipecat.processors.frame_processor import FrameDirection +from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver class UserBotLatencyLogObserver(BaseObserver): - """Observer that measures time between user stopping speech and bot starting speech. + """Observer that logs user-to-bot response latency. - This helps measure how quickly the AI services respond by tracking - conversation turn timing and logging latency metrics. + Uses UserBotLatencyObserver to track latency measurements and provides + logging and statistics. Logs individual latencies and a summary with + average, min, and max values when the pipeline ends. """ - def __init__(self): - """Initialize the latency observer. + def __init__(self, latency_tracker: UserBotLatencyObserver, **kwargs): + """Initialize the latency log observer. - Sets up tracking for processed frames and user speech timing - to calculate response latencies. + Args: + latency_tracker: The latency tracking observer to monitor. + **kwargs: Additional arguments passed to parent class. """ - super().__init__() - self._user_bot_latency_processed_frames = set() - self._user_stopped_time = 0 + super().__init__(**kwargs) + self._latency_tracker = latency_tracker self._latencies = [] + if latency_tracker: + + @latency_tracker.event_handler("on_latency_measured") + async def on_latency_measured(tracker, latency_seconds): + await self._handle_latency_measured(latency_seconds) + async def on_push_frame(self, data: FramePushed): - """Process frames to track speech timing and calculate latency. + """Process frames to handle pipeline end events. Args: data: Frame push event containing the frame and direction information. """ - # Only process downstream frames - if data.direction != FrameDirection.DOWNSTREAM: - return - - # Skip already processed frames - if data.frame.id in self._user_bot_latency_processed_frames: - return - - self._user_bot_latency_processed_frames.add(data.frame.id) - - if isinstance(data.frame, VADUserStartedSpeakingFrame): - self._user_stopped_time = 0 - elif isinstance(data.frame, VADUserStoppedSpeakingFrame): - self._user_stopped_time = time.time() - elif isinstance(data.frame, (EndFrame, CancelFrame)): + if isinstance(data.frame, (EndFrame, CancelFrame)): self._log_summary() - elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time: - latency = time.time() - self._user_stopped_time - self._user_stopped_time = 0 - self._latencies.append(latency) - self._log_latency(latency) + + async def _handle_latency_measured(self, latency_seconds: float): + """Handle latency measurement events. + + Called when the latency tracker measures user-to-bot latency. + Stores the latency and logs it. + + Args: + latency_seconds: The measured latency in seconds. + """ + self._latencies.append(latency_seconds) + self._log_latency(latency_seconds) def _log_summary(self): if not self._latencies: diff --git a/src/pipecat/observers/user_bot_latency_observer.py b/src/pipecat/observers/user_bot_latency_observer.py new file mode 100644 index 000000000..475da0c5d --- /dev/null +++ b/src/pipecat/observers/user_bot_latency_observer.py @@ -0,0 +1,81 @@ +"""Observer for tracking user-to-bot response latency. + +This module provides an observer that monitors the time between when a user +stops speaking and when the bot starts speaking, emitting events when latency +is measured. +""" + +import time +from typing import Optional, Set + +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + VADUserStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, +) +from pipecat.observers.base_observer import BaseObserver, FramePushed +from pipecat.processors.frame_processor import FrameDirection + + +class UserBotLatencyObserver(BaseObserver): + """Observer that tracks user-to-bot response latency. + + Measures the time between when a user stops speaking (VADUserStoppedSpeakingFrame) + and when the bot starts speaking (BotStartedSpeakingFrame). Emits events when + latency is measured, allowing consumers to log, trace, or otherwise process + the latency data. + + This observer follows the composition pattern used by TurnTrackingObserver, + acting as a reusable component for latency measurement. + + Events: + on_latency_measured(observer, latency_seconds): Emitted when user-to-bot + latency is calculated. Includes the latency value in seconds as a float. + """ + + def __init__(self, **kwargs): + """Initialize the user-bot latency observer. + + Sets up tracking for processed frames and user speech timing + to calculate response latencies. + + Args: + **kwargs: Additional arguments passed to parent class. + """ + super().__init__(**kwargs) + self._user_stopped_time: Optional[float] = None + self._processed_frames: Set[str] = set() + + self._register_event_handler("on_latency_measured") + + async def on_push_frame(self, data: FramePushed): + """Process frames to track speech timing and calculate latency. + + Tracks VAD events and bot speaking events to measure the time between + user stopping speech and bot starting speech. + + Args: + data: Frame push event containing the frame and direction information. + """ + # Only process downstream frames + if data.direction != FrameDirection.DOWNSTREAM: + return + + # Skip already processed frames + if data.frame.id in self._processed_frames: + return + + self._processed_frames.add(data.frame.id) + + # Track VAD and bot speaking events for latency + if isinstance(data.frame, VADUserStartedSpeakingFrame): + # Reset when user starts speaking + self._user_stopped_time = None + elif isinstance(data.frame, VADUserStoppedSpeakingFrame): + # Record timestamp when user stops speaking + self._user_stopped_time = time.time() + elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time: + # Calculate and emit latency + latency = time.time() - self._user_stopped_time + self._user_stopped_time = None + await self._call_event_handler("on_latency_measured", latency) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 4ccc879b8..e643164eb 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -43,6 +43,7 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.observers.turn_tracking_observer import TurnTrackingObserver +from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource @@ -287,13 +288,19 @@ class PipelineTask(BasePipelineTask): observers = self._params.observers observers = observers or [] self._turn_tracking_observer: Optional[TurnTrackingObserver] = None + self._user_bot_latency_observer: Optional[UserBotLatencyObserver] = None self._turn_trace_observer: Optional[TurnTraceObserver] = None if self._enable_turn_tracking: self._turn_tracking_observer = TurnTrackingObserver() observers.append(self._turn_tracking_observer) if self._enable_tracing and self._turn_tracking_observer: + # Create latency observer for tracing + self._user_bot_latency_observer = UserBotLatencyObserver() + observers.append(self._user_bot_latency_observer) + # Create turn trace observer with latency tracking self._turn_trace_observer = TurnTraceObserver( self._turn_tracking_observer, + latency_tracker=self._user_bot_latency_observer, conversation_id=self._conversation_id, additional_span_attributes=self._additional_span_attributes, ) diff --git a/src/pipecat/utils/tracing/turn_trace_observer.py b/src/pipecat/utils/tracing/turn_trace_observer.py index 59b0bd2ae..78a9c9ea9 100644 --- a/src/pipecat/utils/tracing/turn_trace_observer.py +++ b/src/pipecat/utils/tracing/turn_trace_observer.py @@ -18,6 +18,7 @@ from loguru import logger from pipecat.frames.frames import StartFrame from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.observers.turn_tracking_observer import TurnTrackingObserver +from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver from pipecat.utils.tracing.conversation_context_provider import ConversationContextProvider from pipecat.utils.tracing.setup import is_tracing_available from pipecat.utils.tracing.turn_context_provider import TurnContextProvider @@ -45,6 +46,7 @@ class TurnTraceObserver(BaseObserver): def __init__( self, turn_tracker: TurnTrackingObserver, + latency_tracker: UserBotLatencyObserver, conversation_id: Optional[str] = None, additional_span_attributes: Optional[dict] = None, **kwargs, @@ -53,12 +55,14 @@ class TurnTraceObserver(BaseObserver): Args: turn_tracker: The turn tracking observer to monitor. + latency_tracker: The latency tracking observer for user-bot latency. conversation_id: Optional conversation ID for grouping turns. additional_span_attributes: Additional attributes to add to spans. **kwargs: Additional arguments passed to parent class. """ super().__init__(**kwargs) self._turn_tracker = turn_tracker + self._latency_tracker = latency_tracker self._current_span: Optional["Span"] = None self._current_turn_number: int = 0 self._trace_context_map: Dict[int, "SpanContext"] = {} @@ -69,15 +73,32 @@ class TurnTraceObserver(BaseObserver): self._conversation_id = conversation_id self._additional_span_attributes = additional_span_attributes or {} - if turn_tracker: + @turn_tracker.event_handler("on_turn_started") + async def on_turn_started(tracker, turn_number): + await self._handle_turn_started(turn_number) - @turn_tracker.event_handler("on_turn_started") - async def on_turn_started(tracker, turn_number): - await self._handle_turn_started(turn_number) + @turn_tracker.event_handler("on_turn_ended") + async def on_turn_ended(tracker, turn_number, duration, was_interrupted): + await self._handle_turn_ended(turn_number, duration, was_interrupted) - @turn_tracker.event_handler("on_turn_ended") - async def on_turn_ended(tracker, turn_number, duration, was_interrupted): - await self._handle_turn_ended(turn_number, duration, was_interrupted) + @latency_tracker.event_handler("on_latency_measured") + async def on_latency_measured(tracker, latency_seconds): + await self._handle_latency_measured(latency_seconds) + + async def _handle_latency_measured(self, latency_seconds: float): + """Handle latency measurement events. + + Called when the latency tracker measures user-to-bot latency. + Adds the latency as an attribute to the current turn span. + + Args: + latency_seconds: The measured latency in seconds. + """ + if self._current_span and is_tracing_available(): + self._current_span.set_attribute("turn.user_bot_latency_seconds", latency_seconds) + logger.debug( + f"Turn {self._current_turn_number} user-bot latency: {latency_seconds:.3f}s" + ) async def on_push_frame(self, data: FramePushed): """Process a frame without modifying it.