diff --git a/changelog/4306.fixed.md b/changelog/4306.fixed.md new file mode 100644 index 000000000..fa83811b3 --- /dev/null +++ b/changelog/4306.fixed.md @@ -0,0 +1 @@ +- Fixed Azure TTS last word being missed by observers and RTVI UI. The completion signal was racing with word timestamp processing, causing the final word's `TTSTextFrame` to arrive after `TTSStoppedFrame`. Completion is now routed through the word boundary queue to ensure all words are processed before signaling stream end. diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 4e6879d1a..75f797aec 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -540,14 +540,25 @@ class AzureTTSService(TTSService, AzureBaseTTSService): self._last_timestamp = timestamp async def _word_processor_task_handler(self): - """Process word timestamps from the queue and call add_word_timestamps.""" + """Process word timestamps from the queue and call add_word_timestamps. + + Also handles a None sentinel from _handle_completed: once all pending + words have been drained, it signals audio stream completion via + _audio_queue so that run_tts exits only after the last word has been + processed. + """ while True: try: - word, timestamp_seconds = await self._word_boundary_queue.get() - if self._current_context_id: - await self.add_word_timestamps( - [(word, timestamp_seconds)], self._current_context_id - ) + item = await self._word_boundary_queue.get() + if item is None: + # All words drained — now signal audio completion. + self._audio_queue.put_nowait(None) + else: + word, timestamp_seconds = item + if self._current_context_id: + await self.add_word_timestamps( + [(word, timestamp_seconds)], self._current_context_id + ) self._word_boundary_queue.task_done() except asyncio.CancelledError: break @@ -569,17 +580,21 @@ class AzureTTSService(TTSService, AzureBaseTTSService): Args: evt: Completion event from Azure Speech SDK. """ + # Store duration for cumulative offset calculation + if evt.result and evt.result.audio_duration: + self._current_sentence_duration = evt.result.audio_duration.total_seconds() + # Flush any pending word before completing if self._last_word is not None: self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp)) self._last_word = None self._last_timestamp = None - # Store duration for cumulative offset calculation - if evt.result and evt.result.audio_duration: - self._current_sentence_duration = evt.result.audio_duration.total_seconds() - - self._audio_queue.put_nowait(None) # Signal completion + # Route completion through the word boundary queue so the word processor + # task drains all pending words before signaling audio stream completion. + # Without this, the last word's TTSTextFrame may arrive after + # TTSStoppedFrame, causing it to be missed by observers and the UI. + self._word_boundary_queue.put_nowait(None) def _handle_canceled(self, evt): """Handle synthesis cancellation.