WebsocketService: handle clean server disconnection

The websocket async iterator doesn't raise an exception when the server
disconnects cleanly. We should handle that and raise an exception so we can
reconnect.
This commit is contained in:
Aleix Conchillo Flaqué
2025-02-14 09:54:03 -08:00
parent b2754bf208
commit e006dcf172
3 changed files with 17 additions and 5 deletions

View File

@@ -102,6 +102,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a websocket-based service issue (e.g. `CartesiaTTSService`) that was
preventing a reconnection after the server disconnected cleanly, which was
causing an inifite loop instead.
- Fixed a `BaseOutputTransport` issue that was causing upstream frames to no be
pushed upstream.

View File

@@ -370,8 +370,13 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _receive_messages(self):
async for message in self._websocket:
async for message in self._get_websocket():
msg = json.loads(message)
if msg.get("audio"):
await self.stop_ttfb_metrics()

View File

@@ -10,6 +10,7 @@ from typing import Awaitable, Callable, Optional
import websockets
from loguru import logger
from websockets.protocol import State
from pipecat.frames.frames import ErrorFrame
@@ -83,11 +84,13 @@ class WebsocketService(ABC):
while True:
try:
await self._receive_messages()
logger.debug(f"{self} connection established successfully")
retry_count = 0 # Reset counter on successful message receive
except websockets.ConnectionClosed as e:
logger.warning(f"{self} connection closed: {e}")
break
if self._websocket and self._websocket.state == State.CLOSED:
raise websockets.ConnectionClosedOK(
self._websocket.close_rcvd,
self._websocket.close_sent,
self._websocket.close_rcvd_then_sent,
)
except Exception as e:
retry_count += 1
if retry_count >= MAX_RETRIES: