Compare commits

..

3 Commits

Author SHA1 Message Date
James Hush
8bbfa829d3 Remove wait 2025-11-26 12:27:02 +01:00
James Hush
c2eb663bdc Add TurnAwareTranscriptProcessor for turn-based transcript tracking
- Implements TurnAwareTranscriptProcessor that combines user and assistant transcript tracking with turn boundary detection
- Correctly handles interruptions by capturing only what was actually spoken
- Emits on_turn_started and on_turn_ended events with accumulated transcripts
- Handles async frame processing with strategic delays to ensure proper text accumulation
- Adds comprehensive tests covering basic flow, interruptions, and multiple turns
- Includes documentation and usage examples
2025-11-26 12:26:25 +01:00
James Hush
bf055843e6 Fix race condition in DeepgramFluxSTTService reconnection
Moved _receive_task and _watchdog_task creation from _connect_websocket() to _connect() to prevent multiple coroutines from attempting to receive from the websocket simultaneously during reconnection.

Previously, when reconnection occurred, _connect_websocket() would be called while the existing _receive_task was still running, causing both to try to receive from the websocket. This resulted in the error: 'cannot call recv while another coroutine is already running recv or recv_streaming'.

Now tasks are created only once during initial connection, and reconnection only re-establishes the websocket connection itself. This matches the pattern used by other websocket services in the codebase.

Fixes issue reported in 0.0.95 where reconnection attempts would fail with recv errors.
2025-11-26 10:11:19 +01:00
87 changed files with 1082 additions and 682 deletions

View File

@@ -5,32 +5,10 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.96] - 2025-11-26 🦃 "Happy Thanksgiving!" 🦃
## [Unreleased]
### Added
- Enhanced error handling across the framework:
- Added `on_error` callback to `FrameProcessor` for centralized error
handling.
- Renamed `push_error(error: ErrorFrame)` to `push_error_frame(error: ErrorFrame)`
for clarity.
- Added new `push_error` method for simplified error reporting:
```python
async def push_error(error_msg: str,
exception: Optional[Exception] = None,
fatal: bool = False)
```
- Standardized error logging by replacing `logger.exception` calls with
`logger.error` throughout the codebase.
- Added `cache_read_input_tokens`, `cache_creation_input_tokens` and
`reasoning_tokens` to OTel spans for LLM call
- Added `LiveKitRESTHelper` utility class for managing LiveKit rooms via REST API.
- Added `DeepgramSageMakerSTTService` which connects to a SageMaker hosted
@@ -110,18 +88,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added new emotions: calm and fluent
- Added `enable_logging` to `SimliVideoService` input parameters. It's disabled
by default.
### Changed
- Updated `FishAudioTTSService` default model to `s1`.
- Updated `DeepgramTTSService` to use Deepgram's TTS websocket API. ⚠️ This is
a potential breaking change, which only affects you if you're self-hosting
`DeepgramTTSService`. The new service uses Websockets and improves TTFB
latency.
- Updated `daily-python` to 0.22.0.
- `BaseTextAggregator` changes:
@@ -279,11 +247,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue in `AWSBedrockLLMService` where the `aws_region` arg was
always set to `us-east-1`.
- Fixed an issue with `DeepgramFluxSTTService` where it sometimes failed to reconnect.
- Fixed an issue in `ElevenLabsRealtimeSTTService` where dynamic language
updates were not working.

View File

@@ -0,0 +1,103 @@
# TurnAwareTranscriptProcessor Example
## Overview
The `TurnAwareTranscriptProcessor` combines user and assistant transcript tracking with turn boundary detection. It correctly handles interruptions by only capturing what was actually spoken.
## Basic Usage
```python
from pipecat.processors.transcript_processor import TurnAwareTranscriptProcessor
# Create the processor
turn_processor = TurnAwareTranscriptProcessor()
# Register event handlers
@turn_processor.event_handler("on_turn_started")
async def handle_turn_started(processor, turn_number):
print(f"Turn {turn_number} started")
@turn_processor.event_handler("on_turn_ended")
async def handle_turn_ended(processor, turn_number, user_text, assistant_text, was_interrupted):
print(f"\nTurn {turn_number} ended:")
print(f" User said: {user_text}")
print(f" Assistant said: {assistant_text}")
print(f" Was interrupted: {was_interrupted}")
@turn_processor.event_handler("on_transcript_update")
async def handle_transcript_update(processor, frame):
for msg in frame.messages:
print(f"[{msg.role}]: {msg.content}")
# Add to pipeline
pipeline = Pipeline([
transport.input(),
stt,
turn_processor, # Process transcripts and track turns
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
])
```
## Features
1. **Turn Boundary Detection**: Automatically detects when turns start and end based on user and bot speaking patterns
2. **Interruption Handling**: Correctly captures only what was actually spoken when interruptions occur
3. **Real-time Transcripts**: Emits transcript messages for both user and assistant speech
4. **Turn Events**: Provides start/end events with accumulated transcripts for each turn
## Events
### on_turn_started
Emitted when a new turn begins (user starts speaking).
**Handler signature**: `async def handler(processor, turn_number)`
### on_turn_ended
Emitted when a turn ends with accumulated transcripts.
**Handler signature**: `async def handler(processor, turn_number, user_transcript, assistant_transcript, was_interrupted)`
### on_transcript_update
Inherited from `BaseTranscriptProcessor`, emitted for individual transcript messages.
**Handler signature**: `async def handler(processor, frame)`
## Turn Logic
- Turns start when the user begins speaking (`UserStartedSpeakingFrame`)
- Turns end when:
- The user starts speaking again (previous turn ends, new turn starts)
- The bot is interrupted (`InterruptionFrame`)
- The pipeline ends (`EndFrame`/`CancelFrame`)
## Integration with OpenTelemetry
You can use turn events to enrich OpenTelemetry spans:
```python
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
turn_tracker = TurnTrackingObserver()
turn_tracer = TurnTraceObserver(turn_tracker)
turn_processor = TurnAwareTranscriptProcessor()
@turn_processor.event_handler("on_turn_ended")
async def add_transcripts_to_span(processor, turn_number, user_text, assistant_text, interrupted):
# Get current span and add transcript data
from opentelemetry import trace
current_span = trace.get_current_span()
if current_span:
current_span.set_attribute("turn.user_text", user_text)
current_span.set_attribute("turn.assistant_text", assistant_text)
```
## Notes
- The processor handles async frame processing correctly by delaying turn end until frames are processed
- Works with word-level timestamps from TTS services like Cartesia
- Accumulates both user (`TranscriptionFrame`) and assistant (`TTSTextFrame`) speech
- Emits individual transcript messages in addition to turn-level aggregation

View File

@@ -55,7 +55,7 @@ azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
cerebras = []
daily = [ "daily-python~=0.22.0" ]
deepgram = [ "deepgram-sdk~=4.7.0", "pipecat-ai[websockets-base]" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
deepseek = []
elevenlabs = [ "pipecat-ai[websockets-base]" ]
fal = [ "fal-client~=0.5.9" ]

View File

@@ -327,19 +327,23 @@ class TextFrame(DataFrame):
Parameters:
text: The text content.
skip_tts: Whether this text should skip TTS processing.
"""
text: str
skip_tts: bool = field(default=False, kw_only=True)
skip_tts: bool = field(init=False)
# Whether any necessary inter-frame (leading/trailing) spaces are already
# included in the text.
# NOTE: Ideally this would be available at init time with a default value,
# but that would impact how subclasses can be initialized (it would require
# mandatory fields of theirs to have defaults to preserve
# non-default-before-default argument order)
includes_inter_frame_spaces: bool = field(init=False)
# Whether this text frame should be appended to the LLM context.
append_to_context: bool = field(init=False)
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
self.includes_inter_frame_spaces = False
self.append_to_context = True
@@ -831,13 +835,11 @@ class ErrorFrame(SystemFrame):
error: Description of the error that occurred.
fatal: Whether the error is fatal and requires bot shutdown.
processor: The frame processor that generated the error.
exception: The exception that occurred.
"""
error: str
fatal: bool = False
processor: Optional["FrameProcessor"] = None
exception: Optional[Exception] = None
def __str__(self):
return f"{self.name}(error: {self.error}, fatal: {self.fatal})"
@@ -1626,23 +1628,24 @@ class LLMFullResponseStartFrame(ControlFrame):
Used to indicate the beginning of an LLM response. Followed by one or
more TextFrames and a final LLMFullResponseEndFrame.
Parameters:
skip_tts: Whether LLM output should skip TTS processing.
"""
skip_tts: bool = field(default=False, kw_only=True)
skip_tts: bool = field(init=False)
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
@dataclass
class LLMFullResponseEndFrame(ControlFrame):
"""Frame indicating the end of an LLM response.
"""Frame indicating the end of an LLM response."""
Parameters:
skip_tts: Whether LLM output should skip TTS processing.
"""
skip_tts: bool = field(init=False)
skip_tts: bool = field(default=False, kw_only=True)
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
@dataclass

View File

@@ -126,4 +126,6 @@ class WakeCheckFilter(FrameProcessor):
else:
await self.push_frame(frame, direction)
except Exception as e:
await self.push_error(error_msg=f"Error in wake word filter: {e}", exception=e)
error_msg = f"Error in wake word filter: {e}"
logger.exception(error_msg)
await self.push_error(ErrorFrame(error_msg))

View File

@@ -142,7 +142,6 @@ class FrameProcessor(BaseObject):
- on_after_process_frame: Called after a frame is processed
- on_before_push_frame: Called before a frame is pushed
- on_after_push_frame: Called after a frame is pushed
- on_error: Called when an error is raised in the frame processing.
"""
def __init__(
@@ -235,7 +234,6 @@ class FrameProcessor(BaseObject):
self._register_event_handler("on_after_process_frame", sync=True)
self._register_event_handler("on_before_push_frame", sync=True)
self._register_event_handler("on_after_push_frame", sync=True)
self._register_event_handler("on_error", sync=True)
@property
def id(self) -> int:
@@ -632,43 +630,7 @@ class FrameProcessor(BaseObject):
elif isinstance(frame, (FrameProcessorResumeFrame, FrameProcessorResumeUrgentFrame)):
await self.__resume(frame)
async def push_error(
self,
error_msg: str,
exception: Optional[Exception] = None,
fatal: bool = False,
):
"""Creates and pushes an ErrorFrame upstream.
Creates and pushes an ErrorFrame upstream to notify other processors in the
pipeline about an error condition. The error frame will include context about
which processor generated the error.
Args:
error_msg: Descriptive message explaining the error condition.
exception: Optional exception object that caused the error, if available.
This provides additional context for debugging and error handling.
fatal: Whether this error should be considered fatal to the pipeline.
Fatal errors typically cause the entire pipeline to stop processing.
Defaults to False for non-fatal errors.
Example::
```python
# Non-fatal error
await self.push_error("Failed to process audio chunk, skipping")
# Fatal error with exception context
try:
result = some_critical_operation()
except Exception as e:
await self.push_error("Critical operation failed", exception=e, fatal=True)
```
"""
error_frame = ErrorFrame(error=error_msg, fatal=fatal, exception=exception, processor=self)
await self.push_error_frame(error=error_frame)
async def push_error_frame(self, error: ErrorFrame):
async def push_error(self, error: ErrorFrame):
"""Push an error frame upstream.
Args:
@@ -676,8 +638,6 @@ class FrameProcessor(BaseObject):
"""
if not error.processor:
error.processor = self
await self._call_event_handler("on_error", error)
logger.error(f"{error.processor} error: {error.error}")
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -799,10 +759,8 @@ class FrameProcessor(BaseObject):
await self.__cancel_process_task()
self.__create_process_task()
except Exception as e:
await self.push_error(
error_msg=f"Uncaught exception handling _start_interruption: {e}",
exception=e,
)
logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
await self.push_error(ErrorFrame(str(e)))
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
"""Internal method to push frames to adjacent processors.
@@ -839,7 +797,8 @@ class FrameProcessor(BaseObject):
await self._observer.on_push_frame(data)
await self._prev.queue_frame(frame, direction)
except Exception as e:
await self.push_error(error_msg=f"Uncaught exception: {e}", exception=e)
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
def _check_started(self, frame: Frame):
"""Check if the processor has been started.
@@ -915,7 +874,8 @@ class FrameProcessor(BaseObject):
await self._call_event_handler("on_after_process_frame", frame)
except Exception as e:
await self.push_error(error_msg=f"Error processing frame: {e}", exception=e)
logger.exception(f"{self}: error processing frame: {e}")
await self.push_error(ErrorFrame(str(e)))
async def __input_frame_task_handler(self):
"""Handle frames from the input queue.

View File

@@ -24,7 +24,7 @@ try:
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import Runnable
except ModuleNotFoundError as e:
logger.error("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
logger.exception("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
raise Exception(f"Missing module: {e}")
@@ -113,6 +113,6 @@ class LangchainProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.exception(f"{self} an unknown error occurred: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -23,7 +23,7 @@ try:
from strands import Agent
from strands.multiagent.graph import Graph
except ModuleNotFoundError as e:
logger.error("In order to use Strands Agents, you need to `pip install strands-agents`.")
logger.exception("In order to use Strands Agents, you need to `pip install strands-agents`.")
raise Exception(f"Missing module: {e}")
@@ -143,7 +143,7 @@ class StrandsAgentsProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.exception(f"{self} an unknown error occurred: {e}")
finally:
if ttfb_tracking:
await self.stop_ttfb_metrics()

View File

@@ -15,6 +15,7 @@ from typing import List, Optional
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
@@ -24,6 +25,7 @@ from pipecat.frames.frames import (
TranscriptionMessage,
TranscriptionUpdateFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
@@ -306,3 +308,267 @@ class TranscriptProcessor:
return handler
return decorator
class TurnAwareTranscriptProcessor(BaseTranscriptProcessor):
"""Processes transcripts with turn boundary awareness.
This processor combines user and assistant transcript tracking with turn
detection, emitting events when turns start and end. It correctly handles
interruptions by only capturing what was actually spoken.
Turn boundaries are detected based on:
- User started speaking (UserStartedSpeakingFrame)
- Bot stopped speaking (BotStoppedSpeakingFrame)
- Interruptions (InterruptionFrame)
Events:
on_turn_started: Emitted when a new turn begins.
Handler signature: async def handler(processor, turn_number)
on_turn_ended: Emitted when a turn ends.
Handler signature: async def handler(processor, turn_number,
user_transcript, assistant_transcript,
was_interrupted)
on_transcript_update: Inherited from BaseTranscriptProcessor, emitted for
individual transcript messages.
Example::
turn_processor = TurnAwareTranscriptProcessor()
@turn_processor.event_handler("on_turn_started")
async def handle_turn_started(processor, turn_number):
print(f"Turn {turn_number} started")
@turn_processor.event_handler("on_turn_ended")
async def handle_turn_ended(processor, turn_number, user_text, assistant_text, interrupted):
print(f"Turn {turn_number} ended")
print(f"User said: {user_text}")
print(f"Assistant said: {assistant_text}")
print(f"Was interrupted: {interrupted}")
pipeline = Pipeline([
transport.input(),
stt,
turn_processor,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
])
"""
def __init__(self, **kwargs):
"""Initialize the turn-aware transcript processor.
Args:
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
# Turn tracking state
self._turn_number = 0
self._turn_active = False
self._turn_start_time: Optional[str] = None
# Accumulate text for current turn
self._current_turn_user_parts: List[TextPartForConcatenation] = []
self._current_turn_assistant_parts: List[TextPartForConcatenation] = []
# Track bot speaking state
self._bot_is_speaking = False
# Register turn events
self._register_event_handler("on_turn_started")
self._register_event_handler("on_turn_ended")
async def _start_turn(self):
"""Start a new turn."""
if not self._turn_active:
self._turn_number += 1
self._turn_active = True
self._turn_start_time = time_now_iso8601()
self._current_turn_user_parts = []
self._current_turn_assistant_parts = []
logger.debug(f"Turn {self._turn_number} started")
await self._call_event_handler("on_turn_started", self._turn_number)
async def _end_turn(self, was_interrupted: bool = False):
"""End the current turn and emit aggregated transcripts.
Args:
was_interrupted: Whether the turn ended due to an interruption.
"""
if not self._turn_active:
return
# Aggregate user text
user_transcript = ""
if self._current_turn_user_parts:
user_transcript = concatenate_aggregated_text(self._current_turn_user_parts)
# Aggregate assistant text
assistant_transcript = ""
if self._current_turn_assistant_parts:
assistant_transcript = concatenate_aggregated_text(self._current_turn_assistant_parts)
# Emit turn ended event
logger.debug(
f"Turn {self._turn_number} ended (interrupted={was_interrupted}). "
f"User: '{user_transcript}', Assistant: '{assistant_transcript}'"
)
await self._call_event_handler(
"on_turn_ended",
self._turn_number,
user_transcript,
assistant_transcript,
was_interrupted,
)
# Reset turn state
self._turn_active = False
self._current_turn_user_parts = []
self._current_turn_assistant_parts = []
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for turn-aware transcript tracking.
Handles:
- UserStartedSpeakingFrame: Start new turn
- TranscriptionFrame: Accumulate user speech and emit transcript message
- BotStartedSpeakingFrame: Track bot speaking state
- TTSTextFrame: Accumulate assistant speech
- BotStoppedSpeakingFrame: End turn if no interruption pending
- InterruptionFrame: End turn immediately as interrupted
- EndFrame/CancelFrame: End any active turn
Args:
frame: Input frame to process.
direction: Frame processing direction.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
# User started speaking
if self._bot_is_speaking:
# This is an interruption - end the current turn with what was spoken
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(
self._current_turn_assistant_parts
)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
await self._end_turn(was_interrupted=True)
self._bot_is_speaking = False
elif self._turn_active:
# Previous turn is ending normally (bot finished speaking)
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(
self._current_turn_assistant_parts
)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
await self._end_turn(was_interrupted=False)
# Start a new turn
await self._start_turn()
await self.push_frame(frame, direction)
elif isinstance(frame, TranscriptionFrame):
# Accumulate user speech for the current turn
if self._turn_active:
self._current_turn_user_parts.append(
TextPartForConcatenation(frame.text, includes_inter_part_spaces=True)
)
# Also emit individual transcript message
message = TranscriptionMessage(
role="user",
user_id=frame.user_id,
content=frame.text,
timestamp=frame.timestamp,
)
await self._emit_update([message])
await self.push_frame(frame, direction)
elif isinstance(frame, BotStartedSpeakingFrame):
# Bot started speaking
self._bot_is_speaking = True
await self.push_frame(frame, direction)
elif isinstance(frame, TTSTextFrame):
# Accumulate assistant speech for the current turn
if self._turn_active:
self._current_turn_assistant_parts.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
await self.push_frame(frame, direction)
elif isinstance(frame, BotStoppedSpeakingFrame):
# Bot stopped speaking - just mark it, don't end turn yet
# Turn will end when next user speaks or pipeline ends
self._bot_is_speaking = False
await self.push_frame(frame, direction)
elif isinstance(frame, InterruptionFrame):
# Emit assistant transcript message with what was spoken before interruption
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(self._current_turn_assistant_parts)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
# Push frame first to ensure proper cleanup
await self.push_frame(frame, direction)
# End turn as interrupted
await self._end_turn(was_interrupted=True)
self._bot_is_speaking = False
elif isinstance(frame, (EndFrame, CancelFrame)):
# Pipeline ending - finalize any active turn
if self._turn_active:
# Emit any pending assistant transcript (allow time for TTSTextFrames to be processed)
# Give a brief moment for any pending frames to process
import asyncio
await asyncio.sleep(0.001)
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(
self._current_turn_assistant_parts
)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
await self._end_turn(was_interrupted=isinstance(frame, CancelFrame))
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)

View File

@@ -199,7 +199,7 @@ class PlivoFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.error(f"Failed to hang up Plivo call: {e}")
logger.exception(f"Failed to hang up Plivo call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Plivo WebSocket data to Pipecat frames.

View File

@@ -225,7 +225,7 @@ class TelnyxFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.error(f"Failed to hang up Telnyx call: {e}")
logger.exception(f"Failed to hang up Telnyx call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Telnyx WebSocket data to Pipecat frames.

View File

@@ -236,7 +236,7 @@ class TwilioFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.error(f"Failed to hang up Twilio call: {e}")
logger.exception(f"Failed to hang up Twilio call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Twilio WebSocket data to Pipecat frames.

View File

@@ -166,6 +166,6 @@ class AIService(FrameProcessor):
async for f in generator:
if f:
if isinstance(f, ErrorFrame):
await self.push_error_frame(f)
await self.push_error(f)
else:
await self.push_frame(f)

View File

@@ -327,7 +327,7 @@ class AnthropicLLMService(LLMService):
cache_read_input_tokens = 0
try:
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
params_from_context = self._get_llm_invocation_params(context)
@@ -373,9 +373,7 @@ class AnthropicLLMService(LLMService):
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
await self.push_frame(
LLMTextFrame(event.delta.text, skip_tts=self._get_skip_tts())
)
await self.push_frame(LLMTextFrame(event.delta.text))
completion_tokens_estimate += self._estimate_tokens(event.delta.text)
elif hasattr(event.delta, "partial_json") and tool_use_block:
json_accumulator += event.delta.partial_json
@@ -460,10 +458,11 @@ class AnthropicLLMService(LLMService):
except httpx.TimeoutException:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.exception(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"{e}"))
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = (
completion_tokens
if not use_completion_tokens_estimate

View File

@@ -206,8 +206,9 @@ class AssemblyAISTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
self._connected = False
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _disconnect(self):
@@ -232,7 +233,8 @@ class AssemblyAISTTService(STTService):
logger.warning("Timed out waiting for termination message from server")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
if self._receive_task:
await self.cancel_task(self._receive_task)
@@ -240,7 +242,8 @@ class AssemblyAISTTService(STTService):
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
@@ -259,11 +262,13 @@ class AssemblyAISTTService(STTService):
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
break
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
def _parse_message(self, message: Dict[str, Any]) -> BaseMessage:
"""Parse a raw message into the appropriate message type."""
@@ -292,7 +297,8 @@ class AssemblyAISTTService(STTService):
elif isinstance(parsed_message, TerminationMessage):
await self._handle_termination(parsed_message)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _handle_termination(self, message: TerminationMessage):
"""Handle termination message."""

View File

@@ -228,7 +228,8 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -240,7 +241,8 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Async")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
self._started = False
@@ -285,11 +287,12 @@ class AsyncAITTSService(InterruptibleTTSService):
)
await self.push_frame(frame)
elif msg.get("error_code"):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg['message']}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
else:
await self.push_error(error_msg=f"Unknown message type: {msg}")
logger.error(f"{self} error, unknown message type: {msg}")
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
@@ -332,14 +335,16 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class AsyncAIHttpTTSService(TTSService):
@@ -472,7 +477,8 @@ class AsyncAIHttpTTSService(TTSService):
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
await self.push_error(error_msg=f"Async API error: {error_text}")
logger.error(f"Async API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Async API error: {error_text}"))
raise Exception(f"Async API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -488,7 +494,8 @@ class AsyncAIHttpTTSService(TTSService):
yield frame
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -172,7 +172,7 @@ class AWSAgentCoreProcessor(FrameProcessor):
await asyncio.sleep(self._output_response_timeout)
if self._output_response_open:
self._output_response_open = False
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
async def _push_text_frame(self, text: str):
"""Push a text frame, managing output response bookends."""
@@ -182,11 +182,11 @@ class AWSAgentCoreProcessor(FrameProcessor):
# Open output response if needed
if not self._output_response_open:
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
self._output_response_open = True
# Push the text frame
await self.push_frame(LLMTextFrame(text, skip_tts=self._get_skip_tts()))
await self.push_frame(LLMTextFrame(text))
self._last_text_frame_time = asyncio.get_event_loop().time()
# Schedule closing the output response after timeout
@@ -253,6 +253,6 @@ class AWSAgentCoreProcessor(FrameProcessor):
if self._close_task and not self._close_task.done():
await self.cancel_task(self._close_task)
self._output_response_open = False
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
else:
await self.push_frame(frame, direction)

View File

@@ -734,7 +734,7 @@ class AWSBedrockLLMService(LLMService):
aws_access_key: Optional[str] = None,
aws_secret_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_region: Optional[str] = None,
aws_region: str = "us-east-1",
params: Optional[InputParams] = None,
client_config: Optional[Config] = None,
retry_timeout_secs: Optional[float] = 5.0,
@@ -981,7 +981,7 @@ class AWSBedrockLLMService(LLMService):
using_noop_tool = False
try:
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
@@ -1078,9 +1078,7 @@ class AWSBedrockLLMService(LLMService):
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]
if "text" in delta:
await self.push_frame(
LLMTextFrame(delta["text"], skip_tts=self._get_skip_tts())
)
await self.push_frame(LLMTextFrame(delta["text"]))
completion_tokens_estimate += self._estimate_tokens(delta["text"])
elif "toolUse" in delta and "input" in delta["toolUse"]:
# Handle partial JSON for tool use
@@ -1138,10 +1136,10 @@ class AWSBedrockLLMService(LLMService):
except (ReadTimeoutError, asyncio.TimeoutError):
await self._call_event_handler("on_completion_timeout")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.exception(f"{self} exception: {e}")
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = (
completion_tokens
if not use_completion_tokens_estimate

View File

@@ -453,7 +453,7 @@ class AWSNovaSonicLLMService(LLMService):
self._ready_to_send_context = True
await self._finish_connecting_if_context_available()
except Exception as e:
await self.push_error(error_msg=f"Initialization error: {e}", exception=e)
logger.error(f"{self} initialization error: {e}")
await self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
@@ -577,7 +577,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.info("Finished disconnecting")
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
logger.error(f"{self} error disconnecting: {e}")
def _create_client(self) -> BedrockRuntimeClient:
config = Config(
@@ -885,7 +885,7 @@ class AWSNovaSonicLLMService(LLMService):
# Errors are kind of expected while disconnecting, so just
# ignore them and do nothing
return
await self.push_error(error_msg=f"Error processing responses: {e}", exception=e)
logger.error(f"{self} error processing responses: {e}")
if self._wants_connection:
await self.reset_conversation()
@@ -1016,7 +1016,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Assistant response started")
# Report the start of the assistant response.
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) started
await self.push_frame(TTSStartedFrame())
@@ -1062,7 +1062,7 @@ class AWSNovaSonicLLMService(LLMService):
# We also need to re-push the LLMFullResponseStartFrame since the
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(
self._assistant_text_buffer, aggregated_by=AggregationType.SENTENCE
)
@@ -1071,7 +1071,7 @@ class AWSNovaSonicLLMService(LLMService):
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())

View File

@@ -140,7 +140,8 @@ class AWSTranscribeSTTService(STTService):
return
logger.warning("WebSocket connection not established after connect")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(1) # Wait before retrying
@@ -181,7 +182,8 @@ class AWSTranscribeSTTService(STTService):
try:
await self._connect()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
# Format the audio data according to AWS event stream format
@@ -198,11 +200,13 @@ class AWSTranscribeSTTService(STTService):
await self._disconnect()
# Don't yield error here - we'll retry on next frame
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
async def _connect(self):
@@ -285,7 +289,8 @@ class AWSTranscribeSTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self._disconnect()
raise
@@ -305,7 +310,8 @@ class AWSTranscribeSTTService(STTService):
await self._ws_client.send(json.dumps(end_stream))
await self._ws_client.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
@@ -523,15 +529,15 @@ class AWSTranscribeSTTService(STTService):
)
elif headers.get(":message-type") == "exception":
error_msg = payload.get("Message", "Unknown error")
await self.push_error(error_msg=f"AWS Transcribe error: {error_msg}")
logger.error(f"{self} Exception from AWS: {error_msg}")
await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}"))
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
except websockets.exceptions.ConnectionClosed as e:
await self.push_error(
error_msg=f"WebSocket connection closed in receive loop", exception=e
)
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
break
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
break

