From 384f80983ffab845f25d9d2be9ede5f505354bab Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 15 Apr 2025 21:29:19 -0400 Subject: [PATCH] Added word/timestamp pairs to ElevenLabsHttpTTSService --- CHANGELOG.md | 4 +- src/pipecat/services/elevenlabs/tts.py | 165 ++++++++++++++++++++++--- 2 files changed, 149 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80b1369d5..d3333c27d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 you to control aggregator settings. You can now pass these arguments when creating aggregator pairs with `create_context_aggregator()`. +- Added word/timestamp pairs to `ElevenLabsHttpTTSService`. + - It is now possible to disable `SoundfileMixer` when created. You can then use `MixerEnableFrame` to dynamically enable it when necessary. @@ -55,7 +57,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed an issue in `SmallWebRTCTransport` where an error was thrown if the client did not create a video transceiver. -- Fixed an issue where LLM input parameters were not working and applied correctly in `GoogleVertexLLMService`, causing +- Fixed an issue where LLM input parameters were not working and applied correctly in `GoogleVertexLLMService`, causing unexpected behavior during inference. ## [0.0.63] - 2025-04-11 diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index cc9a72889..d77e4199c 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -25,7 +25,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.tts_service import InterruptibleWordTTSService, TTSService +from pipecat.services.tts_service import InterruptibleWordTTSService, WordTTSService from pipecat.transcriptions.language import Language # See .env.example for ElevenLabs configuration needed @@ -441,8 +441,8 @@ class ElevenLabsTTSService(InterruptibleWordTTSService): logger.error(f"{self} exception: {e}") -class ElevenLabsHttpTTSService(TTSService): - """ElevenLabs Text-to-Speech service using HTTP streaming. +class ElevenLabsHttpTTSService(WordTTSService): + """ElevenLabs Text-to-Speech service using HTTP streaming with word timestamps. Args: api_key: ElevenLabs API key @@ -475,7 +475,13 @@ class ElevenLabsHttpTTSService(TTSService): params: InputParams = InputParams(), **kwargs, ): - super().__init__(sample_rate=sample_rate, **kwargs) + super().__init__( + aggregate_sentences=True, + push_text_frames=False, + push_stop_frames=True, + sample_rate=sample_rate, + **kwargs, + ) self._api_key = api_key self._base_url = base_url @@ -498,28 +504,109 @@ class ElevenLabsHttpTTSService(TTSService): self._output_format = "" # initialized in start() self._voice_settings = self._set_voice_settings() + # Track cumulative time to properly sequence word timestamps across utterances + self._cumulative_time = 0 + self._started = False + + def language_to_service_language(self, language: Language) -> Optional[str]: + """Convert pipecat Language to ElevenLabs language code.""" + return language_to_elevenlabs_language(language) + def can_generate_metrics(self) -> bool: + """Indicate that this service can generate usage metrics.""" return True def _set_voice_settings(self): return build_elevenlabs_voice_settings(self._settings) async def start(self, frame: StartFrame): + """Initialize the service upon receiving a StartFrame.""" await super().start(frame) self._output_format = output_format_from_sample_rate(self.sample_rate) + self._cumulative_time = 0 + self._started = False - async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - """Generate speech from text using ElevenLabs streaming API. + async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + await super().push_frame(frame, direction) + if isinstance(frame, (StartInterruptionFrame, TTSStoppedFrame)): + # Reset timing on interruption or stop + self._started = False + self._cumulative_time = 0 + if isinstance(frame, TTSStoppedFrame): + await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)]) + + def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]: + """Calculate word timing from character alignment data. + + Example input data: + { + "characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"], + "character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], + "character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] + } + + Would produce word times (with cumulative_time=0): + [("Hello", 0.1), ("world", 0.5)] Args: - text: The text to convert to speech + alignment_info: Character timing data from ElevenLabs + + Returns: + List of (word, timestamp) pairs + """ + chars = alignment_info.get("characters", []) + char_start_times = alignment_info.get("character_start_times_seconds", []) + + if not chars or not char_start_times or len(chars) != len(char_start_times): + logger.warning( + f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}" + ) + return [] + + # Build the words and find their start times + words = [] + word_start_times = [] + current_word = "" + first_char_idx = -1 + + for i, char in enumerate(chars): + if char == " ": + if current_word: # Only add non-empty words + words.append(current_word) + # Use time of the first character of the word, offset by cumulative time + word_start_times.append( + self._cumulative_time + char_start_times[first_char_idx] + ) + current_word = "" + first_char_idx = -1 + else: + if not current_word: # This is the first character of a new word + first_char_idx = i + current_word += char + + # Don't forget the last word if there's no trailing space + if current_word and first_char_idx >= 0: + words.append(current_word) + word_start_times.append(self._cumulative_time + char_start_times[first_char_idx]) + + # Create word-time pairs + word_times = list(zip(words, word_start_times)) + + return word_times + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using ElevenLabs streaming API with timestamps. + + Args: + text: Text to convert to speech Yields: - Frames containing audio data and status information + Audio and control frames """ logger.debug(f"{self}: Generating TTS [{text}]") - url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream" + # Use the with-timestamps endpoint + url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream/with-timestamps" payload: Dict[str, Union[str, Dict[str, Union[float, bool]]]] = { "text": text, @@ -550,8 +637,6 @@ class ElevenLabsHttpTTSService(TTSService): if self._settings["optimize_streaming_latency"] is not None: params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"] - logger.debug(f"{self} ElevenLabs request - payload: {payload}, params: {params}") - try: await self.start_ttfb_metrics() @@ -566,17 +651,59 @@ class ElevenLabsHttpTTSService(TTSService): await self.start_tts_usage_metrics(text) - # Process the streaming response - CHUNK_SIZE = 1024 + # Start TTS sequence if not already started + if not self._started: + self.start_word_timestamps() + yield TTSStartedFrame() + self._started = True + + # Track the duration of this utterance based on the last character's end time + utterance_duration = 0 + async for line in response.content: + line_str = line.decode("utf-8").strip() + if not line_str: + continue + + try: + # Parse the JSON object + data = json.loads(line_str) + + # Process audio if present + if data and "audio_base64" in data: + await self.stop_ttfb_metrics() + audio = base64.b64decode(data["audio_base64"]) + yield TTSAudioRawFrame(audio, self.sample_rate, 1) + + # Process alignment if present + if data and "alignment" in data: + alignment = data["alignment"] + if alignment: # Ensure alignment is not None + # Get end time of the last character in this chunk + char_end_times = alignment.get("character_end_times_seconds", []) + if char_end_times: + chunk_end_time = char_end_times[-1] + # Update to the longest end time seen so far + utterance_duration = max(utterance_duration, chunk_end_time) + + # Calculate word timestamps + word_times = self.calculate_word_times(alignment) + if word_times: + await self.add_word_timestamps(word_times) + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse JSON from stream: {e}") + continue + except Exception as e: + logger.error(f"Error processing response: {e}", exc_info=True) + continue + + # After processing all chunks, add the total utterance duration + # to the cumulative time to ensure next utterance starts after this one + if utterance_duration > 0: + self._cumulative_time += utterance_duration - yield TTSStartedFrame() - async for chunk in response.content.iter_chunked(CHUNK_SIZE): - if len(chunk) > 0: - await self.stop_ttfb_metrics() - yield TTSAudioRawFrame(chunk, self.sample_rate, 1) except Exception as e: logger.error(f"Error in run_tts: {e}") yield ErrorFrame(error=str(e)) finally: await self.stop_ttfb_metrics() - yield TTSStoppedFrame() + # Let the parent class handle TTSStoppedFrame