From 32c6dccebeacd34fd9dcf29593f322bcf94a6973 Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Wed, 31 Dec 2025 18:49:48 +0900 Subject: [PATCH] Add word-level timestamp support to Azure TTS with cumulative PTS fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds word boundary support to AzureTTSService and fixes the race condition that causes scrambled TTS output across multiple sentences. ## Features Added - Change AzureTTSService to inherit from WordTTSService - Subscribe to Azure SDK's synthesis_word_boundary event - Emit word-level text with timing information via _words_queue - Add synthesis lock for sequential sentence processing ## Race Condition Fix Previously, each sentence's word boundary timestamps reset to 0, causing downstream components to interleave words when reordering frames by PTS. This resulted in scrambled output like: 'Hello ! I What am questions AI have assistant...' The fix adds cumulative audio offset tracking to ensure monotonically increasing PTS across all sentences: Sentence 1: pts = 0.1s, 0.5s, 0.8s (cumulative at end: 0.8s) Sentence 2: pts = 0.9s, 1.2s, 1.5s (0.8s + relative offset) ## Key Changes - _cumulative_audio_offset: tracks total audio duration - _handle_word_boundary: adds cumulative offset to timestamps - _handle_completed: accumulates audio duration for next sentence - flush_audio: resets cumulative offset at end of LLM response - _handle_interruption: resets state on user interruption - run_tts: uses synthesis lock for sequential processing Fixes #2918 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/pipecat/services/azure/tts.py | 363 ++++++++++++++++++++++++++---- 1 file changed, 316 insertions(+), 47 deletions(-) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 154930fac..82d37b39f 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ 
-13,16 +13,21 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import ( + CancelFrame, + EndFrame, ErrorFrame, Frame, + InterruptionFrame, StartFrame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, ) +from pipecat.processors.frame_processor import FrameDirection from pipecat.services.azure.common import language_to_azure_language -from pipecat.services.tts_service import TTSService +from pipecat.services.tts_service import TTSService, WordTTSService from pipecat.transcriptions.language import Language +from pipecat.utils.time import seconds_to_nanoseconds from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -233,24 +238,198 @@ class AzureBaseTTSService(TTSService): return escaped_text -class AzureTTSService(AzureBaseTTSService): - """Azure Cognitive Services streaming TTS service. +class AzureTTSService(WordTTSService): + """Azure Cognitive Services streaming TTS service with word timestamps. Provides real-time text-to-speech synthesis using Azure's WebSocket-based - streaming API. Audio chunks are streamed as they become available for - lower latency playback. + streaming API. Audio chunks and word boundaries are streamed as they become + available for lower latency playback and accurate word-level synchronization. """ - def __init__(self, **kwargs): + # Define SSML escape mappings based on SSML reserved characters + # See - https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-structure + SSML_ESCAPE_CHARS = { + "&": "&amp;", + "<": "&lt;", + ">": "&gt;", + '"': "&quot;", + "'": "&apos;", + } + + class InputParams(BaseModel): + """Input parameters for Azure TTS voice configuration. + + Parameters: + emphasis: Emphasis level for speech ("strong", "moderate", "reduced"). + language: Language for synthesis. Defaults to English (US). + pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high"). + rate: Speech rate multiplier. Defaults to "1.05". 
+ role: Voice role for expression (e.g., "YoungAdultFemale"). + style: Speaking style (e.g., "cheerful", "sad", "excited"). + style_degree: Intensity of the speaking style (0.01 to 2.0). + volume: Volume level (e.g., "+20%", "loud", "x-soft"). + """ + + emphasis: Optional[str] = None + language: Optional[Language] = Language.EN_US + pitch: Optional[str] = None + rate: Optional[str] = "1.05" + role: Optional[str] = None + style: Optional[str] = None + style_degree: Optional[str] = None + volume: Optional[str] = None + + def __init__( + self, + *, + api_key: str, + region: str, + voice="en-US-SaraNeural", + sample_rate: Optional[int] = None, + params: Optional[InputParams] = None, + **kwargs, + ): """Initialize the Azure streaming TTS service. Args: - **kwargs: All arguments passed to AzureBaseTTSService parent class. + api_key: Azure Cognitive Services subscription key. + region: Azure region identifier (e.g., "eastus", "westus2"). + voice: Voice name to use for synthesis. Defaults to "en-US-SaraNeural". + sample_rate: Audio sample rate in Hz. If None, uses service default. + params: Voice and synthesis parameters configuration. + **kwargs: Additional arguments passed to parent WordTTSService. 
""" - super().__init__(**kwargs) + # We want to push text frames ourselves with word-level timing + super().__init__( + aggregate_sentences=True, + push_text_frames=False, + pause_frame_processing=True, + sample_rate=sample_rate, + **kwargs, + ) + + params = params or AzureTTSService.InputParams() + + self._settings = { + "emphasis": params.emphasis, + "language": self.language_to_service_language(params.language) + if params.language + else "en-US", + "pitch": params.pitch, + "rate": params.rate, + "role": params.role, + "style": params.style, + "style_degree": params.style_degree, + "volume": params.volume, + } + + self._api_key = api_key + self._region = region + self._voice_id = voice self._speech_config = None self._speech_synthesizer = None self._audio_queue = asyncio.Queue() + self._context_id = None + self._word_timestamps_started = False + self._synthesis_lock = asyncio.Lock() + self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Azure TTS service supports metrics generation. + """ + return True + + def language_to_service_language(self, language: Language) -> Optional[str]: + """Convert a Language enum to Azure language format. + + Args: + language: The language to convert. + + Returns: + The Azure-specific language code, or None if not supported. + """ + return language_to_azure_language(language) + + def _construct_ssml(self, text: str) -> str: + """Construct SSML from text with current voice settings. + + Args: + text: The text to convert to SSML. + + Returns: + SSML string for Azure TTS synthesis. 
+ """ + language = self._settings["language"] + + # Escape special characters + escaped_text = self._escape_text(text) + + ssml = ( + f"" + f"" + "" + ) + + if self._settings["style"]: + ssml += f"" + + if self._settings["emphasis"]: + ssml += f"" + + ssml += escaped_text + + if self._settings["emphasis"]: + ssml += "" + + ssml += "" + + if self._settings["style"]: + ssml += "" + + ssml += "" + + return ssml + + def _escape_text(self, text: str) -> str: + """Escapes XML/SSML reserved characters according to Microsoft documentation. + + This method escapes the following characters: + - & becomes &amp; + - < becomes &lt; + - > becomes &gt; + - " becomes &quot; + - ' becomes &apos; + + Args: + text: The text to escape. + + Returns: + The escaped text. + """ + escaped_text = text + for char, escape_code in AzureTTSService.SSML_ESCAPE_CHARS.items(): + escaped_text = escaped_text.replace(char, escape_code) + return escaped_text async def start(self, frame: StartFrame): """Start the Azure TTS service and initialize speech synthesizer. @@ -286,24 +465,99 @@ class AzureTTSService(AzureBaseTTSService): self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing) self._speech_synthesizer.synthesis_completed.connect(self._handle_completed) self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled) + # Add word boundary event handler for word-level timestamps + self._speech_synthesizer.synthesis_word_boundary.connect(self._handle_word_boundary) + + def _handle_word_boundary(self, evt): + """Handle word boundary events from Azure SDK. + + Args: + evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK + containing word text and audio offset timing. 
+ """ + # evt.text contains the word + # evt.audio_offset contains timing in ticks (100-nanosecond units) + # Convert ticks to seconds: divide by 10,000,000 + word = evt.text + sentence_relative_seconds = evt.audio_offset / 10_000_000.0 + + # Add cumulative offset to get absolute timestamp across sentences + absolute_seconds = self._cumulative_audio_offset + sentence_relative_seconds + + # Queue the word timestamp for processing + # Use put_nowait since this is a synchronous callback + if self._context_id and word: + logger.debug(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s") + try: + # Convert to nanoseconds and put directly in queue (sync operation) + timestamp_ns = seconds_to_nanoseconds(absolute_seconds) + self._words_queue.put_nowait((word, timestamp_ns)) + except Exception as e: + logger.error(f"{self} error adding word timestamp: {e}") def _handle_synthesizing(self, evt): - """Handle audio chunks as they arriv.""" + """Handle audio chunks as they arrive. + + Args: + evt: Synthesis event containing audio data. + """ if evt.result and evt.result.audio_data: self._audio_queue.put_nowait(evt.result.audio_data) def _handle_completed(self, evt): - """Handle synthesis completion.""" + """Handle synthesis completion. + + Args: + evt: Completion event from Azure Speech SDK. 
+ """ + # Update cumulative audio offset for next sentence + if evt.result and evt.result.audio_duration: + self._cumulative_audio_offset += evt.result.audio_duration.total_seconds() + self._audio_queue.put_nowait(None) # Signal completion + # Add completion markers to word timestamp queue + # Use put_nowait since this is a synchronous callback + if self._context_id: + try: + # Add TTSStoppedFrame marker but NOT Reset - we maintain cumulative PTS + self._words_queue.put_nowait(("TTSStoppedFrame", 0)) + except Exception as e: + logger.error(f"{self} error finalizing word timestamps: {e}") def _handle_canceled(self, evt): - """Handle synthesis cancellation.""" + """Handle synthesis cancellation. + + Args: + evt: Cancellation event. + """ logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}") self._audio_queue.put_nowait(None) async def flush_audio(self): """Flush any pending audio data.""" logger.trace(f"{self}: flushing audio") + # Reset cumulative audio offset at end of LLM response + self._cumulative_audio_offset = 0.0 + + async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): + """Handle interruption by stopping current synthesis. + + Args: + frame: The interruption frame. + direction: Frame processing direction. 
+ """ + await super()._handle_interruption(frame, direction) + await self.stop_all_metrics() + # Reset cumulative audio offset on interruption + self._cumulative_audio_offset = 0.0 + # Clear the audio queue + while not self._audio_queue.empty(): + try: + self._audio_queue.get_nowait() + self._audio_queue.task_done() + except asyncio.QueueEmpty: + break + self._context_id = None @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: @@ -317,50 +571,65 @@ class AzureTTSService(AzureBaseTTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") - # Clear the audio queue in case there's still audio in it, causing the next audio response - # to be cut off by the 'None' element returned at the end of the previous audio synthesis. - # Empty the audio queue before processing the new text - while not self._audio_queue.empty(): - self._audio_queue.get_nowait() - self._audio_queue.task_done() - - try: - if self._speech_synthesizer is None: - error_msg = "Speech synthesizer not initialized." - yield ErrorFrame(error=error_msg) - return + # Ensure sequential sentence processing to prevent word boundary interleaving + async with self._synthesis_lock: + # Clear the audio queue in case there's still audio in it, causing the next audio response + # to be cut off by the 'None' element returned at the end of the previous audio synthesis. + # Empty the audio queue before processing the new text + while not self._audio_queue.empty(): + self._audio_queue.get_nowait() + self._audio_queue.task_done() try: - await self.start_ttfb_metrics() - yield TTSStartedFrame() + if self._speech_synthesizer is None: + error_msg = "Speech synthesizer not initialized." 
+ logger.error(error_msg) + yield ErrorFrame(error=error_msg) + return - ssml = self._construct_ssml(text) - self._speech_synthesizer.speak_ssml_async(ssml) - await self.start_tts_usage_metrics(text) + try: + await self.start_ttfb_metrics() + yield TTSStartedFrame() - # Stream audio chunks as they arrive - while True: - chunk = await self._audio_queue.get() - if chunk is None: # End of stream - break + # Mark that we're starting a new synthesis + self._context_id = str(id(text)) + self._word_timestamps_started = False - await self.stop_ttfb_metrics() - yield TTSAudioRawFrame( - audio=chunk, - sample_rate=self.sample_rate, - num_channels=1, - ) + ssml = self._construct_ssml(text) + self._speech_synthesizer.speak_ssml_async(ssml) + await self.start_tts_usage_metrics(text) - yield TTSStoppedFrame() + # Stream audio chunks as they arrive + while True: + chunk = await self._audio_queue.get() + if chunk is None: # End of stream + break + + await self.stop_ttfb_metrics() + # Start word timestamps only once when we receive first audio + if not self._word_timestamps_started: + self.start_word_timestamps() + self._word_timestamps_started = True + + frame = TTSAudioRawFrame( + audio=chunk, + sample_rate=self.sample_rate, + num_channels=1, + ) + yield frame + + # Clear context ID when done + self._context_id = None + yield TTSStoppedFrame() + + except Exception as e: + logger.error(f"{self} error during synthesis: {e}") + yield TTSStoppedFrame() + # Could add reconnection logic here if needed + return except Exception as e: - yield ErrorFrame(error=f"Unknown error occurred: {e}") - yield TTSStoppedFrame() - # Could add reconnection logic here if needed - return - - except Exception as e: - yield ErrorFrame(error=f"Unknown error occurred: {e}") + logger.error(f"{self} exception: {e}") class AzureHttpTTSService(AzureBaseTTSService):