View File

@@ -312,6 +312,7 @@ class AWSPollyTTSService(TTSService):
yield TTSStoppedFrame()
except (BotoCoreError, ClientError) as error:
logger.exception(f"{self} error generating TTS: {error}")
error_message = f"AWS Polly TTS error: {str(error)}"
yield ErrorFrame(error=error_message)

View File

@@ -91,6 +91,7 @@ class AzureImageGenServiceREST(ImageGenService):
while status != "succeeded":
attempts_left -= 1
if attempts_left == 0:
logger.error(f"{self} error: image generation timed out")
yield ErrorFrame("Image generation timed out")
return
@@ -103,6 +104,7 @@ class AzureImageGenServiceREST(ImageGenService):
image_url = json_response["result"]["data"][0]["url"] if json_response else None
if not image_url:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return

View File

@@ -61,5 +61,5 @@ class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
await self.push_error(error_msg=f"initialization error: {e}", exception=e)
logger.error(f"{self} initialization error: {e}")
self._websocket = None

View File

@@ -121,7 +121,8 @@ class AzureSTTService(STTService):
self._audio_stream.write(audio)
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def start(self, frame: StartFrame):
"""Start the speech recognition service.
@@ -150,9 +151,8 @@ class AzureSTTService(STTService):
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
except Exception as e:
await self.push_error(
error_msg=f"Uncaught exception during initialization: {e}", exception=e
)
logger.error(f"{self} exception during initialization: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def stop(self, frame: EndFrame):
"""Stop the speech recognition service.

View File

@@ -327,6 +327,7 @@ class AzureTTSService(AzureBaseTTSService):
try:
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
logger.error(error_msg)
yield ErrorFrame(error=error_msg)
return
@@ -354,13 +355,15 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSStoppedFrame()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class AzureHttpTTSService(AzureBaseTTSService):
@@ -437,6 +440,5 @@ class AzureHttpTTSService(AzureBaseTTSService):
cancellation_details = result.cancellation_details
logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
if cancellation_details.reason == CancellationReason.Error:
yield ErrorFrame(
error=f"Unknown error occurred: {cancellation_details.error_details}"
)
logger.error(f"{self} error: {cancellation_details.error_details}")
yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}")

View File

@@ -276,7 +276,8 @@ class CartesiaSTTService(WebsocketSTTService):
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _disconnect_websocket(self):
try:
@@ -284,7 +285,8 @@ class CartesiaSTTService(WebsocketSTTService):
logger.debug("Disconnecting from Cartesia STT")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -317,7 +319,8 @@ class CartesiaSTTService(WebsocketSTTService):
elif data["type"] == "error":
error_msg = data.get("message", "Unknown error")
await self.push_error(error_msg=error_msg)
logger.error(f"Cartesia error: {error_msg}")
await self.push_error(ErrorFrame(error=error_msg))
@traced_stt
async def _handle_transcription(

View File

@@ -497,7 +497,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -509,7 +510,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._context_id = None
self._websocket = None
@@ -562,12 +564,13 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self.append_to_audio_context(msg["context_id"], frame)
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
self._context_id = None
else:
await self.push_error(error_msg=f"Error, unknown message type: {msg}")
logger.error(f"{self} error, unknown message type: {msg}")
async def _receive_messages(self):
while True:
@@ -605,14 +608,16 @@ class CartesiaTTSService(AudioContextWordTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class CartesiaHttpTTSService(TTSService):
@@ -803,7 +808,8 @@ class CartesiaHttpTTSService(TTSService):
async with session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
yield ErrorFrame(error=f"Cartesia API error: {error_text}")
logger.error(f"Cartesia API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}"))
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -819,7 +825,8 @@ class CartesiaHttpTTSService(TTSService):
yield frame
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -150,17 +150,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
params=params
)
"""
# Note: For DeepgramFluxSTTService, differently from other processes, we need to create
# the _receive_task inside _connect_websocket, because the websocket should only be
# considered connected and ready to send audio once we receive from Flux the message
# which confirms the connection has been established.
# If we try to keep the logic reconnect_on_error, when receiving a message, the
# _receive_task_handler would try to reconnect in case of error, invoking the
# _connect_websocket again and leading to a case where the first _receive_task_handler
# was never destroyed.
# So we can keep it here as false, because inside the method send_with_retry, it will
# already try to reconnect if needed.
super().__init__(sample_rate=sample_rate, reconnect_on_error=False, **kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._url = url
@@ -193,6 +183,14 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
await self._connect_websocket()
# Creating the receiver task (only created once during initial connection)
if not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
# Creating the watchdog task (only created once during initial connection)
if not self._watchdog_task:
self._watchdog_task = self.create_task(self._watchdog_task_handler())
async def _disconnect(self):
"""Disconnect from WebSocket and clean up tasks.
@@ -202,7 +200,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
try:
await self._disconnect_websocket()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Reset state only after everything is cleaned up
self._websocket = None
@@ -244,23 +243,14 @@ class DeepgramFluxSTTService(WebsocketSTTService):
additional_headers={"Authorization": f"Token {self._api_key}"},
)
# Creating the receiver task
if not self._receive_task:
self._receive_task = self.create_task(
self._receive_task_handler(self._report_error)
)
# Creating the watchdog task
if not self._watchdog_task:
self._watchdog_task = self.create_task(self._watchdog_task_handler())
# Now wait for the connection established event
logger.debug("WebSocket connected, waiting for server confirmation...")
await self._connection_established_event.wait()
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -288,7 +278,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.debug("Disconnecting from Deepgram Flux Websocket")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -298,13 +289,10 @@ class DeepgramFluxSTTService(WebsocketSTTService):
This signals to the server that no more audio data will be sent.
"""
try:
if self._websocket:
logger.debug("Sending CloseStream message to Deepgram Flux")
message = {"type": "CloseStream"}
await self._websocket.send(json.dumps(message))
except Exception as e:
await self.push_error(error_msg=f"Error sending closeStream: {e}", exception=e)
if self._websocket:
logger.debug("Sending CloseStream message to Deepgram Flux")
message = {"type": "CloseStream"}
await self._websocket.send(json.dumps(message))
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -391,13 +379,16 @@ class DeepgramFluxSTTService(WebsocketSTTService):
are issues sending the audio data.
"""
if not self._websocket:
logger.error("Not connected to Deepgram Flux.")
yield ErrorFrame("Not connected to Deepgram Flux.")
return
try:
self._last_stt_time = time.monotonic()
await self.send_with_retry(audio, self._report_error)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
return
yield None
@@ -474,7 +465,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Skip malformed messages
continue
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Error will be handled inside WebsocketService->_receive_task_handler
raise
else:

View File

@@ -233,7 +233,7 @@ class DeepgramSTTService(STTService):
)
if not await self._connection.start(options=self._settings, addons=self._addons):
await self.push_error(error_msg=f"Unable to connect to Deepgram")
logger.error(f"{self}: unable to connect to Deepgram")
async def _disconnect(self):
if await self._connection.is_connected():
@@ -256,7 +256,7 @@ class DeepgramSTTService(STTService):
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.push_error(error_msg=f"{error}")
await self.push_error(ErrorFrame(error=f"{error}"))
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,

View File

