Improve WebsocketService docstrings

This commit is contained in:
Mark Backman
2025-06-25 17:48:43 -04:00
parent f80f62c7d1
commit bb3bb8d9c6

View File

@@ -4,6 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Base websocket service with automatic reconnection and error handling."""
import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Optional
@@ -17,18 +19,26 @@ from pipecat.utils.network import exponential_backoff_time
class WebsocketService(ABC):
"""Base class for websocket-based services with reconnection logic."""
"""Base class for websocket-based services with automatic reconnection.
Provides websocket connection management, automatic reconnection with
exponential backoff, connection verification, and error handling.
Subclasses implement service-specific connection and message handling logic.
Args:
reconnect_on_error: Whether to automatically reconnect on connection errors.
**kwargs: Additional arguments (unused, for compatibility).
"""
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
"""Initialize websocket attributes."""
self._websocket: Optional[websockets.WebSocketClientProtocol] = None
self._reconnect_on_error = reconnect_on_error
async def _verify_connection(self) -> bool:
"""Verify websocket connection is working.
"""Verify the websocket connection is active and responsive.
Returns:
bool: True if connection is verified working, False otherwise
True if connection is verified working, False otherwise.
"""
try:
if not self._websocket or self._websocket.closed:
@@ -40,13 +50,13 @@ class WebsocketService(ABC):
return False
async def _reconnect_websocket(self, attempt_number: int) -> bool:
"""Reconnect the websocket.
"""Reconnect the websocket with the current attempt number.
Args:
attempt_number: Current retry attempt number
attempt_number: Current retry attempt number for logging.
Returns:
bool: True if reconnection and verification successful, False otherwise
True if reconnection and verification successful, False otherwise.
"""
logger.warning(f"{self} reconnecting (attempt: {attempt_number})")
await self._disconnect_websocket()
@@ -54,10 +64,14 @@ class WebsocketService(ABC):
return await self._verify_connection()
async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]):
"""Handles WebSocket message receiving with automatic retry logic.
"""Handle websocket message receiving with automatic retry logic.
Continuously receives messages with automatic reconnection on errors.
Uses exponential backoff between retry attempts and reports fatal errors
after maximum retries are exhausted.
Args:
report_error: Callback to report errors
report_error: Callback function to report connection errors.
"""
retry_count = 0
MAX_RETRIES = 3
@@ -98,33 +112,45 @@ class WebsocketService(ABC):
@abstractmethod
async def _connect(self):
"""Implement service-specific connection logic. This function will
connect to the websocket via _connect_websocket() among other connection
logic."""
"""Connect to the service.
Implement service-specific connection logic including websocket connection
via _connect_websocket() and any additional setup required.
"""
pass
@abstractmethod
async def _disconnect(self):
"""Implement service-specific disconnection logic. This function will
disconnect to the websocket via _connect_websocket() among other
connection logic.
"""Disconnect from the service.
Implement service-specific disconnection logic including websocket
disconnection via _disconnect_websocket() and any cleanup required.
"""
pass
@abstractmethod
async def _connect_websocket(self):
"""Implement service-specific websocket connection logic. This function
should only connect to the websocket."""
"""Establish the websocket connection.
Implement the low-level websocket connection logic specific to the service.
Should only handle websocket connection, not additional service setup.
"""
pass
@abstractmethod
async def _disconnect_websocket(self):
"""Implement service-specific websocket disconnection logic. This
function should only disconnect from the websocket."""
"""Close the websocket connection.
Implement the low-level websocket disconnection logic specific to the service.
Should only handle websocket disconnection, not additional service cleanup.
"""
pass
@abstractmethod
async def _receive_messages(self):
"""Implement service-specific message receiving logic."""
"""Receive and process websocket messages.
Implement service-specific logic for receiving and handling messages
from the websocket connection. Called continuously by the receive task handler.
"""
pass