Compare commits
2 Commits
copilot/vs
...
cb/elevenl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40605aef38 | ||
|
|
c246b54d4d |
@@ -479,6 +479,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
self._websocket = await websocket_connect(
|
||||
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
|
||||
)
|
||||
logger.debug(f"{self}: WebSocket connected to ElevenLabs - ready to receive audio")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
@@ -513,7 +514,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
|
||||
# Close the current context when interrupted without closing the websocket
|
||||
if self._context_id and self._websocket:
|
||||
logger.trace(f"Closing context {self._context_id} due to interruption")
|
||||
logger.debug(
|
||||
f"{self}: Closing context {self._context_id} due to interruption - this will stop audio stream"
|
||||
)
|
||||
try:
|
||||
# ElevenLabs requires that Pipecat manages the contexts and closes them
|
||||
# when they're no longer in use. Since a StartInterruptionFrame is pushed
|
||||
@@ -524,6 +527,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
await self._websocket.send(
|
||||
json.dumps({"context_id": self._context_id, "close_context": True})
|
||||
)
|
||||
logger.debug(f"{self}: Sent close_context message for {self._context_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing context on interruption: {e}")
|
||||
self._context_id = None
|
||||
@@ -534,15 +538,28 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
async for message in WatchdogAsyncIterator(
|
||||
self._get_websocket(), manager=self.task_manager
|
||||
):
|
||||
# Log raw message structure for debugging (truncated for readability)
|
||||
message_preview = message[:500] + "..." if len(message) > 500 else message
|
||||
logger.debug(f"{self}: Raw WebSocket message preview: {message_preview}")
|
||||
|
||||
msg = json.loads(message)
|
||||
|
||||
received_ctx_id = msg.get("contextId")
|
||||
|
||||
# Log the message structure without the large audio data
|
||||
msg_structure = {
|
||||
k: (f"<{len(v)} chars>" if k == "audio" and isinstance(v, str) else v)
|
||||
for k, v in msg.items()
|
||||
}
|
||||
logger.debug(f"{self}: Parsed message structure: {msg_structure}")
|
||||
|
||||
# Handle final messages first, regardless of context availability
|
||||
# At the moment, this message is received AFTER the close_context message is
|
||||
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
|
||||
if msg.get("isFinal") is True:
|
||||
logger.trace(f"Received final message for context {received_ctx_id}")
|
||||
logger.debug(
|
||||
f"{self}: Received final message for context {received_ctx_id} - audio stream ended"
|
||||
)
|
||||
continue
|
||||
|
||||
# Check if this message belongs to the current context.
|
||||
@@ -564,7 +581,44 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
self.start_word_timestamps()
|
||||
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
|
||||
# Targeted logging for audio debugging
|
||||
audio_size = len(audio)
|
||||
|
||||
# More comprehensive audio analysis
|
||||
if audio_size > 0:
|
||||
# Sample first and last 8 bytes
|
||||
first_8 = audio[:8].hex() if audio_size >= 8 else audio.hex()
|
||||
last_8 = audio[-8:].hex() if audio_size >= 8 else ""
|
||||
|
||||
# Statistical analysis
|
||||
non_zero_count = sum(1 for b in audio if b != 0)
|
||||
non_zero_percent = (non_zero_count / audio_size * 100) if audio_size > 0 else 0
|
||||
|
||||
# Check for common silence patterns
|
||||
all_zeros = all(b == 0 for b in audio)
|
||||
mostly_zeros = non_zero_percent < 1.0 # Less than 1% non-zero
|
||||
|
||||
# Sample some values to check amplitude range
|
||||
max_val = max(audio) if audio else 0
|
||||
min_val = min(audio) if audio else 0
|
||||
|
||||
logger.debug(
|
||||
f"{self}: Audio received - size: {audio_size} bytes, "
|
||||
f"first_8: {first_8}, last_8: {last_8}, "
|
||||
f"non_zero: {non_zero_percent:.1f}%, all_zeros: {all_zeros}, "
|
||||
f"mostly_zeros: {mostly_zeros}, range: {min_val}-{max_val}, "
|
||||
f"context: {received_ctx_id}, cumulative_time: {self._cumulative_time:.3f}s"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"{self}: Received empty audio data for context {received_ctx_id}"
|
||||
)
|
||||
|
||||
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
|
||||
logger.debug(
|
||||
f"[ELEVENLABS_AUDIO] Creating TTSAudioRawFrame with {len(audio)} bytes for context {received_ctx_id}"
|
||||
)
|
||||
await self.append_to_audio_context(received_ctx_id, frame)
|
||||
|
||||
if msg.get("alignment"):
|
||||
@@ -620,6 +674,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
"""Send text to the WebSocket for synthesis."""
|
||||
if self._websocket and self._context_id:
|
||||
msg = {"text": text, "context_id": self._context_id}
|
||||
logger.debug(
|
||||
f"{self}: Sending text to ElevenLabs - length: {len(text)}, context: {self._context_id}"
|
||||
)
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
@traced_tts
|
||||
@@ -973,6 +1030,9 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
if data and "audio_base64" in data:
|
||||
await self.stop_ttfb_metrics()
|
||||
audio = base64.b64decode(data["audio_base64"])
|
||||
logger.debug(
|
||||
f"[ELEVENLABS_AUDIO] Yielding TTSAudioRawFrame with {len(audio)} bytes"
|
||||
)
|
||||
yield TTSAudioRawFrame(audio, self.sample_rate, 1)
|
||||
|
||||
# Process alignment if present
|
||||
|
||||
@@ -333,6 +333,9 @@ class TTSService(AIService):
|
||||
elif isinstance(frame, TTSUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
logger.warning(
|
||||
f"[TTS_RESUME] {self.__class__.__name__} received BotStoppedSpeakingFrame"
|
||||
)
|
||||
await self._maybe_resume_frame_processing()
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
@@ -376,11 +379,15 @@ class TTSService(AIService):
|
||||
|
||||
# Pause frame processing, but only when both self._processing_text and
# self._pause_frame_processing are truthy; otherwise this is a no-op.
async def _maybe_pause_frame_processing(self):
|
||||
if self._processing_text and self._pause_frame_processing:
|
||||
# Warning-level trace tagged [TTS_PAUSE], emitted right before the pause call.
logger.warning(f"[TTS_PAUSE] {self.__class__.__name__} pausing frame processing")
|
||||
await self.pause_processing_frames()
|
||||
|
||||
# Resume frame processing when self._pause_frame_processing is truthy;
# otherwise only emit a debug message and do nothing else.
async def _maybe_resume_frame_processing(self):
|
||||
if self._pause_frame_processing:
|
||||
# Warning-level trace tagged [TTS_RESUME], emitted right before the resume call.
logger.warning(f"[TTS_RESUME] {self.__class__.__name__} resuming frame processing")
|
||||
await self.resume_processing_frames()
|
||||
else:
|
||||
# NOTE(review): this branch is taken whenever self._pause_frame_processing is
# falsy. The message says "not paused" — confirm whether that attribute tracks
# the current paused state or is a configuration setting.
logger.debug(f"[TTS_RESUME] {self.__class__.__name__} resume called but not paused")
|
||||
|
||||
async def _process_text_frame(self, frame: TextFrame):
|
||||
text: Optional[str] = None
|
||||
|
||||
@@ -503,6 +503,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
chunk.transport_destination = self._destination
|
||||
logger.debug(
|
||||
f"[AUDIO_QUEUE] Putting {chunk.__class__.__name__} with {len(chunk.audio)} bytes into audio queue"
|
||||
)
|
||||
await self._audio_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
|
||||
@@ -582,15 +585,18 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def _bot_stopped_speaking(self):
|
||||
"""Handle bot stopped speaking event."""
|
||||
if self._bot_speaking:
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
|
||||
logger.warning(
|
||||
f"[BOT_STOPPED] Bot stopped speaking - sending BotStoppedSpeakingFrame upstream"
|
||||
)
|
||||
|
||||
downstream_frame = BotStoppedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStoppedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
|
||||
logger.debug(f"[BOT_STOPPED] Pushing downstream BotStoppedSpeakingFrame")
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
logger.debug(f"[BOT_STOPPED] Pushing upstream BotStoppedSpeakingFrame")
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = False
|
||||
@@ -598,6 +604,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Clean audio buffer (there could be tiny leftovers if it is not a multiple
|
||||
# of our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
else:
|
||||
logger.debug(f"[BOT_STOPPED] _bot_stopped_speaking called but bot was not speaking")
|
||||
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
"""Handle various frame types with appropriate processing.
|
||||
@@ -624,12 +632,24 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
while True:
|
||||
try:
|
||||
logger.debug(
|
||||
f"[AUDIO_QUEUE] Waiting for audio frame (timeout={vad_stop_secs}s)"
|
||||
)
|
||||
start_time = time.time()
|
||||
frame = await asyncio.wait_for(
|
||||
self._audio_queue.get(), timeout=vad_stop_secs
|
||||
)
|
||||
wait_time = time.time() - start_time
|
||||
logger.debug(
|
||||
f"[AUDIO_QUEUE] Got frame {frame.__class__.__name__} after {wait_time:.3f}s"
|
||||
)
|
||||
self._transport.reset_watchdog()
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
wait_time = time.time() - start_time
|
||||
logger.warning(
|
||||
f"[AUDIO_QUEUE] TIMEOUT after {wait_time:.3f}s - triggering bot_stopped_speaking"
|
||||
)
|
||||
self._transport.reset_watchdog()
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
Reference in New Issue
Block a user