diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 9b7f2b915..3cbaea949 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -240,8 +240,7 @@ class AssemblyAISTTService(STTService): await self._websocket.close() except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: self._websocket = None @@ -293,8 +292,7 @@ class AssemblyAISTTService(STTService): elif isinstance(parsed_message, TerminationMessage): await self._handle_termination(parsed_message) except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) async def _handle_termination(self, message: TerminationMessage): """Handle termination message.""" diff --git a/src/pipecat/services/asyncai/tts.py b/src/pipecat/services/asyncai/tts.py index 5507bccfc..b15b6b94b 100644 --- a/src/pipecat/services/asyncai/tts.py +++ b/src/pipecat/services/asyncai/tts.py @@ -240,8 +240,7 @@ class AsyncAITTSService(InterruptibleTTSService): logger.debug("Disconnecting from Async") await self._websocket.close() except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: self._websocket = None self._started = False @@ -476,8 +475,7 @@ class AsyncAIHttpTTSService(TTSService): async with self._session.post(url, json=payload, headers=headers) as response: if response.status != 200: error_text = await response.text() - logger.error(f"Async API error: {error_text}") - await self.push_error(ErrorFrame(error=f"Async API error: {error_text}")) + await self.push_error(error_msg=f"Async API error: {error_text}") raise Exception(f"Async API returned status {response.status}: {error_text}") audio_data = await response.read() diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index 3ca782ed9..81140876d 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -288,8 +288,7 @@ class AWSTranscribeSTTService(STTService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) await self._disconnect() raise @@ -536,6 +535,5 @@ class AWSTranscribeSTTService(STTService): logger.error(f"{self} WebSocket connection closed in receive loop: {e}") break except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) break diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index 7494ea909..7a5a37725 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -284,8 +284,7 @@ class CartesiaSTTService(WebsocketSTTService): logger.debug("Disconnecting from Cartesia STT") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(error_msg=f"{self} error closing websocket: {e}", exception=e) finally: self._websocket = None await self._call_event_handler("on_disconnected") diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index bf3747351..7bd7de3e1 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -409,8 +409,7 @@ class CartesiaTTSService(AudioContextWordTTSService): logger.debug("Disconnecting from Cartesia") await self._websocket.close() except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: self._context_id = None self._websocket = None diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 2154d3ed7..94010362f 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -250,8 +250,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): logger.debug("Connected to Deepgram Flux Websocket") await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index bbe05f9dc..1fd99d575 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -424,8 +424,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): json.dumps({"context_id": self._context_id, "close_context": True}) ) except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._context_id = None self._started = False @@ -553,8 +552,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): await self._websocket.close() logger.debug("Disconnected from ElevenLabs") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: self._started = False self._context_id = None diff --git a/src/pipecat/services/fish/tts.py b/src/pipecat/services/fish/tts.py index da71f3829..950db6464 100644 --- a/src/pipecat/services/fish/tts.py +++ b/src/pipecat/services/fish/tts.py @@ -284,8 +284,7 @@ class FishAudioTTSService(InterruptibleTTSService): continue except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index 624745293..4ba359c06 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -468,8 +468,7 @@ class GladiaSTTService(STTService): break except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._connection_active = False if not self._should_reconnect: @@ -623,8 +622,7 @@ class GladiaSTTService(STTService): # Expected when closing the connection pass except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) async def _maybe_reconnect(self) -> bool: """Handle exponential backoff reconnection logic.""" diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 7e0b0f494..ae34ae517 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1174,7 +1174,7 @@ class GeminiLiveLLMService(LLMService): self._connection_task = self.create_task(self._connection_task_handler(config=config)) except Exception as e: - await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}")) + await self.push_error(exception=e) async def _connection_task_handler(self, config: LiveConnectConfig): async with self._client.aio.live.connect(model=self._model_name, config=config) as session: diff --git a/src/pipecat/services/google/stt.py b/src/pipecat/services/google/stt.py index beb5db740..6d78a1f54 100644 --- a/src/pipecat/services/google/stt.py +++ b/src/pipecat/services/google/stt.py @@ -774,8 +774,7 @@ class GoogleSTTService(STTService): yield cloud_speech.StreamingRecognizeRequest(audio=audio_data) except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) raise async def _stream_audio(self): @@ -813,8 +812,7 @@ class GoogleSTTService(STTService): self._stream_start_time = int(time.time() * 1000) except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: """Process an audio chunk for STT transcription. diff --git a/src/pipecat/services/hume/tts.py b/src/pipecat/services/hume/tts.py index a3a7e9a4c..673b41b7c 100644 --- a/src/pipecat/services/hume/tts.py +++ b/src/pipecat/services/hume/tts.py @@ -216,8 +216,7 @@ class HumeTTSService(TTSService): self._audio_bytes = b"" except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: # Ensure TTFB timer is stopped even on early failures await self.stop_ttfb_metrics() diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index dc2282b91..848e2043a 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -392,8 +392,7 @@ class InworldTTSService(TTSService): # STEP 7: ERROR HANDLING # ================================================================================ # Log any unexpected errors and notify the pipeline - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: # ================================================================================ # STEP 8: CLEANUP AND COMPLETION diff --git a/src/pipecat/services/lmnt/tts.py b/src/pipecat/services/lmnt/tts.py index ebcad0f20..27f215b47 100644 --- a/src/pipecat/services/lmnt/tts.py +++ b/src/pipecat/services/lmnt/tts.py @@ -214,8 +214,7 @@ class LmntTTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -266,10 +265,9 @@ class LmntTTSService(InterruptibleTTSService): try: msg = json.loads(message) if "error" in msg: - logger.error(f"{self} error: {msg['error']}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) + await self.push_error(error_msg=f"{self} error: {msg['error']}") return except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index 60b0ebcb1..3faedee40 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -285,8 +285,7 @@ class NeuphonicTTSService(InterruptibleTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index 8eaa3d6fa..ab5079f68 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -478,7 +478,7 @@ class OpenAIRealtimeLLMService(LLMService): # it is to recover from a send-side error with proper state management, and that exponential # backoff for retries can have cost/stability implications for a service cluster, let's just # treat a send-side error as fatal. - await self.push_error(ErrorFrame(error=f"Error sending client event: {e}")) + await self.push_error(error_msg=f"Error sending client event: {e}", exception=e) async def _update_settings(self): settings = self._session_properties @@ -759,7 +759,7 @@ class OpenAIRealtimeLLMService(LLMService): async def _handle_evt_error(self, evt): # Errors are fatal to this connection. Send an ErrorFrame. - await self.push_error(ErrorFrame(error=f"Error: {evt}")) + await self.push_error(error_msg=f"Error: {evt}") # # state and client events for the current conversation diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index af0600882..f572f15ca 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -454,7 +454,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): # it is to recover from a send-side error with proper state management, and that exponential # backoff for retries can have cost/stability implications for a service cluster, let's just # treat a send-side error as fatal. - await self.push_error(ErrorFrame(error=f"Error sending client event: {e}")) + await self.push_error(error_msg=f"Error sending client event: {e}", exception=e) async def _update_settings(self): settings = self._session_properties @@ -685,7 +685,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): async def _handle_evt_error(self, evt): # Errors are fatal to this connection. Send an ErrorFrame. - await self.push_error(ErrorFrame(error=f"Error: {evt}")) + await self.push_error(error_msg=f"Error: {evt}") async def _handle_assistant_output(self, output): # We haven't seen intermixed audio and function_call items in the same response. But let's diff --git a/src/pipecat/services/playht/tts.py b/src/pipecat/services/playht/tts.py index 2c40065aa..9323755c4 100644 --- a/src/pipecat/services/playht/tts.py +++ b/src/pipecat/services/playht/tts.py @@ -266,8 +266,7 @@ class PlayHTTTSService(InterruptibleTTSService): self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -352,7 +351,7 @@ class PlayHTTTSService(InterruptibleTTSService): self._request_id = None elif "error" in msg: logger.error(f"{self} error: {msg}") - await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) + await self.push_error(error_msg=f"{self} error: {msg['error']}") except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") diff --git a/src/pipecat/services/rime/tts.py b/src/pipecat/services/rime/tts.py index 7b62f20fa..e11160a73 100644 --- a/src/pipecat/services/rime/tts.py +++ b/src/pipecat/services/rime/tts.py @@ -259,8 +259,7 @@ class RimeTTSService(AudioContextWordTTSService): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -366,10 +365,9 @@ class RimeTTSService(AudioContextWordTTSService): logger.debug(f"Updated cumulative time to: {self._cumulative_time}") elif msg["type"] == "error": - logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}")) + await self.push_error(error_msg=f"{self} error: {msg['message']}") self._context_id = None async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index 98e643fde..ec5c6702f 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -275,8 +275,7 @@ class SarvamSTTService(STTService): await self._socket_client.translate(**method_kwargs) except Exception as e: - logger.error(f"Error sending audio to Sarvam: {e}") - await self.push_error(ErrorFrame(f"Failed to send audio: {e}")) + await self.push_error(error_msg=f"Error sending audio to Sarvam: {e}", exception=e) yield None @@ -332,13 +331,11 @@ class SarvamSTTService(STTService): logger.info("Connected to Sarvam successfully") except ApiError as e: - logger.error(f"Sarvam API error: {e}") - await self.push_error(ErrorFrame(f"Sarvam API error: {e}")) + await self.push_error(error_msg=f"Sarvam API error: {e}", exception=e) except Exception as e: - logger.error(f"Failed to connect to Sarvam: {e}") self._socket_client = None self._websocket_context = None - await self.push_error(ErrorFrame(f"Failed to connect to Sarvam: {e}")) + await self.push_error(error_msg=f"Failed to connect to Sarvam: {e}", exception=e) async def _disconnect(self): """Disconnect from Sarvam WebSocket API using SDK.""" @@ -427,8 +424,7 @@ class SarvamSTTService(STTService): await self.stop_processing_metrics() except Exception as e: - logger.error(f"Error handling Sarvam message: {e}") - await self.push_error(ErrorFrame(f"Failed to handle message: {e}")) + await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e) await self.stop_all_metrics() @traced_stt diff --git a/src/pipecat/services/sarvam/tts.py b/src/pipecat/services/sarvam/tts.py index 127a0d589..341e2dbf6 100644 --- a/src/pipecat/services/sarvam/tts.py +++ b/src/pipecat/services/sarvam/tts.py @@ -254,8 +254,7 @@ class SarvamHttpTTSService(TTSService): async with self._session.post(url, json=payload, headers=headers) as response: if response.status != 200: error_text = await response.text() - logger.error(f"Sarvam API error: {error_text}") - await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}")) + await self.push_error(error_msg=f"Sarvam API error: {error_text}") return response_data = await response.json() @@ -264,8 +263,7 @@ class SarvamHttpTTSService(TTSService): # Decode base64 audio data if "audios" not in response_data or not response_data["audios"]: - logger.error("No audio data received from Sarvam API") - await self.push_error(ErrorFrame(error="No audio data received")) + await self.push_error(error_msg="No audio data received") return # Get the first audio (there should be only one for single text input) @@ -560,8 +558,7 @@ class SarvamTTSService(InterruptibleTTSService): await self._disconnect_websocket() except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: # Reset state only after everything is cleaned up self._started = False @@ -602,8 +599,7 @@ class SarvamTTSService(InterruptibleTTSService): await self._websocket.send(json.dumps(config_message)) logger.debug("Configuration sent successfully") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) raise async def _disconnect_websocket(self): diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index b4bcb7ba1..2769800f9 100644 --- a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -327,8 +327,7 @@ class SonioxSTTService(STTService): # Expected when closing the connection logger.debug("WebSocket connection closed, keepalive task stopped.") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) async def _receive_task_handler(self): if not self._websocket: @@ -404,14 +403,7 @@ class SonioxSTTService(STTService): if error_code or error_message: # In case of error, still send the final transcript (if any remaining in the buffer). await send_endpoint_transcript() - logger.error( - f"{self} error: {error_code} (_receive_task_handler) - {error_message}" - ) - await self.push_error( - ErrorFrame( - error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}" - ) - ) + await self.push_error(error_msg=f"{self} error: {error_code} (_receive_task_handler) - {error_message}") finished = content.get("finished") if finished: diff --git a/src/pipecat/services/speechmatics/stt.py b/src/pipecat/services/speechmatics/stt.py index 2d95bd69e..da3c0652a 100644 --- a/src/pipecat/services/speechmatics/stt.py +++ b/src/pipecat/services/speechmatics/stt.py @@ -514,8 +514,7 @@ class SpeechmaticsSTTService(STTService): self._client.send_message(payload), self.get_event_loop() ) except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) raise RuntimeError(f"error sending message to STT: {e}") async def _connect(self) -> None: @@ -596,8 +595,7 @@ class SpeechmaticsSTTService(STTService): except asyncio.TimeoutError: logger.warning(f"{self} Timeout while closing Speechmatics client connection") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) finally: self._client = None await self._call_event_handler("on_disconnected") diff --git a/src/pipecat/services/stt_service.py b/src/pipecat/services/stt_service.py index 6fb96c571..a2e1c4837 100644 --- a/src/pipecat/services/stt_service.py +++ b/src/pipecat/services/stt_service.py @@ -329,4 +329,4 @@ class WebsocketSTTService(STTService, WebsocketService): async def _report_error(self, error: ErrorFrame): await self._call_event_handler("on_connection_error", error.error) - await self.push_error(error) + await self.push_error_frame(error) diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index f0d602a40..62a2e22af 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -671,7 +671,7 @@ class WebsocketTTSService(TTSService, WebsocketService): async def _report_error(self, error: ErrorFrame): await self._call_event_handler("on_connection_error", error.error) - await self.push_error(error) + await self.push_error_frame(error) class InterruptibleTTSService(WebsocketTTSService): @@ -733,7 +733,7 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService): async def _report_error(self, error: ErrorFrame): await self._call_event_handler("on_connection_error", error.error) - await self.push_error(error) + await self.push_error_frame(error) class InterruptibleWordTTSService(WebsocketWordTTSService): diff --git a/src/pipecat/services/ultravox/stt.py b/src/pipecat/services/ultravox/stt.py index 14eaebf6a..49660f988 100644 --- a/src/pipecat/services/ultravox/stt.py +++ b/src/pipecat/services/ultravox/stt.py @@ -246,8 +246,7 @@ class UltravoxSTTService(AIService): logger.info("Model warm-up completed successfully") except Exception as e: - logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error(exception=e) def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0): """Generate silent audio as a numpy array. diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 51ef637b8..83d5ed888 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -2506,13 +2506,10 @@ class DailyTransport(BaseTransport): async def _on_error(self, error): """Handle error events and push error frames.""" await self._call_event_handler("on_error", error) - # Push error frame to notify the pipeline - error_frame = ErrorFrame(error) - if self._input: - await self._input.push_error(error_frame) + await self._input.push_error(error_msg=error) elif self._output: - await self._output.push_error(error_frame) + await self._input.push_error(error_msg=error) else: logger.error("Both input and output are None while trying to push error") raise Exception("No valid input or output channel to push error")