Port to the websockets asyncio implementation, support for websockets 13 and 14

This commit is contained in:
Mark Backman
2025-07-22 17:23:55 -04:00
parent 7955080da2
commit 300f19ad23
19 changed files with 94 additions and 68 deletions

View File

@@ -25,9 +25,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`tasks cancelled error` to a debug log. This removes the log from appearing
in Pipecat logs upon leaving.
- The `websockets` dependency for a number of packages was updated to support
websockets>=13.1.0 and <15.0.0. This change provides greater compatibility
across Pipecat's packages.
- Upgraded the `websockets` implementation to the new asyncio implementation.
Along with this change, we're updating support for versions >=13.1.0 and
<15.0.0. All services have been update to use the asyncio implementation.
- Updated `MiniMaxHttpTTSService` with a `base_url` arg where you can specify
the Global endpoint (default) or Mainland China.

View File

@@ -44,6 +44,7 @@ from .models import (
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error('In order to use AssemblyAI, you need to `pip install "pipecat-ai[assemblyai]"`.')
@@ -190,7 +191,7 @@ class AssemblyAISTTService(STTService):
"Authorization": self._api_key,
"User-Agent": f"AssemblyAI/1.0 (integration=Pipecat/{pipecat_version})",
}
self._websocket = await websockets.connect(
self._websocket = await websocket_connect(
ws_url,
additional_headers=headers,
)

View File

@@ -36,6 +36,8 @@ from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use AWS services, you need to `pip install pipecat-ai[aws]`.")
@@ -133,7 +135,7 @@ class AWSTranscribeSTTService(STTService):
while retry_count < max_retries:
try:
await self._connect()
if self._ws_client and self._ws_client.open:
if self._ws_client and self._ws_client.state is State.OPEN:
logger.info("Successfully established WebSocket connection")
return
logger.warning("WebSocket connection not established after connect")
@@ -174,7 +176,7 @@ class AWSTranscribeSTTService(STTService):
"""
try:
# Ensure WebSocket is connected
if not self._ws_client or not self._ws_client.open:
if not self._ws_client or self._ws_client.state is State.CLOSED:
logger.debug("WebSocket not connected, attempting to reconnect...")
try:
await self._connect()
@@ -208,7 +210,7 @@ class AWSTranscribeSTTService(STTService):
async def _connect(self):
"""Connect to AWS Transcribe with connection state management."""
if self._ws_client and self._ws_client.open and self._receive_task:
if self._ws_client and self._ws_client.state is State.OPEN and self._receive_task:
logger.debug(f"{self} Already connected")
return
@@ -268,7 +270,7 @@ class AWSTranscribeSTTService(STTService):
logger.debug(f"{self} Connecting to WebSocket with URL: {presigned_url[:100]}...")
# Connect with the required headers and settings
self._ws_client = await websockets.connect(
self._ws_client = await websocket_connect(
presigned_url,
additional_headers=additional_headers,
subprotocols=["mqtt"],
@@ -299,7 +301,7 @@ class AWSTranscribeSTTService(STTService):
self._receive_task = None
try:
if self._ws_client and self._ws_client.open:
if self._ws_client and self._ws_client.state is State.OPEN:
# Send end-stream message
end_stream = {"message-type": "event", "event": "end"}
await self._ws_client.send(json.dumps(end_stream))
@@ -341,7 +343,7 @@ class AWSTranscribeSTTService(STTService):
async def _receive_loop(self):
"""Background task to receive and process messages from AWS Transcribe."""
while True:
if not self._ws_client or not self._ws_client.open:
if not self._ws_client or self._ws_client.state is State.CLOSED:
logger.warning(f"{self} WebSocket closed in receive loop")
break

View File

@@ -15,7 +15,6 @@ import json
import urllib.parse
from typing import AsyncGenerator, Optional
import websockets
from loguru import logger
from pipecat.frames.frames import (
@@ -34,6 +33,15 @@ from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
raise Exception(f"Missing module: {e}")
class CartesiaLiveOptions:
"""Configuration options for Cartesia Live STT service.
@@ -216,7 +224,7 @@ class CartesiaSTTService(STTService):
None - transcription results are handled via WebSocket responses.
"""
# If the connection is closed, due to timeout, we need to reconnect when the user starts speaking again
if not self._connection or self._connection.closed:
if not self._connection or self._connection.state is State.CLOSED:
await self._connect()
await self._connection.send(audio)
@@ -229,7 +237,7 @@ class CartesiaSTTService(STTService):
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
try:
self._connection = await websockets.connect(ws_url, additional_headers=headers)
self._connection = await websocket_connect(ws_url, additional_headers=headers)
# Setup the receiver task to handle the incoming messages from the Cartesia server
if self._receiver_task is None or self._receiver_task.done():
self._receiver_task = asyncio.create_task(self._receive_messages())
@@ -240,7 +248,7 @@ class CartesiaSTTService(STTService):
async def _receive_messages(self):
try:
while True:
if not self._connection or self._connection.closed:
if not self._connection or self._connection.state is State.CLOSED:
break
message = await self._connection.recv()
@@ -320,7 +328,7 @@ class CartesiaSTTService(STTService):
logger.exception(f"Unexpected exception while cancelling task: {e}")
self._receiver_task = None
if self._connection and self._connection.open:
if self._connection and self._connection.state is State.OPEN:
logger.debug("Disconnecting from Cartesia")
await self._connection.close()
@@ -344,5 +352,5 @@ class CartesiaSTTService(STTService):
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._connection and self._connection.open:
if self._connection and self._connection.state is State.OPEN:
await self._connection.send("finalize")

View File

@@ -36,8 +36,9 @@ from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for Cartesia configuration needed
try:
import websockets
from cartesia import AsyncCartesia
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
@@ -288,10 +289,10 @@ class CartesiaTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Cartesia")
self._websocket = await websockets.connect(
self._websocket = await websocket_connect(
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
)
except Exception as e:
@@ -380,7 +381,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
if not self._context_id:

View File

@@ -44,6 +44,8 @@ from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for ElevenLabs configuration needed
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
@@ -447,7 +449,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to ElevenLabs")
@@ -474,7 +476,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
)
# Set max websocket message size to 16MB for large audio responses
self._websocket = await websockets.connect(
self._websocket = await websocket_connect(
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
)
@@ -587,7 +589,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self.reset_watchdog()
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
if self._context_id:
# Send keepalive with context ID to keep the connection alive
keepalive_message = {
@@ -625,7 +627,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
try:

View File

@@ -34,7 +34,8 @@ from pipecat.utils.tracing.service_decorators import traced_tts
try:
import ormsgpack
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Fish Audio, you need to `pip install pipecat-ai[fish]`.")
@@ -210,13 +211,13 @@ class FishAudioTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Fish Audio")
headers = {"Authorization": f"Bearer {self._api_key}"}
headers["model"] = self.model_name
self._websocket = await websockets.connect(self._base_url, additional_headers=headers)
self._websocket = await websocket_connect(self._base_url, additional_headers=headers)
# Send initial start message with ormsgpack
start_message = {"event": "start", "request": {"text": "", **self._settings}}
@@ -246,7 +247,7 @@ class FishAudioTTSService(InterruptibleTTSService):
async def flush_audio(self):
"""Flush any buffered audio by sending a flush event to Fish Audio."""
logger.trace(f"{self}: Flushing audio buffers")
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
return
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))
@@ -292,7 +293,7 @@ class FishAudioTTSService(InterruptibleTTSService):
"""
logger.debug(f"{self}: Generating Fish TTS: [{text}]")
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
if not self._request_id:

View File

@@ -75,7 +75,7 @@ from . import events
from .file_api import GeminiFileAPI
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
@@ -791,7 +791,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
try:
logger.info(f"Connecting to wss://{self._base_url}")
uri = f"wss://{self._base_url}?key={self._api_key}"
self._websocket = await websockets.connect(uri=uri)
self._websocket = await websocket_connect(uri=uri)
self._receive_task = self.create_task(self._receive_task_handler())
# Create the basic configuration

View File

@@ -37,6 +37,8 @@ from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Gladia, you need to `pip install pipecat-ai[gladia]`.")
@@ -402,7 +404,7 @@ class GladiaSTTService(STTService):
logger.warning(f"Audio buffer exceeded max size, trimmed {trim_size} bytes")
# Send audio if connected
if self._connection_active and self._websocket and not self._websocket.closed:
if self._connection_active and self._websocket and self._websocket.state is State.OPEN:
try:
await self._send_audio(audio)
except websockets.exceptions.ConnectionClosed as e:
@@ -423,7 +425,7 @@ class GladiaSTTService(STTService):
self._reconnection_attempts = 0
# Connect with automatic reconnection
async with websockets.connect(self._session_url) as websocket:
async with websocket_connect(self._session_url) as websocket:
try:
self._websocket = websocket
self._connection_active = True
@@ -507,7 +509,7 @@ class GladiaSTTService(STTService):
async def _send_audio(self, audio: bytes):
"""Send audio chunk with proper message format."""
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
data = base64.b64encode(audio).decode("utf-8")
message = {"type": "audio_chunk", "data": {"chunk": data}}
await self._websocket.send(json.dumps(message))
@@ -520,7 +522,7 @@ class GladiaSTTService(STTService):
await self._send_audio(bytes(self._audio_buffer))
async def _send_stop_recording(self):
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(json.dumps({"type": "stop_recording"}))
async def _keepalive_task_handler(self):
@@ -531,7 +533,7 @@ class GladiaSTTService(STTService):
self.reset_watchdog()
# Send keepalive (Gladia times out after 30 seconds)
await asyncio.sleep(KEEPALIVE_SLEEP)
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
# Send an empty audio chunk as keepalive
empty_audio = b""
await self._send_audio(empty_audio)

View File

@@ -29,7 +29,8 @@ from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for LMNT configuration needed
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`.")
@@ -200,7 +201,7 @@ class LmntTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
"""Connect to LMNT websocket."""
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to LMNT")
@@ -216,7 +217,7 @@ class LmntTTSService(InterruptibleTTSService):
}
# Connect to LMNT's websocket directly
self._websocket = await websockets.connect("wss://api.lmnt.com/v1/ai/speech/stream")
self._websocket = await websocket_connect("wss://api.lmnt.com/v1/ai/speech/stream")
# Send initialization message
await self._websocket.send(json.dumps(init_msg))
@@ -251,7 +252,7 @@ class LmntTTSService(InterruptibleTTSService):
async def flush_audio(self):
"""Flush any pending audio synthesis."""
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
return
await self._get_websocket().send(json.dumps({"flush": True}))
@@ -292,7 +293,7 @@ class LmntTTSService(InterruptibleTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
try:

View File

@@ -39,8 +39,9 @@ from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
import websockets
from pyneuphonic import Neuphonic, TTSConfig
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Neuphonic, you need to `pip install pipecat-ai[neuphonic]`.")
@@ -271,7 +272,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
"""Establish WebSocket connection to Neuphonic API."""
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Neuphonic")
@@ -292,7 +293,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
headers = {"x-api-key": self._api_key}
self._websocket = await websockets.connect(url, additional_headers=headers)
self._websocket = await websocket_connect(url, additional_headers=headers)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -359,7 +360,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
try:

View File

@@ -11,7 +11,7 @@ from loguru import logger
from .openai import OpenAIRealtimeBetaLLMService
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -55,7 +55,7 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
return
logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}")
self._websocket = await websockets.connect(
self._websocket = await websocket_connect(
uri=self.base_url,
additional_headers={
"api-key": self.api_key,

View File

@@ -66,7 +66,7 @@ from .context import (
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.")
@@ -387,7 +387,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
# Here we assume that if we have a websocket, we are connected. We
# handle disconnections in the send/recv code paths.
return
self._websocket = await websockets.connect(
self._websocket = await websocket_connect(
uri=self.base_url,
additional_headers={
"Authorization": f"Bearer {self.api_key}",

View File

@@ -17,7 +17,6 @@ import uuid
from typing import AsyncGenerator, Optional
import aiohttp
import websockets
from loguru import logger
from pydantic import BaseModel
@@ -41,6 +40,8 @@ try:
from pyht.async_client import AsyncClient
from pyht.client import Format, TTSOptions
from pyht.client import Language as PlayHTLanguage
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use PlayHT, you need to `pip install pipecat-ai[playht]`.")
@@ -244,7 +245,7 @@ class PlayHTTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
"""Connect to PlayHT websocket."""
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to PlayHT")
@@ -255,7 +256,7 @@ class PlayHTTTSService(InterruptibleTTSService):
if not isinstance(self._websocket_url, str):
raise ValueError("WebSocket URL is not a string")
self._websocket = await websockets.connect(self._websocket_url)
self._websocket = await websocket_connect(self._websocket_url)
except ValueError as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -362,7 +363,7 @@ class PlayHTTTSService(InterruptibleTTSService):
try:
# Reconnect if the websocket is closed
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
if not self._request_id:

View File

@@ -39,7 +39,8 @@ from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Rime, you need to `pip install pipecat-ai[rime]`.")
@@ -238,13 +239,13 @@ class RimeTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
"""Connect to Rime websocket API with configured settings."""
try:
if self._websocket and self._websocket.open:
if self._websocket and self._websocket.state is State.OPEN:
return
params = "&".join(f"{k}={v}" for k, v in self._settings.items())
url = f"{self._url}?{params}"
headers = {"Authorization": f"Bearer {self._api_key}"}
self._websocket = await websockets.connect(url, additional_headers=headers)
self._websocket = await websocket_connect(url, additional_headers=headers)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -380,7 +381,7 @@ class RimeTTSService(AudioContextWordTTSService):
"""
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
try:

View File

@@ -32,6 +32,8 @@ from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
@@ -162,7 +164,7 @@ class SonioxSTTService(STTService):
if self._websocket:
return
self._websocket = await websockets.connect(self._url)
self._websocket = await websocket_connect(self._url)
if not self._websocket:
logger.error(f"Unable to connect to Soniox API at {self._url}")
@@ -244,7 +246,7 @@ class SonioxSTTService(STTService):
Frame: None (transcription results come via WebSocket callbacks).
"""
await self.start_processing_metrics()
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(audio)
await self.stop_processing_metrics()
@@ -268,13 +270,13 @@ class SonioxSTTService(STTService):
if isinstance(frame, UserStoppedSpeakingFrame) and self._vad_force_turn_endpoint:
# Send finalize message to Soniox so we get the final tokens asap.
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(FINALIZE_MESSAGE)
logger.debug(f"Triggered finalize event on: {frame.name=}, {direction=}")
async def _send_stop_recording(self):
"""Send stop recording message to Soniox."""
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
# Send stop recording message
await self._websocket.send("")
@@ -283,7 +285,7 @@ class SonioxSTTService(STTService):
try:
while True:
logger.debug("Sending keepalive message")
if self._websocket and not self._websocket.closed:
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(KEEPALIVE_MESSAGE)
else:
logger.debug("WebSocket connection closed.")

View File

@@ -43,7 +43,7 @@ class WebsocketService(ABC):
True if connection is verified working, False otherwise.
"""
try:
if not self._websocket or self._websocket.closed:
if not self._websocket or self._websocket.state is State.CLOSED:
return False
await self._websocket.ping()
return True
@@ -82,7 +82,7 @@ class WebsocketService(ABC):
try:
await self._receive_messages()
retry_count = 0 # Reset counter on successful message receive
if self._websocket and self._websocket.state == State.CLOSED:
if self._websocket and self._websocket.state is State.CLOSED:
raise websockets.ConnectionClosedOK(
self._websocket.close_rcvd,
self._websocket.close_sent,

View File

@@ -20,6 +20,7 @@ from typing import Awaitable, Callable, Optional
import websockets
from loguru import logger
from pydantic.main import BaseModel
from websockets.asyncio.client import connect as websocket_connect
from pipecat.frames.frames import (
CancelFrame,
@@ -129,7 +130,7 @@ class WebsocketClientSession:
return
try:
self._websocket = await websockets.connect(uri=self._uri, open_timeout=10)
self._websocket = await websocket_connect(uri=self._uri, open_timeout=10)
self._client_task = self.task_manager.create_task(
self._client_task_handler(),
f"{self._transport_name}::WebsocketClientSession::_client_task_handler",

View File

@@ -39,6 +39,8 @@ from pipecat.transports.base_transport import BaseTransport, TransportParams
try:
import websockets
from websockets.asyncio.server import serve as websocket_serve
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use websockets, you need to `pip install pipecat-ai[websocket]`.")
@@ -177,11 +179,11 @@ class WebsocketServerInputTransport(BaseInputTransport):
async def _server_task_handler(self):
"""Handle WebSocket server startup and client connections."""
logger.info(f"Starting websocket server on {self._host}:{self._port}")
async with websockets.serve(self._client_handler, self._host, self._port) as server:
async with websocket_serve(self._client_handler, self._host, self._port) as server:
await self._callbacks.on_websocket_ready()
await self._stop_server_event.wait()
async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, path):
async def _client_handler(self, websocket: websockets.WebSocketServerProtocol):
"""Handle individual client connections and message processing."""
logger.info(f"New client connection from {websocket.remote_address}")
if self._websocket:
@@ -231,7 +233,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
"""Monitor WebSocket connection for session timeout."""
try:
await asyncio.sleep(session_timeout)
if not websocket.closed:
if websocket.state is not State.CLOSED:
await self._callbacks.on_session_timeout(websocket)
except asyncio.CancelledError:
logger.info(f"Monitoring task cancelled for: {websocket.remote_address}")