Compare commits

...

14 Commits

Author SHA1 Message Date
Mark Backman
b57a9d2546 Review of services (AsyncGenerator, Fatal=True) 2025-11-26 12:15:23 -05:00
Mark Backman
54ff49c4fc Add exception docstring 2025-11-26 12:01:20 -05:00
Filipi da Silva Fuchter
8549331b32 Merge pull request #3123 from pipecat-ai/mb/improve-error-handler-review
Use push_error in services
2025-11-26 12:04:27 -03:00
Filipi Fuchter
6b38510ace Restoring to use yield ErrorFrame 2025-11-26 12:03:52 -03:00
Mark Backman
bbd5c3b487 Update push_error_frame to log error 2025-11-26 08:46:03 -05:00
Mark Backman
c6fcf58be4 Use push_error in services 2025-11-24 18:10:08 -05:00
Filipi Fuchter
a475d09b90 Replace all logger.exception to logger.error. 2025-11-19 16:21:57 -03:00
Filipi Fuchter
4b11bcb4a4 Fixing ruff format. 2025-11-19 15:56:39 -03:00
Filipi Fuchter
c1a1d40f9a Refactoring services to use push_error. 2025-11-19 15:54:43 -03:00
Filipi Fuchter
ede995c563 Merge branch 'main' into filipi/improve_error_handler 2025-11-19 15:42:35 -03:00
Filipi Fuchter
fdf3c8b4cf Refactoring the services to use push_error and push_error_frame 2025-11-18 18:43:30 -03:00
Filipi Fuchter
50bef86d33 Refactoring the services to use push_error and push_error_frame 2025-11-18 18:22:45 -03:00
Filipi Fuchter
79f43ece74 Creating push_error_frame 2025-11-18 18:20:52 -03:00
Filipi Fuchter
08b2365244 Starting to refactor how we are handling the errors. 2025-11-18 17:50:47 -03:00
73 changed files with 205 additions and 325 deletions

View File

@@ -807,11 +807,13 @@ class ErrorFrame(SystemFrame):
error: Description of the error that occurred.
fatal: Whether the error is fatal and requires bot shutdown.
processor: The frame processor that generated the error.
exception: The exception that occurred.
"""
error: str
fatal: bool = False
processor: Optional["FrameProcessor"] = None
exception: Optional[Exception] = None
def __str__(self):
return f"{self.name}(error: {self.error}, fatal: {self.fatal})"

View File

@@ -126,6 +126,4 @@ class WakeCheckFilter(FrameProcessor):
else:
await self.push_frame(frame, direction)
except Exception as e:
error_msg = f"Error in wake word filter: {e}"
logger.exception(error_msg)
await self.push_error(ErrorFrame(error_msg))
await self.push_error(error_msg=f"Error in wake word filter: {e}", exception=e)

View File

@@ -142,6 +142,7 @@ class FrameProcessor(BaseObject):
- on_after_process_frame: Called after a frame is processed
- on_before_push_frame: Called before a frame is pushed
- on_after_push_frame: Called after a frame is pushed
- on_error: Called when an error is raised in the frame processing.
"""
def __init__(
@@ -234,6 +235,7 @@ class FrameProcessor(BaseObject):
self._register_event_handler("on_after_process_frame", sync=True)
self._register_event_handler("on_before_push_frame", sync=True)
self._register_event_handler("on_after_push_frame", sync=True)
self._register_event_handler("on_error", sync=True)
@property
def id(self) -> int:
@@ -630,7 +632,45 @@ class FrameProcessor(BaseObject):
elif isinstance(frame, (FrameProcessorResumeFrame, FrameProcessorResumeUrgentFrame)):
await self.__resume(frame)
async def push_error(self, error: ErrorFrame):
async def push_error(
self,
error_msg: Optional[str] = None,
exception: Optional[Exception] = None,
fatal: bool = False,
):
"""Creates and pushes an ErrorFrame upstream.
Creates and pushes an ErrorFrame upstream to notify other processors in the
pipeline about an error condition. The error frame will include context about
which processor generated the error.
Args:
error_msg: Optional descriptive message explaining the error condition.
exception: Optional exception object that caused the error, if available.
This provides additional context for debugging and error handling.
fatal: Whether this error should be considered fatal to the pipeline.
Fatal errors typically cause the entire pipeline to stop processing.
Defaults to False for non-fatal errors.
Example:
```python
# Non-fatal error
await self.push_error("Failed to process audio chunk, skipping")
# Fatal error with exception context
try:
result = some_critical_operation()
except Exception as e:
await self.push_error("Critical operation failed", exception=e, fatal=True)
```
"""
error_message = error_msg or f"{self} exception: {exception}"
error_frame = ErrorFrame(
error=error_message, fatal=fatal, exception=exception, processor=self
)
await self.push_error_frame(error=error_frame)
async def push_error_frame(self, error: ErrorFrame):
"""Push an error frame upstream.
Args:
@@ -638,6 +678,8 @@ class FrameProcessor(BaseObject):
"""
if not error.processor:
error.processor = self
await self._call_event_handler("on_error", error)
logger.error(error.error)
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -759,8 +801,10 @@ class FrameProcessor(BaseObject):
await self.__cancel_process_task()
self.__create_process_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.push_error(
error_msg=f"Uncaught exception in {self} when handling _start_interruption: {e}",
exception=e,
)
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
"""Internal method to push frames to adjacent processors.
@@ -797,8 +841,7 @@ class FrameProcessor(BaseObject):
await self._observer.on_push_frame(data)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.push_error(exception=e)
def _check_started(self, frame: Frame):
"""Check if the processor has been started.
@@ -874,8 +917,7 @@ class FrameProcessor(BaseObject):
await self._call_event_handler("on_after_process_frame", frame)
except Exception as e:
logger.exception(f"{self}: error processing frame: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.push_error(error_msg=f"{self}: error processing frame: {e}", exception=e)
async def __input_frame_task_handler(self):
"""Handle frames from the input queue.

View File

@@ -24,7 +24,7 @@ try:
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import Runnable
except ModuleNotFoundError as e:
logger.exception("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
logger.error("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
raise Exception(f"Missing module: {e}")
@@ -113,6 +113,6 @@ class LangchainProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
logger.exception(f"{self} an unknown error occurred: {e}")
await self.push_error(exception=e)
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -23,7 +23,7 @@ try:
from strands import Agent
from strands.multiagent.graph import Graph
except ModuleNotFoundError as e:
logger.exception("In order to use Strands Agents, you need to `pip install strands-agents`.")
logger.error("In order to use Strands Agents, you need to `pip install strands-agents`.")
raise Exception(f"Missing module: {e}")
@@ -143,7 +143,7 @@ class StrandsAgentsProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
logger.exception(f"{self} an unknown error occurred: {e}")
await self.push_error(exception=e)
finally:
if ttfb_tracking:
await self.stop_ttfb_metrics()

View File

@@ -199,7 +199,7 @@ class PlivoFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.exception(f"Failed to hang up Plivo call: {e}")
logger.error(f"Failed to hang up Plivo call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Plivo WebSocket data to Pipecat frames.

View File

@@ -225,7 +225,7 @@ class TelnyxFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.exception(f"Failed to hang up Telnyx call: {e}")
logger.error(f"Failed to hang up Telnyx call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Telnyx WebSocket data to Pipecat frames.

View File

@@ -236,7 +236,7 @@ class TwilioFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.exception(f"Failed to hang up Twilio call: {e}")
logger.error(f"Failed to hang up Twilio call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Twilio WebSocket data to Pipecat frames.

View File

@@ -166,6 +166,6 @@ class AIService(FrameProcessor):
async for f in generator:
if f:
if isinstance(f, ErrorFrame):
await self.push_error(f)
await self.push_error_frame(f)
else:
await self.push_frame(f)

View File

@@ -458,8 +458,7 @@ class AnthropicLLMService(LLMService):
except httpx.TimeoutException:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"{e}"))
await self.push_error(exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -206,9 +206,8 @@ class AssemblyAISTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
self._connected = False
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
raise
async def _disconnect(self):
@@ -233,8 +232,7 @@ class AssemblyAISTTService(STTService):
logger.warning("Timed out waiting for termination message from server")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
if self._receive_task:
await self.cancel_task(self._receive_task)
@@ -242,8 +240,7 @@ class AssemblyAISTTService(STTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._websocket = None
@@ -262,13 +259,11 @@ class AssemblyAISTTService(STTService):
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
def _parse_message(self, message: Dict[str, Any]) -> BaseMessage:
"""Parse a raw message into the appropriate message type."""
@@ -297,8 +292,7 @@ class AssemblyAISTTService(STTService):
elif isinstance(parsed_message, TerminationMessage):
await self._handle_termination(parsed_message)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
async def _handle_termination(self, message: TerminationMessage):
"""Handle termination message."""

View File

@@ -228,8 +228,7 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -241,8 +240,7 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Async")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._websocket = None
self._started = False
@@ -287,12 +285,11 @@ class AsyncAITTSService(InterruptibleTTSService):
)
await self.push_frame(frame)
elif msg.get("error_code"):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
await self.push_error(error_msg=f"{self} error: {msg['message']}")
else:
logger.error(f"{self} error, unknown message type: {msg}")
await self.push_error(error_msg=f"{self} error, unknown message type: {msg}")
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
@@ -335,7 +332,6 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -343,7 +339,6 @@ class AsyncAITTSService(InterruptibleTTSService):
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -477,8 +472,7 @@ class AsyncAIHttpTTSService(TTSService):
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Async API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Async API error: {error_text}"))
yield ErrorFrame(error=f"Async API error: {error_text}")
raise Exception(f"Async API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -494,8 +488,7 @@ class AsyncAIHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -1136,7 +1136,7 @@ class AWSBedrockLLMService(LLMService):
except (ReadTimeoutError, asyncio.TimeoutError):
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -452,7 +452,7 @@ class AWSNovaSonicLLMService(LLMService):
self._ready_to_send_context = True
await self._finish_connecting_if_context_available()
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(exception=e)
await self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
@@ -576,7 +576,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.info("Finished disconnecting")
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
def _create_client(self) -> BedrockRuntimeClient:
config = Config(
@@ -884,7 +884,7 @@ class AWSNovaSonicLLMService(LLMService):
# Errors are kind of expected while disconnecting, so just
# ignore them and do nothing
return
logger.error(f"{self} error processing responses: {e}")
await self.push_error(exception=e)
if self._wants_connection:
await self.reset_conversation()

View File

@@ -140,8 +140,7 @@ class AWSTranscribeSTTService(STTService):
return
logger.warning("WebSocket connection not established after connect")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(1) # Wait before retrying
@@ -182,7 +181,6 @@ class AWSTranscribeSTTService(STTService):
try:
await self._connect()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
@@ -200,12 +198,10 @@ class AWSTranscribeSTTService(STTService):
await self._disconnect()
# Don't yield error here - we'll retry on next frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
@@ -289,8 +285,7 @@ class AWSTranscribeSTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
await self._disconnect()
raise
@@ -310,8 +305,7 @@ class AWSTranscribeSTTService(STTService):
await self._ws_client.send(json.dumps(end_stream))
await self._ws_client.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
@@ -529,15 +523,15 @@ class AWSTranscribeSTTService(STTService):
)
elif headers.get(":message-type") == "exception":
error_msg = payload.get("Message", "Unknown error")
logger.error(f"{self} Exception from AWS: {error_msg}")
await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}"))
await self.push_error(error_msg=f"AWS Transcribe error: {error_msg}")
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
await self.push_error(
error_msg=f"WebSocket connection closed in receive loop", exception=e
)
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
break

View File

@@ -312,7 +312,6 @@ class AWSPollyTTSService(TTSService):
yield TTSStoppedFrame()
except (BotoCoreError, ClientError) as error:
logger.exception(f"{self} error generating TTS: {error}")
error_message = f"AWS Polly TTS error: {str(error)}"
yield ErrorFrame(error=error_message)

View File

@@ -91,7 +91,6 @@ class AzureImageGenServiceREST(ImageGenService):
while status != "succeeded":
attempts_left -= 1
if attempts_left == 0:
logger.error(f"{self} error: image generation timed out")
yield ErrorFrame("Image generation timed out")
return
@@ -104,7 +103,6 @@ class AzureImageGenServiceREST(ImageGenService):
image_url = json_response["result"]["data"][0]["url"] if json_response else None
if not image_url:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return

View File

@@ -61,5 +61,5 @@ class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(exception=e)
self._websocket = None

View File

@@ -121,7 +121,6 @@ class AzureSTTService(STTService):
self._audio_stream.write(audio)
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def start(self, frame: StartFrame):
@@ -151,8 +150,9 @@ class AzureSTTService(STTService):
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
except Exception as e:
logger.error(f"{self} exception during initialization: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(
error_msg=f"{self} exception during initialization: {e}", exception=e
)
async def stop(self, frame: EndFrame):
"""Stop the speech recognition service.

View File

@@ -327,7 +327,6 @@ class AzureTTSService(AzureBaseTTSService):
try:
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
logger.error(error_msg)
yield ErrorFrame(error=error_msg)
return
@@ -355,14 +354,12 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -440,5 +437,4 @@ class AzureHttpTTSService(AzureBaseTTSService):
cancellation_details = result.cancellation_details
logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
if cancellation_details.reason == CancellationReason.Error:
logger.error(f"{self} error: {cancellation_details.error_details}")
yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}")

View File

@@ -276,8 +276,7 @@ class CartesiaSTTService(WebsocketSTTService):
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
async def _disconnect_websocket(self):
try:
@@ -285,8 +284,7 @@ class CartesiaSTTService(WebsocketSTTService):
logger.debug("Disconnecting from Cartesia STT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"{self} error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -319,8 +317,7 @@ class CartesiaSTTService(WebsocketSTTService):
elif data["type"] == "error":
error_msg = data.get("message", "Unknown error")
logger.error(f"Cartesia error: {error_msg}")
await self.push_error(ErrorFrame(error=error_msg))
await self.push_error(error_msg=error_msg)
@traced_stt
async def _handle_transcription(

View File

@@ -397,8 +397,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -410,8 +409,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._context_id = None
self._websocket = None
@@ -464,13 +462,12 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self.append_to_audio_context(msg["context_id"], frame)
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
await self.push_error(error_msg=f"{self} error: {msg}")
self._context_id = None
else:
logger.error(f"{self} error, unknown message type: {msg}")
await self.push_error(error_msg=f"{self} error, unknown message type: {msg}")
async def _receive_messages(self):
while True:
@@ -508,7 +505,6 @@ class CartesiaTTSService(AudioContextWordTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -516,7 +512,6 @@ class CartesiaTTSService(AudioContextWordTTSService):
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -708,8 +703,7 @@ class CartesiaHttpTTSService(TTSService):
async with session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Cartesia API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}"))
yield ErrorFrame(error=f"Cartesia API error: {error_text}")
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -725,8 +719,7 @@ class CartesiaHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -192,8 +192,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
try:
await self._disconnect_websocket()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
# Reset state only after everything is cleaned up
self._websocket = None
@@ -251,8 +250,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -280,8 +278,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.debug("Disconnecting from Deepgram Flux Websocket")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"{self} error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -381,7 +378,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
are issues sending the audio data.
"""
if not self._websocket:
logger.error("Not connected to Deepgram Flux.")
yield ErrorFrame("Not connected to Deepgram Flux.")
return
@@ -389,7 +385,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
self._last_stt_time = time.monotonic()
await self.send_with_retry(audio, self._report_error)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
@@ -467,8 +462,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Skip malformed messages
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
# Error will be handled inside WebsocketService->_receive_task_handler
raise
else:

View File

@@ -233,7 +233,7 @@ class DeepgramSTTService(STTService):
)
if not await self._connection.start(options=self._settings, addons=self._addons):
logger.error(f"{self}: unable to connect to Deepgram")
await self.push_error(error_msg=f"Unable to connect to Deepgram")
async def _disconnect(self):
if await self._connection.is_connected():
@@ -256,7 +256,7 @@ class DeepgramSTTService(STTService):
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.push_error(ErrorFrame(error=f"{error}"))
await self.push_error(error_msg=f"{error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,

View File

@@ -116,7 +116,6 @@ class DeepgramTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -227,5 +226,4 @@ class DeepgramHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} exception: {e}")
yield ErrorFrame(f"Error getting audio: {str(e)}")

View File

@@ -351,7 +351,6 @@ class ElevenLabsSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -586,7 +585,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
}
await self._websocket.send(json.dumps(message))
except Exception as e:
logger.error(f"Error sending audio: {e}")
yield ErrorFrame(f"ElevenLabs Realtime STT error: {str(e)}")
yield None
@@ -645,8 +643,9 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug("Connected to ElevenLabs Realtime STT")
except Exception as e:
logger.error(f"{self}: unable to connect to ElevenLabs Realtime STT: {e}")
await self.push_error(ErrorFrame(f"Connection error: {str(e)}"))
await self.push_error(
error_msg=f"{self}: unable to connect to ElevenLabs Realtime STT: {e}", exception=e
)
async def _disconnect_websocket(self):
"""Disconnect from ElevenLabs Realtime STT WebSocket."""
@@ -655,7 +654,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
logger.debug("Disconnecting from ElevenLabs Realtime STT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -714,13 +713,11 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
elif message_type == "input_error":
error_msg = data.get("error", "Unknown input error")
logger.error(f"ElevenLabs input error: {error_msg}")
await self.push_error(ErrorFrame(f"Input error: {error_msg}"))
await self.push_error(error_msg=f"ElevenLabs input error: {error_msg}")
elif message_type in ["auth_error", "quota_exceeded", "transcriber_error", "error"]:
error_msg = data.get("error", data.get("message", "Unknown error"))
logger.error(f"ElevenLabs error ({message_type}): {error_msg}")
await self.push_error(ErrorFrame(f"{message_type}: {error_msg}"))
await self.push_error(error_msg=f"ElevenLabs error ({message_type}): {error_msg}")
else:
logger.debug(f"Unknown message type: {message_type}")

View File

@@ -424,8 +424,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._context_id = None
self._started = False
@@ -536,9 +535,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
self._websocket = None
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
@@ -553,8 +551,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._started = False
self._context_id = None
@@ -584,8 +581,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._context_id = None
self._started = False
self._partial_word = ""
@@ -740,14 +736,12 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
else:
await self._send_text(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield TTSStoppedFrame()
yield ErrorFrame(error=f"{self} error: {e}")
self._started = False
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -1043,7 +1037,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"{self} error: {error_text}")
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
return
@@ -1091,7 +1084,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
continue
@@ -1116,7 +1108,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._previous_text = text
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()

View File

@@ -110,7 +110,6 @@ class FalImageGenService(ImageGenService):
image_url = response["images"][0]["url"] if response else None
if not image_url:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return

View File

@@ -290,5 +290,4 @@ class FalSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -228,8 +228,7 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -243,8 +242,7 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._request_id = None
self._started = False
@@ -286,8 +284,7 @@ class FishAudioTTSService(InterruptibleTTSService):
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -323,7 +320,6 @@ class FishAudioTTSService(InterruptibleTTSService):
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -332,5 +328,4 @@ class FishAudioTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -468,8 +468,7 @@ class GladiaSTTService(STTService):
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._connection_active = False
if not self._should_reconnect:
@@ -559,8 +558,7 @@ class GladiaSTTService(STTService):
except websockets.exceptions.ConnectionClosed:
logger.debug("Connection closed during keepalive")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
async def _receive_task_handler(self):
try:
@@ -623,8 +621,7 @@ class GladiaSTTService(STTService):
# Expected when closing the connection
pass
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
async def _maybe_reconnect(self) -> bool:
"""Handle exponential backoff reconnection logic."""
@@ -632,7 +629,9 @@ class GladiaSTTService(STTService):
return False
self._reconnection_attempts += 1
if self._reconnection_attempts > self._max_reconnection_attempts:
logger.error(f"Max reconnection attempts ({self._max_reconnection_attempts}) reached")
await self.push_error(
error_msg=f"Max reconnection attempts ({self._max_reconnection_attempts}) reached"
)
self._should_reconnect = False
return False
delay = self._reconnection_delay * (2 ** (self._reconnection_attempts - 1))

View File

@@ -1174,7 +1174,7 @@ class GeminiLiveLLMService(LLMService):
self._connection_task = self.create_task(self._connection_task_handler(config=config))
except Exception as e:
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}"))
await self.push_error(exception=e)
async def _connection_task_handler(self, config: LiveConnectConfig):
async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
@@ -1251,11 +1251,11 @@ class GeminiLiveLLMService(LLMService):
)
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
logger.error(
error_msg = (
f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
"treating as fatal error"
)
await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}"))
await self.push_error(error_msg=error_msg, exception=error)
return False
else:
logger.info(
@@ -1283,7 +1283,7 @@ class GeminiLiveLLMService(LLMService):
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _send_user_audio(self, frame):
"""Send user audio frame to Gemini Live API."""
@@ -1742,7 +1742,7 @@ class GeminiLiveLLMService(LLMService):
# state management, and that exponential backoff for retries can have
# cost/stability implications for a service cluster, let's just treat a
# send-side error as fatal.
await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True))
await self.push_error(ErrorFrame(error=f"{self} Send error: {error}"))
def create_context_aggregator(
self,

View File

@@ -110,7 +110,6 @@ class GoogleImageGenService(ImageGenService):
await self.stop_ttfb_metrics()
if not response or not response.generated_images:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return
@@ -128,5 +127,4 @@ class GoogleImageGenService(ImageGenService):
yield frame
except Exception as e:
logger.error(f"{self} error generating image: {e}")
yield ErrorFrame(f"Image generation error: {str(e)}")

View File

@@ -793,7 +793,7 @@ class GoogleLLMService(LLMService):
return
generation_params.setdefault("thinking_config", {})["thinking_budget"] = 0
except Exception as e:
logger.exception(f"Failed to unset thinking budget: {e}")
logger.error(f"Failed to unset thinking budget: {e}")
async def _stream_content(
self, params_from_context: GeminiLLMInvocationParams
@@ -983,7 +983,7 @@ class GoogleLLMService(LLMService):
except DeadlineExceeded:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(exception=e)
finally:
if grounding_metadata and isinstance(grounding_metadata, dict):
llm_search_frame = LLMSearchResponseFrame(

View File

@@ -774,8 +774,7 @@ class GoogleSTTService(STTService):
yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
raise
async def _stream_audio(self):
@@ -806,15 +805,13 @@ class GoogleSTTService(STTService):
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
await asyncio.sleep(1) # Brief delay before reconnecting
self._stream_start_time = int(time.time() * 1000)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process an audio chunk for STT transcription.
@@ -902,8 +899,7 @@ class GoogleSTTService(STTService):
)
raise
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
# Re-raise the exception to let it propagate (e.g. in the case of a
# timeout, propagate to _stream_audio to reconnect)
raise

View File

@@ -737,7 +737,6 @@ class GoogleHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -996,9 +995,7 @@ class GoogleTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
yield ErrorFrame(error=f"{self} error: {e}")
class GeminiTTSService(GoogleBaseTTSService):
@@ -1248,6 +1245,5 @@ class GeminiTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"Gemini TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)

View File

@@ -146,7 +146,6 @@ class GroqTTSService(TTSService):
bytes = w.readframes(num_frames)
yield TTSAudioRawFrame(bytes, frame_rate, channels)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()

View File

@@ -179,7 +179,7 @@ class HeyGenClient:
await self._task_manager.cancel_task(self._event_task)
self._event_task = None
except Exception as e:
logger.exception(f"Exception during cleanup: {e}")
logger.error(f"Exception during cleanup: {e}")
async def start(self, frame: StartFrame, audio_chunk_size: int) -> None:
"""Start the client and establish all necessary connections.

View File

@@ -287,8 +287,7 @@ class HumeTTSService(WordTTSService):
self._cumulative_time = utterance_duration
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
yield ErrorFrame(error=f"{self} error: {e}")
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()

View File

@@ -392,8 +392,7 @@ class InworldTTSService(TTSService):
# STEP 7: ERROR HANDLING
# ================================================================================
# Log any unexpected errors and notify the pipeline
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
# ================================================================================
# STEP 8: CLEANUP AND COMPLETION

View File

@@ -214,8 +214,7 @@ class LmntTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -231,8 +230,7 @@ class LmntTTSService(InterruptibleTTSService):
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error disconnecting from LMNT: {e}", exception=e)
finally:
self._started = False
self._websocket = None
@@ -266,10 +264,9 @@ class LmntTTSService(InterruptibleTTSService):
try:
msg = json.loads(message)
if "error" in msg:
logger.error(f"{self} error: {msg['error']}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
await self.push_error(error_msg=f"{self} error: {msg['error']}")
return
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -302,7 +299,6 @@ class LmntTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps({"flush": True}))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -310,5 +306,4 @@ class LmntTTSService(InterruptibleTTSService):
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -176,7 +176,6 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _stdio_list_tools(self) -> ToolsSchema:
@@ -207,7 +206,6 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _streamable_http_list_tools(self) -> ToolsSchema:
@@ -246,7 +244,6 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _call_tool(self, session, function_name, arguments, result_callback):
@@ -302,7 +299,6 @@ class MCPClient(BaseObject):
except Exception as e:
logger.error(f"Failed to read tool '{tool_name}': {str(e)}")
logger.exception("Full exception details:")
continue
logger.debug(f"Completed reading {len(tool_schemas)} tools")

View File

@@ -253,8 +253,7 @@ class Mem0MemoryService(FrameProcessor):
# Otherwise, pass the enhanced context frame downstream
await self.push_frame(frame)
except Exception as e:
logger.error(f"Error processing with Mem0: {str(e)}")
await self.push_frame(ErrorFrame(f"Error processing with Mem0: {str(e)}"))
await self.push_error(exception=e)
await self.push_frame(frame) # Still pass the original frame through
else:
# For non-context frames, just pass them through

View File

@@ -264,7 +264,6 @@ class MiniMaxHttpTTSService(TTSService):
) as response:
if response.status != 200:
error_message = f"MiniMax TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -338,7 +337,6 @@ class MiniMaxHttpTTSService(TTSService):
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()

View File

@@ -110,7 +110,6 @@ class MoondreamService(VisionService):
if analysis fails.
"""
if not self._model:
logger.error(f"{self} error: Moondream model not available ({self.model_name})")
yield ErrorFrame("Moondream model not available")
return

View File

@@ -285,8 +285,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -299,8 +298,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
self._started = False
self._websocket = None
@@ -365,7 +363,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -373,7 +370,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -539,7 +535,6 @@ class NeuphonicHttpTTSService(TTSService):
error_text = await response.text()
error_message = f"Neuphonic API error: HTTP {response.status} - {error_text}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
await self.start_tts_usage_metrics(text)
@@ -568,7 +563,6 @@ class NeuphonicHttpTTSService(TTSService):
yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Don't yield error frame for individual message failures
continue
@@ -577,7 +571,6 @@ class NeuphonicHttpTTSService(TTSService):
logger.debug("TTS generation cancelled")
raise
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()

View File

@@ -76,7 +76,6 @@ class OpenAIImageGenService(ImageGenService):
image_url = image.data[0].url
if not image_url:
logger.error(f"{self} No image provided in response: {image}")
yield ErrorFrame("Image generation failed")
return

View File

@@ -443,7 +443,7 @@ class OpenAIRealtimeLLMService(LLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
async def _disconnect(self):
@@ -460,7 +460,7 @@ class OpenAIRealtimeLLMService(LLMService):
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _ws_send(self, realtime_message):
try:
@@ -473,12 +473,11 @@ class OpenAIRealtimeLLMService(LLMService):
# somehow *started* the websocket send attempt while we still
# had a connection)
return
logger.error(f"Error sending message to websocket: {e}")
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
async def _update_settings(self):
settings = self._session_properties
@@ -759,7 +758,7 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
await self.push_error(error_msg=f"Error: {evt}")
#
# state and client events for the current conversation

View File

@@ -206,5 +206,4 @@ class OpenAITTSService(TTSService):
yield frame
yield TTSStoppedFrame()
except BadRequestError as e:
logger.exception(f"{self} error generating TTS: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -79,5 +79,5 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None

View File

@@ -424,7 +424,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
async def _disconnect(self):
@@ -440,7 +440,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._receive_task = None
self._disconnecting = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _ws_send(self, realtime_message):
try:
@@ -449,12 +449,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
except Exception as e:
if self._disconnecting:
return
logger.error(f"Error sending message to websocket: {e}")
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
async def _update_settings(self):
settings = self._session_properties
@@ -685,7 +684,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
await self.push_error(error_msg=f"Error: {evt}")
async def _handle_assistant_output(self, output):
# We haven't seen intermixed audio and function_call items in the same response. But let's

View File

@@ -88,9 +88,6 @@ class PiperTTSService(TTSService):
) as response:
if response.status != 200:
error = await response.text()
logger.error(
f"{self} error getting audio (status: {response.status}, error: {error})"
)
yield ErrorFrame(
error=f"Error getting audio (status: {response.status}, error: {error})"
)

View File

@@ -266,8 +266,7 @@ class PlayHTTTSService(InterruptibleTTSService):
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -280,8 +279,7 @@ class PlayHTTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
finally:
self._request_id = None
self._websocket = None
@@ -351,8 +349,7 @@ class PlayHTTTSService(InterruptibleTTSService):
await self.push_frame(TTSStoppedFrame())
self._request_id = None
elif "error" in msg:
logger.error(f"{self} error: {msg}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
await self.push_error(error_msg=f"{self} error: {msg['error']}")
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -394,7 +391,6 @@ class PlayHTTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps(tts_command))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -405,7 +401,6 @@ class PlayHTTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -626,7 +621,6 @@ class PlayHTHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()

View File

@@ -259,8 +259,7 @@ class RimeTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -272,8 +271,7 @@ class RimeTTSService(AudioContextWordTTSService):
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
finally:
self._context_id = None
self._websocket = None
@@ -366,10 +364,9 @@ class RimeTTSService(AudioContextWordTTSService):
logger.debug(f"Updated cumulative time to: {self._cumulative_time}")
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
await self.push_error(error_msg=f"{self} error: {msg['message']}")
self._context_id = None
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -411,7 +408,6 @@ class RimeTTSService(AudioContextWordTTSService):
await self._get_websocket().send(json.dumps(msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -419,7 +415,6 @@ class RimeTTSService(AudioContextWordTTSService):
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -551,7 +546,6 @@ class RimeHttpTTSService(TTSService):
) as response:
if response.status != 200:
error_message = f"Rime TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -569,7 +563,6 @@ class RimeHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()

View File

@@ -655,11 +655,9 @@ class RivaSegmentedSTTService(SegmentedSTTService):
logger.debug("No transcription results found in Riva response")
except AttributeError as ae:
logger.error(f"Unexpected response structure from Riva: {ae}")
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -180,7 +180,6 @@ class RivaTTSService(TTSService):
yield frame
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")
yield ErrorFrame(error=f"{self} error: {e}")
await self.start_tts_usage_metrics(text)

View File

@@ -275,8 +275,7 @@ class SarvamSTTService(STTService):
await self._socket_client.translate(**method_kwargs)
except Exception as e:
logger.error(f"Error sending audio to Sarvam: {e}")
await self.push_error(ErrorFrame(f"Failed to send audio: {e}"))
yield ErrorFrame(error=f"{self} error: {e}")
yield None
@@ -332,13 +331,11 @@ class SarvamSTTService(STTService):
logger.info("Connected to Sarvam successfully")
except ApiError as e:
logger.error(f"Sarvam API error: {e}")
await self.push_error(ErrorFrame(f"Sarvam API error: {e}"))
await self.push_error(error_msg=f"Sarvam API error: {e}", exception=e)
except Exception as e:
logger.error(f"Failed to connect to Sarvam: {e}")
self._socket_client = None
self._websocket_context = None
await self.push_error(ErrorFrame(f"Failed to connect to Sarvam: {e}"))
await self.push_error(error_msg=f"Failed to connect to Sarvam: {e}", exception=e)
async def _disconnect(self):
"""Disconnect from Sarvam WebSocket API using SDK."""
@@ -351,7 +348,9 @@ class SarvamSTTService(STTService):
# Exit the async context manager
await self._websocket_context.__aexit__(None, None, None)
except Exception as e:
logger.error(f"Error closing WebSocket connection: {e}")
await self.push_error(
error_msg=f"Error closing WebSocket connection: {e}", exception=e
)
finally:
logger.debug("Disconnected from Sarvam WebSocket")
self._socket_client = None
@@ -371,8 +370,7 @@ class SarvamSTTService(STTService):
# Messages will be handled via the _message_handler callback
await self._socket_client.start_listening()
except Exception as e:
logger.error(f"Error in Sarvam receive task: {e}")
await self.push_error(ErrorFrame(f"Sarvam receive task error: {e}"))
await self.push_error(error_msg=f"Sarvam receive task error: {e}", exception=e)
async def _handle_message(self, message):
"""Handle incoming WebSocket message from Sarvam SDK.
@@ -427,8 +425,7 @@ class SarvamSTTService(STTService):
await self.stop_processing_metrics()
except Exception as e:
logger.error(f"Error handling Sarvam message: {e}")
await self.push_error(ErrorFrame(f"Failed to handle message: {e}"))
await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e)
await self.stop_all_metrics()
@traced_stt

View File

@@ -254,8 +254,7 @@ class SarvamHttpTTSService(TTSService):
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Sarvam API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}"))
yield ErrorFrame(error=f"Sarvam API error: {error_text}")
return
response_data = await response.json()
@@ -264,8 +263,7 @@ class SarvamHttpTTSService(TTSService):
# Decode base64 audio data
if "audios" not in response_data or not response_data["audios"]:
logger.error("No audio data received from Sarvam API")
await self.push_error(ErrorFrame(error="No audio data received"))
yield ErrorFrame(error="No audio data received")
return
# Get the first audio (there should be only one for single text input)
@@ -286,8 +284,7 @@ class SarvamHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
@@ -560,8 +557,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self._disconnect_websocket()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
finally:
# Reset state only after everything is cleaned up
self._started = False
@@ -585,8 +581,9 @@ class SarvamTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(
error_msg=f"Error connecting to Sarvam TTS Websocket: {e}", exception=e
)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -602,8 +599,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.send(json.dumps(config_message))
logger.debug("Configuration sent successfully")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
raise
async def _disconnect_websocket(self):
@@ -615,8 +611,7 @@ class SarvamTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Sarvam")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._started = False
self._websocket = None
@@ -640,7 +635,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self.push_frame(frame)
elif msg.get("type") == "error":
error_msg = msg["data"]["message"]
logger.error(f"TTS Error: {error_msg}")
await self.push_error(error_msg=f"TTS Error: {error_msg}")
# If it's a timeout error, the connection might need to be reset
if "too long" in error_msg.lower() or "timeout" in error_msg.lower():
@@ -702,7 +697,6 @@ class SarvamTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
@@ -710,5 +704,4 @@ class SarvamTTSService(InterruptibleTTSService):
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -178,7 +178,7 @@ class SimliVideoService(FrameProcessor):
self._audio_task = self.create_task(self._consume_and_process_audio())
self._video_task = self.create_task(self._consume_and_process_video())
except Exception as e:
logger.error(f"{self}: unable to start connection: {e}")
await self.push_error(error_msg=f"Unable to start connection: {e}", exception=e)
async def _consume_and_process_audio(self):
"""Consume audio frames from Simli and push them downstream."""
@@ -256,7 +256,7 @@ class SimliVideoService(FrameProcessor):
await self._simli_client.send(audioBytes)
return
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(error_msg=f"Error sending audio: {e}", exception=e)
elif isinstance(frame, TTSStoppedFrame):
try:
if self._previously_interrupted and len(self._audio_buffer) > 0:
@@ -264,7 +264,7 @@ class SimliVideoService(FrameProcessor):
self._previously_interrupted = False
self._audio_buffer = bytearray()
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(error_msg=f"Error stopping TTS: {e}", exception=e)
return
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()

