# Source: pipecat/tests/test_user_bot_latency_observer.py
# Listing metadata: 632 lines, 21 KiB, Python

import unittest
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
ClientConnectedFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterruptionFrame,
MetricsFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import (
TextAggregationMetricsData,
TTFBMetricsData,
)
from pipecat.observers.user_bot_latency_observer import (
FunctionCallMetrics,
LatencyBreakdown,
TextAggregationBreakdownMetrics,
TTFBBreakdownMetrics,
UserBotLatencyObserver,
)
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.tests.utils import SleepFrame, run_test
class TestUserBotLatencyObserver(unittest.IsolatedAsyncioTestCase):
    """Behavioral tests for UserBotLatencyObserver.

    Each test pushes a scripted frame sequence through an ``IdentityFilter``
    with the observer attached via ``run_test``, records what the observer's
    event handlers receive, and asserts on the recorded values.
    """

    async def test_normal_latency_measurement(self):
        """A single VAD-stop / bot-start pair produces exactly one latency value."""
        observer = UserBotLatencyObserver()
        # The identity filter forwards every frame unchanged.
        passthrough = IdentityFilter()
        measured = []

        @observer.event_handler("on_latency_measured")
        async def record_latency(obs, latency_seconds):
            measured.append(latency_seconds)

        await run_test(
            passthrough,
            frames_to_send=[VADUserStoppedSpeakingFrame(), BotStartedSpeakingFrame()],
            expected_down_frames=[VADUserStoppedSpeakingFrame, BotStartedSpeakingFrame],
            observers=[observer],
        )

        self.assertEqual(len(measured), 1)
        latency = measured[0]
        self.assertGreater(latency, 0)
        # Nothing slow sits between the two frames, so the value must be small.
        self.assertLess(latency, 1.0)

    async def test_multiple_latency_measurements(self):
        """Two back-to-back exchanges each produce their own latency event."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        measured = []

        @observer.event_handler("on_latency_measured")
        async def record_latency(obs, latency_seconds):
            measured.append(latency_seconds)

        # One exchange is "user stops, bot starts"; replay it twice.
        exchange = [VADUserStoppedSpeakingFrame, BotStartedSpeakingFrame]
        frame_types = exchange * 2

        await run_test(
            passthrough,
            frames_to_send=[frame_type() for frame_type in frame_types],
            expected_down_frames=frame_types,
            observers=[observer],
        )

        self.assertEqual(len(measured), 2)
        for latency in measured:
            self.assertGreater(latency, 0)

    async def test_breakdown_with_metrics(self):
        """Metrics seen between the VAD stop and the bot start land in the breakdown."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        stt = TTFBMetricsData(processor="DeepgramSTTService#0", value=0.080)
        llm = TTFBMetricsData(processor="OpenAILLMService#0", model="gpt-4o", value=0.250)
        tts = TTFBMetricsData(processor="CartesiaTTSService#0", value=0.070)
        aggregation = TextAggregationMetricsData(processor="CartesiaTTSService#0", value=0.030)

        await run_test(
            passthrough,
            frames_to_send=[
                VADUserStoppedSpeakingFrame(),
                MetricsFrame(data=[stt]),
                MetricsFrame(data=[llm, aggregation]),
                MetricsFrame(data=[tts]),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                VADUserStoppedSpeakingFrame,
                MetricsFrame,
                MetricsFrame,
                MetricsFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        result = received[0]
        # TTFB entries are expected in the order their metrics frames were sent.
        self.assertEqual(
            [entry.processor for entry in result.ttfb],
            ["DeepgramSTTService#0", "OpenAILLMService#0", "CartesiaTTSService#0"],
        )
        self.assertIsNotNone(result.text_aggregation)
        self.assertEqual(result.text_aggregation.duration_secs, 0.030)

    async def test_interruption_resets_accumulators(self):
        """Metrics gathered before an InterruptionFrame are absent from the breakdown."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        # Sent before the interruption; must not survive it.
        discarded_llm = TTFBMetricsData(processor="OpenAILLMService#0", value=0.245)
        # Sent after the interruption; these are the ones expected in the result.
        kept_llm = TTFBMetricsData(processor="OpenAILLMService#0", value=0.224)
        kept_tts = TTFBMetricsData(processor="CartesiaTTSService#0", value=0.142)

        await run_test(
            passthrough,
            frames_to_send=[
                VADUserStoppedSpeakingFrame(),
                MetricsFrame(data=[discarded_llm]),
                InterruptionFrame(),
                MetricsFrame(data=[kept_llm]),
                MetricsFrame(data=[kept_tts]),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                VADUserStoppedSpeakingFrame,
                MetricsFrame,
                InterruptionFrame,
                MetricsFrame,
                MetricsFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        result = received[0]
        # Only the two post-interruption entries may remain.
        self.assertEqual(
            [(entry.processor, entry.duration_secs) for entry in result.ttfb],
            [("OpenAILLMService#0", 0.224), ("CartesiaTTSService#0", 0.142)],
        )

    async def test_only_first_text_aggregation_kept(self):
        """With two text aggregation metrics in one cycle, the first one is reported."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        earlier = TextAggregationMetricsData(processor="CartesiaTTSService#0", value=0.030)
        later = TextAggregationMetricsData(processor="CartesiaTTSService#0", value=0.080)

        await run_test(
            passthrough,
            frames_to_send=[
                VADUserStoppedSpeakingFrame(),
                MetricsFrame(data=[earlier]),
                MetricsFrame(data=[later]),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                VADUserStoppedSpeakingFrame,
                MetricsFrame,
                MetricsFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        reported = received[0].text_aggregation
        self.assertIsNotNone(reported)
        self.assertEqual(reported.duration_secs, 0.030)

    async def test_user_turn_measured(self):
        """The gap between the VAD stop and UserStoppedSpeakingFrame is reported."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        await run_test(
            passthrough,
            frames_to_send=[
                VADUserStoppedSpeakingFrame(),
                # Stand-in for the time a turn analyzer would spend deciding.
                SleepFrame(sleep=0.1),
                UserStoppedSpeakingFrame(),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                VADUserStoppedSpeakingFrame,
                UserStoppedSpeakingFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        user_turn = received[0].user_turn_secs
        self.assertIsNotNone(user_turn)
        self.assertGreaterEqual(user_turn, 0.1)

    async def test_user_turn_none_without_user_stopped(self):
        """Without a UserStoppedSpeakingFrame the breakdown's user_turn_secs is None."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        await run_test(
            passthrough,
            frames_to_send=[VADUserStoppedSpeakingFrame(), BotStartedSpeakingFrame()],
            expected_down_frames=[VADUserStoppedSpeakingFrame, BotStartedSpeakingFrame],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        self.assertIsNone(received[0].user_turn_secs)

    async def test_no_measurement_without_user_stop(self):
        """A lone BotStartedSpeakingFrame triggers neither latency nor breakdown events."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        measured = []
        received = []

        @observer.event_handler("on_latency_measured")
        async def record_latency(obs, latency_seconds):
            measured.append(latency_seconds)

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        await run_test(
            passthrough,
            frames_to_send=[BotStartedSpeakingFrame()],
            expected_down_frames=[BotStartedSpeakingFrame],
            observers=[observer],
        )

        self.assertEqual(len(measured), 0)
        self.assertEqual(len(received), 0)

    async def test_first_bot_speech_latency(self):
        """ClientConnected followed by bot speech emits first-speech latency and a breakdown."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        first_speech = []
        received = []

        @observer.event_handler("on_first_bot_speech_latency")
        async def record_first_speech(obs, latency_seconds):
            first_speech.append(latency_seconds)

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        llm = TTFBMetricsData(processor="OpenAILLMService#0", value=0.250)
        tts = TTFBMetricsData(processor="CartesiaTTSService#0", value=0.070)

        await run_test(
            passthrough,
            frames_to_send=[
                ClientConnectedFrame(),
                MetricsFrame(data=[llm]),
                MetricsFrame(data=[tts]),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                ClientConnectedFrame,
                MetricsFrame,
                MetricsFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(first_speech), 1)
        latency = first_speech[0]
        self.assertGreater(latency, 0)
        self.assertLess(latency, 1.0)

        # The metrics accumulated before the first speech must be reported too.
        self.assertEqual(len(received), 1)
        self.assertEqual(
            [entry.processor for entry in received[0].ttfb],
            ["OpenAILLMService#0", "CartesiaTTSService#0"],
        )

    async def test_first_bot_speech_only_once(self):
        """Later bot speech does not re-emit the first-speech latency event."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        first_speech = []

        @observer.event_handler("on_first_bot_speech_latency")
        async def record_first_speech(obs, latency_seconds):
            first_speech.append(latency_seconds)

        await run_test(
            passthrough,
            frames_to_send=[
                ClientConnectedFrame(),
                BotStartedSpeakingFrame(),
                # A regular user/bot exchange afterwards must not count as "first".
                VADUserStoppedSpeakingFrame(),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                ClientConnectedFrame,
                BotStartedSpeakingFrame,
                VADUserStoppedSpeakingFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(first_speech), 1)

    async def test_first_bot_speech_skipped_when_user_speaks_first(self):
        """No first-speech event is emitted when the user talks before the bot does."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        first_speech = []

        @observer.event_handler("on_first_bot_speech_latency")
        async def record_first_speech(obs, latency_seconds):
            first_speech.append(latency_seconds)

        await run_test(
            passthrough,
            frames_to_send=[
                ClientConnectedFrame(),
                # The user starts talking before the bot has greeted them.
                VADUserStartedSpeakingFrame(),
                VADUserStoppedSpeakingFrame(),
                BotStartedSpeakingFrame(),
            ],
            expected_down_frames=[
                ClientConnectedFrame,
                VADUserStartedSpeakingFrame,
                VADUserStoppedSpeakingFrame,
                BotStartedSpeakingFrame,
            ],
            observers=[observer],
        )

        self.assertEqual(len(first_speech), 0)

    async def test_function_call_latency_in_breakdown(self):
        """A completed function call shows up in the breakdown with its duration."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        tool_call_id = "call_abc123"

        await run_test(
            passthrough,
            frames_to_send=[
                VADUserStoppedSpeakingFrame(),
                FunctionCallInProgressFrame(
                    function_name="get_weather",
                    tool_call_id=tool_call_id,
                    arguments={"location": "Atlanta"},
                ),
                # Delay between the in-progress and result frames.
                SleepFrame(sleep=0.1),
                FunctionCallResultFrame(
                    function_name="get_weather",
                    tool_call_id=tool_call_id,
                    arguments={"location": "Atlanta"},
                    result={"temperature": "75"},
                ),
                BotStartedSpeakingFrame(),
            ],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        calls = received[0].function_calls
        self.assertEqual(len(calls), 1)
        call = calls[0]
        self.assertEqual(call.function_name, "get_weather")
        self.assertGreaterEqual(call.duration_secs, 0.1)

    async def test_function_call_reset_on_interruption(self):
        """An InterruptionFrame discards function call metrics recorded before it."""
        observer = UserBotLatencyObserver()
        passthrough = IdentityFilter()
        received = []

        @observer.event_handler("on_latency_breakdown")
        async def record_breakdown(obs, breakdown):
            received.append(breakdown)

        await run_test(
            passthrough,
            frames_to_send=[
                VADUserStoppedSpeakingFrame(),
                FunctionCallInProgressFrame(
                    function_name="get_weather",
                    tool_call_id="call_1",
                    arguments={},
                ),
                FunctionCallResultFrame(
                    function_name="get_weather",
                    tool_call_id="call_1",
                    arguments={},
                    result={},
                ),
                InterruptionFrame(),
                BotStartedSpeakingFrame(),
            ],
            observers=[observer],
        )

        self.assertEqual(len(received), 1)
        self.assertEqual(len(received[0].function_calls), 0)
class TestLatencyBreakdownChronologicalEvents(unittest.TestCase):
    """Unit tests for the event list built by LatencyBreakdown.chronological_events()."""

    def test_events_sorted_by_start_time(self):
        """Entries supplied out of order come back ordered by their start_time."""
        # Built in the same order they are handed to the breakdown: LLM first,
        # although the STT entry has the earlier start_time.
        llm = TTFBBreakdownMetrics(
            processor="OpenAILLMService#0",
            model="gpt-4o",
            start_time=100.200,
            duration_secs=0.250,
        )
        stt = TTFBBreakdownMetrics(
            processor="DeepgramSTTService#0",
            start_time=100.050,
            duration_secs=0.080,
        )
        tts = TTFBBreakdownMetrics(
            processor="CartesiaTTSService#0",
            start_time=100.500,
            duration_secs=0.070,
        )
        weather_call = FunctionCallMetrics(
            function_name="get_weather",
            start_time=100.450,
            duration_secs=0.120,
        )
        aggregation = TextAggregationBreakdownMetrics(
            processor="CartesiaTTSService#0",
            start_time=100.480,
            duration_secs=0.030,
        )
        breakdown = LatencyBreakdown(
            user_turn_start_time=100.0,
            user_turn_secs=0.150,
            ttfb=[llm, stt, tts],
            function_calls=[weather_call],
            text_aggregation=aggregation,
        )

        events = breakdown.chronological_events()

        self.assertEqual(len(events), 6)
        self.assertEqual(
            events,
            [
                "User turn: 0.150s",
                "DeepgramSTTService#0: TTFB 0.080s",
                "OpenAILLMService#0: TTFB 0.250s",
                "get_weather: 0.120s",
                "CartesiaTTSService#0: text aggregation 0.030s",
                "CartesiaTTSService#0: TTFB 0.070s",
            ],
        )

    def test_empty_breakdown(self):
        """A default-constructed breakdown yields an empty event list."""
        self.assertEqual(LatencyBreakdown().chronological_events(), [])

    def test_user_turn_requires_both_fields(self):
        """The user turn entry is omitted unless start time and duration are both set."""
        partial_inputs = (
            {"user_turn_start_time": 100.0},  # start time without a duration
            {"user_turn_secs": 0.150},  # duration without a start time
        )
        for fields in partial_inputs:
            breakdown = LatencyBreakdown(**fields)
            self.assertEqual(breakdown.chronological_events(), [])

    def test_ttfb_only(self):
        """A breakdown holding a single TTFB entry yields just that entry."""
        only_entry = TTFBBreakdownMetrics(processor="LLM#0", start_time=100.0, duration_secs=0.200)
        breakdown = LatencyBreakdown(ttfb=[only_entry])

        self.assertEqual(breakdown.chronological_events(), ["LLM#0: TTFB 0.200s"])
if __name__ == "__main__":
    # Allow running this module directly as a script, not only via a test runner.
    unittest.main()