@@ -210,7 +210,8 @@ class DeepgramSageMakerSTTService(STTService):
try:
await self._client.send_audio_chunk(audio)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"Error sending audio to SageMaker: {e}")
await self.push_error(ErrorFrame(error=f"SageMaker STT error: {e}"))
yield None
async def _connect(self):
@@ -259,7 +260,8 @@ class DeepgramSageMakerSTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"Failed to connect to SageMaker: {e}")
await self.push_error(ErrorFrame(error=f"SageMaker connection error: {e}"))
await self._call_event_handler("on_connection_error", str(e))
async def _disconnect(self):
@@ -340,7 +342,8 @@ class DeepgramSageMakerSTTService(STTService):
except asyncio.CancelledError:
logger.debug("Response processor cancelled")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"Error processing responses: {e}", exc_info=True)
await self.push_error(ErrorFrame(error=f"SageMaker response error: {e}"))
finally:
logger.debug("Response processor stopped")

View File

@@ -10,45 +10,35 @@ This module provides integration with Deepgram's text-to-speech API
for generating speech from text using various voice models.
"""
import json
from typing import AsyncGenerator, Optional
import aiohttp
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import TTSService, WebsocketTTSService
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
from deepgram import DeepgramClient, DeepgramClientOptions, SpeakOptions
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use DeepgramWebsocketTTSService, you need to `pip install pipecat-ai[deepgram]`."
)
logger.error("In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`.")
raise Exception(f"Missing module: {e}")
class DeepgramTTSService(WebsocketTTSService):
"""Deepgram WebSocket-based text-to-speech service.
class DeepgramTTSService(TTSService):
"""Deepgram text-to-speech service.
Provides real-time text-to-speech synthesis using Deepgram's WebSocket API.
Supports streaming audio generation with interruption handling via the Clear
message for conversational AI use cases.
Provides text-to-speech synthesis using Deepgram's streaming API.
Supports various voice models and audio encoding formats with
configurable sample rates and quality settings.
"""
def __init__(
@@ -56,211 +46,42 @@ class DeepgramTTSService(WebsocketTTSService):
*,
api_key: str,
voice: str = "aura-2-helena-en",
base_url: str = "wss://api.deepgram.com",
base_url: str = "",
sample_rate: Optional[int] = None,
encoding: str = "linear16",
**kwargs,
):
"""Initialize the Deepgram WebSocket TTS service.
"""Initialize the Deepgram TTS service.
Args:
api_key: Deepgram API key for authentication.
voice: Voice model to use for synthesis. Defaults to "aura-2-helena-en".
base_url: WebSocket base URL for Deepgram API. Defaults to "wss://api.deepgram.com".
base_url: Custom base URL for Deepgram API. Uses default if empty.
sample_rate: Audio sample rate in Hz. If None, uses service default.
encoding: Audio encoding format. Defaults to "linear16".
**kwargs: Additional arguments passed to parent InterruptibleTTSService class.
**kwargs: Additional arguments passed to parent TTSService class.
"""
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._base_url = base_url
self._settings = {
"encoding": encoding,
}
self.set_voice(voice)
self._receive_task = None
client_options = DeepgramClientOptions(url=base_url)
self._deepgram_client = DeepgramClient(api_key, config=client_options)
def can_generate_metrics(self) -> bool:
"""Check if the service can generate metrics.
Returns:
True, as Deepgram WebSocket TTS service supports metrics generation.
True, as Deepgram TTS service supports metrics generation.
"""
return True
async def start(self, frame: StartFrame):
"""Start the Deepgram WebSocket TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram WebSocket TTS service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram WebSocket TTS service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with special handling for LLM response end.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
# When the LLM finishes responding, flush any remaining text in Deepgram's buffer
if isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
await self.flush_audio()
async def _connect(self):
"""Connect to Deepgram WebSocket and start receive task."""
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Disconnect from Deepgram WebSocket and clean up tasks."""
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
"""Connect to Deepgram WebSocket API with configured settings."""
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Deepgram WebSocket")
# Build WebSocket URL with query parameters
params = []
params.append(f"model={self._voice_id}")
params.append(f"encoding={self._settings['encoding']}")
params.append(f"sample_rate={self.sample_rate}")
url = f"{self._base_url}/v1/speak?{'&'.join(params)}"
headers = {"Authorization": f"Token {self._api_key}"}
self._websocket = await websocket_connect(url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
"""Close WebSocket connection and reset state."""
try:
await self.stop_all_metrics()
if self._websocket:
logger.debug("Disconnecting from Deepgram WebSocket")
# Send Close message to gracefully close the connection
await self._websocket.send(json.dumps({"type": "Close"}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get active websocket connection or raise exception."""
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by sending Clear message to Deepgram.
The Clear message will clear Deepgram's internal text buffer and stop
sending audio, allowing for a new response to be generated.
"""
await super()._handle_interruption(frame, direction)
# Send Clear message to stop current audio generation
if self._websocket:
try:
clear_msg = {"type": "Clear"}
await self._websocket.send(json.dumps(clear_msg))
except Exception as e:
logger.error(f"{self} error sending Clear message: {e}")
async def _receive_messages(self):
"""Receive and process messages from Deepgram WebSocket."""
async for message in self._get_websocket():
if isinstance(message, bytes):
# Binary message contains audio data
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(message, self.sample_rate, 1)
await self.push_frame(frame)
elif isinstance(message, str):
# Text message contains metadata or control messages
try:
msg = json.loads(message)
msg_type = msg.get("type")
if msg_type == "Metadata":
logger.trace(f"Received metadata: {msg}")
elif msg_type == "Flushed":
logger.trace(f"Received Flushed: {msg}")
# Flushed indicates the end of audio generation for the current buffer
# This happens after flush_audio() is called
await self.push_frame(TTSStoppedFrame())
elif msg_type == "Cleared":
logger.trace(f"Received Cleared: {msg}")
# Buffer has been cleared after interruption
# TTSStoppedFrame will be sent by the interruption handler
elif msg_type == "Warning":
logger.warning(
f"{self} warning: {msg.get('description', 'Unknown warning')}"
)
else:
logger.debug(f"Received unknown message type: {msg}")
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
async def flush_audio(self):
"""Flush any pending audio synthesis by sending Flush command.
This should be called when the LLM finishes a complete response to force
generation of audio from Deepgram's internal text buffer.
"""
if self._websocket:
try:
flush_msg = {"type": "Flush"}
await self._websocket.send(json.dumps(flush_msg))
except Exception as e:
logger.error(f"{self} error sending Flush message: {e}")
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Deepgram's WebSocket TTS API.
"""Generate speech from text using Deepgram's TTS API.
Args:
text: The text to synthesize into speech.
@@ -270,27 +91,33 @@ class DeepgramTTSService(WebsocketTTSService):
"""
logger.debug(f"{self}: Generating TTS [{text}]")
options = SpeakOptions(
model=self._voice_id,
encoding=self._settings["encoding"],
sample_rate=self.sample_rate,
container="none",
)
try:
# Reconnect if the websocket is closed
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
await self.start_ttfb_metrics()
await self.start_tts_usage_metrics(text)
response = await self._deepgram_client.speak.asyncrest.v("1").stream_raw(
{"text": text}, options
)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
# Send text message to Deepgram
# Note: We don't send Flush here - that should only be sent when the
# LLM finishes a complete response via flush_audio()
speak_msg = {"type": "Speak", "text": text}
await self._get_websocket().send(json.dumps(speak_msg))
async for data in response.aiter_bytes():
await self.stop_ttfb_metrics()
if data:
yield TTSAudioRawFrame(audio=data, sample_rate=self.sample_rate, num_channels=1)
# The actual audio frames will be handled in _receive_messages
yield None
yield TTSStoppedFrame()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class DeepgramHttpTTSService(TTSService):
@@ -400,4 +227,5 @@ class DeepgramHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} exception: {e}")
yield ErrorFrame(f"Error getting audio: {str(e)}")

View File

@@ -351,7 +351,8 @@ class ElevenLabsSTTService(SegmentedSTTService):
)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
def audio_format_from_sample_rate(sample_rate: int) -> str:
@@ -597,6 +598,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
}
await self._websocket.send(json.dumps(message))
except Exception as e:
logger.error(f"Error sending audio: {e}")
yield ErrorFrame(f"ElevenLabs Realtime STT error: {str(e)}")
yield None
@@ -661,9 +663,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug("Connected to ElevenLabs Realtime STT")
except Exception as e:
await self.push_error(
error_msg=f"Unable to connect to ElevenLabs Realtime STT: {e}", exception=e
)
logger.error(f"{self}: unable to connect to ElevenLabs Realtime STT: {e}")
await self.push_error(ErrorFrame(f"Connection error: {str(e)}"))
async def _disconnect_websocket(self):
"""Disconnect from ElevenLabs Realtime STT WebSocket."""
@@ -672,7 +673,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
logger.debug("Disconnecting from ElevenLabs Realtime STT")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
logger.error(f"{self} error closing websocket: {e}")
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -732,17 +733,17 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
elif message_type == "error":
error_msg = data.get("error", "Unknown error")
logger.error(f"ElevenLabs error: {error_msg}")
await self.push_error(error_msg=f"Error: {error_msg}")
await self.push_error(ErrorFrame(f"Error: {error_msg}"))
elif message_type == "auth_error":
error_msg = data.get("error", "Authentication error")
logger.error(f"ElevenLabs auth error: {error_msg}")
await self.push_error(error_msg=f"Auth error: {error_msg}")
await self.push_error(ErrorFrame(f"Auth error: {error_msg}"))
elif message_type == "quota_exceeded_error":
error_msg = data.get("error", "Quota exceeded")
logger.error(f"ElevenLabs quota exceeded: {error_msg}")
await self.push_error(error_msg=f"Quota exceeded: {error_msg}")
await self.push_error(ErrorFrame(f"Quota exceeded: {error_msg}"))
else:
logger.debug(f"Unknown message type: {message_type}")

View File

@@ -424,7 +424,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._context_id = None
self._started = False
@@ -535,8 +536,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
self._websocket = None
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
@@ -551,7 +553,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._context_id = None
@@ -581,7 +584,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._context_id = None
self._started = False
self._partial_word = ""
@@ -736,13 +740,15 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
else:
await self._send_text(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield TTSStoppedFrame()
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
self._started = False
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class ElevenLabsHttpTTSService(WordTTSService):
@@ -1037,6 +1043,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"{self} error: {error_text}")
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
return
@@ -1084,7 +1091,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
continue
# After processing all chunks, emit any remaining partial word
@@ -1108,7 +1116,8 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._previous_text = text
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
# Let the parent class handle TTSStoppedFrame

View File

@@ -110,6 +110,7 @@ class FalImageGenService(ImageGenService):
image_url = response["images"][0]["url"] if response else None
if not image_url:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return

View File

@@ -290,4 +290,5 @@ class FalSTTService(SegmentedSTTService):
)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -76,7 +76,7 @@ class FishAudioTTSService(InterruptibleTTSService):
api_key: str,
reference_id: Optional[str] = None, # This is the voice ID
model: Optional[str] = None, # Deprecated
model_id: str = "s1",
model_id: str = "speech-1.5",
output_format: FishAudioOutputFormat = "pcm",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
@@ -93,7 +93,7 @@ class FishAudioTTSService(InterruptibleTTSService):
The `model` parameter is deprecated and will be removed in version 0.1.0.
Use `reference_id` instead to specify the voice model.
model_id: Specify which Fish Audio TTS model to use (e.g. "s1")
model_id: Specify which Fish Audio TTS model to use (e.g. "speech-1.5")
output_format: Audio output format. Defaults to "pcm".
sample_rate: Audio sample rate. If None, uses default.
params: Additional input parameters for voice customization.
@@ -228,7 +228,8 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -242,7 +243,8 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._request_id = None
self._started = False
@@ -284,7 +286,8 @@ class FishAudioTTSService(InterruptibleTTSService):
continue
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -320,7 +323,8 @@ class FishAudioTTSService(InterruptibleTTSService):
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -328,4 +332,5 @@ class FishAudioTTSService(InterruptibleTTSService):
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -468,7 +468,8 @@ class GladiaSTTService(STTService):
break
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._connection_active = False
if not self._should_reconnect:
@@ -558,7 +559,8 @@ class GladiaSTTService(STTService):
except websockets.exceptions.ConnectionClosed:
logger.debug("Connection closed during keepalive")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _receive_task_handler(self):
try:
@@ -621,7 +623,8 @@ class GladiaSTTService(STTService):
# Expected when closing the connection
pass
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _maybe_reconnect(self) -> bool:
"""Handle exponential backoff reconnection logic."""
@@ -629,9 +632,7 @@ class GladiaSTTService(STTService):
return False
self._reconnection_attempts += 1
if self._reconnection_attempts > self._max_reconnection_attempts:
await self.push_error(
error_msg=f"Max reconnection attempts ({self._max_reconnection_attempts}) reached",
)
logger.error(f"Max reconnection attempts ({self._max_reconnection_attempts}) reached")
self._should_reconnect = False
return False
delay = self._reconnection_delay * (2 ** (self._reconnection_attempts - 1))

View File

@@ -1175,7 +1175,7 @@ class GeminiLiveLLMService(LLMService):
self._connection_task = self.create_task(self._connection_task_handler(config=config))
except Exception as e:
await self.push_error(error_msg=f"Initialization error: {e}", exception=e)
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}"))
async def _connection_task_handler(self, config: LiveConnectConfig):
async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
@@ -1252,11 +1252,11 @@ class GeminiLiveLLMService(LLMService):
)
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
error_msg = (
logger.error(
f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
"treating as fatal error"
)
await self.push_error(error_msg=error_msg, exception=error)
await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}"))
return False
else:
logger.info(
@@ -1284,7 +1284,7 @@ class GeminiLiveLLMService(LLMService):
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
logger.error(f"{self} error disconnecting: {e}")
async def _send_user_audio(self, frame):
"""Send user audio frame to Gemini Live API."""
@@ -1448,11 +1448,11 @@ class GeminiLiveLLMService(LLMService):
# Update bot responding state and send service start frame
# (AUDIO modality case)
await self._set_bot_is_responding(True)
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
self._bot_text_buffer += text
self._search_result_buffer += text # Also accumulate for grounding
frame = LLMTextFrame(text=text, skip_tts=self._get_skip_tts())
frame = LLMTextFrame(text=text)
await self.push_frame(frame)
# Check for grounding metadata in server content
@@ -1491,7 +1491,7 @@ class GeminiLiveLLMService(LLMService):
if not self._bot_is_responding:
await self._set_bot_is_responding(True)
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
self._bot_audio_buffer.extend(audio)
frame = TTSAudioRawFrame(
@@ -1552,10 +1552,10 @@ class GeminiLiveLLMService(LLMService):
if not text:
# AUDIO modality case
await self.push_frame(TTSStoppedFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
else:
# TEXT modality case
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
@traced_stt
async def _handle_user_transcription(
@@ -1643,7 +1643,7 @@ class GeminiLiveLLMService(LLMService):
if not self._bot_is_responding:
await self._set_bot_is_responding(True)
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(text=text, aggregated_by=AggregationType.SENTENCE)
# Gemini Live text already includes any necessary inter-chunk spaces
@@ -1723,8 +1723,6 @@ class GeminiLiveLLMService(LLMService):
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
cache_read_input_tokens=usage.cached_content_token_count,
reasoning_tokens=usage.thoughts_token_count,
)
await self.start_llm_usage_metrics(tokens)
@@ -1745,7 +1743,7 @@ class GeminiLiveLLMService(LLMService):
# state management, and that exponential backoff for retries can have
# cost/stability implications for a service cluster, let's just treat a
# send-side error as fatal.
await self.push_error(error_msg=f"Send error: {error}")
await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True))
def create_context_aggregator(
self,

View File

@@ -110,6 +110,7 @@ class GoogleImageGenService(ImageGenService):
await self.stop_ttfb_metrics()
if not response or not response.generated_images:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return
@@ -127,4 +128,5 @@ class GoogleImageGenService(ImageGenService):
yield frame
except Exception as e:
logger.error(f"{self} error generating image: {e}")
yield ErrorFrame(f"Image generation error: {str(e)}")

View File

@@ -793,7 +793,7 @@ class GoogleLLMService(LLMService):
return
generation_params.setdefault("thinking_config", {})["thinking_budget"] = 0
except Exception as e:
logger.error(f"Failed to unset thinking budget: {e}")
logger.exception(f"Failed to unset thinking budget: {e}")
async def _stream_content(
self, params_from_context: GeminiLLMInvocationParams
@@ -876,7 +876,7 @@ class GoogleLLMService(LLMService):
@traced_llm
async def _process_context(self, context: OpenAILLMContext | LLMContext):
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
prompt_tokens = 0
completion_tokens = 0
@@ -920,9 +920,7 @@ class GoogleLLMService(LLMService):
for part in candidate.content.parts:
if not part.thought and part.text:
search_result += part.text
await self.push_frame(
LLMTextFrame(part.text, skip_tts=self._get_skip_tts())
)
await self.push_frame(LLMTextFrame(part.text))
elif part.function_call:
function_call = part.function_call
id = function_call.id or str(uuid.uuid4())
@@ -985,7 +983,7 @@ class GoogleLLMService(LLMService):
except DeadlineExceeded:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.exception(f"{self} exception: {e}")
finally:
if grounding_metadata and isinstance(grounding_metadata, dict):
llm_search_frame = LLMSearchResponseFrame(
@@ -1004,7 +1002,7 @@ class GoogleLLMService(LLMService):
reasoning_tokens=reasoning_tokens,
)
)
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle different frame types.

View File

@@ -136,9 +136,7 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
await self.push_frame(
LLMTextFrame(chunk.choices[0].delta.content, skip_tts=self._get_skip_tts())
)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to

View File

@@ -774,7 +774,8 @@ class GoogleSTTService(STTService):
yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _stream_audio(self):
@@ -805,13 +806,15 @@ class GoogleSTTService(STTService):
break
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await asyncio.sleep(1) # Brief delay before reconnecting
self._stream_start_time = int(time.time() * 1000)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process an audio chunk for STT transcription.
@@ -899,7 +902,8 @@ class GoogleSTTService(STTService):
)
raise
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Re-raise the exception to let it propagate (e.g. in the case of a
# timeout, propagate to _stream_audio to reconnect)
raise

View File

@@ -737,6 +737,7 @@ class GoogleHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -995,7 +996,9 @@ class GoogleTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
await self.push_error(error_msg=f"TTS generation error: {str(e)}", exception=e)
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
class GeminiTTSService(GoogleBaseTTSService):
@@ -1245,5 +1248,6 @@ class GeminiTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"Gemini TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)

View File

@@ -123,8 +123,6 @@ class GrokLLMService(OpenAILLMService):
self._prompt_tokens = 0
self._completion_tokens = 0
self._total_tokens = 0
self._cache_read_input_tokens = None
self._reasoning_tokens = None
self._has_reported_prompt_tokens = False
self._is_processing = True
@@ -139,8 +137,6 @@ class GrokLLMService(OpenAILLMService):
prompt_tokens=self._prompt_tokens,
completion_tokens=self._completion_tokens,
total_tokens=self._total_tokens,
cache_read_input_tokens=self._cache_read_input_tokens,
reasoning_tokens=self._reasoning_tokens,
)
await super().start_llm_usage_metrics(tokens)
@@ -153,7 +149,7 @@ class GrokLLMService(OpenAILLMService):
Args:
tokens: The token usage metrics for the current chunk of processing,
containing prompt_tokens, completion_tokens, and optional cached/reasoning tokens.
containing prompt_tokens and completion_tokens counts.
"""
# Only accumulate metrics during active processing
if not self._is_processing:
@@ -168,13 +164,6 @@ class GrokLLMService(OpenAILLMService):
if tokens.completion_tokens > self._completion_tokens:
self._completion_tokens = tokens.completion_tokens
# Capture cached & reasoning tokens (these typically only appear once per request)
if tokens.cache_read_input_tokens is not None:
self._cache_read_input_tokens = tokens.cache_read_input_tokens
if tokens.reasoning_tokens is not None:
self._reasoning_tokens = tokens.reasoning_tokens
def create_context_aggregator(
self,
context: OpenAILLMContext,

View File

@@ -146,6 +146,7 @@ class GroqTTSService(TTSService):
bytes = w.readframes(num_frames)
yield TTSAudioRawFrame(bytes, frame_rate, channels)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()

View File

@@ -179,7 +179,7 @@ class HeyGenClient:
await self._task_manager.cancel_task(self._event_task)
self._event_task = None
except Exception as e:
logger.error(f"Exception during cleanup: {e}")
logger.exception(f"Exception during cleanup: {e}")
async def start(self, frame: StartFrame, audio_chunk_size: int) -> None:
"""Start the client and establish all necessary connections.

View File

@@ -287,7 +287,8 @@ class HumeTTSService(WordTTSService):
self._cumulative_time = utterance_duration
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()

View File

@@ -397,7 +397,8 @@ class InworldTTSService(TTSService):
# STEP 7: ERROR HANDLING
# ================================================================================
# Log any unexpected errors and notify the pipeline
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# ================================================================================
# STEP 8: CLEANUP AND COMPLETION
@@ -512,7 +513,7 @@ class InworldTTSService(TTSService):
# Extract the base64-encoded audio content from response
if "audioContent" not in response_data:
logger.error("No audioContent in Inworld API response")
yield ErrorFrame(error="No audioContent in response")
await self.push_error(ErrorFrame(error="No audioContent in response"))
return
# ================================================================================

View File

@@ -9,7 +9,17 @@
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Protocol, Sequence, Type
from typing import (
Any,
Awaitable,
Callable,
Dict,
Mapping,
Optional,
Protocol,
Sequence,
Type,
)
from loguru import logger
@@ -275,13 +285,17 @@ class LLMService(AIService):
elif isinstance(frame, LLMConfigureOutputFrame):
self._skip_tts = frame.skip_tts
def _get_skip_tts(self) -> bool:
"""Get the current skip_tts configuration.
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Pushes a frame.
Returns:
The current skip_tts setting for frames generated by this LLM.
Args:
frame: The frame to push.
direction: The direction of frame pushing.
"""
return self._skip_tts
if isinstance(frame, (LLMTextFrame, LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
frame.skip_tts = self._skip_tts
await super().push_frame(frame, direction)
async def _handle_interruptions(self, _: InterruptionFrame):
for function_name, entry in self._functions.items():

View File

@@ -214,7 +214,8 @@ class LmntTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -230,7 +231,8 @@ class LmntTTSService(InterruptibleTTSService):
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting from LMNT: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -264,9 +266,10 @@ class LmntTTSService(InterruptibleTTSService):
try:
msg = json.loads(message)
if "error" in msg:
logger.error(f"{self} error: {msg['error']}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg['error']}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
return
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -299,11 +302,13 @@ class LmntTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps({"flush": True}))
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -176,6 +176,7 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _stdio_list_tools(self) -> ToolsSchema:
@@ -206,6 +207,7 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _streamable_http_list_tools(self) -> ToolsSchema:
@@ -244,6 +246,7 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _call_tool(self, session, function_name, arguments, result_callback):
@@ -299,6 +302,7 @@ class MCPClient(BaseObject):
except Exception as e:
logger.error(f"Failed to read tool '{tool_name}': {str(e)}")
logger.exception("Full exception details:")
continue
logger.debug(f"Completed reading {len(tool_schemas)} tools")

View File

@@ -253,9 +253,8 @@ class Mem0MemoryService(FrameProcessor):
# Otherwise, pass the enhanced context frame downstream
await self.push_frame(frame)
except Exception as e:
await self.push_error(
error_msg=f"Error processing with Mem0: {str(e)}", exception=e
)
logger.error(f"Error processing with Mem0: {str(e)}")
await self.push_frame(ErrorFrame(f"Error processing with Mem0: {str(e)}"))
await self.push_frame(frame) # Still pass the original frame through
else:
# For non-context frames, just pass them through

View File

@@ -314,6 +314,7 @@ class MiniMaxHttpTTSService(TTSService):
) as response:
if response.status != 200:
error_message = f"MiniMax TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -391,7 +392,8 @@ class MiniMaxHttpTTSService(TTSService):
continue
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -110,6 +110,7 @@ class MoondreamService(VisionService):
if analysis fails.
"""
if not self._model:
logger.error(f"{self} error: Moondream model not available ({self.model_name})")
yield ErrorFrame("Moondream model not available")
return

View File

@@ -285,7 +285,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -298,7 +299,8 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -363,14 +365,16 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class NeuphonicHttpTTSService(TTSService):
@@ -534,6 +538,7 @@ class NeuphonicHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
error_message = f"Neuphonic API error: HTTP {response.status} - {error_text}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -563,7 +568,8 @@ class NeuphonicHttpTTSService(TTSService):
yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Don't yield error frame for individual message failures
continue
@@ -571,7 +577,8 @@ class NeuphonicHttpTTSService(TTSService):
logger.debug("TTS generation cancelled")
raise
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -13,7 +13,13 @@ from typing import Any, Dict, List, Mapping, Optional
import httpx
from loguru import logger
from openai import NOT_GIVEN, APITimeoutError, AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient
from openai import (
NOT_GIVEN,
APITimeoutError,
AsyncOpenAI,
AsyncStream,
DefaultAsyncHttpxClient,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pydantic import BaseModel, Field
@@ -340,17 +346,11 @@ class BaseOpenAILLMService(LLMService):
if chunk.usage.prompt_tokens_details
else None
)
reasoning_tokens = (
chunk.usage.completion_tokens_details.reasoning_tokens
if chunk.usage.completion_tokens_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
reasoning_tokens=reasoning_tokens,
)
await self.start_llm_usage_metrics(tokens)
@@ -390,20 +390,14 @@ class BaseOpenAILLMService(LLMService):
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
await self.push_frame(
LLMTextFrame(chunk.choices[0].delta.content, skip_tts=self._get_skip_tts())
)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
# When gpt-4o-audio / gpt-4o-mini-audio is used for llm or stt+llm
# we need to get LLMTextFrame for the transcript
elif hasattr(chunk.choices[0].delta, "audio") and chunk.choices[0].delta.audio.get(
"transcript"
):
await self.push_frame(
LLMTextFrame(
chunk.choices[0].delta.audio["transcript"], skip_tts=self._get_skip_tts()
)
)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.audio["transcript"]))
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to
@@ -463,11 +457,11 @@ class BaseOpenAILLMService(LLMService):
if context:
try:
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(context)
except httpx.TimeoutException:
await self._call_event_handler("on_completion_timeout")
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -76,6 +76,7 @@ class OpenAIImageGenService(ImageGenService):
image_url = image.data[0].url
if not image_url:
logger.error(f"{self} No image provided in response: {image}")
yield ErrorFrame("Image generation failed")
return

View File

@@ -15,7 +15,9 @@ from typing import Optional
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
from pipecat.adapters.services.open_ai_realtime_adapter import (
OpenAIRealtimeLLMAdapter,
)
from pipecat.frames.frames import (
AggregationType,
BotStoppedSpeakingFrame,
@@ -55,6 +57,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
@@ -282,7 +285,7 @@ class OpenAIRealtimeLLMService(LLMService):
await self._truncate_current_audio_response()
await self.stop_all_metrics()
if self._current_assistant_response:
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
# Only push TTSStoppedFrame if audio modality is enabled
if self._is_modality_enabled("audio"):
await self.push_frame(TTSStoppedFrame())
@@ -441,7 +444,7 @@ class OpenAIRealtimeLLMService(LLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
logger.error(f"{self} initialization error: {e}")
self._websocket = None
async def _disconnect(self):
@@ -458,7 +461,7 @@ class OpenAIRealtimeLLMService(LLMService):
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
logger.error(f"{self} error disconnecting: {e}")
async def _ws_send(self, realtime_message):
try:
@@ -471,11 +474,12 @@ class OpenAIRealtimeLLMService(LLMService):
# somehow *started* the websocket send attempt while we still
# had a connection)
return
logger.error(f"Error sending message to websocket: {e}")
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
async def _update_settings(self):
settings = self._session_properties
@@ -606,7 +610,7 @@ class OpenAIRealtimeLLMService(LLMService):
if evt.item.role == "assistant":
self._current_assistant_response = evt.item
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
async def _handle_evt_conversation_item_done(self, evt):
"""Handle conversation.item.done event - item is fully completed."""
@@ -653,25 +657,18 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_response_done(self, evt):
# todo: figure out whether there's anything we need to do for "cancelled" events
# usage metrics
cached_tokens = (
evt.response.usage.input_token_details.cached_tokens
if hasattr(evt.response.usage, "input_token_details")
and evt.response.usage.input_token_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=evt.response.usage.input_tokens,
completion_tokens=evt.response.usage.output_tokens,
total_tokens=evt.response.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
await self.push_error(error_msg=evt.response.status_details["error"]["message"])
await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"]))
return
# response content
for item in evt.response.output:
@@ -681,7 +678,7 @@ class OpenAIRealtimeLLMService(LLMService):
# We receive text deltas (as opposed to audio transcript deltas) when
# the output modality is "text"
if evt.delta:
frame = LLMTextFrame(evt.delta, skip_tts=self._get_skip_tts())
frame = LLMTextFrame(evt.delta)
await self.push_frame(frame)
async def _handle_evt_audio_transcript_delta(self, evt):
@@ -763,7 +760,7 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(error_msg=f"Error: {evt}")
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
#
# state and client events for the current conversation
@@ -813,9 +810,9 @@ class OpenAIRealtimeLLMService(LLMService):
# We're done configuring the LLM for this session
self._llm_needs_conversation_setup = False
logger.debug("Creating response")
logger.debug(f"Creating response")
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(

View File

@@ -206,4 +206,5 @@ class OpenAITTSService(TTSService):
yield frame
yield TTSStoppedFrame()
except BadRequestError as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.exception(f"{self} error generating TTS: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -79,5 +79,5 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
logger.error(f"{self} initialization error: {e}")
self._websocket = None

View File

@@ -265,7 +265,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
await self._truncate_current_audio_response()
await self.stop_all_metrics()
if self._current_assistant_response:
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
# Only push TTSStoppedFrame if audio modality is enabled
if self._is_modality_enabled("audio"):
await self.push_frame(TTSStoppedFrame())
@@ -425,7 +425,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
logger.error(f"{self} initialization error: {e}")
self._websocket = None
async def _disconnect(self):
@@ -441,7 +441,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._receive_task = None
self._disconnecting = False
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
logger.error(f"{self} error disconnecting: {e}")
async def _ws_send(self, realtime_message):
try:
@@ -450,11 +450,12 @@ class OpenAIRealtimeBetaLLMService(LLMService):
except Exception as e:
if self._disconnecting:
return
logger.error(f"Error sending message to websocket: {e}")
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
async def _update_settings(self):
settings = self._session_properties
@@ -564,7 +565,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._user_and_response_message_tuple = (evt.item, {"done": False, "output": []})
elif evt.item.role == "assistant":
self._current_assistant_response = evt.item
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
async def _handle_evt_input_audio_transcription_delta(self, evt):
if self._send_transcription_frames:
@@ -623,7 +624,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseEndFrame())
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
@@ -647,11 +648,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_text_delta(self, evt):
if evt.delta:
await self.push_frame(LLMTextFrame(evt.delta, skip_tts=self._get_skip_tts()))
await self.push_frame(LLMTextFrame(evt.delta))
async def _handle_evt_audio_transcript_delta(self, evt):
if evt.delta:
await self.push_frame(LLMTextFrame(evt.delta, skip_tts=self._get_skip_tts()))
await self.push_frame(LLMTextFrame(evt.delta))
await self.push_frame(TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE))
async def _handle_evt_speech_started(self, evt):
@@ -685,7 +686,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(error_msg=f"Error: {evt}")
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
async def _handle_assistant_output(self, output):
# We haven't seen intermixed audio and function_call items in the same response. But let's
@@ -747,7 +748,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
logger.debug(f"Creating response: {self._context.get_messages_for_logging()}")
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(

View File

@@ -88,6 +88,9 @@ class PiperTTSService(TTSService):
) as response:
if response.status != 200:
error = await response.text()
logger.error(
f"{self} error getting audio (status: {response.status}, error: {error})"
)
yield ErrorFrame(
error=f"Error getting audio (status: {response.status}, error: {error})"
)
@@ -106,7 +109,7 @@ class PiperTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
logger.debug(f"{self}: Finished TTS [{text}]")
await self.stop_ttfb_metrics()

View File

@@ -266,7 +266,8 @@ class PlayHTTTSService(InterruptibleTTSService):
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
except Exception as e:
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -279,7 +280,8 @@ class PlayHTTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._request_id = None
self._websocket = None
@@ -349,7 +351,8 @@ class PlayHTTTSService(InterruptibleTTSService):
await self.push_frame(TTSStoppedFrame())
self._request_id = None
elif "error" in msg:
await self.push_error(error_msg=f"Error: {msg['error']}")
logger.error(f"{self} error: {msg}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -391,7 +394,8 @@ class PlayHTTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps(tts_command))
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -401,7 +405,8 @@ class PlayHTTTSService(InterruptibleTTSService):
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class PlayHTHttpTTSService(TTSService):
@@ -621,7 +626,8 @@ class PlayHTHttpTTSService(TTSService):
yield frame
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -300,7 +300,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -312,7 +313,8 @@ class RimeTTSService(AudioContextWordTTSService):
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._context_id = None
self._websocket = None
@@ -405,9 +407,10 @@ class RimeTTSService(AudioContextWordTTSService):
logger.debug(f"Updated cumulative time to: {self._cumulative_time}")
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg['message']}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
self._context_id = None
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -449,14 +452,16 @@ class RimeTTSService(AudioContextWordTTSService):
await self._get_websocket().send(json.dumps(msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class RimeHttpTTSService(TTSService):
@@ -587,6 +592,7 @@ class RimeHttpTTSService(TTSService):
) as response:
if response.status != 200:
error_message = f"Rime TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -604,7 +610,8 @@ class RimeHttpTTSService(TTSService):
yield frame
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -655,10 +655,12 @@ class RivaSegmentedSTTService(SegmentedSTTService):
logger.debug("No transcription results found in Riva response")
except AttributeError as ae:
logger.error(f"Unexpected response structure from Riva: {ae}")
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
class ParakeetSTTService(RivaSTTService):

View File

@@ -180,7 +180,8 @@ class RivaTTSService(TTSService):
yield frame
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} timeout waiting for audio response")
yield ErrorFrame(error=f"{self} error: {e}")
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()

View File

@@ -14,7 +14,9 @@ from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams
from pipecat.frames.frames import LLMTextFrame
from pipecat.frames.frames import (
LLMTextFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
@@ -174,20 +176,14 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
await self.push_frame(
LLMTextFrame(chunk.choices[0].delta.content, skip_tts=self._get_skip_tts())
)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
# When gpt-4o-audio / gpt-4o-mini-audio is used for llm or stt+llm
# we need to get LLMTextFrame for the transcript
elif hasattr(chunk.choices[0].delta, "audio") and chunk.choices[0].delta.audio.get(
"transcript"
):
await self.push_frame(
LLMTextFrame(
chunk.choices[0].delta.audio["transcript"], skip_tts=self._get_skip_tts()
)
)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.audio["transcript"]))
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to

View File

@@ -275,7 +275,8 @@ class SarvamSTTService(STTService):
await self._socket_client.translate(**method_kwargs)
except Exception as e:
yield ErrorFrame(error=f"Error sending audio to Sarvam: {e}", exception=e)
logger.error(f"Error sending audio to Sarvam: {e}")
await self.push_error(ErrorFrame(f"Failed to send audio: {e}"))
yield None
@@ -331,11 +332,13 @@ class SarvamSTTService(STTService):
logger.info("Connected to Sarvam successfully")
except ApiError as e:
await self.push_error(error_msg=f"Sarvam API error: {e}", exception=e)
logger.error(f"Sarvam API error: {e}")
await self.push_error(ErrorFrame(f"Sarvam API error: {e}"))
except Exception as e:
logger.error(f"Failed to connect to Sarvam: {e}")
self._socket_client = None
self._websocket_context = None
await self.push_error(error_msg=f"Failed to connect to Sarvam: {e}", exception=e)
await self.push_error(ErrorFrame(f"Failed to connect to Sarvam: {e}"))
async def _disconnect(self):
"""Disconnect from Sarvam WebSocket API using SDK."""
@@ -348,9 +351,7 @@ class SarvamSTTService(STTService):
# Exit the async context manager
await self._websocket_context.__aexit__(None, None, None)
except Exception as e:
await self.push_error(
error_msg=f"Error closing WebSocket connection: {e}", exception=e
)
logger.error(f"Error closing WebSocket connection: {e}")
finally:
logger.debug("Disconnected from Sarvam WebSocket")
self._socket_client = None
@@ -370,7 +371,8 @@ class SarvamSTTService(STTService):
# Messages will be handled via the _message_handler callback
await self._socket_client.start_listening()
except Exception as e:
await self.push_error(error_msg=f"Sarvam receive task error: {e}", exception=e)
logger.error(f"Error in Sarvam receive task: {e}")
await self.push_error(ErrorFrame(f"Sarvam receive task error: {e}"))
async def _handle_message(self, message):
"""Handle incoming WebSocket message from Sarvam SDK.
@@ -425,7 +427,8 @@ class SarvamSTTService(STTService):
await self.stop_processing_metrics()
except Exception as e:
await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e)
logger.error(f"Error handling Sarvam message: {e}")
await self.push_error(ErrorFrame(f"Failed to handle message: {e}"))
await self.stop_all_metrics()
@traced_stt

View File

@@ -254,7 +254,8 @@ class SarvamHttpTTSService(TTSService):
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
yield ErrorFrame(error=f"Sarvam API error: {error_text}")
logger.error(f"Sarvam API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}"))
return
response_data = await response.json()
@@ -263,7 +264,8 @@ class SarvamHttpTTSService(TTSService):
# Decode base64 audio data
if "audios" not in response_data or not response_data["audios"]:
yield ErrorFrame(error="No audio data received")
logger.error("No audio data received from Sarvam API")
await self.push_error(ErrorFrame(error="No audio data received"))
return
# Get the first audio (there should be only one for single text input)
@@ -284,7 +286,8 @@ class SarvamHttpTTSService(TTSService):
yield frame
except Exception as e:
yield ErrorFrame(error=f"Error generating TTS: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
@@ -557,7 +560,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._disconnect_websocket()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Reset state only after everything is cleaned up
self._started = False
@@ -581,9 +585,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(
error_msg=f"Error connecting to Sarvam TTS Websocket: {e}", exception=e
)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -599,7 +602,8 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.send(json.dumps(config_message))
logger.debug("Configuration sent successfully")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise
async def _disconnect_websocket(self):
@@ -611,7 +615,8 @@ class SarvamTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Sarvam")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._started = False
self._websocket = None
@@ -635,7 +640,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self.push_frame(frame)
elif msg.get("type") == "error":
error_msg = msg["data"]["message"]
await self.push_error(error_msg=f"TTS Error: {error_msg}")
logger.error(f"TTS Error: {error_msg}")
# If it's a timeout error, the connection might need to be reset
if "too long" in error_msg.lower() or "timeout" in error_msg.lower():
@@ -697,11 +702,13 @@ class SarvamTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -48,14 +48,12 @@ class SimliVideoService(FrameProcessor):
"""Input parameters for Simli video configuration.
Parameters:
enable_logging: Whether to enable Simli logging.
max_session_length: Absolute maximum session duration in seconds.
Avatar will disconnect after this time even if it's speaking.
max_idle_time: Maximum duration in seconds the avatar is not speaking
before the avatar disconnects.
"""
enable_logging: Optional[bool] = None
max_session_length: Optional[int] = None
max_idle_time: Optional[int] = None
@@ -156,7 +154,6 @@ class SimliVideoService(FrameProcessor):
config=config,
latencyInterval=latency_interval,
simliURL=simli_url,
enable_logging=params.enable_logging or False,
)
self._pipecat_resampler: AudioResampler = None
@@ -181,7 +178,7 @@ class SimliVideoService(FrameProcessor):
self._audio_task = self.create_task(self._consume_and_process_audio())
self._video_task = self.create_task(self._consume_and_process_video())
except Exception as e:
await self.push_error(error_msg=f"Unable to start connection: {e}", exception=e)
logger.error(f"{self}: unable to start connection: {e}")
async def _consume_and_process_audio(self):
"""Consume audio frames from Simli and push them downstream."""
@@ -259,7 +256,7 @@ class SimliVideoService(FrameProcessor):
await self._simli_client.send(audioBytes)
return
except Exception as e:
await self.push_error(error_msg=f"Error sending audio: {e}", exception=e)
logger.exception(f"{self} exception: {e}")
elif isinstance(frame, TTSStoppedFrame):
try:
if self._previously_interrupted and len(self._audio_buffer) > 0:
@@ -267,7 +264,7 @@ class SimliVideoService(FrameProcessor):
self._previously_interrupted = False
self._audio_buffer = bytearray()
except Exception as e:
await self.push_error(error_msg=f"Error stopping TTS: {e}", exception=e)
logger.exception(f"{self} exception: {e}")
return
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()

View File

@@ -194,7 +194,7 @@ class SonioxSTTService(STTService):
self._websocket = await websocket_connect(self._url)
if not self._websocket:
await self.push_error(error_msg=f"Unable to connect to Soniox API at {self._url}")
logger.error(f"Unable to connect to Soniox API at {self._url}")
# If vad_force_turn_endpoint is not enabled, we need to enable endpoint detection.
# Either one or the other is required.
@@ -327,7 +327,8 @@ class SonioxSTTService(STTService):
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
async def _receive_task_handler(self):
if not self._websocket:
@@ -403,8 +404,13 @@ class SonioxSTTService(STTService):
if error_code or error_message:
# In case of error, still send the final transcript (if any remaining in the buffer).
await send_endpoint_transcript()
logger.error(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
await self.push_error(
error_msg=f"Error: {error_code} (_receive_task_handler) - {error_message}"
ErrorFrame(
error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
)
finished = content.get("finished")
@@ -419,4 +425,5 @@ class SonioxSTTService(STTService):
# Expected when closing the connection.
pass
except Exception as e:
await self.push_error(error_msg=f"Error receiving message: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))

View File

@@ -467,7 +467,8 @@ class SpeechmaticsSTTService(STTService):
await self._client.send_audio(audio)
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
await self._disconnect()
def update_params(
@@ -513,7 +514,8 @@ class SpeechmaticsSTTService(STTService):
self._client.send_message(payload), self.get_event_loop()
)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
raise RuntimeError(f"error sending message to STT: {e}")
async def _connect(self) -> None:
@@ -579,7 +581,8 @@ class SpeechmaticsSTTService(STTService):
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Error connecting to Speechmatics: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._client = None
async def _disconnect(self) -> None:
@@ -593,9 +596,8 @@ class SpeechmaticsSTTService(STTService):
except asyncio.TimeoutError:
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
except Exception as e:
await self.push_error(
error_msg=f"Error disconnecting from Speechmatics: {e}", exception=e
)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._client = None
await self._call_event_handler("on_disconnected")

View File

@@ -163,7 +163,7 @@ class SpeechmaticsTTSService(TTSService):
# Report error frame
yield ErrorFrame(
error=f"Service unavailable [503] (attempt {attempt}, retry in {backoff_time:.2f}s)"
error=f"{self} Service unavailable [503] (attempt {attempt}, retry in {backoff_time:.2f}s)"
)
# Wait before retrying
@@ -174,13 +174,16 @@ class SpeechmaticsTTSService(TTSService):
except (ValueError, ArithmeticError):
yield ErrorFrame(
error=f"Service unavailable [503] (attempts {attempt})",
error=f"{self} Service unavailable [503] (attempts {attempt})",
fatal=True,
)
return
# != 200 : Error
if response.status != 200:
yield ErrorFrame(error=f"Service unavailable [{response.status}]")
yield ErrorFrame(
error=f"{self} Service unavailable [{response.status}]", fatal=True
)
return
# Update Pipecat metrics
@@ -222,7 +225,7 @@ class SpeechmaticsTTSService(TTSService):
break
except Exception as e:
yield ErrorFrame(error=f"Error generating TTS: {e}")
yield ErrorFrame(error=f"{self}: Error generating TTS: {e}", fatal=True)
finally:
# Emit the TTS stopped frame
yield TTSStoppedFrame()

View File

@@ -329,4 +329,4 @@ class WebsocketSTTService(STTService, WebsocketService):
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error_frame(error)
await self.push_error(error)

View File

@@ -781,7 +781,7 @@ class WebsocketTTSService(TTSService, WebsocketService):
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error_frame(error)
await self.push_error(error)
class InterruptibleTTSService(WebsocketTTSService):
@@ -843,7 +843,7 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error_frame(error)
await self.push_error(error)
class InterruptibleWordTTSService(WebsocketWordTTSService):

View File

@@ -246,7 +246,8 @@ class UltravoxSTTService(AIService):
logger.info("Model warm-up completed successfully")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0):
"""Generate silent audio as a numpy array.
@@ -376,7 +377,7 @@ class UltravoxSTTService(AIService):
if arr.size > 0: # Check if array is not empty
audio_arrays.append(arr)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
# Handle numpy array data
elif isinstance(f.audio, np.ndarray):
if f.audio.size > 0: # Check if array is not empty
@@ -436,11 +437,17 @@ class UltravoxSTTService(AIService):
yield LLMFullResponseEndFrame()
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
else:
logger.error("No model available for text generation")
yield ErrorFrame("No model available for text generation")
except Exception as e:
logger.error(f"{self} exception: {e}")
import traceback
logger.error(traceback.format_exc())
yield ErrorFrame(f"Error processing audio: {str(e)}")
finally:
self._buffer.is_processing = False

View File

@@ -226,7 +226,8 @@ class BaseWhisperSTTService(SegmentedSTTService):
logger.warning("Received empty transcription from API")
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
async def _transcribe(self, audio: bytes) -> Transcription:
"""Transcribe audio data to text.

View File

@@ -285,6 +285,7 @@ class WhisperSTTService(SegmentedSTTService):
The service will normalize it to float32 in the range [-1, 1].
"""
if not self._model:
logger.error(f"{self} error: Whisper model not available")
yield ErrorFrame("Whisper model not available")
return
@@ -427,4 +428,5 @@ class WhisperSTTServiceMLX(WhisperSTTService):
)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -141,8 +141,13 @@ class XTTSService(TTSService):
async with self._aiohttp_session.get(self._settings["base_url"] + "/studio_speakers") as r:
if r.status != 200:
text = await r.text()
logger.error(
f"{self} error getting studio speakers (status: {r.status}, error: {text})"
)
await self.push_error(
error_msg=f"Error getting studio speakers (status: {r.status}, error: {text})"
ErrorFrame(
error=f"Error getting studio speakers (status: {r.status}, error: {text})"
)
)
return
self._studio_speakers = await r.json()
@@ -181,6 +186,7 @@ class XTTSService(TTSService):
async with self._aiohttp_session.post(url, json=payload) as r:
if r.status != 200:
text = await r.text()
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})")
return

