InworldTTSService: Add keepalive task
This commit is contained in:
1
changelog/3403.added.md
Normal file
1
changelog/3403.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added a keepalive task for `InworldTTSService` to keep the service connected in the event of no generations for longer periods of time.
|
||||
@@ -13,12 +13,14 @@ Contains two TTS services:
|
||||
Inworld’s text-to-speech (TTS) models offer ultra-realistic, context-aware speech synthesis and precise voice cloning capabilities, enabling developers to build natural and engaging experiences with human-like speech quality at an accessible price point.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import uuid
|
||||
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
|
||||
|
||||
import aiohttp
|
||||
import websockets
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -479,6 +481,7 @@ class InworldTTSService(AudioContextWordTTSService):
|
||||
}
|
||||
|
||||
self._receive_task = None
|
||||
self._keepalive_task = None
|
||||
self._context_id = None
|
||||
self._started = False
|
||||
|
||||
@@ -606,9 +609,13 @@ class InworldTTSService(AudioContextWordTTSService):
|
||||
The websocket.
|
||||
"""
|
||||
await self._connect_websocket()
|
||||
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
if self._websocket and not self._keepalive_task:
|
||||
self._keepalive_task = self.create_task(self._keepalive_task_handler())
|
||||
|
||||
async def _disconnect(self):
|
||||
"""Disconnect from the Inworld WebSocket TTS service.
|
||||
|
||||
@@ -619,6 +626,10 @@ class InworldTTSService(AudioContextWordTTSService):
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
|
||||
if self._keepalive_task:
|
||||
await self.cancel_task(self._keepalive_task)
|
||||
self._keepalive_task = None
|
||||
|
||||
await self._disconnect_websocket()
|
||||
|
||||
async def _connect_websocket(self):
|
||||
@@ -693,6 +704,15 @@ class InworldTTSService(AudioContextWordTTSService):
|
||||
status = result.get("status", {})
|
||||
if status.get("code", 0) != 0:
|
||||
error_msg = status.get("message", "Unknown error")
|
||||
error_code = status.get("code")
|
||||
|
||||
# Handle "Context not found" error (code 5)
|
||||
# This can happen when a keepalive message is sent but no context is available.
|
||||
if error_code == 5 and "not found" in error_msg.lower():
|
||||
logger.debug(f"{self}: Context {ctx_id or self._context_id} not found.")
|
||||
continue
|
||||
|
||||
# For other errors, push error frame
|
||||
await self.push_error(error_msg=f"Inworld API error: {error_msg}")
|
||||
continue
|
||||
|
||||
@@ -757,6 +777,27 @@ class InworldTTSService(AudioContextWordTTSService):
|
||||
await self.remove_audio_context(ctx_id)
|
||||
await self.add_word_timestamps([("TTSStoppedFrame", 0), ("Reset", 0)])
|
||||
|
||||
async def _keepalive_task_handler(self):
|
||||
"""Send periodic keepalive messages to maintain WebSocket connection."""
|
||||
KEEPALIVE_SLEEP = 60
|
||||
while True:
|
||||
await asyncio.sleep(KEEPALIVE_SLEEP)
|
||||
try:
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
if self._context_id:
|
||||
keepalive_message = {
|
||||
"send_text": {"text": ""},
|
||||
"contextId": self._context_id,
|
||||
}
|
||||
logger.trace(f"Sending keepalive for context {self._context_id}")
|
||||
else:
|
||||
keepalive_message = {"send_text": {"text": ""}}
|
||||
logger.trace("Sending keepalive without context")
|
||||
await self._websocket.send(json.dumps(keepalive_message))
|
||||
except websockets.ConnectionClosed as e:
|
||||
logger.warning(f"{self} keepalive error: {e}")
|
||||
break
|
||||
|
||||
async def _send_context(self, context_id: str):
|
||||
"""Send a context to the Inworld WebSocket TTS service.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user