feat: Add ErrorFrame emission to TTS/STT services for pipeline error detection (#2881)

* feat: Add ErrorFrame emission to TTS/STT services for pipeline error detection

- Add ErrorFrame emission to all major TTS/STT services during initialization and runtime failures
- Services updated: Cartesia, ElevenLabs, Deepgram, AssemblyAI, Rime, Azure
- ErrorFrame objects emitted with fatal=False for graceful degradation
- Enables on_pipeline_error event handler to detect service failures programmatically
- Add comprehensive pytest test suite to verify ErrorFrame emission
- Fixes issue where services failed gracefully but didn't emit ErrorFrame objects

This allows developers to implement real-time error monitoring and alerting
using the on_pipeline_error event handler introduced in v0.0.90.

* Update STT and TTS services to use consistent error handling pattern

- Improves error handling consistency across all services

* Add changelog entry for STT/TTS error handling improvements

* Linting issues Resolved

* Azure STT ErrorFrames added with consistent patterns

* Cartesia STT and Deepgram STT; additional fixes made

* Removed Fatal Flags across services, removed duplication

* Moving the changelog entry to the correct place.

* Refactoring some classes to use yield instead of push_error directly.

* Fixing ruff format.

---------

Co-authored-by: Filipi Fuchter <filipi87@gmail.com>
This commit is contained in:
Angad Singh
2025-11-14 23:33:05 +05:30
committed by GitHub
parent 74a0e8c88d
commit d1116d149e
41 changed files with 260 additions and 170 deletions

View File

@@ -18,6 +18,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated all STT and TTS services to use consistent error handling pattern with
`push_error()` method for better pipeline error event integration.
- Added Hindi support for Rime TTS services.
- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API

View File

@@ -21,6 +21,7 @@ from pipecat import __version__ as pipecat_version
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -205,8 +206,9 @@ class AssemblyAISTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Failed to connect to AssemblyAI: {e}")
logger.error(f"{self} exception: {e}")
self._connected = False
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _disconnect(self):
@@ -231,7 +233,8 @@ class AssemblyAISTTService(STTService):
logger.warning("Timed out waiting for termination message from server")
except Exception as e:
logger.warning(f"Error during termination handshake: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
if self._receive_task:
await self.cancel_task(self._receive_task)
@@ -239,7 +242,8 @@ class AssemblyAISTTService(STTService):
await self._websocket.close()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
@@ -258,11 +262,13 @@ class AssemblyAISTTService(STTService):
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:
logger.error(f"Error processing WebSocket message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
break
except Exception as e:
logger.error(f"Fatal error in receive handler: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
def _parse_message(self, message: Dict[str, Any]) -> BaseMessage:
"""Parse a raw message into the appropriate message type."""
@@ -291,7 +297,8 @@ class AssemblyAISTTService(STTService):
elif isinstance(parsed_message, TerminationMessage):
await self._handle_termination(parsed_message)
except Exception as e:
logger.error(f"Error handling message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _handle_termination(self, message: TerminationMessage):
"""Handle termination message."""

View File

@@ -237,7 +237,8 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -249,7 +250,8 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Async")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
self._started = False
@@ -297,7 +299,7 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['message']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
else:
logger.error(f"{self} error, unknown message type: {msg}")
@@ -342,7 +344,8 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -350,6 +353,7 @@ class AsyncAITTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class AsyncAIHttpTTSService(TTSService):
@@ -492,7 +496,7 @@ class AsyncAIHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Async API error: {error_text}")
await self.push_error(ErrorFrame(f"Async API error: {error_text}"))
await self.push_error(ErrorFrame(error=f"Async API error: {error_text}"))
raise Exception(f"Async API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -509,7 +513,7 @@ class AsyncAIHttpTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -140,7 +140,8 @@ class AWSTranscribeSTTService(STTService):
return
logger.warning("WebSocket connection not established after connect")
except Exception as e:
logger.error(f"Failed to connect (attempt {retry_count + 1}/{max_retries}): {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(1) # Wait before retrying
@@ -181,8 +182,8 @@ class AWSTranscribeSTTService(STTService):
try:
await self._connect()
except Exception as e:
logger.error(f"Failed to reconnect: {e}")
yield ErrorFrame("Failed to reconnect to AWS Transcribe", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
# Format the audio data according to AWS event stream format
@@ -199,13 +200,13 @@ class AWSTranscribeSTTService(STTService):
await self._disconnect()
# Don't yield error here - we'll retry on next frame
except Exception as e:
logger.error(f"Error sending audio: {e}")
yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
except Exception as e:
logger.error(f"Error in run_stt: {e}")
yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
async def _connect(self):
@@ -288,7 +289,8 @@ class AWSTranscribeSTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Failed to connect to AWS Transcribe: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self._disconnect()
raise
@@ -308,7 +310,8 @@ class AWSTranscribeSTTService(STTService):
await self._ws_client.send(json.dumps(end_stream))
await self._ws_client.close()
except Exception as e:
logger.warning(f"{self} Error closing WebSocket connection: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
@@ -527,9 +530,7 @@ class AWSTranscribeSTTService(STTService):
elif headers.get(":message-type") == "exception":
error_msg = payload.get("Message", "Unknown error")
logger.error(f"{self} Exception from AWS: {error_msg}")
await self.push_frame(
ErrorFrame(f"AWS Transcribe error: {error_msg}", fatal=False)
)
await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}"))
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
@@ -537,5 +538,6 @@ class AWSTranscribeSTTService(STTService):
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
break
except Exception as e:
logger.error(f"{self} Unexpected error in receive loop: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
break

View File

@@ -18,6 +18,7 @@ from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -111,13 +112,17 @@ class AzureSTTService(STTService):
audio: Raw audio bytes to process.
Yields:
None - actual transcription frames are pushed via callbacks.
Frame: Either None for successful processing or ErrorFrame on failure.
"""
await self.start_processing_metrics()
await self.start_ttfb_metrics()
if self._audio_stream:
self._audio_stream.write(audio)
yield None
try:
await self.start_processing_metrics()
await self.start_ttfb_metrics()
if self._audio_stream:
self._audio_stream.write(audio)
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def start(self, frame: StartFrame):
"""Start the speech recognition service.
@@ -133,17 +138,21 @@ class AzureSTTService(STTService):
if self._audio_stream:
return
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
try:
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
except Exception as e:
logger.error(f"{self} exception during initialization: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def stop(self, frame: EndFrame):
"""Stop the speech recognition service.

View File

@@ -337,7 +337,7 @@ class AzureTTSService(AzureBaseTTSService):
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
logger.error(error_msg)
yield ErrorFrame(error_msg)
yield ErrorFrame(error=error_msg)
return
try:
@@ -364,13 +364,15 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} error during synthesis: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class AzureHttpTTSService(AzureBaseTTSService):
@@ -448,3 +450,4 @@ class AzureHttpTTSService(AzureBaseTTSService):
logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
if cancellation_details.reason == CancellationReason.Error:
logger.error(f"{self} error: {cancellation_details.error_details}")
yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}")

View File

@@ -20,6 +20,7 @@ from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -275,7 +276,8 @@ class CartesiaSTTService(WebsocketSTTService):
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self}: unable to connect to Cartesia: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _disconnect_websocket(self):
try:
@@ -284,6 +286,7 @@ class CartesiaSTTService(WebsocketSTTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -315,7 +318,9 @@ class CartesiaSTTService(WebsocketSTTService):
await self._on_transcript(data)
elif data["type"] == "error":
logger.error(f"Cartesia error: {data.get('message', 'Unknown error')}")
error_msg = data.get("message", "Unknown error")
logger.error(f"Cartesia error: {error_msg}")
await self.push_error(ErrorFrame(error=error_msg))
@traced_stt
async def _handle_transcription(

View File

@@ -397,7 +397,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -409,7 +410,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._context_id = None
self._websocket = None
@@ -465,7 +467,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
self._context_id = None
else:
logger.error(f"{self} error, unknown message type: {msg}")
@@ -506,7 +508,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -514,6 +517,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class CartesiaHttpTTSService(TTSService):
@@ -705,7 +709,7 @@ class CartesiaHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Cartesia API error: {error_text}")
await self.push_error(ErrorFrame(f"Cartesia API error: {error_text}"))
await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}"))
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -722,7 +726,7 @@ class CartesiaHttpTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -191,7 +191,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._disconnect_websocket()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Reset state only after everything is cleaned up
self._websocket = None
@@ -214,7 +215,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -233,6 +235,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -333,14 +336,14 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
if not self._websocket:
logger.error("Not connected to Deepgram Flux.")
yield ErrorFrame("Not connected to Deepgram Flux.", fatal=True)
yield ErrorFrame("Not connected to Deepgram Flux.")
return
try:
await self._websocket.send(audio)
except Exception as e:
logger.error(f"Failed to send audio to Flux: {e}")
yield ErrorFrame(f"Failed to send audio to Flux: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
yield None
@@ -417,7 +420,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Skip malformed messages
continue
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Error will be handled inside WebsocketService->_receive_task_handler
raise
else:

View File

@@ -256,7 +256,7 @@ class DeepgramSTTService(STTService):
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.push_error(ErrorFrame(f"{error}"))
await self.push_error(ErrorFrame(error=f"{error}"))
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,

View File

@@ -125,8 +125,8 @@ class DeepgramTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} exception: {e}")
yield ErrorFrame(f"Error getting audio: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class DeepgramHttpTTSService(TTSService):

View File

@@ -351,8 +351,8 @@ class ElevenLabsSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"ElevenLabs STT error: {e}")
yield ErrorFrame(f"ElevenLabs STT error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
def audio_format_from_sample_rate(sample_rate: int) -> str:

View File

@@ -424,7 +424,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.warning(f"Error closing context for voice settings update: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._context_id = None
self._started = False
@@ -535,8 +536,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
self._websocket = None
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
@@ -551,7 +553,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._context_id = None
@@ -581,7 +584,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"Error closing context on interruption: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._context_id = None
self._started = False
self._partial_word = ""
@@ -736,13 +740,15 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
else:
await self._send_text(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield TTSStoppedFrame()
yield ErrorFrame(error=f"{self} error: {e}")
self._started = False
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class ElevenLabsHttpTTSService(WordTTSService):
@@ -1085,7 +1091,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
logger.error(f"Error processing response: {e}", exc_info=True)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
continue
# After processing all chunks, emit any remaining partial word
@@ -1109,8 +1116,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._previous_text = text
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
# Let the parent class handle TTSStoppedFrame

View File

@@ -290,5 +290,5 @@ class FalSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"Fal Wizper error: {e}")
yield ErrorFrame(f"Fal Wizper error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -237,7 +237,8 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Fish Audio initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -251,7 +252,8 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
except Exception as e:
logger.error(f"Error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._request_id = None
self._started = False
@@ -293,7 +295,8 @@ class FishAudioTTSService(InterruptibleTTSService):
continue
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -329,7 +332,8 @@ class FishAudioTTSService(InterruptibleTTSService):
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -337,5 +341,5 @@ class FishAudioTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"Error generating TTS: {e}")
yield ErrorFrame(f"Error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -23,6 +23,7 @@ from pipecat import __version__ as pipecat_version
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -467,7 +468,8 @@ class GladiaSTTService(STTService):
break
except Exception as e:
logger.error(f"Error in connection handler: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._connection_active = False
if not self._should_reconnect:
@@ -557,7 +559,8 @@ class GladiaSTTService(STTService):
except websockets.exceptions.ConnectionClosed:
logger.debug("Connection closed during keepalive")
except Exception as e:
logger.error(f"Error in Gladia keepalive task: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _receive_task_handler(self):
try:
@@ -620,7 +623,8 @@ class GladiaSTTService(STTService):
# Expected when closing the connection
pass
except Exception as e:
logger.error(f"Error in Gladia WebSocket handler: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _maybe_reconnect(self) -> bool:
"""Handle exponential backoff reconnection logic."""

View File

@@ -1174,7 +1174,7 @@ class GeminiLiveLLMService(LLMService):
self._connection_task = self.create_task(self._connection_task_handler(config=config))
except Exception as e:
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True))
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}"))
async def _connection_task_handler(self, config: LiveConnectConfig):
async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
@@ -1255,9 +1255,7 @@ class GeminiLiveLLMService(LLMService):
f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
"treating as fatal error"
)
await self.push_error(
ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True)
)
await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}"))
return False
else:
logger.info(

View File

@@ -774,7 +774,8 @@ class GoogleSTTService(STTService):
yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)
except Exception as e:
logger.error(f"Error in request generator: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _stream_audio(self):
@@ -805,14 +806,15 @@ class GoogleSTTService(STTService):
break
except Exception as e:
logger.warning(f"{self} Reconnecting: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await asyncio.sleep(1) # Brief delay before reconnecting
self._stream_start_time = int(time.time() * 1000)
except Exception as e:
logger.error(f"Error in streaming task: {e}")
await self.push_frame(ErrorFrame(str(e)))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process an audio chunk for STT transcription.
@@ -900,7 +902,8 @@ class GoogleSTTService(STTService):
)
raise
except Exception as e:
logger.error(f"Error processing Google STT responses: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Re-raise the exception to let it propagate (e.g. in the case of a
# timeout, propagate to _stream_audio to reconnect)
raise

View File

@@ -746,7 +746,7 @@ class GoogleHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -1014,7 +1014,7 @@ class GoogleTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -1266,6 +1266,6 @@ class GeminiTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
error_message = f"Gemini TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)

View File

@@ -13,7 +13,13 @@ from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -150,5 +156,6 @@ class GroqTTSService(TTSService):
yield TTSAudioRawFrame(bytes, frame_rate, channels)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()

View File

@@ -225,8 +225,8 @@ class HumeTTSService(TTSService):
self._audio_bytes = b""
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()

View File

@@ -374,7 +374,7 @@ class InworldTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Inworld API error: {error_text}")
await self.push_error(ErrorFrame(f"Inworld API error: {error_text}"))
yield ErrorFrame(error=f"Inworld API error: {error_text}")
return
# ================================================================================
@@ -402,7 +402,7 @@ class InworldTTSService(TTSService):
# ================================================================================
# Log any unexpected errors and notify the pipeline
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# ================================================================================
# STEP 8: CLEANUP AND COMPLETION
@@ -517,7 +517,7 @@ class InworldTTSService(TTSService):
# Extract the base64-encoded audio content from response
if "audioContent" not in response_data:
logger.error("No audioContent in Inworld API response")
await self.push_error(ErrorFrame("No audioContent in response"))
await self.push_error(ErrorFrame(error="No audioContent in response"))
return
# ================================================================================

View File

@@ -223,7 +223,8 @@ class LmntTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -239,7 +240,8 @@ class LmntTTSService(InterruptibleTTSService):
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -276,7 +278,7 @@ class LmntTTSService(InterruptibleTTSService):
logger.error(f"{self} error: {msg['error']}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
return
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -309,7 +311,8 @@ class LmntTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps({"flush": True}))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -317,3 +320,4 @@ class LmntTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -347,8 +347,8 @@ class MiniMaxHttpTTSService(TTSService):
continue
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"MiniMax TTS error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -294,7 +294,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -307,7 +308,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -372,7 +374,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -380,6 +383,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class NeuphonicHttpTTSService(TTSService):
@@ -582,7 +586,8 @@ class NeuphonicHttpTTSService(TTSService):
yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1)
except Exception as e:
logger.error(f"Error processing SSE message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Don't yield error frame for individual message failures
continue
@@ -590,8 +595,8 @@ class NeuphonicHttpTTSService(TTSService):
logger.debug("TTS generation cancelled")
raise
except Exception as e:
logger.exception(f"Error in run_tts: {e}")
yield ErrorFrame(error=f"Neuphonic TTS error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -478,7 +478,7 @@ class OpenAIRealtimeLLMService(LLMService):
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
async def _update_settings(self):
settings = self._session_properties
@@ -667,9 +667,7 @@ class OpenAIRealtimeLLMService(LLMService):
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
await self.push_error(
ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True)
)
await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"]))
return
# response content
for item in evt.response.output:
@@ -763,7 +761,7 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
#
# state and client events for the current conversation

View File

@@ -199,7 +199,7 @@ class OpenAITTSService(TTSService):
f"{self} error getting audio (status: {r.status_code}, error: {error})"
)
yield ErrorFrame(
f"Error getting audio (status: {r.status_code}, error: {error})"
error=f"Error getting audio (status: {r.status_code}, error: {error})"
)
return
@@ -216,3 +216,4 @@ class OpenAITTSService(TTSService):
yield TTSStoppedFrame()
except BadRequestError as e:
logger.exception(f"{self} error generating TTS: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -454,7 +454,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
async def _update_settings(self):
settings = self._session_properties
@@ -627,9 +627,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
await self.push_error(
ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True)
)
await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"]))
return
# response content
for item in evt.response.output:
@@ -687,7 +685,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
async def _handle_assistant_output(self, output):
# We haven't seen intermixed audio and function_call items in the same response. But let's

View File

@@ -101,7 +101,7 @@ class PiperTTSService(TTSService):
f"{self} error getting audio (status: {response.status}, error: {error})"
)
yield ErrorFrame(
f"Error getting audio (status: {response.status}, error: {error})"
error=f"Error getting audio (status: {response.status}, error: {error})"
)
return
@@ -117,8 +117,8 @@ class PiperTTSService(TTSService):
await self.stop_ttfb_metrics()
yield frame
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
logger.debug(f"{self}: Finished TTS [{text}]")
await self.stop_ttfb_metrics()

View File

@@ -266,7 +266,8 @@ class PlayHTTTSService(InterruptibleTTSService):
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -279,7 +280,8 @@ class PlayHTTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._request_id = None
self._websocket = None
@@ -350,7 +352,7 @@ class PlayHTTTSService(InterruptibleTTSService):
self._request_id = None
elif "error" in msg:
logger.error(f"{self} error: {msg}")
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -392,7 +394,8 @@ class PlayHTTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps(tts_command))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -402,8 +405,8 @@ class PlayHTTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
yield ErrorFrame(f"{self} error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class PlayHTHttpTTSService(TTSService):
@@ -623,7 +626,8 @@ class PlayHTHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -259,7 +259,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -271,7 +272,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._context_id = None
self._websocket = None
@@ -367,7 +369,7 @@ class RimeTTSService(AudioContextWordTTSService):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['message']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
self._context_id = None
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -409,7 +411,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._get_websocket().send(json.dumps(msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -417,6 +420,7 @@ class RimeTTSService(AudioContextWordTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class RimeHttpTTSService(TTSService):
@@ -574,8 +578,8 @@ class RimeHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -659,8 +659,8 @@ class RivaSegmentedSTTService(SegmentedSTTService):
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
except Exception as e:
logger.exception(f"Riva Canary ASR error: {e}")
yield ErrorFrame(f"Riva Canary ASR error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class ParakeetSTTService(RivaSTTService):

View File

@@ -23,6 +23,7 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
@@ -165,6 +166,7 @@ class RivaTTSService(TTSService):
add_response(None)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
add_response(None)
await self.start_ttfb_metrics()

View File

@@ -264,7 +264,7 @@ class SarvamHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Sarvam API error: {error_text}")
await self.push_error(ErrorFrame(f"Sarvam API error: {error_text}"))
await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}"))
return
response_data = await response.json()
@@ -274,7 +274,7 @@ class SarvamHttpTTSService(TTSService):
# Decode base64 audio data
if "audios" not in response_data or not response_data["audios"]:
logger.error("No audio data received from Sarvam API")
await self.push_error(ErrorFrame("No audio data received"))
await self.push_error(ErrorFrame(error="No audio data received"))
return
# Get the first audio (there should be only one for single text input)
@@ -296,7 +296,7 @@ class SarvamHttpTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
@@ -578,7 +578,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._disconnect_websocket()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Reset state only after everything is cleaned up
self._started = False
@@ -602,7 +603,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -618,8 +620,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.send(json.dumps(config_message))
logger.debug("Configuration sent successfully")
except Exception as e:
logger.error(f"Failed to send config: {str(e)}")
await self.push_frame(ErrorFrame(f"Failed to send config: {str(e)}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _disconnect_websocket(self):
@@ -632,6 +634,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -661,7 +664,7 @@ class SarvamTTSService(InterruptibleTTSService):
if "too long" in error_msg.lower() or "timeout" in error_msg.lower():
logger.warning("Connection timeout detected, service may need restart")
await self.push_frame(ErrorFrame(f"TTS Error: {error_msg}"))
await self.push_frame(ErrorFrame(error=f"TTS Error: {error_msg}"))
async def _keepalive_task_handler(self):
"""Handle keepalive messages to maintain WebSocket connection."""
@@ -717,7 +720,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -725,3 +729,4 @@ class SarvamTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -327,8 +327,8 @@ class SonioxSTTService(STTService):
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
logger.error(f"{self} error (_keepalive_task_handler): {e}")
await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _receive_task_handler(self):
if not self._websocket:
@@ -409,7 +409,7 @@ class SonioxSTTService(STTService):
)
await self.push_error(
ErrorFrame(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
)
@@ -425,5 +425,5 @@ class SonioxSTTService(STTService):
# Expected when closing the connection.
pass
except Exception as e:
logger.error(f"{self} error: {e}")
await self.push_error(ErrorFrame(f"{self} error: {e}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))

View File

@@ -467,8 +467,8 @@ class SpeechmaticsSTTService(STTService):
await self._client.send_audio(audio)
yield None
except Exception as e:
logger.error(f"Speechmatics error: {e}")
yield ErrorFrame(f"Speechmatics error: {e}", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
def update_params(
@@ -514,6 +514,8 @@ class SpeechmaticsSTTService(STTService):
self._client.send_message(payload), self.get_event_loop()
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise RuntimeError(f"error sending message to STT: {e}")
async def _connect(self) -> None:
@@ -579,7 +581,8 @@ class SpeechmaticsSTTService(STTService):
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Error connecting to Speechmatics: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._client = None
async def _disconnect(self) -> None:
@@ -593,7 +596,8 @@ class SpeechmaticsSTTService(STTService):
except asyncio.TimeoutError:
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"{self} Error closing Speechmatics client: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._client = None
await self._call_event_handler("on_disconnected")

View File

@@ -246,7 +246,8 @@ class UltravoxSTTService(AIService):
logger.info("Model warm-up completed successfully")
except Exception as e:
logger.warning(f"Model warm-up failed: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0):
"""Generate silent audio as a numpy array.
@@ -376,7 +377,7 @@ class UltravoxSTTService(AIService):
if arr.size > 0: # Check if array is not empty
audio_arrays.append(arr)
except Exception as e:
logger.error(f"Error processing bytes audio frame: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Handle numpy array data
elif isinstance(f.audio, np.ndarray):
if f.audio.size > 0: # Check if array is not empty
@@ -436,14 +437,14 @@ class UltravoxSTTService(AIService):
yield LLMFullResponseEndFrame()
except Exception as e:
logger.error(f"Error generating text from model: {e}")
yield ErrorFrame(f"Error generating text: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
else:
logger.warning("No model available for text generation")
logger.error("No model available for text generation")
yield ErrorFrame("No model available for text generation")
except Exception as e:
logger.error(f"Error processing audio buffer: {e}")
logger.error(f"{self} exception: {e}")
import traceback
logger.error(traceback.format_exc())

View File

@@ -94,7 +94,7 @@ class WebsocketService(ABC):
if self._reconnect_on_error:
retry_count += 1
if retry_count >= MAX_RETRIES:
await report_error(ErrorFrame(message, fatal=True))
await report_error(ErrorFrame(message))
break
logger.warning(f"{self} connection error, will retry: {e}")

View File

@@ -226,8 +226,8 @@ class BaseWhisperSTTService(SegmentedSTTService):
logger.warning("Received empty transcription from API")
except Exception as e:
logger.exception(f"Exception during transcription: {e}")
yield ErrorFrame(f"Error during transcription: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def _transcribe(self, audio: bytes) -> Transcription:
"""Transcribe audio data to text.

View File

@@ -428,5 +428,5 @@ class WhisperSTTServiceMLX(WhisperSTTService):
)
except Exception as e:
logger.exception(f"MLX Whisper transcription error: {e}")
yield ErrorFrame(f"MLX Whisper transcription error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -146,7 +146,7 @@ class XTTSService(TTSService):
)
await self.push_error(
ErrorFrame(
f"Error error getting studio speakers (status: {r.status}, error: {text})"
error=f"Error getting studio speakers (status: {r.status}, error: {text})"
)
)
return
@@ -187,7 +187,7 @@ class XTTSService(TTSService):
if r.status != 200:
text = await r.text()
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})")
return
await self.start_tts_usage_metrics(text)