diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index f0c9e7183..7423c4845 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -419,27 +419,49 @@ class FrameProcessor(BaseObject): """ self._metrics.set_core_metrics_data(data) - async def start_ttfb_metrics(self): - """Start time-to-first-byte metrics collection.""" - if self.can_generate_metrics() and self.metrics_enabled: - await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb) + async def start_ttfb_metrics(self, *, start_time: Optional[float] = None): + """Start time-to-first-byte metrics collection. - async def stop_ttfb_metrics(self): - """Stop time-to-first-byte metrics collection and push results.""" + Args: + start_time: Optional timestamp to use as the start time. If None, + uses the current time. + """ if self.can_generate_metrics() and self.metrics_enabled: - frame = await self._metrics.stop_ttfb_metrics() + await self._metrics.start_ttfb_metrics( + start_time=start_time, report_only_initial_ttfb=self._report_only_initial_ttfb + ) + + async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None): + """Stop time-to-first-byte metrics collection and push results. + + Args: + end_time: Optional timestamp to use as the end time. If None, uses + the current time. + """ + if self.can_generate_metrics() and self.metrics_enabled: + frame = await self._metrics.stop_ttfb_metrics(end_time=end_time) if frame: await self.push_frame(frame) - async def start_processing_metrics(self): - """Start processing metrics collection.""" - if self.can_generate_metrics() and self.metrics_enabled: - await self._metrics.start_processing_metrics() + async def start_processing_metrics(self, *, start_time: Optional[float] = None): + """Start processing metrics collection. - async def stop_processing_metrics(self): - """Stop processing metrics collection and push results.""" + Args: + start_time: Optional timestamp to use as the start time. If None, + uses the current time. + """ if self.can_generate_metrics() and self.metrics_enabled: - frame = await self._metrics.stop_processing_metrics() + await self._metrics.start_processing_metrics(start_time=start_time) + + async def stop_processing_metrics(self, *, end_time: Optional[float] = None): + """Stop processing metrics collection and push results. + + Args: + end_time: Optional timestamp to use as the end time. If None, uses + the current time. + """ + if self.can_generate_metrics() and self.metrics_enabled: + frame = await self._metrics.stop_processing_metrics(end_time=end_time) if frame: await self.push_frame(frame) diff --git a/src/pipecat/processors/metrics/frame_processor_metrics.py b/src/pipecat/processors/metrics/frame_processor_metrics.py index b8beba6e2..c82fd9698 100644 --- a/src/pipecat/processors/metrics/frame_processor_metrics.py +++ b/src/pipecat/processors/metrics/frame_processor_metrics.py @@ -107,49 +107,70 @@ class FrameProcessorMetrics(BaseObject): """ self._core_metrics_data = MetricsData(processor=name) - async def start_ttfb_metrics(self, report_only_initial_ttfb): + async def start_ttfb_metrics( + self, *, start_time: Optional[float] = None, report_only_initial_ttfb: bool + ): """Start measuring time-to-first-byte (TTFB). Args: + start_time: Optional timestamp to use as the start time. If None, + uses the current time. report_only_initial_ttfb: Whether to report only the first TTFB measurement. """ if self._should_report_ttfb: - self._start_ttfb_time = time.time() + self._start_ttfb_time = start_time or time.time() self._last_ttfb_time = 0 self._should_report_ttfb = not report_only_initial_ttfb - async def stop_ttfb_metrics(self): + async def stop_ttfb_metrics(self, *, end_time: Optional[float] = None): """Stop TTFB measurement and generate metrics frame. + Args: + end_time: Optional timestamp to use as the end time. If None, uses + the current time. + Returns: MetricsFrame containing TTFB data, or None if not measuring. """ if self._start_ttfb_time == 0: return None - self._last_ttfb_time = time.time() - self._start_ttfb_time - logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time}") + end_time = end_time or time.time() + + self._last_ttfb_time = end_time - self._start_ttfb_time + logger.debug(f"{self._processor_name()} TTFB: {self._last_ttfb_time:.3f}s") ttfb = TTFBMetricsData( processor=self._processor_name(), value=self._last_ttfb_time, model=self._model_name() ) self._start_ttfb_time = 0 return MetricsFrame(data=[ttfb]) - async def start_processing_metrics(self): - """Start measuring processing time.""" - self._start_processing_time = time.time() + async def start_processing_metrics(self, *, start_time: Optional[float] = None): + """Start measuring processing time. - async def stop_processing_metrics(self): + Args: + start_time: Optional timestamp to use as the start time. If None, + uses the current time. + """ + self._start_processing_time = start_time or time.time() + + async def stop_processing_metrics(self, *, end_time: Optional[float] = None): """Stop processing time measurement and generate metrics frame. + Args: + end_time: Optional timestamp to use as the end time. If None, uses + the current time. + Returns: MetricsFrame containing processing duration data, or None if not measuring. """ if self._start_processing_time == 0: return None - value = time.time() - self._start_processing_time - logger.debug(f"{self._processor_name()} processing time: {value}") + end_time = end_time or time.time() + + value = end_time - self._start_processing_time + logger.debug(f"{self._processor_name()} processing time: {value:.3f}s") processing = ProcessingMetricsData( processor=self._processor_name(), value=value, model=self._model_name() ) diff --git a/src/pipecat/services/stt_service.py b/src/pipecat/services/stt_service.py index 6d431c523..bad8e2e28 100644 --- a/src/pipecat/services/stt_service.py +++ b/src/pipecat/services/stt_service.py @@ -21,7 +21,6 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, InterruptionFrame, - MetricsFrame, ServiceSwitcherRequestMetadataFrame, StartFrame, STTMetadataFrame, @@ -31,7 +30,6 @@ from pipecat.frames.frames import ( VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) -from pipecat.metrics.metrics import TTFBMetricsData from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService from pipecat.services.stt_latency import DEFAULT_TTFS_P99 @@ -121,9 +119,7 @@ class STTService(AIService): # STT TTFB tracking state self._stt_ttfb_timeout = stt_ttfb_timeout self._ttfb_timeout_task: Optional[asyncio.Task] = None - self._speech_end_time: Optional[float] = None self._user_speaking: bool = False - self._last_transcription_time: Optional[float] = None self._finalize_pending: bool = False self._finalize_requested: bool = False @@ -327,23 +323,16 @@ class STTService(AIService): direction: The direction to push the frame. """ if isinstance(frame, TranscriptionFrame): - # Store the transcription time for TTFB calculation - self._last_transcription_time = time.time() - # Set finalized from pending state and auto-reset if self._finalize_pending: frame.finalized = True self._finalize_pending = False # If this is a finalized transcription, report TTFB immediately - if frame.finalized and self._speech_end_time is not None: - ttfb = self._last_transcription_time - self._speech_end_time - await self._emit_stt_ttfb_metric(ttfb) + if frame.finalized: + await self.stop_ttfb_metrics() # Cancel the timeout since we've already reported await self._cancel_ttfb_timeout() - # Clear state - self._speech_end_time = None - self._last_transcription_time = None await super().push_frame(frame, direction) @@ -373,8 +362,6 @@ class STTService(AIService): while user is still speaking. """ await self._cancel_ttfb_timeout() - self._speech_end_time = None - self._last_transcription_time = None async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame): """Handle VAD user started speaking frame to start tracking transcriptions. @@ -408,7 +395,8 @@ class STTService(AIService): # Calculate the actual speech end time (current time minus VAD stop delay). # This approximates when the last user audio was sent to the STT service, # which we use to measure against the eventual transcription response. - self._speech_end_time = frame.timestamp - frame.stop_secs + speech_end_time = frame.timestamp - frame.stop_secs + await self.start_ttfb_metrics(start_time=speech_end_time) # Start timeout task (any previous timeout was cancelled by VADUserStartedSpeakingFrame # or InterruptionFrame) @@ -417,44 +405,20 @@ class STTService(AIService): ) async def _ttfb_timeout_handler(self): - """Wait for timeout then report TTFB using the last transcription timestamp. + """Wait for timeout then report TTFB. This timeout allows the final transcription to arrive before we calculate and report TTFB. If no transcription arrived, no TTFB is reported. """ try: await asyncio.sleep(self._stt_ttfb_timeout) - - # Report TTFB if we have both speech end time and transcription time - if self._speech_end_time is not None and self._last_transcription_time is not None: - ttfb = self._last_transcription_time - self._speech_end_time - await self._emit_stt_ttfb_metric(ttfb) - - # Clear state after reporting - self._speech_end_time = None - self._last_transcription_time = None + await self.stop_ttfb_metrics() except asyncio.CancelledError: # Task was cancelled (new utterance or interruption), which is expected behavior pass finally: self._ttfb_timeout_task = None - async def _emit_stt_ttfb_metric(self, ttfb: float): - """Emit STT TTFB metric if value is non-negative. - - Args: - ttfb: The TTFB value in seconds. - """ - if ttfb >= 0: - logger.debug(f"{self} TTFB: {ttfb:.3f}s") - if self.metrics_enabled: - ttfb_data = TTFBMetricsData( - processor=self.name, - model=self.model_name, - value=ttfb, - ) - await super().push_frame(MetricsFrame(data=[ttfb_data])) - def _create_keepalive_task(self): """Start the keepalive task if keepalive is enabled.""" if self._keepalive_timeout is not None: