Refactoring ElevenLabs to send close_context as soon as the turn context is complete.
This commit is contained in:
@@ -558,7 +558,7 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
text_aggregation_mode=text_aggregation_mode,
|
||||
aggregate_sentences=aggregate_sentences,
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
push_stop_frames=False,
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
settings=default_settings,
|
||||
@@ -825,15 +825,15 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
await self._close_context(context_id)
|
||||
await super().on_audio_context_interrupted(context_id)
|
||||
|
||||
async def on_audio_context_completed(self, context_id: str):
|
||||
"""Close the ElevenLabs context after all audio has been played.
|
||||
async def on_turn_context_completed(self):
|
||||
"""Close the server-side context at end of turn.
|
||||
|
||||
ElevenLabs does not send a server-side signal when a context is
|
||||
exhausted, so Pipecat must explicitly close it with
|
||||
``close_context: True`` to free server-side resources.
|
||||
Sends close_context so isFinal arrives immediately after the last audio byte.
|
||||
"""
|
||||
await self._close_context(context_id)
|
||||
await super().on_audio_context_completed(context_id)
|
||||
context_id = self._turn_context_id
|
||||
await super().on_turn_context_completed()
|
||||
if context_id:
|
||||
await self._close_context(context_id)
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Handle incoming WebSocket messages from ElevenLabs."""
|
||||
@@ -843,25 +843,15 @@ class ElevenLabsTTSService(WebsocketTTSService):
|
||||
received_ctx_id = msg.get("contextId")
|
||||
|
||||
# Handle final messages first, regardless of context availability
|
||||
# At the moment, this message is received AFTER the close_context message is
|
||||
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
|
||||
if msg.get("isFinal") is True:
|
||||
logger.trace(f"Received final message for context {received_ctx_id}")
|
||||
continue
|
||||
|
||||
# Check if this message belongs to the current context.
|
||||
if not self.audio_context_available(received_ctx_id):
|
||||
if self.get_active_audio_context_id() == received_ctx_id:
|
||||
logger.debug(
|
||||
f"Received a delayed message, recreating the context: {received_ctx_id}"
|
||||
logger.debug(f"Received final message for context {received_ctx_id}")
|
||||
# In case of interruption, there is no audio context available, so we don’t need to do anything.
|
||||
if self.audio_context_available(received_ctx_id):
|
||||
await self.append_to_audio_context(
|
||||
received_ctx_id, TTSStoppedFrame(context_id=received_ctx_id)
|
||||
)
|
||||
await self.create_audio_context(received_ctx_id)
|
||||
else:
|
||||
# This can happen if a message is received _after_ we have closed a context
|
||||
# due to user interruption but _before_ the `isFinal` message for the context
|
||||
# is received.
|
||||
logger.debug(f"Ignoring message from unavailable context: {received_ctx_id}")
|
||||
continue
|
||||
await self.remove_audio_context(received_ctx_id)
|
||||
continue
|
||||
|
||||
if msg.get("audio"):
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
|
||||
Reference in New Issue
Block a user