From adf5198423fbb825751ad01178cdbde259fafade Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 13:49:14 +0000 Subject: [PATCH 1/8] Support for retry when 503 error to TTS API. --- CHANGELOG.md | 5 ++ src/pipecat/services/speechmatics/tts.py | 99 +++++++++++++++--------- 2 files changed, 68 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc37064e4..fb7002fd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.0.94] - 2025-11-10 +### Changed + +- Added support for retrying `SpeechmaticsTTSService` when it returns a 503 + error. Default values in `InputParams`. + ### Deprecated - The `KrispFilter` is deprecated and will be removed in a future version. Use diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 23d10c5e1..a99433f73 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -6,6 +6,7 @@ """Speechmatics TTS service integration.""" +import asyncio from typing import AsyncGenerator, Optional from urllib.parse import urlencode @@ -45,7 +46,8 @@ class SpeechmaticsTTSService(TTSService): class InputParams(BaseModel): """Optional input parameters for Speechmatics TTS configuration.""" - pass + retry_interval_s: float = 0.02 + retry_timeout_s: float = 1.0 def __init__( self, @@ -125,46 +127,71 @@ class SpeechmaticsTTSService(TTSService): try: await self.start_ttfb_metrics() - async with self._session.post(url, json=payload, headers=headers) as response: - if response.status != 200: - error_message = f"Speechmatics TTS error: HTTP {response.status}" - logger.error(error_message) - yield ErrorFrame(error=error_message) - return + # Retry loop for 503 responses + start_time = asyncio.get_event_loop().time() - await self.start_tts_usage_metrics(text) + while True: + async with self._session.post(url, json=payload, headers=headers) as response: + if response.status == 503: + elapsed_time = asyncio.get_event_loop().time() - start_time + if elapsed_time >= self._params.retry_timeout_s: + error_message = ( + f"{self} HTTP 503 (timeout after {self._params.retry_timeout_s}s)" + ) + logger.error(error_message) + yield ErrorFrame(error=error_message) + return - yield TTSStartedFrame() - - # Process the response in streaming chunks - first_chunk = True - buffer = b"" - - async for chunk in response.content.iter_any(): - if not chunk: - continue - if first_chunk: - await self.stop_ttfb_metrics() - first_chunk = False - - buffer += chunk - - # Emit all complete 2-byte int16 samples from buffer - if len(buffer) >= 2: - complete_samples = len(buffer) // 2 - complete_bytes = complete_samples * 2 - - audio_data = buffer[:complete_bytes] - buffer = buffer[complete_bytes:] # Keep remaining bytes for next iteration - - yield TTSAudioRawFrame( - audio=audio_data, - sample_rate=self.sample_rate, - num_channels=1, + logger.debug( + f"{self} Received 503, retrying in {self._params.retry_interval_s}s..." ) + await asyncio.sleep(self._params.retry_interval_s) + continue + + if response.status != 200: + error_message = f"{self} HTTP {response.status}" + logger.error(error_message) + yield ErrorFrame(error=error_message) + return + + await self.start_tts_usage_metrics(text) + + yield TTSStartedFrame() + + # Process the response in streaming chunks + first_chunk = True + buffer = b"" + + async for chunk in response.content.iter_any(): + if not chunk: + continue + if first_chunk: + await self.stop_ttfb_metrics() + first_chunk = False + + buffer += chunk + + # Emit all complete 2-byte int16 samples from buffer + if len(buffer) >= 2: + complete_samples = len(buffer) // 2 + complete_bytes = complete_samples * 2 + + audio_data = buffer[:complete_bytes] + buffer = buffer[ + complete_bytes: + ] # Keep remaining bytes for next iteration + + yield TTSAudioRawFrame( + audio=audio_data, + sample_rate=self.sample_rate, + num_channels=1, + ) + + # Successfully processed the response, break out of retry loop + break except Exception as e: - logger.exception(f"Error generating TTS: {e}") + logger.exception(f"{self}: Error generating TTS: {e}") yield ErrorFrame(error=f"Speechmatics TTS error: {str(e)}") finally: yield TTSStoppedFrame() From 41ac43cf71f91b2ca751dcc9a4cef57af8ff0f96 Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 13:56:45 +0000 Subject: [PATCH 2/8] updated docs --- src/pipecat/services/speechmatics/tts.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index a99433f73..3086aaa44 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -44,7 +44,12 @@ class SpeechmaticsTTSService(TTSService): SPEECHMATICS_SAMPLE_RATE = 16000 class InputParams(BaseModel): - """Optional input parameters for Speechmatics TTS configuration.""" + """Optional input parameters for Speechmatics TTS configuration. + + Parameters: + retry_interval_s: Interval between retries in seconds. Defaults to 0.02. + retry_timeout_s: Timeout for retries in seconds. Defaults to 1.0. + """ retry_interval_s: float = 0.02 retry_timeout_s: float = 1.0 From 0febfc62ec1854327c5ccd4f7adee2891513ce66 Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 17:45:22 +0000 Subject: [PATCH 3/8] Updated to use backoff utility function. --- src/pipecat/services/speechmatics/tts.py | 85 +++++++++++++++--------- 1 file changed, 55 insertions(+), 30 deletions(-) diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 3086aaa44..7b40cf4eb 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -16,12 +16,14 @@ from pydantic import BaseModel from pipecat.frames.frames import ( ErrorFrame, + FatalErrorFrame, Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, ) from pipecat.services.tts_service import TTSService +from pipecat.utils.network import exponential_backoff_time from pipecat.utils.tracing.service_decorators import traced_tts try: @@ -44,15 +46,9 @@ class SpeechmaticsTTSService(TTSService): SPEECHMATICS_SAMPLE_RATE = 16000 class InputParams(BaseModel): - """Optional input parameters for Speechmatics TTS configuration. + """Optional input parameters for Speechmatics TTS configuration.""" - Parameters: - retry_interval_s: Interval between retries in seconds. Defaults to 0.02. - retry_timeout_s: Timeout for retries in seconds. Defaults to 1.0. - """ - - retry_interval_s: float = 0.02 - retry_timeout_s: float = 1.0 + pass def __init__( self, @@ -116,57 +112,87 @@ class SpeechmaticsTTSService(TTSService): Yields: Frame: Audio frames containing the synthesized speech. """ + # Log the TTS started frame logger.debug(f"{self}: Generating TTS [{text}]") + # HTTP headers headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json", } + # HTTP payload payload = { "text": text, } + # Complete HTTP URL url = _get_endpoint_url(self._base_url, self._voice_id, self.sample_rate) try: + # Start TTS TTFB metrics await self.start_ttfb_metrics() - # Retry loop for 503 responses - start_time = asyncio.get_event_loop().time() + # Track attempt + attempt = 0 + # Keep retrying until we get a 200 response or timeout while True: async with self._session.post(url, json=payload, headers=headers) as response: + """Evaluate response from TTS service.""" + + # 503 : Service unavailable if response.status == 503: - elapsed_time = asyncio.get_event_loop().time() - start_time - if elapsed_time >= self._params.retry_timeout_s: - error_message = ( - f"{self} HTTP 503 (timeout after {self._params.retry_timeout_s}s)" + """Calculate the backoff time and retry.""" + + try: + # Calculate the backoff time + backoff_time = exponential_backoff_time( + attempt=attempt, min_wait=0.25, max_wait=8.0, multiplier=0.5 + ) + + # Check if we've exceeded the maximum number of attempts + if backoff_time >= 8.0: + raise ValueError() + + # Report error frame + yield ErrorFrame( + error=f"{self} HTTP 503 (attempt {attempt}, retry in {backoff_time:.2f}s)" + ) + + # Wait before retrying + await asyncio.sleep(backoff_time) + + # Increment attempt + attempt += 1 + + # Retry + continue + + except (ValueError, ArithmeticError): + yield FatalErrorFrame( + error=f"{self} Service unavailable (attempts {attempt})" ) - logger.error(error_message) - yield ErrorFrame(error=error_message) return - logger.debug( - f"{self} Received 503, retrying in {self._params.retry_interval_s}s..." - ) - await asyncio.sleep(self._params.retry_interval_s) - continue - + # != 200 : Error if response.status != 200: - error_message = f"{self} HTTP {response.status}" - logger.error(error_message) - yield ErrorFrame(error=error_message) + yield FatalErrorFrame( + error=f"{self} Service unavailable ({response.status})" + ) return + # Update Pipecat metrics await self.start_tts_usage_metrics(text) + # Emit the TTS started frame yield TTSStartedFrame() # Process the response in streaming chunks first_chunk = True buffer = b"" + # Iterate over each audio data chunk from the TTS API async for chunk in response.content.iter_any(): if not chunk: continue @@ -182,10 +208,9 @@ class SpeechmaticsTTSService(TTSService): complete_bytes = complete_samples * 2 audio_data = buffer[:complete_bytes] - buffer = buffer[ - complete_bytes: - ] # Keep remaining bytes for next iteration + buffer = buffer[complete_bytes:] + # Emit the audio frame yield TTSAudioRawFrame( audio=audio_data, sample_rate=self.sample_rate, @@ -196,9 +221,9 @@ class SpeechmaticsTTSService(TTSService): break except Exception as e: - logger.exception(f"{self}: Error generating TTS: {e}") - yield ErrorFrame(error=f"Speechmatics TTS error: {str(e)}") + yield ErrorFrame(error=f"{self}: Error generating TTS: {e}") finally: + # Emit the TTS stopped frame yield TTSStoppedFrame() From 60bc77c7959031c865db80dbab65686706f0441b Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 17:50:06 +0000 Subject: [PATCH 4/8] Update debugging messages. --- src/pipecat/services/speechmatics/tts.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 7b40cf4eb..455eb1ce0 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -157,7 +157,7 @@ class SpeechmaticsTTSService(TTSService): # Report error frame yield ErrorFrame( - error=f"{self} HTTP 503 (attempt {attempt}, retry in {backoff_time:.2f}s)" + error=f"{self} Service unavailable [503] (attempt {attempt}, retry in {backoff_time:.2f}s)" ) # Wait before retrying @@ -171,14 +171,14 @@ class SpeechmaticsTTSService(TTSService): except (ValueError, ArithmeticError): yield FatalErrorFrame( - error=f"{self} Service unavailable (attempts {attempt})" + error=f"{self} Service unavailable [503] (attempts {attempt})" ) return # != 200 : Error if response.status != 200: yield FatalErrorFrame( - error=f"{self} Service unavailable ({response.status})" + error=f"{self} Service unavailable [{response.status}]" ) return From 501744d7da8b66d06ac0e1a77612a1a06e3fd106 Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 17:53:31 +0000 Subject: [PATCH 5/8] Update CHANGELOG. --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fb7002fd6..df2c278de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Added support for retrying `SpeechmaticsTTSService` when it returns a 503 - error. Default values in `InputParams`. + error. ### Deprecated From 41cf9adef45fe4bf63d7c1be46bb333921bde0a4 Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 18:00:27 +0000 Subject: [PATCH 6/8] Updated for max retries. --- CHANGELOG.md | 2 +- src/pipecat/services/speechmatics/tts.py | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df2c278de..fb7002fd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Added support for retrying `SpeechmaticsTTSService` when it returns a 503 - error. + error. Default values in `InputParams`. ### Deprecated diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 455eb1ce0..b45c63ef7 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -46,9 +46,13 @@ class SpeechmaticsTTSService(TTSService): SPEECHMATICS_SAMPLE_RATE = 16000 class InputParams(BaseModel): - """Optional input parameters for Speechmatics TTS configuration.""" + """Optional input parameters for Speechmatics TTS configuration. - pass + Parameters: + max_retries: Maximum number of retries for TTS requests. Defaults to 5. + """ + + max_retries: int = 5 def __init__( self, @@ -151,8 +155,11 @@ class SpeechmaticsTTSService(TTSService): attempt=attempt, min_wait=0.25, max_wait=8.0, multiplier=0.5 ) + # Increment attempt + attempt += 1 + # Check if we've exceeded the maximum number of attempts - if backoff_time >= 8.0: + if attempt > self._params.max_retries: raise ValueError() # Report error frame @@ -163,9 +170,6 @@ class SpeechmaticsTTSService(TTSService): # Wait before retrying await asyncio.sleep(backoff_time) - # Increment attempt - attempt += 1 - # Retry continue From 217d7e9953de9af2cb2fe692a3522fbfc21f01a5 Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 18:05:06 +0000 Subject: [PATCH 7/8] Fix for max attempts. --- src/pipecat/services/speechmatics/tts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index b45c63ef7..76c7a1a8a 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -159,7 +159,7 @@ class SpeechmaticsTTSService(TTSService): attempt += 1 # Check if we've exceeded the maximum number of attempts - if attempt > self._params.max_retries: + if attempt >= self._params.max_retries: raise ValueError() # Report error frame From 8d21b54ef3e30cfdbc8fc15303740c04eb5dfbae Mon Sep 17 00:00:00 2001 From: Sam Sykes Date: Tue, 11 Nov 2025 18:24:08 +0000 Subject: [PATCH 8/8] Revert to `ErrorFrame`. --- src/pipecat/services/speechmatics/tts.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/pipecat/services/speechmatics/tts.py b/src/pipecat/services/speechmatics/tts.py index 76c7a1a8a..b8fe172e7 100644 --- a/src/pipecat/services/speechmatics/tts.py +++ b/src/pipecat/services/speechmatics/tts.py @@ -16,7 +16,6 @@ from pydantic import BaseModel from pipecat.frames.frames import ( ErrorFrame, - FatalErrorFrame, Frame, TTSAudioRawFrame, TTSStartedFrame, @@ -174,15 +173,16 @@ class SpeechmaticsTTSService(TTSService): continue except (ValueError, ArithmeticError): - yield FatalErrorFrame( - error=f"{self} Service unavailable [503] (attempts {attempt})" + yield ErrorFrame( + error=f"{self} Service unavailable [503] (attempts {attempt})", + fatal=True, ) return # != 200 : Error if response.status != 200: - yield FatalErrorFrame( - error=f"{self} Service unavailable [{response.status}]" + yield ErrorFrame( + error=f"{self} Service unavailable [{response.status}]", fatal=True ) return @@ -225,7 +225,7 @@ class SpeechmaticsTTSService(TTSService): break except Exception as e: - yield ErrorFrame(error=f"{self}: Error generating TTS: {e}") + yield ErrorFrame(error=f"{self}: Error generating TTS: {e}", fatal=True) finally: # Emit the TTS stopped frame yield TTSStoppedFrame()