Compare commits

...

9 Commits

Author SHA1 Message Date
James Hush
1b28fc8e8e Fix: Ensure EndFrame propagates through AIService before stop()
This fix addresses a critical bug where EndFrame (and potentially other
system frames) would trigger the stop() method in AIService but never
be pushed downstream to subsequent processors, causing pipelines to hang.

The issue occurred because AIService.process_frame() would call stop(frame)
for EndFrame without first pushing it downstream. This meant that downstream
processors never received the shutdown signal, leaving the pipeline in a
waiting state.

The fix ensures EndFrame is pushed downstream BEFORE calling stop(), following
the same pattern used by RTVIProcessor and properly-implemented processors.
This guarantees that:
1. Downstream processors receive the EndFrame for proper cleanup
2. The stop() method can then safely perform service-specific cleanup
3. The ordering prevents race conditions during shutdown

This bug affected all AI services inheriting from AIService that didn't
override process_frame() to explicitly handle EndFrame, including scenarios
with TTS services, LLM services, and other AI service implementations.

Fixes pipeline hangs during graceful shutdown when EndFrame is sent.
2025-11-17 11:09:42 +01:00
Mark Backman
35ff44b799 Merge pull request #3059 from pipecat-ai/mb/remove-llm-tracing-fallback 2025-11-14 14:07:40 -05:00
Angad Singh
d1116d149e feat: Add ErrorFrame emission to TTS/STT services for pipeline error detection (#2881)
* feat: Add ErrorFrame emission to TTS/STT services for pipeline error detection

- Add ErrorFrame emission to all major TTS/STT services during initialization and runtime failures
- Services updated: Cartesia, ElevenLabs, Deepgram, AssemblyAI, Rime, Azure
- ErrorFrame objects emitted with fatal=False for graceful degradation
- Enables on_pipeline_error event handler to detect service failures programmatically
- Add comprehensive pytest test suite to verify ErrorFrame emission
- Fixes issue where services failed gracefully but didn't emit ErrorFrame objects

This allows developers to implement real-time error monitoring and alerting
using the on_pipeline_error event handler introduced in v0.0.90.

* Update STT and TTS services to use consistent error handling pattern

- Improves error handling consistency across all services

* Add changelog entry for STT/TTS error handling improvements

* Linting issues Resolved

* Azure STT ErrorFrames added with consistent patterns

* Cartesia STT and Deepgram STT; additional fixes made

* Removed Fatal Flags across services, removed duplication

* Moving the changelog entry to the correct place.

* Refactoring some classes to use yield instead of push_error directly.

* Fixing ruff format.

---------

Co-authored-by: Filipi Fuchter <filipi87@gmail.com>
2025-11-14 15:03:05 -03:00
Mark Backman
d01876ee60 Remove fallbacks in traced_llm 2025-11-14 12:13:49 -05:00
Mark Backman
74a0e8c88d Merge pull request #3050 from ai-coustics/aic-vad-analyzer
feat(ai-coustics): add ai-coustics integrated VAD
2025-11-14 08:11:15 -05:00
Corvin Jaedicke
fbbad27d37 add changelog info 2025-11-14 13:30:06 +01:00
Corvin Jaedicke
2fab3e2286 fix formatting 2025-11-13 14:39:26 +01:00
Corvin Jaedicke
a7b2052b38 add ai-coustics VAD 2025-11-13 14:20:35 +01:00
Corvin Jaedicke
3c76917c1e use async process function 2025-11-12 13:48:22 +01:00
48 changed files with 515 additions and 211 deletions

View File

@@ -18,6 +18,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated all STT and TTS services to use consistent error handling pattern with
`push_error()` method for better pipeline error event integration.
- Added Hindi support for Rime TTS services.
- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API
@@ -51,6 +54,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Prevented `HeyGenVideoService` from automatically disconnecting after 5 minutes.
### Added
- Added ai-coustics integrated VAD (`AICVADAnalyzer`) with `AICFilter` factory and
example wiring; leverages the enhancement model for robust detection with no
ONNX dependency or added processing complexity.
## [0.0.94] - 2025-11-10
### Changed

View File

@@ -15,7 +15,6 @@ from loguru import logger
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -48,7 +47,7 @@ def _create_aic_filter() -> AICFilter:
return AICFilter(
license_key=license_key,
enhancement_level=1.0,
enhancement_level=0.5,
)
@@ -56,27 +55,33 @@ def _create_aic_filter() -> AICFilter:
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=_create_aic_filter(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=_create_aic_filter(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=_create_aic_filter(),
),
"daily": lambda: (
lambda aic: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=aic.create_vad_analyzer(lookback_buffer_size=6.0, sensitivity=6.0),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=aic,
)
)(_create_aic_filter()),
"twilio": lambda: (
lambda aic: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=aic.create_vad_analyzer(lookback_buffer_size=6.0, sensitivity=6.0),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=aic,
)
)(_create_aic_filter()),
"webrtc": lambda: (
lambda aic: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=aic.create_vad_analyzer(lookback_buffer_size=6.0, sensitivity=6.0),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=aic,
)
)(_create_aic_filter()),
}

View File

@@ -45,7 +45,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
aic = [ "aic-sdk~=1.0.1" ]
aic = [ "aic-sdk~=1.1.0" ]
anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "pipecat-ai[websockets-base]" ]
asyncai = [ "pipecat-ai[websockets-base]" ]

