From e006dcf1729d8df79787ed07623719afa874bfac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 14 Feb 2025 09:54:03 -0800 Subject: [PATCH] WebsocketService: handle clean server disconnection The websocket async iterator doesn't raise an exception when the server disconnects cleanly. We should handle that and raise an exception so we can reconnect. --- CHANGELOG.md | 4 ++++ src/pipecat/services/elevenlabs.py | 7 ++++++- src/pipecat/services/websocket_service.py | 11 +++++++---- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca7d08e3e..90c80da6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -102,6 +102,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a websocket-based service issue (e.g. `CartesiaTTSService`) that was + preventing a reconnection after the server disconnected cleanly, which was + causing an inifite loop instead. + - Fixed a `BaseOutputTransport` issue that was causing upstream frames to no be pushed upstream. diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index e0b985411..b2047d34e 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -370,8 +370,13 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService): except Exception as e: logger.error(f"{self} error closing websocket: {e}") + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + async def _receive_messages(self): - async for message in self._websocket: + async for message in self._get_websocket(): msg = json.loads(message) if msg.get("audio"): await self.stop_ttfb_metrics() diff --git a/src/pipecat/services/websocket_service.py b/src/pipecat/services/websocket_service.py index 9c942b019..c47d6d97e 100644 --- a/src/pipecat/services/websocket_service.py +++ b/src/pipecat/services/websocket_service.py @@ -10,6 +10,7 @@ from typing import Awaitable, Callable, Optional import websockets from loguru import logger +from websockets.protocol import State from pipecat.frames.frames import ErrorFrame @@ -83,11 +84,13 @@ class WebsocketService(ABC): while True: try: await self._receive_messages() - logger.debug(f"{self} connection established successfully") retry_count = 0 # Reset counter on successful message receive - except websockets.ConnectionClosed as e: - logger.warning(f"{self} connection closed: {e}") - break + if self._websocket and self._websocket.state == State.CLOSED: + raise websockets.ConnectionClosedOK( + self._websocket.close_rcvd, + self._websocket.close_sent, + self._websocket.close_rcvd_then_sent, + ) except Exception as e: retry_count += 1 if retry_count >= MAX_RETRIES: