Merge pull request #1222 from pipecat-ai/aleix/websocket-service-handle-clean-disconnection

WebsocketService: handle clean server disconnection
This commit is contained in:
Aleix Conchillo Flaqué
2025-02-14 10:33:54 -08:00
committed by GitHub
3 changed files with 17 additions and 5 deletions

View File

@@ -102,6 +102,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a websocket-based service issue (e.g. `CartesiaTTSService`) that was
preventing a reconnection after the server disconnected cleanly, which was
causing an inifite loop instead.
- Fixed a `BaseOutputTransport` issue that was causing upstream frames to no be
pushed upstream.

View File

@@ -370,8 +370,13 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _receive_messages(self):
async for message in self._websocket:
async for message in self._get_websocket():
msg = json.loads(message)
if msg.get("audio"):
await self.stop_ttfb_metrics()

View File

@@ -10,6 +10,7 @@ from typing import Awaitable, Callable, Optional
import websockets
from loguru import logger
from websockets.protocol import State
from pipecat.frames.frames import ErrorFrame
@@ -83,11 +84,13 @@ class WebsocketService(ABC):
while True:
try:
await self._receive_messages()
logger.debug(f"{self} connection established successfully")
retry_count = 0 # Reset counter on successful message receive
except websockets.ConnectionClosed as e:
logger.warning(f"{self} connection closed: {e}")
break
if self._websocket and self._websocket.state == State.CLOSED:
raise websockets.ConnectionClosedOK(
self._websocket.close_rcvd,
self._websocket.close_sent,
self._websocket.close_rcvd_then_sent,
)
except Exception as e:
retry_count += 1
if retry_count >= MAX_RETRIES: