From c54232bdb40c28c7e28e1600bd3492212ff4b6cc Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 28 Feb 2026 14:04:21 -0500 Subject: [PATCH 01/11] Add StartupTimingObserver for measuring processor start() times Tracks how long each processor start method takes during pipeline startup by measuring StartFrame arrive/leave deltas. Emits a timing report via the on_startup_timing_report event and auto-logs a summary. Internal pipeline processors are excluded from reports by default. --- .../foundational/29-turn-tracking-observer.py | 12 +- .../observers/startup_timing_observer.py | 232 ++++++++++++++++++ tests/test_startup_timing_observer.py | 186 ++++++++++++++ 3 files changed, 427 insertions(+), 3 deletions(-) create mode 100644 src/pipecat/observers/startup_timing_observer.py create mode 100644 tests/test_startup_timing_observer.py diff --git a/examples/foundational/29-turn-tracking-observer.py b/examples/foundational/29-turn-tracking-observer.py index 321197db2..3e85ddfb8 100644 --- a/examples/foundational/29-turn-tracking-observer.py +++ b/examples/foundational/29-turn-tracking-observer.py @@ -12,6 +12,7 @@ from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import LLMRunFrame +from pipecat.observers.startup_timing_observer import StartupTimingObserver from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -87,8 +88,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ] ) - # Create latency tracking observer latency_observer = UserBotLatencyObserver() + startup_observer = StartupTimingObserver() task = PipelineTask( pipeline, @@ -97,14 +98,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - observers=[latency_observer], + observers=[latency_observer, startup_observer], ) - # Log latency measurements using the event handler @latency_observer.event_handler("on_latency_measured") async def on_latency_measured(observer, latency_seconds): logger.info(f"⏱️ User-to-bot latency: {latency_seconds:.3f}s") + @startup_observer.event_handler("on_startup_timing_report") + async def on_startup_timing_report(observer, report): + logger.info(f"Total startup: {report.total_duration_secs:.3f}s") + for timing in report.processor_timings: + logger.info(f" {timing.processor_name}: {timing.duration_secs:.3f}s") + turn_observer = task.turn_tracking_observer if turn_observer: diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py new file mode 100644 index 000000000..0f3ad0b7a --- /dev/null +++ b/src/pipecat/observers/startup_timing_observer.py @@ -0,0 +1,232 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Observer for tracking pipeline startup timing. + +This module provides an observer that measures how long each processor's +``start()`` method takes during pipeline startup. It works by tracking +when a ``StartFrame`` arrives at a processor (``on_process_frame``) versus +when it leaves (``on_push_frame``), giving the exact ``start()`` duration +for each processor in the pipeline. + +Example:: + + observer = StartupTimingObserver() + + @observer.event_handler("on_startup_timing_report") + async def on_report(observer, report): + for t in report.processor_timings: + print(f"{t.processor_name}: {t.duration_secs:.3f}s") + + task = PipelineTask(pipeline, observers=[observer]) +""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple, Type + +from loguru import logger + +from pipecat.frames.frames import StartFrame +from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed +from pipecat.pipeline.base_pipeline import BasePipeline +from pipecat.pipeline.pipeline import PipelineSink, PipelineSource +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +# Internal pipeline types excluded from tracking by default. +_INTERNAL_TYPES = (PipelineSink, PipelineSource, BasePipeline) + + +@dataclass +class ProcessorStartupTiming: + """Startup timing for a single processor. + + Parameters: + processor_name: The name of the processor. + duration_secs: How long the processor's start() took, in seconds. + """ + + processor_name: str + duration_secs: float + + +@dataclass +class StartupTimingReport: + """Report of startup timings for all measured processors. + + Parameters: + total_duration_secs: Total wall-clock time from first to last processor start. + processor_timings: Per-processor timing data, in pipeline order. + """ + + total_duration_secs: float + processor_timings: List[ProcessorStartupTiming] = field(default_factory=list) + + +class StartupTimingObserver(BaseObserver): + """Observer that measures processor startup times during pipeline initialization. + + Tracks how long each processor's ``start()`` method takes by measuring the + time between when a ``StartFrame`` arrives at a processor and when it is + pushed downstream. This captures WebSocket connections, API authentication, + model loading, and other initialization work. + + By default, internal pipeline processors (``PipelineSource``, ``PipelineSink``, + ``Pipeline``) are excluded from the report. Pass ``processor_types`` to + measure only specific types. + + Event handlers available: + + - on_startup_timing_report: Called once after startup completes with the full + timing report. + + Example:: + + observer = StartupTimingObserver( + processor_types=(STTService, TTSService) + ) + + @observer.event_handler("on_startup_timing_report") + async def on_report(observer, report): + for t in report.processor_timings: + logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s") + + task = PipelineTask(pipeline, observers=[observer]) + + Args: + processor_types: Optional tuple of processor types to measure. If None, + all non-internal processors are measured. + """ + + def __init__( + self, + *, + processor_types: Optional[Tuple[Type[FrameProcessor], ...]] = None, + **kwargs, + ): + """Initialize the startup timing observer. + + Args: + processor_types: Optional tuple of processor types to measure. + If None, all non-internal processors are measured. + **kwargs: Additional arguments passed to parent class. + """ + super().__init__(**kwargs) + self._processor_types = processor_types + + # Map processor ID -> (processor, arrival_timestamp_ns) + self._arrivals: Dict[int, Tuple[FrameProcessor, int]] = {} + + # Collected timings in pipeline order. + self._timings: List[ProcessorStartupTiming] = [] + + # Lock onto the first StartFrame we see (by frame ID). + self._start_frame_id: Optional[str] = None + + # Whether we've already emitted the report. + self._reported = False + + self._register_event_handler("on_startup_timing_report") + + def _should_track(self, processor: FrameProcessor) -> bool: + """Check if a processor should be tracked for timing. + + Args: + processor: The processor to check. + + Returns: + True if the processor matches the filter or no filter is set. + """ + if self._processor_types is not None: + return isinstance(processor, self._processor_types) + # Default: exclude internal pipeline plumbing. + return not isinstance(processor, _INTERNAL_TYPES) + + async def on_process_frame(self, data: FrameProcessed): + """Record when a StartFrame arrives at a processor. + + When a ``StartFrame`` reaches a ``PipelineSink``, startup is complete + (the frame has traversed the entire pipeline) and the report is emitted. + + Args: + data: The frame processing event data. + """ + if self._reported: + return + + if not isinstance(data.frame, StartFrame): + return + + if data.direction != FrameDirection.DOWNSTREAM: + return + + # Lock onto the first StartFrame. + if self._start_frame_id is None: + self._start_frame_id = data.frame.id + elif data.frame.id != self._start_frame_id: + return + + # When the StartFrame reaches a PipelineSink, all processors have + # completed start(). PipelineSinks use direct mode so the outermost + # sink fires last within the same synchronous call chain. + if isinstance(data.processor, PipelineSink): + if self._timings: + await self._emit_report() + return + + if self._should_track(data.processor): + self._arrivals[data.processor.id] = (data.processor, data.timestamp) + + async def on_push_frame(self, data: FramePushed): + """Record when a StartFrame leaves a processor and compute the delta. + + Args: + data: The frame push event data. + """ + if self._reported: + return + + if not isinstance(data.frame, StartFrame): + return + + if data.direction != FrameDirection.DOWNSTREAM: + return + + if self._start_frame_id is not None and data.frame.id != self._start_frame_id: + return + + arrival = self._arrivals.pop(data.source.id, None) + if arrival is None: + return + + processor, arrival_ts = arrival + duration_ns = data.timestamp - arrival_ts + duration_secs = duration_ns / 1e9 + + self._timings.append( + ProcessorStartupTiming( + processor_name=processor.name, + duration_secs=duration_secs, + ) + ) + + async def _emit_report(self): + """Build and emit the startup timing report.""" + if self._reported: + return + self._reported = True + + total = sum(t.duration_secs for t in self._timings) + + report = StartupTimingReport( + total_duration_secs=total, + processor_timings=self._timings, + ) + + logger.debug(f"Pipeline startup completed in {total:.3f}s") + for t in self._timings: + logger.debug(f" {t.processor_name}: {t.duration_secs:.3f}s") + + await self._call_event_handler("on_startup_timing_report", report) diff --git a/tests/test_startup_timing_observer.py b/tests/test_startup_timing_observer.py new file mode 100644 index 000000000..e3cd7c2b7 --- /dev/null +++ b/tests/test_startup_timing_observer.py @@ -0,0 +1,186 @@ +import asyncio +import unittest + +from pipecat.frames.frames import Frame, StartFrame, TextFrame +from pipecat.observers.startup_timing_observer import ( + StartupTimingObserver, + StartupTimingReport, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.tests.utils import run_test + + +class SlowStartProcessor(FrameProcessor): + """A processor that sleeps during start to simulate slow initialization.""" + + def __init__(self, delay: float = 0.1, **kwargs): + super().__init__(**kwargs) + self._delay = delay + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, StartFrame): + await asyncio.sleep(self._delay) + await self.push_frame(frame, direction) + + +class FastProcessor(FrameProcessor): + """A processor with no start delay.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) + + +class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): + """Tests for StartupTimingObserver.""" + + async def test_timing_reported(self): + """Test that startup timing is measured and reported.""" + observer = StartupTimingObserver() + processor = SlowStartProcessor(delay=0.1) + + reports = [] + + @observer.event_handler("on_startup_timing_report") + async def on_report(obs, report): + reports.append(report) + + frames_to_send = [TextFrame(text="hello")] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[TextFrame], + observers=[observer], + ) + + self.assertEqual(len(reports), 1) + report = reports[0] + self.assertGreater(report.total_duration_secs, 0) + self.assertGreater(len(report.processor_timings), 0) + + # Find our slow processor in the timings. + slow_timings = [ + t for t in report.processor_timings if "SlowStartProcessor" in t.processor_name + ] + self.assertEqual(len(slow_timings), 1) + self.assertGreaterEqual(slow_timings[0].duration_secs, 0.05) + + async def test_processor_types_filter(self): + """Test that processor_types filter limits which processors appear.""" + observer = StartupTimingObserver(processor_types=(SlowStartProcessor,)) + processor = SlowStartProcessor(delay=0.05) + + reports = [] + + @observer.event_handler("on_startup_timing_report") + async def on_report(obs, report): + reports.append(report) + + frames_to_send = [TextFrame(text="hello")] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[TextFrame], + observers=[observer], + ) + + self.assertEqual(len(reports), 1) + report = reports[0] + + # Only SlowStartProcessor should be in the timings. + for t in report.processor_timings: + self.assertIn("SlowStartProcessor", t.processor_name) + + async def test_report_emits_once(self): + """Test that the report is emitted only once even with multiple frames.""" + observer = StartupTimingObserver() + processor = FastProcessor() + + reports = [] + + @observer.event_handler("on_startup_timing_report") + async def on_report(obs, report): + reports.append(report) + + frames_to_send = [ + TextFrame(text="first"), + TextFrame(text="second"), + TextFrame(text="third"), + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[TextFrame, TextFrame, TextFrame], + observers=[observer], + ) + + self.assertEqual(len(reports), 1) + + async def test_event_handler_receives_report(self): + """Test that the event handler receives a proper StartupTimingReport.""" + observer = StartupTimingObserver() + processor = SlowStartProcessor(delay=0.05) + + reports = [] + + @observer.event_handler("on_startup_timing_report") + async def on_report(obs, report): + reports.append(report) + + frames_to_send = [TextFrame(text="hello")] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[TextFrame], + observers=[observer], + ) + + self.assertEqual(len(reports), 1) + report = reports[0] + self.assertIsInstance(report, StartupTimingReport) + self.assertIsInstance(report.total_duration_secs, float) + for timing in report.processor_timings: + self.assertIsInstance(timing.processor_name, str) + self.assertIsInstance(timing.duration_secs, float) + + async def test_excludes_internal_processors(self): + """Test that internal pipeline processors are excluded by default.""" + observer = StartupTimingObserver() + processor = FastProcessor() + + reports = [] + + @observer.event_handler("on_startup_timing_report") + async def on_report(obs, report): + reports.append(report) + + frames_to_send = [TextFrame(text="hello")] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[TextFrame], + observers=[observer], + ) + + self.assertEqual(len(reports), 1) + report = reports[0] + + # No internal processors (PipelineSource, PipelineSink, Pipeline) in the report. + internal_names = ("Pipeline#", "PipelineTask#") + for t in report.processor_timings: + for prefix in internal_names: + self.assertNotIn( + prefix, + t.processor_name, + f"Internal processor {t.processor_name} should be excluded by default", + ) + + +if __name__ == "__main__": + unittest.main() From 58aa8e1ba56a4817fcccffd234a8533257ba8cdf Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 28 Feb 2026 14:05:25 -0500 Subject: [PATCH 02/11] Add changelog for #3881 --- changelog/3881.added.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/3881.added.md diff --git a/changelog/3881.added.md b/changelog/3881.added.md new file mode 100644 index 000000000..694e052ce --- /dev/null +++ b/changelog/3881.added.md @@ -0,0 +1 @@ +- Added `StartupTimingObserver` for measuring how long each processor's `start()` method takes during pipeline startup. Useful for diagnosing cold start slowness and identifying initialization bottlenecks. From 08360668984a9ae158bae566d390c2a67d17b5d5 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sun, 1 Mar 2026 08:45:59 -0500 Subject: [PATCH 03/11] Add ClientConnectedFrame and transport readiness timing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce ClientConnectedFrame (SystemFrame) pushed by all transports when a client connects. StartupTimingObserver uses this to measure transport readiness — the time from StartFrame to first client connection — via a new on_transport_readiness_measured event. --- .../foundational/29-turn-tracking-observer.py | 4 + src/pipecat/frames/frames.py | 11 +++ .../observers/startup_timing_observer.py | 80 ++++++++++++++----- src/pipecat/transports/daily/transport.py | 3 + src/pipecat/transports/heygen/transport.py | 3 + src/pipecat/transports/livekit/transport.py | 3 + .../transports/smallwebrtc/transport.py | 3 + src/pipecat/transports/tavus/transport.py | 3 + src/pipecat/transports/websocket/fastapi.py | 2 + src/pipecat/transports/websocket/server.py | 4 +- tests/test_startup_timing_observer.py | 76 +++++++++++++++++- 11 files changed, 172 insertions(+), 20 deletions(-) diff --git a/examples/foundational/29-turn-tracking-observer.py b/examples/foundational/29-turn-tracking-observer.py index 3e85ddfb8..ad0b448e9 100644 --- a/examples/foundational/29-turn-tracking-observer.py +++ b/examples/foundational/29-turn-tracking-observer.py @@ -111,6 +111,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): for timing in report.processor_timings: logger.info(f" {timing.processor_name}: {timing.duration_secs:.3f}s") + @startup_observer.event_handler("on_transport_readiness_measured") + async def on_transport_readiness_measured(observer, report): + logger.info(f"Transport readiness: {report.readiness_secs:.3f}s") + turn_observer = task.turn_tracking_observer if turn_observer: diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 126f3c001..b5e368c53 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1910,6 +1910,17 @@ class StopFrame(ControlFrame, UninterruptibleFrame): pass +@dataclass +class ClientConnectedFrame(SystemFrame): + """Frame indicating that a client has connected to the transport. + + Pushed downstream by the input transport when a client (participant) + connects. Used by observers to measure transport readiness timing. + """ + + pass + + @dataclass class OutputTransportReadyFrame(ControlFrame): """Frame indicating that the output transport is ready. diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py index 0f3ad0b7a..d6b1c8fa9 100644 --- a/src/pipecat/observers/startup_timing_observer.py +++ b/src/pipecat/observers/startup_timing_observer.py @@ -12,6 +12,10 @@ when a ``StartFrame`` arrives at a processor (``on_process_frame``) versus when it leaves (``on_push_frame``), giving the exact ``start()`` duration for each processor in the pipeline. +It also measures transport readiness — the time from ``StartFrame`` to the +first ``ClientConnectedFrame`` — via a separate ``on_transport_readiness_measured`` +event. + Example:: observer = StartupTimingObserver() @@ -21,6 +25,10 @@ Example:: for t in report.processor_timings: print(f"{t.processor_name}: {t.duration_secs:.3f}s") + @observer.event_handler("on_transport_readiness_measured") + async def on_readiness(observer, report): + print(f"Transport ready in {report.readiness_secs:.3f}s") + task = PipelineTask(pipeline, observers=[observer]) """ @@ -29,11 +37,11 @@ from typing import Dict, List, Optional, Tuple, Type from loguru import logger -from pipecat.frames.frames import StartFrame +from pipecat.frames.frames import ClientConnectedFrame, StartFrame from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.pipeline import PipelineSink, PipelineSource -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frame_processor import FrameProcessor # Internal pipeline types excluded from tracking by default. _INTERNAL_TYPES = (PipelineSink, PipelineSource, BasePipeline) @@ -65,6 +73,17 @@ class StartupTimingReport: processor_timings: List[ProcessorStartupTiming] = field(default_factory=list) +@dataclass +class TransportReadinessReport: + """Time from pipeline start to first client connection. + + Parameters: + readiness_secs: Seconds from StartFrame to first ClientConnectedFrame. + """ + + readiness_secs: float + + class StartupTimingObserver(BaseObserver): """Observer that measures processor startup times during pipeline initialization. @@ -73,6 +92,10 @@ class StartupTimingObserver(BaseObserver): pushed downstream. This captures WebSocket connections, API authentication, model loading, and other initialization work. + Also measures transport readiness — the time from ``StartFrame`` to the + first ``ClientConnectedFrame`` — indicating how long it takes for a client + to connect after the pipeline starts. + By default, internal pipeline processors (``PipelineSource``, ``PipelineSink``, ``Pipeline``) are excluded from the report. Pass ``processor_types`` to measure only specific types. @@ -81,6 +104,8 @@ class StartupTimingObserver(BaseObserver): - on_startup_timing_report: Called once after startup completes with the full timing report. + - on_transport_readiness_measured: Called once when the first client connects with the + transport readiness timing. Example:: @@ -93,6 +118,10 @@ class StartupTimingObserver(BaseObserver): for t in report.processor_timings: logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s") + @observer.event_handler("on_transport_readiness_measured") + async def on_readiness(observer, report): + logger.info(f"Transport ready in {report.readiness_secs:.3f}s") + task = PipelineTask(pipeline, observers=[observer]) Args: @@ -125,10 +154,17 @@ class StartupTimingObserver(BaseObserver): # Lock onto the first StartFrame we see (by frame ID). self._start_frame_id: Optional[str] = None - # Whether we've already emitted the report. - self._reported = False + # Whether we've already emitted the startup timing report. + self._startup_timing_reported = False + + # Whether we've already measured transport readiness. + self._transport_readiness_measured = False + + # Timestamp (ns) when we first see a StartFrame arrive at a processor. + self._start_frame_arrival_ns: Optional[int] = None self._register_event_handler("on_startup_timing_report") + self._register_event_handler("on_transport_readiness_measured") def _should_track(self, processor: FrameProcessor) -> bool: """Check if a processor should be tracked for timing. @@ -153,18 +189,16 @@ class StartupTimingObserver(BaseObserver): Args: data: The frame processing event data. """ - if self._reported: + if self._startup_timing_reported: return if not isinstance(data.frame, StartFrame): return - if data.direction != FrameDirection.DOWNSTREAM: - return - # Lock onto the first StartFrame. if self._start_frame_id is None: self._start_frame_id = data.frame.id + self._start_frame_arrival_ns = data.timestamp elif data.frame.id != self._start_frame_id: return @@ -182,18 +216,21 @@ class StartupTimingObserver(BaseObserver): async def on_push_frame(self, data: FramePushed): """Record when a StartFrame leaves a processor and compute the delta. + Also handles ``ClientConnectedFrame`` to measure transport readiness. + Args: data: The frame push event data. """ - if self._reported: + if isinstance(data.frame, ClientConnectedFrame): + await self._handle_client_connected(data) + return + + if self._startup_timing_reported: return if not isinstance(data.frame, StartFrame): return - if data.direction != FrameDirection.DOWNSTREAM: - return - if self._start_frame_id is not None and data.frame.id != self._start_frame_id: return @@ -212,11 +249,22 @@ class StartupTimingObserver(BaseObserver): ) ) + async def _handle_client_connected(self, data: FramePushed): + """Measure transport readiness on first client connection.""" + if self._transport_readiness_measured or self._start_frame_arrival_ns is None: + return + + self._transport_readiness_measured = True + delta_ns = data.timestamp - self._start_frame_arrival_ns + readiness_secs = delta_ns / 1e9 + report = TransportReadinessReport(readiness_secs=readiness_secs) + await self._call_event_handler("on_transport_readiness_measured", report) + async def _emit_report(self): """Build and emit the startup timing report.""" - if self._reported: + if self._startup_timing_reported: return - self._reported = True + self._startup_timing_reported = True total = sum(t.duration_secs for t in self._timings) @@ -225,8 +273,4 @@ class StartupTimingObserver(BaseObserver): processor_timings=self._timings, ) - logger.debug(f"Pipeline startup completed in {total:.3f}s") - for t in self._timings: - logger.debug(f" {t.processor_name}: {t.duration_secs:.3f}s") - await self._call_event_handler("on_startup_timing_report", report) diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 9575fd51b..cb24b23fa 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -25,6 +25,7 @@ from pydantic import BaseModel from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams from pipecat.frames.frames import ( CancelFrame, + ClientConnectedFrame, DataFrame, EndFrame, Frame, @@ -2716,6 +2717,8 @@ class DailyTransport(BaseTransport): await self._call_event_handler("on_participant_joined", participant) # Also call on_client_connected for compatibility with other transports await self._call_event_handler("on_client_connected", participant) + if self._input: + await self._input.push_frame(ClientConnectedFrame()) async def _on_participant_left(self, participant, reason): """Handle participant left events.""" diff --git a/src/pipecat/transports/heygen/transport.py b/src/pipecat/transports/heygen/transport.py index dbeded3e5..77ccda09f 100644 --- a/src/pipecat/transports/heygen/transport.py +++ b/src/pipecat/transports/heygen/transport.py @@ -26,6 +26,7 @@ from pipecat.frames.frames import ( BotStartedSpeakingFrame, BotStoppedSpeakingFrame, CancelFrame, + ClientConnectedFrame, EndFrame, Frame, InputAudioRawFrame, @@ -387,6 +388,8 @@ class HeyGenTransport(BaseTransport): async def _on_client_connected(self, participant: Any): """Handle client connected events.""" await self._call_event_handler("on_client_connected", participant) + if self._input: + await self._input.push_frame(ClientConnectedFrame()) async def _on_client_disconnected(self, participant: Any): """Handle client disconnected events.""" diff --git a/src/pipecat/transports/livekit/transport.py b/src/pipecat/transports/livekit/transport.py index 1902e7cd3..e4435016c 100644 --- a/src/pipecat/transports/livekit/transport.py +++ b/src/pipecat/transports/livekit/transport.py @@ -24,6 +24,7 @@ from pipecat.audio.vad.vad_analyzer import VADAnalyzer from pipecat.frames.frames import ( AudioRawFrame, CancelFrame, + ClientConnectedFrame, EndFrame, ImageRawFrame, OutputAudioRawFrame, @@ -1143,6 +1144,8 @@ class LiveKitTransport(BaseTransport): async def _on_participant_connected(self, participant_id: str): """Handle participant connected events.""" await self._call_event_handler("on_participant_connected", participant_id) + if self._input: + await self._input.push_frame(ClientConnectedFrame()) async def _on_participant_disconnected(self, participant_id: str): """Handle participant disconnected events.""" diff --git a/src/pipecat/transports/smallwebrtc/transport.py b/src/pipecat/transports/smallwebrtc/transport.py index dc91588a3..36f883278 100644 --- a/src/pipecat/transports/smallwebrtc/transport.py +++ b/src/pipecat/transports/smallwebrtc/transport.py @@ -23,6 +23,7 @@ from pydantic import BaseModel from pipecat.frames.frames import ( CancelFrame, + ClientConnectedFrame, EndFrame, Frame, InputAudioRawFrame, @@ -964,6 +965,8 @@ class SmallWebRTCTransport(BaseTransport): async def _on_client_connected(self, webrtc_connection): """Handle client connection events.""" await self._call_event_handler("on_client_connected", webrtc_connection) + if self._input: + await self._input.push_frame(ClientConnectedFrame()) async def _on_client_disconnected(self, webrtc_connection): """Handle client disconnection events.""" diff --git a/src/pipecat/transports/tavus/transport.py b/src/pipecat/transports/tavus/transport.py index dd63cb790..114f33ca0 100644 --- a/src/pipecat/transports/tavus/transport.py +++ b/src/pipecat/transports/tavus/transport.py @@ -22,6 +22,7 @@ from pydantic import BaseModel from pipecat.frames.frames import ( CancelFrame, + ClientConnectedFrame, EndFrame, Frame, InputAudioRawFrame, @@ -786,6 +787,8 @@ class TavusTransport(BaseTransport): async def _on_client_connected(self, participant: Any): """Handle client connected events.""" await self._call_event_handler("on_client_connected", participant) + if self._input: + await self._input.push_frame(ClientConnectedFrame()) async def _on_client_disconnected(self, participant: Any): """Handle client disconnected events.""" diff --git a/src/pipecat/transports/websocket/fastapi.py b/src/pipecat/transports/websocket/fastapi.py index f52123e52..0fde2b9ae 100644 --- a/src/pipecat/transports/websocket/fastapi.py +++ b/src/pipecat/transports/websocket/fastapi.py @@ -23,6 +23,7 @@ from pydantic import BaseModel from pipecat.frames.frames import ( CancelFrame, + ClientConnectedFrame, EndFrame, Frame, InputAudioRawFrame, @@ -260,6 +261,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport): if not self._monitor_websocket_task and self._params.session_timeout: self._monitor_websocket_task = self.create_task(self._monitor_websocket()) await self._client.trigger_client_connected() + await self.push_frame(ClientConnectedFrame()) if not self._receive_task: self._receive_task = self.create_task(self._receive_messages()) await self.set_transport_ready(frame) diff --git a/src/pipecat/transports/websocket/server.py b/src/pipecat/transports/websocket/server.py index e5f628fa4..fa3645d37 100644 --- a/src/pipecat/transports/websocket/server.py +++ b/src/pipecat/transports/websocket/server.py @@ -22,11 +22,11 @@ from pydantic import BaseModel from pipecat.frames.frames import ( CancelFrame, + ClientConnectedFrame, EndFrame, Frame, InputAudioRawFrame, InputTransportMessageFrame, - InputTransportMessageUrgentFrame, InterruptionFrame, OutputAudioRawFrame, OutputTransportMessageFrame, @@ -504,6 +504,8 @@ class WebsocketServerTransport(BaseTransport): if self._output: await self._output.set_client_connection(websocket) await self._call_event_handler("on_client_connected", websocket) + if self._input: + await self._input.push_frame(ClientConnectedFrame()) else: logger.error("A WebsocketServerTransport output is missing in the pipeline") diff --git a/tests/test_startup_timing_observer.py b/tests/test_startup_timing_observer.py index e3cd7c2b7..efabf5bc7 100644 --- a/tests/test_startup_timing_observer.py +++ b/tests/test_startup_timing_observer.py @@ -1,10 +1,11 @@ import asyncio import unittest -from pipecat.frames.frames import Frame, StartFrame, TextFrame +from pipecat.frames.frames import ClientConnectedFrame, Frame, StartFrame, TextFrame from pipecat.observers.startup_timing_observer import ( StartupTimingObserver, StartupTimingReport, + TransportReadinessReport, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.tests.utils import run_test @@ -181,6 +182,79 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): f"Internal processor {t.processor_name} should be excluded by default", ) + async def test_transport_readiness_measured(self): + """Test that ClientConnectedFrame after startup emits on_transport_readiness_measured.""" + observer = StartupTimingObserver() + processor = FastProcessor() + + readiness_reports = [] + + @observer.event_handler("on_transport_readiness_measured") + async def on_readiness(obs, report): + readiness_reports.append(report) + + frames_to_send = [ClientConnectedFrame(), TextFrame(text="hello")] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[ClientConnectedFrame, TextFrame], + observers=[observer], + ) + + self.assertEqual(len(readiness_reports), 1) + report = readiness_reports[0] + self.assertIsInstance(report, TransportReadinessReport) + self.assertGreater(report.readiness_secs, 0) + + async def test_transport_readiness_only_first(self): + """Test that only the first ClientConnectedFrame triggers the event.""" + observer = StartupTimingObserver() + processor = FastProcessor() + + readiness_reports = [] + + @observer.event_handler("on_transport_readiness_measured") + async def on_readiness(obs, report): + readiness_reports.append(report) + + frames_to_send = [ + ClientConnectedFrame(), + ClientConnectedFrame(), + TextFrame(text="hello"), + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[ClientConnectedFrame, ClientConnectedFrame, TextFrame], + observers=[observer], + ) + + self.assertEqual(len(readiness_reports), 1) + + async def test_transport_readiness_without_start_frame(self): + """Test that ClientConnectedFrame before StartFrame does not crash.""" + observer = StartupTimingObserver() + + # Directly call on_push_frame with a ClientConnectedFrame before any + # StartFrame has been seen. This should be a no-op (no crash). + from pipecat.observers.base_observer import FramePushed + + processor = FastProcessor() + destination = FastProcessor() + data = FramePushed( + source=processor, + destination=destination, + frame=ClientConnectedFrame(), + direction=FrameDirection.DOWNSTREAM, + timestamp=1000, + ) + await observer.on_push_frame(data) + + # No event should have been emitted. + self.assertFalse(observer._transport_readiness_measured) + if __name__ == "__main__": unittest.main() From de87894778e541b0f174b0be639e50e0825c2887 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sun, 1 Mar 2026 08:47:40 -0500 Subject: [PATCH 04/11] Update changelog for #3881 --- changelog/3881.added.2.md | 1 + changelog/3881.added.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog/3881.added.2.md diff --git a/changelog/3881.added.2.md b/changelog/3881.added.2.md new file mode 100644 index 000000000..a5bda94c1 --- /dev/null +++ b/changelog/3881.added.2.md @@ -0,0 +1 @@ +- Added `ClientConnectedFrame`, a new `SystemFrame` pushed by all transports (Daily, LiveKit, FastAPI WebSocket, WebSocket Server, SmallWebRTC, HeyGen, Tavus) when a client connects. Enables observers to track transport readiness timing. diff --git a/changelog/3881.added.md b/changelog/3881.added.md index 694e052ce..cbf6d0293 100644 --- a/changelog/3881.added.md +++ b/changelog/3881.added.md @@ -1 +1 @@ -- Added `StartupTimingObserver` for measuring how long each processor's `start()` method takes during pipeline startup. Useful for diagnosing cold start slowness and identifying initialization bottlenecks. +- Added `StartupTimingObserver` for measuring how long each processor's `start()` method takes during pipeline startup. Also measures transport readiness — the time from `StartFrame` to first client connection — via the `on_transport_readiness_measured` event. Useful for diagnosing cold start slowness and identifying initialization bottlenecks. From 68e8732e72ba5f20ec3b03129f97d4fa2271b190 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 10:41:05 -0500 Subject: [PATCH 05/11] Add BotConnectedFrame and on_transport_timing_report event Add BotConnectedFrame (SystemFrame) pushed by SFU transports (Daily, LiveKit, HeyGen, Tavus) when the bot joins the room. Replace the on_transport_readiness_measured event with on_transport_timing_report which includes both bot_connected_secs and client_connected_secs. --- changelog/3881.added.3.md | 1 + .../foundational/29-turn-tracking-observer.py | 8 +- src/pipecat/frames/frames.py | 12 ++ .../observers/startup_timing_observer.py | 93 +++++++++----- src/pipecat/services/heygen/client.py | 7 +- src/pipecat/services/heygen/video.py | 5 + src/pipecat/services/tavus/video.py | 5 + src/pipecat/transports/daily/transport.py | 3 + src/pipecat/transports/heygen/transport.py | 9 ++ src/pipecat/transports/livekit/transport.py | 3 + src/pipecat/transports/tavus/transport.py | 12 ++ tests/test_startup_timing_observer.py | 114 +++++++++++++++--- 12 files changed, 215 insertions(+), 57 deletions(-) create mode 100644 changelog/3881.added.3.md diff --git a/changelog/3881.added.3.md b/changelog/3881.added.3.md new file mode 100644 index 000000000..cad26e876 --- /dev/null +++ b/changelog/3881.added.3.md @@ -0,0 +1 @@ +Added `BotConnectedFrame` for SFU transports and `on_transport_timing_report` event to `StartupTimingObserver` with bot and client connection timing. diff --git a/examples/foundational/29-turn-tracking-observer.py b/examples/foundational/29-turn-tracking-observer.py index ad0b448e9..4af28f1ed 100644 --- a/examples/foundational/29-turn-tracking-observer.py +++ b/examples/foundational/29-turn-tracking-observer.py @@ -111,9 +111,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): for timing in report.processor_timings: logger.info(f" {timing.processor_name}: {timing.duration_secs:.3f}s") - @startup_observer.event_handler("on_transport_readiness_measured") - async def on_transport_readiness_measured(observer, report): - logger.info(f"Transport readiness: {report.readiness_secs:.3f}s") + @startup_observer.event_handler("on_transport_timing_report") + async def on_transport_timing_report(observer, report): + if report.bot_connected_secs is not None: + logger.info(f"Bot connected: {report.bot_connected_secs:.3f}s") + logger.info(f"Client connected: {report.client_connected_secs:.3f}s") turn_observer = task.turn_tracking_observer if turn_observer: diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index b5e368c53..86778e564 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1910,6 +1910,18 @@ class StopFrame(ControlFrame, UninterruptibleFrame): pass +@dataclass +class BotConnectedFrame(SystemFrame): + """Frame indicating the bot has connected to the transport service. + + Pushed downstream by SFU transports (Daily, LiveKit, HeyGen, Tavus) + when the bot successfully joins the room. Non-SFU transports do not + emit this frame. + """ + + pass + + @dataclass class ClientConnectedFrame(SystemFrame): """Frame indicating that a client has connected to the transport. diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py index d6b1c8fa9..555a10cb0 100644 --- a/src/pipecat/observers/startup_timing_observer.py +++ b/src/pipecat/observers/startup_timing_observer.py @@ -12,9 +12,9 @@ when a ``StartFrame`` arrives at a processor (``on_process_frame``) versus when it leaves (``on_push_frame``), giving the exact ``start()`` duration for each processor in the pipeline. -It also measures transport readiness — the time from ``StartFrame`` to the -first ``ClientConnectedFrame`` — via a separate ``on_transport_readiness_measured`` -event. +It also measures transport timing — the time from ``StartFrame`` to the +first ``BotConnectedFrame`` (SFU transports only) and ``ClientConnectedFrame`` +— via a separate ``on_transport_timing_report`` event. Example:: @@ -25,9 +25,11 @@ Example:: for t in report.processor_timings: print(f"{t.processor_name}: {t.duration_secs:.3f}s") - @observer.event_handler("on_transport_readiness_measured") - async def on_readiness(observer, report): - print(f"Transport ready in {report.readiness_secs:.3f}s") + @observer.event_handler("on_transport_timing_report") + async def on_transport(observer, report): + if report.bot_connected_secs is not None: + print(f"Bot connected in {report.bot_connected_secs:.3f}s") + print(f"Client connected in {report.client_connected_secs:.3f}s") task = PipelineTask(pipeline, observers=[observer]) """ @@ -35,9 +37,7 @@ Example:: from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple, Type -from loguru import logger - -from pipecat.frames.frames import ClientConnectedFrame, StartFrame +from pipecat.frames.frames import BotConnectedFrame, ClientConnectedFrame, StartFrame from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.pipeline import PipelineSink, PipelineSource @@ -74,14 +74,17 @@ class StartupTimingReport: @dataclass -class TransportReadinessReport: - """Time from pipeline start to first client connection. +class TransportTimingReport: + """Time from pipeline start to transport connection milestones. Parameters: - readiness_secs: Seconds from StartFrame to first ClientConnectedFrame. + bot_connected_secs: Seconds from StartFrame to first BotConnectedFrame + (only set for SFU transports). + client_connected_secs: Seconds from StartFrame to first ClientConnectedFrame. """ - readiness_secs: float + bot_connected_secs: Optional[float] = None + client_connected_secs: Optional[float] = None class StartupTimingObserver(BaseObserver): @@ -92,9 +95,13 @@ class StartupTimingObserver(BaseObserver): pushed downstream. This captures WebSocket connections, API authentication, model loading, and other initialization work. - Also measures transport readiness — the time from ``StartFrame`` to the - first ``ClientConnectedFrame`` — indicating how long it takes for a client - to connect after the pipeline starts. + Also measures transport timing, the time from ``StartFrame`` to connection + milestones: + + - ``bot_connected_secs``: When the bot joins the transport room + (SFU transports only, triggered by ``BotConnectedFrame``). + - ``client_connected_secs``: When a remote participant connects + (triggered by ``ClientConnectedFrame``). By default, internal pipeline processors (``PipelineSource``, ``PipelineSink``, ``Pipeline``) are excluded from the report. Pass ``processor_types`` to @@ -104,8 +111,9 @@ class StartupTimingObserver(BaseObserver): - on_startup_timing_report: Called once after startup completes with the full timing report. - - on_transport_readiness_measured: Called once when the first client connects with the - transport readiness timing. + - on_transport_timing_report: Called once when the first client connects with a + TransportTimingReport containing client_connected_secs and bot_connected_secs + (if available). Example:: @@ -118,9 +126,11 @@ class StartupTimingObserver(BaseObserver): for t in report.processor_timings: logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s") - @observer.event_handler("on_transport_readiness_measured") - async def on_readiness(observer, report): - logger.info(f"Transport ready in {report.readiness_secs:.3f}s") + @observer.event_handler("on_transport_timing_report") + async def on_transport(observer, report): + if report.bot_connected_secs is not None: + logger.info(f"Bot connected in {report.bot_connected_secs:.3f}s") + logger.info(f"Client connected in {report.client_connected_secs:.3f}s") task = PipelineTask(pipeline, observers=[observer]) @@ -157,14 +167,17 @@ class StartupTimingObserver(BaseObserver): # Whether we've already emitted the startup timing report. self._startup_timing_reported = False - # Whether we've already measured transport readiness. - self._transport_readiness_measured = False + # Whether we've already measured transport timing. + self._transport_timing_reported = False # Timestamp (ns) when we first see a StartFrame arrive at a processor. self._start_frame_arrival_ns: Optional[int] = None + # Bot connected timing (stored for inclusion in the transport report). + self._bot_connected_secs: Optional[float] = None + self._register_event_handler("on_startup_timing_report") - self._register_event_handler("on_transport_readiness_measured") + self._register_event_handler("on_transport_timing_report") def _should_track(self, processor: FrameProcessor) -> bool: """Check if a processor should be tracked for timing. @@ -216,11 +229,16 @@ class StartupTimingObserver(BaseObserver): async def on_push_frame(self, data: FramePushed): """Record when a StartFrame leaves a processor and compute the delta. - Also handles ``ClientConnectedFrame`` to measure transport readiness. + Also handles ``BotConnectedFrame`` and ``ClientConnectedFrame`` to + measure transport timing. Args: data: The frame push event data. """ + if isinstance(data.frame, BotConnectedFrame): + self._handle_bot_connected(data) + return + if isinstance(data.frame, ClientConnectedFrame): await self._handle_client_connected(data) return @@ -249,16 +267,27 @@ class StartupTimingObserver(BaseObserver): ) ) - async def _handle_client_connected(self, data: FramePushed): - """Measure transport readiness on first client connection.""" - if self._transport_readiness_measured or self._start_frame_arrival_ns is None: + def _handle_bot_connected(self, data: FramePushed): + """Record bot connected timing on first BotConnectedFrame.""" + if self._bot_connected_secs is not None or self._start_frame_arrival_ns is None: return - self._transport_readiness_measured = True delta_ns = data.timestamp - self._start_frame_arrival_ns - readiness_secs = delta_ns / 1e9 - report = TransportReadinessReport(readiness_secs=readiness_secs) - await self._call_event_handler("on_transport_readiness_measured", report) + self._bot_connected_secs = delta_ns / 1e9 + + async def _handle_client_connected(self, data: FramePushed): + """Emit transport timing report on first ClientConnectedFrame.""" + if self._transport_timing_reported or self._start_frame_arrival_ns is None: + return + + self._transport_timing_reported = True + delta_ns = data.timestamp - self._start_frame_arrival_ns + client_connected_secs = delta_ns / 1e9 + report = TransportTimingReport( + bot_connected_secs=self._bot_connected_secs, + client_connected_secs=client_connected_secs, + ) + await self._call_event_handler("on_transport_timing_report", report) async def _emit_report(self): """Build and emit the startup timing report.""" diff --git a/src/pipecat/services/heygen/client.py b/src/pipecat/services/heygen/client.py index 4018d3858..6d45d6114 100644 --- a/src/pipecat/services/heygen/client.py +++ b/src/pipecat/services/heygen/client.py @@ -62,10 +62,12 @@ class HeyGenCallbacks(BaseModel): """Callback handlers for HeyGen events. Parameters: - on_participant_connected: Called when a participant connects - on_participant_disconnected: Called when a participant disconnects + on_connected: Called when the bot connects to the LiveKit room. + on_participant_connected: Called when a participant connects. + on_participant_disconnected: Called when a participant disconnects. """ + on_connected: Callable[[], Awaitable[None]] on_participant_connected: Callable[[str], Awaitable[None]] on_participant_disconnected: Callable[[str], Awaitable[None]] @@ -251,6 +253,7 @@ class HeyGenClient: logger.debug(f"HeyGenClient send_interval: {self._send_interval}") await self._ws_connect() await self._livekit_connect() + self._call_event_callback(self._callbacks.on_connected) async def stop(self) -> None: """Stop the client and terminate all connections. diff --git a/src/pipecat/services/heygen/video.py b/src/pipecat/services/heygen/video.py index b97f4a5ed..7f3624f35 100644 --- a/src/pipecat/services/heygen/video.py +++ b/src/pipecat/services/heygen/video.py @@ -128,6 +128,7 @@ class HeyGenVideoService(AIService): session_request=self._session_request, service_type=self._service_type, callbacks=HeyGenCallbacks( + on_connected=self._on_connected, on_participant_connected=self._on_participant_connected, on_participant_disconnected=self._on_participant_disconnected, ), @@ -144,6 +145,10 @@ class HeyGenVideoService(AIService): await self._client.cleanup() self._client = None + async def _on_connected(self): + """Handle bot connected to LiveKit room.""" + logger.info("HeyGen bot connected to LiveKit room") + async def _on_participant_connected(self, participant_id: str): """Handle participant connected events.""" logger.info(f"Participant connected {participant_id}") diff --git a/src/pipecat/services/tavus/video.py b/src/pipecat/services/tavus/video.py index d9f259797..8c63ff354 100644 --- a/src/pipecat/services/tavus/video.py +++ b/src/pipecat/services/tavus/video.py @@ -94,6 +94,7 @@ class TavusVideoService(AIService): """ await super().setup(setup) callbacks = TavusCallbacks( + on_joined=self._on_joined, on_participant_joined=self._on_participant_joined, on_participant_left=self._on_participant_left, ) @@ -119,6 +120,10 @@ class TavusVideoService(AIService): await self._client.cleanup() self._client = None + async def _on_joined(self, data): + """Handle bot joined the Daily room.""" + logger.info("Tavus bot joined Daily room") + async def _on_participant_left(self, participant, reason): """Handle participant leaving the session.""" participant_id = participant["id"] diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index cb24b23fa..97aebe915 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -24,6 +24,7 @@ from pydantic import BaseModel from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams from pipecat.frames.frames import ( + BotConnectedFrame, CancelFrame, ClientConnectedFrame, DataFrame, @@ -2579,6 +2580,8 @@ class DailyTransport(BaseTransport): if error: await self._on_error(f"Unable to start transcription: {error}") await self._call_event_handler("on_joined", data) + if self._input: + await self._input.push_frame(BotConnectedFrame()) async def _on_left(self): """Handle room left events.""" diff --git a/src/pipecat/transports/heygen/transport.py b/src/pipecat/transports/heygen/transport.py index 77ccda09f..d79d0080e 100644 --- a/src/pipecat/transports/heygen/transport.py +++ b/src/pipecat/transports/heygen/transport.py @@ -23,6 +23,7 @@ from loguru import logger from pipecat.frames.frames import ( AudioRawFrame, + BotConnectedFrame, BotStartedSpeakingFrame, BotStoppedSpeakingFrame, CancelFrame, @@ -340,6 +341,7 @@ class HeyGenTransport(BaseTransport): session_request=session_request, service_type=service_type, callbacks=HeyGenCallbacks( + on_connected=self._on_connected, on_participant_connected=self._on_participant_connected, on_participant_disconnected=self._on_participant_disconnected, ), @@ -350,9 +352,16 @@ class HeyGenTransport(BaseTransport): # Register supported handlers. The user will only be able to register # these handlers. + self._register_event_handler("on_connected") self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") + async def _on_connected(self): + """Handle bot connected to LiveKit room.""" + await self._call_event_handler("on_connected") + if self._input: + await self._input.push_frame(BotConnectedFrame()) + async def _on_participant_disconnected(self, participant_id: str): logger.debug(f"HeyGen participant {participant_id} disconnected") if participant_id != "heygen": diff --git a/src/pipecat/transports/livekit/transport.py b/src/pipecat/transports/livekit/transport.py index e4435016c..7e9c1de35 100644 --- a/src/pipecat/transports/livekit/transport.py +++ b/src/pipecat/transports/livekit/transport.py @@ -23,6 +23,7 @@ from pipecat.audio.utils import create_stream_resampler from pipecat.audio.vad.vad_analyzer import VADAnalyzer from pipecat.frames.frames import ( AudioRawFrame, + BotConnectedFrame, CancelFrame, ClientConnectedFrame, EndFrame, @@ -1132,6 +1133,8 @@ class LiveKitTransport(BaseTransport): async def _on_connected(self): """Handle room connected events.""" await self._call_event_handler("on_connected") + if self._input: + await self._input.push_frame(BotConnectedFrame()) async def _on_disconnected(self): """Handle room disconnected events.""" diff --git a/src/pipecat/transports/tavus/transport.py b/src/pipecat/transports/tavus/transport.py index 114f33ca0..6db44d431 100644 --- a/src/pipecat/transports/tavus/transport.py +++ b/src/pipecat/transports/tavus/transport.py @@ -21,6 +21,7 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import ( + BotConnectedFrame, CancelFrame, ClientConnectedFrame, EndFrame, @@ -133,10 +134,12 @@ class TavusCallbacks(BaseModel): """Callback handlers for Tavus events. Parameters: + on_joined: Called when the bot joins the Daily room. on_participant_joined: Called when a participant joins the conversation. on_participant_left: Called when a participant leaves the conversation. """ + on_joined: Callable[[Mapping[str, Any]], Awaitable[None]] on_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]] on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]] @@ -271,6 +274,7 @@ class TavusTransportClient: async def _on_joined(self, data): """Handle joined event.""" logger.debug("TavusTransportClient joined!") + await self._callbacks.on_joined(data) async def _on_left(self): """Handle left event.""" @@ -703,6 +707,7 @@ class TavusTransport(BaseTransport): self._params = params callbacks = TavusCallbacks( + on_joined=self._on_joined, on_participant_joined=self._on_participant_joined, on_participant_left=self._on_participant_left, ) @@ -721,9 +726,16 @@ class TavusTransport(BaseTransport): # Register supported handlers. The user will only be able to register # these handlers. + self._register_event_handler("on_joined") self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") + async def _on_joined(self, data): + """Handle bot joined room event.""" + await self._call_event_handler("on_joined", data) + if self._input: + await self._input.push_frame(BotConnectedFrame()) + async def _on_participant_left(self, participant, reason): """Handle participant left events.""" persona_name = await self._client.get_persona_name() diff --git a/tests/test_startup_timing_observer.py b/tests/test_startup_timing_observer.py index efabf5bc7..3c89b9ca3 100644 --- a/tests/test_startup_timing_observer.py +++ b/tests/test_startup_timing_observer.py @@ -1,11 +1,17 @@ import asyncio import unittest -from pipecat.frames.frames import ClientConnectedFrame, Frame, StartFrame, TextFrame +from pipecat.frames.frames import ( + BotConnectedFrame, + ClientConnectedFrame, + Frame, + StartFrame, + TextFrame, +) from pipecat.observers.startup_timing_observer import ( StartupTimingObserver, StartupTimingReport, - TransportReadinessReport, + TransportTimingReport, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.tests.utils import run_test @@ -182,16 +188,16 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): f"Internal processor {t.processor_name} should be excluded by default", ) - async def test_transport_readiness_measured(self): - """Test that ClientConnectedFrame after startup emits on_transport_readiness_measured.""" + async def test_transport_timing_client_only(self): + """Test that ClientConnectedFrame emits on_transport_timing_report.""" observer = StartupTimingObserver() processor = FastProcessor() - readiness_reports = [] + transport_reports = [] - @observer.event_handler("on_transport_readiness_measured") - async def on_readiness(obs, report): - readiness_reports.append(report) + @observer.event_handler("on_transport_timing_report") + async def on_transport(obs, report): + transport_reports.append(report) frames_to_send = [ClientConnectedFrame(), TextFrame(text="hello")] @@ -202,21 +208,22 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): observers=[observer], ) - self.assertEqual(len(readiness_reports), 1) - report = readiness_reports[0] - self.assertIsInstance(report, TransportReadinessReport) - self.assertGreater(report.readiness_secs, 0) + self.assertEqual(len(transport_reports), 1) + report = transport_reports[0] + self.assertIsInstance(report, TransportTimingReport) + self.assertGreater(report.client_connected_secs, 0) + self.assertIsNone(report.bot_connected_secs) - async def test_transport_readiness_only_first(self): + async def test_transport_timing_only_first_client(self): """Test that only the first ClientConnectedFrame triggers the event.""" observer = StartupTimingObserver() processor = FastProcessor() - readiness_reports = [] + transport_reports = [] - @observer.event_handler("on_transport_readiness_measured") - async def on_readiness(obs, report): - readiness_reports.append(report) + @observer.event_handler("on_transport_timing_report") + async def on_transport(obs, report): + transport_reports.append(report) frames_to_send = [ ClientConnectedFrame(), @@ -231,9 +238,9 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): observers=[observer], ) - self.assertEqual(len(readiness_reports), 1) + self.assertEqual(len(transport_reports), 1) - async def test_transport_readiness_without_start_frame(self): + async def test_transport_timing_without_start_frame(self): """Test that ClientConnectedFrame before StartFrame does not crash.""" observer = StartupTimingObserver() @@ -253,7 +260,74 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): await observer.on_push_frame(data) # No event should have been emitted. - self.assertFalse(observer._transport_readiness_measured) + self.assertFalse(observer._transport_timing_reported) + + async def test_bot_and_client_connected(self): + """Test that BotConnectedFrame timing is included in the transport report.""" + observer = StartupTimingObserver() + processor = FastProcessor() + + transport_reports = [] + + @observer.event_handler("on_transport_timing_report") + async def on_transport(obs, report): + transport_reports.append(report) + + frames_to_send = [ + BotConnectedFrame(), + ClientConnectedFrame(), + TextFrame(text="hello"), + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[BotConnectedFrame, ClientConnectedFrame, TextFrame], + observers=[observer], + ) + + self.assertEqual(len(transport_reports), 1) + report = transport_reports[0] + self.assertGreater(report.client_connected_secs, 0) + self.assertIsNotNone(report.bot_connected_secs) + self.assertGreater(report.bot_connected_secs, 0) + + # Client connected should be >= bot connected. + self.assertGreaterEqual(report.client_connected_secs, report.bot_connected_secs) + + async def test_bot_connected_only_first(self): + """Test that only the first BotConnectedFrame is recorded.""" + observer = StartupTimingObserver() + processor = FastProcessor() + + transport_reports = [] + + @observer.event_handler("on_transport_timing_report") + async def on_transport(obs, report): + transport_reports.append(report) + + frames_to_send = [ + BotConnectedFrame(), + BotConnectedFrame(), + ClientConnectedFrame(), + TextFrame(text="hello"), + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=[ + BotConnectedFrame, + BotConnectedFrame, + ClientConnectedFrame, + TextFrame, + ], + observers=[observer], + ) + + # Only one transport report, with bot timing from first frame. + self.assertEqual(len(transport_reports), 1) + self.assertIsNotNone(transport_reports[0].bot_connected_secs) if __name__ == "__main__": From 75669b12a2a4a07c396932a0e67afa951733db2c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 11:01:26 -0500 Subject: [PATCH 06/11] Convert observer data models to Pydantic BaseModel with timestamps Switch ProcessorStartupTiming, StartupTimingReport, and TransportTimingReport from dataclasses to Pydantic BaseModel. Add start_time (Unix timestamp) fields and wall clock conversion for monotonic observer timestamps. --- .../observers/startup_timing_observer.py | 37 +++++++++++++++---- tests/test_startup_timing_observer.py | 3 ++ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py index 555a10cb0..6dd574cdc 100644 --- a/src/pipecat/observers/startup_timing_observer.py +++ b/src/pipecat/observers/startup_timing_observer.py @@ -34,9 +34,11 @@ Example:: task = PipelineTask(pipeline, observers=[observer]) """ -from dataclasses import dataclass, field +import time from typing import Dict, List, Optional, Tuple, Type +from pydantic import BaseModel, Field + from pipecat.frames.frames import BotConnectedFrame, ClientConnectedFrame, StartFrame from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed from pipecat.pipeline.base_pipeline import BasePipeline @@ -47,42 +49,45 @@ from pipecat.processors.frame_processor import FrameProcessor _INTERNAL_TYPES = (PipelineSink, PipelineSource, BasePipeline) -@dataclass -class ProcessorStartupTiming: +class ProcessorStartupTiming(BaseModel): """Startup timing for a single processor. Parameters: processor_name: The name of the processor. + start_time: Unix timestamp when the processor's start() began. duration_secs: How long the processor's start() took, in seconds. """ processor_name: str + start_time: float duration_secs: float -@dataclass -class StartupTimingReport: +class StartupTimingReport(BaseModel): """Report of startup timings for all measured processors. Parameters: + start_time: Unix timestamp when the first processor began starting. total_duration_secs: Total wall-clock time from first to last processor start. processor_timings: Per-processor timing data, in pipeline order. """ + start_time: float total_duration_secs: float - processor_timings: List[ProcessorStartupTiming] = field(default_factory=list) + processor_timings: List[ProcessorStartupTiming] = Field(default_factory=list) -@dataclass -class TransportTimingReport: +class TransportTimingReport(BaseModel): """Time from pipeline start to transport connection milestones. Parameters: + start_time: Unix timestamp of the StartFrame (pipeline start). bot_connected_secs: Seconds from StartFrame to first BotConnectedFrame (only set for SFU transports). client_connected_secs: Seconds from StartFrame to first ClientConnectedFrame. """ + start_time: float bot_connected_secs: Optional[float] = None client_connected_secs: Optional[float] = None @@ -176,9 +181,19 @@ class StartupTimingObserver(BaseObserver): # Bot connected timing (stored for inclusion in the transport report). self._bot_connected_secs: Optional[float] = None + # Wall clock reference for converting monotonic ns to Unix timestamps. + self._wall_clock_ref: Optional[float] = None + self._mono_clock_ref_ns: Optional[int] = None + self._register_event_handler("on_startup_timing_report") self._register_event_handler("on_transport_timing_report") + def _mono_to_wall(self, mono_ns: int) -> float: + """Convert a monotonic nanosecond timestamp to a Unix wall clock time.""" + if self._wall_clock_ref is None or self._mono_clock_ref_ns is None: + return 0.0 + return self._wall_clock_ref + (mono_ns - self._mono_clock_ref_ns) / 1e9 + def _should_track(self, processor: FrameProcessor) -> bool: """Check if a processor should be tracked for timing. @@ -212,6 +227,8 @@ class StartupTimingObserver(BaseObserver): if self._start_frame_id is None: self._start_frame_id = data.frame.id self._start_frame_arrival_ns = data.timestamp + self._wall_clock_ref = time.time() + self._mono_clock_ref_ns = data.timestamp elif data.frame.id != self._start_frame_id: return @@ -263,6 +280,7 @@ class StartupTimingObserver(BaseObserver): self._timings.append( ProcessorStartupTiming( processor_name=processor.name, + start_time=self._mono_to_wall(arrival_ts), duration_secs=duration_secs, ) ) @@ -284,6 +302,7 @@ class StartupTimingObserver(BaseObserver): delta_ns = data.timestamp - self._start_frame_arrival_ns client_connected_secs = delta_ns / 1e9 report = TransportTimingReport( + start_time=self._mono_to_wall(self._start_frame_arrival_ns), bot_connected_secs=self._bot_connected_secs, client_connected_secs=client_connected_secs, ) @@ -296,8 +315,10 @@ class StartupTimingObserver(BaseObserver): self._startup_timing_reported = True total = sum(t.duration_secs for t in self._timings) + start_time = self._timings[0].start_time if self._timings else 0.0 report = StartupTimingReport( + start_time=start_time, total_duration_secs=total, processor_timings=self._timings, ) diff --git a/tests/test_startup_timing_observer.py b/tests/test_startup_timing_observer.py index 3c89b9ca3..2bc246754 100644 --- a/tests/test_startup_timing_observer.py +++ b/tests/test_startup_timing_observer.py @@ -151,9 +151,11 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): report = reports[0] self.assertIsInstance(report, StartupTimingReport) self.assertIsInstance(report.total_duration_secs, float) + self.assertGreater(report.start_time, 0) for timing in report.processor_timings: self.assertIsInstance(timing.processor_name, str) self.assertIsInstance(timing.duration_secs, float) + self.assertGreater(timing.start_time, 0) async def test_excludes_internal_processors(self): """Test that internal pipeline processors are excluded by default.""" @@ -211,6 +213,7 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): self.assertEqual(len(transport_reports), 1) report = transport_reports[0] self.assertIsInstance(report, TransportTimingReport) + self.assertGreater(report.start_time, 0) self.assertGreater(report.client_connected_secs, 0) self.assertIsNone(report.bot_connected_secs) From bbbfdfd32143940726662ca729d5d2a79637286c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 14:07:34 -0500 Subject: [PATCH 07/11] Replace per-processor start_time with start_offset_secs Use start_offset_secs (offset from StartFrame) on ProcessorStartupTiming instead of a wall-clock timestamp. Reports keep a single start_time anchor for dashboard visualization. Remove _mono_to_wall conversion. --- .../observers/startup_timing_observer.py | 28 ++++++++----------- tests/test_startup_timing_observer.py | 2 +- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py index 6dd574cdc..8233ed2b8 100644 --- a/src/pipecat/observers/startup_timing_observer.py +++ b/src/pipecat/observers/startup_timing_observer.py @@ -54,12 +54,13 @@ class ProcessorStartupTiming(BaseModel): Parameters: processor_name: The name of the processor. - start_time: Unix timestamp when the processor's start() began. + start_offset_secs: Offset in seconds from the StartFrame to when this + processor's start() began. duration_secs: How long the processor's start() took, in seconds. """ processor_name: str - start_time: float + start_offset_secs: float duration_secs: float @@ -181,19 +182,12 @@ class StartupTimingObserver(BaseObserver): # Bot connected timing (stored for inclusion in the transport report). self._bot_connected_secs: Optional[float] = None - # Wall clock reference for converting monotonic ns to Unix timestamps. - self._wall_clock_ref: Optional[float] = None - self._mono_clock_ref_ns: Optional[int] = None + # Wall clock time when the StartFrame was first seen. + self._start_wall_clock: Optional[float] = None self._register_event_handler("on_startup_timing_report") self._register_event_handler("on_transport_timing_report") - def _mono_to_wall(self, mono_ns: int) -> float: - """Convert a monotonic nanosecond timestamp to a Unix wall clock time.""" - if self._wall_clock_ref is None or self._mono_clock_ref_ns is None: - return 0.0 - return self._wall_clock_ref + (mono_ns - self._mono_clock_ref_ns) / 1e9 - def _should_track(self, processor: FrameProcessor) -> bool: """Check if a processor should be tracked for timing. @@ -227,8 +221,7 @@ class StartupTimingObserver(BaseObserver): if self._start_frame_id is None: self._start_frame_id = data.frame.id self._start_frame_arrival_ns = data.timestamp - self._wall_clock_ref = time.time() - self._mono_clock_ref_ns = data.timestamp + self._start_wall_clock = time.time() elif data.frame.id != self._start_frame_id: return @@ -277,10 +270,12 @@ class StartupTimingObserver(BaseObserver): duration_ns = data.timestamp - arrival_ts duration_secs = duration_ns / 1e9 + start_offset_secs = (arrival_ts - self._start_frame_arrival_ns) / 1e9 + self._timings.append( ProcessorStartupTiming( processor_name=processor.name, - start_time=self._mono_to_wall(arrival_ts), + start_offset_secs=start_offset_secs, duration_secs=duration_secs, ) ) @@ -302,7 +297,7 @@ class StartupTimingObserver(BaseObserver): delta_ns = data.timestamp - self._start_frame_arrival_ns client_connected_secs = delta_ns / 1e9 report = TransportTimingReport( - start_time=self._mono_to_wall(self._start_frame_arrival_ns), + start_time=self._start_wall_clock or 0.0, bot_connected_secs=self._bot_connected_secs, client_connected_secs=client_connected_secs, ) @@ -315,10 +310,9 @@ class StartupTimingObserver(BaseObserver): self._startup_timing_reported = True total = sum(t.duration_secs for t in self._timings) - start_time = self._timings[0].start_time if self._timings else 0.0 report = StartupTimingReport( - start_time=start_time, + start_time=self._start_wall_clock or 0.0, total_duration_secs=total, processor_timings=self._timings, ) diff --git a/tests/test_startup_timing_observer.py b/tests/test_startup_timing_observer.py index 2bc246754..6355c6081 100644 --- a/tests/test_startup_timing_observer.py +++ b/tests/test_startup_timing_observer.py @@ -155,7 +155,7 @@ class TestStartupTimingObserver(unittest.IsolatedAsyncioTestCase): for timing in report.processor_timings: self.assertIsInstance(timing.processor_name, str) self.assertIsInstance(timing.duration_secs, float) - self.assertGreater(timing.start_time, 0) + self.assertGreaterEqual(timing.start_offset_secs, 0) async def test_excludes_internal_processors(self): """Test that internal pipeline processors are excluded by default.""" From 0cfd953a900f388a09ca40cedf3f49775f6d1cae Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 14:15:41 -0500 Subject: [PATCH 08/11] Use _ArrivalInfo dataclass instead of tuple for arrival tracking --- .../observers/startup_timing_observer.py | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py index 8233ed2b8..d4a010d33 100644 --- a/src/pipecat/observers/startup_timing_observer.py +++ b/src/pipecat/observers/startup_timing_observer.py @@ -35,6 +35,7 @@ Example:: """ import time +from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Type from pydantic import BaseModel, Field @@ -49,6 +50,14 @@ from pipecat.processors.frame_processor import FrameProcessor _INTERNAL_TYPES = (PipelineSink, PipelineSource, BasePipeline) +@dataclass +class _ArrivalInfo: + """Internal record of when a StartFrame arrived at a processor.""" + + processor: FrameProcessor + arrival_ts_ns: int + + class ProcessorStartupTiming(BaseModel): """Startup timing for a single processor. @@ -161,8 +170,8 @@ class StartupTimingObserver(BaseObserver): super().__init__(**kwargs) self._processor_types = processor_types - # Map processor ID -> (processor, arrival_timestamp_ns) - self._arrivals: Dict[int, Tuple[FrameProcessor, int]] = {} + # Map processor ID -> arrival info. + self._arrivals: Dict[int, _ArrivalInfo] = {} # Collected timings in pipeline order. self._timings: List[ProcessorStartupTiming] = [] @@ -234,7 +243,9 @@ class StartupTimingObserver(BaseObserver): return if self._should_track(data.processor): - self._arrivals[data.processor.id] = (data.processor, data.timestamp) + self._arrivals[data.processor.id] = _ArrivalInfo( + processor=data.processor, arrival_ts_ns=data.timestamp + ) async def on_push_frame(self, data: FramePushed): """Record when a StartFrame leaves a processor and compute the delta. @@ -266,15 +277,13 @@ class StartupTimingObserver(BaseObserver): if arrival is None: return - processor, arrival_ts = arrival - duration_ns = data.timestamp - arrival_ts + duration_ns = data.timestamp - arrival.arrival_ts_ns duration_secs = duration_ns / 1e9 - - start_offset_secs = (arrival_ts - self._start_frame_arrival_ns) / 1e9 + start_offset_secs = (arrival.arrival_ts_ns - self._start_frame_arrival_ns) / 1e9 self._timings.append( ProcessorStartupTiming( - processor_name=processor.name, + processor_name=arrival.processor.name, start_offset_secs=start_offset_secs, duration_secs=duration_secs, ) From 389d0c3fb6adbbf7d926a3e3a6fea49a8aaae3d0 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 14:33:55 -0500 Subject: [PATCH 09/11] Use on_pipeline_started from PipelineTask for startup report Replace the PipelineSink detection in StartupTimingObserver with an on_pipeline_started() callback from PipelineTask via TaskObserver. This fixes premature report emission when using ParallelPipeline, which has its own inner PipelineSinks per branch. --- src/pipecat/observers/base_observer.py | 8 +++++ .../observers/startup_timing_observer.py | 31 +++++++++---------- src/pipecat/pipeline/task.py | 1 + src/pipecat/pipeline/task_observer.py | 14 ++++++++- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/src/pipecat/observers/base_observer.py b/src/pipecat/observers/base_observer.py index 78e36fec8..70c79224a 100644 --- a/src/pipecat/observers/base_observer.py +++ b/src/pipecat/observers/base_observer.py @@ -100,3 +100,11 @@ class BaseObserver(BaseObject): data: The event data containing details about the frame transfer. """ pass + + async def on_pipeline_started(self): + """Called when the pipeline has fully started. + + Fired after the ``StartFrame`` has been processed by all processors + in the pipeline, including nested ``ParallelPipeline`` branches. + """ + pass diff --git a/src/pipecat/observers/startup_timing_observer.py b/src/pipecat/observers/startup_timing_observer.py index d4a010d33..a1ea04d47 100644 --- a/src/pipecat/observers/startup_timing_observer.py +++ b/src/pipecat/observers/startup_timing_observer.py @@ -43,11 +43,11 @@ from pydantic import BaseModel, Field from pipecat.frames.frames import BotConnectedFrame, ClientConnectedFrame, StartFrame from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed from pipecat.pipeline.base_pipeline import BasePipeline -from pipecat.pipeline.pipeline import PipelineSink, PipelineSource +from pipecat.pipeline.pipeline import PipelineSource from pipecat.processors.frame_processor import FrameProcessor # Internal pipeline types excluded from tracking by default. -_INTERNAL_TYPES = (PipelineSink, PipelineSource, BasePipeline) +_INTERNAL_TYPES = (PipelineSource, BasePipeline) @dataclass @@ -118,9 +118,9 @@ class StartupTimingObserver(BaseObserver): - ``client_connected_secs``: When a remote participant connects (triggered by ``ClientConnectedFrame``). - By default, internal pipeline processors (``PipelineSource``, ``PipelineSink``, - ``Pipeline``) are excluded from the report. Pass ``processor_types`` to - measure only specific types. + By default, internal pipeline processors (``PipelineSource``, ``Pipeline``) + are excluded from the report. Pass ``processor_types`` to measure only + specific types. Event handlers available: @@ -211,12 +211,19 @@ class StartupTimingObserver(BaseObserver): # Default: exclude internal pipeline plumbing. return not isinstance(processor, _INTERNAL_TYPES) + async def on_pipeline_started(self): + """Emit the startup timing report when the pipeline has fully started. + + Called by the ``PipelineTask`` after the ``StartFrame`` has been + processed by all processors, including nested ``ParallelPipeline`` + branches. + """ + if self._timings: + await self._emit_report() + async def on_process_frame(self, data: FrameProcessed): """Record when a StartFrame arrives at a processor. - When a ``StartFrame`` reaches a ``PipelineSink``, startup is complete - (the frame has traversed the entire pipeline) and the report is emitted. - Args: data: The frame processing event data. """ @@ -234,14 +241,6 @@ class StartupTimingObserver(BaseObserver): elif data.frame.id != self._start_frame_id: return - # When the StartFrame reaches a PipelineSink, all processors have - # completed start(). PipelineSinks use direct mode so the outermost - # sink fires last within the same synchronous call chain. - if isinstance(data.processor, PipelineSink): - if self._timings: - await self._emit_report() - return - if self._should_track(data.processor): self._arrivals[data.processor.id] = _ArrivalInfo( processor=data.processor, arrival_ts_ns=data.timestamp diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index deae6290c..906d55eb6 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -915,6 +915,7 @@ class PipelineTask(BasePipelineTask): if isinstance(frame, StartFrame): await self._call_event_handler("on_pipeline_started", frame) + await self._observer.on_pipeline_started() # Start heartbeat tasks now that StartFrame has been processed # by all processors in the pipeline diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index 4d33fd60e..dc2040e07 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -39,6 +39,12 @@ class Proxy: observer: BaseObserver +class _PipelineStartedSignal: + """Internal sentinel queued to observers when the pipeline has started.""" + + pass + + class TaskObserver(BaseObserver): """Proxy observer that manages multiple observers without blocking the pipeline. @@ -129,6 +135,10 @@ class TaskObserver(BaseObserver): for proxy in self._proxies: await proxy.cleanup() + async def on_pipeline_started(self): + """Forward pipeline started signal to all managed observers.""" + await self._send_to_proxy(_PipelineStartedSignal()) + async def on_process_frame(self, data: FrameProcessed): """Queue frame data for all managed observers. @@ -186,7 +196,9 @@ class TaskObserver(BaseObserver): while True: data = await queue.get() - if isinstance(data, FramePushed): + if isinstance(data, _PipelineStartedSignal): + await observer.on_pipeline_started() + elif isinstance(data, FramePushed): if on_push_frame_deprecated: await observer.on_push_frame( data.source, data.destination, data.frame, data.direction, data.timestamp From c1743dcffd16e05785c7c68317981280b299de94 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 15:22:44 -0500 Subject: [PATCH 10/11] Rename Tavus event, on_connected --- src/pipecat/transports/tavus/transport.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/pipecat/transports/tavus/transport.py b/src/pipecat/transports/tavus/transport.py index 6db44d431..cb6844250 100644 --- a/src/pipecat/transports/tavus/transport.py +++ b/src/pipecat/transports/tavus/transport.py @@ -134,12 +134,12 @@ class TavusCallbacks(BaseModel): """Callback handlers for Tavus events. Parameters: - on_joined: Called when the bot joins the Daily room. + on_connected: Called when the bot connects to the room. on_participant_joined: Called when a participant joins the conversation. on_participant_left: Called when a participant leaves the conversation. """ - on_joined: Callable[[Mapping[str, Any]], Awaitable[None]] + on_connected: Callable[[Mapping[str, Any]], Awaitable[None]] on_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]] on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]] @@ -274,7 +274,7 @@ class TavusTransportClient: async def _on_joined(self, data): """Handle joined event.""" logger.debug("TavusTransportClient joined!") - await self._callbacks.on_joined(data) + await self._callbacks.on_connected(data) async def _on_left(self): """Handle left event.""" @@ -669,6 +669,7 @@ class TavusTransport(BaseTransport): Event handlers available: + - on_connected(transport, data): Bot connected to the room - on_client_connected(transport, participant): Participant connected to the session - on_client_disconnected(transport, participant): Participant disconnected from the session @@ -707,7 +708,7 @@ class TavusTransport(BaseTransport): self._params = params callbacks = TavusCallbacks( - on_joined=self._on_joined, + on_connected=self._on_joined, on_participant_joined=self._on_participant_joined, on_participant_left=self._on_participant_left, ) @@ -726,13 +727,13 @@ class TavusTransport(BaseTransport): # Register supported handlers. The user will only be able to register # these handlers. - self._register_event_handler("on_joined") + self._register_event_handler("on_connected") self._register_event_handler("on_client_connected") self._register_event_handler("on_client_disconnected") async def _on_joined(self, data): """Handle bot joined room event.""" - await self._call_event_handler("on_joined", data) + await self._call_event_handler("on_connected", data) if self._input: await self._input.push_frame(BotConnectedFrame()) From dbdb54ce0f306a38fb0d49f0d8146d5594f9cd39 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 2 Mar 2026 15:44:37 -0500 Subject: [PATCH 11/11] Add on_connected event handler to DailyTransport for cross-transport consistency --- src/pipecat/transports/daily/transport.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 97aebe915..dc9868426 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -2072,6 +2072,8 @@ class DailyTransport(BaseTransport): Event handlers available: - on_joined: Called when the bot joins the room. Args: (data: dict) + - on_connected: Called when the bot connects to the room (alias for + on_joined). Args: (data: dict) - on_left: Called when the bot leaves the room. - on_before_leave: [sync] Called just before the bot leaves the room. - on_error: Called when a transport error occurs. Args: (error: str) @@ -2189,6 +2191,7 @@ class DailyTransport(BaseTransport): # Register supported handlers. The user will only be able to register # these handlers. self._register_event_handler("on_active_speaker_changed") + self._register_event_handler("on_connected") self._register_event_handler("on_joined") self._register_event_handler("on_left") self._register_event_handler("on_error") @@ -2580,6 +2583,8 @@ class DailyTransport(BaseTransport): if error: await self._on_error(f"Unable to start transcription: {error}") await self._call_event_handler("on_joined", data) + # Also call on_connected for compatibility with other transports + await self._call_event_handler("on_connected", data) if self._input: await self._input.push_frame(BotConnectedFrame())