Refactor STT TTFB metrics to use base class start/stop pattern

Eliminate the custom _emit_stt_ttfb_metric helper and the manual timestamp
tracking in STTService by reusing FrameProcessor's
start_ttfb_metrics/stop_ttfb_metrics with new optional, keyword-only
start_time/end_time parameters (the same parameters are also added to the
start/stop processing metrics methods). This keeps the chronological
start→stop ordering and removes the _speech_end_time and
_last_transcription_time state from STTService.
This commit is contained in:
Aleix Conchillo Flaqué
2026-02-19 10:52:24 -08:00
parent 94e93bed83
commit 859cd7c920
3 changed files with 74 additions and 67 deletions

View File

@@ -419,27 +419,49 @@ class FrameProcessor(BaseObject):
"""
self._metrics.set_core_metrics_data(data)
async def start_ttfb_metrics(self):
"""Start time-to-first-byte metrics collection."""
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
async def start_ttfb_metrics(self, *, start_time: Optional[float] = None):
"""Start time-to-first-byte metrics collection.
async def stop_ttfb_metrics(self):
"""Stop time-to-first-byte metrics collection and push results."""
Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics()
await self._metrics.start_ttfb_metrics(
start_time=start_time, report_only_initial_ttfb=self._report_only_initial_ttfb
)
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
"""Stop time-to-first-byte metrics collection and push results.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics(end_time=end_time)
if frame:
await self.push_frame(frame)
async def start_processing_metrics(self):
"""Start processing metrics collection."""
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics()
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start processing metrics collection.
async def stop_processing_metrics(self):
"""Stop processing metrics collection and push results."""
Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics()
await self._metrics.start_processing_metrics(start_time=start_time)
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop processing metrics collection and push results.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics(end_time=end_time)
if frame:
await self.push_frame(frame)

View File

@@ -107,49 +107,70 @@ class FrameProcessorMetrics(BaseObject):
"""
self._core_metrics_data = MetricsData(processor=name)
async def start_ttfb_metrics(self, report_only_initial_ttfb):
async def start_ttfb_metrics(
self, *, start_time: Optional[float] = None, report_only_initial_ttfb: bool
):
"""Start measuring time-to-first-byte (TTFB).
Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
report_only_initial_ttfb: Whether to report only the first TTFB measurement.
"""
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._start_ttfb_time = start_time or time.time()
self._last_ttfb_time = 0
self._should_report_ttfb = not report_only_initial_ttfb
async def stop_ttfb_metrics(self):
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
"""Stop TTFB measurement and generate metrics frame.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
Returns:
MetricsFrame containing TTFB data, or None if not measuring.
"""
if self._start_ttfb_time == 0:
return None
self._last_ttfb_time = time.time() - self._start_ttfb_time
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time}")
end_time = end_time or time.time()
self._last_ttfb_time = end_time - self._start_ttfb_time
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time:.3f}s")
ttfb = TTFBMetricsData(
processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name()
)
self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb])
async def start_processing_metrics(self):
"""Start measuring processing time."""
self._start_processing_time = time.time()
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start measuring processing time.
async def stop_processing_metrics(self):
Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
self._start_processing_time = start_time or time.time()
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop processing time measurement and generate metrics frame.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
Returns:
MetricsFrame containing processing duration data, or None if not measuring.
"""
if self._start_processing_time == 0:
return None
value = time.time() - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value}")
end_time = end_time or time.time()
value = end_time - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value:.3f}s")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name()
)

View File

@@ -21,7 +21,6 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
InterruptionFrame,
MetricsFrame,
ServiceSwitcherRequestMetadataFrame,
StartFrame,
STTMetadataFrame,
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
@@ -121,9 +119,7 @@ class STTService(AIService):
# STT TTFB tracking state
self._stt_ttfb_timeout = stt_ttfb_timeout
self._ttfb_timeout_task: Optional[asyncio.Task] = None
self._speech_end_time: Optional[float] = None
self._user_speaking: bool = False
self._last_transcription_time: Optional[float] = None
self._finalize_pending: bool = False
self._finalize_requested: bool = False
@@ -327,23 +323,16 @@ class STTService(AIService):
direction: The direction to push the frame.
"""
if isinstance(frame, TranscriptionFrame):
# Store the transcription time for TTFB calculation
self._last_transcription_time = time.time()
# Set finalized from pending state and auto-reset
if self._finalize_pending:
frame.finalized = True
self._finalize_pending = False
# If this is a finalized transcription, report TTFB immediately
if frame.finalized and self._speech_end_time is not None:
ttfb = self._last_transcription_time - self._speech_end_time
await self._emit_stt_ttfb_metric(ttfb)
if frame.finalized:
await self.stop_ttfb_metrics()
# Cancel the timeout since we've already reported
await self._cancel_ttfb_timeout()
# Clear state
self._speech_end_time = None
self._last_transcription_time = None
await super().push_frame(frame, direction)
@@ -373,8 +362,6 @@ class STTService(AIService):
while user is still speaking.
"""
await self._cancel_ttfb_timeout()
self._speech_end_time = None
self._last_transcription_time = None
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
"""Handle VAD user started speaking frame to start tracking transcriptions.
@@ -408,7 +395,8 @@ class STTService(AIService):
# Calculate the actual speech end time (current time minus VAD stop delay).
# This approximates when the last user audio was sent to the STT service,
# which we use to measure against the eventual transcription response.
self._speech_end_time = frame.timestamp - frame.stop_secs
speech_end_time = frame.timestamp - frame.stop_secs
await self.start_ttfb_metrics(start_time=speech_end_time)
# Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame
# or InterruptionFrame)
@@ -417,44 +405,20 @@ class STTService(AIService):
)
async def _ttfb_timeout_handler(self):
"""Wait for timeout then report TTFB using the last transcription timestamp.
"""Wait for timeout then report TTFB.
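This timeout allows the final transcription to arrive before we calculate
and report TTFB. When it elapses, stop_ttfb_metrics() is called without an
end_time, so the TTFB is measured up to the current time; nothing is reported
if no TTFB measurement is in progress.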
"""
try:
await asyncio.sleep(self._stt_ttfb_timeout)
# Report TTFB if we have both speech end time and transcription time
if self._speech_end_time is not None and self._last_transcription_time is not None:
ttfb = self._last_transcription_time - self._speech_end_time
await self._emit_stt_ttfb_metric(ttfb)
# Clear state after reporting
self._speech_end_time = None
self._last_transcription_time = None
await self.stop_ttfb_metrics()
except asyncio.CancelledError:
# Task was cancelled (new utterance or interruption), which is expected behavior
pass
finally:
self._ttfb_timeout_task = None
async def _emit_stt_ttfb_metric(self, ttfb: float):
"""Emit STT TTFB metric if value is non-negative.
Args:
ttfb: The TTFB value in seconds.
"""
if ttfb >= 0:
logger.debug(f"{self} TTFB: {ttfb:.3f}s")
if self.metrics_enabled:
ttfb_data = TTFBMetricsData(
processor=self.name,
model=self.model_name,
value=ttfb,
)
await super().push_frame(MetricsFrame(data=[ttfb_data]))
def _create_keepalive_task(self):
"""Start the keepalive task if keepalive is enabled."""
if self._keepalive_timeout is not None: