Add word-level timestamp support to Azure TTS with cumulative PTS fix

This commit adds word boundary support to AzureTTSService and fixes
the race condition that causes scrambled TTS output across multiple
sentences.

## Features Added

- Change AzureTTSService to inherit from WordTTSService
- Subscribe to Azure SDK's synthesis_word_boundary event
- Emit word-level text with timing information via _words_queue
- Add synthesis lock for sequential sentence processing

## Race Condition Fix

Previously, each sentence's word boundary timestamps reset to 0,
causing downstream components to interleave words when reordering
frames by PTS. This resulted in scrambled output like:
  'Hello ! I What am questions AI have assistant...'

The fix tracks a cumulative audio offset (the summed audio duration of the
sentences already synthesized) and adds it to each word's sentence-relative
timestamp, so PTS increases monotonically across all sentences:
  Sentence 1: pts = 0.1s, 0.5s, 0.8s (cumulative at end: 0.8s)
  Sentence 2: pts = 0.9s, 1.2s, 1.5s (0.8s + relative offset)

## Key Changes

- _cumulative_audio_offset: tracks total audio duration
- _handle_word_boundary: adds cumulative offset to timestamps
- _handle_completed: accumulates audio duration for next sentence
- flush_audio: resets cumulative offset at end of LLM response
- _handle_interruption: resets state on user interruption
- run_tts: uses synthesis lock for sequential processing

Fixes #2918

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yukiobata1
2025-12-31 18:49:48 +09:00
parent cbdc2b7d2d
commit 32c6dccebe

View File

@@ -13,16 +13,21 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.tts_service import TTSService
from pipecat.services.tts_service import TTSService, WordTTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import seconds_to_nanoseconds
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -233,24 +238,198 @@ class AzureBaseTTSService(TTSService):
return escaped_text
class AzureTTSService(AzureBaseTTSService):
"""Azure Cognitive Services streaming TTS service.
class AzureTTSService(WordTTSService):
"""Azure Cognitive Services streaming TTS service with word timestamps.
Provides real-time text-to-speech synthesis using Azure's WebSocket-based
streaming API. Audio chunks are streamed as they become available for
lower latency playback.
streaming API. Audio chunks and word boundaries are streamed as they become
available for lower latency playback and accurate word-level synchronization.
"""
def __init__(self, **kwargs):
# Define SSML escape mappings based on SSML reserved characters
# See - https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-structure
SSML_ESCAPE_CHARS = {
"&": "&amp;",
"<": "&lt;",
">": "&gt;",
'"': "&quot;",
"'": "&apos;",
}
class InputParams(BaseModel):
    """Input parameters for Azure TTS voice configuration.

    Every field is optional. The SSML builder in this file only emits an
    attribute or element for a field whose value is truthy, and the service
    constructor falls back to "en-US" when ``language`` is None.

    Parameters:
        emphasis: Emphasis level for speech ("strong", "moderate", "reduced").
        language: Language for synthesis. Defaults to English (US).
        pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high").
        rate: Speech rate multiplier. Defaults to "1.05".
        role: Voice role for expression (e.g., "YoungAdultFemale").
        style: Speaking style (e.g., "cheerful", "sad", "excited").
        style_degree: Intensity of the speaking style (0.01 to 2.0), given as a string.
        volume: Volume level (e.g., "+20%", "loud", "x-soft").
    """

    emphasis: Optional[str] = None
    language: Optional[Language] = Language.EN_US
    pitch: Optional[str] = None
    rate: Optional[str] = "1.05"
    role: Optional[str] = None
    style: Optional[str] = None
    style_degree: Optional[str] = None
    volume: Optional[str] = None
def __init__(
    self,
    *,
    api_key: str,
    region: str,
    voice="en-US-SaraNeural",
    sample_rate: Optional[int] = None,
    params: Optional[InputParams] = None,
    **kwargs,
):
    """Initialize the Azure streaming TTS service.

    Args:
        api_key: Azure Cognitive Services subscription key.
        region: Azure region identifier (e.g., "eastus", "westus2").
        voice: Voice name to use for synthesis. Defaults to "en-US-SaraNeural".
        sample_rate: Audio sample rate in Hz. If None, uses service default.
        params: Voice and synthesis parameters configuration.
        **kwargs: Additional arguments passed to parent WordTTSService.
    """
    # Initialise the parent exactly once. Text frames are pushed by this
    # service itself (with word-level timing), so the parent must not push them.
    super().__init__(
        aggregate_sentences=True,
        push_text_frames=False,
        pause_frame_processing=True,
        sample_rate=sample_rate,
        **kwargs,
    )

    params = params or AzureTTSService.InputParams()

    # Voice settings consumed when building SSML; unset values stay None.
    self._settings = {
        "emphasis": params.emphasis,
        "language": self.language_to_service_language(params.language)
        if params.language
        else "en-US",
        "pitch": params.pitch,
        "rate": params.rate,
        "role": params.role,
        "style": params.style,
        "style_degree": params.style_degree,
        "volume": params.volume,
    }

    self._api_key = api_key
    self._region = region
    self._voice_id = voice
    # Speech config and synthesizer are not created here; they start as None.
    self._speech_config = None
    self._speech_synthesizer = None
    # Audio chunks from the SDK callbacks; None marks the end of one synthesis.
    self._audio_queue = asyncio.Queue()
    # Non-None only while a synthesis is in flight; word callbacks check it.
    self._context_id = None
    self._word_timestamps_started = False
    # Serialises run_tts so sentences are synthesised one at a time.
    self._synthesis_lock = asyncio.Lock()
    self._cumulative_audio_offset: float = 0.0  # Cumulative audio duration in seconds
def can_generate_metrics(self) -> bool:
    """Report whether this service produces processing metrics.

    Returns:
        Always True: the Azure TTS service supports metrics generation.
    """
    return True
def language_to_service_language(self, language: Language) -> Optional[str]:
    """Translate a Language enum value into Azure's language code.

    Args:
        language: The language to convert.

    Returns:
        The Azure-specific language code, or None if not supported.
    """
    # Delegates entirely to the shared Azure language mapping helper.
    azure_code = language_to_azure_language(language)
    return azure_code
def _construct_ssml(self, text: str) -> str:
"""Construct SSML from text with current voice settings.
Args:
text: The text to convert to SSML.
Returns:
SSML string for Azure TTS synthesis.
"""
language = self._settings["language"]
# Escape special characters
escaped_text = self._escape_text(text)
ssml = (
f"<speak version='1.0' xml:lang='{language}' "
"xmlns='http://www.w3.org/2001/10/synthesis' "
"xmlns:mstts='http://www.w3.org/2001/mstts'>"
f"<voice name='{self._voice_id}'>"
"<mstts:silence type='Sentenceboundary' value='20ms' />"
)
if self._settings["style"]:
ssml += f"<mstts:express-as style='{self._settings['style']}'"
if self._settings["style_degree"]:
ssml += f" styledegree='{self._settings['style_degree']}'"
if self._settings["role"]:
ssml += f" role='{self._settings['role']}'"
ssml += ">"
prosody_attrs = []
if self._settings["rate"]:
prosody_attrs.append(f"rate='{self._settings['rate']}'")
if self._settings["pitch"]:
prosody_attrs.append(f"pitch='{self._settings['pitch']}'")
if self._settings["volume"]:
prosody_attrs.append(f"volume='{self._settings['volume']}'")
ssml += f"<prosody {' '.join(prosody_attrs)}>"
if self._settings["emphasis"]:
ssml += f"<emphasis level='{self._settings['emphasis']}'>"
ssml += escaped_text
if self._settings["emphasis"]:
ssml += "</emphasis>"
ssml += "</prosody>"
if self._settings["style"]:
ssml += "</mstts:express-as>"
ssml += "</voice></speak>"
return ssml
def _escape_text(self, text: str) -> str:
    """Escape XML/SSML reserved characters according to Microsoft documentation.

    The following characters are replaced by their entity references:

    - & becomes &amp;
    - < becomes &lt;
    - > becomes &gt;
    - " becomes &quot;
    - ' becomes &apos;

    Args:
        text: The text to escape.

    Returns:
        The escaped text.
    """
    # One translation pass, so the "&" in an inserted entity is never
    # escaped a second time.
    escape_table = str.maketrans(AzureTTSService.SSML_ESCAPE_CHARS)
    return text.translate(escape_table)
async def start(self, frame: StartFrame):
"""Start the Azure TTS service and initialize speech synthesizer.
@@ -286,24 +465,99 @@ class AzureTTSService(AzureBaseTTSService):
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
# Add word boundary event handler for word-level timestamps
self._speech_synthesizer.synthesis_word_boundary.connect(self._handle_word_boundary)
def _handle_word_boundary(self, evt):
    """Queue a word and its absolute timestamp for a word boundary event.

    Args:
        evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK
            containing word text and audio offset timing.
    """
    word = evt.text
    # evt.audio_offset is in ticks (100-nanosecond units): 10,000,000 per second.
    # Adding the cumulative offset turns the sentence-relative time into an
    # absolute timestamp across sentences.
    absolute_seconds = self._cumulative_audio_offset + evt.audio_offset / 10_000_000.0

    # Skip events with no active synthesis context or with empty text.
    if not (self._context_id and word):
        return

    logger.debug(f"{self}: Word boundary - '{word}' at {absolute_seconds:.2f}s")
    try:
        # This is a synchronous callback, so enqueue without awaiting.
        # NOTE(review): if the SDK fires this off the event-loop thread,
        # asyncio.Queue.put_nowait is not thread-safe — confirm.
        entry = (word, seconds_to_nanoseconds(absolute_seconds))
        self._words_queue.put_nowait(entry)
    except Exception as e:
        logger.error(f"{self} error adding word timestamp: {e}")
def _handle_synthesizing(self, evt):
"""Handle audio chunks as they arriv."""
"""Handle audio chunks as they arrive.
Args:
evt: Synthesis event containing audio data.
"""
if evt.result and evt.result.audio_data:
self._audio_queue.put_nowait(evt.result.audio_data)
def _handle_completed(self, evt):
"""Handle synthesis completion."""
"""Handle synthesis completion.
Args:
evt: Completion event from Azure Speech SDK.
"""
# Update cumulative audio offset for next sentence
if evt.result and evt.result.audio_duration:
self._cumulative_audio_offset += evt.result.audio_duration.total_seconds()
self._audio_queue.put_nowait(None) # Signal completion
# Add completion markers to word timestamp queue
# Use put_nowait since this is a synchronous callback
if self._context_id:
try:
# Add TTSStoppedFrame marker but NOT Reset - we maintain cumulative PTS
self._words_queue.put_nowait(("TTSStoppedFrame", 0))
except Exception as e:
logger.error(f"{self} error finalizing word timestamps: {e}")
def _handle_canceled(self, evt):
    """Handle synthesis cancellation.

    Logs the cancellation reason and puts the end-of-stream sentinel (None)
    on the audio queue so a consumer waiting for audio stops.

    Args:
        evt: Cancellation event.
    """
    logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}")
    self._audio_queue.put_nowait(None)
async def flush_audio(self):
    """Flush pending audio state at the end of an LLM response.

    Nothing is buffered here; the only effect is to restart word timestamp
    accumulation from zero for the next response.
    """
    logger.trace(f"{self}: flushing audio")
    # Reset cumulative audio offset at end of LLM response
    self._cumulative_audio_offset = 0.0
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
    """React to a user interruption by discarding in-flight synthesis state.

    Args:
        frame: The interruption frame.
        direction: Frame processing direction.
    """
    await super()._handle_interruption(frame, direction)
    await self.stop_all_metrics()

    # Timestamps for whatever is synthesized next start again from zero.
    self._cumulative_audio_offset = 0.0

    # Drain any audio that was queued but not yet consumed.
    while True:
        try:
            self._audio_queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        self._audio_queue.task_done()

    # With no context id, the word-boundary and completion handlers stop
    # queueing word entries.
    self._context_id = None
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -317,50 +571,65 @@ class AzureTTSService(AzureBaseTTSService):
"""
logger.debug(f"{self}: Generating TTS [{text}]")
# Clear the audio queue in case there's still audio in it, causing the next audio response
# to be cut off by the 'None' element returned at the end of the previous audio synthesis.
# Empty the audio queue before processing the new text
while not self._audio_queue.empty():
self._audio_queue.get_nowait()
self._audio_queue.task_done()
try:
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
yield ErrorFrame(error=error_msg)
return
# Ensure sequential sentence processing to prevent word boundary interleaving
async with self._synthesis_lock:
# Clear the audio queue in case there's still audio in it, causing the next audio response
# to be cut off by the 'None' element returned at the end of the previous audio synthesis.
# Empty the audio queue before processing the new text
while not self._audio_queue.empty():
self._audio_queue.get_nowait()
self._audio_queue.task_done()
try:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
logger.error(error_msg)
yield ErrorFrame(error=error_msg)
return
ssml = self._construct_ssml(text)
self._speech_synthesizer.speak_ssml_async(ssml)
await self.start_tts_usage_metrics(text)
try:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
# Stream audio chunks as they arrive
while True:
chunk = await self._audio_queue.get()
if chunk is None: # End of stream
break
# Mark that we're starting a new synthesis
self._context_id = str(id(text))
self._word_timestamps_started = False
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(
audio=chunk,
sample_rate=self.sample_rate,
num_channels=1,
)
ssml = self._construct_ssml(text)
self._speech_synthesizer.speak_ssml_async(ssml)
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()
# Stream audio chunks as they arrive
while True:
chunk = await self._audio_queue.get()
if chunk is None: # End of stream
break
await self.stop_ttfb_metrics()
# Start word timestamps only once when we receive first audio
if not self._word_timestamps_started:
self.start_word_timestamps()
self._word_timestamps_started = True
frame = TTSAudioRawFrame(
audio=chunk,
sample_rate=self.sample_rate,
num_channels=1,
)
yield frame
# Clear context ID when done
self._context_id = None
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} error during synthesis: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
class AzureHttpTTSService(AzureBaseTTSService):