Fix ws error receive and send

This commit is contained in:
Xin Wang
2026-02-09 15:49:00 +08:00
parent fe05cf5d74
commit 8fd6daaed1
3 changed files with 34 additions and 4 deletions

View File

@@ -190,6 +190,12 @@ async def websocket_endpoint(websocket: WebSocket):
# Receive loop # Receive loop
while True: while True:
message = await websocket.receive() message = await websocket.receive()
message_type = message.get("type")
if message_type == "websocket.disconnect":
logger.info(f"WebSocket disconnected: {session_id}")
break
last_received_at[0] = time.monotonic() last_received_at[0] = time.monotonic()
# Handle binary audio data # Handle binary audio data

View File

@@ -1,5 +1,6 @@
"""Session management for active calls.""" """Session management for active calls."""
import asyncio
import uuid import uuid
import json import json
import time import time
@@ -78,6 +79,8 @@ class Session:
self._history_turn_index: int = 0 self._history_turn_index: int = 0
self._history_call_started_mono: Optional[float] = None self._history_call_started_mono: Optional[float] = None
self._history_finalized: bool = False self._history_finalized: bool = False
self._cleanup_lock = asyncio.Lock()
self._cleaned_up = False
self.pipeline.conversation.on_turn_complete(self._on_turn_complete) self.pipeline.conversation.on_turn_complete(self._on_turn_complete)
@@ -288,6 +291,11 @@ class Session:
async def cleanup(self) -> None: async def cleanup(self) -> None:
"""Cleanup session resources.""" """Cleanup session resources."""
async with self._cleanup_lock:
if self._cleaned_up:
return
self._cleaned_up = True
logger.info(f"Session {self.id} cleaning up") logger.info(f"Session {self.id} cleaning up")
await self._finalize_history(status="connected") await self._finalize_history(status="connected")
await self.pipeline.cleanup() await self.pipeline.cleanup()

View File

@@ -5,6 +5,7 @@ import json
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional from typing import Optional
from fastapi import WebSocket from fastapi import WebSocket
from starlette.websockets import WebSocketState
from loguru import logger from loguru import logger
# Try to import aiortc (optional for WebRTC functionality) # Try to import aiortc (optional for WebRTC functionality)
@@ -107,9 +108,24 @@ class SocketTransport(BaseTransport):
async def close(self) -> None: async def close(self) -> None:
"""Close the WebSocket connection.""" """Close the WebSocket connection."""
if self._closed:
return
self._closed = True self._closed = True
if (
self.ws.client_state == WebSocketState.DISCONNECTED
or self.ws.application_state == WebSocketState.DISCONNECTED
):
return
try: try:
await self.ws.close() await self.ws.close()
except RuntimeError as e:
# Already closed by another task/path; safe to ignore.
if "close message has been sent" in str(e):
logger.debug(f"WebSocket already closed: {e}")
return
logger.error(f"Error closing WebSocket: {e}")
except Exception as e: except Exception as e:
logger.error(f"Error closing WebSocket: {e}") logger.error(f"Error closing WebSocket: {e}")