diff --git a/changelog/3893.fixed.md b/changelog/3893.fixed.md new file mode 100644 index 000000000..0209571e3 --- /dev/null +++ b/changelog/3893.fixed.md @@ -0,0 +1 @@ +- Fixed Azure TTS and STT services silently swallowing cancellation errors (invalid API key, network failures, rate limiting) instead of propagating them as `ErrorFrame`s to the pipeline. diff --git a/src/pipecat/services/azure/stt.py b/src/pipecat/services/azure/stt.py index c6cb96d2e..5533e350e 100644 --- a/src/pipecat/services/azure/stt.py +++ b/src/pipecat/services/azure/stt.py @@ -35,6 +35,7 @@ from pipecat.utils.tracing.service_decorators import traced_stt try: from azure.cognitiveservices.speech import ( + CancellationReason, ResultReason, SpeechConfig, SpeechRecognizer, @@ -209,6 +210,7 @@ class AzureSTTService(STTService): ) self._speech_recognizer.recognizing.connect(self._on_handle_recognizing) self._speech_recognizer.recognized.connect(self._on_handle_recognized) + self._speech_recognizer.canceled.connect(self._on_handle_canceled) self._speech_recognizer.start_continuous_recognition_async() except Exception as e: await self.push_error( @@ -280,3 +282,13 @@ class AzureSTTService(STTService): result=event, ) asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) + + def _on_handle_canceled(self, event): + details = event.result.cancellation_details + if details.reason == CancellationReason.Error: + error_msg = f"Azure STT recognition canceled: {details.reason}" + if details.error_details: + error_msg += f" - {details.error_details}" + asyncio.run_coroutine_threadsafe( + self.push_error(error_msg=error_msg), self.get_event_loop() + ) diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index f68694eb5..6e62c73bf 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -561,9 +561,13 @@ class AzureTTSService(TTSService, AzureBaseTTSService): # User cancellation (from interruption) is expected, not an error if reason == CancellationReason.CancelledByUser: logger.debug(f"{self}: Speech synthesis canceled by user (interruption)") + self._audio_queue.put_nowait(None) else: - logger.warning(f"{self}: Speech synthesis canceled: {reason}") - self._audio_queue.put_nowait(None) + details = evt.result.cancellation_details + error_msg = f"Azure TTS synthesis canceled: {reason}" + if details.error_details: + error_msg += f" - {details.error_details}" + self._audio_queue.put_nowait(Exception(error_msg)) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): """Push a frame and handle state changes. @@ -676,6 +680,9 @@ class AzureTTSService(TTSService, AzureBaseTTSService): chunk = await self._audio_queue.get() if chunk is None: # End of stream break + if isinstance(chunk, Exception): # Error from _handle_canceled + yield ErrorFrame(error=str(chunk)) + break if self._first_chunk: await self.stop_ttfb_metrics()