From 3b0affe5b4006a4c3869fa1c0b2e9c55a9519936 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 22 Apr 2026 11:03:41 -0400 Subject: [PATCH] Guard run_stt WebSocket sends with try/except AssemblyAI, Cartesia, Gradium, and Soniox STT services sent audio over the WebSocket without catching transient send failures, so a single network hiccup could propagate an exception up through process_frame and end the pipeline. Other push-based STT services (Deepgram, xAI, Azure, Smallest, etc.) already guard their sends. Follow the deepgram/stt.py pattern: log a warning and continue. The existing connection-state check at the top of each call handles recovery on the next invocation. --- src/pipecat/services/assemblyai/stt.py | 6 +++++- src/pipecat/services/cartesia/stt.py | 5 ++++- src/pipecat/services/gradium/stt.py | 6 +++++- src/pipecat/services/soniox/stt.py | 5 ++++- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 6b2607587..f87a5a0e5 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -433,7 +433,11 @@ class AssemblyAISTTService(WebsocketSTTService): while len(self._audio_buffer) >= self._chunk_size_bytes: chunk = bytes(self._audio_buffer[: self._chunk_size_bytes]) self._audio_buffer = self._audio_buffer[self._chunk_size_bytes :] - await self._websocket.send(chunk) + try: + await self._websocket.send(chunk) + except Exception as e: + logger.warning(f"{self}: send failed: {e}") + break yield None diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index 191f778b3..018d95e6a 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -290,7 +290,10 @@ class CartesiaSTTService(WebsocketSTTService): if not self._websocket or self._websocket.state is not State.OPEN: await self._connect() - await self._websocket.send(audio) + try: + await self._websocket.send(audio) + except Exception as e: + logger.warning(f"{self}: send failed: {e}") yield None async def _connect(self): diff --git a/src/pipecat/services/gradium/stt.py b/src/pipecat/services/gradium/stt.py index 70fde8145..412f0a1b3 100644 --- a/src/pipecat/services/gradium/stt.py +++ b/src/pipecat/services/gradium/stt.py @@ -350,7 +350,11 @@ class GradiumSTTService(WebsocketSTTService): chunk = base64.b64encode(chunk).decode("utf-8") msg = {"type": "audio", "audio": chunk} if self._websocket and self._websocket.state is State.OPEN: - await self._websocket.send(json.dumps(msg)) + try: + await self._websocket.send(json.dumps(msg)) + except Exception as e: + logger.warning(f"{self}: send failed: {e}") + break yield None diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index 30c813c9d..95418011c 100644 --- a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -412,7 +412,10 @@ class SonioxSTTService(WebsocketSTTService): Frame: None (transcription results come via WebSocket callbacks). """ if self._websocket and self._websocket.state is State.OPEN: - await self._websocket.send(audio) + try: + await self._websocket.send(audio) + except Exception as e: + logger.warning(f"{self}: send failed: {e}") yield None