From 55fb4b0845ba942850e0d77429114d055f087784 Mon Sep 17 00:00:00 2001 From: Yan Fortin Date: Tue, 14 Apr 2026 18:48:40 -0400 Subject: [PATCH 1/2] fix(azure-tts): route completion through word boundary queue to prevent last word from being missed The Azure TTS _handle_completed callback was putting the audio stream completion signal (None) directly into _audio_queue while the last word was still pending in _word_boundary_queue. This caused a race condition where run_tts could exit and TTSStoppedFrame could be emitted before the word processor task had a chance to process and emit the final word's TTSTextFrame. The fix routes the completion signal through _word_boundary_queue as a None sentinel. The word processor task now recognizes this sentinel and only signals _audio_queue after all pending words have been drained. This guarantees the last word's TTSTextFrame is always emitted before TTSStoppedFrame. The cancellation/interruption path (_handle_canceled) is unchanged and still signals _audio_queue directly, which is correct since word ordering does not matter when speech is interrupted. --- changelog/0000.fixed.md | 1 + src/pipecat/services/azure/tts.py | 37 ++++++++++++++++++++++--------- 2 files changed, 27 insertions(+), 11 deletions(-) create mode 100644 changelog/0000.fixed.md diff --git a/changelog/0000.fixed.md b/changelog/0000.fixed.md new file mode 100644 index 000000000..fa83811b3 --- /dev/null +++ b/changelog/0000.fixed.md @@ -0,0 +1 @@ +- Fixed Azure TTS last word being missed by observers and RTVI UI. The completion signal was racing with word timestamp processing, causing the final word's `TTSTextFrame` to arrive after `TTSStoppedFrame`. Completion is now routed through the word boundary queue to ensure all words are processed before signaling stream end. diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 79dc8a2e1..57d201c02 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -542,14 +542,25 @@ class AzureTTSService(TTSService, AzureBaseTTSService): self._last_timestamp = timestamp async def _word_processor_task_handler(self): - """Process word timestamps from the queue and call add_word_timestamps.""" + """Process word timestamps from the queue and call add_word_timestamps. + + Also handles a None sentinel from _handle_completed: once all pending + words have been drained, it signals audio stream completion via + _audio_queue so that run_tts exits only after the last word has been + processed. + """ while True: try: - word, timestamp_seconds = await self._word_boundary_queue.get() - if self._current_context_id: - await self.add_word_timestamps( - [(word, timestamp_seconds)], self._current_context_id - ) + item = await self._word_boundary_queue.get() + if item is None: + # All words drained — now signal audio completion. + self._audio_queue.put_nowait(None) + else: + word, timestamp_seconds = item + if self._current_context_id: + await self.add_word_timestamps( + [(word, timestamp_seconds)], self._current_context_id + ) self._word_boundary_queue.task_done() except asyncio.CancelledError: break @@ -571,17 +582,21 @@ class AzureTTSService(TTSService, AzureBaseTTSService): Args: evt: Completion event from Azure Speech SDK. """ + # Store duration for cumulative offset calculation + if evt.result and evt.result.audio_duration: + self._current_sentence_duration = evt.result.audio_duration.total_seconds() + # Flush any pending word before completing if self._last_word is not None: self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp)) self._last_word = None self._last_timestamp = None - # Store duration for cumulative offset calculation - if evt.result and evt.result.audio_duration: - self._current_sentence_duration = evt.result.audio_duration.total_seconds() - - self._audio_queue.put_nowait(None) # Signal completion + # Route completion through the word boundary queue so the word processor + # task drains all pending words before signaling audio stream completion. + # Without this, the last word's TTSTextFrame may arrive after + # TTSStoppedFrame, causing it to be missed by observers and the UI. + self._word_boundary_queue.put_nowait(None) def _handle_canceled(self, evt): """Handle synthesis cancellation. From 6feeee515f93cc06d00036ca58132f11357cda9f Mon Sep 17 00:00:00 2001 From: Yan Fortin Date: Tue, 14 Apr 2026 18:49:35 -0400 Subject: [PATCH 2/2] chore: rename changelog fragment to match PR #4306 --- changelog/{0000.fixed.md => 4306.fixed.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/{0000.fixed.md => 4306.fixed.md} (100%) diff --git a/changelog/0000.fixed.md b/changelog/4306.fixed.md similarity index 100% rename from changelog/0000.fixed.md rename to changelog/4306.fixed.md