Refactor STT TTFB metrics to use base class start/stop pattern
Eliminate custom _emit_stt_ttfb_metric and manual timestamp tracking in STTService by reusing FrameProcessor's start_ttfb_metrics/stop_ttfb_metrics with new start_time/end_time parameters. This keeps the chronological start→stop ordering and removes _speech_end_time and _last_transcription_time state from STTService.
This commit is contained in:
@@ -419,27 +419,49 @@ class FrameProcessor(BaseObject):
|
|||||||
"""
|
"""
|
||||||
self._metrics.set_core_metrics_data(data)
|
self._metrics.set_core_metrics_data(data)
|
||||||
|
|
||||||
async def start_ttfb_metrics(self):
|
async def start_ttfb_metrics(self, *, start_time: Optional[float] = None):
|
||||||
"""Start time-to-first-byte metrics collection."""
|
"""Start time-to-first-byte metrics collection.
|
||||||
if self.can_generate_metrics() and self.metrics_enabled:
|
|
||||||
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
|
|
||||||
|
|
||||||
async def stop_ttfb_metrics(self):
|
Args:
|
||||||
"""Stop time-to-first-byte metrics collection and push results."""
|
start_time: Optional timestamp to use as the start time. If None,
|
||||||
|
uses the current time.
|
||||||
|
"""
|
||||||
if self.can_generate_metrics() and self.metrics_enabled:
|
if self.can_generate_metrics() and self.metrics_enabled:
|
||||||
frame = await self._metrics.stop_ttfb_metrics()
|
await self._metrics.start_ttfb_metrics(
|
||||||
|
start_time=start_time, report_only_initial_ttfb=self._report_only_initial_ttfb
|
||||||
|
)
|
||||||
|
|
||||||
|
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
|
||||||
|
"""Stop time-to-first-byte metrics collection and push results.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_time: Optional timestamp to use as the end time. If None, uses
|
||||||
|
the current time.
|
||||||
|
"""
|
||||||
|
if self.can_generate_metrics() and self.metrics_enabled:
|
||||||
|
frame = await self._metrics.stop_ttfb_metrics(end_time=end_time)
|
||||||
if frame:
|
if frame:
|
||||||
await self.push_frame(frame)
|
await self.push_frame(frame)
|
||||||
|
|
||||||
async def start_processing_metrics(self):
|
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||||
"""Start processing metrics collection."""
|
"""Start processing metrics collection.
|
||||||
if self.can_generate_metrics() and self.metrics_enabled:
|
|
||||||
await self._metrics.start_processing_metrics()
|
|
||||||
|
|
||||||
async def stop_processing_metrics(self):
|
Args:
|
||||||
"""Stop processing metrics collection and push results."""
|
start_time: Optional timestamp to use as the start time. If None,
|
||||||
|
uses the current time.
|
||||||
|
"""
|
||||||
if self.can_generate_metrics() and self.metrics_enabled:
|
if self.can_generate_metrics() and self.metrics_enabled:
|
||||||
frame = await self._metrics.stop_processing_metrics()
|
await self._metrics.start_processing_metrics(start_time=start_time)
|
||||||
|
|
||||||
|
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||||
|
"""Stop processing metrics collection and push results.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_time: Optional timestamp to use as the end time. If None, uses
|
||||||
|
the current time.
|
||||||
|
"""
|
||||||
|
if self.can_generate_metrics() and self.metrics_enabled:
|
||||||
|
frame = await self._metrics.stop_processing_metrics(end_time=end_time)
|
||||||
if frame:
|
if frame:
|
||||||
await self.push_frame(frame)
|
await self.push_frame(frame)
|
||||||
|
|
||||||
|
|||||||
@@ -107,49 +107,70 @@ class FrameProcessorMetrics(BaseObject):
|
|||||||
"""
|
"""
|
||||||
self._core_metrics_data = MetricsData(processor=name)
|
self._core_metrics_data = MetricsData(processor=name)
|
||||||
|
|
||||||
async def start_ttfb_metrics(self, report_only_initial_ttfb):
|
async def start_ttfb_metrics(
|
||||||
|
self, *, start_time: Optional[float] = None, report_only_initial_ttfb: bool
|
||||||
|
):
|
||||||
"""Start measuring time-to-first-byte (TTFB).
|
"""Start measuring time-to-first-byte (TTFB).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
start_time: Optional timestamp to use as the start time. If None,
|
||||||
|
uses the current time.
|
||||||
report_only_initial_ttfb: Whether to report only the first TTFB measurement.
|
report_only_initial_ttfb: Whether to report only the first TTFB measurement.
|
||||||
"""
|
"""
|
||||||
if self._should_report_ttfb:
|
if self._should_report_ttfb:
|
||||||
self._start_ttfb_time = time.time()
|
self._start_ttfb_time = start_time or time.time()
|
||||||
self._last_ttfb_time = 0
|
self._last_ttfb_time = 0
|
||||||
self._should_report_ttfb = not report_only_initial_ttfb
|
self._should_report_ttfb = not report_only_initial_ttfb
|
||||||
|
|
||||||
async def stop_ttfb_metrics(self):
|
async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None):
|
||||||
"""Stop TTFB measurement and generate metrics frame.
|
"""Stop TTFB measurement and generate metrics frame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_time: Optional timestamp to use as the end time. If None, uses
|
||||||
|
the current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
MetricsFrame containing TTFB data, or None if not measuring.
|
MetricsFrame containing TTFB data, or None if not measuring.
|
||||||
"""
|
"""
|
||||||
if self._start_ttfb_time == 0:
|
if self._start_ttfb_time == 0:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
self._last_ttfb_time = time.time() - self._start_ttfb_time
|
end_time = time.time() if end_time is None else end_time  # explicit None check: only a missing end_time falls back to "now"; a caller-supplied 0.0 is honoured
|
||||||
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time}")
|
|
||||||
|
self._last_ttfb_time = end_time - self._start_ttfb_time
|
||||||
|
logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time:.3f}s")
|
||||||
ttfb = TTFBMetricsData(
|
ttfb = TTFBMetricsData(
|
||||||
processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name()
|
processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name()
|
||||||
)
|
)
|
||||||
self._start_ttfb_time = 0
|
self._start_ttfb_time = 0
|
||||||
return MetricsFrame(data=[ttfb])
|
return MetricsFrame(data=[ttfb])
|
||||||
|
|
||||||
async def start_processing_metrics(self):
|
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||||
"""Start measuring processing time."""
|
"""Start measuring processing time.
|
||||||
self._start_processing_time = time.time()
|
|
||||||
|
|
||||||
async def stop_processing_metrics(self):
|
Args:
|
||||||
|
start_time: Optional timestamp to use as the start time. If None,
|
||||||
|
uses the current time.
|
||||||
|
"""
|
||||||
|
self._start_processing_time = start_time or time.time()
|
||||||
|
|
||||||
|
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||||
"""Stop processing time measurement and generate metrics frame.
|
"""Stop processing time measurement and generate metrics frame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_time: Optional timestamp to use as the end time. If None, uses
|
||||||
|
the current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
MetricsFrame containing processing duration data, or None if not measuring.
|
MetricsFrame containing processing duration data, or None if not measuring.
|
||||||
"""
|
"""
|
||||||
if self._start_processing_time == 0:
|
if self._start_processing_time == 0:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
value = time.time() - self._start_processing_time
|
end_time = time.time() if end_time is None else end_time  # explicit None check: only a missing end_time falls back to "now"; a caller-supplied 0.0 is honoured
|
||||||
logger.debug(f"{self._processor_name()} processing time: {value}")
|
|
||||||
|
value = end_time - self._start_processing_time
|
||||||
|
logger.debug(f"{self._processor_name()} processing time: {value:.3f}s")
|
||||||
processing = ProcessingMetricsData(
|
processing = ProcessingMetricsData(
|
||||||
processor=self._processor_name(), value=value, model=self._model_name()
|
processor=self._processor_name(), value=value, model=self._model_name()
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ from pipecat.frames.frames import (
|
|||||||
ErrorFrame,
|
ErrorFrame,
|
||||||
Frame,
|
Frame,
|
||||||
InterruptionFrame,
|
InterruptionFrame,
|
||||||
MetricsFrame,
|
|
||||||
ServiceSwitcherRequestMetadataFrame,
|
ServiceSwitcherRequestMetadataFrame,
|
||||||
StartFrame,
|
StartFrame,
|
||||||
STTMetadataFrame,
|
STTMetadataFrame,
|
||||||
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
|
|||||||
VADUserStartedSpeakingFrame,
|
VADUserStartedSpeakingFrame,
|
||||||
VADUserStoppedSpeakingFrame,
|
VADUserStoppedSpeakingFrame,
|
||||||
)
|
)
|
||||||
from pipecat.metrics.metrics import TTFBMetricsData
|
|
||||||
from pipecat.processors.frame_processor import FrameDirection
|
from pipecat.processors.frame_processor import FrameDirection
|
||||||
from pipecat.services.ai_service import AIService
|
from pipecat.services.ai_service import AIService
|
||||||
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
|
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
|
||||||
@@ -121,9 +119,7 @@ class STTService(AIService):
|
|||||||
# STT TTFB tracking state
|
# STT TTFB tracking state
|
||||||
self._stt_ttfb_timeout = stt_ttfb_timeout
|
self._stt_ttfb_timeout = stt_ttfb_timeout
|
||||||
self._ttfb_timeout_task: Optional[asyncio.Task] = None
|
self._ttfb_timeout_task: Optional[asyncio.Task] = None
|
||||||
self._speech_end_time: Optional[float] = None
|
|
||||||
self._user_speaking: bool = False
|
self._user_speaking: bool = False
|
||||||
self._last_transcription_time: Optional[float] = None
|
|
||||||
self._finalize_pending: bool = False
|
self._finalize_pending: bool = False
|
||||||
self._finalize_requested: bool = False
|
self._finalize_requested: bool = False
|
||||||
|
|
||||||
@@ -327,23 +323,16 @@ class STTService(AIService):
|
|||||||
direction: The direction to push the frame.
|
direction: The direction to push the frame.
|
||||||
"""
|
"""
|
||||||
if isinstance(frame, TranscriptionFrame):
|
if isinstance(frame, TranscriptionFrame):
|
||||||
# Store the transcription time for TTFB calculation
|
|
||||||
self._last_transcription_time = time.time()
|
|
||||||
|
|
||||||
# Set finalized from pending state and auto-reset
|
# Set finalized from pending state and auto-reset
|
||||||
if self._finalize_pending:
|
if self._finalize_pending:
|
||||||
frame.finalized = True
|
frame.finalized = True
|
||||||
self._finalize_pending = False
|
self._finalize_pending = False
|
||||||
|
|
||||||
# If this is a finalized transcription, report TTFB immediately
|
# If this is a finalized transcription, report TTFB immediately
|
||||||
if frame.finalized and self._speech_end_time is not None:
|
if frame.finalized:
|
||||||
ttfb = self._last_transcription_time - self._speech_end_time
|
await self.stop_ttfb_metrics()
|
||||||
await self._emit_stt_ttfb_metric(ttfb)
|
|
||||||
# Cancel the timeout since we've already reported
|
# Cancel the timeout since we've already reported
|
||||||
await self._cancel_ttfb_timeout()
|
await self._cancel_ttfb_timeout()
|
||||||
# Clear state
|
|
||||||
self._speech_end_time = None
|
|
||||||
self._last_transcription_time = None
|
|
||||||
|
|
||||||
await super().push_frame(frame, direction)
|
await super().push_frame(frame, direction)
|
||||||
|
|
||||||
@@ -373,8 +362,6 @@ class STTService(AIService):
|
|||||||
while user is still speaking.
|
while user is still speaking.
|
||||||
"""
|
"""
|
||||||
await self._cancel_ttfb_timeout()
|
await self._cancel_ttfb_timeout()
|
||||||
self._speech_end_time = None
|
|
||||||
self._last_transcription_time = None
|
|
||||||
|
|
||||||
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
|
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
|
||||||
"""Handle VAD user started speaking frame to start tracking transcriptions.
|
"""Handle VAD user started speaking frame to start tracking transcriptions.
|
||||||
@@ -408,7 +395,8 @@ class STTService(AIService):
|
|||||||
# Calculate the actual speech end time (current time minus VAD stop delay).
|
# Calculate the actual speech end time (current time minus VAD stop delay).
|
||||||
# This approximates when the last user audio was sent to the STT service,
|
# This approximates when the last user audio was sent to the STT service,
|
||||||
# which we use to measure against the eventual transcription response.
|
# which we use to measure against the eventual transcription response.
|
||||||
self._speech_end_time = frame.timestamp - frame.stop_secs
|
speech_end_time = frame.timestamp - frame.stop_secs
|
||||||
|
await self.start_ttfb_metrics(start_time=speech_end_time)
|
||||||
|
|
||||||
# Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame
|
# Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame
|
||||||
# or InterruptionFrame)
|
# or InterruptionFrame)
|
||||||
@@ -417,44 +405,20 @@ class STTService(AIService):
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def _ttfb_timeout_handler(self):
|
async def _ttfb_timeout_handler(self):
|
||||||
"""Wait for timeout then report TTFB using the last transcription timestamp.
|
"""Wait for timeout then report TTFB.
|
||||||
|
|
||||||
This timeout allows the final transcription to arrive before we calculate
|
This timeout allows the final transcription to arrive before we calculate
|
||||||
and report TTFB. If no transcription arrived, no TTFB is reported.
|
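and report TTFB. NOTE(review): when the timeout fires, stop_ttfb_metrics() is called without an end_time, so any TTFB measurement still running is reported up to the timeout itself, even if no transcription arrived — confirm this is intended.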
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
await asyncio.sleep(self._stt_ttfb_timeout)
|
await asyncio.sleep(self._stt_ttfb_timeout)
|
||||||
|
await self.stop_ttfb_metrics()
|
||||||
# Report TTFB if we have both speech end time and transcription time
|
|
||||||
if self._speech_end_time is not None and self._last_transcription_time is not None:
|
|
||||||
ttfb = self._last_transcription_time - self._speech_end_time
|
|
||||||
await self._emit_stt_ttfb_metric(ttfb)
|
|
||||||
|
|
||||||
# Clear state after reporting
|
|
||||||
self._speech_end_time = None
|
|
||||||
self._last_transcription_time = None
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
# Task was cancelled (new utterance or interruption), which is expected behavior
|
# Task was cancelled (new utterance or interruption), which is expected behavior
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
self._ttfb_timeout_task = None
|
self._ttfb_timeout_task = None
|
||||||
|
|
||||||
async def _emit_stt_ttfb_metric(self, ttfb: float):
|
|
||||||
"""Emit STT TTFB metric if value is non-negative.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
ttfb: The TTFB value in seconds.
|
|
||||||
"""
|
|
||||||
if ttfb >= 0:
|
|
||||||
logger.debug(f"{self} TTFB: {ttfb:.3f}s")
|
|
||||||
if self.metrics_enabled:
|
|
||||||
ttfb_data = TTFBMetricsData(
|
|
||||||
processor=self.name,
|
|
||||||
model=self.model_name,
|
|
||||||
value=ttfb,
|
|
||||||
)
|
|
||||||
await super().push_frame(MetricsFrame(data=[ttfb_data]))
|
|
||||||
|
|
||||||
def _create_keepalive_task(self):
|
def _create_keepalive_task(self):
|
||||||
"""Start the keepalive task if keepalive is enabled."""
|
"""Start the keepalive task if keepalive is enabled."""
|
||||||
if self._keepalive_timeout is not None:
|
if self._keepalive_timeout is not None:
|
||||||
|
|||||||
Reference in New Issue
Block a user