From d1116d149e62039cdbed116aec12deadbabf8794 Mon Sep 17 00:00:00 2001 From: Angad Singh Date: Fri, 14 Nov 2025 23:33:05 +0530 Subject: [PATCH] feat: Add ErrorFrame emission to TTS/STT services for pipeline error detection (#2881) * feat: Add ErrorFrame emission to TTS/STT services for pipeline error detection - Add ErrorFrame emission to all major TTS/STT services during initialization and runtime failures - Services updated: Cartesia, ElevenLabs, Deepgram, AssemblyAI, Rime, Azure - ErrorFrame objects emitted with fatal=False for graceful degradation - Enables on_pipeline_error event handler to detect service failures programmatically - Add comprehensive pytest test suite to verify ErrorFrame emission - Fixes issue where services failed gracefully but didn't emit ErrorFrame objects This allows developers to implement real-time error monitoring and alerting using the on_pipeline_error event handler introduced in v0.0.90. * Update STT and TTS services to use consistent error handling pattern - Improves error handling consistency across all services * Add changelog entry for STT/TTS error handling improvements * Linting issues Resolved * Azure STT ErrorFrames added with consistent patterns * Cartesia STT and Deepgram STT; additional fixes made * Removed Fatal Flags across services, removed duplication * Moving the changelog entry to the correct place. * Refactoring some classes to use yield instead of push_error directly. * Fixing ruff format. --------- Co-authored-by: Filipi Fuchter --- CHANGELOG.md | 3 ++ src/pipecat/services/assemblyai/stt.py | 19 ++++++--- src/pipecat/services/asyncai/tts.py | 16 +++++--- src/pipecat/services/aws/stt.py | 28 ++++++------- src/pipecat/services/azure/stt.py | 39 ++++++++++++------- src/pipecat/services/azure/tts.py | 7 +++- src/pipecat/services/cartesia/stt.py | 9 ++++- src/pipecat/services/cartesia/tts.py | 16 +++++--- src/pipecat/services/deepgram/flux/stt.py | 16 +++++--- src/pipecat/services/deepgram/stt.py | 2 +- src/pipecat/services/deepgram/tts.py | 4 +- src/pipecat/services/elevenlabs/stt.py | 4 +- src/pipecat/services/elevenlabs/tts.py | 23 +++++++---- src/pipecat/services/fal/stt.py | 4 +- src/pipecat/services/fish/tts.py | 16 +++++--- src/pipecat/services/gladia/stt.py | 10 +++-- .../services/google/gemini_live/llm.py | 6 +-- src/pipecat/services/google/stt.py | 13 ++++--- src/pipecat/services/google/tts.py | 6 +-- src/pipecat/services/groq/tts.py | 9 ++++- src/pipecat/services/hume/tts.py | 4 +- src/pipecat/services/inworld/tts.py | 6 +-- src/pipecat/services/lmnt/tts.py | 12 ++++-- src/pipecat/services/minimax/tts.py | 4 +- src/pipecat/services/neuphonic/tts.py | 17 +++++--- src/pipecat/services/openai/realtime/llm.py | 8 ++-- src/pipecat/services/openai/tts.py | 3 +- .../services/openai_realtime_beta/openai.py | 8 ++-- src/pipecat/services/piper/tts.py | 6 +-- src/pipecat/services/playht/tts.py | 18 +++++---- src/pipecat/services/rime/tts.py | 16 +++++--- src/pipecat/services/riva/stt.py | 4 +- src/pipecat/services/riva/tts.py | 2 + src/pipecat/services/sarvam/tts.py | 23 ++++++----- src/pipecat/services/soniox/stt.py | 10 ++--- src/pipecat/services/speechmatics/stt.py | 12 ++++-- src/pipecat/services/ultravox/stt.py | 13 ++++--- src/pipecat/services/websocket_service.py | 2 +- src/pipecat/services/whisper/base_stt.py | 4 +- src/pipecat/services/whisper/stt.py | 4 +- src/pipecat/services/xtts/tts.py | 4 +- 41 files changed, 260 insertions(+), 170 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3748892b6..7cda253c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Updated all STT and TTS services to use consistent error handling pattern with + `push_error()` method for better pipeline error event integration. + - Added Hindi support for Rime TTS services. - Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index b3f20800c..d78a42841 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -21,6 +21,7 @@ from pipecat import __version__ as pipecat_version from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -205,8 +206,9 @@ class AssemblyAISTTService(STTService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"Failed to connect to AssemblyAI: {e}") + logger.error(f"{self} exception: {e}") self._connected = False + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise async def _disconnect(self): @@ -231,7 +233,8 @@ class AssemblyAISTTService(STTService): logger.warning("Timed out waiting for termination message from server") except Exception as e: - logger.warning(f"Error during termination handshake: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) if self._receive_task: await self.cancel_task(self._receive_task) @@ -239,7 +242,8 @@ class AssemblyAISTTService(STTService): await self._websocket.close() except Exception as e: - logger.error(f"Error during disconnect: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None @@ -258,11 +262,13 @@ class AssemblyAISTTService(STTService): except websockets.exceptions.ConnectionClosedOK: break except Exception as e: - logger.error(f"Error processing WebSocket message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) break except Exception as e: - logger.error(f"Fatal error in receive handler: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) def _parse_message(self, message: Dict[str, Any]) -> BaseMessage: """Parse a raw message into the appropriate message type.""" @@ -291,7 +297,8 @@ class AssemblyAISTTService(STTService): elif isinstance(parsed_message, TerminationMessage): await self._handle_termination(parsed_message) except Exception as e: - logger.error(f"Error handling message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _handle_termination(self, message: TerminationMessage): """Handle termination message.""" diff --git a/src/pipecat/services/asyncai/tts.py b/src/pipecat/services/asyncai/tts.py index 78cdd7ef8..8df597c56 100644 --- a/src/pipecat/services/asyncai/tts.py +++ b/src/pipecat/services/asyncai/tts.py @@ -237,7 +237,8 @@ class AsyncAITTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -249,7 +250,8 @@ class AsyncAITTSService(InterruptibleTTSService): logger.debug("Disconnecting from Async") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None self._started = False @@ -297,7 +299,7 @@ class AsyncAITTSService(InterruptibleTTSService): logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['message']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}")) else: logger.error(f"{self} error, unknown message type: {msg}") @@ -342,7 +344,8 @@ class AsyncAITTSService(InterruptibleTTSService): await self._get_websocket().send(msg) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -350,6 +353,7 @@ class AsyncAITTSService(InterruptibleTTSService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class AsyncAIHttpTTSService(TTSService): @@ -492,7 +496,7 @@ class AsyncAIHttpTTSService(TTSService): if response.status != 200: error_text = await response.text() logger.error(f"Async API error: {error_text}") - await self.push_error(ErrorFrame(f"Async API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Async API error: {error_text}")) raise Exception(f"Async API returned status {response.status}: {error_text}") audio_data = await response.read() @@ -509,7 +513,7 @@ class AsyncAIHttpTTSService(TTSService): except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index b1e0b5ba7..8d89d3198 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -140,7 +140,8 @@ class AWSTranscribeSTTService(STTService): return logger.warning("WebSocket connection not established after connect") except Exception as e: - logger.error(f"Failed to connect (attempt {retry_count + 1}/{max_retries}): {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) retry_count += 1 if retry_count < max_retries: await asyncio.sleep(1) # Wait before retrying @@ -181,8 +182,8 @@ class AWSTranscribeSTTService(STTService): try: await self._connect() except Exception as e: - logger.error(f"Failed to reconnect: {e}") - yield ErrorFrame("Failed to reconnect to AWS Transcribe", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") return # Format the audio data according to AWS event stream format @@ -199,13 +200,13 @@ class AWSTranscribeSTTService(STTService): await self._disconnect() # Don't yield error here - we'll retry on next frame except Exception as e: - logger.error(f"Error sending audio: {e}") - yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") await self._disconnect() except Exception as e: - logger.error(f"Error in run_stt: {e}") - yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") await self._disconnect() async def _connect(self): @@ -288,7 +289,8 @@ class AWSTranscribeSTTService(STTService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} Failed to connect to AWS Transcribe: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) await self._disconnect() raise @@ -308,7 +310,8 @@ class AWSTranscribeSTTService(STTService): await self._ws_client.send(json.dumps(end_stream)) await self._ws_client.close() except Exception as e: - logger.warning(f"{self} Error closing WebSocket connection: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._ws_client = None await self._call_event_handler("on_disconnected") @@ -527,9 +530,7 @@ class AWSTranscribeSTTService(STTService): elif headers.get(":message-type") == "exception": error_msg = payload.get("Message", "Unknown error") logger.error(f"{self} Exception from AWS: {error_msg}") - await self.push_frame( - ErrorFrame(f"AWS Transcribe error: {error_msg}", fatal=False) - ) + await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}")) else: logger.debug(f"{self} Other message type received: {headers}") logger.debug(f"{self} Payload: {payload}") @@ -537,5 +538,6 @@ class AWSTranscribeSTTService(STTService): logger.error(f"{self} WebSocket connection closed in receive loop: {e}") break except Exception as e: - logger.error(f"{self} Unexpected error in receive loop: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) break diff --git a/src/pipecat/services/azure/stt.py b/src/pipecat/services/azure/stt.py index 586a94e44..85a0508a0 100644 --- a/src/pipecat/services/azure/stt.py +++ b/src/pipecat/services/azure/stt.py @@ -18,6 +18,7 @@ from loguru import logger from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -111,13 +112,17 @@ class AzureSTTService(STTService): audio: Raw audio bytes to process. Yields: - None - actual transcription frames are pushed via callbacks. + Frame: Either None for successful processing or ErrorFrame on failure. """ - await self.start_processing_metrics() - await self.start_ttfb_metrics() - if self._audio_stream: - self._audio_stream.write(audio) - yield None + try: + await self.start_processing_metrics() + await self.start_ttfb_metrics() + if self._audio_stream: + self._audio_stream.write(audio) + yield None + except Exception as e: + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") async def start(self, frame: StartFrame): """Start the speech recognition service. @@ -133,17 +138,21 @@ class AzureSTTService(STTService): if self._audio_stream: return - stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1) - self._audio_stream = PushAudioInputStream(stream_format) + try: + stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1) + self._audio_stream = PushAudioInputStream(stream_format) - audio_config = AudioConfig(stream=self._audio_stream) + audio_config = AudioConfig(stream=self._audio_stream) - self._speech_recognizer = SpeechRecognizer( - speech_config=self._speech_config, audio_config=audio_config - ) - self._speech_recognizer.recognizing.connect(self._on_handle_recognizing) - self._speech_recognizer.recognized.connect(self._on_handle_recognized) - self._speech_recognizer.start_continuous_recognition_async() + self._speech_recognizer = SpeechRecognizer( + speech_config=self._speech_config, audio_config=audio_config + ) + self._speech_recognizer.recognizing.connect(self._on_handle_recognizing) + self._speech_recognizer.recognized.connect(self._on_handle_recognized) + self._speech_recognizer.start_continuous_recognition_async() + except Exception as e: + logger.error(f"{self} exception during initialization: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def stop(self, frame: EndFrame): """Stop the speech recognition service. diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index d0ae42796..8ac8b70e3 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -337,7 +337,7 @@ class AzureTTSService(AzureBaseTTSService): if self._speech_synthesizer is None: error_msg = "Speech synthesizer not initialized." logger.error(error_msg) - yield ErrorFrame(error_msg) + yield ErrorFrame(error=error_msg) return try: @@ -364,13 +364,15 @@ class AzureTTSService(AzureBaseTTSService): yield TTSStoppedFrame() except Exception as e: - logger.error(f"{self} error during synthesis: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() # Could add reconnection logic here if needed return except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class AzureHttpTTSService(AzureBaseTTSService): @@ -448,3 +450,4 @@ class AzureHttpTTSService(AzureBaseTTSService): logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}") if cancellation_details.reason == CancellationReason.Error: logger.error(f"{self} error: {cancellation_details.error_details}") + yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}") diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index b4e232c4a..a2ae9432f 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -20,6 +20,7 @@ from loguru import logger from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -275,7 +276,8 @@ class CartesiaSTTService(WebsocketSTTService): self._websocket = await websocket_connect(ws_url, additional_headers=headers) await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self}: unable to connect to Cartesia: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _disconnect_websocket(self): try: @@ -284,6 +286,7 @@ class CartesiaSTTService(WebsocketSTTService): await self._websocket.close() except Exception as e: logger.error(f"{self} error closing websocket: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None await self._call_event_handler("on_disconnected") @@ -315,7 +318,9 @@ class CartesiaSTTService(WebsocketSTTService): await self._on_transcript(data) elif data["type"] == "error": - logger.error(f"Cartesia error: {data.get('message', 'Unknown error')}") + error_msg = data.get("message", "Unknown error") + logger.error(f"Cartesia error: {error_msg}") + await self.push_error(ErrorFrame(error=error_msg)) @traced_stt async def _handle_transcription( diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 9e7f6b37c..f8881200c 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -397,7 +397,8 @@ class CartesiaTTSService(AudioContextWordTTSService): ) await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -409,7 +410,8 @@ class CartesiaTTSService(AudioContextWordTTSService): logger.debug("Disconnecting from Cartesia") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._context_id = None self._websocket = None @@ -465,7 +467,7 @@ class CartesiaTTSService(AudioContextWordTTSService): logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['error']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) self._context_id = None else: logger.error(f"{self} error, unknown message type: {msg}") @@ -506,7 +508,8 @@ class CartesiaTTSService(AudioContextWordTTSService): await self._get_websocket().send(msg) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -514,6 +517,7 @@ class CartesiaTTSService(AudioContextWordTTSService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class CartesiaHttpTTSService(TTSService): @@ -705,7 +709,7 @@ class CartesiaHttpTTSService(TTSService): if response.status != 200: error_text = await response.text() logger.error(f"Cartesia API error: {error_text}") - await self.push_error(ErrorFrame(f"Cartesia API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}")) raise Exception(f"Cartesia API returned status {response.status}: {error_text}") audio_data = await response.read() @@ -722,7 +726,7 @@ class CartesiaHttpTTSService(TTSService): except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 7adc0f35a..ac6e8bb08 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -191,7 +191,8 @@ class DeepgramFluxSTTService(WebsocketSTTService): await self._disconnect_websocket() except Exception as e: - logger.error(f"Error during disconnect: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # Reset state only after everything is cleaned up self._websocket = None @@ -214,7 +215,8 @@ class DeepgramFluxSTTService(WebsocketSTTService): logger.debug("Connected to Deepgram Flux Websocket") await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -233,6 +235,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): await self._websocket.close() except Exception as e: logger.error(f"{self} error closing websocket: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None await self._call_event_handler("on_disconnected") @@ -333,14 +336,14 @@ class DeepgramFluxSTTService(WebsocketSTTService): """ if not self._websocket: logger.error("Not connected to Deepgram Flux.") - yield ErrorFrame("Not connected to Deepgram Flux.", fatal=True) + yield ErrorFrame("Not connected to Deepgram Flux.") return try: await self._websocket.send(audio) except Exception as e: - logger.error(f"Failed to send audio to Flux: {e}") - yield ErrorFrame(f"Failed to send audio to Flux: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") return yield None @@ -417,7 +420,8 @@ class DeepgramFluxSTTService(WebsocketSTTService): # Skip malformed messages continue except Exception as e: - logger.error(f"Error processing message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) # Error will be handled inside WebsocketService->_receive_task_handler raise else: diff --git a/src/pipecat/services/deepgram/stt.py b/src/pipecat/services/deepgram/stt.py index 6b1ea2f21..0a4cd7e6e 100644 --- a/src/pipecat/services/deepgram/stt.py +++ b/src/pipecat/services/deepgram/stt.py @@ -256,7 +256,7 @@ class DeepgramSTTService(STTService): async def _on_error(self, *args, **kwargs): error: ErrorResponse = kwargs["error"] logger.warning(f"{self} connection error, will retry: {error}") - await self.push_error(ErrorFrame(f"{error}")) + await self.push_error(ErrorFrame(error=f"{error}")) await self.stop_all_metrics() # NOTE(aleix): we don't disconnect (i.e. call finish on the connection) # because this triggers more errors internally in the Deepgram SDK. So, diff --git a/src/pipecat/services/deepgram/tts.py b/src/pipecat/services/deepgram/tts.py index 2c816e4a9..f6045c1f3 100644 --- a/src/pipecat/services/deepgram/tts.py +++ b/src/pipecat/services/deepgram/tts.py @@ -125,8 +125,8 @@ class DeepgramTTSService(TTSService): yield TTSStoppedFrame() except Exception as e: - logger.exception(f"{self} exception: {e}") - yield ErrorFrame(f"Error getting audio: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class DeepgramHttpTTSService(TTSService): diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 3b3349ba2..8cbb40d63 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -351,8 +351,8 @@ class ElevenLabsSTTService(SegmentedSTTService): ) except Exception as e: - logger.error(f"ElevenLabs STT error: {e}") - yield ErrorFrame(f"ElevenLabs STT error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") def audio_format_from_sample_rate(sample_rate: int) -> str: diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 44017e264..bbe05f9dc 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -424,7 +424,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService): json.dumps({"context_id": self._context_id, "close_context": True}) ) except Exception as e: - logger.warning(f"Error closing context for voice settings update: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._context_id = None self._started = False @@ -535,8 +536,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") self._websocket = None + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) await self._call_event_handler("on_connection_error", f"{e}") async def _disconnect_websocket(self): @@ -551,7 +553,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService): await self._websocket.close() logger.debug("Disconnected from ElevenLabs") except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._context_id = None @@ -581,7 +584,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService): json.dumps({"context_id": self._context_id, "close_context": True}) ) except Exception as e: - logger.error(f"Error closing context on interruption: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._context_id = None self._started = False self._partial_word = "" @@ -736,13 +740,15 @@ class ElevenLabsTTSService(AudioContextWordTTSService): else: await self._send_text(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") yield TTSStoppedFrame() + yield ErrorFrame(error=f"{self} error: {e}") self._started = False return yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class ElevenLabsHttpTTSService(WordTTSService): @@ -1085,7 +1091,8 @@ class ElevenLabsHttpTTSService(WordTTSService): logger.warning(f"Failed to parse JSON from stream: {e}") continue except Exception as e: - logger.error(f"Error processing response: {e}", exc_info=True) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") continue # After processing all chunks, emit any remaining partial word @@ -1109,8 +1116,8 @@ class ElevenLabsHttpTTSService(WordTTSService): self._previous_text = text except Exception as e: - logger.error(f"Error in run_tts: {e}") - yield ErrorFrame(error=str(e)) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() # Let the parent class handle TTSStoppedFrame diff --git a/src/pipecat/services/fal/stt.py b/src/pipecat/services/fal/stt.py index f4a708e23..8b84aaeeb 100644 --- a/src/pipecat/services/fal/stt.py +++ b/src/pipecat/services/fal/stt.py @@ -290,5 +290,5 @@ class FalSTTService(SegmentedSTTService): ) except Exception as e: - logger.error(f"Fal Wizper error: {e}") - yield ErrorFrame(f"Fal Wizper error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/fish/tts.py b/src/pipecat/services/fish/tts.py index 1abe6aca1..0acb12a96 100644 --- a/src/pipecat/services/fish/tts.py +++ b/src/pipecat/services/fish/tts.py @@ -237,7 +237,8 @@ class FishAudioTTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"Fish Audio initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -251,7 +252,8 @@ class FishAudioTTSService(InterruptibleTTSService): await self._websocket.send(ormsgpack.packb(stop_message)) await self._websocket.close() except Exception as e: - logger.error(f"Error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._request_id = None self._started = False @@ -293,7 +295,8 @@ class FishAudioTTSService(InterruptibleTTSService): continue except Exception as e: - logger.error(f"Error processing message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: @@ -329,7 +332,8 @@ class FishAudioTTSService(InterruptibleTTSService): flush_message = {"event": "flush"} await self._get_websocket().send(ormsgpack.packb(flush_message)) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -337,5 +341,5 @@ class FishAudioTTSService(InterruptibleTTSService): yield None except Exception as e: - logger.error(f"Error generating TTS: {e}") - yield ErrorFrame(f"Error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index cd59a3b74..624745293 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -23,6 +23,7 @@ from pipecat import __version__ as pipecat_version from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -467,7 +468,8 @@ class GladiaSTTService(STTService): break except Exception as e: - logger.error(f"Error in connection handler: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._connection_active = False if not self._should_reconnect: @@ -557,7 +559,8 @@ class GladiaSTTService(STTService): except websockets.exceptions.ConnectionClosed: logger.debug("Connection closed during keepalive") except Exception as e: - logger.error(f"Error in Gladia keepalive task: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _receive_task_handler(self): try: @@ -620,7 +623,8 @@ class GladiaSTTService(STTService): # Expected when closing the connection pass except Exception as e: - logger.error(f"Error in Gladia WebSocket handler: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _maybe_reconnect(self) -> bool: """Handle exponential backoff reconnection logic.""" diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 9c92076ab..11632968e 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1174,7 +1174,7 @@ class GeminiLiveLLMService(LLMService): self._connection_task = self.create_task(self._connection_task_handler(config=config)) except Exception as e: - await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True)) + await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}")) async def _connection_task_handler(self, config: LiveConnectConfig): async with self._client.aio.live.connect(model=self._model_name, config=config) as session: @@ -1255,9 +1255,7 @@ class GeminiLiveLLMService(LLMService): f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, " "treating as fatal error" ) - await self.push_error( - ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True) - ) + await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}")) return False else: logger.info( diff --git a/src/pipecat/services/google/stt.py b/src/pipecat/services/google/stt.py index 59d1dcd63..beb5db740 100644 --- a/src/pipecat/services/google/stt.py +++ b/src/pipecat/services/google/stt.py @@ -774,7 +774,8 @@ class GoogleSTTService(STTService): yield cloud_speech.StreamingRecognizeRequest(audio=audio_data) except Exception as e: - logger.error(f"Error in request generator: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise async def _stream_audio(self): @@ -805,14 +806,15 @@ class GoogleSTTService(STTService): break except Exception as e: - logger.warning(f"{self} Reconnecting: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) await asyncio.sleep(1) # Brief delay before reconnecting self._stream_start_time = int(time.time() * 1000) except Exception as e: - logger.error(f"Error in streaming task: {e}") - await self.push_frame(ErrorFrame(str(e))) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: """Process an audio chunk for STT transcription. @@ -900,7 +902,8 @@ class GoogleSTTService(STTService): ) raise except Exception as e: - logger.error(f"Error processing Google STT responses: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) # Re-raise the exception to let it propagate (e.g. in the case of a # timeout, propagate to _stream_audio to reconnect) raise diff --git a/src/pipecat/services/google/tts.py b/src/pipecat/services/google/tts.py index 4db084623..449b2bca2 100644 --- a/src/pipecat/services/google/tts.py +++ b/src/pipecat/services/google/tts.py @@ -746,7 +746,7 @@ class GoogleHttpTTSService(TTSService): yield TTSStoppedFrame() except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) @@ -1014,7 +1014,7 @@ class GoogleTTSService(GoogleBaseTTSService): yield frame except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) @@ -1266,6 +1266,6 @@ class GeminiTTSService(GoogleBaseTTSService): yield frame except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") error_message = f"Gemini TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) diff --git a/src/pipecat/services/groq/tts.py b/src/pipecat/services/groq/tts.py index 6bd49d1a3..d2dcb73a1 100644 --- a/src/pipecat/services/groq/tts.py +++ b/src/pipecat/services/groq/tts.py @@ -13,7 +13,13 @@ from typing import AsyncGenerator, Optional from loguru import logger from pydantic import BaseModel -from pipecat.frames.frames import Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) from pipecat.services.tts_service import TTSService from pipecat.transcriptions.language import Language from pipecat.utils.tracing.service_decorators import traced_tts @@ -150,5 +156,6 @@ class GroqTTSService(TTSService): yield TTSAudioRawFrame(bytes, frame_rate, channels) except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() diff --git a/src/pipecat/services/hume/tts.py b/src/pipecat/services/hume/tts.py index f8b2bbf27..6760c8121 100644 --- a/src/pipecat/services/hume/tts.py +++ b/src/pipecat/services/hume/tts.py @@ -225,8 +225,8 @@ class HumeTTSService(TTSService): self._audio_bytes = b"" except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # Ensure TTFB timer is stopped even on early failures await self.stop_ttfb_metrics() diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index 9bb939518..067ea6ab6 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -374,7 +374,7 @@ class InworldTTSService(TTSService): if response.status != 200: error_text = await response.text() logger.error(f"Inworld API error: {error_text}") - await self.push_error(ErrorFrame(f"Inworld API error: {error_text}")) + yield ErrorFrame(error=f"Inworld API error: {error_text}") return # ================================================================================ @@ -402,7 +402,7 @@ class InworldTTSService(TTSService): # ================================================================================ # Log any unexpected errors and notify the pipeline logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # ================================================================================ # STEP 8: CLEANUP AND COMPLETION @@ -517,7 +517,7 @@ class InworldTTSService(TTSService): # Extract the base64-encoded audio content from response if "audioContent" not in response_data: logger.error("No audioContent in Inworld API response") - await self.push_error(ErrorFrame("No audioContent in response")) + await self.push_error(ErrorFrame(error="No audioContent in response")) return # ================================================================================ diff --git a/src/pipecat/services/lmnt/tts.py b/src/pipecat/services/lmnt/tts.py index 538c1ef93..d98d3f76c 100644 --- a/src/pipecat/services/lmnt/tts.py +++ b/src/pipecat/services/lmnt/tts.py @@ -223,7 +223,8 @@ class LmntTTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -239,7 +240,8 @@ class LmntTTSService(InterruptibleTTSService): # await self._websocket.send(json.dumps({"eof": True})) await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._websocket = None @@ -276,7 +278,7 @@ class LmntTTSService(InterruptibleTTSService): logger.error(f"{self} error: {msg['error']}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['error']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) return except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") @@ -309,7 +311,8 @@ class LmntTTSService(InterruptibleTTSService): await self._get_websocket().send(json.dumps({"flush": True})) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -317,3 +320,4 @@ class LmntTTSService(InterruptibleTTSService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/minimax/tts.py b/src/pipecat/services/minimax/tts.py index c0a6b6aaa..b8e984eeb 100644 --- a/src/pipecat/services/minimax/tts.py +++ b/src/pipecat/services/minimax/tts.py @@ -347,8 +347,8 @@ class MiniMaxHttpTTSService(TTSService): continue except Exception as e: - logger.exception(f"Error generating TTS: {e}") - yield ErrorFrame(error=f"MiniMax TTS error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index 22a6e9999..992abbe67 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -294,7 +294,8 @@ class NeuphonicTTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -307,7 +308,8 @@ class NeuphonicTTSService(InterruptibleTTSService): logger.debug("Disconnecting from Neuphonic") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._websocket = None @@ -372,7 +374,8 @@ class NeuphonicTTSService(InterruptibleTTSService): await self._send_text(text) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -380,6 +383,7 @@ class NeuphonicTTSService(InterruptibleTTSService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class NeuphonicHttpTTSService(TTSService): @@ -582,7 +586,8 @@ class NeuphonicHttpTTSService(TTSService): yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1) except Exception as e: - logger.error(f"Error processing SSE message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") # Don't yield error frame for individual message failures continue @@ -590,8 +595,8 @@ class NeuphonicHttpTTSService(TTSService): logger.debug("TTS generation cancelled") raise except Exception as e: - logger.exception(f"Error in run_tts: {e}") - yield ErrorFrame(error=f"Neuphonic TTS error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index e38836b90..f66e6e8e1 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -478,7 +478,7 @@ class OpenAIRealtimeLLMService(LLMService): # it is to recover from a send-side error with proper state management, and that exponential # backoff for retries can have cost/stability implications for a service cluster, let's just # treat a send-side error as fatal. - await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error sending client event: {e}")) async def _update_settings(self): settings = self._session_properties @@ -667,9 +667,7 @@ class OpenAIRealtimeLLMService(LLMService): self._current_assistant_response = None # error handling if evt.response.status == "failed": - await self.push_error( - ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True) - ) + await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"])) return # response content for item in evt.response.output: @@ -763,7 +761,7 @@ class OpenAIRealtimeLLMService(LLMService): async def _handle_evt_error(self, evt): # Errors are fatal to this connection. Send an ErrorFrame. - await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error: {evt}")) # # state and client events for the current conversation diff --git a/src/pipecat/services/openai/tts.py b/src/pipecat/services/openai/tts.py index af580e4a0..a5231170e 100644 --- a/src/pipecat/services/openai/tts.py +++ b/src/pipecat/services/openai/tts.py @@ -199,7 +199,7 @@ class OpenAITTSService(TTSService): f"{self} error getting audio (status: {r.status_code}, error: {error})" ) yield ErrorFrame( - f"Error getting audio (status: {r.status_code}, error: {error})" + error=f"Error getting audio (status: {r.status_code}, error: {error})" ) return @@ -216,3 +216,4 @@ class OpenAITTSService(TTSService): yield TTSStoppedFrame() except BadRequestError as e: logger.exception(f"{self} error generating TTS: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index 922f9a572..af0600882 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -454,7 +454,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): # it is to recover from a send-side error with proper state management, and that exponential # backoff for retries can have cost/stability implications for a service cluster, let's just # treat a send-side error as fatal. - await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error sending client event: {e}")) async def _update_settings(self): settings = self._session_properties @@ -627,9 +627,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): self._current_assistant_response = None # error handling if evt.response.status == "failed": - await self.push_error( - ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True) - ) + await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"])) return # response content for item in evt.response.output: @@ -687,7 +685,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): async def _handle_evt_error(self, evt): # Errors are fatal to this connection. Send an ErrorFrame. - await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error: {evt}")) async def _handle_assistant_output(self, output): # We haven't seen intermixed audio and function_call items in the same response. But let's diff --git a/src/pipecat/services/piper/tts.py b/src/pipecat/services/piper/tts.py index 73addb9d1..3752418b5 100644 --- a/src/pipecat/services/piper/tts.py +++ b/src/pipecat/services/piper/tts.py @@ -101,7 +101,7 @@ class PiperTTSService(TTSService): f"{self} error getting audio (status: {response.status}, error: {error})" ) yield ErrorFrame( - f"Error getting audio (status: {response.status}, error: {error})" + error=f"Error getting audio (status: {response.status}, error: {error})" ) return @@ -117,8 +117,8 @@ class PiperTTSService(TTSService): await self.stop_ttfb_metrics() yield frame except Exception as e: - logger.error(f"Error in run_tts: {e}") - yield ErrorFrame(error=str(e)) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: logger.debug(f"{self}: Finished TTS [{text}]") await self.stop_ttfb_metrics() diff --git a/src/pipecat/services/playht/tts.py b/src/pipecat/services/playht/tts.py index 1f0ed2156..2c40065aa 100644 --- a/src/pipecat/services/playht/tts.py +++ b/src/pipecat/services/playht/tts.py @@ -266,7 +266,8 @@ class PlayHTTTSService(InterruptibleTTSService): self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -279,7 +280,8 @@ class PlayHTTTSService(InterruptibleTTSService): logger.debug("Disconnecting from PlayHT") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._request_id = None self._websocket = None @@ -350,7 +352,7 @@ class PlayHTTTSService(InterruptibleTTSService): self._request_id = None elif "error" in msg: logger.error(f"{self} error: {msg}") - await self.push_error(ErrorFrame(f"{self} error: {msg['error']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") @@ -392,7 +394,8 @@ class PlayHTTTSService(InterruptibleTTSService): await self._get_websocket().send(json.dumps(tts_command)) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -402,8 +405,8 @@ class PlayHTTTSService(InterruptibleTTSService): yield None except Exception as e: - logger.error(f"{self} error generating TTS: {e}") - yield ErrorFrame(f"{self} error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class PlayHTHttpTTSService(TTSService): @@ -623,7 +626,8 @@ class PlayHTHttpTTSService(TTSService): yield frame except Exception as e: - logger.error(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/rime/tts.py b/src/pipecat/services/rime/tts.py index b51f9e3ec..329aecd7d 100644 --- a/src/pipecat/services/rime/tts.py +++ b/src/pipecat/services/rime/tts.py @@ -259,7 +259,8 @@ class RimeTTSService(AudioContextWordTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -271,7 +272,8 @@ class RimeTTSService(AudioContextWordTTSService): await self._websocket.send(json.dumps(self._build_eos_msg())) await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._context_id = None self._websocket = None @@ -367,7 +369,7 @@ class RimeTTSService(AudioContextWordTTSService): logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['message']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}")) self._context_id = None async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): @@ -409,7 +411,8 @@ class RimeTTSService(AudioContextWordTTSService): await self._get_websocket().send(json.dumps(msg)) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -417,6 +420,7 @@ class RimeTTSService(AudioContextWordTTSService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class RimeHttpTTSService(TTSService): @@ -574,8 +578,8 @@ class RimeHttpTTSService(TTSService): yield frame except Exception as e: - logger.exception(f"Error generating TTS: {e}") - yield ErrorFrame(error=f"Rime TTS error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index 0c93365d5..25cc78c7a 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -659,8 +659,8 @@ class RivaSegmentedSTTService(SegmentedSTTService): yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}") except Exception as e: - logger.exception(f"Riva Canary ASR error: {e}") - yield ErrorFrame(f"Riva Canary ASR error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class ParakeetSTTService(RivaSTTService): diff --git a/src/pipecat/services/riva/tts.py b/src/pipecat/services/riva/tts.py index d051965ed..07aa4d105 100644 --- a/src/pipecat/services/riva/tts.py +++ b/src/pipecat/services/riva/tts.py @@ -23,6 +23,7 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import ( + ErrorFrame, Frame, TTSAudioRawFrame, TTSStartedFrame, @@ -165,6 +166,7 @@ class RivaTTSService(TTSService): add_response(None) except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") add_response(None) await self.start_ttfb_metrics() diff --git a/src/pipecat/services/sarvam/tts.py b/src/pipecat/services/sarvam/tts.py index 9ff037938..6ce45d27d 100644 --- a/src/pipecat/services/sarvam/tts.py +++ b/src/pipecat/services/sarvam/tts.py @@ -264,7 +264,7 @@ class SarvamHttpTTSService(TTSService): if response.status != 200: error_text = await response.text() logger.error(f"Sarvam API error: {error_text}") - await self.push_error(ErrorFrame(f"Sarvam API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}")) return response_data = await response.json() @@ -274,7 +274,7 @@ class SarvamHttpTTSService(TTSService): # Decode base64 audio data if "audios" not in response_data or not response_data["audios"]: logger.error("No audio data received from Sarvam API") - await self.push_error(ErrorFrame("No audio data received")) + await self.push_error(ErrorFrame(error="No audio data received")) return # Get the first audio (there should be only one for single text input) @@ -296,7 +296,7 @@ class SarvamHttpTTSService(TTSService): except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() @@ -578,7 +578,8 @@ class SarvamTTSService(InterruptibleTTSService): await self._disconnect_websocket() except Exception as e: - logger.error(f"Error during disconnect: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # Reset state only after everything is cleaned up self._started = False @@ -602,7 +603,8 @@ class SarvamTTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -618,8 +620,8 @@ class SarvamTTSService(InterruptibleTTSService): await self._websocket.send(json.dumps(config_message)) logger.debug("Configuration sent successfully") except Exception as e: - logger.error(f"Failed to send config: {str(e)}") - await self.push_frame(ErrorFrame(f"Failed to send config: {str(e)}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise async def _disconnect_websocket(self): @@ -632,6 +634,7 @@ class SarvamTTSService(InterruptibleTTSService): await self._websocket.close() except Exception as e: logger.error(f"{self} error closing websocket: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._websocket = None @@ -661,7 +664,7 @@ class SarvamTTSService(InterruptibleTTSService): if "too long" in error_msg.lower() or "timeout" in error_msg.lower(): logger.warning("Connection timeout detected, service may need restart") - await self.push_frame(ErrorFrame(f"TTS Error: {error_msg}")) + await self.push_frame(ErrorFrame(error=f"TTS Error: {error_msg}")) async def _keepalive_task_handler(self): """Handle keepalive messages to maintain WebSocket connection.""" @@ -717,7 +720,8 @@ class SarvamTTSService(InterruptibleTTSService): await self._send_text(text) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -725,3 +729,4 @@ class SarvamTTSService(InterruptibleTTSService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index 1447774e1..b4bcb7ba1 100644 --- a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -327,8 +327,8 @@ class SonioxSTTService(STTService): # Expected when closing the connection logger.debug("WebSocket connection closed, keepalive task stopped.") except Exception as e: - logger.error(f"{self} error (_keepalive_task_handler): {e}") - await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _receive_task_handler(self): if not self._websocket: @@ -409,7 +409,7 @@ class SonioxSTTService(STTService): ) await self.push_error( ErrorFrame( - f"{self} error: {error_code} (_receive_task_handler) - {error_message}" + error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}" ) ) @@ -425,5 +425,5 @@ class SonioxSTTService(STTService): # Expected when closing the connection. pass except Exception as e: - logger.error(f"{self} error: {e}") - await self.push_error(ErrorFrame(f"{self} error: {e}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) diff --git a/src/pipecat/services/speechmatics/stt.py b/src/pipecat/services/speechmatics/stt.py index f85660c83..2d95bd69e 100644 --- a/src/pipecat/services/speechmatics/stt.py +++ b/src/pipecat/services/speechmatics/stt.py @@ -467,8 +467,8 @@ class SpeechmaticsSTTService(STTService): await self._client.send_audio(audio) yield None except Exception as e: - logger.error(f"Speechmatics error: {e}") - yield ErrorFrame(f"Speechmatics error: {e}", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") await self._disconnect() def update_params( @@ -514,6 +514,8 @@ class SpeechmaticsSTTService(STTService): self._client.send_message(payload), self.get_event_loop() ) except Exception as e: + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise RuntimeError(f"error sending message to STT: {e}") async def _connect(self) -> None: @@ -579,7 +581,8 @@ class SpeechmaticsSTTService(STTService): logger.debug(f"{self} Connected to Speechmatics STT service") await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} Error connecting to Speechmatics: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._client = None async def _disconnect(self) -> None: @@ -593,7 +596,8 @@ class SpeechmaticsSTTService(STTService): except asyncio.TimeoutError: logger.warning(f"{self} Timeout while closing Speechmatics client connection") except Exception as e: - logger.error(f"{self} Error closing Speechmatics client: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._client = None await self._call_event_handler("on_disconnected") diff --git a/src/pipecat/services/ultravox/stt.py b/src/pipecat/services/ultravox/stt.py index 987593f02..14eaebf6a 100644 --- a/src/pipecat/services/ultravox/stt.py +++ b/src/pipecat/services/ultravox/stt.py @@ -246,7 +246,8 @@ class UltravoxSTTService(AIService): logger.info("Model warm-up completed successfully") except Exception as e: - logger.warning(f"Model warm-up failed: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0): """Generate silent audio as a numpy array. @@ -376,7 +377,7 @@ class UltravoxSTTService(AIService): if arr.size > 0: # Check if array is not empty audio_arrays.append(arr) except Exception as e: - logger.error(f"Error processing bytes audio frame: {e}") + yield ErrorFrame(error=f"{self} error: {e}") # Handle numpy array data elif isinstance(f.audio, np.ndarray): if f.audio.size > 0: # Check if array is not empty @@ -436,14 +437,14 @@ class UltravoxSTTService(AIService): yield LLMFullResponseEndFrame() except Exception as e: - logger.error(f"Error generating text from model: {e}") - yield ErrorFrame(f"Error generating text: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") else: - logger.warning("No model available for text generation") + logger.error("No model available for text generation") yield ErrorFrame("No model available for text generation") except Exception as e: - logger.error(f"Error processing audio buffer: {e}") + logger.error(f"{self} exception: {e}") import traceback logger.error(traceback.format_exc()) diff --git a/src/pipecat/services/websocket_service.py b/src/pipecat/services/websocket_service.py index 17a911366..f9799b9c3 100644 --- a/src/pipecat/services/websocket_service.py +++ b/src/pipecat/services/websocket_service.py @@ -94,7 +94,7 @@ class WebsocketService(ABC): if self._reconnect_on_error: retry_count += 1 if retry_count >= MAX_RETRIES: - await report_error(ErrorFrame(message, fatal=True)) + await report_error(ErrorFrame(message)) break logger.warning(f"{self} connection error, will retry: {e}") diff --git a/src/pipecat/services/whisper/base_stt.py b/src/pipecat/services/whisper/base_stt.py index 026b14a63..743895253 100644 --- a/src/pipecat/services/whisper/base_stt.py +++ b/src/pipecat/services/whisper/base_stt.py @@ -226,8 +226,8 @@ class BaseWhisperSTTService(SegmentedSTTService): logger.warning("Received empty transcription from API") except Exception as e: - logger.exception(f"Exception during transcription: {e}") - yield ErrorFrame(f"Error during transcription: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") async def _transcribe(self, audio: bytes) -> Transcription: """Transcribe audio data to text. diff --git a/src/pipecat/services/whisper/stt.py b/src/pipecat/services/whisper/stt.py index e69ce39cd..4e326e2ca 100644 --- a/src/pipecat/services/whisper/stt.py +++ b/src/pipecat/services/whisper/stt.py @@ -428,5 +428,5 @@ class WhisperSTTServiceMLX(WhisperSTTService): ) except Exception as e: - logger.exception(f"MLX Whisper transcription error: {e}") - yield ErrorFrame(f"MLX Whisper transcription error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/xtts/tts.py b/src/pipecat/services/xtts/tts.py index df58c96e4..34ddecbe2 100644 --- a/src/pipecat/services/xtts/tts.py +++ b/src/pipecat/services/xtts/tts.py @@ -146,7 +146,7 @@ class XTTSService(TTSService): ) await self.push_error( ErrorFrame( - f"Error error getting studio speakers (status: {r.status}, error: {text})" + error=f"Error getting studio speakers (status: {r.status}, error: {text})" ) ) return @@ -187,7 +187,7 @@ class XTTSService(TTSService): if r.status != 200: text = await r.text() logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") - yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") + yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})") return await self.start_tts_usage_metrics(text)