diff --git a/src/pipecat/services/websocket_service.py b/src/pipecat/services/websocket_service.py index d757946f9..6fc8efb2e 100644 --- a/src/pipecat/services/websocket_service.py +++ b/src/pipecat/services/websocket_service.py @@ -4,6 +4,8 @@ # SPDX-License-Identifier: BSD 2-Clause License # +"""Base websocket service with automatic reconnection and error handling.""" + import asyncio from abc import ABC, abstractmethod from typing import Awaitable, Callable, Optional @@ -17,18 +19,26 @@ from pipecat.utils.network import exponential_backoff_time class WebsocketService(ABC): - """Base class for websocket-based services with reconnection logic.""" + """Base class for websocket-based services with automatic reconnection. + + Provides websocket connection management, automatic reconnection with + exponential backoff, connection verification, and error handling. + Subclasses implement service-specific connection and message handling logic. + + Args: + reconnect_on_error: Whether to automatically reconnect on connection errors. + **kwargs: Additional arguments (unused, for compatibility). + """ def __init__(self, *, reconnect_on_error: bool = True, **kwargs): - """Initialize websocket attributes.""" self._websocket: Optional[websockets.WebSocketClientProtocol] = None self._reconnect_on_error = reconnect_on_error async def _verify_connection(self) -> bool: - """Verify websocket connection is working. + """Verify the websocket connection is active and responsive. Returns: - bool: True if connection is verified working, False otherwise + True if connection is verified working, False otherwise. """ try: if not self._websocket or self._websocket.closed: @@ -40,13 +50,13 @@ class WebsocketService(ABC): return False async def _reconnect_websocket(self, attempt_number: int) -> bool: - """Reconnect the websocket. + """Reconnect the websocket with the current attempt number. Args: - attempt_number: Current retry attempt number + attempt_number: Current retry attempt number for logging. Returns: - bool: True if reconnection and verification successful, False otherwise + True if reconnection and verification successful, False otherwise. """ logger.warning(f"{self} reconnecting (attempt: {attempt_number})") await self._disconnect_websocket() @@ -54,10 +64,14 @@ class WebsocketService(ABC): return await self._verify_connection() async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]): - """Handles WebSocket message receiving with automatic retry logic. + """Handle websocket message receiving with automatic retry logic. + + Continuously receives messages with automatic reconnection on errors. + Uses exponential backoff between retry attempts and reports fatal errors + after maximum retries are exhausted. Args: - report_error: Callback to report errors + report_error: Callback function to report connection errors. """ retry_count = 0 MAX_RETRIES = 3 @@ -98,33 +112,45 @@ class WebsocketService(ABC): @abstractmethod async def _connect(self): - """Implement service-specific connection logic. This function will - connect to the websocket via _connect_websocket() among other connection - logic.""" + """Connect to the service. + + Implement service-specific connection logic including websocket connection + via _connect_websocket() and any additional setup required. + """ pass @abstractmethod async def _disconnect(self): - """Implement service-specific disconnection logic. This function will - disconnect to the websocket via _connect_websocket() among other - connection logic. + """Disconnect from the service. + Implement service-specific disconnection logic including websocket + disconnection via _disconnect_websocket() and any cleanup required. """ pass @abstractmethod async def _connect_websocket(self): - """Implement service-specific websocket connection logic. This function - should only connect to the websocket.""" + """Establish the websocket connection. + + Implement the low-level websocket connection logic specific to the service. + Should only handle websocket connection, not additional service setup. + """ pass @abstractmethod async def _disconnect_websocket(self): - """Implement service-specific websocket disconnection logic. This - function should only disconnect from the websocket.""" + """Close the websocket connection. + + Implement the low-level websocket disconnection logic specific to the service. + Should only handle websocket disconnection, not additional service cleanup. + """ pass @abstractmethod async def _receive_messages(self): - """Implement service-specific message receiving logic.""" + """Receive and process websocket messages. + + Implement service-specific logic for receiving and handling messages + from the websocket connection. Called continuously by the receive task handler. + """ pass