diff --git a/tests/test_user_bot_latency_observer.py b/tests/test_user_bot_latency_observer.py new file mode 100644 index 000000000..1b7325d14 --- /dev/null +++ b/tests/test_user_bot_latency_observer.py @@ -0,0 +1,137 @@ +import unittest + +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + VADUserStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, +) +from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver +from pipecat.processors.filters.identity_filter import IdentityFilter +from pipecat.tests.utils import run_test + + +class TestUserBotLatencyObserver(unittest.IsolatedAsyncioTestCase): + """Tests for UserBotLatencyObserver.""" + + async def test_normal_latency_measurement(self): + """Test basic latency measurement from user stop to bot start.""" + # Create observer + observer = UserBotLatencyObserver() + + # Create identity filter (passes all frames through) + processor = IdentityFilter() + + # Capture latency events + latencies = [] + + @observer.event_handler("on_latency_measured") + async def on_latency(obs, latency_seconds): + latencies.append(latency_seconds) + + # Define frame sequence + frames_to_send = [ + VADUserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + ] + + expected_down_frames = [ + VADUserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + ] + + # Run test + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=[observer], + ) + + # Verify latency was measured + self.assertEqual(len(latencies), 1) + self.assertGreater(latencies[0], 0) + self.assertLess(latencies[0], 1.0) # Should be very quick + + async def test_multiple_latency_measurements(self): + """Test that multiple user-bot exchanges produce separate latency events.""" + # Create observer + observer = UserBotLatencyObserver() + + # Create identity filter + processor = IdentityFilter() + + # Capture latency events + latencies = [] + + @observer.event_handler("on_latency_measured") + async def on_latency(obs, latency_seconds): + latencies.append(latency_seconds) + + # Define frame sequence with two complete cycles + frames_to_send = [ + # First cycle + VADUserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + # Second cycle + VADUserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + ] + + expected_down_frames = [ + VADUserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, + BotStartedSpeakingFrame, + ] + + # Run test + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=[observer], + ) + + # Verify two separate latencies were measured + self.assertEqual(len(latencies), 2) + self.assertGreater(latencies[0], 0) + self.assertGreater(latencies[1], 0) + + async def test_no_measurement_without_user_stop(self): + """Test that latency is not measured if bot starts without user stopping first.""" + # Create observer + observer = UserBotLatencyObserver() + + # Create identity filter + processor = IdentityFilter() + + # Capture latency events + latencies = [] + + @observer.event_handler("on_latency_measured") + async def on_latency(obs, latency_seconds): + latencies.append(latency_seconds) + + # Define frame sequence - bot starts without user stop + frames_to_send = [ + BotStartedSpeakingFrame(), + ] + + expected_down_frames = [ + BotStartedSpeakingFrame, + ] + + # Run test + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + observers=[observer], + ) + + # Verify no latency was measured + self.assertEqual(len(latencies), 0) + + +if __name__ == "__main__": + unittest.main()