From a5e6886b80998d8f7c74161a393336bb2cdd590f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 20 May 2026 08:59:01 -0400 Subject: [PATCH 1/2] Fix ElevenLabs keepalive racing context-init (1008 disconnects) The keepalive could fire for a new turn's context before that context's voice_settings context-init was sent, making the keepalive the context's first message (no voice_settings) and causing ElevenLabs to reject the later init with a 1008 policy violation. The keepalive now only targets a context once its context-init has been sent (tracked in _context_init_sent). --- changelog/4520.fixed.md | 1 + src/pipecat/services/elevenlabs/tts.py | 50 ++++++++++----- tests/test_elevenlabs_tts.py | 89 ++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 16 deletions(-) create mode 100644 changelog/4520.fixed.md diff --git a/changelog/4520.fixed.md b/changelog/4520.fixed.md new file mode 100644 index 000000000..f9c362178 --- /dev/null +++ b/changelog/4520.fixed.md @@ -0,0 +1 @@ +- Fixed a race in `ElevenLabsTTSService` where the periodic keepalive could be sent for a new turn's context before that context's `voice_settings` initialization message, causing ElevenLabs to close the WebSocket with a 1008 policy violation (`voice_settings field must be provided in the first message ...`). The keepalive now only targets a context once its context-init has been sent. diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index b8f6a9abf..d577dbf8d 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -594,6 +594,10 @@ class ElevenLabsTTSService(WebsocketTTSService): self._partial_word_start_time = 0.0 self._alignment_started_context_ids: set[str | None] = set() + # Context IDs whose context-init has been sent, so the keepalive knows + # which contexts are safe to target. + self._context_init_sent: set[str] = set() + # Context management for v1 multi API self._receive_task = None self._keepalive_task = None @@ -792,6 +796,7 @@ class ElevenLabsTTSService(WebsocketTTSService): finally: await self.remove_active_audio_context() self._websocket = None + self._context_init_sent.clear() await self._call_event_handler("on_disconnected") def _get_websocket(self): @@ -822,6 +827,7 @@ class ElevenLabsTTSService(WebsocketTTSService): self._partial_word = "" self._partial_word_start_time = 0.0 self._alignment_started_context_ids.discard(context_id) + self._context_init_sent.discard(context_id) async def on_audio_context_interrupted(self, context_id: str): """Close the ElevenLabs context when the bot is interrupted.""" @@ -914,26 +920,35 @@ class ElevenLabsTTSService(WebsocketTTSService): while True: await asyncio.sleep(KEEPALIVE_SLEEP) try: - if self._websocket and self._websocket.state is State.OPEN: - context_id = self.get_active_audio_context_id() - if context_id: - # Send keepalive with context ID to keep the connection alive - keepalive_message = { - "text": "", - "context_id": context_id, - } - logger.trace(f"Sending keepalive for context {context_id}") - else: - # It's possible to have a user interruption which clears the context - # without generating a new TTS response. In this case, we'll just send - # an empty message to keep the connection alive. - keepalive_message = {"text": ""} - logger.trace("Sending keepalive without context") - await self._websocket.send(json.dumps(keepalive_message)) + await self._send_keepalive() except websockets.ConnectionClosed as e: logger.warning(f"{self} keepalive error: {e}") break + async def _send_keepalive(self): + """Send a single keepalive message to keep the WebSocket connection alive. + + Only stamps a ``context_id`` once its context-init (carrying + ``voice_settings``) has been sent. Otherwise the keepalive would be the + context's first message, with no ``voice_settings``, and ElevenLabs would + reject the later context-init with a 1008 policy violation. A context-less + keepalive is sufficient until the context-init is sent. + """ + if not self._websocket or self._websocket.state is not State.OPEN: + return + + context_id = self.get_active_audio_context_id() + if context_id and context_id in self._context_init_sent: + # The context's voice_settings context-init has been sent, so it's + # safe to keep that context alive. + keepalive_message = {"text": "", "context_id": context_id} + else: + # No active context, or the active context's context-init hasn't been + # sent yet. A context-less keepalive keeps the connection alive without + # opening the context prematurely. + keepalive_message = {"text": ""} + await self._websocket.send(json.dumps(keepalive_message)) + async def _send_text(self, text: str, context_id: str): """Send text to the WebSocket for synthesis.""" if self._websocket and context_id: @@ -980,6 +995,9 @@ class ElevenLabsTTSService(WebsocketTTSService): locator.model_dump() for locator in self._pronunciation_dictionary_locators ] + # Mark the context-init as sent so the keepalive may now + # target this context_id. + self._context_init_sent.add(context_id) await self._websocket.send(json.dumps(msg)) logger.trace(f"Created new context {context_id}") diff --git a/tests/test_elevenlabs_tts.py b/tests/test_elevenlabs_tts.py index 89bc94d12..6a1a91bba 100644 --- a/tests/test_elevenlabs_tts.py +++ b/tests/test_elevenlabs_tts.py @@ -6,9 +6,14 @@ """Tests for ElevenLabs TTS alignment handling.""" +import json from typing import Any +import pytest +from websockets.protocol import State + from pipecat.services.elevenlabs.tts import ( + ElevenLabsTTSService, _select_alignment, _strip_utterance_leading_spaces, calculate_word_times, @@ -200,3 +205,87 @@ def test_select_alignment_works_with_http_field_names(): ) assert selected is not None assert selected["characters"] == list(" Hi") + + +# --------------------------------------------------------------------------- +# Keepalive vs context-init race +# +# The keepalive must only stamp a context_id once its context-init (carrying +# voice_settings) has been sent. Stamping it earlier makes the keepalive the +# context's first message, with no voice_settings, and ElevenLabs rejects the +# later context-init with a 1008 policy violation. +# --------------------------------------------------------------------------- + + +class _FakeWebSocket: + """Minimal stand-in for the ElevenLabs websocket that records sends.""" + + def __init__(self): + self.state = State.OPEN + self.sent: list[dict] = [] + + async def send(self, data: str): + self.sent.append(json.loads(data)) + + +def _make_service() -> ElevenLabsTTSService: + return ElevenLabsTTSService( + api_key="test-key", + settings=ElevenLabsTTSService.Settings( + voice="test-voice", + stability=0.55, + similarity_boost=0.85, + use_speaker_boost=True, + speed=0.81, + ), + ) + + +@pytest.mark.asyncio +async def test_keepalive_does_not_stamp_context_before_init(): + """During the pre-init window the keepalive must not stamp the new context_id.""" + service = _make_service() + ws = _FakeWebSocket() + service._websocket = ws + + # Simulate the start of an LLM turn: TTSService sets the turn context id on + # LLMFullResponseStartFrame, before run_tts sends the voice_settings init. + service._turn_context_id = "ctx-1" + service._playing_context_id = None + assert "ctx-1" not in service._context_init_sent + + await service._send_keepalive() + + # Context-less keepalive: the real context-init stays the context's first + # message, so ElevenLabs won't reject it with 1008. + assert ws.sent == [{"text": ""}] + + +@pytest.mark.asyncio +async def test_keepalive_stamps_context_after_init(): + """Once the context-init has been sent, the keepalive targets that context.""" + service = _make_service() + ws = _FakeWebSocket() + service._websocket = ws + service._turn_context_id = "ctx-1" + service._playing_context_id = None + # run_tts records the context once its voice_settings init has gone out. + service._context_init_sent.add("ctx-1") + + await service._send_keepalive() + + assert ws.sent == [{"text": "", "context_id": "ctx-1"}] + + +@pytest.mark.asyncio +async def test_keepalive_without_active_context_sends_empty(): + """With no active context, the keepalive sends a plain empty message.""" + service = _make_service() + ws = _FakeWebSocket() + service._websocket = ws + service._turn_context_id = None + service._playing_context_id = None + + await service._send_keepalive() + + assert ws.sent == [{"text": ""}] From 70773bce0a6ef48998329b591ae32a6009473d74 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 20 May 2026 09:07:55 -0400 Subject: [PATCH 2/2] Add changelog for PR #4527 --- changelog/{4520.fixed.md => 4527.fixed.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/{4520.fixed.md => 4527.fixed.md} (100%) diff --git a/changelog/4520.fixed.md b/changelog/4527.fixed.md similarity index 100% rename from changelog/4520.fixed.md rename to changelog/4527.fixed.md