Compare commits

...

2 Commits

Author SHA1 Message Date
Mark Backman
4b4e8b839c Add changelog for PR #3851 2026-02-26 18:27:50 -05:00
Mark Backman
86c2dd5cfc Remove processing metrics (ProcessingMetricsData)
Processing metrics were an early addition that predated a clear
understanding of what timing measurements matter in real-time pipelines.
They were inconsistently implemented across services, often broken, and
overlapped with the better-defined TTFB metric.

- Remove ProcessingMetricsData class and all start/stop_processing_metrics
  methods from FrameProcessorMetrics, FrameProcessor, and SentryMetrics
- Remove all processing metrics calls from 31 service files (LLM, TTS,
  STT, image, vision, realtime)
- Clean up empty _start_metrics() stubs left in STT services
- Remove processing metrics handling from RTVI, metrics log observer,
  pipeline task initial metrics, and strands agents framework
- Update tests and examples

Remaining metrics (TTFB, LLM token usage, TTS character usage, text
aggregation time) are well-defined and consistently implemented.
2026-02-26 18:20:49 -05:00
42 changed files with 14 additions and 270 deletions

View File

@@ -0,0 +1 @@
- ⚠️ Removed `ProcessingMetricsData` and all `start_processing_metrics()`/`stop_processing_metrics()` methods from `FrameProcessor` and `FrameProcessorMetrics`. These metrics were inconsistently implemented across services and overlapped with the better-defined TTFB metric. TTFB, LLM token usage, TTS character usage, and text aggregation metrics are unaffected.

View File