View File

@@ -194,7 +194,7 @@ class SonioxSTTService(STTService):
self._websocket = await websocket_connect(self._url)
if not self._websocket:
logger.error(f"Unable to connect to Soniox API at {self._url}")
await self.push_error(error_msg=f"Unable to connect to Soniox API at {self._url}")
# If vad_force_turn_endpoint is not enabled, we need to enable endpoint detection.
# Either one or the other is required.
@@ -327,8 +327,7 @@ class SonioxSTTService(STTService):
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
async def _receive_task_handler(self):
if not self._websocket:
@@ -404,13 +403,8 @@ class SonioxSTTService(STTService):
if error_code or error_message:
# In case of error, still send the final transcript (if any remaining in the buffer).
await send_endpoint_transcript()
logger.error(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
await self.push_error(
ErrorFrame(
error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
error_msg=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
finished = content.get("finished")
@@ -425,5 +419,4 @@ class SonioxSTTService(STTService):
# Expected when closing the connection.
pass
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error receiving message: {e}", exception=e)

View File

@@ -467,7 +467,6 @@ class SpeechmaticsSTTService(STTService):
await self._client.send_audio(audio)
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
@@ -514,8 +513,7 @@ class SpeechmaticsSTTService(STTService):
self._client.send_message(payload), self.get_event_loop()
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
raise RuntimeError(f"error sending message to STT: {e}")
async def _connect(self) -> None:
@@ -581,8 +579,7 @@ class SpeechmaticsSTTService(STTService):
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error connecting to Speechmatics: {e}", exception=e)
self._client = None
async def _disconnect(self) -> None:
@@ -596,8 +593,9 @@ class SpeechmaticsSTTService(STTService):
except asyncio.TimeoutError:
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(
error_msg=f"Error disconnecting from Speechmatics: {e}", exception=e
)
finally:
self._client = None
await self._call_event_handler("on_disconnected")

View File

@@ -174,16 +174,13 @@ class SpeechmaticsTTSService(TTSService):
except (ValueError, ArithmeticError):
yield ErrorFrame(
error=f"{self} Service unavailable [503] (attempts {attempt})",
fatal=True,
error=f"{self} Service unavailable [503] (attempts {attempt})"
)
return
# != 200 : Error
if response.status != 200:
yield ErrorFrame(
error=f"{self} Service unavailable [{response.status}]", fatal=True
)
yield ErrorFrame(error=f"{self} Service unavailable [{response.status}]")
return
# Update Pipecat metrics
@@ -225,7 +222,7 @@ class SpeechmaticsTTSService(TTSService):
break
except Exception as e:
yield ErrorFrame(error=f"{self}: Error generating TTS: {e}", fatal=True)
yield ErrorFrame(error=f"{self}: Error generating TTS: {e}")
finally:
# Emit the TTS stopped frame
yield TTSStoppedFrame()

View File

@@ -329,4 +329,4 @@ class WebsocketSTTService(STTService, WebsocketService):
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error(error)
await self.push_error_frame(error)

View File

@@ -671,7 +671,7 @@ class WebsocketTTSService(TTSService, WebsocketService):
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error(error)
await self.push_error_frame(error)
class InterruptibleTTSService(WebsocketTTSService):
@@ -733,7 +733,7 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error(error)
await self.push_error_frame(error)
class InterruptibleWordTTSService(WebsocketWordTTSService):

View File

@@ -246,8 +246,7 @@ class UltravoxSTTService(AIService):
logger.info("Model warm-up completed successfully")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(exception=e)
def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0):
"""Generate silent audio as a numpy array.
@@ -437,17 +436,11 @@ class UltravoxSTTService(AIService):
yield LLMFullResponseEndFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
else:
logger.error("No model available for text generation")
yield ErrorFrame("No model available for text generation")
except Exception as e:
logger.error(f"{self} exception: {e}")
import traceback
logger.error(traceback.format_exc())
yield ErrorFrame(f"Error processing audio: {str(e)}")
finally:
self._buffer.is_processing = False

View File

@@ -226,7 +226,6 @@ class BaseWhisperSTTService(SegmentedSTTService):
logger.warning("Received empty transcription from API")
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def _transcribe(self, audio: bytes) -> Transcription:

View File

@@ -285,7 +285,6 @@ class WhisperSTTService(SegmentedSTTService):
The service will normalize it to float32 in the range [-1, 1].
"""
if not self._model:
logger.error(f"{self} error: Whisper model not available")
yield ErrorFrame("Whisper model not available")
return
@@ -428,5 +427,4 @@ class WhisperSTTServiceMLX(WhisperSTTService):
)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -141,13 +141,8 @@ class XTTSService(TTSService):
async with self._aiohttp_session.get(self._settings["base_url"] + "/studio_speakers") as r:
if r.status != 200:
text = await r.text()
logger.error(
f"{self} error getting studio speakers (status: {r.status}, error: {text})"
)
await self.push_error(
ErrorFrame(
error=f"Error getting studio speakers (status: {r.status}, error: {text})"
)
error_msg=f"{self} error getting studio speakers (status: {r.status}, error: {text})"
)
return
self._studio_speakers = await r.json()
@@ -186,7 +181,6 @@ class XTTSService(TTSService):
async with self._aiohttp_session.post(url, json=payload) as r:
if r.status != 200:
text = await r.text()
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})")
return

