From 32c6dccebeacd34fd9dcf29593f322bcf94a6973 Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Wed, 31 Dec 2025 18:49:48 +0900 Subject: [PATCH 1/6] Add word-level timestamp support to Azure TTS with cumulative PTS fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds word boundary support to AzureTTSService and fixes the race condition that causes scrambled TTS output across multiple sentences. ## Features Added - Change AzureTTSService to inherit from WordTTSService - Subscribe to Azure SDK's synthesis_word_boundary event - Emit word-level text with timing information via _words_queue - Add synthesis lock for sequential sentence processing ## Race Condition Fix Previously, each sentence's word boundary timestamps reset to 0, causing downstream components to interleave words when reordering frames by PTS. This resulted in scrambled output like: 'Hello ! I What am questions AI have assistant...' The fix adds cumulative audio offset tracking to ensure monotonically increasing PTS across all sentences: Sentence 1: pts = 0.1s, 0.5s, 0.8s (cumulative at end: 0.8s) Sentence 2: pts = 0.9s, 1.2s, 1.5s (0.8s + relative offset) ## Key Changes - _cumulative_audio_offset: tracks total audio duration - _handle_word_boundary: adds cumulative offset to timestamps - _handle_completed: accumulates audio duration for next sentence - flush_audio: resets cumulative offset at end of LLM response - _handle_interruption: resets state on user interruption - run_tts: uses synthesis lock for sequential processing Fixes #2918 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/pipecat/services/azure/tts.py | 363 ++++++++++++++++++++++++++---- 1 file changed, 316 insertions(+), 47 deletions(-) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 154930fac..82d37b39f 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -13,16 +13,21 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import ( + CancelFrame, + EndFrame, ErrorFrame, Frame, + InterruptionFrame, StartFrame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, ) +from pipecat.processors.frame_processor import FrameDirection from pipecat.services.azure.common import language_to_azure_language -from pipecat.services.tts_service import TTSService +from pipecat.services.tts_service import TTSService, WordTTSService from pipecat.transcriptions.language import Language +from pipecat.utils.time import seconds_to_nanoseconds from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -233,24 +238,198 @@ class AzureBaseTTSService(TTSService): return escaped_text -class AzureTTSService(AzureBaseTTSService): - """Azure Cognitive Services streaming TTS service. +class AzureTTSService(WordTTSService): + """Azure Cognitive Services streaming TTS service with word timestamps. Provides real-time text-to-speech synthesis using Azure's WebSocket-based - streaming API. Audio chunks are streamed as they become available for - lower latency playback. + streaming API. Audio chunks and word boundaries are streamed as they become + available for lower latency playback and accurate word-level synchronization. """ - def __init__(self, **kwargs): + # Define SSML escape mappings based on SSML reserved characters + # See - https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-structure + SSML_ESCAPE_CHARS = { + "&": "&", + "<": "<", + ">": ">", + '"': """, + "'": "'", + } + + class InputParams(BaseModel): + """Input parameters for Azure TTS voice configuration. + + Parameters: + emphasis: Emphasis level for speech ("strong", "moderate", "reduced"). + language: Language for synthesis. Defaults to English (US). + pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high"). + rate: Speech rate multiplier. Defaults to "1.05". + role: Voice role for expression (e.g., "YoungAdultFemale"). + style: Speaking style (e.g., "cheerful", "sad", "excited"). + style_degree: Intensity of the speaking style (0.01 to 2.0). + volume: Volume level (e.g., "+20%", "loud", "x-soft"). + """ + + emphasis: Optional[str] = None + language: Optional[Language] = Language.EN_US + pitch: Optional[str] = None + rate: Optional[str] = "1.05" + role: Optional[str] = None + style: Optional[str] = None + style_degree: Optional[str] = None + volume: Optional[str] = None + + def __init__( + self, + *, + api_key: str, + region: str, + voice="en-US-SaraNeural", + sample_rate: Optional[int] = None, + params: Optional[InputParams] = None, + **kwargs, + ): """Initialize the Azure streaming TTS service. Args: - **kwargs: All arguments passed to AzureBaseTTSService parent class. + api_key: Azure Cognitive Services subscription key. + region: Azure region identifier (e.g., "eastus", "westus2"). + voice: Voice name to use for synthesis. Defaults to "en-US-SaraNeural". + sample_rate: Audio sample rate in Hz. If None, uses service default. + params: Voice and synthesis parameters configuration. + **kwargs: Additional arguments passed to parent WordTTSService. """ - super().__init__(**kwargs) + # We want to push text frames ourselves with word-level timing + super().__init__( + aggregate_sentences=True, + push_text_frames=False, + pause_frame_processing=True, + sample_rate=sample_rate, + **kwargs, + ) + + params = params or AzureTTSService.InputParams() + + self._settings = { + "emphasis": params.emphasis, + "language": self.language_to_service_language(params.language) + if params.language + else "en-US", + "pitch": params.pitch, + "rate": params.rate, + "role": params.role, + "style": params.style, + "style_degree": params.style_degree, + "volume": params.volume, + } + + self._api_key = api_key + self._region = region + self._voice_id = voice self._speech_config = None self._speech_synthesizer = None self._audio_queue = asyncio.Queue() + self._context_id = None + self._word_timestamps_started = False + self._synthesis_lock = asyncio.Lock() + self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Azure TTS service supports metrics generation. + """ + return True + + def language_to_service_language(self, language: Language) -> Optional[str]: + """Convert a Language enum to Azure language format. + + Args: + language: The language to convert. + + Returns: + The Azure-specific language code, or None if not supported. + """ + return language_to_azure_language(language) + + def _construct_ssml(self, text: str) -> str: + """Construct SSML from text with current voice settings. + + Args: + text: The text to convert to SSML. + + Returns: + SSML string for Azure TTS synthesis. + """ + language = self._settings["language"] + + # Escape special characters + escaped_text = self._escape_text(text) + + ssml = ( + f"" + f"" + "" + ) + + if self._settings["style"]: + ssml += f"" + + if self._settings["emphasis"]: + ssml += f"" + + ssml += escaped_text + + if self._settings["emphasis"]: + ssml += "" + + ssml += "" + + if self._settings["style"]: + ssml += "" + + ssml += "" + + return ssml + + def _escape_text(self, text: str) -> str: + """Escapes XML/SSML reserved characters according to Microsoft documentation. + + This method escapes the following characters: + - & becomes & + - < becomes < + - > becomes > + - " becomes " + - ' becomes ' + + Args: + text: The text to escape. + + Returns: + The escaped text. + """ + escaped_text = text + for char, escape_code in AzureTTSService.SSML_ESCAPE_CHARS.items(): + escaped_text = escaped_text.replace(char, escape_code) + return escaped_text async def start(self, frame: StartFrame): """Start the Azure TTS service and initialize speech synthesizer. @@ -286,24 +465,99 @@ class AzureTTSService(AzureBaseTTSService): self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing) self._speech_synthesizer.synthesis_completed.connect(self._handle_completed) self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled) + # Add word boundary event handler for word-level timestamps + self._speech_synthesizer.synthesis_word_boundary.connect(self._handle_word_boundary) + + def _handle_word_boundary(self, evt): + """Handle word boundary events from Azure SDK. + + Args: + evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK + containing word text and audio offset timing. + """ + # evt.text contains the word + # evt.audio_offset contains timing in ticks (100-nanosecond units) + # Convert ticks to seconds: divide by 10,000,000 + word = evt.text + sentence_relative_seconds = evt.audio_offset / 10_000_000.0 + + # Add cumulative offset to get absolute timestamp across sentences + absolute_seconds = self._cumulative_audio_offset + sentence_relative_seconds + + # Queue the word timestamp for processing + # Use put_nowait since this is a synchronous callback + if self._context_id and word: + logger.debug(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s") + try: + # Convert to nanoseconds and put directly in queue (sync operation) + timestamp_ns = seconds_to_nanoseconds(absolute_seconds) + self._words_queue.put_nowait((word, timestamp_ns)) + except Exception as e: + logger.error(f"{self} error adding word timestamp: {e}") def _handle_synthesizing(self, evt): - """Handle audio chunks as they arriv.""" + """Handle audio chunks as they arrive. + + Args: + evt: Synthesis event containing audio data. + """ if evt.result and evt.result.audio_data: self._audio_queue.put_nowait(evt.result.audio_data) def _handle_completed(self, evt): - """Handle synthesis completion.""" + """Handle synthesis completion. + + Args: + evt: Completion event from Azure Speech SDK. + """ + # Update cumulative audio offset for next sentence + if evt.result and evt.result.audio_duration: + self._cumulative_audio_offset += evt.result.audio_duration.total_seconds() + self._audio_queue.put_nowait(None) # Signal completion + # Add completion markers to word timestamp queue + # Use put_nowait since this is a synchronous callback + if self._context_id: + try: + # Add TTSStoppedFrame marker but NOT Reset - we maintain cumulative PTS + self._words_queue.put_nowait(("TTSStoppedFrame", 0)) + except Exception as e: + logger.error(f"{self} error finalizing word timestamps: {e}") def _handle_canceled(self, evt): - """Handle synthesis cancellation.""" + """Handle synthesis cancellation. + + Args: + evt: Cancellation event. + """ logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}") self._audio_queue.put_nowait(None) async def flush_audio(self): """Flush any pending audio data.""" logger.trace(f"{self}: flushing audio") + # Reset cumulative audio offset at end of LLM response + self._cumulative_audio_offset = 0.0 + + async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): + """Handle interruption by stopping current synthesis. + + Args: + frame: The interruption frame. + direction: Frame processing direction. + """ + await super()._handle_interruption(frame, direction) + await self.stop_all_metrics() + # Reset cumulative audio offset on interruption + self._cumulative_audio_offset = 0.0 + # Clear the audio queue + while not self._audio_queue.empty(): + try: + self._audio_queue.get_nowait() + self._audio_queue.task_done() + except asyncio.QueueEmpty: + break + self._context_id = None @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: @@ -317,50 +571,65 @@ class AzureTTSService(AzureBaseTTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") - # Clear the audio queue in case there's still audio in it, causing the next audio response - # to be cut off by the 'None' element returned at the end of the previous audio synthesis. - # Empty the audio queue before processing the new text - while not self._audio_queue.empty(): - self._audio_queue.get_nowait() - self._audio_queue.task_done() - - try: - if self._speech_synthesizer is None: - error_msg = "Speech synthesizer not initialized." - yield ErrorFrame(error=error_msg) - return + # Ensure sequential sentence processing to prevent word boundary interleaving + async with self._synthesis_lock: + # Clear the audio queue in case there's still audio in it, causing the next audio response + # to be cut off by the 'None' element returned at the end of the previous audio synthesis. + # Empty the audio queue before processing the new text + while not self._audio_queue.empty(): + self._audio_queue.get_nowait() + self._audio_queue.task_done() try: - await self.start_ttfb_metrics() - yield TTSStartedFrame() + if self._speech_synthesizer is None: + error_msg = "Speech synthesizer not initialized." + logger.error(error_msg) + yield ErrorFrame(error=error_msg) + return - ssml = self._construct_ssml(text) - self._speech_synthesizer.speak_ssml_async(ssml) - await self.start_tts_usage_metrics(text) + try: + await self.start_ttfb_metrics() + yield TTSStartedFrame() - # Stream audio chunks as they arrive - while True: - chunk = await self._audio_queue.get() - if chunk is None: # End of stream - break + # Mark that we're starting a new synthesis + self._context_id = str(id(text)) + self._word_timestamps_started = False - await self.stop_ttfb_metrics() - yield TTSAudioRawFrame( - audio=chunk, - sample_rate=self.sample_rate, - num_channels=1, - ) + ssml = self._construct_ssml(text) + self._speech_synthesizer.speak_ssml_async(ssml) + await self.start_tts_usage_metrics(text) - yield TTSStoppedFrame() + # Stream audio chunks as they arrive + while True: + chunk = await self._audio_queue.get() + if chunk is None: # End of stream + break + + await self.stop_ttfb_metrics() + # Start word timestamps only once when we receive first audio + if not self._word_timestamps_started: + self.start_word_timestamps() + self._word_timestamps_started = True + + frame = TTSAudioRawFrame( + audio=chunk, + sample_rate=self.sample_rate, + num_channels=1, + ) + yield frame + + # Clear context ID when done + self._context_id = None + yield TTSStoppedFrame() + + except Exception as e: + logger.error(f"{self} error during synthesis: {e}") + yield TTSStoppedFrame() + # Could add reconnection logic here if needed + return except Exception as e: - yield ErrorFrame(error=f"Unknown error occurred: {e}") - yield TTSStoppedFrame() - # Could add reconnection logic here if needed - return - - except Exception as e: - yield ErrorFrame(error=f"Unknown error occurred: {e}") + logger.error(f"{self} exception: {e}") class AzureHttpTTSService(AzureBaseTTSService): From 4f93d331b7ebd96520450273b51ced7b7ac5a038 Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Wed, 31 Dec 2025 19:19:21 +0900 Subject: [PATCH 2/6] Added await to self.start_word_timestamps() --- src/pipecat/services/azure/tts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 82d37b39f..d65f93df4 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -608,7 +608,7 @@ class AzureTTSService(WordTTSService): await self.stop_ttfb_metrics() # Start word timestamps only once when we receive first audio if not self._word_timestamps_started: - self.start_word_timestamps() + await self.start_word_timestamps() self._word_timestamps_started = True frame = TTSAudioRawFrame( From 137bbb3d2c9ff27701e69879052d0ff62965df76 Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Tue, 6 Jan 2026 21:16:13 +0900 Subject: [PATCH 3/6] updated tts.py to match mark's version --- src/pipecat/services/azure/tts.py | 333 +++++++++--------------------- 1 file changed, 97 insertions(+), 236 deletions(-) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index d65f93df4..dec047d81 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -13,8 +13,6 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import ( - CancelFrame, - EndFrame, ErrorFrame, Frame, InterruptionFrame, @@ -66,11 +64,12 @@ def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputForma return sample_rate_map.get(sample_rate, SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm) -class AzureBaseTTSService(TTSService): - """Base class for Azure Cognitive Services text-to-speech implementations. +class AzureBaseTTSService: + """Base mixin class for Azure Cognitive Services text-to-speech implementations. Provides common functionality for Azure TTS services including SSML construction, voice configuration, and parameter management. + This is a mixin class and should be used alongside TTSService or its subclasses. """ # Define SSML escape mappings based on SSML reserved characters @@ -106,28 +105,24 @@ class AzureBaseTTSService(TTSService): style_degree: Optional[str] = None volume: Optional[str] = None - def __init__( + def _init_azure_base( self, *, api_key: str, region: str, - voice="en-US-SaraNeural", - sample_rate: Optional[int] = None, + voice: str = "en-US-SaraNeural", params: Optional[InputParams] = None, - **kwargs, ): - """Initialize the Azure TTS service with configuration parameters. + """Initialize Azure-specific configuration. + + This method should be called by subclasses after initializing their TTSService parent. Args: api_key: Azure Cognitive Services subscription key. region: Azure region identifier (e.g., "eastus", "westus2"). voice: Voice name to use for synthesis. Defaults to "en-US-SaraNeural". - sample_rate: Audio sample rate in Hz. If None, uses service default. params: Voice and synthesis parameters configuration. - **kwargs: Additional arguments passed to parent TTSService. """ - super().__init__(sample_rate=sample_rate, **kwargs) - params = params or AzureBaseTTSService.InputParams() self._settings = { @@ -238,7 +233,7 @@ class AzureBaseTTSService(TTSService): return escaped_text -class AzureTTSService(WordTTSService): +class AzureTTSService(WordTTSService, AzureBaseTTSService): """Azure Cognitive Services streaming TTS service with word timestamps. Provides real-time text-to-speech synthesis using Azure's WebSocket-based @@ -246,47 +241,15 @@ class AzureTTSService(WordTTSService): available for lower latency playback and accurate word-level synchronization. """ - # Define SSML escape mappings based on SSML reserved characters - # See - https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-structure - SSML_ESCAPE_CHARS = { - "&": "&", - "<": "<", - ">": ">", - '"': """, - "'": "'", - } - - class InputParams(BaseModel): - """Input parameters for Azure TTS voice configuration. - - Parameters: - emphasis: Emphasis level for speech ("strong", "moderate", "reduced"). - language: Language for synthesis. Defaults to English (US). - pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high"). - rate: Speech rate multiplier. Defaults to "1.05". - role: Voice role for expression (e.g., "YoungAdultFemale"). - style: Speaking style (e.g., "cheerful", "sad", "excited"). - style_degree: Intensity of the speaking style (0.01 to 2.0). - volume: Volume level (e.g., "+20%", "loud", "x-soft"). - """ - - emphasis: Optional[str] = None - language: Optional[Language] = Language.EN_US - pitch: Optional[str] = None - rate: Optional[str] = "1.05" - role: Optional[str] = None - style: Optional[str] = None - style_degree: Optional[str] = None - volume: Optional[str] = None - def __init__( self, *, api_key: str, region: str, - voice="en-US-SaraNeural", + voice: str = "en-US-SaraNeural", sample_rate: Optional[int] = None, - params: Optional[InputParams] = None, + params: Optional[AzureBaseTTSService.InputParams] = None, + aggregate_sentences: bool = True, **kwargs, ): """Initialize the Azure streaming TTS service. @@ -297,140 +260,28 @@ class AzureTTSService(WordTTSService): voice: Voice name to use for synthesis. Defaults to "en-US-SaraNeural". sample_rate: Audio sample rate in Hz. If None, uses service default. params: Voice and synthesis parameters configuration. + aggregate_sentences: Whether to aggregate sentences before synthesis. **kwargs: Additional arguments passed to parent WordTTSService. """ - # We want to push text frames ourselves with word-level timing + # Initialize WordTTSService first to set up word timestamp tracking super().__init__( - aggregate_sentences=True, - push_text_frames=False, + aggregate_sentences=aggregate_sentences, + push_text_frames=False, # We'll push text frames based on word timestamps + push_stop_frames=True, pause_frame_processing=True, sample_rate=sample_rate, **kwargs, ) - params = params or AzureTTSService.InputParams() + # Initialize Azure-specific functionality from mixin + self._init_azure_base(api_key=api_key, region=region, voice=voice, params=params) - self._settings = { - "emphasis": params.emphasis, - "language": self.language_to_service_language(params.language) - if params.language - else "en-US", - "pitch": params.pitch, - "rate": params.rate, - "role": params.role, - "style": params.style, - "style_degree": params.style_degree, - "volume": params.volume, - } - - self._api_key = api_key - self._region = region - self._voice_id = voice self._speech_config = None self._speech_synthesizer = None self._audio_queue = asyncio.Queue() - self._context_id = None - self._word_timestamps_started = False - self._synthesis_lock = asyncio.Lock() + self._started = False self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds - def can_generate_metrics(self) -> bool: - """Check if this service can generate processing metrics. - - Returns: - True, as Azure TTS service supports metrics generation. - """ - return True - - def language_to_service_language(self, language: Language) -> Optional[str]: - """Convert a Language enum to Azure language format. - - Args: - language: The language to convert. - - Returns: - The Azure-specific language code, or None if not supported. - """ - return language_to_azure_language(language) - - def _construct_ssml(self, text: str) -> str: - """Construct SSML from text with current voice settings. - - Args: - text: The text to convert to SSML. - - Returns: - SSML string for Azure TTS synthesis. - """ - language = self._settings["language"] - - # Escape special characters - escaped_text = self._escape_text(text) - - ssml = ( - f"" - f"" - "" - ) - - if self._settings["style"]: - ssml += f"" - - if self._settings["emphasis"]: - ssml += f"" - - ssml += escaped_text - - if self._settings["emphasis"]: - ssml += "" - - ssml += "" - - if self._settings["style"]: - ssml += "" - - ssml += "" - - return ssml - - def _escape_text(self, text: str) -> str: - """Escapes XML/SSML reserved characters according to Microsoft documentation. - - This method escapes the following characters: - - & becomes & - - < becomes < - - > becomes > - - " becomes " - - ' becomes ' - - Args: - text: The text to escape. - - Returns: - The escaped text. - """ - escaped_text = text - for char, escape_code in AzureTTSService.SSML_ESCAPE_CHARS.items(): - escaped_text = escaped_text.replace(char, escape_code) - return escaped_text - async def start(self, frame: StartFrame): """Start the Azure TTS service and initialize speech synthesizer. @@ -486,8 +337,8 @@ class AzureTTSService(WordTTSService): # Queue the word timestamp for processing # Use put_nowait since this is a synchronous callback - if self._context_id and word: - logger.debug(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s") + if word: + logger.trace(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s") try: # Convert to nanoseconds and put directly in queue (sync operation) timestamp_ns = seconds_to_nanoseconds(absolute_seconds) @@ -515,14 +366,6 @@ class AzureTTSService(WordTTSService): self._cumulative_audio_offset += evt.result.audio_duration.total_seconds() self._audio_queue.put_nowait(None) # Signal completion - # Add completion markers to word timestamp queue - # Use put_nowait since this is a synchronous callback - if self._context_id: - try: - # Add TTSStoppedFrame marker but NOT Reset - we maintain cumulative PTS - self._words_queue.put_nowait(("TTSStoppedFrame", 0)) - except Exception as e: - logger.error(f"{self} error finalizing word timestamps: {e}") def _handle_canceled(self, evt): """Handle synthesis cancellation. @@ -533,11 +376,22 @@ class AzureTTSService(WordTTSService): logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}") self._audio_queue.put_nowait(None) + async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + """Push a frame and handle state changes. + + Args: + frame: The frame to push. + direction: The direction to push the frame. + """ + await super().push_frame(frame, direction) + if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)): + self._started = False + if isinstance(frame, TTSStoppedFrame): + await self.add_word_timestamps([("Reset", 0)]) + async def flush_audio(self): """Flush any pending audio data.""" logger.trace(f"{self}: flushing audio") - # Reset cumulative audio offset at end of LLM response - self._cumulative_audio_offset = 0.0 async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): """Handle interruption by stopping current synthesis. @@ -557,7 +411,6 @@ class AzureTTSService(WordTTSService): self._audio_queue.task_done() except asyncio.QueueEmpty: break - self._context_id = None @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: @@ -571,68 +424,58 @@ class AzureTTSService(WordTTSService): """ logger.debug(f"{self}: Generating TTS [{text}]") - # Ensure sequential sentence processing to prevent word boundary interleaving - async with self._synthesis_lock: - # Clear the audio queue in case there's still audio in it, causing the next audio response - # to be cut off by the 'None' element returned at the end of the previous audio synthesis. - # Empty the audio queue before processing the new text - while not self._audio_queue.empty(): - self._audio_queue.get_nowait() - self._audio_queue.task_done() + # Clear the audio queue in case there's still audio in it, causing the next audio response + # to be cut off by the 'None' element returned at the end of the previous audio synthesis. + # Empty the audio queue before processing the new text + while not self._audio_queue.empty(): + self._audio_queue.get_nowait() + self._audio_queue.task_done() + + try: + if self._speech_synthesizer is None: + error_msg = "Speech synthesizer not initialized." + logger.error(error_msg) + yield ErrorFrame(error=error_msg) + return try: - if self._speech_synthesizer is None: - error_msg = "Speech synthesizer not initialized." - logger.error(error_msg) - yield ErrorFrame(error=error_msg) - return - - try: + if not self._started: await self.start_ttfb_metrics() + await self.start_word_timestamps() yield TTSStartedFrame() + self._started = True + self._cumulative_audio_offset = 0.0 - # Mark that we're starting a new synthesis - self._context_id = str(id(text)) - self._word_timestamps_started = False + ssml = self._construct_ssml(text) + self._speech_synthesizer.speak_ssml_async(ssml) + await self.start_tts_usage_metrics(text) - ssml = self._construct_ssml(text) - self._speech_synthesizer.speak_ssml_async(ssml) - await self.start_tts_usage_metrics(text) + # Stream audio chunks as they arrive + while True: + chunk = await self._audio_queue.get() + if chunk is None: # End of stream + break - # Stream audio chunks as they arrive - while True: - chunk = await self._audio_queue.get() - if chunk is None: # End of stream - break - - await self.stop_ttfb_metrics() - # Start word timestamps only once when we receive first audio - if not self._word_timestamps_started: - await self.start_word_timestamps() - self._word_timestamps_started = True - - frame = TTSAudioRawFrame( - audio=chunk, - sample_rate=self.sample_rate, - num_channels=1, - ) - yield frame - - # Clear context ID when done - self._context_id = None - yield TTSStoppedFrame() - - except Exception as e: - logger.error(f"{self} error during synthesis: {e}") - yield TTSStoppedFrame() - # Could add reconnection logic here if needed - return + await self.stop_ttfb_metrics() + frame = TTSAudioRawFrame( + audio=chunk, + sample_rate=self.sample_rate, + num_channels=1, + ) + yield frame except Exception as e: - logger.error(f"{self} exception: {e}") + logger.error(f"{self} error during synthesis: {e}") + yield TTSStoppedFrame() + self._started = False + # Could add reconnection logic here if needed + return + + except Exception as e: + logger.error(f"{self} exception: {e}") -class AzureHttpTTSService(AzureBaseTTSService): +class AzureHttpTTSService(TTSService, AzureBaseTTSService): """Azure Cognitive Services HTTP-based TTS service. Provides text-to-speech synthesis using Azure's HTTP API for simpler, @@ -640,13 +483,31 @@ class AzureHttpTTSService(AzureBaseTTSService): required and simpler integration is preferred. """ - def __init__(self, **kwargs): + def __init__( + self, + *, + api_key: str, + region: str, + voice: str = "en-US-SaraNeural", + sample_rate: Optional[int] = None, + params: Optional[AzureBaseTTSService.InputParams] = None, + **kwargs, + ): """Initialize the Azure HTTP TTS service. Args: - **kwargs: All arguments passed to AzureBaseTTSService parent class. + api_key: Azure Cognitive Services subscription key. + region: Azure region identifier (e.g., "eastus", "westus2"). + voice: Voice name to use for synthesis. Defaults to "en-US-SaraNeural". + sample_rate: Audio sample rate in Hz. If None, uses service default. + params: Voice and synthesis parameters configuration. + **kwargs: Additional arguments passed to parent TTSService. """ - super().__init__(**kwargs) + super().__init__(sample_rate=sample_rate, **kwargs) + + # Initialize Azure-specific functionality from mixin + self._init_azure_base(api_key=api_key, region=region, voice=voice, params=params) + self._speech_config = None self._speech_synthesizer = None From f62c262f233b194fc5b835cd9e0bb60b01cd7e3d Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Wed, 7 Jan 2026 13:10:41 +0900 Subject: [PATCH 4/6] Call start_word_timestamps() when the first audio chunk arrives --- src/pipecat/services/azure/tts.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index dec047d81..3c649cf42 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -441,9 +441,9 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): try: if not self._started: await self.start_ttfb_metrics() - await self.start_word_timestamps() yield TTSStartedFrame() self._started = True + self._first_chunk = True self._cumulative_audio_offset = 0.0 ssml = self._construct_ssml(text) @@ -457,6 +457,12 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): break await self.stop_ttfb_metrics() + + # Start word timestamps when first chunk arrives + if self._first_chunk: + await self.start_word_timestamps() + self._first_chunk = False + frame = TTSAudioRawFrame( audio=chunk, sample_rate=self.sample_rate, From 7204bf9914601e6b2fd51c80f51b833f39a3015c Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Wed, 7 Jan 2026 13:32:31 +0900 Subject: [PATCH 5/6] added changegelog --- changelog/3334.added.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changelog/3334.added.md diff --git a/changelog/3334.added.md b/changelog/3334.added.md new file mode 100644 index 000000000..993bca20e --- /dev/null +++ b/changelog/3334.added.md @@ -0,0 +1,2 @@ +- Added word-level timestamp support to `AzureTTSService` for accurate text-to-audio synchronization. + \ No newline at end of file From add5f51201838ff7279e3a0feb27cb3a16852d73 Mon Sep 17 00:00:00 2001 From: yukiobata1 Date: Thu, 8 Jan 2026 03:14:37 +0900 Subject: [PATCH 6/6] updated azure tts.py file --- src/pipecat/services/azure/tts.py | 69 ++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 15 deletions(-) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 3c649cf42..309f5e6b8 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -13,6 +13,8 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import ( + CancelFrame, + EndFrame, ErrorFrame, Frame, InterruptionFrame, @@ -25,7 +27,6 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.azure.common import language_to_azure_language from pipecat.services.tts_service import TTSService, WordTTSService from pipecat.transcriptions.language import Language -from pipecat.utils.time import seconds_to_nanoseconds from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -279,6 +280,8 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): self._speech_config = None self._speech_synthesizer = None self._audio_queue = asyncio.Queue() + self._word_boundary_queue = asyncio.Queue() + self._word_processor_task = None self._started = False self._cumulative_audio_offset: float = 0.0 # Cumulative audio duration in seconds @@ -316,9 +319,32 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing) self._speech_synthesizer.synthesis_completed.connect(self._handle_completed) self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled) - # Add word boundary event handler for word-level timestamps self._speech_synthesizer.synthesis_word_boundary.connect(self._handle_word_boundary) + # Start word processor task + if not self._word_processor_task: + self._word_processor_task = self.create_task(self._word_processor_task_handler()) + + async def stop(self, frame: EndFrame): + """Stop the Azure TTS service. + + Args: + frame: End frame signaling service stop. + """ + await super().stop(frame) + await self.cancel_task(self._word_processor_task) + self._word_processor_task = None + + async def cancel(self, frame: CancelFrame): + """Cancel the Azure TTS service. + + Args: + frame: Cancel frame signaling service cancellation. + """ + await super().cancel(frame) + await self.cancel_task(self._word_processor_task) + self._word_processor_task = None + def _handle_word_boundary(self, evt): """Handle word boundary events from Azure SDK. @@ -335,16 +361,28 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): # Add cumulative offset to get absolute timestamp across sentences absolute_seconds = self._cumulative_audio_offset + sentence_relative_seconds - # Queue the word timestamp for processing - # Use put_nowait since this is a synchronous callback + # Queue word timestamp for async processing + # Use thread-safe queue since this is called from Azure SDK thread if word: logger.trace(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s") try: - # Convert to nanoseconds and put directly in queue (sync operation) - timestamp_ns = seconds_to_nanoseconds(absolute_seconds) - self._words_queue.put_nowait((word, timestamp_ns)) + # Put in temporary queue - will be processed by async task + # Store as (word, timestamp_in_seconds) tuple + self._word_boundary_queue.put_nowait((word, absolute_seconds)) except Exception as e: - logger.error(f"{self} error adding word timestamp: {e}") + logger.error(f"{self} error queuing word timestamp: {e}") + + async def _word_processor_task_handler(self): + """Process word timestamps from the queue and call add_word_timestamps.""" + while True: + try: + word, timestamp_seconds = await self._word_boundary_queue.get() + await self.add_word_timestamps([(word, timestamp_seconds)]) + self._word_boundary_queue.task_done() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"{self} error processing word timestamp: {e}") def _handle_synthesizing(self, evt): """Handle audio chunks as they arrive. @@ -411,6 +449,13 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): self._audio_queue.task_done() except asyncio.QueueEmpty: break + # Clear the word boundary queue + while not self._word_boundary_queue.empty(): + try: + self._word_boundary_queue.get_nowait() + self._word_boundary_queue.task_done() + except asyncio.QueueEmpty: + break @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: @@ -443,8 +488,6 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): await self.start_ttfb_metrics() yield TTSStartedFrame() self._started = True - self._first_chunk = True - self._cumulative_audio_offset = 0.0 ssml = self._construct_ssml(text) self._speech_synthesizer.speak_ssml_async(ssml) @@ -457,11 +500,7 @@ class AzureTTSService(WordTTSService, AzureBaseTTSService): break await self.stop_ttfb_metrics() - - # Start word timestamps when first chunk arrives - if self._first_chunk: - await self.start_word_timestamps() - self._first_chunk = False + await self.start_word_timestamps() frame = TTSAudioRawFrame( audio=chunk,