diff --git a/changelog/3233.fixed.md b/changelog/3233.fixed.md new file mode 100644 index 000000000..3f17fd765 --- /dev/null +++ b/changelog/3233.fixed.md @@ -0,0 +1,2 @@ +- Improved error handling in `ElevenLabsRealtimeSTTService` +- Fixed an issue in `ElevenLabsRealtimeSTTService` causing an infinite loop that blocks the process if the websocket disconnects due to an error \ No newline at end of file diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 5fa04d1c1..8169faa99 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -690,8 +690,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): return self._websocket raise Exception("Websocket not connected") - async def _process_messages(self): - """Process incoming WebSocket messages.""" + async def _receive_messages(self): + """Continuously receive and process WebSocket messages.""" async for message in self._get_websocket(): try: data = json.loads(message) @@ -700,13 +700,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): logger.warning(f"Received non-JSON message: {message}") except Exception as e: logger.error(f"Error processing message: {e}") - - async def _receive_messages(self): - """Continuously receive and process WebSocket messages.""" - try: - await self._process_messages() - except Exception as e: - logger.warning(f"{self} WebSocket connection closed: {e}") # Connection closed, will reconnect on next audio chunk async def _process_response(self, data: dict): @@ -729,21 +722,24 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): elif message_type == "committed_transcript_with_timestamps": await self._on_committed_transcript_with_timestamps(data) - elif message_type == "error": - error_msg = data.get("error", "Unknown error") - logger.error(f"ElevenLabs error: {error_msg}") + elif message_type in ( + "error", + "auth_error", + "quota_exceeded_error", + "transcriber_error", + "input_error", + "commit_throttled", + "transcriber_error", + "unaccepted_terms_error", + "rate_limited", + "queue_overflow", + "resource_exhausted", + "session_time_limit_exceeded", + "chunk_size_exceeded", + "insufficient_audio_activity", + ): + error_msg = data.get("error", f"Unknown error - {message_type}") await self.push_error(error_msg=f"Error: {error_msg}") - - elif message_type == "auth_error": - error_msg = data.get("error", "Authentication error") - logger.error(f"ElevenLabs auth error: {error_msg}") - await self.push_error(error_msg=f"Auth error: {error_msg}") - - elif message_type == "quota_exceeded_error": - error_msg = data.get("error", "Quota exceeded") - logger.error(f"ElevenLabs quota exceeded: {error_msg}") - await self.push_error(error_msg=f"Quota exceeded: {error_msg}") - else: logger.debug(f"Unknown message type: {message_type}")