Compare commits
2 Commits
v0.0.108
...
mb/remove-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b4e8b839c | ||
|
|
86c2dd5cfc |
1
changelog/3851.removed.md
Normal file
1
changelog/3851.removed.md
Normal file
@@ -0,0 +1 @@
|
||||
- ⚠️ Removed `ProcessingMetricsData` and all `start_processing_metrics()`/`stop_processing_metrics()` methods from `FrameProcessor` and `FrameProcessorMetrics`. These metrics were inconsistently implemented across services and overlapped with the better-defined TTFB metric. TTFB, LLM token usage, TTS character usage, and text aggregation metrics are unaffected.
|
||||
@@ -13,7 +13,6 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, LLMRunFrame, MetricsFrame
|
||||
from pipecat.metrics.metrics import (
|
||||
LLMUsageMetricsData,
|
||||
ProcessingMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
@@ -46,8 +45,6 @@ class MetricsLogger(FrameProcessor):
|
||||
for d in frame.data:
|
||||
if isinstance(d, TTFBMetricsData):
|
||||
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
|
||||
elif isinstance(d, ProcessingMetricsData):
|
||||
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
|
||||
elif isinstance(d, LLMUsageMetricsData):
|
||||
tokens = d.value
|
||||
print(
|
||||
|
||||
@@ -38,16 +38,6 @@ class TTFBMetricsData(MetricsData):
|
||||
value: float
|
||||
|
||||
|
||||
class ProcessingMetricsData(MetricsData):
|
||||
"""General processing time metrics data.
|
||||
|
||||
Parameters:
|
||||
value: Processing time measurement in seconds.
|
||||
"""
|
||||
|
||||
value: float
|
||||
|
||||
|
||||
class LLMTokenUsage(BaseModel):
|
||||
"""Token usage statistics for LLM operations.
|
||||
|
||||
|
||||
@@ -20,7 +20,6 @@ from pipecat.metrics.metrics import (
|
||||
LLMTokenUsage,
|
||||
LLMUsageMetricsData,
|
||||
MetricsData,
|
||||
ProcessingMetricsData,
|
||||
SmartTurnMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
@@ -35,7 +34,6 @@ class MetricsLogObserver(BaseObserver):
|
||||
Monitors and logs all MetricsFrame instances, including:
|
||||
|
||||
- TTFBMetricsData (Time To First Byte)
|
||||
- ProcessingMetricsData (General processing time)
|
||||
- LLMUsageMetricsData (Token usage statistics)
|
||||
- TTSUsageMetricsData (Text-to-Speech character counts)
|
||||
- TurnMetricsData (Turn prediction metrics)
|
||||
@@ -146,10 +144,6 @@ class MetricsLogObserver(BaseObserver):
|
||||
logger.debug(
|
||||
f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
|
||||
)
|
||||
elif isinstance(metrics_data, ProcessingMetricsData):
|
||||
logger.debug(
|
||||
f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
|
||||
)
|
||||
elif isinstance(metrics_data, LLMUsageMetricsData):
|
||||
self._log_llm_usage(metrics_data, processor_info, model_info, time_sec)
|
||||
elif isinstance(metrics_data, TTSUsageMetricsData):
|
||||
|
||||
@@ -40,7 +40,7 @@ from pipecat.frames.frames import (
|
||||
StopTaskFrame,
|
||||
UserSpeakingFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
|
||||
from pipecat.metrics.metrics import TTFBMetricsData
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
|
||||
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
|
||||
@@ -715,7 +715,6 @@ class PipelineTask(BasePipelineTask):
|
||||
data = []
|
||||
for p in processors:
|
||||
data.append(TTFBMetricsData(processor=p.name, value=0.0))
|
||||
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
|
||||
return MetricsFrame(data=data)
|
||||
|
||||
async def _wait_for_pipeline_start(self, frame: Frame):
|
||||
|
||||
@@ -441,28 +441,6 @@ class FrameProcessor(BaseObject):
|
||||
if frame:
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||
"""Start processing metrics collection.
|
||||
|
||||
Args:
|
||||
start_time: Optional timestamp to use as the start time. If None,
|
||||
uses the current time.
|
||||
"""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
await self._metrics.start_processing_metrics(start_time=start_time)
|
||||
|
||||
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop processing metrics collection and push results.
|
||||
|
||||
Args:
|
||||
end_time: Optional timestamp to use as the end time. If None, uses
|
||||
the current time.
|
||||
"""
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_processing_metrics(end_time=end_time)
|
||||
if frame:
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
|
||||
"""Start LLM usage metrics collection.
|
||||
|
||||
@@ -500,7 +478,6 @@ class FrameProcessor(BaseObject):
|
||||
async def stop_all_metrics(self):
|
||||
"""Stop all active metrics collection."""
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
await self.stop_text_aggregation_metrics()
|
||||
|
||||
def create_task(self, coroutine: Coroutine, name: Optional[str] = None) -> asyncio.Task:
|
||||
|
||||
@@ -75,7 +75,6 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.metrics.metrics import (
|
||||
LLMUsageMetricsData,
|
||||
ProcessingMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
@@ -1547,10 +1546,6 @@ class RTVIObserver(BaseObserver):
|
||||
if "ttfb" not in metrics:
|
||||
metrics["ttfb"] = []
|
||||
metrics["ttfb"].append(d.model_dump(exclude_none=True))
|
||||
elif isinstance(d, ProcessingMetricsData):
|
||||
if "processing" not in metrics:
|
||||
metrics["processing"] = []
|
||||
metrics["processing"].append(d.model_dump(exclude_none=True))
|
||||
elif isinstance(d, LLMUsageMetricsData):
|
||||
if "tokens" not in metrics:
|
||||
metrics["tokens"] = []
|
||||
|
||||
@@ -90,7 +90,6 @@ class StrandsAgentsProcessor(FrameProcessor):
|
||||
ttfb_tracking = True
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
if self.graph:
|
||||
@@ -148,7 +147,6 @@ class StrandsAgentsProcessor(FrameProcessor):
|
||||
if ttfb_tracking:
|
||||
await self.stop_ttfb_metrics()
|
||||
ttfb_tracking = False
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
|
||||
@@ -16,7 +16,6 @@ from pipecat.metrics.metrics import (
|
||||
LLMTokenUsage,
|
||||
LLMUsageMetricsData,
|
||||
MetricsData,
|
||||
ProcessingMetricsData,
|
||||
TextAggregationMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
@@ -43,7 +42,6 @@ class FrameProcessorMetrics(BaseObject):
|
||||
super().__init__()
|
||||
self._task_manager = None
|
||||
self._start_ttfb_time = 0
|
||||
self._start_processing_time = 0
|
||||
self._start_text_aggregation_time = 0
|
||||
self._last_ttfb_time = 0
|
||||
self._should_report_ttfb = True
|
||||
@@ -147,38 +145,6 @@ class FrameProcessorMetrics(BaseObject):
|
||||
self._start_ttfb_time = 0
|
||||
return MetricsFrame(data=[ttfb])
|
||||
|
||||
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||
"""Start measuring processing time.
|
||||
|
||||
Args:
|
||||
start_time: Optional timestamp to use as the start time. If None,
|
||||
uses the current time.
|
||||
"""
|
||||
self._start_processing_time = start_time or time.time()
|
||||
|
||||
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop processing time measurement and generate metrics frame.
|
||||
|
||||
Args:
|
||||
end_time: Optional timestamp to use as the end time. If None, uses
|
||||
the current time.
|
||||
|
||||
Returns:
|
||||
MetricsFrame containing processing duration data, or None if not measuring.
|
||||
"""
|
||||
if self._start_processing_time == 0:
|
||||
return None
|
||||
|
||||
end_time = end_time or time.time()
|
||||
|
||||
value = end_time - self._start_processing_time
|
||||
logger.debug(f"{self._processor_name()} processing time: {value:.3f}s")
|
||||
processing = ProcessingMetricsData(
|
||||
processor=self._processor_name(), value=value, model=self._model_name()
|
||||
)
|
||||
self._start_processing_time = 0
|
||||
return MetricsFrame(data=[processing])
|
||||
|
||||
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
|
||||
"""Record LLM token usage metrics.
|
||||
|
||||
|
||||
@@ -39,7 +39,6 @@ class SentryMetrics(FrameProcessorMetrics):
|
||||
"""
|
||||
super().__init__()
|
||||
self._ttfb_metrics_tx = None
|
||||
self._processing_metrics_tx = None
|
||||
self._sentry_available = sentry_sdk.is_initialized()
|
||||
if not self._sentry_available:
|
||||
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
|
||||
@@ -105,35 +104,6 @@ class SentryMetrics(FrameProcessorMetrics):
|
||||
await self._sentry_queue.put(self._ttfb_metrics_tx)
|
||||
self._ttfb_metrics_tx = None
|
||||
|
||||
async def start_processing_metrics(self, *, start_time: Optional[float] = None):
|
||||
"""Start tracking frame processing metrics.
|
||||
|
||||
Args:
|
||||
start_time: Optional start timestamp override.
|
||||
"""
|
||||
await super().start_processing_metrics(start_time=start_time)
|
||||
|
||||
if self._sentry_available:
|
||||
self._processing_metrics_tx = sentry_sdk.start_transaction(
|
||||
op="processing",
|
||||
name=f"Processing for {self._processor_name()}",
|
||||
)
|
||||
logger.debug(
|
||||
f"{self} Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
|
||||
)
|
||||
|
||||
async def stop_processing_metrics(self, *, end_time: Optional[float] = None):
|
||||
"""Stop tracking frame processing metrics.
|
||||
|
||||
Args:
|
||||
end_time: Optional end timestamp override.
|
||||
"""
|
||||
await super().stop_processing_metrics(end_time=end_time)
|
||||
|
||||
if self._sentry_available and self._processing_metrics_tx:
|
||||
await self._sentry_queue.put(self._processing_metrics_tx)
|
||||
self._processing_metrics_tx = None
|
||||
|
||||
async def _sentry_task_handler(self):
|
||||
"""Background task handler for completing Sentry transactions."""
|
||||
running = True
|
||||
|
||||
@@ -427,7 +427,6 @@ class AnthropicLLMService(LLMService):
|
||||
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
|
||||
params_from_context = self._get_llm_invocation_params(context)
|
||||
|
||||
@@ -579,7 +578,6 @@ class AnthropicLLMService(LLMService):
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
comp_tokens = (
|
||||
completion_tokens
|
||||
|
||||
@@ -284,7 +284,6 @@ class AssemblyAISTTService(WebsocketSTTService):
|
||||
and self._websocket.state is State.OPEN
|
||||
):
|
||||
await self._websocket.send(json.dumps({"type": "ForceEndpoint"}))
|
||||
await self.start_processing_metrics()
|
||||
|
||||
@traced_stt
|
||||
async def _trace_transcription(self, transcript: str, is_final: bool, language: Language):
|
||||
@@ -482,7 +481,6 @@ class AssemblyAISTTService(WebsocketSTTService):
|
||||
)
|
||||
)
|
||||
await self._trace_transcription(message.transcript, True, self._settings.language)
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(
|
||||
InterimTranscriptionFrame(
|
||||
|
||||
@@ -1044,7 +1044,6 @@ class AWSBedrockLLMService(LLMService):
|
||||
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
@@ -1200,7 +1199,6 @@ class AWSBedrockLLMService(LLMService):
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
comp_tokens = (
|
||||
completion_tokens
|
||||
|
||||
@@ -213,8 +213,6 @@ class AWSTranscribeSTTService(WebsocketSTTService):
|
||||
|
||||
# Send the formatted event message
|
||||
await self._websocket.send(event_message)
|
||||
# Start metrics after first chunk sent
|
||||
await self.start_processing_metrics()
|
||||
except Exception as e:
|
||||
yield ErrorFrame(error=f"Error sending audio: {e}")
|
||||
|
||||
@@ -541,7 +539,6 @@ class AWSTranscribeSTTService(WebsocketSTTService):
|
||||
is_final,
|
||||
self._settings.language,
|
||||
)
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(
|
||||
InterimTranscriptionFrame(
|
||||
|
||||
@@ -173,7 +173,6 @@ class AzureSTTService(STTService):
|
||||
Frame: Either None for successful processing or ErrorFrame on failure.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
if self._audio_stream:
|
||||
self._audio_stream.write(audio)
|
||||
yield None
|
||||
@@ -248,7 +247,7 @@ class AzureSTTService(STTService):
|
||||
self, transcript: str, is_final: bool, language: Optional[Language] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
await self.stop_processing_metrics()
|
||||
pass
|
||||
|
||||
def _on_handle_recognized(self, event):
|
||||
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
|
||||
|
||||
@@ -24,7 +24,6 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -241,10 +240,6 @@ class CartesiaSTTService(WebsocketSTTService):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def _start_metrics(self):
|
||||
"""Start performance metrics collection for transcription processing."""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames and handle speech events.
|
||||
|
||||
@@ -254,9 +249,7 @@ class CartesiaSTTService(WebsocketSTTService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self._start_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
# Send finalize command to flush the transcription session
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
await self._websocket.send("finalize")
|
||||
@@ -404,7 +397,6 @@ class CartesiaSTTService(WebsocketSTTService):
|
||||
)
|
||||
)
|
||||
await self._handle_transcription(transcript, is_final, language)
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
# For interim transcriptions, just push the frame without tracing
|
||||
await self.push_frame(
|
||||
|
||||
@@ -497,7 +497,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
# both the "user started speaking" event and the first transcript simultaneously,
|
||||
# making this timing measurement meaningless in this context.
|
||||
# await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
pass
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription(
|
||||
@@ -753,7 +753,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
)
|
||||
|
||||
await self._handle_transcription(transcript, True, self._settings.language)
|
||||
await self.stop_processing_metrics()
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
await self._call_event_handler("on_end_of_turn", transcript)
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@ from pipecat.frames.frames import (
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -452,10 +451,6 @@ class DeepgramSTTService(STTService):
|
||||
# GH issue: https://github.com/deepgram/deepgram-python-sdk/issues/570
|
||||
await self._connection.finish()
|
||||
|
||||
async def _start_metrics(self):
|
||||
"""Start processing metrics collection for this utterance."""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def _on_error(self, *args, **kwargs):
|
||||
error: ErrorResponse = kwargs["error"]
|
||||
logger.warning(f"{self} connection error, will retry: {error}")
|
||||
@@ -467,7 +462,6 @@ class DeepgramSTTService(STTService):
|
||||
await self._connect()
|
||||
|
||||
async def _on_speech_started(self, *args, **kwargs):
|
||||
await self._start_metrics()
|
||||
await self._call_event_handler("on_speech_started", *args, **kwargs)
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
if self._should_interrupt:
|
||||
@@ -511,7 +505,6 @@ class DeepgramSTTService(STTService):
|
||||
)
|
||||
)
|
||||
await self._handle_transcription(transcript, is_final, language)
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
# For interim transcriptions, just push the frame without tracing
|
||||
await self.push_frame(
|
||||
@@ -533,10 +526,7 @@ class DeepgramSTTService(STTService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame) and not self.vad_enabled:
|
||||
# Start metrics if Deepgram VAD is disabled & pipeline VAD has detected speech
|
||||
await self._start_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
# https://developers.deepgram.com/docs/finalize
|
||||
# Mark that we're awaiting a from_finalize response
|
||||
self.request_finalize()
|
||||
|
||||
@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -390,7 +389,6 @@ class DeepgramSageMakerSTTService(STTService):
|
||||
)
|
||||
)
|
||||
await self._handle_transcription(transcript, is_final, language)
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
# Interim transcription
|
||||
await self.push_frame(
|
||||
@@ -420,10 +418,6 @@ class DeepgramSageMakerSTTService(STTService):
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _start_metrics(self):
|
||||
"""Start processing metrics collection."""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process frames with Deepgram SageMaker-specific handling.
|
||||
|
||||
@@ -433,10 +427,7 @@ class DeepgramSageMakerSTTService(STTService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Start metrics when user starts speaking (if VAD is not provided by Deepgram)
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self._start_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
# https://developers.deepgram.com/docs/finalize
|
||||
# Mark that we're awaiting a from_finalize response
|
||||
self.request_finalize()
|
||||
|
||||
@@ -31,7 +31,6 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -342,7 +341,7 @@ class ElevenLabsSTTService(SegmentedSTTService):
|
||||
self, transcript: str, is_final: bool, language: Optional[str] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
await self.stop_processing_metrics()
|
||||
pass
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Transcribe an audio segment using ElevenLabs' STT API.
|
||||
@@ -358,8 +357,6 @@ class ElevenLabsSTTService(SegmentedSTTService):
|
||||
Only non-empty transcriptions are yielded.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Upload audio and get transcription result directly
|
||||
result = await self._transcribe_audio(audio)
|
||||
|
||||
@@ -563,10 +560,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def _start_metrics(self):
|
||||
"""Start performance metrics collection for transcription processing."""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames and handle speech events.
|
||||
|
||||
@@ -576,10 +569,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
# Start metrics when user starts speaking
|
||||
await self._start_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
# Send commit when user stops speaking (manual commit mode)
|
||||
if self._settings.commit_strategy == CommitStrategy.MANUAL:
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
@@ -852,8 +842,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
if not text:
|
||||
return
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
# Get language if provided
|
||||
language = data.get("language_code")
|
||||
|
||||
@@ -893,8 +881,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
if not text:
|
||||
return
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
# Get language if provided
|
||||
language = data.get("language_code")
|
||||
|
||||
|
||||
@@ -257,7 +257,7 @@ class FalSTTService(SegmentedSTTService):
|
||||
self, transcript: str, is_final: bool, language: Optional[str] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
await self.stop_processing_metrics()
|
||||
pass
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Transcribes an audio segment using Fal's Wizper API.
|
||||
@@ -273,8 +273,6 @@ class FalSTTService(SegmentedSTTService):
|
||||
Only non-empty transcriptions are yielded.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Send to Fal directly (audio is already in WAV format from base class)
|
||||
data_uri = fal_client.encode(audio, "audio/x-wav")
|
||||
response = await self._fal_client.run(
|
||||
|
||||
@@ -477,8 +477,6 @@ class GladiaSTTService(WebsocketSTTService):
|
||||
Yields:
|
||||
None (processing is handled asynchronously via WebSocket).
|
||||
"""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Add audio to buffer
|
||||
async with self._buffer_lock:
|
||||
self._audio_buffer.extend(audio)
|
||||
@@ -597,7 +595,7 @@ class GladiaSTTService(WebsocketSTTService):
|
||||
async def _handle_transcription(
|
||||
self, transcript: str, is_final: bool, language: Optional[str] = None
|
||||
):
|
||||
await self.stop_processing_metrics()
|
||||
pass
|
||||
|
||||
async def _on_speech_started(self):
|
||||
"""Handle speech start event from Gladia.
|
||||
|
||||
@@ -905,7 +905,6 @@ class GoogleSTTService(STTService):
|
||||
"""
|
||||
if self._streaming_task:
|
||||
# Queue the audio data
|
||||
await self.start_processing_metrics()
|
||||
await self._request_queue.put(audio)
|
||||
yield None
|
||||
|
||||
@@ -948,7 +947,6 @@ class GoogleSTTService(STTService):
|
||||
result=result,
|
||||
)
|
||||
)
|
||||
await self.stop_processing_metrics()
|
||||
await self._handle_transcription(
|
||||
transcript,
|
||||
is_final=True,
|
||||
|
||||
@@ -233,9 +233,7 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self.start_processing_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
await self._flush_transcription()
|
||||
|
||||
async def _flush_transcription(self):
|
||||
@@ -420,4 +418,3 @@ class GradiumSTTService(WebsocketSTTService):
|
||||
)
|
||||
)
|
||||
await self._trace_transcription(text, is_final=True, language=None)
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
@@ -661,7 +661,6 @@ class GrokRealtimeLLMService(LLMService):
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._current_assistant_response = None
|
||||
|
||||
@@ -736,7 +735,6 @@ class GrokRealtimeLLMService(LLMService):
|
||||
async def _handle_evt_speech_stopped(self, evt):
|
||||
"""Handle speech stopped event from VAD."""
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
async def _handle_evt_error(self, evt):
|
||||
@@ -785,7 +783,6 @@ class GrokRealtimeLLMService(LLMService):
|
||||
logger.debug("Creating Grok response")
|
||||
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
await self.send_client_event(
|
||||
|
||||
@@ -129,8 +129,6 @@ class HathoraSTTService(SegmentedSTTService):
|
||||
Frame: Frames containing transcription results (typically TextFrame).
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
|
||||
url = f"{self._base_url}"
|
||||
|
||||
payload = {
|
||||
@@ -170,7 +168,5 @@ class HathoraSTTService(SegmentedSTTService):
|
||||
result=response,
|
||||
)
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
except Exception as e:
|
||||
yield ErrorFrame(error=f"Unknown error occurred: {e}")
|
||||
|
||||
@@ -143,7 +143,6 @@ class HathoraTTSService(TTSService):
|
||||
Frame: Audio frames containing the synthesized speech.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
url = f"{self._base_url}"
|
||||
@@ -187,5 +186,4 @@ class HathoraTTSService(TTSService):
|
||||
yield ErrorFrame(error=f"Unknown error occurred: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
yield TTSStoppedFrame(context_id=context_id)
|
||||
|
||||
@@ -73,8 +73,6 @@ class ImageGenService(AIService):
|
||||
|
||||
if isinstance(frame, TextFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_image_gen(frame.text))
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -342,7 +342,6 @@ class NvidiaSTTService(STTService):
|
||||
transcript = result.alternatives[0].transcript
|
||||
if transcript and len(transcript) > 0:
|
||||
if result.is_final:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(
|
||||
transcript,
|
||||
@@ -377,7 +376,6 @@ class NvidiaSTTService(STTService):
|
||||
Yields:
|
||||
None - transcription results are pushed to the pipeline via frames.
|
||||
"""
|
||||
await self.start_processing_metrics()
|
||||
await self._queue.put(audio)
|
||||
yield None
|
||||
|
||||
@@ -620,13 +618,9 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
|
||||
assert self._asr_service is not None, "ASR service not initialized"
|
||||
assert self._config is not None, "Recognition config not created"
|
||||
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Process audio with NVIDIA Riva ASR - explicitly request non-future response
|
||||
raw_response = self._asr_service.offline_recognize(audio, self._config, future=False)
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
# Process the response - handle different possible return types
|
||||
# If it's a future-like object, get the result
|
||||
if hasattr(raw_response, "result"):
|
||||
|
||||
@@ -544,7 +544,6 @@ class BaseOpenAILLMService(LLMService):
|
||||
if context:
|
||||
try:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self._process_context(context)
|
||||
except httpx.TimeoutException as e:
|
||||
await self._call_event_handler("on_completion_timeout")
|
||||
@@ -552,5 +551,4 @@ class BaseOpenAILLMService(LLMService):
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -751,7 +751,6 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
cache_read_input_tokens=cached_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._current_assistant_response = None
|
||||
# error handling
|
||||
@@ -840,7 +839,6 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
|
||||
async def _handle_evt_speech_stopped(self, evt):
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
|
||||
@@ -917,7 +915,6 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
logger.debug("Creating response")
|
||||
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
await self.send_client_event(
|
||||
events.ResponseCreateEvent(
|
||||
|
||||
@@ -351,9 +351,7 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
|
||||
|
||||
# Handle local VAD events when server-side VAD is disabled.
|
||||
if not self._server_vad_enabled:
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self.start_processing_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
await self._commit_audio_buffer()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
@@ -609,7 +607,6 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
|
||||
)
|
||||
)
|
||||
await self._handle_transcription_trace(transcript, True)
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription_trace(
|
||||
@@ -640,7 +637,6 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
if self._should_interrupt:
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def _handle_speech_stopped(self, evt: dict):
|
||||
"""Handle server-side VAD speech stop.
|
||||
|
||||
@@ -671,7 +671,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
total_tokens=evt.response.usage.total_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._current_assistant_response = None
|
||||
# error handling
|
||||
@@ -710,7 +709,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
|
||||
async def _handle_evt_speech_stopped(self, evt):
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.ErrorEvent):
|
||||
@@ -797,7 +795,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
logger.debug(f"Creating response: {self._context.get_messages_for_logging()}")
|
||||
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
await self.send_client_event(
|
||||
events.ResponseCreateEvent(
|
||||
|
||||
@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -321,9 +320,7 @@ class SarvamSTTService(STTService):
|
||||
|
||||
# Only handle VAD frames when not using Sarvam's VAD signals
|
||||
if not self._settings.vad_signals:
|
||||
if isinstance(frame, VADUserStartedSpeakingFrame):
|
||||
await self._start_metrics()
|
||||
elif isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if isinstance(frame, VADUserStoppedSpeakingFrame):
|
||||
if self._socket_client:
|
||||
await self._socket_client.flush()
|
||||
|
||||
@@ -639,7 +636,6 @@ class SarvamSTTService(STTService):
|
||||
logger.debug(f"VAD Signal: {signal}, Occurred at: {timestamp}")
|
||||
|
||||
if signal == "START_SPEECH":
|
||||
await self._start_metrics()
|
||||
logger.debug("User started speaking")
|
||||
await self._call_event_handler("on_speech_started")
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
@@ -679,8 +675,6 @@ class SarvamSTTService(STTService):
|
||||
)
|
||||
)
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e)
|
||||
await self.stop_all_metrics()
|
||||
@@ -739,7 +733,3 @@ class SarvamSTTService(STTService):
|
||||
await self._socket_client.translate(**method_kwargs)
|
||||
else:
|
||||
await self._socket_client.transcribe(**method_kwargs)
|
||||
|
||||
async def _start_metrics(self):
|
||||
"""Start processing metrics collection."""
|
||||
await self.start_processing_metrics()
|
||||
|
||||
@@ -465,7 +465,6 @@ class SonioxSTTService(WebsocketSTTService):
|
||||
)
|
||||
)
|
||||
await self._handle_transcription(text, is_final=True)
|
||||
await self.stop_processing_metrics()
|
||||
self._final_transcription_buffer = []
|
||||
|
||||
async for message in self._get_websocket():
|
||||
@@ -492,8 +491,6 @@ class SonioxSTTService(WebsocketSTTService):
|
||||
# the rest will be sent as interim tokens (even final tokens).
|
||||
await send_endpoint_transcript()
|
||||
else:
|
||||
if not self._final_transcription_buffer:
|
||||
await self.start_processing_metrics()
|
||||
self._final_transcription_buffer.append(token)
|
||||
else:
|
||||
non_final_transcription.append(token)
|
||||
|
||||
@@ -833,7 +833,6 @@ class SpeechmaticsSTTService(STTService):
|
||||
message: the message payload.
|
||||
"""
|
||||
logger.debug(f"{self} StartOfTurn received")
|
||||
# await self.start_processing_metrics()
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
if self._should_interrupt:
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
@@ -854,7 +853,6 @@ class SpeechmaticsSTTService(STTService):
|
||||
message: the message payload.
|
||||
"""
|
||||
logger.debug(f"{self} EndOfTurn received")
|
||||
# await self.stop_processing_metrics()
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
async def _handle_speakers_result(self, message: dict[str, Any]) -> None:
|
||||
|
||||
@@ -836,20 +836,12 @@ class TTSService(AIService):
|
||||
if self._is_streaming_tokens:
|
||||
self._streamed_text += text
|
||||
|
||||
# Skip per-token processing metrics when streaming. The per-token
|
||||
# processing time is just websocket send overhead (~0.1ms) and not
|
||||
# meaningful. TTFB captures the important timing for streaming TTS.
|
||||
if not self._is_streaming_tokens:
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Process all filters.
|
||||
for filter in self._text_filters:
|
||||
await filter.reset_interruption()
|
||||
text = await filter.filter(text)
|
||||
|
||||
if not text.strip():
|
||||
if not self._is_streaming_tokens:
|
||||
await self.stop_processing_metrics()
|
||||
return
|
||||
|
||||
# Create context ID and store metadata
|
||||
@@ -887,9 +879,6 @@ class TTSService(AIService):
|
||||
|
||||
await self.process_generator(self.run_tts(prepared_text, context_id))
|
||||
|
||||
if not self._is_streaming_tokens:
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
if self._push_text_frames:
|
||||
# In TTS services that support word timestamps, the TTSTextFrames
|
||||
# are pushed as words are spoken. However, in the case where the TTS service
|
||||
|
||||
@@ -551,7 +551,6 @@ class UltravoxRealtimeLLMService(LLMService):
|
||||
if not audio:
|
||||
return
|
||||
if not self._bot_responding:
|
||||
await self.start_processing_metrics()
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
@@ -559,7 +558,6 @@ class UltravoxRealtimeLLMService(LLMService):
|
||||
await self.push_frame(TTSAudioRawFrame(audio, self._sample_rate, 1))
|
||||
|
||||
async def _handle_response_end(self):
|
||||
await self.stop_processing_metrics()
|
||||
if self._bot_responding == "voice":
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
@@ -606,12 +604,10 @@ class UltravoxRealtimeLLMService(LLMService):
|
||||
await self.push_frame(tts_frame)
|
||||
elif medium == "text":
|
||||
if final:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._bot_responding = None
|
||||
elif text or delta:
|
||||
if not self._bot_responding:
|
||||
await self.start_processing_metrics()
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
self._bot_responding = "text"
|
||||
|
||||
@@ -74,8 +74,6 @@ class VisionService(AIService):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRawFrame) and frame.text:
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_vision(frame))
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -229,12 +229,8 @@ class BaseWhisperSTTService(SegmentedSTTService):
|
||||
or an ErrorFrame if transcription fails.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
|
||||
response = await self._transcribe(audio)
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
text = response.text.strip()
|
||||
|
||||
if text:
|
||||
|
||||
@@ -314,8 +314,6 @@ class WhisperSTTService(SegmentedSTTService):
|
||||
yield ErrorFrame("Whisper model not available")
|
||||
return
|
||||
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Divide by 32768 because we have signed 16-bit data.
|
||||
audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
|
||||
@@ -327,8 +325,6 @@ class WhisperSTTService(SegmentedSTTService):
|
||||
if segment.no_speech_prob < self._no_speech_prob:
|
||||
text += f"{segment.text} "
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
if text:
|
||||
await self._handle_transcription(text, True, self._settings.language)
|
||||
logger.debug(f"Transcription: [{text}]")
|
||||
@@ -414,8 +410,6 @@ class WhisperSTTServiceMLX(WhisperSTTService):
|
||||
try:
|
||||
import mlx_whisper
|
||||
|
||||
await self.start_processing_metrics()
|
||||
|
||||
# Divide by 32768 because we have signed 16-bit data.
|
||||
audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
|
||||
@@ -438,8 +432,6 @@ class WhisperSTTServiceMLX(WhisperSTTService):
|
||||
if len(text.strip()) == 0:
|
||||
text = None
|
||||
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
if text:
|
||||
await self._handle_transcription(text, True, self._settings.language)
|
||||
logger.debug(f"Transcription: [{text}]")
|
||||
|
||||
@@ -61,8 +61,6 @@ async def test_openai_llm_emits_error_frame_on_timeout():
|
||||
)
|
||||
|
||||
# Mock metrics methods
|
||||
service.start_processing_metrics = AsyncMock()
|
||||
service.stop_processing_metrics = AsyncMock()
|
||||
service.start_ttfb_metrics = AsyncMock()
|
||||
|
||||
# Create a context frame to process
|
||||
@@ -108,8 +106,6 @@ async def test_openai_llm_timeout_still_pushes_end_frame():
|
||||
service.push_error = AsyncMock()
|
||||
service._call_event_handler = AsyncMock()
|
||||
service._process_context = AsyncMock(side_effect=httpx.TimeoutException("Timeout"))
|
||||
service.start_processing_metrics = AsyncMock()
|
||||
service.stop_processing_metrics = AsyncMock()
|
||||
|
||||
context = LLMContext(
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
@@ -123,9 +119,6 @@ async def test_openai_llm_timeout_still_pushes_end_frame():
|
||||
assert LLMFullResponseStartFrame in frame_types
|
||||
assert LLMFullResponseEndFrame in frame_types
|
||||
|
||||
# Verify metrics were stopped
|
||||
service.stop_processing_metrics.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_openai_llm_stream_closed_on_cancellation():
|
||||
@@ -208,8 +201,6 @@ async def test_openai_llm_emits_error_frame_on_exception():
|
||||
service.push_error = mock_push_error
|
||||
service._call_event_handler = AsyncMock()
|
||||
service._process_context = AsyncMock(side_effect=RuntimeError("API Error"))
|
||||
service.start_processing_metrics = AsyncMock()
|
||||
service.stop_processing_metrics = AsyncMock()
|
||||
|
||||
context = LLMContext(
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
|
||||
Reference in New Issue
Block a user