Merge pull request #4527 from pipecat-ai/mb/fix-elevenlabs-keepalive-1008
Fix ElevenLabs keepalive racing context-init (1008 disconnects)
This commit is contained in:
1
changelog/4527.fixed.md
Normal file
1
changelog/4527.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed a race in `ElevenLabsTTSService` where the periodic keepalive could be sent for a new turn's context before that context's `voice_settings` initialization message, causing ElevenLabs to close the WebSocket with a 1008 policy violation (`voice_settings field must be provided in the first message ...`). The keepalive now only targets a context once its context-init has been sent.
|
||||
@@ -594,6 +594,10 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
self._partial_word_start_time = 0.0
|
||||
self._alignment_started_context_ids: set[str | None] = set()
|
||||
|
||||
# Context IDs whose context-init has been sent, so the keepalive knows
|
||||
# which contexts are safe to target.
|
||||
self._context_init_sent: set[str] = set()
|
||||
|
||||
# Context management for v1 multi API
|
||||
self._receive_task = None
|
||||
self._keepalive_task = None
|
||||
@@ -792,6 +796,7 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
finally:
|
||||
await self.remove_active_audio_context()
|
||||
self._websocket = None
|
||||
self._context_init_sent.clear()
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
def _get_websocket(self):
|
||||
@@ -822,6 +827,7 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
self._partial_word = ""
|
||||
self._partial_word_start_time = 0.0
|
||||
self._alignment_started_context_ids.discard(context_id)
|
||||
self._context_init_sent.discard(context_id)
|
||||
|
||||
async def on_audio_context_interrupted(self, context_id: str):
|
||||
"""Close the ElevenLabs context when the bot is interrupted."""
|
||||
@@ -914,26 +920,35 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
while True:
|
||||
await asyncio.sleep(KEEPALIVE_SLEEP)
|
||||
try:
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
context_id = self.get_active_audio_context_id()
|
||||
if context_id:
|
||||
# Send keepalive with context ID to keep the connection alive
|
||||
keepalive_message = {
|
||||
"text": "",
|
||||
"context_id": context_id,
|
||||
}
|
||||
logger.trace(f"Sending keepalive for context {context_id}")
|
||||
else:
|
||||
# It's possible to have a user interruption which clears the context
|
||||
# without generating a new TTS response. In this case, we'll just send
|
||||
# an empty message to keep the connection alive.
|
||||
keepalive_message = {"text": ""}
|
||||
logger.trace("Sending keepalive without context")
|
||||
await self._websocket.send(json.dumps(keepalive_message))
|
||||
await self._send_keepalive()
|
||||
except websockets.ConnectionClosed as e:
|
||||
logger.warning(f"{self} keepalive error: {e}")
|
||||
break
|
||||
|
||||
async def _send_keepalive(self):
    """Emit a single keepalive frame on the WebSocket, if it is open.

    The frame carries a ``context_id`` only when that context's context-init
    (the message carrying ``voice_settings``) has already been sent. Tagging
    a context earlier would make the keepalive that context's first message,
    with no ``voice_settings``, and ElevenLabs would reject the later
    context-init with a 1008 policy violation. Until the context-init has
    gone out, a context-less keepalive is enough to hold the connection open.
    """
    ws = self._websocket
    if not ws or ws.state is not State.OPEN:
        return

    # Start from the context-less form; it is always safe to send.
    payload = {"text": ""}

    active_id = self.get_active_audio_context_id()
    if active_id and active_id in self._context_init_sent:
        # The context-init for this context has been sent, so the keepalive
        # may target it without opening the context prematurely.
        payload["context_id"] = active_id

    await ws.send(json.dumps(payload))
