diff --git a/examples/foundational/29-turn-tracking-observer.py b/examples/foundational/29-turn-tracking-observer.py index 476dc4612..cf85972e1 100644 --- a/examples/foundational/29-turn-tracking-observer.py +++ b/examples/foundational/29-turn-tracking-observer.py @@ -184,30 +184,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): @latency_observer.event_handler("on_latency_breakdown") async def on_latency_breakdown(observer, breakdown): - # Display a sequential waterfall that roughly adds up to the total. - # User turn is the first stage: user silence → turn release. - # The STT TTFB is shown as context within the user turn since - # it's a component of that time (along with VAD silence and any - # turn analyzer delay). - stt_ttfb = next((t for t in breakdown.ttfb if "STT" in t.processor), None) - if breakdown.user_turn_secs is not None: - stt_note = f" (STT: {stt_ttfb.duration_secs:.3f}s)" if stt_ttfb else "" - logger.info(f" User turn: {breakdown.user_turn_secs:.3f}s{stt_note}") - - # Show non-STT TTFBs, inserting function calls after the first - # LLM TTFB (which triggered the calls) for a chronological waterfall. - non_stt = [t for t in breakdown.ttfb if t is not stt_ttfb] - fc_shown = False - for ttfb in non_stt: - logger.info(f" {ttfb.processor}: TTFB {ttfb.duration_secs:.3f}s") - if not fc_shown and breakdown.function_calls: - for fc in breakdown.function_calls: - logger.info(f" {fc.function_name}: {fc.duration_secs:.3f}s") - fc_shown = True - - if breakdown.text_aggregation: - ta = breakdown.text_aggregation - logger.info(f" {ta.processor}: text aggregation {ta.duration_secs:.3f}s") + for event in breakdown.chronological_events(): + logger.info(f" {event}") @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): diff --git a/src/pipecat/observers/user_bot_latency_observer.py b/src/pipecat/observers/user_bot_latency_observer.py index aa0887e30..0672b689c 100644 --- a/src/pipecat/observers/user_bot_latency_observer.py +++ b/src/pipecat/observers/user_bot_latency_observer.py @@ -111,6 +111,35 @@ class LatencyBreakdown(BaseModel): user_turn_secs: Optional[float] = None function_calls: List[FunctionCallMetrics] = Field(default_factory=list) + def chronological_events(self) -> List[str]: + """Return human-readable event labels sorted by start time. + + Collects all sub-metrics into a flat list, sorts by ``start_time``, + and returns formatted strings suitable for logging. + + Returns: + List of formatted strings, one per event, in chronological order. + """ + events: List[tuple] = [] + + if self.user_turn_start_time is not None and self.user_turn_secs is not None: + events.append((self.user_turn_start_time, f"User turn: {self.user_turn_secs:.3f}s")) + + for t in self.ttfb: + events.append((t.start_time, f"{t.processor}: TTFB {t.duration_secs:.3f}s")) + + for fc in self.function_calls: + events.append((fc.start_time, f"{fc.function_name}: {fc.duration_secs:.3f}s")) + + if self.text_aggregation: + ta = self.text_aggregation + events.append( + (ta.start_time, f"{ta.processor}: text aggregation {ta.duration_secs:.3f}s") + ) + + events.sort(key=lambda e: e[0]) + return [label for _, label in events] + class UserBotLatencyObserver(BaseObserver): """Observer that tracks user-to-bot response latency. diff --git a/tests/test_user_bot_latency_observer.py b/tests/test_user_bot_latency_observer.py index 42d5d3367..96c24724b 100644 --- a/tests/test_user_bot_latency_observer.py +++ b/tests/test_user_bot_latency_observer.py @@ -15,7 +15,13 @@ from pipecat.metrics.metrics import ( TextAggregationMetricsData, TTFBMetricsData, ) -from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver +from pipecat.observers.user_bot_latency_observer import ( + FunctionCallMetrics, + LatencyBreakdown, + TextAggregationBreakdownMetrics, + TTFBBreakdownMetrics, + UserBotLatencyObserver, +) from pipecat.processors.filters.identity_filter import IdentityFilter from pipecat.tests.utils import SleepFrame, run_test @@ -545,5 +551,81 @@ class TestUserBotLatencyObserver(unittest.IsolatedAsyncioTestCase): self.assertEqual(len(breakdowns[0].function_calls), 0) +class TestLatencyBreakdownChronologicalEvents(unittest.TestCase): + """Tests for LatencyBreakdown.chronological_events().""" + + def test_events_sorted_by_start_time(self): + """Test that events are returned in chronological order.""" + breakdown = LatencyBreakdown( + user_turn_start_time=100.0, + user_turn_secs=0.150, + ttfb=[ + TTFBBreakdownMetrics( + processor="OpenAILLMService#0", + model="gpt-4o", + start_time=100.200, + duration_secs=0.250, + ), + TTFBBreakdownMetrics( + processor="DeepgramSTTService#0", + start_time=100.050, + duration_secs=0.080, + ), + TTFBBreakdownMetrics( + processor="CartesiaTTSService#0", + start_time=100.500, + duration_secs=0.070, + ), + ], + function_calls=[ + FunctionCallMetrics( + function_name="get_weather", + start_time=100.450, + duration_secs=0.120, + ), + ], + text_aggregation=TextAggregationBreakdownMetrics( + processor="CartesiaTTSService#0", + start_time=100.480, + duration_secs=0.030, + ), + ) + + events = breakdown.chronological_events() + + self.assertEqual(len(events), 6) + self.assertEqual(events[0], "User turn: 0.150s") + self.assertEqual(events[1], "DeepgramSTTService#0: TTFB 0.080s") + self.assertEqual(events[2], "OpenAILLMService#0: TTFB 0.250s") + self.assertEqual(events[3], "get_weather: 0.120s") + self.assertEqual(events[4], "CartesiaTTSService#0: text aggregation 0.030s") + self.assertEqual(events[5], "CartesiaTTSService#0: TTFB 0.070s") + + def test_empty_breakdown(self): + """Test that an empty breakdown returns no events.""" + breakdown = LatencyBreakdown() + self.assertEqual(breakdown.chronological_events(), []) + + def test_user_turn_requires_both_fields(self): + """Test that user turn is only included when both start_time and secs are set.""" + # Only start_time, no duration + breakdown = LatencyBreakdown(user_turn_start_time=100.0) + self.assertEqual(breakdown.chronological_events(), []) + + # Only duration, no start_time + breakdown = LatencyBreakdown(user_turn_secs=0.150) + self.assertEqual(breakdown.chronological_events(), []) + + def test_ttfb_only(self): + """Test breakdown with only TTFB metrics.""" + breakdown = LatencyBreakdown( + ttfb=[ + TTFBBreakdownMetrics(processor="LLM#0", start_time=100.0, duration_secs=0.200), + ], + ) + events = breakdown.chronological_events() + self.assertEqual(events, ["LLM#0: TTFB 0.200s"]) + + if __name__ == "__main__": unittest.main()