From ac7b06faba0ad11a7148a8fbabd50e41b3a16879 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 16 Dec 2025 13:18:22 -0800 Subject: [PATCH] WordTTSService: make sure word timestamps are always started --- changelog/3240.changed.md | 2 ++ changelog/3240.fixed.md | 2 ++ src/pipecat/services/cartesia/tts.py | 2 +- src/pipecat/services/elevenlabs/tts.py | 4 ++-- src/pipecat/services/gradium/tts.py | 2 +- src/pipecat/services/hume/tts.py | 2 +- src/pipecat/services/inworld/tts.py | 4 ++-- src/pipecat/services/rime/tts.py | 2 +- src/pipecat/services/tts_service.py | 26 ++++++++++++++++++++------ 9 files changed, 32 insertions(+), 14 deletions(-) create mode 100644 changelog/3240.changed.md create mode 100644 changelog/3240.fixed.md diff --git a/changelog/3240.changed.md b/changelog/3240.changed.md new file mode 100644 index 000000000..a5ee377c9 --- /dev/null +++ b/changelog/3240.changed.md @@ -0,0 +1,2 @@ +- ⚠️ Breaking change: `WordTTSService.start_word_timestamps()` and + `WordTTSService.reset_word_timestamps()` are now async. diff --git a/changelog/3240.fixed.md b/changelog/3240.fixed.md new file mode 100644 index 000000000..caff36b55 --- /dev/null +++ b/changelog/3240.fixed.md @@ -0,0 +1,2 @@ +- Fixed a TTS service word-timestamp issue that could cause generated + `TTSTextFrame` instances to have an incorrect pts (`pts = -1`). diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 9a47d9237..536c64776 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -554,7 +554,7 @@ class CartesiaTTSService(AudioContextWordTTSService): await self.add_word_timestamps(processed_timestamps) elif msg["type"] == "chunk": await self.stop_ttfb_metrics() - self.start_word_timestamps() + await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self.sample_rate, diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 5314c3766..d407c68ba 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -617,7 +617,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): if msg.get("audio"): await self.stop_ttfb_metrics() - self.start_word_timestamps() + await self.start_word_timestamps() audio = base64.b64decode(msg["audio"]) frame = TTSAudioRawFrame(audio, self.sample_rate, 1) @@ -1047,7 +1047,7 @@ class ElevenLabsHttpTTSService(WordTTSService): # Start TTS sequence if not already started if not self._started: - self.start_word_timestamps() + await self.start_word_timestamps() yield TTSStartedFrame() self._started = True diff --git a/src/pipecat/services/gradium/tts.py b/src/pipecat/services/gradium/tts.py index 8a04f2db6..7936e8c2b 100644 --- a/src/pipecat/services/gradium/tts.py +++ b/src/pipecat/services/gradium/tts.py @@ -253,7 +253,7 @@ class GradiumTTSService(InterruptibleWordTTSService): if msg["type"] == "audio": # Process audio chunk await self.stop_ttfb_metrics() - self.start_word_timestamps() + await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["audio"]), sample_rate=self.sample_rate, diff --git a/src/pipecat/services/hume/tts.py b/src/pipecat/services/hume/tts.py index 6571bff0f..38e5b9fd6 100644 --- a/src/pipecat/services/hume/tts.py +++ b/src/pipecat/services/hume/tts.py @@ -245,7 +245,7 @@ class HumeTTSService(WordTTSService): # Start TTS sequence if not already started if not self._started: - self.start_word_timestamps() + await self.start_word_timestamps() yield TTSStartedFrame() self._started = True diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index 19c3283f7..abe41bb1a 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -243,7 +243,7 @@ class InworldHttpTTSService(WordTTSService): await self.start_ttfb_metrics() if not self._started: - self.start_word_timestamps() + await self.start_word_timestamps() yield TTSStartedFrame() self._started = True @@ -699,7 +699,7 @@ class InworldTTSService(AudioContextWordTTSService): if audio_b64: await self.stop_ttfb_metrics() - self.start_word_timestamps() + await self.start_word_timestamps() audio = base64.b64decode(audio_b64) if len(audio) > 44 and audio.startswith(b"RIFF"): audio = audio[44:] diff --git a/src/pipecat/services/rime/tts.py b/src/pipecat/services/rime/tts.py index 7f95ffd21..a959fd8c0 100644 --- a/src/pipecat/services/rime/tts.py +++ b/src/pipecat/services/rime/tts.py @@ -385,7 +385,7 @@ class RimeTTSService(AudioContextWordTTSService): if msg["type"] == "chunk": # Process audio chunk await self.stop_ttfb_metrics() - self.start_word_timestamps() + await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["data"]), sample_rate=self.sample_rate, diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index ca15cb2c0..39a6078e0 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -651,15 +651,21 @@ class WordTTSService(TTSService): """ super().__init__(**kwargs) self._initial_word_timestamp = -1 + self._initial_word_times = [] self._words_task = None self._llm_response_started: bool = False - def start_word_timestamps(self): + async def start_word_timestamps(self): """Start tracking word timestamps from the current time.""" if self._initial_word_timestamp == -1: self._initial_word_timestamp = self.get_clock().get_time() + # If we cached some initial word times (because we didn't receive + # audio), let's add them now. + if self._initial_word_times: + await self._add_word_timestamps(self._initial_word_times) + self._initial_word_times = [] - def reset_word_timestamps(self): + async def reset_word_timestamps(self): """Reset word timestamp tracking.""" self._initial_word_timestamp = -1 @@ -669,8 +675,12 @@ class WordTTSService(TTSService): Args: word_times: List of (word, timestamp) tuples where timestamp is in seconds. """ - for word, timestamp in word_times: - await self._words_queue.put((word, seconds_to_nanoseconds(timestamp))) + if self._initial_word_timestamp == -1: + # Cache word timestamps and don't add them until we have started + # (i.e. we have some audio). + self._initial_word_times.extend(word_times) + else: + await self._add_word_timestamps(word_times) async def start(self, frame: StartFrame): """Start the word TTS service. @@ -716,7 +726,7 @@ class WordTTSService(TTSService): async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): await super()._handle_interruption(frame, direction) self._llm_response_started = False - self.reset_word_timestamps() + await self.reset_word_timestamps() def _create_words_task(self): if not self._words_task: @@ -728,13 +738,17 @@ class WordTTSService(TTSService): await self.cancel_task(self._words_task) self._words_task = None + async def _add_word_timestamps(self, word_times: List[Tuple[str, float]]): + for word, timestamp in word_times: + await self._words_queue.put((word, seconds_to_nanoseconds(timestamp))) + async def _words_task_handler(self): last_pts = 0 while True: frame = None (word, timestamp) = await self._words_queue.get() if word == "Reset" and timestamp == 0: - self.reset_word_timestamps() + await self.reset_word_timestamps() if self._llm_response_started: self._llm_response_started = False frame = LLMFullResponseEndFrame()