Merge pull request #4433 from pipecat-ai/filipi/refactoring_elevenlabs

Refactoring ElevenLabs to send close_context as soon as the turn context is complete.
This commit is contained in:
Filipi da Silva Fuchter
2026-05-07 13:10:36 -03:00
committed by GitHub
2 changed files with 25 additions and 25 deletions

View File

@@ -0,0 +1 @@
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.

View File

@@ -558,7 +558,7 @@ class ElevenLabsTTSService(WebsocketTTSService):
text_aggregation_mode=text_aggregation_mode,
aggregate_sentences=aggregate_sentences,
push_text_frames=False,
push_stop_frames=True,
push_stop_frames=False,
pause_frame_processing=True,
sample_rate=sample_rate,
settings=default_settings,
@@ -660,6 +660,7 @@ class ElevenLabsTTSService(WebsocketTTSService):
if audio_contexts:
for ctx_id in audio_contexts:
await self._close_context(ctx_id)
self._reset_alignment_state(ctx_id)
if not url_changed:
# Reconnect applies all settings; only warn about fields not handled
@@ -815,6 +816,8 @@ class ElevenLabsTTSService(WebsocketTTSService):
)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
def _reset_alignment_state(self, context_id: str):
self._cumulative_time = 0.0
self._partial_word = ""
self._partial_word_start_time = 0.0
@@ -823,18 +826,24 @@ class ElevenLabsTTSService(WebsocketTTSService):
async def on_audio_context_interrupted(self, context_id: str):
"""Close the ElevenLabs context when the bot is interrupted."""
await self._close_context(context_id)
self._reset_alignment_state(context_id)
await super().on_audio_context_interrupted(context_id)
async def on_audio_context_completed(self, context_id: str):
"""Close the ElevenLabs context after all audio has been played.
ElevenLabs does not send a server-side signal when a context is
exhausted, so Pipecat must explicitly close it with
``close_context: True`` to free server-side resources.
"""
await self._close_context(context_id)
"""Reset alignment state after all audio for the context has played."""
self._reset_alignment_state(context_id)
await super().on_audio_context_completed(context_id)
async def on_turn_context_completed(self):
"""Close the server-side context at end of turn.
Sends close_context so isFinal arrives immediately after the last audio byte.
"""
context_id = self._turn_context_id
await super().on_turn_context_completed()
if context_id:
await self._close_context(context_id)
async def _receive_messages(self):
"""Handle incoming WebSocket messages from ElevenLabs."""
async for message in self._get_websocket():
@@ -843,25 +852,15 @@ class ElevenLabsTTSService(WebsocketTTSService):
received_ctx_id = msg.get("contextId")
# Handle final messages first, regardless of context availability
# At the moment, this message is received AFTER the close_context message is
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
if msg.get("isFinal") is True:
logger.trace(f"Received final message for context {received_ctx_id}")
continue
# Check if this message belongs to the current context.
if not self.audio_context_available(received_ctx_id):
if self.get_active_audio_context_id() == received_ctx_id:
logger.debug(
f"Received a delayed message, recreating the context: {received_ctx_id}"
logger.debug(f"Received final message for context {received_ctx_id}")
# In case of interruption, there is no audio context available, so we dont need to do anything.
if self.audio_context_available(received_ctx_id):
await self.append_to_audio_context(
received_ctx_id, TTSStoppedFrame(context_id=received_ctx_id)
)
await self.create_audio_context(received_ctx_id)
else:
# This can happen if a message is received _after_ we have closed a context
# due to user interruption but _before_ the `isFinal` message for the context
# is received.
logger.debug(f"Ignoring message from unavailable context: {received_ctx_id}")
continue
await self.remove_audio_context(received_ctx_id)
continue
if msg.get("audio"):
audio = base64.b64decode(msg["audio"])