Add chronological_events utility function to display UserBotLatencyObserver report

This commit is contained in:
Mark Backman
2026-03-02 19:23:42 -05:00
parent 8f66272de7
commit 7dbb130666
3 changed files with 114 additions and 25 deletions

View File

@@ -184,30 +184,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@latency_observer.event_handler("on_latency_breakdown")
async def on_latency_breakdown(observer, breakdown):
# Display a sequential waterfall that roughly adds up to the total.
# User turn is the first stage: user silence → turn release.
# The STT TTFB is shown as context within the user turn since
# it's a component of that time (along with VAD silence and any
# turn analyzer delay).
stt_ttfb = next((t for t in breakdown.ttfb if "STT" in t.processor), None)
if breakdown.user_turn_secs is not None:
stt_note = f" (STT: {stt_ttfb.duration_secs:.3f}s)" if stt_ttfb else ""
logger.info(f" User turn: {breakdown.user_turn_secs:.3f}s{stt_note}")
# Show non-STT TTFBs, inserting function calls after the first
# LLM TTFB (which triggered the calls) for a chronological waterfall.
non_stt = [t for t in breakdown.ttfb if t is not stt_ttfb]
fc_shown = False
for ttfb in non_stt:
logger.info(f" {ttfb.processor}: TTFB {ttfb.duration_secs:.3f}s")
if not fc_shown and breakdown.function_calls:
for fc in breakdown.function_calls:
logger.info(f" {fc.function_name}: {fc.duration_secs:.3f}s")
fc_shown = True
if breakdown.text_aggregation:
ta = breakdown.text_aggregation
logger.info(f" {ta.processor}: text aggregation {ta.duration_secs:.3f}s")
for event in breakdown.chronological_events():
logger.info(f" {event}")
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):

View File

@@ -111,6 +111,35 @@ class LatencyBreakdown(BaseModel):
user_turn_secs: Optional[float] = None
function_calls: List[FunctionCallMetrics] = Field(default_factory=list)
def chronological_events(self) -> List[str]:
"""Return human-readable event labels sorted by start time.
Collects all sub-metrics into a flat list, sorts by ``start_time``,
and returns formatted strings suitable for logging.
Returns:
List of formatted strings, one per event, in chronological order.
"""
events: List[tuple] = []
if self.user_turn_start_time is not None and self.user_turn_secs is not None:
events.append((self.user_turn_start_time, f"User turn: {self.user_turn_secs:.3f}s"))
for t in self.ttfb:
events.append((t.start_time, f"{t.processor}: TTFB {t.duration_secs:.3f}s"))
for fc in self.function_calls:
events.append((fc.start_time, f"{fc.function_name}: {fc.duration_secs:.3f}s"))
if self.text_aggregation:
ta = self.text_aggregation
events.append(
(ta.start_time, f"{ta.processor}: text aggregation {ta.duration_secs:.3f}s")
)
events.sort(key=lambda e: e[0])
return [label for _, label in events]
class UserBotLatencyObserver(BaseObserver):
"""Observer that tracks user-to-bot response latency.

View File

@@ -15,7 +15,13 @@ from pipecat.metrics.metrics import (
TextAggregationMetricsData,
TTFBMetricsData,
)
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.observers.user_bot_latency_observer import (
FunctionCallMetrics,
LatencyBreakdown,
TextAggregationBreakdownMetrics,
TTFBBreakdownMetrics,
UserBotLatencyObserver,
)
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.tests.utils import SleepFrame, run_test
@@ -545,5 +551,81 @@ class TestUserBotLatencyObserver(unittest.IsolatedAsyncioTestCase):
self.assertEqual(len(breakdowns[0].function_calls), 0)
class TestLatencyBreakdownChronologicalEvents(unittest.TestCase):
"""Tests for LatencyBreakdown.chronological_events()."""
def test_events_sorted_by_start_time(self):
"""Test that events are returned in chronological order."""
breakdown = LatencyBreakdown(
user_turn_start_time=100.0,
user_turn_secs=0.150,
ttfb=[
TTFBBreakdownMetrics(
processor="OpenAILLMService#0",
model="gpt-4o",
start_time=100.200,
duration_secs=0.250,
),
TTFBBreakdownMetrics(
processor="DeepgramSTTService#0",
start_time=100.050,
duration_secs=0.080,
),
TTFBBreakdownMetrics(
processor="CartesiaTTSService#0",
start_time=100.500,
duration_secs=0.070,
),
],
function_calls=[
FunctionCallMetrics(
function_name="get_weather",
start_time=100.450,
duration_secs=0.120,
),
],
text_aggregation=TextAggregationBreakdownMetrics(
processor="CartesiaTTSService#0",
start_time=100.480,
duration_secs=0.030,
),
)
events = breakdown.chronological_events()
self.assertEqual(len(events), 6)
self.assertEqual(events[0], "User turn: 0.150s")
self.assertEqual(events[1], "DeepgramSTTService#0: TTFB 0.080s")
self.assertEqual(events[2], "OpenAILLMService#0: TTFB 0.250s")
self.assertEqual(events[3], "get_weather: 0.120s")
self.assertEqual(events[4], "CartesiaTTSService#0: text aggregation 0.030s")
self.assertEqual(events[5], "CartesiaTTSService#0: TTFB 0.070s")
def test_empty_breakdown(self):
"""Test that an empty breakdown returns no events."""
breakdown = LatencyBreakdown()
self.assertEqual(breakdown.chronological_events(), [])
def test_user_turn_requires_both_fields(self):
"""Test that user turn is only included when both start_time and secs are set."""
# Only start_time, no duration
breakdown = LatencyBreakdown(user_turn_start_time=100.0)
self.assertEqual(breakdown.chronological_events(), [])
# Only duration, no start_time
breakdown = LatencyBreakdown(user_turn_secs=0.150)
self.assertEqual(breakdown.chronological_events(), [])
def test_ttfb_only(self):
"""Test breakdown with only TTFB metrics."""
breakdown = LatencyBreakdown(
ttfb=[
TTFBBreakdownMetrics(processor="LLM#0", start_time=100.0, duration_secs=0.200),
],
)
events = breakdown.chronological_events()
self.assertEqual(events, ["LLM#0: TTFB 0.200s"])
if __name__ == "__main__":
unittest.main()