View File

@@ -2506,13 +2506,10 @@ class DailyTransport(BaseTransport):
async def _on_error(self, error):
"""Handle error events and push error frames."""
await self._call_event_handler("on_error", error)
# Push error frame to notify the pipeline
error_frame = ErrorFrame(error)
if self._input:
await self._input.push_error(error_frame)
await self._input.push_error(error_msg=error)
elif self._output:
await self._output.push_error(error_frame)
await self._output.push_error(error_msg=error)
else:
logger.error("Both input and output are None while trying to push error")
raise Exception("No valid input or output channel to push error")
@@ -2568,7 +2565,7 @@ class DailyTransport(BaseTransport):
except asyncio.TimeoutError:
logger.error(f"Timeout handling dialin-ready event ({url})")
except Exception as e:
logger.exception(f"Error handling dialin-ready event ({url}): {e}")
logger.error(f"Error handling dialin-ready event ({url}): {e}")
async def _on_dialin_connected(self, data):
"""Handle dial-in connected events."""

View File

@@ -316,7 +316,7 @@ class SmallWebRTCConnection(BaseObject):
logger.debug("Client not connected. Queuing app-message.")
self._pending_app_messages.append(json_message)
except Exception as e:
logger.exception(f"Error parsing JSON message {message}, {e}")
logger.error(f"Error parsing JSON message {message}, {e}")
# Despite the fact that aiortc provides this listener, they don't have a status for "disconnected"
# So, in case we loose connection, this event will not be triggered

View File

@@ -265,7 +265,7 @@ class TavusTransportClient:
try:
await self._client.cleanup()
except Exception as e:
logger.exception(f"Exception during cleanup: {e}")
logger.error(f"Exception during cleanup: {e}")
async def _on_joined(self, data):
"""Handle joined event."""

View File

@@ -162,7 +162,7 @@ class TaskManager(BaseTaskManager):
# Re-raise the exception to ensure the task is cancelled.
raise
except Exception as e:
logger.exception(f"{name}: unexpected exception: {e}")
logger.error(f"{name}: unexpected exception: {e}")
if not self._params:
raise Exception("TaskManager is not setup: unable to get event loop")
@@ -197,7 +197,7 @@ class TaskManager(BaseTaskManager):
# Here are sure the task is cancelled properly.
pass
except Exception as e:
logger.exception(f"{name}: unexpected exception while cancelling task: {e}")
logger.error(f"{name}: unexpected exception while cancelling task: {e}")
except BaseException as e:
logger.critical(f"{name}: fatal base exception while cancelling task: {e}")
raise

View File

@@ -187,7 +187,7 @@ class BaseObject(ABC):
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")
logger.error(f"Exception in event handler {event_name}: {e}")
def _event_task_finished(self, task: asyncio.Task):
"""Clean up completed event handler tasks.