View File

@@ -68,6 +68,58 @@ class AICFilter(BaseAudioFilter):
# Model will be created in start() since the API now requires sample_rate
self._aic = None
def get_vad_factory(self):
    """Build a deferred constructor for the AIC voice activity detector.

    The returned callable takes no arguments and reads ``self._aic`` at the
    moment it is invoked, so it can be handed out before the model exists.

    Returns:
        A zero-argument callable. When called, it returns the result of
        ``self._aic.create_vad()``.

    Raises:
        RuntimeError: Raised by the returned callable (not by this method)
            when ``self._aic`` is still ``None``.
    """

    def _build_vad():
        model = self._aic
        if model is not None:
            return model.create_vad()
        raise RuntimeError("AIC model not initialized yet. Call start(sample_rate) first.")

    return _build_vad
def create_vad_analyzer(
    self,
    *,
    lookback_buffer_size: Optional[float] = None,
    sensitivity: Optional[float] = None,
):
    """Create an AICVADAnalyzer wired to this filter's deferred VAD factory.

    The analyzer receives the callable from ``get_vad_factory()``, so it can
    be constructed before the filter's model has been created.

    Documented AIC VAD parameters:
        - lookback_buffer_size: number of window-length audio buffers used
          as a lookback buffer. Higher values increase prediction stability
          but add latency. Range: 1.0 .. 20.0, Default (SDK): 6.0
        - sensitivity: energy threshold sensitivity, where
          energy threshold = 10 ** (-sensitivity).
          Range: 1.0 .. 15.0, Default (SDK): 6.0

    Args:
        lookback_buffer_size: Optional lookback buffer size forwarded to the
            analyzer. If None, the SDK default is used.
        sensitivity: Optional sensitivity forwarded to the analyzer.
            If None, the SDK default is used.

    Returns:
        An AICVADAnalyzer built with this filter's VAD factory and the given
        optional parameters. The values are forwarded unchanged; no range
        check is performed here.
    """
    # Function-local import: the analyzer module is only loaded when a VAD
    # analyzer is actually requested.
    from pipecat.audio.vad.aic_vad import AICVADAnalyzer

    deferred_vad = self.get_vad_factory()
    analyzer = AICVADAnalyzer(
        vad_factory=deferred_vad,
        lookback_buffer_size=lookback_buffer_size,
        sensitivity=sensitivity,
    )
    return analyzer
async def start(self, sample_rate: int):
"""Initialize the filter with the transport's sample rate.
@@ -185,7 +237,7 @@ class AICFilter(BaseAudioFilter):
)
# Process planar in-place; returns ndarray (same shape)
out_f32 = self._aic.process(block_f32)
out_f32 = await self._aic.process_async(block_f32)
# Convert back to int16 bytes, planar layout
out_i16 = np.clip(out_f32 * 32768.0, -32768, 32767).astype(np.int16)

View File

@@ -0,0 +1,158 @@
"""AIC-integrated VAD analyzer that lazily binds to the AIC SDK backend.
This analyzer queries the backend's is_speech_detected() and maps it to a float
confidence (1.0/0.0). It uses 10 ms windows based on the sample rate and applies
optional AIC VAD parameters (lookback_buffer_size, sensitivity) when available.
"""
from typing import Any, Callable, Optional
from loguru import logger
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
try:
from aic import AICVadParameter
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the AIC filter, you need to `pip install pipecat-ai[aic]`.")
raise Exception(f"Missing module: {e}")
class AICVADAnalyzer(VADAnalyzer):
    """VAD analyzer that lazily instantiates the AIC VoiceActivityDetector via a factory.

    The analyzer can be constructed before the AIC Model exists. Once the filter has
    started and the Model is available, the provided factory will succeed and the
    backend VAD will be created. From then on voice_confidence() ignores the audio
    buffer it is handed and reports the backend's boolean is_speech_detected() state
    as 1.0 or 0.0. num_frames_required() always requests 10 ms windows derived from
    the sample rate.

    AIC VAD runtime parameters:
      - lookback_buffer_size:
          Controls the lookback buffer size used by the VAD, i.e. the number of
          window-length audio buffers used as a lookback buffer. Larger values improve
          stability but increase latency.
          Range: 1.0 .. 20.0
          Default (SDK): 6.0
      - sensitivity:
          Controls the energy threshold sensitivity. Higher values lower the energy
          threshold, since the threshold is 10 ** (-sensitivity).
          Range: 1.0 .. 15.0
          Default (SDK): 6.0
    """

    # Analysis window length in seconds (10 ms).
    _WINDOW_SECS = 0.01
    # Frame count used while no positive sample rate is known (10 ms at 16 kHz).
    _FALLBACK_FRAMES = 160

    def __init__(
        self,
        *,
        vad_factory: Optional[Callable[[], Any]] = None,
        lookback_buffer_size: Optional[float] = None,
        sensitivity: Optional[float] = None,
    ):
        """Create an AIC VAD analyzer.

        Args:
            vad_factory:
                Zero-arg callable that returns an initialized AIC VoiceActivityDetector.
                This may raise until the filter's Model has been created; the analyzer
                will retry on set_sample_rate/first use.
            lookback_buffer_size:
                Optional override for AIC VAD lookback buffer size.
                Range: 1.0 .. 20.0. Larger values increase stability at the cost of latency.
                If None, the SDK default (6.0) is used.
            sensitivity:
                Optional override for AIC VAD sensitivity (energy threshold).
                Range: 1.0 .. 15.0. Energy threshold = 10 ** (-sensitivity).
                If None, the SDK default (6.0) is used.
        """
        # Use fixed VAD parameters for AIC: no user override
        fixed_params = VADParams(confidence=0.5, start_secs=0.0, stop_secs=0.0, min_volume=0.0)
        super().__init__(sample_rate=None, params=fixed_params)

        self._vad_factory = vad_factory
        self._backend_vad: Optional[Any] = None
        # Held until the backend exists, then applied in _apply_backend_params().
        self._pending_lookback: Optional[float] = lookback_buffer_size
        self._pending_sensitivity: Optional[float] = sensitivity

    def bind_vad_factory(self, vad_factory: Callable[[], Any]) -> None:
        """Attach or replace the factory post-construction.

        Note: if a backend VAD has already been created, it is kept; the new
        factory is only consulted when no backend exists yet.

        Args:
            vad_factory: Zero-arg callable returning an initialized AIC VAD.
        """
        self._vad_factory = vad_factory
        self._ensure_backend_initialized()

    def _apply_backend_params(self) -> None:
        """Apply the optional AIC VAD parameters to the backend, if one exists.

        Each parameter is applied independently so that a failure on one does
        not prevent the other from being set. Failures are logged and never
        raised.
        """
        if self._backend_vad is None or AICVadParameter is None:
            return

        pending = (
            ("LOOKBACK_BUFFER_SIZE", self._pending_lookback),
            ("SENSITIVITY", self._pending_sensitivity),
        )
        for name, value in pending:
            if value is None:
                continue
            try:
                self._backend_vad.set_parameter(getattr(AICVadParameter, name), float(value))
            except Exception as e:  # noqa: BLE001
                # A user-requested setting was dropped; make that visible.
                logger.warning(f"AIC VAD parameter {name}={value} could not be applied: {e}")

    def _ensure_backend_initialized(self) -> None:
        """Create the backend VAD through the factory if it does not exist yet.

        Does nothing when a backend is already present or no factory is set.
        A failing factory is treated as "not ready yet" and retried on a later
        call.
        """
        if self._backend_vad is not None or not self._vad_factory:
            return

        try:
            self._backend_vad = self._vad_factory()
        except Exception as e:  # noqa: BLE001
            # Filter may not be started yet; try again later
            logger.debug(f"Deferring AIC VAD backend initialization: {e}")
            return

        self._apply_backend_params()

        # With backend ready, recompute internal frame sizing. This is kept
        # separate from the factory call so a sizing failure is not reported
        # as a deferred initialization (the backend is already created here).
        try:
            super().set_params(self._params)
        except Exception as e:  # noqa: BLE001
            logger.debug(f"AIC VAD frame sizing could not be recomputed yet: {e}")
        logger.debug("AIC VAD backend initialized in analyzer.")

    def set_sample_rate(self, sample_rate: int) -> None:
        """Set the sample rate for audio processing.

        A sample rate given at construction time takes precedence over the
        one passed here.

        Args:
            sample_rate: Audio sample rate in Hz.
        """
        # Set rate and attempt backend initialization once we know SR
        self._sample_rate = self._init_sample_rate or sample_rate
        self._ensure_backend_initialized()
        # Ensure params are initialized even if backend not ready yet
        try:
            super().set_params(self._params)
        except Exception as e:  # noqa: BLE001
            # Best effort: keep running, but do not hide the failure.
            logger.warning(f"AIC VAD could not apply VAD params at {self._sample_rate} Hz: {e}")

    def num_frames_required(self) -> int:
        """Get the number of audio frames required for analysis.

        Returns:
            Number of frames in a 10 ms window at the current sample rate, or
            160 when no positive sample rate is known.
        """
        rate = self.sample_rate
        if rate and rate > 0:
            return int(rate * self._WINDOW_SECS)
        return self._FALLBACK_FRAMES

    def voice_confidence(self, buffer: bytes) -> float:
        """Calculate voice activity confidence for the given audio buffer.

        Args:
            buffer: Audio buffer to analyze. Not inspected here; the backend
                VAD state is queried instead.

        Returns:
            1.0 when the backend reports speech, otherwise 0.0. Also 0.0 when
            the backend is not available yet or the query fails.
        """
        # Ensure backend exists (filter might have started since last call)
        self._ensure_backend_initialized()
        if self._backend_vad is None:
            return 0.0

        # We do not need to analyze 'buffer' here since the model's VAD is updated
        # as part of the enhancement pipeline. Simply query the boolean and map it.
        try:
            is_speech = self._backend_vad.is_speech_detected()
            return 1.0 if is_speech else 0.0
        except Exception as e:  # noqa: BLE001
            logger.error(f"AIC VAD inference error: {e}")
            return 0.0

View File

@@ -152,6 +152,9 @@ class AIService(FrameProcessor):
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() may wait on tasks to
# finish and downstream processors need to receive the EndFrame.
await self.push_frame(frame, direction)
await self.stop(frame)
async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):

View File

@@ -21,6 +21,7 @@ from pipecat import __version__ as pipecat_version
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -205,8 +206,9 @@ class AssemblyAISTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Failed to connect to AssemblyAI: {e}")
logger.error(f"{self} exception: {e}")
self._connected = False
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _disconnect(self):
@@ -231,7 +233,8 @@ class AssemblyAISTTService(STTService):
logger.warning("Timed out waiting for termination message from server")
except Exception as e:
logger.warning(f"Error during termination handshake: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
if self._receive_task:
await self.cancel_task(self._receive_task)
@@ -239,7 +242,8 @@ class AssemblyAISTTService(STTService):
await self._websocket.close()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
@@ -258,11 +262,13 @@ class AssemblyAISTTService(STTService):
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:
logger.error(f"Error processing WebSocket message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
break
except Exception as e:
logger.error(f"Fatal error in receive handler: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
def _parse_message(self, message: Dict[str, Any]) -> BaseMessage:
"""Parse a raw message into the appropriate message type."""
@@ -291,7 +297,8 @@ class AssemblyAISTTService(STTService):
elif isinstance(parsed_message, TerminationMessage):
await self._handle_termination(parsed_message)
except Exception as e:
logger.error(f"Error handling message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _handle_termination(self, message: TerminationMessage):
"""Handle termination message."""

View File

@@ -237,7 +237,8 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -249,7 +250,8 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Async")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
self._started = False
@@ -297,7 +299,7 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['message']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
else:
logger.error(f"{self} error, unknown message type: {msg}")
@@ -342,7 +344,8 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -350,6 +353,7 @@ class AsyncAITTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class AsyncAIHttpTTSService(TTSService):
@@ -492,7 +496,7 @@ class AsyncAIHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Async API error: {error_text}")
await self.push_error(ErrorFrame(f"Async API error: {error_text}"))
await self.push_error(ErrorFrame(error=f"Async API error: {error_text}"))
raise Exception(f"Async API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -509,7 +513,7 @@ class AsyncAIHttpTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -140,7 +140,8 @@ class AWSTranscribeSTTService(STTService):
return
logger.warning("WebSocket connection not established after connect")
except Exception as e:
logger.error(f"Failed to connect (attempt {retry_count + 1}/{max_retries}): {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(1) # Wait before retrying
@@ -181,8 +182,8 @@ class AWSTranscribeSTTService(STTService):
try:
await self._connect()
except Exception as e:
logger.error(f"Failed to reconnect: {e}")
yield ErrorFrame("Failed to reconnect to AWS Transcribe", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
# Format the audio data according to AWS event stream format
@@ -199,13 +200,13 @@ class AWSTranscribeSTTService(STTService):
await self._disconnect()
# Don't yield error here - we'll retry on next frame
except Exception as e:
logger.error(f"Error sending audio: {e}")
yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
except Exception as e:
logger.error(f"Error in run_stt: {e}")
yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
async def _connect(self):
@@ -288,7 +289,8 @@ class AWSTranscribeSTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Failed to connect to AWS Transcribe: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self._disconnect()
raise
@@ -308,7 +310,8 @@ class AWSTranscribeSTTService(STTService):
await self._ws_client.send(json.dumps(end_stream))
await self._ws_client.close()
except Exception as e:
logger.warning(f"{self} Error closing WebSocket connection: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
@@ -527,9 +530,7 @@ class AWSTranscribeSTTService(STTService):
elif headers.get(":message-type") == "exception":
error_msg = payload.get("Message", "Unknown error")
logger.error(f"{self} Exception from AWS: {error_msg}")
await self.push_frame(
ErrorFrame(f"AWS Transcribe error: {error_msg}", fatal=False)
)
await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}"))
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
@@ -537,5 +538,6 @@ class AWSTranscribeSTTService(STTService):
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
break
except Exception as e:
logger.error(f"{self} Unexpected error in receive loop: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
break

View File

@@ -18,6 +18,7 @@ from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -111,13 +112,17 @@ class AzureSTTService(STTService):
audio: Raw audio bytes to process.
Yields:
None - actual transcription frames are pushed via callbacks.
Frame: Either None for successful processing or ErrorFrame on failure.
"""
await self.start_processing_metrics()
await self.start_ttfb_metrics()
if self._audio_stream:
self._audio_stream.write(audio)
yield None
try:
await self.start_processing_metrics()
await self.start_ttfb_metrics()
if self._audio_stream:
self._audio_stream.write(audio)
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def start(self, frame: StartFrame):
"""Start the speech recognition service.
@@ -133,17 +138,21 @@ class AzureSTTService(STTService):
if self._audio_stream:
return
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
try:
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
except Exception as e:
logger.error(f"{self} exception during initialization: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def stop(self, frame: EndFrame):
"""Stop the speech recognition service.

View File

@@ -337,7 +337,7 @@ class AzureTTSService(AzureBaseTTSService):
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
logger.error(error_msg)
yield ErrorFrame(error_msg)
yield ErrorFrame(error=error_msg)
return
try:
@@ -364,13 +364,15 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} error during synthesis: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class AzureHttpTTSService(AzureBaseTTSService):
@@ -448,3 +450,4 @@ class AzureHttpTTSService(AzureBaseTTSService):
logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
if cancellation_details.reason == CancellationReason.Error:
logger.error(f"{self} error: {cancellation_details.error_details}")
yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}")

View File

@@ -20,6 +20,7 @@ from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -275,7 +276,8 @@ class CartesiaSTTService(WebsocketSTTService):
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self}: unable to connect to Cartesia: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _disconnect_websocket(self):
try:
@@ -284,6 +286,7 @@ class CartesiaSTTService(WebsocketSTTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -315,7 +318,9 @@ class CartesiaSTTService(WebsocketSTTService):
await self._on_transcript(data)
elif data["type"] == "error":
logger.error(f"Cartesia error: {data.get('message', 'Unknown error')}")
error_msg = data.get("message", "Unknown error")
logger.error(f"Cartesia error: {error_msg}")
await self.push_error(ErrorFrame(error=error_msg))
@traced_stt
async def _handle_transcription(

View File

@@ -397,7 +397,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -409,7 +410,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._context_id = None
self._websocket = None
@@ -465,7 +467,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
self._context_id = None
else:
logger.error(f"{self} error, unknown message type: {msg}")
@@ -506,7 +508,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -514,6 +517,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class CartesiaHttpTTSService(TTSService):
@@ -705,7 +709,7 @@ class CartesiaHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Cartesia API error: {error_text}")
await self.push_error(ErrorFrame(f"Cartesia API error: {error_text}"))
await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}"))
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -722,7 +726,7 @@ class CartesiaHttpTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -191,7 +191,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._disconnect_websocket()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Reset state only after everything is cleaned up
self._websocket = None
@@ -214,7 +215,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -233,6 +235,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -333,14 +336,14 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
if not self._websocket:
logger.error("Not connected to Deepgram Flux.")
yield ErrorFrame("Not connected to Deepgram Flux.", fatal=True)
yield ErrorFrame("Not connected to Deepgram Flux.")
return
try:
await self._websocket.send(audio)
except Exception as e:
logger.error(f"Failed to send audio to Flux: {e}")
yield ErrorFrame(f"Failed to send audio to Flux: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
yield None
@@ -417,7 +420,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Skip malformed messages
continue
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Error will be handled inside WebsocketService->_receive_task_handler
raise
else:

View File

@@ -256,7 +256,7 @@ class DeepgramSTTService(STTService):
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.push_error(ErrorFrame(f"{error}"))
await self.push_error(ErrorFrame(error=f"{error}"))
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,

View File

@@ -125,8 +125,8 @@ class DeepgramTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} exception: {e}")
yield ErrorFrame(f"Error getting audio: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class DeepgramHttpTTSService(TTSService):

View File

@@ -351,8 +351,8 @@ class ElevenLabsSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"ElevenLabs STT error: {e}")
yield ErrorFrame(f"ElevenLabs STT error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
def audio_format_from_sample_rate(sample_rate: int) -> str:

View File

@@ -424,7 +424,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.warning(f"Error closing context for voice settings update: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._context_id = None
self._started = False
@@ -535,8 +536,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
self._websocket = None
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
@@ -551,7 +553,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._context_id = None
@@ -581,7 +584,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"Error closing context on interruption: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._context_id = None
self._started = False
self._partial_word = ""
@@ -736,13 +740,15 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
else:
await self._send_text(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield TTSStoppedFrame()
yield ErrorFrame(error=f"{self} error: {e}")
self._started = False
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class ElevenLabsHttpTTSService(WordTTSService):
@@ -1085,7 +1091,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
logger.error(f"Error processing response: {e}", exc_info=True)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
continue
# After processing all chunks, emit any remaining partial word
@@ -1109,8 +1116,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._previous_text = text
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
# Let the parent class handle TTSStoppedFrame

View File

@@ -290,5 +290,5 @@ class FalSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"Fal Wizper error: {e}")
yield ErrorFrame(f"Fal Wizper error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -237,7 +237,8 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"Fish Audio initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -251,7 +252,8 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
except Exception as e:
logger.error(f"Error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._request_id = None
self._started = False
@@ -293,7 +295,8 @@ class FishAudioTTSService(InterruptibleTTSService):
continue
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -329,7 +332,8 @@ class FishAudioTTSService(InterruptibleTTSService):
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -337,5 +341,5 @@ class FishAudioTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"Error generating TTS: {e}")
yield ErrorFrame(f"Error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -23,6 +23,7 @@ from pipecat import __version__ as pipecat_version
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -467,7 +468,8 @@ class GladiaSTTService(STTService):
break
except Exception as e:
logger.error(f"Error in connection handler: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._connection_active = False
if not self._should_reconnect:
@@ -557,7 +559,8 @@ class GladiaSTTService(STTService):
except websockets.exceptions.ConnectionClosed:
logger.debug("Connection closed during keepalive")
except Exception as e:
logger.error(f"Error in Gladia keepalive task: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _receive_task_handler(self):
try:
@@ -620,7 +623,8 @@ class GladiaSTTService(STTService):
# Expected when closing the connection
pass
except Exception as e:
logger.error(f"Error in Gladia WebSocket handler: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _maybe_reconnect(self) -> bool:
"""Handle exponential backoff reconnection logic."""

View File

@@ -1174,7 +1174,7 @@ class GeminiLiveLLMService(LLMService):
self._connection_task = self.create_task(self._connection_task_handler(config=config))
except Exception as e:
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True))
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}"))
async def _connection_task_handler(self, config: LiveConnectConfig):
async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
@@ -1255,9 +1255,7 @@ class GeminiLiveLLMService(LLMService):
f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
"treating as fatal error"
)
await self.push_error(
ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True)
)
await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}"))
return False
else:
logger.info(

View File

@@ -774,7 +774,8 @@ class GoogleSTTService(STTService):
yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)
except Exception as e:
logger.error(f"Error in request generator: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _stream_audio(self):
@@ -805,14 +806,15 @@ class GoogleSTTService(STTService):
break
except Exception as e:
logger.warning(f"{self} Reconnecting: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await asyncio.sleep(1) # Brief delay before reconnecting
self._stream_start_time = int(time.time() * 1000)
except Exception as e:
logger.error(f"Error in streaming task: {e}")
await self.push_frame(ErrorFrame(str(e)))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process an audio chunk for STT transcription.
@@ -900,7 +902,8 @@ class GoogleSTTService(STTService):
)
raise
except Exception as e:
logger.error(f"Error processing Google STT responses: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Re-raise the exception to let it propagate (e.g. in the case of a
# timeout, propagate to _stream_audio to reconnect)
raise

View File

@@ -746,7 +746,7 @@ class GoogleHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -1014,7 +1014,7 @@ class GoogleTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -1266,6 +1266,6 @@ class GeminiTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
error_message = f"Gemini TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)

View File

@@ -13,7 +13,13 @@ from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -150,5 +156,6 @@ class GroqTTSService(TTSService):
yield TTSAudioRawFrame(bytes, frame_rate, channels)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()

View File

@@ -225,8 +225,8 @@ class HumeTTSService(TTSService):
self._audio_bytes = b""
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()

View File

@@ -374,7 +374,7 @@ class InworldTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Inworld API error: {error_text}")
await self.push_error(ErrorFrame(f"Inworld API error: {error_text}"))
yield ErrorFrame(error=f"Inworld API error: {error_text}")
return
# ================================================================================
@@ -402,7 +402,7 @@ class InworldTTSService(TTSService):
# ================================================================================
# Log any unexpected errors and notify the pipeline
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# ================================================================================
# STEP 8: CLEANUP AND COMPLETION
@@ -517,7 +517,7 @@ class InworldTTSService(TTSService):
# Extract the base64-encoded audio content from response
if "audioContent" not in response_data:
logger.error("No audioContent in Inworld API response")
await self.push_error(ErrorFrame("No audioContent in response"))
await self.push_error(ErrorFrame(error="No audioContent in response"))
return
# ================================================================================

View File

@@ -223,7 +223,8 @@ class LmntTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -239,7 +240,8 @@ class LmntTTSService(InterruptibleTTSService):
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -276,7 +278,7 @@ class LmntTTSService(InterruptibleTTSService):
logger.error(f"{self} error: {msg['error']}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
return
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -309,7 +311,8 @@ class LmntTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps({"flush": True}))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -317,3 +320,4 @@ class LmntTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -347,8 +347,8 @@ class MiniMaxHttpTTSService(TTSService):
continue
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"MiniMax TTS error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -294,7 +294,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -307,7 +308,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -372,7 +374,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -380,6 +383,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class NeuphonicHttpTTSService(TTSService):
@@ -582,7 +586,8 @@ class NeuphonicHttpTTSService(TTSService):
yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1)
except Exception as e:
logger.error(f"Error processing SSE message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Don't yield error frame for individual message failures
continue
@@ -590,8 +595,8 @@ class NeuphonicHttpTTSService(TTSService):
logger.debug("TTS generation cancelled")
raise
except Exception as e:
logger.exception(f"Error in run_tts: {e}")
yield ErrorFrame(error=f"Neuphonic TTS error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -478,7 +478,7 @@ class OpenAIRealtimeLLMService(LLMService):
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
async def _update_settings(self):
settings = self._session_properties
@@ -667,9 +667,7 @@ class OpenAIRealtimeLLMService(LLMService):
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
await self.push_error(
ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True)
)
await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"]))
return
# response content
for item in evt.response.output:
@@ -763,7 +761,7 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
#
# state and client events for the current conversation

View File

@@ -199,7 +199,7 @@ class OpenAITTSService(TTSService):
f"{self} error getting audio (status: {r.status_code}, error: {error})"
)
yield ErrorFrame(
f"Error getting audio (status: {r.status_code}, error: {error})"
error=f"Error getting audio (status: {r.status_code}, error: {error})"
)
return
@@ -216,3 +216,4 @@ class OpenAITTSService(TTSService):
yield TTSStoppedFrame()
except BadRequestError as e:
logger.exception(f"{self} error generating TTS: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -454,7 +454,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
async def _update_settings(self):
settings = self._session_properties
@@ -627,9 +627,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
await self.push_error(
ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True)
)
await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"]))
return
# response content
for item in evt.response.output:
@@ -687,7 +685,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
async def _handle_assistant_output(self, output):
# We haven't seen intermixed audio and function_call items in the same response. But let's

View File

@@ -101,7 +101,7 @@ class PiperTTSService(TTSService):
f"{self} error getting audio (status: {response.status}, error: {error})"
)
yield ErrorFrame(
f"Error getting audio (status: {response.status}, error: {error})"
error=f"Error getting audio (status: {response.status}, error: {error})"
)
return
@@ -117,8 +117,8 @@ class PiperTTSService(TTSService):
await self.stop_ttfb_metrics()
yield frame
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
logger.debug(f"{self}: Finished TTS [{text}]")
await self.stop_ttfb_metrics()

View File

@@ -266,7 +266,8 @@ class PlayHTTTSService(InterruptibleTTSService):
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -279,7 +280,8 @@ class PlayHTTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._request_id = None
self._websocket = None
@@ -350,7 +352,7 @@ class PlayHTTTSService(InterruptibleTTSService):
self._request_id = None
elif "error" in msg:
logger.error(f"{self} error: {msg}")
await self.push_error(ErrorFrame(f"{self} error: {msg['error']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -392,7 +394,8 @@ class PlayHTTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps(tts_command))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -402,8 +405,8 @@ class PlayHTTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
yield ErrorFrame(f"{self} error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class PlayHTHttpTTSService(TTSService):
@@ -623,7 +626,8 @@ class PlayHTHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -259,7 +259,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -271,7 +272,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._context_id = None
self._websocket = None
@@ -367,7 +369,7 @@ class RimeTTSService(AudioContextWordTTSService):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f"{self} error: {msg['message']}"))
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
self._context_id = None
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -409,7 +411,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._get_websocket().send(json.dumps(msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -417,6 +420,7 @@ class RimeTTSService(AudioContextWordTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class RimeHttpTTSService(TTSService):
@@ -574,8 +578,8 @@ class RimeHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -659,8 +659,8 @@ class RivaSegmentedSTTService(SegmentedSTTService):
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
except Exception as e:
logger.exception(f"Riva Canary ASR error: {e}")
yield ErrorFrame(f"Riva Canary ASR error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class ParakeetSTTService(RivaSTTService):

View File

@@ -23,6 +23,7 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
@@ -165,6 +166,7 @@ class RivaTTSService(TTSService):
add_response(None)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
add_response(None)
await self.start_ttfb_metrics()

View File

@@ -264,7 +264,7 @@ class SarvamHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
logger.error(f"Sarvam API error: {error_text}")
await self.push_error(ErrorFrame(f"Sarvam API error: {error_text}"))
await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}"))
return
response_data = await response.json()
@@ -274,7 +274,7 @@ class SarvamHttpTTSService(TTSService):
# Decode base64 audio data
if "audios" not in response_data or not response_data["audios"]:
logger.error("No audio data received from Sarvam API")
await self.push_error(ErrorFrame("No audio data received"))
await self.push_error(ErrorFrame(error="No audio data received"))
return
# Get the first audio (there should be only one for single text input)
@@ -296,7 +296,7 @@ class SarvamHttpTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
@@ -578,7 +578,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._disconnect_websocket()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Reset state only after everything is cleaned up
self._started = False
@@ -602,7 +603,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -618,8 +620,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.send(json.dumps(config_message))
logger.debug("Configuration sent successfully")
except Exception as e:
logger.error(f"Failed to send config: {str(e)}")
await self.push_frame(ErrorFrame(f"Failed to send config: {str(e)}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _disconnect_websocket(self):
@@ -632,6 +634,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -661,7 +664,7 @@ class SarvamTTSService(InterruptibleTTSService):
if "too long" in error_msg.lower() or "timeout" in error_msg.lower():
logger.warning("Connection timeout detected, service may need restart")
await self.push_frame(ErrorFrame(f"TTS Error: {error_msg}"))
await self.push_frame(ErrorFrame(error=f"TTS Error: {error_msg}"))
async def _keepalive_task_handler(self):
"""Handle keepalive messages to maintain WebSocket connection."""
@@ -717,7 +720,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -725,3 +729,4 @@ class SarvamTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -327,8 +327,8 @@ class SonioxSTTService(STTService):
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
logger.error(f"{self} error (_keepalive_task_handler): {e}")
await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _receive_task_handler(self):
if not self._websocket:
@@ -409,7 +409,7 @@ class SonioxSTTService(STTService):
)
await self.push_error(
ErrorFrame(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
)
@@ -425,5 +425,5 @@ class SonioxSTTService(STTService):
# Expected when closing the connection.
pass
except Exception as e:
logger.error(f"{self} error: {e}")
await self.push_error(ErrorFrame(f"{self} error: {e}"))
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))

View File

@@ -467,8 +467,8 @@ class SpeechmaticsSTTService(STTService):
await self._client.send_audio(audio)
yield None
except Exception as e:
logger.error(f"Speechmatics error: {e}")
yield ErrorFrame(f"Speechmatics error: {e}", fatal=False)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
def update_params(
@@ -514,6 +514,8 @@ class SpeechmaticsSTTService(STTService):
self._client.send_message(payload), self.get_event_loop()
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise RuntimeError(f"error sending message to STT: {e}")
async def _connect(self) -> None:
@@ -579,7 +581,8 @@ class SpeechmaticsSTTService(STTService):
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} Error connecting to Speechmatics: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._client = None
async def _disconnect(self) -> None:
@@ -593,7 +596,8 @@ class SpeechmaticsSTTService(STTService):
except asyncio.TimeoutError:
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"{self} Error closing Speechmatics client: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._client = None
await self._call_event_handler("on_disconnected")

View File

@@ -246,7 +246,8 @@ class UltravoxSTTService(AIService):
logger.info("Model warm-up completed successfully")
except Exception as e:
logger.warning(f"Model warm-up failed: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0):
"""Generate silent audio as a numpy array.
@@ -376,7 +377,7 @@ class UltravoxSTTService(AIService):
if arr.size > 0: # Check if array is not empty
audio_arrays.append(arr)
except Exception as e:
logger.error(f"Error processing bytes audio frame: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Handle numpy array data
elif isinstance(f.audio, np.ndarray):
if f.audio.size > 0: # Check if array is not empty
@@ -436,14 +437,14 @@ class UltravoxSTTService(AIService):
yield LLMFullResponseEndFrame()
except Exception as e:
logger.error(f"Error generating text from model: {e}")
yield ErrorFrame(f"Error generating text: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
else:
logger.warning("No model available for text generation")
logger.error("No model available for text generation")
yield ErrorFrame("No model available for text generation")
except Exception as e:
logger.error(f"Error processing audio buffer: {e}")
logger.error(f"{self} exception: {e}")
import traceback
logger.error(traceback.format_exc())

View File

@@ -94,7 +94,7 @@ class WebsocketService(ABC):
if self._reconnect_on_error:
retry_count += 1
if retry_count >= MAX_RETRIES:
await report_error(ErrorFrame(message, fatal=True))
await report_error(ErrorFrame(message))
break
logger.warning(f"{self} connection error, will retry: {e}")

View File

@@ -226,8 +226,8 @@ class BaseWhisperSTTService(SegmentedSTTService):
logger.warning("Received empty transcription from API")
except Exception as e:
logger.exception(f"Exception during transcription: {e}")
yield ErrorFrame(f"Error during transcription: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def _transcribe(self, audio: bytes) -> Transcription:
"""Transcribe audio data to text.

View File

@@ -428,5 +428,5 @@ class WhisperSTTServiceMLX(WhisperSTTService):
)
except Exception as e:
logger.exception(f"MLX Whisper transcription error: {e}")
yield ErrorFrame(f"MLX Whisper transcription error: {str(e)}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -146,7 +146,7 @@ class XTTSService(TTSService):
)
await self.push_error(
ErrorFrame(
f"Error error getting studio speakers (status: {r.status}, error: {text})"
error=f"Error getting studio speakers (status: {r.status}, error: {text})"
)
)
return
@@ -187,7 +187,7 @@ class XTTSService(TTSService):
if r.status != 200:
text = await r.text()
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})")
return
await self.start_tts_usage_metrics(text)

View File

@@ -23,7 +23,7 @@ if TYPE_CHECKING:
from opentelemetry import context as context_api
from opentelemetry import trace
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.utils.tracing.service_attributes import (
add_gemini_live_span_attributes,
@@ -399,11 +399,6 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if hasattr(self, "get_llm_adapter"):
adapter = self.get_llm_adapter()
messages = adapter.get_messages_for_logging(context)
elif hasattr(context, "get_messages"):
# Fallback for unknown context types
messages = context.get_messages()
elif hasattr(context, "messages"):
messages = context.messages
# Serialize messages if available
if messages:
@@ -424,15 +419,10 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if hasattr(self, "get_llm_adapter") and hasattr(context, "tools"):
adapter = self.get_llm_adapter()
tools = adapter.from_standard_tools(context.tools)
elif hasattr(context, "tools"):
# Fallback for unknown context types
tools = context.tools
# Serialize and count tools if available
# Check if tools is not None and not NOT_GIVEN (using attribute check as fallback)
if tools is not None and not (
hasattr(tools, "__name__") and tools.__name__ == "NOT_GIVEN"
):
# Check if tools is not None and not NOT_GIVEN
if tools is not None and tools is not NOT_GIVEN:
serialized_tools = json.dumps(tools)
tool_count = len(tools) if isinstance(tools, list) else 1

6
uv.lock generated
View File

@@ -36,12 +36,12 @@ wheels = [
[[package]]
name = "aic-sdk"
version = "1.0.2"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/51/90/b02e853e863c303f8456c689b42ac24ad403b781adc9642d0a91ed4bed7e/aic_sdk-1.0.2.tar.gz", hash = "sha256:239097dd3aaa8a8a0fd7542b75d2510cb34144caec796370639b7c636acbc56e", size = 32059, upload-time = "2025-08-24T09:20:03.9Z" }
sdist = { url = "https://files.pythonhosted.org/packages/99/83/bf38b95d98c67b8ebc574fb4a4f23c07a3740b51992d7522976173d30b98/aic_sdk-1.1.0.tar.gz", hash = "sha256:04e08df695581c8cb4db8acca20e73815e9f449e7bd08e0162fd55518c727963", size = 34954, upload-time = "2025-11-11T20:45:24.25Z" }
[[package]]
name = "aioboto3"
@@ -4647,7 +4647,7 @@ docs = [
[package.metadata]
requires-dist = [
{ name = "accelerate", marker = "extra == 'moondream'", specifier = "~=1.10.0" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.0.1" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.1.0" },
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },