Refactor STT TTFB metrics to use base class start/stop pattern
Eliminate custom _emit_stt_ttfb_metric and manual timestamp tracking in STTService by reusing FrameProcessor's start_ttfb_metrics/stop_ttfb_metrics with new start_time/end_time parameters. This keeps the chronological start→stop ordering and removes _speech_end_time and _last_transcription_time state from STTService.
This commit is contained in:
@@ -419,27 +419,49 @@ class FrameProcessor(BaseObject):
|
||||
"""
|
||||
self._metrics.set_core_metrics_data(data)
|
||||
|
||||
async def start_ttfb_metrics(self):
|
||||
"""Start time-to-first-byte metrics collection."""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
|
||||
async def start_ttfb_metrics(self, *, start_time: Optional[float] = None):
|
||||
"""Start time-to-first-byte metrics collection.
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
"""Stop time-to-first-byte metrics collection and push results."""
|
||||
Args:
|
||||
start_time: Optional timestamp to use as the start time. If None,
|
||||
uses the current time.
|
||||
"""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_ttfb_metrics()
|
||||
await self._metrics.start_ttfb_metrics(
|
||||
start_time=start_time, report_only_initial_ttfb=self._report_only_initial_ttfb
|
||||
)
|
||||
|
||||
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop time-to-first-byte metrics collection and push results.
|
||||
|
||||
Args:
|
||||
end_time: Optional timestamp to use as the end time. If None, uses
|
||||
the current time.
|
||||
"""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_ttfb_metrics(end_time=end_time)
|
||||
if frame:
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
"""Start processing metrics collection."""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
await self._metrics.start_processing_metrics()
|
||||
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||
"""Start processing metrics collection.
|
||||
|
||||
async def stop_processing_metrics(self):
|
||||
"""Stop processing metrics collection and push results."""
|
||||
Args:
|
||||
start_time: Optional timestamp to use as the start time. If None,
|
||||
uses the current time.
|
||||
"""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_processing_metrics()
|
||||
await self._metrics.start_processing_metrics(start_time=start_time)
|
||||
|
||||
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop processing metrics collection and push results.
|
||||
|
||||
Args:
|
||||
end_time: Optional timestamp to use as the end time. If None, uses
|
||||
the current time.
|
||||
"""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_processing_metrics(end_time=end_time)
|
||||
if frame:
|
||||
await self.push_frame(frame)
|
||||
|
||||
|
||||
@@ -107,49 +107,70 @@ class FrameProcessorMetrics(BaseObject):
|
||||
"""
|
||||
self._core_metrics_data = MetricsData(processor=name)
|
||||
|
||||
async def start_ttfb_metrics(self, report_only_initial_ttfb):
|
||||
async def start_ttfb_metrics(
|
||||
self, *, start_time: Optional[float] = None, report_only_initial_ttfb: bool
|
||||
):
|
||||
"""Start measuring time-to-first-byte (TTFB).
|
||||
|
||||
Args:
|
||||
start_time: Optional timestamp to use as the start time. If None,
|
||||
uses the current time.
|
||||
report_only_initial_ttfb: Whether to report only the first TTFB measurement.
|
||||
"""
|
||||
if self._should_report_ttfb:
|
||||
self._start_ttfb_time = time.time()
|
||||
self._start_ttfb_time = start_time or time.time()
|
||||
self._last_ttfb_time = 0
|
||||
self._should_report_ttfb = not report_only_initial_ttfb
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop TTFB measurement and generate metrics frame.
|
||||
|
||||
Args:
|
||||
end_time: Optional timestamp to use as the end time. If None, uses
|
||||
the current time.
|
||||
|
||||
Returns:
|
||||
MetricsFrame containing TTFB data, or None if not measuring.
|
||||
"""
|
||||
if self._start_ttfb_time == 0:
|
||||
return None
|
||||
|
||||
self._last_ttfb_time = time.time() - self._start_ttfb_time
|
||||
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time}")
|
||||
end_time = end_time or time.time()
|
||||
|
||||
self._last_ttfb_time = end_time - self._start_ttfb_time
|
||||
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time:.3f}s")
|
||||
ttfb = TTFBMetricsData(
|
||||
processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name()
|
||||
)
|
||||
self._start_ttfb_time = 0
|
||||
return MetricsFrame(data=[ttfb])
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
"""Start measuring processing time."""
|
||||
self._start_processing_time = time.time()
|
||||
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||
"""Start measuring processing time.
|
||||
|
||||
async def stop_processing_metrics(self):
|
||||
Args:
|
||||
start_time: Optional timestamp to use as the start time. If None,
|
||||
uses the current time.
|
||||
"""
|
||||
self._start_processing_time = start_time or time.time()
|
||||
|
||||
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop processing time measurement and generate metrics frame.
|
||||
|
||||
Args:
|
||||
end_time: Optional timestamp to use as the end time. If None, uses
|
||||
the current time.
|
||||
|
||||
Returns:
|
||||
MetricsFrame containing processing duration data, or None if not measuring.
|
||||
"""
|
||||
if self._start_processing_time == 0:
|
||||
return None
|
||||
|
||||
value = time.time() - self._start_processing_time
|
||||
logger.debug(f"{self._processor_name()} processing time: {value}")
|
||||
end_time = end_time or time.time()
|
||||
|
||||
value = end_time - self._start_processing_time
|
||||
logger.debug(f"{self._processor_name()} processing time: {value:.3f}s")
|
||||
processing = ProcessingMetricsData(
|
||||
processor=self._processor_name(), value=value, model=self._model_name()
|
||||
)
|
||||
|
||||
@@ -21,7 +21,6 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InterruptionFrame,
|
||||
MetricsFrame,
|
||||
ServiceSwitcherRequestMetadataFrame,
|
||||
StartFrame,
|
||||
STTMetadataFrame,
|
||||
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import TTFBMetricsData
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_service import AIService
|
||||
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
|
||||
@@ -121,9 +119,7 @@ class STTService(AIService):
|
||||
# STT TTFB tracking state
|
||||
self._stt_ttfb_timeout = stt_ttfb_timeout
|
||||
self._ttfb_timeout_task: Optional[asyncio.Task] = None
|
||||
self._speech_end_time: Optional[float] = None
|
||||
self._user_speaking: bool = False
|
||||
self._last_transcription_time: Optional[float] = None
|
||||
self._finalize_pending: bool = False
|
||||
self._finalize_requested: bool = False
|
||||
|
||||
@@ -327,23 +323,16 @@ class STTService(AIService):
|
||||
direction: The direction to push the frame.
|
||||
"""
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
# Store the transcription time for TTFB calculation
|
||||
self._last_transcription_time = time.time()
|
||||
|
||||
# Set finalized from pending state and auto-reset
|
||||
if self._finalize_pending:
|
||||
frame.finalized = True
|
||||
self._finalize_pending = False
|
||||
|
||||
# If this is a finalized transcription, report TTFB immediately
|
||||
if frame.finalized and self._speech_end_time is not None:
|
||||
ttfb = self._last_transcription_time - self._speech_end_time
|
||||
await self._emit_stt_ttfb_metric(ttfb)
|
||||
if frame.finalized:
|
||||
await self.stop_ttfb_metrics()
|
||||
# Cancel the timeout since we've already reported
|
||||
await self._cancel_ttfb_timeout()
|
||||
# Clear state
|
||||
self._speech_end_time = None
|
||||
self._last_transcription_time = None
|
||||
|
||||
await super().push_frame(frame, direction)
|
||||
|
||||
@@ -373,8 +362,6 @@ class STTService(AIService):
|
||||
while user is still speaking.
|
||||
"""
|
||||
await self._cancel_ttfb_timeout()
|
||||
self._speech_end_time = None
|
||||
self._last_transcription_time = None
|
||||
|
||||
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
|
||||
"""Handle VAD user started speaking frame to start tracking transcriptions.
|
||||
@@ -408,7 +395,8 @@ class STTService(AIService):
|
||||
# Calculate the actual speech end time (current time minus VAD stop delay).
|
||||
# This approximates when the last user audio was sent to the STT service,
|
||||
# which we use to measure against the eventual transcription response.
|
||||
self._speech_end_time = frame.timestamp - frame.stop_secs
|
||||
speech_end_time = frame.timestamp - frame.stop_secs
|
||||
await self.start_ttfb_metrics(start_time=speech_end_time)
|
||||
|
||||
# Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame
|
||||
# or InterruptionFrame)
|
||||
@@ -417,44 +405,20 @@ class STTService(AIService):
|
||||
)
|
||||
|
||||
async def _ttfb_timeout_handler(self):
|
||||
"""Wait for timeout then report TTFB using the last transcription timestamp.
|
||||
"""Wait for timeout then report TTFB.
|
||||
|
||||
This timeout allows the final transcription to arrive before we calculate
|
||||
and report TTFB. If no transcription arrived, no TTFB is reported.
|
||||
"""
|
||||
try:
|
||||
await asyncio.sleep(self._stt_ttfb_timeout)
|
||||
|
||||
# Report TTFB if we have both speech end time and transcription time
|
||||
if self._speech_end_time is not None and self._last_transcription_time is not None:
|
||||
ttfb = self._last_transcription_time - self._speech_end_time
|
||||
await self._emit_stt_ttfb_metric(ttfb)
|
||||
|
||||
# Clear state after reporting
|
||||
self._speech_end_time = None
|
||||
self._last_transcription_time = None
|
||||
await self.stop_ttfb_metrics()
|
||||
except asyncio.CancelledError:
|
||||
# Task was cancelled (new utterance or interruption), which is expected behavior
|
||||
pass
|
||||
finally:
|
||||
self._ttfb_timeout_task = None
|
||||
|
||||
async def _emit_stt_ttfb_metric(self, ttfb: float):
|
||||
"""Emit STT TTFB metric if value is non-negative.
|
||||
|
||||
Args:
|
||||
ttfb: The TTFB value in seconds.
|
||||
"""
|
||||
if ttfb >= 0:
|
||||
logger.debug(f"{self} TTFB: {ttfb:.3f}s")
|
||||
if self.metrics_enabled:
|
||||
ttfb_data = TTFBMetricsData(
|
||||
processor=self.name,
|
||||
model=self.model_name,
|
||||
value=ttfb,
|
||||
)
|
||||
await super().push_frame(MetricsFrame(data=[ttfb_data]))
|
||||
|
||||
def _create_keepalive_task(self):
|
||||
"""Start the keepalive task if keepalive is enabled."""
|
||||
if self._keepalive_timeout is not None:
|
||||
|
||||
Reference in New Issue
Block a user