Refactor STT TTFB metrics to use base class start/stop pattern

Eliminate custom _emit_stt_ttfb_metric and manual timestamp tracking in
STTService by reusing FrameProcessor's start_ttfb_metrics/stop_ttfb_metrics
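with new start_time/end_time parameters. start_ttfb_metrics is now called when
speech end is detected (back-dated to the real speech end via start_time), and
stop_ttfb_metrics is called when a finalized transcription is pushed or when
the TTFB timeout fires. This keeps the chronological start→stop ordering and
removes the _speech_end_time and _last_transcription_time state from
STTService.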
This commit is contained in:
Aleix Conchillo Flaqué
2026-02-19 10:52:24 -08:00
parent 94e93bed83
commit 859cd7c920
3 changed files with 74 additions and 67 deletions

View File

@@ -419,27 +419,49 @@ class FrameProcessor(BaseObject):
""" """
self._metrics.set_core_metrics_data(data) self._metrics.set_core_metrics_data(data)
async def start_ttfb_metrics(self): async def start_ttfb_metrics(self, *, start_time: Optional[float] = None):
"""Start time-to-first-byte metrics collection.""" """Start time-to-first-byte metrics collection.
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
async def stop_ttfb_metrics(self): Args:
"""Stop time-to-first-byte metrics collection and push results.""" start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled: if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics() await self._metrics.start_ttfb_metrics(
start_time=start_time, report_only_initial_ttfb=self._report_only_initial_ttfb
)
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
"""Stop time-to-first-byte metrics collection and push results.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics(end_time=end_time)
if frame: if frame:
await self.push_frame(frame) await self.push_frame(frame)
async def start_processing_metrics(self): async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start processing metrics collection.""" """Start processing metrics collection.
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics()
async def stop_processing_metrics(self): Args:
"""Stop processing metrics collection and push results.""" start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled: if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics() await self._metrics.start_processing_metrics(start_time=start_time)
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop processing metrics collection and push results.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics(end_time=end_time)
if frame: if frame:
await self.push_frame(frame) await self.push_frame(frame)

View File

@@ -107,49 +107,70 @@ class FrameProcessorMetrics(BaseObject):
""" """
self._core_metrics_data = MetricsData(processor=name) self._core_metrics_data = MetricsData(processor=name)
async def start_ttfb_metrics(self, report_only_initial_ttfb): async def start_ttfb_metrics(
self, *, start_time: Optional[float] = None, report_only_initial_ttfb: bool
):
"""Start measuring time-to-first-byte (TTFB). """Start measuring time-to-first-byte (TTFB).
Args: Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
report_only_initial_ttfb: Whether to report only the first TTFB measurement. report_only_initial_ttfb: Whether to report only the first TTFB measurement.
""" """
if self._should_report_ttfb: if self._should_report_ttfb:
self._start_ttfb_time = time.time() self._start_ttfb_time = start_time or time.time()
self._last_ttfb_time = 0 self._last_ttfb_time = 0
self._should_report_ttfb = not report_only_initial_ttfb self._should_report_ttfb = not report_only_initial_ttfb
async def stop_ttfb_metrics(self): async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
"""Stop TTFB measurement and generate metrics frame. """Stop TTFB measurement and generate metrics frame.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
Returns: Returns:
MetricsFrame containing TTFB data, or None if not measuring. MetricsFrame containing TTFB data, or None if not measuring.
""" """
if self._start_ttfb_time == 0: if self._start_ttfb_time == 0:
return None return None
self._last_ttfb_time = time.time() - self._start_ttfb_time end_time = end_time or time.time()
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time}")
self._last_ttfb_time = end_time - self._start_ttfb_time
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time:.3f}s")
ttfb = TTFBMetricsData( ttfb = TTFBMetricsData(
processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name() processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name()
) )
self._start_ttfb_time = 0 self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb]) return MetricsFrame(data=[ttfb])
async def start_processing_metrics(self): async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start measuring processing time.""" """Start measuring processing time.
self._start_processing_time = time.time()
async def stop_processing_metrics(self): Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
self._start_processing_time = start_time or time.time()
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop processing time measurement and generate metrics frame. """Stop processing time measurement and generate metrics frame.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
Returns: Returns:
MetricsFrame containing processing duration data, or None if not measuring. MetricsFrame containing processing duration data, or None if not measuring.
""" """
if self._start_processing_time == 0: if self._start_processing_time == 0:
return None return None
value = time.time() - self._start_processing_time end_time = end_time or time.time()
logger.debug(f"{self._processor_name()} processing time: {value}")
value = end_time - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value:.3f}s")
processing = ProcessingMetricsData( processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name() processor=self._processor_name(), value=value, model=self._model_name()
) )

View File

@@ -21,7 +21,6 @@ from pipecat.frames.frames import (
ErrorFrame, ErrorFrame,
Frame, Frame,
InterruptionFrame, InterruptionFrame,
MetricsFrame,
ServiceSwitcherRequestMetadataFrame, ServiceSwitcherRequestMetadataFrame,
StartFrame, StartFrame,
STTMetadataFrame, STTMetadataFrame,
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
VADUserStartedSpeakingFrame, VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame, VADUserStoppedSpeakingFrame,
) )
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.processors.frame_processor import FrameDirection from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService from pipecat.services.ai_service import AIService
from pipecat.services.stt_latency import DEFAULT_TTFS_P99 from pipecat.services.stt_latency import DEFAULT_TTFS_P99
@@ -121,9 +119,7 @@ class STTService(AIService):
# STT TTFB tracking state # STT TTFB tracking state
self._stt_ttfb_timeout = stt_ttfb_timeout self._stt_ttfb_timeout = stt_ttfb_timeout
self._ttfb_timeout_task: Optional[asyncio.Task] = None self._ttfb_timeout_task: Optional[asyncio.Task] = None
self._speech_end_time: Optional[float] = None
self._user_speaking: bool = False self._user_speaking: bool = False
self._last_transcription_time: Optional[float] = None
self._finalize_pending: bool = False self._finalize_pending: bool = False
self._finalize_requested: bool = False self._finalize_requested: bool = False
@@ -327,23 +323,16 @@ class STTService(AIService):
direction: The direction to push the frame. direction: The direction to push the frame.
""" """
if isinstance(frame, TranscriptionFrame): if isinstance(frame, TranscriptionFrame):
# Store the transcription time for TTFB calculation
self._last_transcription_time = time.time()
# Set finalized from pending state and auto-reset # Set finalized from pending state and auto-reset
if self._finalize_pending: if self._finalize_pending:
frame.finalized = True frame.finalized = True
self._finalize_pending = False self._finalize_pending = False
# If this is a finalized transcription, report TTFB immediately # If this is a finalized transcription, report TTFB immediately
if frame.finalized and self._speech_end_time is not None: if frame.finalized:
ttfb = self._last_transcription_time - self._speech_end_time await self.stop_ttfb_metrics()
await self._emit_stt_ttfb_metric(ttfb)
# Cancel the timeout since we've already reported # Cancel the timeout since we've already reported
await self._cancel_ttfb_timeout() await self._cancel_ttfb_timeout()
# Clear state
self._speech_end_time = None
self._last_transcription_time = None
await super().push_frame(frame, direction) await super().push_frame(frame, direction)
@@ -373,8 +362,6 @@ class STTService(AIService):
while user is still speaking. while user is still speaking.
""" """
await self._cancel_ttfb_timeout() await self._cancel_ttfb_timeout()
self._speech_end_time = None
self._last_transcription_time = None
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame): async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
"""Handle VAD user started speaking frame to start tracking transcriptions. """Handle VAD user started speaking frame to start tracking transcriptions.
@@ -408,7 +395,8 @@ class STTService(AIService):
# Calculate the actual speech end time (current time minus VAD stop delay). # Calculate the actual speech end time (current time minus VAD stop delay).
# This approximates when the last user audio was sent to the STT service, # This approximates when the last user audio was sent to the STT service,
# which we use to measure against the eventual transcription response. # which we use to measure against the eventual transcription response.
self._speech_end_time = frame.timestamp - frame.stop_secs speech_end_time = frame.timestamp - frame.stop_secs
await self.start_ttfb_metrics(start_time=speech_end_time)
# Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame # Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame
# or InterruptionFrame) # or InterruptionFrame)
@@ -417,44 +405,20 @@ class STTService(AIService):
) )
async def _ttfb_timeout_handler(self): async def _ttfb_timeout_handler(self):
"""Wait for timeout then report TTFB using the last transcription timestamp. """Wait for timeout then report TTFB.
This timeout allows the final transcription to arrive before we calculate This timeout allows the final transcription to arrive before we calculate
and report TTFB. If no transcription arrived, no TTFB is reported. and report TTFB. If no transcription arrived, no TTFB is reported.
""" """
try: try:
await asyncio.sleep(self._stt_ttfb_timeout) await asyncio.sleep(self._stt_ttfb_timeout)
await self.stop_ttfb_metrics()
# Report TTFB if we have both speech end time and transcription time
if self._speech_end_time is not None and self._last_transcription_time is not None:
ttfb = self._last_transcription_time - self._speech_end_time
await self._emit_stt_ttfb_metric(ttfb)
# Clear state after reporting
self._speech_end_time = None
self._last_transcription_time = None
except asyncio.CancelledError: except asyncio.CancelledError:
# Task was cancelled (new utterance or interruption), which is expected behavior # Task was cancelled (new utterance or interruption), which is expected behavior
pass pass
finally: finally:
self._ttfb_timeout_task = None self._ttfb_timeout_task = None
async def _emit_stt_ttfb_metric(self, ttfb: float):
"""Emit STT TTFB metric if value is non-negative.
Args:
ttfb: The TTFB value in seconds.
"""
if ttfb >= 0:
logger.debug(f"{self} TTFB: {ttfb:.3f}s")
if self.metrics_enabled:
ttfb_data = TTFBMetricsData(
processor=self.name,
model=self.model_name,
value=ttfb,
)
await super().push_frame(MetricsFrame(data=[ttfb_data]))
def _create_keepalive_task(self): def _create_keepalive_task(self):
"""Start the keepalive task if keepalive is enabled.""" """Start the keepalive task if keepalive is enabled."""
if self._keepalive_timeout is not None: if self._keepalive_timeout is not None: