From cfe91d11ecb7bf1b07c674ae9c0e3de041eaa653 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Mar 2026 08:45:32 -0400 Subject: [PATCH 1/3] Handle Deepgram SDK 6.x send_media() exceptions Deepgram SDK 6.x surfaces connection errors from send_media() instead of silently swallowing them. This causes error floods when the WebSocket disconnects since every queued audio frame hits the dead connection. Wrap send_media() in try/except: on failure, log one warning and set self._connection = None so subsequent frames skip until the existing _connection_handler reconnects. --- src/pipecat/services/deepgram/stt.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/deepgram/stt.py b/src/pipecat/services/deepgram/stt.py index 0fb891d61..63e7a05ba 100644 --- a/src/pipecat/services/deepgram/stt.py +++ b/src/pipecat/services/deepgram/stt.py @@ -577,7 +577,11 @@ class DeepgramSTTService(STTService): Frame: None (transcription results come via WebSocket callbacks). """ if self._connection: - await self._connection.send_media(audio) + try: + await self._connection.send_media(audio) + except Exception as e: + logger.warning(f"{self}: send_media failed, connection will reconnect: {e}") + self._connection = None yield None def _build_connect_kwargs(self) -> dict: From 259f5e124cbffc36f1f64c3303c9a8cd6a299389 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Mar 2026 08:48:37 -0400 Subject: [PATCH 2/3] Add changelog for #4153 --- changelog/4153.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/4153.fixed.md diff --git a/changelog/4153.fixed.md b/changelog/4153.fixed.md new file mode 100644 index 000000000..1e3dbebd0 --- /dev/null +++ b/changelog/4153.fixed.md @@ -0,0 +1 @@ +- Fixed error floods in `DeepgramSTTService` when the WebSocket connection drops. With Deepgram SDK 6.x, `send_media()` raises exceptions on a dead connection instead of silently failing, causing every queued audio frame to log an error. Now `send_media()` failures are caught gracefully — a single warning is logged and audio frames are skipped until the existing reconnection logic restores the connection. From c331c75d66fa8d4dcafaed09f3dd33a03a77b72f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Mar 2026 09:20:40 -0400 Subject: [PATCH 3/3] Add tests for send_media() exception handling in DeepgramSTTService --- tests/test_deepgram_stt.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/tests/test_deepgram_stt.py b/tests/test_deepgram_stt.py index eb8036237..5956f9ec9 100644 --- a/tests/test_deepgram_stt.py +++ b/tests/test_deepgram_stt.py @@ -5,11 +5,12 @@ # import io +from unittest.mock import AsyncMock, MagicMock import pytest from loguru import logger -from pipecat.services.deepgram.stt import _derive_deepgram_urls +from pipecat.services.deepgram.stt import DeepgramSTTService, _derive_deepgram_urls @pytest.mark.parametrize( @@ -49,3 +50,38 @@ def test_derive_deepgram_urls_unknown_scheme_warns(): assert "Unrecognized scheme" in sink.getvalue() finally: logger.remove(handler_id) + + +@pytest.mark.asyncio +async def test_run_stt_send_media_exception_clears_connection(): + """send_media() failure should log a warning and clear self._connection.""" + service = DeepgramSTTService.__new__(DeepgramSTTService) + service._name = "DeepgramSTTService" + + mock_connection = MagicMock() + mock_connection.send_media = AsyncMock(side_effect=Exception("websocket closed")) + service._connection = mock_connection + + sink = io.StringIO() + handler_id = logger.add(sink, format="{message}") + try: + async for _ in service.run_stt(b"\x00" * 160): + pass + + assert service._connection is None + assert "send_media failed" in sink.getvalue() + finally: + logger.remove(handler_id) + + +@pytest.mark.asyncio +async def test_run_stt_skips_send_when_connection_is_none(): + """When self._connection is None, run_stt should silently skip.""" + service = DeepgramSTTService.__new__(DeepgramSTTService) + service._connection = None + + # Should not raise + async for _ in service.run_stt(b"\x00" * 160): + pass + + assert service._connection is None