@@ -13,7 +13,6 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMRunFrame, MetricsFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
@@ -46,8 +45,6 @@ class MetricsLogger(FrameProcessor):
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(

View File

@@ -38,16 +38,6 @@ class TTFBMetricsData(MetricsData):
value: float
class ProcessingMetricsData(MetricsData):
"""General processing time metrics data.
Parameters:
value: Processing time measurement in seconds.
"""
value: float
class LLMTokenUsage(BaseModel):
"""Token usage statistics for LLM operations.

View File

@@ -20,7 +20,6 @@ from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
SmartTurnMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
@@ -35,7 +34,6 @@ class MetricsLogObserver(BaseObserver):
Monitors and logs all MetricsFrame instances, including:
- TTFBMetricsData (Time To First Byte)
- ProcessingMetricsData (General processing time)
- LLMUsageMetricsData (Token usage statistics)
- TTSUsageMetricsData (Text-to-Speech character counts)
- TurnMetricsData (Turn prediction metrics)
@@ -146,10 +144,6 @@ class MetricsLogObserver(BaseObserver):
logger.debug(
f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
)
elif isinstance(metrics_data, ProcessingMetricsData):
logger.debug(
f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
)
elif isinstance(metrics_data, LLMUsageMetricsData):
self._log_llm_usage(metrics_data, processor_info, model_info, time_sec)
elif isinstance(metrics_data, TTSUsageMetricsData):

View File

@@ -40,7 +40,7 @@ from pipecat.frames.frames import (
StopTaskFrame,
UserSpeakingFrame,
)
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
@@ -715,7 +715,6 @@ class PipelineTask(BasePipelineTask):
data = []
for p in processors:
data.append(TTFBMetricsData(processor=p.name, value=0.0))
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)
async def _wait_for_pipeline_start(self, frame: Frame):

View File

@@ -441,28 +441,6 @@ class FrameProcessor(BaseObject):
if frame:
await self.push_frame(frame)
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start processing metrics collection.
Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics(start_time=start_time)
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop processing metrics collection and push results.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
"""
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics(end_time=end_time)
if frame:
await self.push_frame(frame)
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
"""Start LLM usage metrics collection.
@@ -500,7 +478,6 @@ class FrameProcessor(BaseObject):
async def stop_all_metrics(self):
"""Stop all active metrics collection."""
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
await self.stop_text_aggregation_metrics()
def create_task(self, coroutine: Coroutine, name: Optional[str] = None) -> asyncio.Task:

View File

@@ -75,7 +75,6 @@ from pipecat.frames.frames import (
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
@@ -1547,10 +1546,6 @@ class RTVIObserver(BaseObserver):
if "ttfb" not in metrics:
metrics["ttfb"] = []
metrics["ttfb"].append(d.model_dump(exclude_none=True))
elif isinstance(d, ProcessingMetricsData):
if "processing" not in metrics:
metrics["processing"] = []
metrics["processing"].append(d.model_dump(exclude_none=True))
elif isinstance(d, LLMUsageMetricsData):
if "tokens" not in metrics:
metrics["tokens"] = []

View File

@@ -90,7 +90,6 @@ class StrandsAgentsProcessor(FrameProcessor):
ttfb_tracking = True
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
if self.graph:
@@ -148,7 +147,6 @@ class StrandsAgentsProcessor(FrameProcessor):
if ttfb_tracking:
await self.stop_ttfb_metrics()
ttfb_tracking = False
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
def can_generate_metrics(self) -> bool:

View File

@@ -16,7 +16,6 @@ from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
TextAggregationMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
@@ -43,7 +42,6 @@ class FrameProcessorMetrics(BaseObject):
super().__init__()
self._task_manager = None
self._start_ttfb_time = 0
self._start_processing_time = 0
self._start_text_aggregation_time = 0
self._last_ttfb_time = 0
self._should_report_ttfb = True
@@ -147,38 +145,6 @@ class FrameProcessorMetrics(BaseObject):
self._start_ttfb_time = 0
return MetricsFrame(data=[ttfb])
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start measuring processing time.
Args:
start_time: Optional timestamp to use as the start time. If None,
uses the current time.
"""
self._start_processing_time = start_time or time.time()
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop processing time measurement and generate metrics frame.
Args:
end_time: Optional timestamp to use as the end time. If None, uses
the current time.
Returns:
MetricsFrame containing processing duration data, or None if not measuring.
"""
if self._start_processing_time == 0:
return None
end_time = end_time or time.time()
value = end_time - self._start_processing_time
logger.debug(f"{self._processor_name()} processing time: {value:.3f}s")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name()
)
self._start_processing_time = 0
return MetricsFrame(data=[processing])
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
"""Record LLM token usage metrics.

View File

@@ -39,7 +39,6 @@ class SentryMetrics(FrameProcessorMetrics):
"""
super().__init__()
self._ttfb_metrics_tx = None
self._processing_metrics_tx = None
self._sentry_available = sentry_sdk.is_initialized()
if not self._sentry_available:
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
@@ -105,35 +104,6 @@ class SentryMetrics(FrameProcessorMetrics):
await self._sentry_queue.put(self._ttfb_metrics_tx)
self._ttfb_metrics_tx = None
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
"""Start tracking frame processing metrics.
Args:
start_time: Optional start timestamp override.
"""
await super().start_processing_metrics(start_time=start_time)
if self._sentry_available:
self._processing_metrics_tx = sentry_sdk.start_transaction(
op="processing",
name=f"Processing for {self._processor_name()}",
)
logger.debug(
f"{self} Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
)
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
"""Stop tracking frame processing metrics.
Args:
end_time: Optional end timestamp override.
"""
await super().stop_processing_metrics(end_time=end_time)
if self._sentry_available and self._processing_metrics_tx:
await self._sentry_queue.put(self._processing_metrics_tx)
self._processing_metrics_tx = None
async def _sentry_task_handler(self):
"""Background task handler for completing Sentry transactions."""
running = True

View File

@@ -427,7 +427,6 @@ class AnthropicLLMService(LLMService):
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
params_from_context = self._get_llm_invocation_params(context)
@@ -579,7 +578,6 @@ class AnthropicLLMService(LLMService):
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = (
completion_tokens

View File

@@ -284,7 +284,6 @@ class AssemblyAISTTService(WebsocketSTTService):
and self._websocket.state is State.OPEN
):
await self._websocket.send(json.dumps({"type": "ForceEndpoint"}))
await self.start_processing_metrics()
@traced_stt
async def _trace_transcription(self, transcript: str, is_final: bool, language: Language):
@@ -482,7 +481,6 @@ class AssemblyAISTTService(WebsocketSTTService):
)
)
await self._trace_transcription(message.transcript, True, self._settings.language)
await self.stop_processing_metrics()
else:
await self.push_frame(
InterimTranscriptionFrame(

View File

@@ -1044,7 +1044,6 @@ class AWSBedrockLLMService(LLMService):
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
@@ -1200,7 +1199,6 @@ class AWSBedrockLLMService(LLMService):
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = (
completion_tokens

View File

@@ -213,8 +213,6 @@ class AWSTranscribeSTTService(WebsocketSTTService):
# Send the formatted event message
await self._websocket.send(event_message)
# Start metrics after first chunk sent
await self.start_processing_metrics()
except Exception as e:
yield ErrorFrame(error=f"Error sending audio: {e}")
@@ -541,7 +539,6 @@ class AWSTranscribeSTTService(WebsocketSTTService):
is_final,
self._settings.language,
)
await self.stop_processing_metrics()
else:
await self.push_frame(
InterimTranscriptionFrame(

View File

@@ -173,7 +173,6 @@ class AzureSTTService(STTService):
Frame: Either None for successful processing or ErrorFrame on failure.
"""
try:
await self.start_processing_metrics()
if self._audio_stream:
self._audio_stream.write(audio)
yield None
@@ -248,7 +247,7 @@ class AzureSTTService(STTService):
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
await self.stop_processing_metrics()
pass
def _on_handle_recognized(self, event):
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:

View File

@@ -24,7 +24,6 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
@@ -241,10 +240,6 @@ class CartesiaSTTService(WebsocketSTTService):
await super().cancel(frame)
await self._disconnect()
async def _start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
@@ -254,9 +249,7 @@ class CartesiaSTTService(WebsocketSTTService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, VADUserStartedSpeakingFrame):
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send("finalize")
@@ -404,7 +397,6 @@ class CartesiaSTTService(WebsocketSTTService):
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# For interim transcriptions, just push the frame without tracing
await self.push_frame(

View File

@@ -497,7 +497,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.start_ttfb_metrics()
await self.start_processing_metrics()
pass
@traced_stt
async def _handle_transcription(
@@ -753,7 +753,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
)
await self._handle_transcription(transcript, True, self._settings.language)
await self.stop_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._call_event_handler("on_end_of_turn", transcript)

View File

@@ -21,7 +21,6 @@ from pipecat.frames.frames import (
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
@@ -452,10 +451,6 @@ class DeepgramSTTService(STTService):
# GH issue: https://github.com/deepgram/deepgram-python-sdk/issues/570
await self._connection.finish()
async def _start_metrics(self):
"""Start processing metrics collection for this utterance."""
await self.start_processing_metrics()
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
@@ -467,7 +462,6 @@ class DeepgramSTTService(STTService):
await self._connect()
async def _on_speech_started(self, *args, **kwargs):
await self._start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
@@ -511,7 +505,6 @@ class DeepgramSTTService(STTService):
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# For interim transcriptions, just push the frame without tracing
await self.push_frame(
@@ -533,10 +526,7 @@ class DeepgramSTTService(STTService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, VADUserStartedSpeakingFrame) and not self.vad_enabled:
# Start metrics if Deepgram VAD is disabled & pipeline VAD has detected speech
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
# https://developers.deepgram.com/docs/finalize
# Mark that we're awaiting a from_finalize response
self.request_finalize()

View File

@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
@@ -390,7 +389,6 @@ class DeepgramSageMakerSTTService(STTService):
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# Interim transcription
await self.push_frame(
@@ -420,10 +418,6 @@ class DeepgramSageMakerSTTService(STTService):
"""
pass
async def _start_metrics(self):
"""Start processing metrics collection."""
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Deepgram SageMaker-specific handling.
@@ -433,10 +427,7 @@ class DeepgramSageMakerSTTService(STTService):
"""
await super().process_frame(frame, direction)
# Start metrics when user starts speaking (if VAD is not provided by Deepgram)
if isinstance(frame, VADUserStartedSpeakingFrame):
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
# https://developers.deepgram.com/docs/finalize
# Mark that we're awaiting a from_finalize response
self.request_finalize()

View File

@@ -31,7 +31,6 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
@@ -342,7 +341,7 @@ class ElevenLabsSTTService(SegmentedSTTService):
self, transcript: str, is_final: bool, language: Optional[str] = None
):
"""Handle a transcription result with tracing."""
await self.stop_processing_metrics()
pass
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Transcribe an audio segment using ElevenLabs' STT API.
@@ -358,8 +357,6 @@ class ElevenLabsSTTService(SegmentedSTTService):
Only non-empty transcriptions are yielded.
"""
try:
await self.start_processing_metrics()
# Upload audio and get transcription result directly
result = await self._transcribe_audio(audio)
@@ -563,10 +560,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
await super().cancel(frame)
await self._disconnect()
async def _start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
@@ -576,10 +569,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, VADUserStartedSpeakingFrame):
# Start metrics when user starts speaking
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
# Send commit when user stops speaking (manual commit mode)
if self._settings.commit_strategy == CommitStrategy.MANUAL:
if self._websocket and self._websocket.state is State.OPEN:
@@ -852,8 +842,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
if not text:
return
await self.stop_processing_metrics()
# Get language if provided
language = data.get("language_code")
@@ -893,8 +881,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
if not text:
return
await self.stop_processing_metrics()
# Get language if provided
language = data.get("language_code")

View File

@@ -257,7 +257,7 @@ class FalSTTService(SegmentedSTTService):
self, transcript: str, is_final: bool, language: Optional[str] = None
):
"""Handle a transcription result with tracing."""
await self.stop_processing_metrics()
pass
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Transcribes an audio segment using Fal's Wizper API.
@@ -273,8 +273,6 @@ class FalSTTService(SegmentedSTTService):
Only non-empty transcriptions are yielded.
"""
try:
await self.start_processing_metrics()
# Send to Fal directly (audio is already in WAV format from base class)
data_uri = fal_client.encode(audio, "audio/x-wav")
response = await self._fal_client.run(

View File

@@ -477,8 +477,6 @@ class GladiaSTTService(WebsocketSTTService):
Yields:
None (processing is handled asynchronously via WebSocket).
"""
await self.start_processing_metrics()
# Add audio to buffer
async with self._buffer_lock:
self._audio_buffer.extend(audio)
@@ -597,7 +595,7 @@ class GladiaSTTService(WebsocketSTTService):
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[str] = None
):
await self.stop_processing_metrics()
pass
async def _on_speech_started(self):
"""Handle speech start event from Gladia.

View File

@@ -905,7 +905,6 @@ class GoogleSTTService(STTService):
"""
if self._streaming_task:
# Queue the audio data
await self.start_processing_metrics()
await self._request_queue.put(audio)
yield None
@@ -948,7 +947,6 @@ class GoogleSTTService(STTService):
result=result,
)
)
await self.stop_processing_metrics()
await self._handle_transcription(
transcript,
is_final=True,

View File

@@ -233,9 +233,7 @@ class GradiumSTTService(WebsocketSTTService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, VADUserStartedSpeakingFrame):
await self.start_processing_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
await self._flush_transcription()
async def _flush_transcription(self):
@@ -420,4 +418,3 @@ class GradiumSTTService(WebsocketSTTService):
)
)
await self._trace_transcription(text, is_final=True, language=None)
await self.stop_processing_metrics()

View File

@@ -661,7 +661,6 @@ class GrokRealtimeLLMService(LLMService):
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._current_assistant_response = None
@@ -736,7 +735,6 @@ class GrokRealtimeLLMService(LLMService):
async def _handle_evt_speech_stopped(self, evt):
"""Handle speech stopped event from VAD."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _handle_evt_error(self, evt):
@@ -785,7 +783,6 @@ class GrokRealtimeLLMService(LLMService):
logger.debug("Creating Grok response")
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(

View File

@@ -129,8 +129,6 @@ class HathoraSTTService(SegmentedSTTService):
Frame: Frames containing transcription results (typically TextFrame).
"""
try:
await self.start_processing_metrics()
url = f"{self._base_url}"
payload = {
@@ -170,7 +168,5 @@ class HathoraSTTService(SegmentedSTTService):
result=response,
)
await self.stop_processing_metrics()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -143,7 +143,6 @@ class HathoraTTSService(TTSService):
Frame: Audio frames containing the synthesized speech.
"""
try:
await self.start_processing_metrics()
await self.start_ttfb_metrics()
url = f"{self._base_url}"
@@ -187,5 +186,4 @@ class HathoraTTSService(TTSService):
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
yield TTSStoppedFrame(context_id=context_id)

View File

@@ -73,8 +73,6 @@ class ImageGenService(AIService):
if isinstance(frame, TextFrame):
await self.push_frame(frame, direction)
await self.start_processing_metrics()
await self.process_generator(self.run_image_gen(frame.text))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)

View File

@@ -342,7 +342,6 @@ class NvidiaSTTService(STTService):
transcript = result.alternatives[0].transcript
if transcript and len(transcript) > 0:
if result.is_final:
await self.stop_processing_metrics()
await self.push_frame(
TranscriptionFrame(
transcript,
@@ -377,7 +376,6 @@ class NvidiaSTTService(STTService):
Yields:
None - transcription results are pushed to the pipeline via frames.
"""
await self.start_processing_metrics()
await self._queue.put(audio)
yield None
@@ -620,13 +618,9 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
assert self._asr_service is not None, "ASR service not initialized"
assert self._config is not None, "Recognition config not created"
await self.start_processing_metrics()
# Process audio with NVIDIA Riva ASR - explicitly request non-future response
raw_response = self._asr_service.offline_recognize(audio, self._config, future=False)
await self.stop_processing_metrics()
# Process the response - handle different possible return types
# If it's a future-like object, get the result
if hasattr(raw_response, "result"):

View File

@@ -544,7 +544,6 @@ class BaseOpenAILLMService(LLMService):
if context:
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(context)
except httpx.TimeoutException as e:
await self._call_event_handler("on_completion_timeout")
@@ -552,5 +551,4 @@ class BaseOpenAILLMService(LLMService):
except Exception as e:
await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -751,7 +751,6 @@ class OpenAIRealtimeLLMService(LLMService):
cache_read_input_tokens=cached_tokens,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._current_assistant_response = None
# error handling
@@ -840,7 +839,6 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_speech_stopped(self, evt):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
@@ -917,7 +915,6 @@ class OpenAIRealtimeLLMService(LLMService):
logger.debug("Creating response")
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(
events.ResponseCreateEvent(

View File

@@ -351,9 +351,7 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
# Handle local VAD events when server-side VAD is disabled.
if not self._server_vad_enabled:
if isinstance(frame, VADUserStartedSpeakingFrame):
await self.start_processing_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
await self._commit_audio_buffer()
# ------------------------------------------------------------------
@@ -609,7 +607,6 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
)
)
await self._handle_transcription_trace(transcript, True)
await self.stop_processing_metrics()
@traced_stt
async def _handle_transcription_trace(
@@ -640,7 +637,6 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
await self.push_interruption_task_frame_and_wait()
await self.start_processing_metrics()
async def _handle_speech_stopped(self, evt: dict):
"""Handle server-side VAD speech stop.

View File

@@ -671,7 +671,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
total_tokens=evt.response.usage.total_tokens,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._current_assistant_response = None
# error handling
@@ -710,7 +709,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_speech_stopped(self, evt):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
@@ -797,7 +795,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
logger.debug(f"Creating response: {self._context.get_messages_for_logging()}")
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(
events.ResponseCreateEvent(

View File

@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
@@ -321,9 +320,7 @@ class SarvamSTTService(STTService):
# Only handle VAD frames when not using Sarvam's VAD signals
if not self._settings.vad_signals:
if isinstance(frame, VADUserStartedSpeakingFrame):
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
if isinstance(frame, VADUserStoppedSpeakingFrame):
if self._socket_client:
await self._socket_client.flush()
@@ -639,7 +636,6 @@ class SarvamSTTService(STTService):
logger.debug(f"VAD Signal: {signal}, Occurred at: {timestamp}")
if signal == "START_SPEECH":
await self._start_metrics()
logger.debug("User started speaking")
await self._call_event_handler("on_speech_started")
await self.broadcast_frame(UserStartedSpeakingFrame)
@@ -679,8 +675,6 @@ class SarvamSTTService(STTService):
)
)
await self.stop_processing_metrics()
except Exception as e:
await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e)
await self.stop_all_metrics()
@@ -739,7 +733,3 @@ class SarvamSTTService(STTService):
await self._socket_client.translate(**method_kwargs)
else:
await self._socket_client.transcribe(**method_kwargs)
async def _start_metrics(self):
"""Start processing metrics collection."""
await self.start_processing_metrics()

View File

@@ -465,7 +465,6 @@ class SonioxSTTService(WebsocketSTTService):
)
)
await self._handle_transcription(text, is_final=True)
await self.stop_processing_metrics()
self._final_transcription_buffer = []
async for message in self._get_websocket():
@@ -492,8 +491,6 @@ class SonioxSTTService(WebsocketSTTService):
# the rest will be sent as interim tokens (even final tokens).
await send_endpoint_transcript()
else:
if not self._final_transcription_buffer:
await self.start_processing_metrics()
self._final_transcription_buffer.append(token)
else:
non_final_transcription.append(token)

View File

@@ -833,7 +833,6 @@ class SpeechmaticsSTTService(STTService):
message: the message payload.
"""
logger.debug(f"{self} StartOfTurn received")
# await self.start_processing_metrics()
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
await self.push_interruption_task_frame_and_wait()
@@ -854,7 +853,6 @@ class SpeechmaticsSTTService(STTService):
message: the message payload.
"""
logger.debug(f"{self} EndOfTurn received")
# await self.stop_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _handle_speakers_result(self, message: dict[str, Any]) -> None:

View File

@@ -836,20 +836,12 @@ class TTSService(AIService):
if self._is_streaming_tokens:
self._streamed_text += text
# Skip per-token processing metrics when streaming. The per-token
# processing time is just websocket send overhead (~0.1ms) and not
# meaningful. TTFB captures the important timing for streaming TTS.
if not self._is_streaming_tokens:
await self.start_processing_metrics()
# Process all filters.
for filter in self._text_filters:
await filter.reset_interruption()
text = await filter.filter(text)
if not text.strip():
if not self._is_streaming_tokens:
await self.stop_processing_metrics()
return
# Create context ID and store metadata
@@ -887,9 +879,6 @@ class TTSService(AIService):
await self.process_generator(self.run_tts(prepared_text, context_id))
if not self._is_streaming_tokens:
await self.stop_processing_metrics()
if self._push_text_frames:
# In TTS services that support word timestamps, the TTSTextFrames
# are pushed as words are spoken. However, in the case where the TTS service

View File

@@ -551,7 +551,6 @@ class UltravoxRealtimeLLMService(LLMService):
if not audio:
return
if not self._bot_responding:
await self.start_processing_metrics()
await self.stop_ttfb_metrics()
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSStartedFrame())
@@ -559,7 +558,6 @@ class UltravoxRealtimeLLMService(LLMService):
await self.push_frame(TTSAudioRawFrame(audio, self._sample_rate, 1))
async def _handle_response_end(self):
await self.stop_processing_metrics()
if self._bot_responding == "voice":
await self.push_frame(TTSStoppedFrame())
await self.push_frame(LLMFullResponseEndFrame())
@@ -606,12 +604,10 @@ class UltravoxRealtimeLLMService(LLMService):
await self.push_frame(tts_frame)
elif medium == "text":
if final:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._bot_responding = None
elif text or delta:
if not self._bot_responding:
await self.start_processing_metrics()
await self.stop_ttfb_metrics()
await self.push_frame(LLMFullResponseStartFrame())
self._bot_responding = "text"

View File

@@ -74,8 +74,6 @@ class VisionService(AIService):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame) and frame.text:
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)

View File

@@ -229,12 +229,8 @@ class BaseWhisperSTTService(SegmentedSTTService):
or an ErrorFrame if transcription fails.
"""
try:
await self.start_processing_metrics()
response = await self._transcribe(audio)
await self.stop_processing_metrics()
text = response.text.strip()
if text:

View File

@@ -314,8 +314,6 @@ class WhisperSTTService(SegmentedSTTService):
yield ErrorFrame("Whisper model not available")
return
await self.start_processing_metrics()
# Divide by 32768 because we have signed 16-bit data.
audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0
@@ -327,8 +325,6 @@ class WhisperSTTService(SegmentedSTTService):
if segment.no_speech_prob < self._no_speech_prob:
text += f"{segment.text} "
await self.stop_processing_metrics()
if text:
await self._handle_transcription(text, True, self._settings.language)
logger.debug(f"Transcription: [{text}]")
@@ -414,8 +410,6 @@ class WhisperSTTServiceMLX(WhisperSTTService):
try:
import mlx_whisper
await self.start_processing_metrics()
# Divide by 32768 because we have signed 16-bit data.
audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0
@@ -438,8 +432,6 @@ class WhisperSTTServiceMLX(WhisperSTTService):
if len(text.strip()) == 0:
text = None
await self.stop_processing_metrics()
if text:
await self._handle_transcription(text, True, self._settings.language)
logger.debug(f"Transcription: [{text}]")

View File

@@ -61,8 +61,6 @@ async def test_openai_llm_emits_error_frame_on_timeout():
)
# Mock metrics methods
service.start_processing_metrics = AsyncMock()
service.stop_processing_metrics = AsyncMock()
service.start_ttfb_metrics = AsyncMock()
# Create a context frame to process
@@ -108,8 +106,6 @@ async def test_openai_llm_timeout_still_pushes_end_frame():
service.push_error = AsyncMock()
service._call_event_handler = AsyncMock()
service._process_context = AsyncMock(side_effect=httpx.TimeoutException("Timeout"))
service.start_processing_metrics = AsyncMock()
service.stop_processing_metrics = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
@@ -123,9 +119,6 @@ async def test_openai_llm_timeout_still_pushes_end_frame():
assert LLMFullResponseStartFrame in frame_types
assert LLMFullResponseEndFrame in frame_types
# Verify metrics were stopped
service.stop_processing_metrics.assert_called_once()
@pytest.mark.asyncio
async def test_openai_llm_stream_closed_on_cancellation():
@@ -208,8 +201,6 @@ async def test_openai_llm_emits_error_frame_on_exception():
service.push_error = mock_push_error
service._call_event_handler = AsyncMock()
service._process_context = AsyncMock(side_effect=RuntimeError("API Error"))
service.start_processing_metrics = AsyncMock()
service.stop_processing_metrics = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],