Feat: Add user-bot latency to OTel turn spans

This adds user-to-bot response latency tracking to OpenTelemetry spans:

- Created UserBotLatencyObserver as a reusable component for tracking
user-to-bot response latency
- Records the value as an attribute on turn spans (turn.user_bot_latency_seconds)
- Updated TurnTraceObserver to use UserBotLatencyObserver, following the same pattern as TurnTrackingObserver
- Updated PipelineTask to automatically create and wire UserBotLatencyObserver
when tracing is enabled (same as TurnTrackingObserver)
This commit is contained in:
Derek Haynes
2026-01-05 12:16:22 -07:00
committed by Mark Backman
parent 5a5a98b497
commit 93138466d6
5 changed files with 154 additions and 44 deletions

View File

@@ -15,6 +15,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -96,6 +97,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
)
# Create latency tracking observers
latency_tracker = UserBotLatencyObserver()
latency_log_observer = UserBotLatencyLogObserver(latency_tracker)
task = PipelineTask(
pipeline,
params=PipelineParams(
@@ -103,7 +108,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[UserBotLatencyLogObserver()],
observers=[latency_tracker, latency_log_observer],
)
turn_observer = task.turn_tracking_observer

View File

@@ -6,67 +6,63 @@
"""Observer for measuring user-to-bot response latency."""
import time
from statistics import mean
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
CancelFrame,
EndFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
class UserBotLatencyLogObserver(BaseObserver):
"""Observer that measures time between user stopping speech and bot starting speech.
"""Observer that logs user-to-bot response latency.
This helps measure how quickly the AI services respond by tracking
conversation turn timing and logging latency metrics.
Uses UserBotLatencyObserver to track latency measurements and provides
logging and statistics. Logs individual latencies and a summary with
average, min, and max values when the pipeline ends.
"""
def __init__(self):
"""Initialize the latency observer.
def __init__(self, latency_tracker: UserBotLatencyObserver, **kwargs):
"""Initialize the latency log observer.
Sets up tracking for processed frames and user speech timing
to calculate response latencies.
Args:
latency_tracker: The latency tracking observer to monitor.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__()
self._user_bot_latency_processed_frames = set()
self._user_stopped_time = 0
super().__init__(**kwargs)
self._latency_tracker = latency_tracker
self._latencies = []
if latency_tracker:
@latency_tracker.event_handler("on_latency_measured")
async def on_latency_measured(tracker, latency_seconds):
await self._handle_latency_measured(latency_seconds)
async def on_push_frame(self, data: FramePushed):
"""Process frames to track speech timing and calculate latency.
"""Process frames to handle pipeline end events.
Args:
data: Frame push event containing the frame and direction information.
"""
# Only process downstream frames
if data.direction != FrameDirection.DOWNSTREAM:
return
# Skip already processed frames
if data.frame.id in self._user_bot_latency_processed_frames:
return
self._user_bot_latency_processed_frames.add(data.frame.id)
if isinstance(data.frame, VADUserStartedSpeakingFrame):
self._user_stopped_time = 0
elif isinstance(data.frame, VADUserStoppedSpeakingFrame):
self._user_stopped_time = time.time()
elif isinstance(data.frame, (EndFrame, CancelFrame)):
if isinstance(data.frame, (EndFrame, CancelFrame)):
self._log_summary()
elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time:
latency = time.time() - self._user_stopped_time
self._user_stopped_time = 0
self._latencies.append(latency)
self._log_latency(latency)
async def _handle_latency_measured(self, latency_seconds: float):
"""Handle latency measurement events.
Called when the latency tracker measures user-to-bot latency.
Stores the latency and logs it.
Args:
latency_seconds: The measured latency in seconds.
"""
self._latencies.append(latency_seconds)
self._log_latency(latency_seconds)
def _log_summary(self):
if not self._latencies:

View File

@@ -0,0 +1,81 @@
"""Observer for tracking user-to-bot response latency.
This module provides an observer that monitors the time between when a user
stops speaking and when the bot starts speaking, emitting events when latency
is measured.
"""
import time
from typing import Optional, Set
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
class UserBotLatencyObserver(BaseObserver):
"""Observer that tracks user-to-bot response latency.
Measures the time between when a user stops speaking (VADUserStoppedSpeakingFrame)
and when the bot starts speaking (BotStartedSpeakingFrame). Emits events when
latency is measured, allowing consumers to log, trace, or otherwise process
the latency data.
This observer follows the composition pattern used by TurnTrackingObserver,
acting as a reusable component for latency measurement.
Events:
on_latency_measured(observer, latency_seconds): Emitted when user-to-bot
latency is calculated. Includes the latency value in seconds as a float.
"""
def __init__(self, **kwargs):
"""Initialize the user-bot latency observer.
Sets up tracking for processed frames and user speech timing
to calculate response latencies.
Args:
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._user_stopped_time: Optional[float] = None
self._processed_frames: Set[str] = set()
self._register_event_handler("on_latency_measured")
async def on_push_frame(self, data: FramePushed):
"""Process frames to track speech timing and calculate latency.
Tracks VAD events and bot speaking events to measure the time between
user stopping speech and bot starting speech.
Args:
data: Frame push event containing the frame and direction information.
"""
# Only process downstream frames
if data.direction != FrameDirection.DOWNSTREAM:
return
# Skip already processed frames
if data.frame.id in self._processed_frames:
return
self._processed_frames.add(data.frame.id)
# Track VAD and bot speaking events for latency
if isinstance(data.frame, VADUserStartedSpeakingFrame):
# Reset when user starts speaking
self._user_stopped_time = None
elif isinstance(data.frame, VADUserStoppedSpeakingFrame):
# Record timestamp when user stops speaking
self._user_stopped_time = time.time()
elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time:
# Calculate and emit latency
latency = time.time() - self._user_stopped_time
self._user_stopped_time = None
await self._call_event_handler("on_latency_measured", latency)

View File

@@ -43,6 +43,7 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
@@ -287,13 +288,19 @@ class PipelineTask(BasePipelineTask):
observers = self._params.observers
observers = observers or []
self._turn_tracking_observer: Optional[TurnTrackingObserver] = None
self._user_bot_latency_observer: Optional[UserBotLatencyObserver] = None
self._turn_trace_observer: Optional[TurnTraceObserver] = None
if self._enable_turn_tracking:
self._turn_tracking_observer = TurnTrackingObserver()
observers.append(self._turn_tracking_observer)
if self._enable_tracing and self._turn_tracking_observer:
# Create latency observer for tracing
self._user_bot_latency_observer = UserBotLatencyObserver()
observers.append(self._user_bot_latency_observer)
# Create turn trace observer with latency tracking
self._turn_trace_observer = TurnTraceObserver(
self._turn_tracking_observer,
latency_tracker=self._user_bot_latency_observer,
conversation_id=self._conversation_id,
additional_span_attributes=self._additional_span_attributes,
)

View File

@@ -18,6 +18,7 @@ from loguru import logger
from pipecat.frames.frames import StartFrame
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.utils.tracing.conversation_context_provider import ConversationContextProvider
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_context_provider import TurnContextProvider
@@ -45,6 +46,7 @@ class TurnTraceObserver(BaseObserver):
def __init__(
self,
turn_tracker: TurnTrackingObserver,
latency_tracker: UserBotLatencyObserver,
conversation_id: Optional[str] = None,
additional_span_attributes: Optional[dict] = None,
**kwargs,
@@ -53,12 +55,14 @@ class TurnTraceObserver(BaseObserver):
Args:
turn_tracker: The turn tracking observer to monitor.
latency_tracker: The latency tracking observer for user-bot latency.
conversation_id: Optional conversation ID for grouping turns.
additional_span_attributes: Additional attributes to add to spans.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._turn_tracker = turn_tracker
self._latency_tracker = latency_tracker
self._current_span: Optional["Span"] = None
self._current_turn_number: int = 0
self._trace_context_map: Dict[int, "SpanContext"] = {}
@@ -69,15 +73,32 @@ class TurnTraceObserver(BaseObserver):
self._conversation_id = conversation_id
self._additional_span_attributes = additional_span_attributes or {}
if turn_tracker:
@turn_tracker.event_handler("on_turn_started")
async def on_turn_started(tracker, turn_number):
await self._handle_turn_started(turn_number)
@turn_tracker.event_handler("on_turn_started")
async def on_turn_started(tracker, turn_number):
await self._handle_turn_started(turn_number)
@turn_tracker.event_handler("on_turn_ended")
async def on_turn_ended(tracker, turn_number, duration, was_interrupted):
await self._handle_turn_ended(turn_number, duration, was_interrupted)
@turn_tracker.event_handler("on_turn_ended")
async def on_turn_ended(tracker, turn_number, duration, was_interrupted):
await self._handle_turn_ended(turn_number, duration, was_interrupted)
@latency_tracker.event_handler("on_latency_measured")
async def on_latency_measured(tracker, latency_seconds):
await self._handle_latency_measured(latency_seconds)
async def _handle_latency_measured(self, latency_seconds: float):
"""Handle latency measurement events.
Called when the latency tracker measures user-to-bot latency.
Adds the latency as an attribute to the current turn span.
Args:
latency_seconds: The measured latency in seconds.
"""
if self._current_span and is_tracing_available():
self._current_span.set_attribute("turn.user_bot_latency_seconds", latency_seconds)
logger.debug(
f"Turn {self._current_turn_number} user-bot latency: {latency_seconds:.3f}s"
)
async def on_push_frame(self, data: FramePushed):
"""Process a frame without modifying it.