From 300f19ad237b463ccd35acdaa77e1e290fdb3eca Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 22 Jul 2025 17:23:55 -0400 Subject: [PATCH] Port to the websockets asyncio implementation, support for websockets 13 and 14 --- CHANGELOG.md | 6 +++--- src/pipecat/services/assemblyai/stt.py | 3 ++- src/pipecat/services/aws/stt.py | 14 +++++++------ src/pipecat/services/cartesia/stt.py | 20 +++++++++++++------ src/pipecat/services/cartesia/tts.py | 9 +++++---- src/pipecat/services/elevenlabs/tts.py | 10 ++++++---- src/pipecat/services/fish/tts.py | 11 +++++----- .../services/gemini_multimodal_live/gemini.py | 4 ++-- src/pipecat/services/gladia/stt.py | 12 ++++++----- src/pipecat/services/lmnt/tts.py | 11 +++++----- src/pipecat/services/neuphonic/tts.py | 9 +++++---- .../services/openai_realtime_beta/azure.py | 4 ++-- .../services/openai_realtime_beta/openai.py | 4 ++-- src/pipecat/services/playht/tts.py | 9 +++++---- src/pipecat/services/rime/tts.py | 9 +++++---- src/pipecat/services/soniox/stt.py | 12 ++++++----- src/pipecat/services/websocket_service.py | 4 ++-- .../transports/network/websocket_client.py | 3 ++- .../transports/network/websocket_server.py | 8 +++++--- 19 files changed, 94 insertions(+), 68 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ad44f34b..cccecd034 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,9 +25,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `tasks cancelled error` to a debug log. This removes the log from appearing in Pipecat logs upon leaving. -- The `websockets` dependency for a number of packages was updated to support - websockets>=13.1.0 and <15.0.0. This change provides greater compatibility - across Pipecat's packages. +- Upgraded the `websockets` implementation to the new asyncio implementation. + Along with this change, we're updating support for versions >=13.1.0 and + <15.0.0. All services have been update to use the asyncio implementation. - Updated `MiniMaxHttpTTSService` with a `base_url` arg where you can specify the Global endpoint (default) or Mainland China. diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 6068c6d7e..d601b3ad5 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -44,6 +44,7 @@ from .models import ( try: import websockets + from websockets.asyncio.client import connect as websocket_connect except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error('In order to use AssemblyAI, you need to `pip install "pipecat-ai[assemblyai]"`.') @@ -190,7 +191,7 @@ class AssemblyAISTTService(STTService): "Authorization": self._api_key, "User-Agent": f"AssemblyAI/1.0 (integration=Pipecat/{pipecat_version})", } - self._websocket = await websockets.connect( + self._websocket = await websocket_connect( ws_url, additional_headers=headers, ) diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index 2e7beb3a3..57cd391b6 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -36,6 +36,8 @@ from pipecat.utils.tracing.service_decorators import traced_stt try: import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use AWS services, you need to `pip install pipecat-ai[aws]`.") @@ -133,7 +135,7 @@ class AWSTranscribeSTTService(STTService): while retry_count < max_retries: try: await self._connect() - if self._ws_client and self._ws_client.open: + if self._ws_client and self._ws_client.state is State.OPEN: logger.info("Successfully established WebSocket connection") return logger.warning("WebSocket connection not established after connect") @@ -174,7 +176,7 @@ class AWSTranscribeSTTService(STTService): """ try: # Ensure WebSocket is connected - if not self._ws_client or not self._ws_client.open: + if not self._ws_client or self._ws_client.state is State.CLOSED: logger.debug("WebSocket not connected, attempting to reconnect...") try: await self._connect() @@ -208,7 +210,7 @@ class AWSTranscribeSTTService(STTService): async def _connect(self): """Connect to AWS Transcribe with connection state management.""" - if self._ws_client and self._ws_client.open and self._receive_task: + if self._ws_client and self._ws_client.state is State.OPEN and self._receive_task: logger.debug(f"{self} Already connected") return @@ -268,7 +270,7 @@ class AWSTranscribeSTTService(STTService): logger.debug(f"{self} Connecting to WebSocket with URL: {presigned_url[:100]}...") # Connect with the required headers and settings - self._ws_client = await websockets.connect( + self._ws_client = await websocket_connect( presigned_url, additional_headers=additional_headers, subprotocols=["mqtt"], @@ -299,7 +301,7 @@ class AWSTranscribeSTTService(STTService): self._receive_task = None try: - if self._ws_client and self._ws_client.open: + if self._ws_client and self._ws_client.state is State.OPEN: # Send end-stream message end_stream = {"message-type": "event", "event": "end"} await self._ws_client.send(json.dumps(end_stream)) @@ -341,7 +343,7 @@ class AWSTranscribeSTTService(STTService): async def _receive_loop(self): """Background task to receive and process messages from AWS Transcribe.""" while True: - if not self._ws_client or not self._ws_client.open: + if not self._ws_client or self._ws_client.state is State.CLOSED: logger.warning(f"{self} WebSocket closed in receive loop") break diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index 77d7c9f53..5412c422c 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -15,7 +15,6 @@ import json import urllib.parse from typing import AsyncGenerator, Optional -import websockets from loguru import logger from pipecat.frames.frames import ( @@ -34,6 +33,15 @@ from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_stt +try: + import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.") + raise Exception(f"Missing module: {e}") + class CartesiaLiveOptions: """Configuration options for Cartesia Live STT service. @@ -216,7 +224,7 @@ class CartesiaSTTService(STTService): None - transcription results are handled via WebSocket responses. """ # If the connection is closed, due to timeout, we need to reconnect when the user starts speaking again - if not self._connection or self._connection.closed: + if not self._connection or self._connection.state is State.CLOSED: await self._connect() await self._connection.send(audio) @@ -229,7 +237,7 @@ class CartesiaSTTService(STTService): headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key} try: - self._connection = await websockets.connect(ws_url, additional_headers=headers) + self._connection = await websocket_connect(ws_url, additional_headers=headers) # Setup the receiver task to handle the incoming messages from the Cartesia server if self._receiver_task is None or self._receiver_task.done(): self._receiver_task = asyncio.create_task(self._receive_messages()) @@ -240,7 +248,7 @@ class CartesiaSTTService(STTService): async def _receive_messages(self): try: while True: - if not self._connection or self._connection.closed: + if not self._connection or self._connection.state is State.CLOSED: break message = await self._connection.recv() @@ -320,7 +328,7 @@ class CartesiaSTTService(STTService): logger.exception(f"Unexpected exception while cancelling task: {e}") self._receiver_task = None - if self._connection and self._connection.open: + if self._connection and self._connection.state is State.OPEN: logger.debug("Disconnecting from Cartesia") await self._connection.close() @@ -344,5 +352,5 @@ class CartesiaSTTService(STTService): await self.start_metrics() elif isinstance(frame, UserStoppedSpeakingFrame): # Send finalize command to flush the transcription session - if self._connection and self._connection.open: + if self._connection and self._connection.state is State.OPEN: await self._connection.send("finalize") diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 09c0f0b19..c4ea7c31c 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -36,8 +36,9 @@ from pipecat.utils.tracing.service_decorators import traced_tts # See .env.example for Cartesia configuration needed try: - import websockets from cartesia import AsyncCartesia + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.") @@ -288,10 +289,10 @@ class CartesiaTTSService(AudioContextWordTTSService): async def _connect_websocket(self): try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return logger.debug("Connecting to Cartesia") - self._websocket = await websockets.connect( + self._websocket = await websocket_connect( f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}" ) except Exception as e: @@ -380,7 +381,7 @@ class CartesiaTTSService(AudioContextWordTTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() if not self._context_id: diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 2ba2bd2cb..c921c3f4b 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -44,6 +44,8 @@ from pipecat.utils.tracing.service_decorators import traced_tts # See .env.example for ElevenLabs configuration needed try: import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.") @@ -447,7 +449,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): async def _connect_websocket(self): try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return logger.debug("Connecting to ElevenLabs") @@ -474,7 +476,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): ) # Set max websocket message size to 16MB for large audio responses - self._websocket = await websockets.connect( + self._websocket = await websocket_connect( url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key} ) @@ -587,7 +589,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): self.reset_watchdog() await asyncio.sleep(KEEPALIVE_SLEEP) try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: if self._context_id: # Send keepalive with context ID to keep the connection alive keepalive_message = { @@ -625,7 +627,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() try: diff --git a/src/pipecat/services/fish/tts.py b/src/pipecat/services/fish/tts.py index 1f0463f94..53f6cb171 100644 --- a/src/pipecat/services/fish/tts.py +++ b/src/pipecat/services/fish/tts.py @@ -34,7 +34,8 @@ from pipecat.utils.tracing.service_decorators import traced_tts try: import ormsgpack - import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Fish Audio, you need to `pip install pipecat-ai[fish]`.") @@ -210,13 +211,13 @@ class FishAudioTTSService(InterruptibleTTSService): async def _connect_websocket(self): try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return logger.debug("Connecting to Fish Audio") headers = {"Authorization": f"Bearer {self._api_key}"} headers["model"] = self.model_name - self._websocket = await websockets.connect(self._base_url, additional_headers=headers) + self._websocket = await websocket_connect(self._base_url, additional_headers=headers) # Send initial start message with ormsgpack start_message = {"event": "start", "request": {"text": "", **self._settings}} @@ -246,7 +247,7 @@ class FishAudioTTSService(InterruptibleTTSService): async def flush_audio(self): """Flush any buffered audio by sending a flush event to Fish Audio.""" logger.trace(f"{self}: Flushing audio buffers") - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: return flush_message = {"event": "flush"} await self._get_websocket().send(ormsgpack.packb(flush_message)) @@ -292,7 +293,7 @@ class FishAudioTTSService(InterruptibleTTSService): """ logger.debug(f"{self}: Generating Fish TTS: [{text}]") try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() if not self._request_id: diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 5f8747da0..d1ae3ee33 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -75,7 +75,7 @@ from . import events from .file_api import GeminiFileAPI try: - import websockets + from websockets.asyncio.client import connect as websocket_connect except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.") @@ -791,7 +791,7 @@ class GeminiMultimodalLiveLLMService(LLMService): try: logger.info(f"Connecting to wss://{self._base_url}") uri = f"wss://{self._base_url}?key={self._api_key}" - self._websocket = await websockets.connect(uri=uri) + self._websocket = await websocket_connect(uri=uri) self._receive_task = self.create_task(self._receive_task_handler()) # Create the basic configuration diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index a92fd5aef..cef89f1c0 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -37,6 +37,8 @@ from pipecat.utils.tracing.service_decorators import traced_stt try: import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Gladia, you need to `pip install pipecat-ai[gladia]`.") @@ -402,7 +404,7 @@ class GladiaSTTService(STTService): logger.warning(f"Audio buffer exceeded max size, trimmed {trim_size} bytes") # Send audio if connected - if self._connection_active and self._websocket and not self._websocket.closed: + if self._connection_active and self._websocket and self._websocket.state is State.OPEN: try: await self._send_audio(audio) except websockets.exceptions.ConnectionClosed as e: @@ -423,7 +425,7 @@ class GladiaSTTService(STTService): self._reconnection_attempts = 0 # Connect with automatic reconnection - async with websockets.connect(self._session_url) as websocket: + async with websocket_connect(self._session_url) as websocket: try: self._websocket = websocket self._connection_active = True @@ -507,7 +509,7 @@ class GladiaSTTService(STTService): async def _send_audio(self, audio: bytes): """Send audio chunk with proper message format.""" - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: data = base64.b64encode(audio).decode("utf-8") message = {"type": "audio_chunk", "data": {"chunk": data}} await self._websocket.send(json.dumps(message)) @@ -520,7 +522,7 @@ class GladiaSTTService(STTService): await self._send_audio(bytes(self._audio_buffer)) async def _send_stop_recording(self): - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: await self._websocket.send(json.dumps({"type": "stop_recording"})) async def _keepalive_task_handler(self): @@ -531,7 +533,7 @@ class GladiaSTTService(STTService): self.reset_watchdog() # Send keepalive (Gladia times out after 30 seconds) await asyncio.sleep(KEEPALIVE_SLEEP) - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: # Send an empty audio chunk as keepalive empty_audio = b"" await self._send_audio(empty_audio) diff --git a/src/pipecat/services/lmnt/tts.py b/src/pipecat/services/lmnt/tts.py index 29dfc05b3..187ef6b84 100644 --- a/src/pipecat/services/lmnt/tts.py +++ b/src/pipecat/services/lmnt/tts.py @@ -29,7 +29,8 @@ from pipecat.utils.tracing.service_decorators import traced_tts # See .env.example for LMNT configuration needed try: - import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`.") @@ -200,7 +201,7 @@ class LmntTTSService(InterruptibleTTSService): async def _connect_websocket(self): """Connect to LMNT websocket.""" try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return logger.debug("Connecting to LMNT") @@ -216,7 +217,7 @@ class LmntTTSService(InterruptibleTTSService): } # Connect to LMNT's websocket directly - self._websocket = await websockets.connect("wss://api.lmnt.com/v1/ai/speech/stream") + self._websocket = await websocket_connect("wss://api.lmnt.com/v1/ai/speech/stream") # Send initialization message await self._websocket.send(json.dumps(init_msg)) @@ -251,7 +252,7 @@ class LmntTTSService(InterruptibleTTSService): async def flush_audio(self): """Flush any pending audio synthesis.""" - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: return await self._get_websocket().send(json.dumps({"flush": True})) @@ -292,7 +293,7 @@ class LmntTTSService(InterruptibleTTSService): logger.debug(f"{self}: Generating TTS [{text}]") try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() try: diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index d7a32aac8..9472a4197 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -39,8 +39,9 @@ from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_tts try: - import websockets from pyneuphonic import Neuphonic, TTSConfig + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Neuphonic, you need to `pip install pipecat-ai[neuphonic]`.") @@ -271,7 +272,7 @@ class NeuphonicTTSService(InterruptibleTTSService): async def _connect_websocket(self): """Establish WebSocket connection to Neuphonic API.""" try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return logger.debug("Connecting to Neuphonic") @@ -292,7 +293,7 @@ class NeuphonicTTSService(InterruptibleTTSService): headers = {"x-api-key": self._api_key} - self._websocket = await websockets.connect(url, additional_headers=headers) + self._websocket = await websocket_connect(url, additional_headers=headers) except Exception as e: logger.error(f"{self} initialization error: {e}") self._websocket = None @@ -359,7 +360,7 @@ class NeuphonicTTSService(InterruptibleTTSService): logger.debug(f"Generating TTS: [{text}]") try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() try: diff --git a/src/pipecat/services/openai_realtime_beta/azure.py b/src/pipecat/services/openai_realtime_beta/azure.py index d7727c5d8..44bb9ed0c 100644 --- a/src/pipecat/services/openai_realtime_beta/azure.py +++ b/src/pipecat/services/openai_realtime_beta/azure.py @@ -11,7 +11,7 @@ from loguru import logger from .openai import OpenAIRealtimeBetaLLMService try: - import websockets + from websockets.asyncio.client import connect as websocket_connect except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error( @@ -55,7 +55,7 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService): return logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}") - self._websocket = await websockets.connect( + self._websocket = await websocket_connect( uri=self.base_url, additional_headers={ "api-key": self.api_key, diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index b153d1570..03b3d4938 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -66,7 +66,7 @@ from .context import ( from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame try: - import websockets + from websockets.asyncio.client import connect as websocket_connect except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.") @@ -387,7 +387,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): # Here we assume that if we have a websocket, we are connected. We # handle disconnections in the send/recv code paths. return - self._websocket = await websockets.connect( + self._websocket = await websocket_connect( uri=self.base_url, additional_headers={ "Authorization": f"Bearer {self.api_key}", diff --git a/src/pipecat/services/playht/tts.py b/src/pipecat/services/playht/tts.py index 65c9fd41e..f7d0574ed 100644 --- a/src/pipecat/services/playht/tts.py +++ b/src/pipecat/services/playht/tts.py @@ -17,7 +17,6 @@ import uuid from typing import AsyncGenerator, Optional import aiohttp -import websockets from loguru import logger from pydantic import BaseModel @@ -41,6 +40,8 @@ try: from pyht.async_client import AsyncClient from pyht.client import Format, TTSOptions from pyht.client import Language as PlayHTLanguage + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use PlayHT, you need to `pip install pipecat-ai[playht]`.") @@ -244,7 +245,7 @@ class PlayHTTTSService(InterruptibleTTSService): async def _connect_websocket(self): """Connect to PlayHT websocket.""" try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return logger.debug("Connecting to PlayHT") @@ -255,7 +256,7 @@ class PlayHTTTSService(InterruptibleTTSService): if not isinstance(self._websocket_url, str): raise ValueError("WebSocket URL is not a string") - self._websocket = await websockets.connect(self._websocket_url) + self._websocket = await websocket_connect(self._websocket_url) except ValueError as e: logger.error(f"{self} initialization error: {e}") self._websocket = None @@ -362,7 +363,7 @@ class PlayHTTTSService(InterruptibleTTSService): try: # Reconnect if the websocket is closed - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() if not self._request_id: diff --git a/src/pipecat/services/rime/tts.py b/src/pipecat/services/rime/tts.py index 1b2e66dab..a4f0ffda6 100644 --- a/src/pipecat/services/rime/tts.py +++ b/src/pipecat/services/rime/tts.py @@ -39,7 +39,8 @@ from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator from pipecat.utils.tracing.service_decorators import traced_tts try: - import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Rime, you need to `pip install pipecat-ai[rime]`.") @@ -238,13 +239,13 @@ class RimeTTSService(AudioContextWordTTSService): async def _connect_websocket(self): """Connect to Rime websocket API with configured settings.""" try: - if self._websocket and self._websocket.open: + if self._websocket and self._websocket.state is State.OPEN: return params = "&".join(f"{k}={v}" for k, v in self._settings.items()) url = f"{self._url}?{params}" headers = {"Authorization": f"Bearer {self._api_key}"} - self._websocket = await websockets.connect(url, additional_headers=headers) + self._websocket = await websocket_connect(url, additional_headers=headers) except Exception as e: logger.error(f"{self} initialization error: {e}") self._websocket = None @@ -380,7 +381,7 @@ class RimeTTSService(AudioContextWordTTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: await self._connect() try: diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index f51653876..d62da9ace 100644 --- a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -32,6 +32,8 @@ from pipecat.utils.tracing.service_decorators import traced_stt try: import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.") @@ -162,7 +164,7 @@ class SonioxSTTService(STTService): if self._websocket: return - self._websocket = await websockets.connect(self._url) + self._websocket = await websocket_connect(self._url) if not self._websocket: logger.error(f"Unable to connect to Soniox API at {self._url}") @@ -244,7 +246,7 @@ class SonioxSTTService(STTService): Frame: None (transcription results come via WebSocket callbacks). """ await self.start_processing_metrics() - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: await self._websocket.send(audio) await self.stop_processing_metrics() @@ -268,13 +270,13 @@ class SonioxSTTService(STTService): if isinstance(frame, UserStoppedSpeakingFrame) and self._vad_force_turn_endpoint: # Send finalize message to Soniox so we get the final tokens asap. - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: await self._websocket.send(FINALIZE_MESSAGE) logger.debug(f"Triggered finalize event on: {frame.name=}, {direction=}") async def _send_stop_recording(self): """Send stop recording message to Soniox.""" - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: # Send stop recording message await self._websocket.send("") @@ -283,7 +285,7 @@ class SonioxSTTService(STTService): try: while True: logger.debug("Sending keepalive message") - if self._websocket and not self._websocket.closed: + if self._websocket and self._websocket.state is State.OPEN: await self._websocket.send(KEEPALIVE_MESSAGE) else: logger.debug("WebSocket connection closed.") diff --git a/src/pipecat/services/websocket_service.py b/src/pipecat/services/websocket_service.py index a58a654d9..ff4f67dd3 100644 --- a/src/pipecat/services/websocket_service.py +++ b/src/pipecat/services/websocket_service.py @@ -43,7 +43,7 @@ class WebsocketService(ABC): True if connection is verified working, False otherwise. """ try: - if not self._websocket or self._websocket.closed: + if not self._websocket or self._websocket.state is State.CLOSED: return False await self._websocket.ping() return True @@ -82,7 +82,7 @@ class WebsocketService(ABC): try: await self._receive_messages() retry_count = 0 # Reset counter on successful message receive - if self._websocket and self._websocket.state == State.CLOSED: + if self._websocket and self._websocket.state is State.CLOSED: raise websockets.ConnectionClosedOK( self._websocket.close_rcvd, self._websocket.close_sent, diff --git a/src/pipecat/transports/network/websocket_client.py b/src/pipecat/transports/network/websocket_client.py index f0746a589..d141b52f3 100644 --- a/src/pipecat/transports/network/websocket_client.py +++ b/src/pipecat/transports/network/websocket_client.py @@ -20,6 +20,7 @@ from typing import Awaitable, Callable, Optional import websockets from loguru import logger from pydantic.main import BaseModel +from websockets.asyncio.client import connect as websocket_connect from pipecat.frames.frames import ( CancelFrame, @@ -129,7 +130,7 @@ class WebsocketClientSession: return try: - self._websocket = await websockets.connect(uri=self._uri, open_timeout=10) + self._websocket = await websocket_connect(uri=self._uri, open_timeout=10) self._client_task = self.task_manager.create_task( self._client_task_handler(), f"{self._transport_name}::WebsocketClientSession::_client_task_handler", diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index dbe418d3b..e957b01be 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -39,6 +39,8 @@ from pipecat.transports.base_transport import BaseTransport, TransportParams try: import websockets + from websockets.asyncio.server import serve as websocket_serve + from websockets.protocol import State except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use websockets, you need to `pip install pipecat-ai[websocket]`.") @@ -177,11 +179,11 @@ class WebsocketServerInputTransport(BaseInputTransport): async def _server_task_handler(self): """Handle WebSocket server startup and client connections.""" logger.info(f"Starting websocket server on {self._host}:{self._port}") - async with websockets.serve(self._client_handler, self._host, self._port) as server: + async with websocket_serve(self._client_handler, self._host, self._port) as server: await self._callbacks.on_websocket_ready() await self._stop_server_event.wait() - async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, path): + async def _client_handler(self, websocket: websockets.WebSocketServerProtocol): """Handle individual client connections and message processing.""" logger.info(f"New client connection from {websocket.remote_address}") if self._websocket: @@ -231,7 +233,7 @@ class WebsocketServerInputTransport(BaseInputTransport): """Monitor WebSocket connection for session timeout.""" try: await asyncio.sleep(session_timeout) - if not websocket.closed: + if websocket.state is not State.CLOSED: await self._callbacks.on_session_timeout(websocket) except asyncio.CancelledError: logger.info(f"Monitoring task cancelled for: {websocket.remote_address}")