From 59c3abeb92567ded29edd435b7544de53d1db8e9 Mon Sep 17 00:00:00 2001 From: jay Date: Mon, 15 Dec 2025 23:06:35 +0530 Subject: [PATCH 1/4] fix issue with infinite loop when websocket disconnects --- src/pipecat/services/elevenlabs/stt.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 5fa04d1c1..78a16e1e5 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -690,8 +690,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): return self._websocket raise Exception("Websocket not connected") - async def _process_messages(self): - """Process incoming WebSocket messages.""" + async def _receive_messages(self): + """Continuously receive and process WebSocket messages.""" async for message in self._get_websocket(): try: data = json.loads(message) @@ -700,13 +700,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): logger.warning(f"Received non-JSON message: {message}") except Exception as e: logger.error(f"Error processing message: {e}") - - async def _receive_messages(self): - """Continuously receive and process WebSocket messages.""" - try: - await self._process_messages() - except Exception as e: - logger.warning(f"{self} WebSocket connection closed: {e}") # Connection closed, will reconnect on next audio chunk async def _process_response(self, data: dict): From 7e424d750ebee48d8ff8af6f009b8257acd41dc5 Mon Sep 17 00:00:00 2001 From: jay Date: Mon, 15 Dec 2025 23:18:44 +0530 Subject: [PATCH 2/4] improve error handling to log all error types --- src/pipecat/services/elevenlabs/stt.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 78a16e1e5..99f1b28de 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -710,6 +710,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): """ message_type = data.get("message_type") + error_types = ["transcriber_error", "input_error", "commit_throttled", "transcriber_error", "unaccepted_terms_error", "rate_limited", "queue_overflow", "resource_exhausted", "session_time_limit_exceeded", "chunk_size_exceeded", "insufficient_audio_activity"] + if message_type == "session_started": logger.debug(f"ElevenLabs session started: {data}") @@ -731,12 +733,14 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): error_msg = data.get("error", "Authentication error") logger.error(f"ElevenLabs auth error: {error_msg}") await self.push_error(error_msg=f"Auth error: {error_msg}") - elif message_type == "quota_exceeded_error": error_msg = data.get("error", "Quota exceeded") logger.error(f"ElevenLabs quota exceeded: {error_msg}") await self.push_error(error_msg=f"Quota exceeded: {error_msg}") - + elif message_type in error_types: + error_msg = data.get("error", message_type) + logger.error(f"ElevenLabs socket error: {error_msg}") + await self.push_error(error_msg=f"ElevenLabs socket error: {error_msg}") else: logger.debug(f"Unknown message type: {message_type}") From 83a3295a39111f38ff7726ffdb8c0cce992747d6 Mon Sep 17 00:00:00 2001 From: jay Date: Thu, 18 Dec 2025 00:03:47 +0530 Subject: [PATCH 3/4] update error handling based on code review --- src/pipecat/services/elevenlabs/stt.py | 35 +++++++++++++------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 99f1b28de..8169faa99 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -710,8 +710,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): """ message_type = data.get("message_type") - error_types = ["transcriber_error", "input_error", "commit_throttled", "transcriber_error", "unaccepted_terms_error", "rate_limited", "queue_overflow", "resource_exhausted", "session_time_limit_exceeded", "chunk_size_exceeded", "insufficient_audio_activity"] - if message_type == "session_started": logger.debug(f"ElevenLabs session started: {data}") @@ -724,23 +722,24 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): elif message_type == "committed_transcript_with_timestamps": await self._on_committed_transcript_with_timestamps(data) - elif message_type == "error": - error_msg = data.get("error", "Unknown error") - logger.error(f"ElevenLabs error: {error_msg}") + elif message_type in ( + "error", + "auth_error", + "quota_exceeded_error", + "transcriber_error", + "input_error", + "commit_throttled", + "transcriber_error", + "unaccepted_terms_error", + "rate_limited", + "queue_overflow", + "resource_exhausted", + "session_time_limit_exceeded", + "chunk_size_exceeded", + "insufficient_audio_activity", + ): + error_msg = data.get("error", f"Unknown error - {message_type}") await self.push_error(error_msg=f"Error: {error_msg}") - - elif message_type == "auth_error": - error_msg = data.get("error", "Authentication error") - logger.error(f"ElevenLabs auth error: {error_msg}") - await self.push_error(error_msg=f"Auth error: {error_msg}") - elif message_type == "quota_exceeded_error": - error_msg = data.get("error", "Quota exceeded") - logger.error(f"ElevenLabs quota exceeded: {error_msg}") - await self.push_error(error_msg=f"Quota exceeded: {error_msg}") - elif message_type in error_types: - error_msg = data.get("error", message_type) - logger.error(f"ElevenLabs socket error: {error_msg}") - await self.push_error(error_msg=f"ElevenLabs socket error: {error_msg}") else: logger.debug(f"Unknown message type: {message_type}") From 614d5e0d19efa6382aa3d528b1ceeca2276f0e76 Mon Sep 17 00:00:00 2001 From: jay Date: Thu, 18 Dec 2025 00:08:30 +0530 Subject: [PATCH 4/4] add changelog --- changelog/3233.fixed.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changelog/3233.fixed.md diff --git a/changelog/3233.fixed.md b/changelog/3233.fixed.md new file mode 100644 index 000000000..3f17fd765 --- /dev/null +++ b/changelog/3233.fixed.md @@ -0,0 +1,2 @@ +- Improved error handling in `ElevenLabsRealtimeSTTService` +- Fixed an issue in `ElevenLabsRealtimeSTTService` causing an infinite loop that blocks the process if the websocket disconnects due to an error \ No newline at end of file