|
||||
|
||||
async def _send_text(self, text: str, context_id: str):
|
||||
"""Send text to the WebSocket for synthesis."""
|
||||
if self._websocket and context_id:
|
||||
@@ -980,6 +995,9 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
locator.model_dump()
|
||||
for locator in self._pronunciation_dictionary_locators
|
||||
]
|
||||
# Mark the context-init as sent so the keepalive may now
|
||||
# target this context_id.
|
||||
self._context_init_sent.add(context_id)
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
logger.trace(f"Created new context {context_id}")
|
||||
|
||||
|
||||
@@ -6,9 +6,14 @@
|
||||
|
||||
"""Tests for ElevenLabs TTS alignment handling."""
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from websockets.protocol import State
|
||||
|
||||
from pipecat.services.elevenlabs.tts import (
|
||||
ElevenLabsTTSService,
|
||||
_select_alignment,
|
||||
_strip_utterance_leading_spaces,
|
||||
calculate_word_times,
|
||||
@@ -200,3 +205,87 @@ def test_select_alignment_works_with_http_field_names():
|
||||
)
|
||||
assert selected is not None
|
||||
assert selected["characters"] == list(" Hi")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Keepalive vs context-init race
|
||||
#
|
||||
# The keepalive must only stamp a context_id once its context-init (carrying
|
||||
# voice_settings) has been sent. Stamping it earlier makes the keepalive the
|
||||
# context's first message, with no voice_settings, and ElevenLabs rejects the
|
||||
# later context-init with a 1008 policy violation.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeWebSocket:
    """Test double for the ElevenLabs websocket: reports itself open and records sends."""

    def __init__(self):
        # Decoded JSON payloads, in the order they were sent.
        self.sent: list[dict] = []
        # Reported as open so code that checks the connection state proceeds.
        self.state = State.OPEN

    async def send(self, data: str):
        """Decode the outgoing JSON string and record it in ``sent``."""
        payload = json.loads(data)
        self.sent.append(payload)
|
||||
|
||||
|
||||
def _make_service() -> ElevenLabsTTSService:
    """Build an ``ElevenLabsTTSService`` with a fixed test API key and voice settings."""
    voice_settings = ElevenLabsTTSService.Settings(
        voice="test-voice",
        stability=0.55,
        similarity_boost=0.85,
        use_speaker_boost=True,
        speed=0.81,
    )
    return ElevenLabsTTSService(api_key="test-key", settings=voice_settings)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_keepalive_does_not_stamp_context_before_init():
    """A keepalive sent before the context-init must not carry the new context_id."""
    fake_ws = _FakeWebSocket()
    tts = _make_service()
    tts._websocket = fake_ws

    # Reproduce the start of an LLM turn: the turn context id is already set
    # (TTSService does this on LLMFullResponseStartFrame), but run_tts has not
    # yet sent the voice_settings init for it.
    tts._playing_context_id = None
    tts._turn_context_id = "ctx-1"
    assert "ctx-1" not in tts._context_init_sent

    await tts._send_keepalive()

    # Only a context-less keepalive went out, so the real context-init remains
    # the context's first message and ElevenLabs won't reject it with 1008.
    assert fake_ws.sent == [{"text": ""}]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_keepalive_stamps_context_after_init():
    """After the context-init has been sent, the keepalive carries that context_id."""
    fake_ws = _FakeWebSocket()
    tts = _make_service()
    tts._websocket = fake_ws
    tts._playing_context_id = None
    tts._turn_context_id = "ctx-1"
    # Mirror what run_tts records once the voice_settings init has gone out.
    tts._context_init_sent.add("ctx-1")

    await tts._send_keepalive()

    expected = [{"text": "", "context_id": "ctx-1"}]
    assert fake_ws.sent == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_keepalive_without_active_context_sends_empty():
    """When no context is active, the keepalive is a plain empty-text message."""
    fake_ws = _FakeWebSocket()
    tts = _make_service()
    tts._websocket = fake_ws
    # Neither a playing context nor a turn context is set.
    tts._playing_context_id = None
    tts._turn_context_id = None

    await tts._send_keepalive()

    assert fake_ws.sent == [{"text": ""}]
|
||||
|
||||
Reference in New Issue
Block a user