diff --git a/CHANGELOG.md b/CHANGELOG.md index 60bfa354b..74506687d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.0.94] - 2025-11-10 +### Changed + +- Added support for retrying `SpeechmaticsTTSService` requests when the API returns a 503 + error. The limit is configurable via `InputParams.max_retries` (default: 5). + ### Deprecated - The `KrispFilter` is deprecated and will be removed in a future version. Use diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 23d10c5e1..b8fe172e7 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -6,6 +6,7 @@ """Speechmatics TTS service integration.""" +import asyncio from typing import AsyncGenerator, Optional from urllib.parse import urlencode @@ -21,6 +22,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.services.tts_service import TTSService +from pipecat.utils.network import exponential_backoff_time from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -43,9 +45,13 @@ class SpeechmaticsTTSService(TTSService): SPEECHMATICS_SAMPLE_RATE = 16000 class InputParams(BaseModel): - """Optional input parameters for Speechmatics TTS configuration.""" + """Optional input parameters for Speechmatics TTS configuration. - pass + Parameters: + max_retries: Maximum number of retries for TTS requests. Defaults to 5. + """ + + max_retries: int = 5 def __init__( self, @@ -109,64 +115,119 @@ class SpeechmaticsTTSService(TTSService): Yields: Frame: Audio frames containing the synthesized speech. 
""" + # Log the TTS started frame logger.debug(f"{self}: Generating TTS [{text}]") + # HTTP headers headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json", } + # HTTP payload payload = { "text": text, } + # Complete HTTP URL url = _get_endpoint_url(self._base_url, self._voice_id, self.sample_rate) try: + # Start TTS TTFB metrics await self.start_ttfb_metrics() - async with self._session.post(url, json=payload, headers=headers) as response: - if response.status != 200: - error_message = f"Speechmatics TTS error: HTTP {response.status}" - logger.error(error_message) - yield ErrorFrame(error=error_message) - return + # Track attempt + attempt = 0 - await self.start_tts_usage_metrics(text) + # Keep retrying until we get a 200 response or timeout + while True: + async with self._session.post(url, json=payload, headers=headers) as response: + """Evaluate response from TTS service.""" - yield TTSStartedFrame() + # 503 : Service unavailable + if response.status == 503: + """Calculate the backoff time and retry.""" - # Process the response in streaming chunks - first_chunk = True - buffer = b"" + try: + # Calculate the backoff time + backoff_time = exponential_backoff_time( + attempt=attempt, min_wait=0.25, max_wait=8.0, multiplier=0.5 + ) - async for chunk in response.content.iter_any(): - if not chunk: - continue - if first_chunk: - await self.stop_ttfb_metrics() - first_chunk = False + # Increment attempt + attempt += 1 - buffer += chunk + # Check if we've exceeded the maximum number of attempts + if attempt >= self._params.max_retries: + raise ValueError() - # Emit all complete 2-byte int16 samples from buffer - if len(buffer) >= 2: - complete_samples = len(buffer) // 2 - complete_bytes = complete_samples * 2 + # Report error frame + yield ErrorFrame( + error=f"{self} Service unavailable [503] (attempt {attempt}, retry in {backoff_time:.2f}s)" + ) - audio_data = buffer[:complete_bytes] - buffer = buffer[complete_bytes:] # Keep 
remaining bytes for next iteration + # Wait before retrying + await asyncio.sleep(backoff_time) - yield TTSAudioRawFrame( - audio=audio_data, - sample_rate=self.sample_rate, - num_channels=1, + # Retry + continue + + except (ValueError, ArithmeticError): + yield ErrorFrame( + error=f"{self} Service unavailable [503] (attempts {attempt})", + fatal=True, + ) + return + + # != 200 : Error + if response.status != 200: + yield ErrorFrame( + error=f"{self} Service unavailable [{response.status}]", fatal=True ) + return + + # Update Pipecat metrics + await self.start_tts_usage_metrics(text) + + # Emit the TTS started frame + yield TTSStartedFrame() + + # Process the response in streaming chunks + first_chunk = True + buffer = b"" + + # Iterate over each audio data chunk from the TTS API + async for chunk in response.content.iter_any(): + if not chunk: + continue + if first_chunk: + await self.stop_ttfb_metrics() + first_chunk = False + + buffer += chunk + + # Emit all complete 2-byte int16 samples from buffer + if len(buffer) >= 2: + complete_samples = len(buffer) // 2 + complete_bytes = complete_samples * 2 + + audio_data = buffer[:complete_bytes] + buffer = buffer[complete_bytes:] + + # Emit the audio frame + yield TTSAudioRawFrame( + audio=audio_data, + sample_rate=self.sample_rate, + num_channels=1, + ) + + # Successfully processed the response, break out of retry loop + break except Exception as e: - logger.exception(f"Error generating TTS: {e}") - yield ErrorFrame(error=f"Speechmatics TTS error: {str(e)}") + yield ErrorFrame(error=f"{self}: Error generating TTS: {e}", fatal=True) finally: + # Emit the TTS stopped frame yield TTSStoppedFrame()