From 97b00042df8717176dc5abb86bd80c411d3d38fc Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 18 May 2026 12:35:01 -0400 Subject: [PATCH 1/3] Align websocket STT connection failures --- src/pipecat/services/assemblyai/stt.py | 2 +- src/pipecat/services/aws/stt.py | 2 +- src/pipecat/services/cartesia/stt.py | 3 +- src/pipecat/services/elevenlabs/stt.py | 1 + src/pipecat/services/gladia/stt.py | 3 +- src/pipecat/services/gradium/stt.py | 4 +-- src/pipecat/services/soniox/stt.py | 2 +- src/pipecat/services/xai/stt.py | 3 +- tests/test_cartesia_stt.py | 45 ++++++++++++++++++++++++++ tests/test_soniox_stt.py | 21 +++++++++++- 10 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 tests/test_cartesia_stt.py diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 5eda4b7f6..d6c663dda 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -586,9 +586,9 @@ class AssemblyAISTTService(WebsocketSTTService): await self._call_event_handler("on_connected") logger.debug(f"{self} Connected to AssemblyAI WebSocket") except Exception as e: + self._websocket = None self._connected = False await self.push_error(error_msg=f"Unable to connect to AssemblyAI: {e}", exception=e) - raise async def _disconnect_websocket(self): """Close the websocket connection to AssemblyAI.""" diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index 6a427d707..3e60ed110 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -339,10 +339,10 @@ class AWSTranscribeSTTService(WebsocketSTTService): await self._call_event_handler("on_connected") logger.info(f"{self} Successfully connected to AWS Transcribe") except Exception as e: + self._websocket = None await self.push_error( error_msg=f"Unable to connect to AWS Transcribe: {e}", exception=e ) - raise async def _disconnect_websocket(self): """Close the websocket connection to AWS Transcribe.""" diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index af3418edd..2d3eb8c96 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -354,7 +354,8 @@ class CartesiaSTTService(WebsocketSTTService): self._websocket = await websocket_connect(ws_url, additional_headers=headers) await self._call_event_handler("on_connected") except Exception as e: - await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) + self._websocket = None + await self.push_error(error_msg=f"Unable to connect to Cartesia: {e}", exception=e) async def _disconnect_websocket(self): ws = self._websocket diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 1e1d942cf..b7cdf8119 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -823,6 +823,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService): await self._call_event_handler("on_connected") logger.debug("Connected to ElevenLabs Realtime STT") except Exception as e: + self._websocket = None await self.push_error( error_msg=f"Unable to connect to ElevenLabs Realtime STT: {e}", exception=e ) diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index 26fea653c..0d788e234 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -558,8 +558,9 @@ class GladiaSTTService(WebsocketSTTService): logger.debug(f"{self} Connected to Gladia WebSocket") except Exception as e: + self._websocket = None + self._connection_active = False await self.push_error(error_msg=f"Unable to connect to Gladia: {e}", exception=e) - raise async def _disconnect_websocket(self): """Close the websocket connection to Gladia.""" diff --git a/src/pipecat/services/gradium/stt.py b/src/pipecat/services/gradium/stt.py index 3e5c954c2..64a77402d 100644 --- a/src/pipecat/services/gradium/stt.py +++ b/src/pipecat/services/gradium/stt.py @@ -423,8 +423,8 @@ class GradiumSTTService(WebsocketSTTService): logger.debug("Connected to Gradium STT") except Exception as e: - await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e) - raise + self._websocket = None + await self.push_error(error_msg=f"Unable to connect to Gradium: {e}", exception=e) async def _disconnect(self): await super()._disconnect() diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index 2a50581ce..823517aa1 100644 --- a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -537,8 +537,8 @@ class SonioxSTTService(WebsocketSTTService): await self._call_event_handler("on_connected") logger.debug("Connected to Soniox STT") except Exception as e: + self._websocket = None await self.push_error(error_msg=f"Unable to connect to Soniox: {e}", exception=e) - raise async def _disconnect_websocket(self): """Close the websocket connection to Soniox.""" diff --git a/src/pipecat/services/xai/stt.py b/src/pipecat/services/xai/stt.py index e219cfb6e..6bb32bb7e 100644 --- a/src/pipecat/services/xai/stt.py +++ b/src/pipecat/services/xai/stt.py @@ -293,8 +293,9 @@ class XAISTTService(WebsocketSTTService): await self._call_event_handler("on_connected") logger.debug(f"{self} connected to xAI STT WebSocket") except Exception as e: + self._websocket = None + self._session_ready.clear() await self.push_error(error_msg=f"Unable to connect to xAI STT: {e}", exception=e) - raise async def _disconnect_websocket(self): """Close the WebSocket connection.""" diff --git a/tests/test_cartesia_stt.py b/tests/test_cartesia_stt.py new file mode 100644 index 000000000..08563e711 --- /dev/null +++ b/tests/test_cartesia_stt.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from unittest.mock import AsyncMock + +import pytest +from websockets.protocol import State + +from pipecat.services.cartesia.stt import CartesiaSTTService + + +class _FakeWebsocket: + def __init__(self, *, state=State.OPEN, send_side_effect=None): + self.state = state + self.send = AsyncMock(side_effect=send_side_effect) + + +@pytest.mark.asyncio +async def test_cartesia_connect_failure_clears_stale_websocket(monkeypatch): + async def fake_websocket_connect(*args, **kwargs): + raise RuntimeError("connection failed") + + monkeypatch.setattr("pipecat.services.cartesia.stt.websocket_connect", fake_websocket_connect) + + service = CartesiaSTTService(api_key="test-key", sample_rate=16000) + service._websocket = _FakeWebsocket(state=State.CLOSED) + + await service._connect_websocket() + + assert service._websocket is None + + +@pytest.mark.asyncio +async def test_cartesia_run_stt_logs_send_failure_without_clearing_websocket(): + service = CartesiaSTTService(api_key="test-key", sample_rate=16000) + websocket = _FakeWebsocket(send_side_effect=RuntimeError("websocket closed")) + service._websocket = websocket + + async for _ in service.run_stt(b"\x00" * 160): + pass + + assert service._websocket is websocket diff --git a/tests/test_soniox_stt.py b/tests/test_soniox_stt.py index e6d6713f5..99eaeb5c0 100644 --- a/tests/test_soniox_stt.py +++ b/tests/test_soniox_stt.py @@ -5,8 +5,10 @@ # import json +from unittest.mock import AsyncMock import pytest +from websockets.protocol import State from pipecat.frames.frames import TranscriptionFrame from pipecat.services.soniox.stt import END_TOKEN, SonioxSTTService, _language_from_tokens @@ -14,8 +16,10 @@ from pipecat.transcriptions.language import Language class _FakeWebsocket: - def __init__(self, messages): + def __init__(self, messages, *, state=State.OPEN, send_side_effect=None): self._messages = messages + self.state = state + self.send = AsyncMock(side_effect=send_side_effect) def __aiter__(self): return self._iter_messages() @@ -25,6 +29,21 @@ class _FakeWebsocket: yield message +@pytest.mark.asyncio +async def test_connect_failure_clears_stale_websocket_without_raising(monkeypatch): + async def fake_websocket_connect(*args, **kwargs): + raise RuntimeError("connection failed") + + monkeypatch.setattr("pipecat.services.soniox.stt.websocket_connect", fake_websocket_connect) + + service = SonioxSTTService(api_key="test-key") + service._websocket = _FakeWebsocket([], state=State.CLOSED) + + await service._connect_websocket() + + assert service._websocket is None + + def test_language_from_tokens_uses_single_recognized_language(): tokens = [ {"text": "Hello", "language": "en"}, From e2984910688b662727aac93f0abe31a022b7fc1c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 18 May 2026 12:41:56 -0400 Subject: [PATCH 2/3] Add changelog for websocket STT failure handling --- changelog/4514.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/4514.fixed.md diff --git a/changelog/4514.fixed.md b/changelog/4514.fixed.md new file mode 100644 index 000000000..d5fd46320 --- /dev/null +++ b/changelog/4514.fixed.md @@ -0,0 +1 @@ +- Fixed websocket STT connection setup failures so services clear stale websocket state and emit non-fatal error frames, allowing `ServiceSwitcher` failover to keep agents running. From 9586db5b50d91f45228f274a4689e8a9f02be000 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 20 May 2026 14:45:29 -0400 Subject: [PATCH 3/3] Preserve websocket reconnect failure retries --- src/pipecat/services/websocket_service.py | 4 +++- tests/test_websocket_service.py | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/websocket_service.py b/src/pipecat/services/websocket_service.py index d7d3cd505..5540f1be0 100644 --- a/src/pipecat/services/websocket_service.py +++ b/src/pipecat/services/websocket_service.py @@ -76,7 +76,9 @@ class WebsocketService(ABC): logger.warning(f"{self} reconnecting (attempt: {attempt_number})") await self._disconnect_websocket() await self._connect_websocket() - return await self._verify_connection() + if not await self._verify_connection(): + raise ConnectionError(f"{self} websocket reconnection failed verification") + return True async def _try_reconnect( self, diff --git a/tests/test_websocket_service.py b/tests/test_websocket_service.py index 2cffbce97..d5f17297b 100644 --- a/tests/test_websocket_service.py +++ b/tests/test_websocket_service.py @@ -165,6 +165,19 @@ async def test_reconnect_exhausted_emits_non_fatal_error(service, report_error): assert "Connection refused" in final_error.error +@pytest.mark.asyncio +async def test_reconnect_exhausted_when_connect_does_not_raise(service, report_error): + """A non-raising failed connect is treated as a failed reconnect attempt.""" + result = await service._try_reconnect(report_error=report_error) + + assert result is False + assert report_error.call_count == 4 + final_error = report_error.call_args_list[-1][0][0] + assert isinstance(final_error, ErrorFrame) + assert final_error.fatal is False + assert "websocket reconnection failed verification" in final_error.error + + # --------------------------------------------------------------------------- # Quick failure detection — accept then immediately close # ---------------------------------------------------------------------------