View File

@@ -2506,10 +2506,13 @@ class DailyTransport(BaseTransport):
async def _on_error(self, error):
"""Handle error events and push error frames."""
await self._call_event_handler("on_error", error)
# Push error frame to notify the pipeline
error_frame = ErrorFrame(error)
if self._input:
await self._input.push_error(error_msg=error)
await self._input.push_error(error_frame)
elif self._output:
await self._output.push_error(error_msg=error)
await self._output.push_error(error_frame)
else:
logger.error("Both input and output are None while trying to push error")
raise Exception("No valid input or output channel to push error")
@@ -2565,7 +2568,7 @@ class DailyTransport(BaseTransport):
except asyncio.TimeoutError:
logger.error(f"Timeout handling dialin-ready event ({url})")
except Exception as e:
logger.error(f"Error handling dialin-ready event ({url}): {e}")
logger.exception(f"Error handling dialin-ready event ({url}): {e}")
async def _on_dialin_connected(self, data):
"""Handle dial-in connected events."""

View File

@@ -316,7 +316,7 @@ class SmallWebRTCConnection(BaseObject):
logger.debug("Client not connected. Queuing app-message.")
self._pending_app_messages.append(json_message)
except Exception as e:
logger.error(f"Error parsing JSON message {message}, {e}")
logger.exception(f"Error parsing JSON message {message}, {e}")
# Despite the fact that aiortc provides this listener, they don't have a status for "disconnected"
# So, in case we loose connection, this event will not be triggered

View File

@@ -265,7 +265,7 @@ class TavusTransportClient:
try:
await self._client.cleanup()
except Exception as e:
logger.error(f"Exception during cleanup: {e}")
logger.exception(f"Exception during cleanup: {e}")
async def _on_joined(self, data):
"""Handle joined event."""

View File

@@ -162,7 +162,7 @@ class TaskManager(BaseTaskManager):
# Re-raise the exception to ensure the task is cancelled.
raise
except Exception as e:
logger.error(f"{name}: unexpected exception: {e}")
logger.exception(f"{name}: unexpected exception: {e}")
if not self._params:
raise Exception("TaskManager is not setup: unable to get event loop")
@@ -197,7 +197,7 @@ class TaskManager(BaseTaskManager):
# Here are sure the task is cancelled properly.
pass
except Exception as e:
logger.error(f"{name}: unexpected exception while cancelling task: {e}")
logger.exception(f"{name}: unexpected exception while cancelling task: {e}")
except BaseException as e:
logger.critical(f"{name}: fatal base exception while cancelling task: {e}")
raise

View File

@@ -187,7 +187,7 @@ class BaseObject(ABC):
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.error(f"Exception in event handler {event_name}: {e}")
logger.exception(f"Exception in event handler {event_name}: {e}")
def _event_task_finished(self, task: asyncio.Task):
"""Clean up completed event handler tasks.

View File

@@ -92,24 +92,6 @@ def _add_token_usage_to_span(span, token_usage):
span.set_attribute("gen_ai.usage.input_tokens", token_usage["prompt_tokens"])
if "completion_tokens" in token_usage:
span.set_attribute("gen_ai.usage.output_tokens", token_usage["completion_tokens"])
# Add cached token metrics for dictionary
if (
"cache_read_input_tokens" in token_usage
and token_usage["cache_read_input_tokens"] is not None
):
span.set_attribute(
"gen_ai.usage.cache_read_input_tokens", token_usage["cache_read_input_tokens"]
)
if (
"cache_creation_input_tokens" in token_usage
and token_usage["cache_creation_input_tokens"] is not None
):
span.set_attribute(
"gen_ai.usage.cache_creation_input_tokens",
token_usage["cache_creation_input_tokens"],
)
if "reasoning_tokens" in token_usage and token_usage["reasoning_tokens"] is not None:
span.set_attribute("gen_ai.usage.reasoning_tokens", token_usage["reasoning_tokens"])
else:
# Handle LLMTokenUsage object
span.set_attribute("gen_ai.usage.input_tokens", getattr(token_usage, "prompt_tokens", 0))
@@ -117,19 +99,6 @@ def _add_token_usage_to_span(span, token_usage):
"gen_ai.usage.output_tokens", getattr(token_usage, "completion_tokens", 0)
)
# Add cached token metrics for LLMTokenUsage object
cache_read_tokens = getattr(token_usage, "cache_read_input_tokens", None)
if cache_read_tokens is not None:
span.set_attribute("gen_ai.usage.cache_read_input_tokens", cache_read_tokens)
cache_creation_tokens = getattr(token_usage, "cache_creation_input_tokens", None)
if cache_creation_tokens is not None:
span.set_attribute("gen_ai.usage.cache_creation_input_tokens", cache_creation_tokens)
reasoning_tokens = getattr(token_usage, "reasoning_tokens", None)
if reasoning_tokens is not None:
span.set_attribute("gen_ai.usage.reasoning_tokens", reasoning_tokens)
def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -> Callable:
"""Trace TTS service methods with TTS-specific attributes.
@@ -746,7 +715,7 @@ def traced_gemini_live(operation: str) -> Callable:
else:
operation_attrs["tool.result_status"] = "completed"
except json.JSONDecodeError:
except json.JSONDecodeError as e:
operation_attrs["tool.result"] = (
f"Invalid JSON: {str(result_content)[:500]}"
)

View File

@@ -0,0 +1,189 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (
AggregationType,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
InterruptionFrame,
TranscriptionFrame,
TranscriptionUpdateFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
)
from pipecat.processors.transcript_processor import TurnAwareTranscriptProcessor
from pipecat.tests.utils import SleepFrame, run_test
class TestTurnAwareTranscriptProcessor(unittest.IsolatedAsyncioTestCase):
"""Tests for TurnAwareTranscriptProcessor."""
async def test_basic_turn_flow(self):
"""Test basic turn start/end with user and assistant speech."""
processor = TurnAwareTranscriptProcessor()
# Track events
turn_started_calls = []
turn_ended_calls = []
@processor.event_handler("on_turn_started")
async def on_turn_started(proc, turn_number):
turn_started_calls.append(turn_number)
@processor.event_handler("on_turn_ended")
async def on_turn_ended(proc, turn_number, user_text, assistant_text, interrupted):
turn_ended_calls.append(
{
"turn_number": turn_number,
"user_text": user_text,
"assistant_text": assistant_text,
"interrupted": interrupted,
}
)
frames_to_send = [
# Turn 1: User speaks, bot responds
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Hello", user_id="user1", timestamp=""),
SleepFrame(sleep=0.01), # Allow transcription to process
BotStartedSpeakingFrame(),
TTSTextFrame(text="Hi", aggregated_by=AggregationType.WORD),
TTSTextFrame(text=" there", aggregated_by=AggregationType.WORD),
BotStoppedSpeakingFrame(),
SleepFrame(sleep=0.1),
]
await run_test(processor, frames_to_send=frames_to_send)
# Verify events
self.assertEqual(
len(turn_started_calls), 1, f"Expected 1 turn started, got {len(turn_started_calls)}"
)
self.assertEqual(turn_started_calls[0], 1)
self.assertEqual(
len(turn_ended_calls), 1, f"Expected 1 turn ended, got {len(turn_ended_calls)}"
)
self.assertEqual(turn_ended_calls[0]["turn_number"], 1)
self.assertEqual(turn_ended_calls[0]["user_text"], "Hello")
self.assertEqual(turn_ended_calls[0]["assistant_text"], "Hi there")
self.assertFalse(turn_ended_calls[0]["interrupted"])
async def test_interruption(self):
"""Test turn ending on interruption."""
processor = TurnAwareTranscriptProcessor()
# Track events
turn_ended_calls = []
@processor.event_handler("on_turn_ended")
async def on_turn_ended(proc, turn_number, user_text, assistant_text, interrupted):
turn_ended_calls.append(
{
"turn_number": turn_number,
"user_text": user_text,
"assistant_text": assistant_text,
"interrupted": interrupted,
}
)
frames_to_send = [
# User speaks
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Tell me", user_id="user1", timestamp=""),
SleepFrame(sleep=0.01), # Allow transcription to process
# Bot starts responding
BotStartedSpeakingFrame(),
TTSTextFrame(text="Sure", aggregated_by=AggregationType.WORD),
TTSTextFrame(text=" I", aggregated_by=AggregationType.WORD),
TTSTextFrame(text=" can", aggregated_by=AggregationType.WORD),
# User interrupts
InterruptionFrame(),
# New turn starts
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Wait", user_id="user1", timestamp=""),
SleepFrame(sleep=0.1),
]
await run_test(processor, frames_to_send=frames_to_send)
# Verify first turn was interrupted
self.assertGreaterEqual(
len(turn_ended_calls), 1, f"Expected at least 1 turn ended, got {len(turn_ended_calls)}"
)
first_turn = turn_ended_calls[0]
self.assertEqual(first_turn["user_text"], "Tell me")
# Note: In this test flow, InterruptionFrame arrives before TTSTextFrames are processed,
# so assistant text may be empty. In real scenarios, word timestamps ensure proper capture.
self.assertIn(first_turn["assistant_text"], ["", "Sure I can", "Sure I can"])
self.assertTrue(first_turn["interrupted"])
async def test_multiple_turns(self):
"""Test multiple back-and-forth turns."""
processor = TurnAwareTranscriptProcessor()
# Track events
turn_started_calls = []
turn_ended_calls = []
@processor.event_handler("on_turn_started")
async def on_turn_started(proc, turn_number):
turn_started_calls.append(turn_number)
@processor.event_handler("on_turn_ended")
async def on_turn_ended(proc, turn_number, user_text, assistant_text, interrupted):
turn_ended_calls.append(
{
"turn_number": turn_number,
"user_text": user_text,
"assistant_text": assistant_text,
}
)
frames_to_send = [
# Turn 1
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Hi", user_id="user1", timestamp=""),
SleepFrame(sleep=0.01), # Allow transcription to process
BotStartedSpeakingFrame(),
TTSTextFrame(text="Hello", aggregated_by=AggregationType.WORD),
BotStoppedSpeakingFrame(),
SleepFrame(sleep=0.05),
# Turn 2
UserStartedSpeakingFrame(),
TranscriptionFrame(text="How are you", user_id="user1", timestamp=""),
SleepFrame(sleep=0.01), # Allow transcription to process
BotStartedSpeakingFrame(),
TTSTextFrame(text="I'm", aggregated_by=AggregationType.WORD),
TTSTextFrame(text=" good", aggregated_by=AggregationType.WORD),
BotStoppedSpeakingFrame(),
SleepFrame(sleep=0.1),
]
await run_test(processor, frames_to_send=frames_to_send)
# Verify multiple turns
self.assertEqual(
len(turn_started_calls), 2, f"Expected 2 turns started, got {len(turn_started_calls)}"
)
self.assertEqual(turn_started_calls, [1, 2])
self.assertEqual(
len(turn_ended_calls), 2, f"Expected 2 turns ended, got {len(turn_ended_calls)}"
)
self.assertEqual(turn_ended_calls[0]["turn_number"], 1)
self.assertEqual(turn_ended_calls[0]["user_text"], "Hi")
self.assertEqual(turn_ended_calls[0]["assistant_text"], "Hello")
self.assertEqual(turn_ended_calls[1]["turn_number"], 2)
self.assertEqual(turn_ended_calls[1]["user_text"], "How are you")
self.assertEqual(turn_ended_calls[1]["assistant_text"], "I'm good")
if __name__ == "__main__":
unittest.main()

2
uv.lock generated
View File

@@ -4475,7 +4475,6 @@ daily = [
]
deepgram = [
{ name = "deepgram-sdk" },
{ name = "websockets" },
]
elevenlabs = [
{ name = "websockets" },
@@ -4721,7 +4720,6 @@ requires-dist = [
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'asyncai'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'aws'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'cartesia'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'deepgram'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'elevenlabs'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'fish'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'gladia'" },