Propagate Azure TTS/STT cancellation errors to the pipeline
Azure TTS _handle_canceled was putting None (the normal completion signal) into the audio queue for all cancellation reasons, so run_tts treated errors identically to success, silently producing no audio. Now error cancellations put an Exception marker in the queue, which run_tts converts to an ErrorFrame. Azure STT had no canceled event handler at all, so auth failures, network errors, and rate-limit cancellations were invisible. This commit adds _on_handle_canceled, which pushes an ErrorFrame upstream via push_error. Fixes pipecat-ai/pipecat#3892.
This commit is contained in:
1
changelog/3893.fixed.md
Normal file
1
changelog/3893.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed Azure TTS and STT services silently swallowing cancellation errors (invalid API key, network failures, rate limiting) instead of propagating them as `ErrorFrame`s to the pipeline.
|
||||
@@ -35,6 +35,7 @@ from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
try:
|
||||
from azure.cognitiveservices.speech import (
|
||||
CancellationReason,
|
||||
ResultReason,
|
||||
SpeechConfig,
|
||||
SpeechRecognizer,
|
||||
@@ -209,6 +210,7 @@ class AzureSTTService(STTService):
|
||||
)
|
||||
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_recognizer.canceled.connect(self._on_handle_canceled)
|
||||
self._speech_recognizer.start_continuous_recognition_async()
|
||||
except Exception as e:
|
||||
await self.push_error(
|
||||
@@ -280,3 +282,13 @@ class AzureSTTService(STTService):
|
||||
result=event,
|
||||
)
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
|
||||
|
||||
def _on_handle_canceled(self, event):
    """Report an Azure STT recognition cancellation that was caused by an error.

    Cancellations whose reason is anything other than
    ``CancellationReason.Error`` are ignored. For an error cancellation, a
    message is built from the cancellation reason and, when present, the
    error details, and ``push_error`` is scheduled on the loop returned by
    ``get_event_loop``.

    Args:
        event: Cancellation event from the recognizer; only
            ``event.result.cancellation_details`` is read.
    """
    details = event.result.cancellation_details
    if details.reason != CancellationReason.Error:
        return

    error_msg = f"Azure STT recognition canceled: {details.reason}"
    # The error details are optional; append them only when non-empty.
    error_msg += f" - {details.error_details}" if details.error_details else ""

    # Scheduled thread-safely instead of awaited: this is a plain (non-async)
    # callback, presumably invoked off the event loop thread -- confirm
    # against the Azure SDK's callback threading model.
    report = self.push_error(error_msg=error_msg)
    asyncio.run_coroutine_threadsafe(report, self.get_event_loop())
|
||||
|
||||
@@ -561,9 +561,13 @@ class AzureTTSService(TTSService, AzureBaseTTSService):
|
||||
# User cancellation (from interruption) is expected, not an error
|
||||
if reason == CancellationReason.CancelledByUser:
|
||||
logger.debug(f"{self}: Speech synthesis canceled by user (interruption)")
|
||||
self._audio_queue.put_nowait(None)
|
||||
else:
|
||||
logger.warning(f"{self}: Speech synthesis canceled: {reason}")
|
||||
self._audio_queue.put_nowait(None)
|
||||
details = evt.result.cancellation_details
|
||||
error_msg = f"Azure TTS synthesis canceled: {reason}"
|
||||
if details.error_details:
|
||||
error_msg += f" - {details.error_details}"
|
||||
self._audio_queue.put_nowait(Exception(error_msg))
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
"""Push a frame and handle state changes.
|
||||
@@ -676,6 +680,9 @@ class AzureTTSService(TTSService, AzureBaseTTSService):
|
||||
chunk = await self._audio_queue.get()
|
||||
if chunk is None: # End of stream
|
||||
break
|
||||
if isinstance(chunk, Exception): # Error from _handle_canceled
|
||||
yield ErrorFrame(error=str(chunk))
|
||||
break
|
||||
|
||||
if self._first_chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
Reference in New Issue
Block a user