From 0c6e12a9b04baec535506eeaddddb7a92e7c561b Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Wed, 30 Jul 2025 18:07:40 -0300 Subject: [PATCH] Fixed a race condition in `FastAPIWebsocketClient` that occurred when attempting to send a message while the client was disconnecting. --- CHANGELOG.md | 3 +++ .../transports/network/fastapi_websocket.py | 15 +++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 941dae4ac..945503530 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a race condition in `FastAPIWebsocketClient` that occurred when attempting to + send a message while the client was disconnecting. + - Fixed an issue in `GoogleLLMService` where interruptions did not work when an interruption strategy was used. diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index c3f3a933b..a256134a7 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -132,8 +132,11 @@ class FastAPIWebsocketClient: f"{self} exception sending data: {e.__class__.__name__} ({e}), application_state: {self._websocket.application_state}" ) # For some reason the websocket is disconnected, and we are not able to send data - # So let's properly handle it and disconnect the transport - if self._websocket.application_state == WebSocketState.DISCONNECTED: + # So let's properly handle it and disconnect the transport if it is not already disconnecting + if ( + self._websocket.application_state == WebSocketState.DISCONNECTED + and not self.is_closing + ): logger.warning("Closing already disconnected websocket!") self._closing = True await self.trigger_client_disconnected() @@ -146,8 +149,12 @@ class FastAPIWebsocketClient: if self.is_connected and not self.is_closing: self._closing = True - await self._websocket.close() - await self.trigger_client_disconnected() + try: + await self._websocket.close() + except Exception as e: + logger.error(f"{self} exception while closing the websocket: {e}") + finally: + await self.trigger_client_disconnected() async def trigger_client_disconnected(self): """Trigger the client disconnected callback."""