From aac24ad2d4f3c7808a414f7159593d2397b7c8a6 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 10 Jan 2026 11:18:35 -0500 Subject: [PATCH] InworldTTSService: Add keepalive task --- changelog/3403.added.md | 1 + src/pipecat/services/inworld/tts.py | 41 +++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 changelog/3403.added.md diff --git a/changelog/3403.added.md b/changelog/3403.added.md new file mode 100644 index 000000000..6b55ef97d --- /dev/null +++ b/changelog/3403.added.md @@ -0,0 +1 @@ +- Added a keepalive task for `InworldTTSService` to keep the service connected in the event of no generations for longer periods of time. diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index fddb96602..c3dbce1eb 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -13,12 +13,14 @@ Contains two TTS services: Inworld’s text-to-speech (TTS) models offer ultra-realistic, context-aware speech synthesis and precise voice cloning capabilities, enabling developers to build natural and engaging experiences with human-like speech quality at an accessible price point. """ +import asyncio import base64 import json import uuid from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple import aiohttp +import websockets from loguru import logger from pydantic import BaseModel @@ -479,6 +481,7 @@ class InworldTTSService(AudioContextWordTTSService): } self._receive_task = None + self._keepalive_task = None self._context_id = None self._started = False @@ -606,9 +609,13 @@ class InworldTTSService(AudioContextWordTTSService): The websocket. """ await self._connect_websocket() + if self._websocket and not self._receive_task: self._receive_task = self.create_task(self._receive_task_handler(self._report_error)) + if self._websocket and not self._keepalive_task: + self._keepalive_task = self.create_task(self._keepalive_task_handler()) + async def _disconnect(self): """Disconnect from the Inworld WebSocket TTS service. @@ -619,6 +626,10 @@ class InworldTTSService(AudioContextWordTTSService): await self.cancel_task(self._receive_task) self._receive_task = None + if self._keepalive_task: + await self.cancel_task(self._keepalive_task) + self._keepalive_task = None + await self._disconnect_websocket() async def _connect_websocket(self): @@ -693,6 +704,15 @@ class InworldTTSService(AudioContextWordTTSService): status = result.get("status", {}) if status.get("code", 0) != 0: error_msg = status.get("message", "Unknown error") + error_code = status.get("code") + + # Handle "Context not found" error (code 5) + # This can happen when a keepalive message is sent but no context is available. + if error_code == 5 and "not found" in error_msg.lower(): + logger.debug(f"{self}: Context {ctx_id or self._context_id} not found.") + continue + + # For other errors, push error frame await self.push_error(error_msg=f"Inworld API error: {error_msg}") continue @@ -757,6 +777,27 @@ class InworldTTSService(AudioContextWordTTSService): await self.remove_audio_context(ctx_id) await self.add_word_timestamps([("TTSStoppedFrame", 0), ("Reset", 0)]) + async def _keepalive_task_handler(self): + """Send periodic keepalive messages to maintain WebSocket connection.""" + KEEPALIVE_SLEEP = 60 + while True: + await asyncio.sleep(KEEPALIVE_SLEEP) + try: + if self._websocket and self._websocket.state is State.OPEN: + if self._context_id: + keepalive_message = { + "send_text": {"text": ""}, + "contextId": self._context_id, + } + logger.trace(f"Sending keepalive for context {self._context_id}") + else: + keepalive_message = {"send_text": {"text": ""}} + logger.trace("Sending keepalive without context") + await self._websocket.send(json.dumps(keepalive_message)) + except websockets.ConnectionClosed as e: + logger.warning(f"{self} keepalive error: {e}") + break + async def _send_context(self, context_id: str): """Send a context to the Inworld WebSocket TTS service.