Files
pipecat/tests/test_startup_timing_observer.py
Mark Backman bbbfdfd321 Replace per-processor start_time with start_offset_secs
Use start_offset_secs (offset from StartFrame) on ProcessorStartupTiming
instead of a wall-clock timestamp. Reports keep a single start_time
anchor for dashboard visualization. Remove _mono_to_wall conversion.
2026-03-02 14:07:34 -05:00

338 lines
11 KiB
Python

import asyncio
import unittest
from pipecat.frames.frames import (
BotConnectedFrame,
ClientConnectedFrame,
Frame,
StartFrame,
TextFrame,
)
from pipecat.observers.startup_timing_observer import (
StartupTimingObserver,
StartupTimingReport,
TransportTimingReport,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import run_test
class SlowStartProcessor(FrameProcessor):
"""A processor that sleeps during start to simulate slow initialization."""
def __init__(self, delay: float = 0.1, **kwargs):
super().__init__(**kwargs)
self._delay = delay
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await asyncio.sleep(self._delay)
await self.push_frame(frame, direction)
class FastProcessor(FrameProcessor):
"""A processor with no start delay."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase):
"""Tests for StartupTimingObserver."""
async def test_timing_reported(self):
"""Test that startup timing is measured and reported."""
observer = StartupTimingObserver()
processor = SlowStartProcessor(delay=0.1)
reports = []
@observer.event_handler("on_startup_timing_report")
async def on_report(obs, report):
reports.append(report)
frames_to_send = [TextFrame(text="hello")]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame],
observers=[observer],
)
self.assertEqual(len(reports), 1)
report = reports[0]
self.assertGreater(report.total_duration_secs, 0)
self.assertGreater(len(report.processor_timings), 0)
# Find our slow processor in the timings.
slow_timings = [
t for t in report.processor_timings if "SlowStartProcessor" in t.processor_name
]
self.assertEqual(len(slow_timings), 1)
self.assertGreaterEqual(slow_timings[0].duration_secs, 0.05)
async def test_processor_types_filter(self):
"""Test that processor_types filter limits which processors appear."""
observer = StartupTimingObserver(processor_types=(SlowStartProcessor,))
processor = SlowStartProcessor(delay=0.05)
reports = []
@observer.event_handler("on_startup_timing_report")
async def on_report(obs, report):
reports.append(report)
frames_to_send = [TextFrame(text="hello")]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame],
observers=[observer],
)
self.assertEqual(len(reports), 1)
report = reports[0]
# Only SlowStartProcessor should be in the timings.
for t in report.processor_timings:
self.assertIn("SlowStartProcessor", t.processor_name)
async def test_report_emits_once(self):
"""Test that the report is emitted only once even with multiple frames."""
observer = StartupTimingObserver()
processor = FastProcessor()
reports = []
@observer.event_handler("on_startup_timing_report")
async def on_report(obs, report):
reports.append(report)
frames_to_send = [
TextFrame(text="first"),
TextFrame(text="second"),
TextFrame(text="third"),
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame, TextFrame, TextFrame],
observers=[observer],
)
self.assertEqual(len(reports), 1)
async def test_event_handler_receives_report(self):
"""Test that the event handler receives a proper StartupTimingReport."""
observer = StartupTimingObserver()
processor = SlowStartProcessor(delay=0.05)
reports = []
@observer.event_handler("on_startup_timing_report")
async def on_report(obs, report):
reports.append(report)
frames_to_send = [TextFrame(text="hello")]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame],
observers=[observer],
)
self.assertEqual(len(reports), 1)
report = reports[0]
self.assertIsInstance(report, StartupTimingReport)
self.assertIsInstance(report.total_duration_secs, float)
self.assertGreater(report.start_time, 0)
for timing in report.processor_timings:
self.assertIsInstance(timing.processor_name, str)
self.assertIsInstance(timing.duration_secs, float)
self.assertGreaterEqual(timing.start_offset_secs, 0)
async def test_excludes_internal_processors(self):
"""Test that internal pipeline processors are excluded by default."""
observer = StartupTimingObserver()
processor = FastProcessor()
reports = []
@observer.event_handler("on_startup_timing_report")
async def on_report(obs, report):
reports.append(report)
frames_to_send = [TextFrame(text="hello")]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame],
observers=[observer],
)
self.assertEqual(len(reports), 1)
report = reports[0]
# No internal processors (PipelineSource, PipelineSink, Pipeline) in the report.
internal_names = ("Pipeline#", "PipelineTask#")
for t in report.processor_timings:
for prefix in internal_names:
self.assertNotIn(
prefix,
t.processor_name,
f"Internal processor {t.processor_name} should be excluded by default",
)
async def test_transport_timing_client_only(self):
"""Test that ClientConnectedFrame emits on_transport_timing_report."""
observer = StartupTimingObserver()
processor = FastProcessor()
transport_reports = []
@observer.event_handler("on_transport_timing_report")
async def on_transport(obs, report):
transport_reports.append(report)
frames_to_send = [ClientConnectedFrame(), TextFrame(text="hello")]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[ClientConnectedFrame, TextFrame],
observers=[observer],
)
self.assertEqual(len(transport_reports), 1)
report = transport_reports[0]
self.assertIsInstance(report, TransportTimingReport)
self.assertGreater(report.start_time, 0)
self.assertGreater(report.client_connected_secs, 0)
self.assertIsNone(report.bot_connected_secs)
async def test_transport_timing_only_first_client(self):
"""Test that only the first ClientConnectedFrame triggers the event."""
observer = StartupTimingObserver()
processor = FastProcessor()
transport_reports = []
@observer.event_handler("on_transport_timing_report")
async def on_transport(obs, report):
transport_reports.append(report)
frames_to_send = [
ClientConnectedFrame(),
ClientConnectedFrame(),
TextFrame(text="hello"),
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[ClientConnectedFrame, ClientConnectedFrame, TextFrame],
observers=[observer],
)
self.assertEqual(len(transport_reports), 1)
async def test_transport_timing_without_start_frame(self):
"""Test that ClientConnectedFrame before StartFrame does not crash."""
observer = StartupTimingObserver()
# Directly call on_push_frame with a ClientConnectedFrame before any
# StartFrame has been seen. This should be a no-op (no crash).
from pipecat.observers.base_observer import FramePushed
processor = FastProcessor()
destination = FastProcessor()
data = FramePushed(
source=processor,
destination=destination,
frame=ClientConnectedFrame(),
direction=FrameDirection.DOWNSTREAM,
timestamp=1000,
)
await observer.on_push_frame(data)
# No event should have been emitted.
self.assertFalse(observer._transport_timing_reported)
async def test_bot_and_client_connected(self):
"""Test that BotConnectedFrame timing is included in the transport report."""
observer = StartupTimingObserver()
processor = FastProcessor()
transport_reports = []
@observer.event_handler("on_transport_timing_report")
async def on_transport(obs, report):
transport_reports.append(report)
frames_to_send = [
BotConnectedFrame(),
ClientConnectedFrame(),
TextFrame(text="hello"),
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[BotConnectedFrame, ClientConnectedFrame, TextFrame],
observers=[observer],
)
self.assertEqual(len(transport_reports), 1)
report = transport_reports[0]
self.assertGreater(report.client_connected_secs, 0)
self.assertIsNotNone(report.bot_connected_secs)
self.assertGreater(report.bot_connected_secs, 0)
# Client connected should be >= bot connected.
self.assertGreaterEqual(report.client_connected_secs, report.bot_connected_secs)
async def test_bot_connected_only_first(self):
"""Test that only the first BotConnectedFrame is recorded."""
observer = StartupTimingObserver()
processor = FastProcessor()
transport_reports = []
@observer.event_handler("on_transport_timing_report")
async def on_transport(obs, report):
transport_reports.append(report)
frames_to_send = [
BotConnectedFrame(),
BotConnectedFrame(),
ClientConnectedFrame(),
TextFrame(text="hello"),
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=[
BotConnectedFrame,
BotConnectedFrame,
ClientConnectedFrame,
TextFrame,
],
observers=[observer],
)
# Only one transport report, with bot timing from first frame.
self.assertEqual(len(transport_reports), 1)
self.assertIsNotNone(transport_reports[0].bot_connected_secs)
if __name__ == "__main__":
unittest.main()