diff --git a/tests/test_deepgram_stt.py b/tests/test_deepgram_stt.py index eb8036237..5956f9ec9 100644 --- a/tests/test_deepgram_stt.py +++ b/tests/test_deepgram_stt.py @@ -5,11 +5,12 @@ # import io +from unittest.mock import AsyncMock, MagicMock import pytest from loguru import logger -from pipecat.services.deepgram.stt import _derive_deepgram_urls +from pipecat.services.deepgram.stt import DeepgramSTTService, _derive_deepgram_urls @pytest.mark.parametrize( @@ -49,3 +50,38 @@ def test_derive_deepgram_urls_unknown_scheme_warns(): assert "Unrecognized scheme" in sink.getvalue() finally: logger.remove(handler_id) + + +@pytest.mark.asyncio +async def test_run_stt_send_media_exception_clears_connection(): + """send_media() failure should log a warning and clear self._connection.""" + service = DeepgramSTTService.__new__(DeepgramSTTService) + service._name = "DeepgramSTTService" + + mock_connection = MagicMock() + mock_connection.send_media = AsyncMock(side_effect=Exception("websocket closed")) + service._connection = mock_connection + + sink = io.StringIO() + handler_id = logger.add(sink, format="{message}") + try: + async for _ in service.run_stt(b"\x00" * 160): + pass + + assert service._connection is None + assert "send_media failed" in sink.getvalue() + finally: + logger.remove(handler_id) + + +@pytest.mark.asyncio +async def test_run_stt_skips_send_when_connection_is_none(): + """When self._connection is None, run_stt should silently skip.""" + service = DeepgramSTTService.__new__(DeepgramSTTService) + service._connection = None + + # Should not raise + async for _ in service.run_stt(b"\x00" * 160): + pass + + assert service._connection is None