Files
pipecat/tests/test_user_bot_latency_observer.py
2026-02-05 14:29:45 -05:00

138 lines
4.0 KiB
Python

import unittest
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.tests.utils import run_test
class TestUserBotLatencyObserver(unittest.IsolatedAsyncioTestCase):
"""Tests for UserBotLatencyObserver."""
async def test_normal_latency_measurement(self):
"""Test basic latency measurement from user stop to bot start."""
# Create observer
observer = UserBotLatencyObserver()
# Create identity filter (passes all frames through)
processor = IdentityFilter()
# Capture latency events
latencies = []
@observer.event_handler("on_latency_measured")
async def on_latency(obs, latency_seconds):
latencies.append(latency_seconds)
# Define frame sequence
frames_to_send = [
VADUserStoppedSpeakingFrame(),
BotStartedSpeakingFrame(),
]
expected_down_frames = [
VADUserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
]
# Run test
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
observers=[observer],
)
# Verify latency was measured
self.assertEqual(len(latencies), 1)
self.assertGreater(latencies[0], 0)
self.assertLess(latencies[0], 1.0) # Should be very quick
async def test_multiple_latency_measurements(self):
"""Test that multiple user-bot exchanges produce separate latency events."""
# Create observer
observer = UserBotLatencyObserver()
# Create identity filter
processor = IdentityFilter()
# Capture latency events
latencies = []
@observer.event_handler("on_latency_measured")
async def on_latency(obs, latency_seconds):
latencies.append(latency_seconds)
# Define frame sequence with two complete cycles
frames_to_send = [
# First cycle
VADUserStoppedSpeakingFrame(),
BotStartedSpeakingFrame(),
# Second cycle
VADUserStoppedSpeakingFrame(),
BotStartedSpeakingFrame(),
]
expected_down_frames = [
VADUserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
]
# Run test
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
observers=[observer],
)
# Verify two separate latencies were measured
self.assertEqual(len(latencies), 2)
self.assertGreater(latencies[0], 0)
self.assertGreater(latencies[1], 0)
async def test_no_measurement_without_user_stop(self):
"""Test that latency is not measured if bot starts without user stopping first."""
# Create observer
observer = UserBotLatencyObserver()
# Create identity filter
processor = IdentityFilter()
# Capture latency events
latencies = []
@observer.event_handler("on_latency_measured")
async def on_latency(obs, latency_seconds):
latencies.append(latency_seconds)
# Define frame sequence - bot starts without user stop
frames_to_send = [
BotStartedSpeakingFrame(),
]
expected_down_frames = [
BotStartedSpeakingFrame,
]
# Run test
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
observers=[observer],
)
# Verify no latency was measured
self.assertEqual(len(latencies), 0)
if __name__ == "__main__":
unittest.main()