Merge pull request #3028 from sam-s10s/fix/smx-tts-retry

SpeechmaticsTTS - Support for retry when 503 error to TTS API.
This commit is contained in:
Mark Backman
2025-11-12 09:26:07 -05:00
committed by GitHub
2 changed files with 98 additions and 32 deletions

View File

@@ -19,6 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.0.94] - 2025-11-10
### Changed
- Added support for retrying `SpeechmaticsTTSService` when it returns a 503
error. Default values in `InputParams`.
### Deprecated
- The `KrispFilter` is deprecated and will be removed in a future version. Use

View File

@@ -6,6 +6,7 @@
"""Speechmatics TTS service integration."""
import asyncio
from typing import AsyncGenerator, Optional
from urllib.parse import urlencode
@@ -21,6 +22,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.utils.network import exponential_backoff_time
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -43,9 +45,13 @@ class SpeechmaticsTTSService(TTSService):
SPEECHMATICS_SAMPLE_RATE = 16000
class InputParams(BaseModel):
"""Optional input parameters for Speechmatics TTS configuration."""
"""Optional input parameters for Speechmatics TTS configuration.
pass
Parameters:
max_retries: Maximum number of retries for TTS requests. Defaults to 5.
"""
max_retries: int = 5
def __init__(
self,
@@ -109,64 +115,119 @@ class SpeechmaticsTTSService(TTSService):
Yields:
Frame: Audio frames containing the synthesized speech.
"""
# Log the TTS started frame
logger.debug(f"{self}: Generating TTS [{text}]")
# HTTP headers
headers = {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
}
# HTTP payload
payload = {
"text": text,
}
# Complete HTTP URL
url = _get_endpoint_url(self._base_url, self._voice_id, self.sample_rate)
try:
# Start TTS TTFB metrics
await self.start_ttfb_metrics()
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_message = f"Speechmatics TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
# Track attempt
attempt = 0
await self.start_tts_usage_metrics(text)
# Keep retrying until we get a 200 response or timeout
while True:
async with self._session.post(url, json=payload, headers=headers) as response:
"""Evaluate response from TTS service."""
yield TTSStartedFrame()
# 503 : Service unavailable
if response.status == 503:
"""Calculate the backoff time and retry."""
# Process the response in streaming chunks
first_chunk = True
buffer = b""
try:
# Calculate the backoff time
backoff_time = exponential_backoff_time(
attempt=attempt, min_wait=0.25, max_wait=8.0, multiplier=0.5
)
async for chunk in response.content.iter_any():
if not chunk:
continue
if first_chunk:
await self.stop_ttfb_metrics()
first_chunk = False
# Increment attempt
attempt += 1
buffer += chunk
# Check if we've exceeded the maximum number of attempts
if attempt >= self._params.max_retries:
raise ValueError()
# Emit all complete 2-byte int16 samples from buffer
if len(buffer) >= 2:
complete_samples = len(buffer) // 2
complete_bytes = complete_samples * 2
# Report error frame
yield ErrorFrame(
error=f"{self} Service unavailable [503] (attempt {attempt}, retry in {backoff_time:.2f}s)"
)
audio_data = buffer[:complete_bytes]
buffer = buffer[complete_bytes:] # Keep remaining bytes for next iteration
# Wait before retrying
await asyncio.sleep(backoff_time)
yield TTSAudioRawFrame(
audio=audio_data,
sample_rate=self.sample_rate,
num_channels=1,
# Retry
continue
except (ValueError, ArithmeticError):
yield ErrorFrame(
error=f"{self} Service unavailable [503] (attempts {attempt})",
fatal=True,
)
return
# != 200 : Error
if response.status != 200:
yield ErrorFrame(
error=f"{self} Service unavailable [{response.status}]", fatal=True
)
return
# Update Pipecat metrics
await self.start_tts_usage_metrics(text)
# Emit the TTS started frame
yield TTSStartedFrame()
# Process the response in streaming chunks
first_chunk = True
buffer = b""
# Iterate over each audio data chunk from the TTS API
async for chunk in response.content.iter_any():
if not chunk:
continue
if first_chunk:
await self.stop_ttfb_metrics()
first_chunk = False
buffer += chunk
# Emit all complete 2-byte int16 samples from buffer
if len(buffer) >= 2:
complete_samples = len(buffer) // 2
complete_bytes = complete_samples * 2
audio_data = buffer[:complete_bytes]
buffer = buffer[complete_bytes:]
# Emit the audio frame
yield TTSAudioRawFrame(
audio=audio_data,
sample_rate=self.sample_rate,
num_channels=1,
)
# Successfully processed the response, break out of retry loop
break
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Speechmatics TTS error: {str(e)}")
yield ErrorFrame(error=f"{self}: Error generating TTS: {e}", fatal=True)
finally:
# Emit the TTS stopped frame
yield TTSStoppedFrame()