Compare commits
14 Commits
vp-trace-c
...
mb/review-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b57a9d2546 | ||
|
|
54ff49c4fc | ||
|
|
8549331b32 | ||
|
|
6b38510ace | ||
|
|
bbd5c3b487 | ||
|
|
c6fcf58be4 | ||
|
|
a475d09b90 | ||
|
|
4b11bcb4a4 | ||
|
|
c1a1d40f9a | ||
|
|
ede995c563 | ||
|
|
fdf3c8b4cf | ||
|
|
50bef86d33 | ||
|
|
79f43ece74 | ||
|
|
08b2365244 |
@@ -24,8 +24,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- Added word-level timestamps support to Hume TTS service
|
||||
|
||||
- Added optional speaking rate control to `InworldTTSService`.
|
||||
|
||||
### Changed
|
||||
|
||||
- ⚠️ Breaking change: `LLMContext.create_image_message()`,
|
||||
@@ -91,9 +89,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- Prevented `HeyGenVideoService` from automatically disconnecting after 5 minutes.
|
||||
|
||||
- Fixed `InworldTTSService` audio config payload to use camelCase keys expected
|
||||
by the Inworld API.
|
||||
|
||||
## [0.0.94] - 2025-11-10
|
||||
|
||||
### Changed
|
||||
|
||||
@@ -1,155 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from turn_detector_observer import TurnDetectorObserver
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
### STT ###
|
||||
stt = OpenAISTTService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o-transcribe",
|
||||
prompt="Expect normal helpful conversation.",
|
||||
)
|
||||
|
||||
### LLM ###
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
### TTS ###
|
||||
tts = OpenAITTSService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
voice="ballad",
|
||||
params=OpenAITTSService.InputParams(
|
||||
instructions="Please speak clearly and at a moderate pace."
|
||||
),
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# RTVI events for detecting bot aggregation
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
### PIPELINE ###
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
### TASK ###
|
||||
turn_detector = TurnDetectorObserver()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[turn_detector, RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# not sure this is needed...
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
async def on_client_ready(rtvi):
|
||||
await rtvi.set_bot_ready()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -1,161 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from turn_detector_observer import TurnDetectorObserver
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=SemanticTurnDetection(),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
noise_reduction=InputAudioNoiseReduction(type="near_field"),
|
||||
)
|
||||
),
|
||||
# In this example we provide tools through the context, but you could
|
||||
# alternatively provide them here.
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
playful tone.
|
||||
|
||||
If interacting in a non-English language, start by using the standard accent or dialect familiar to
|
||||
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
|
||||
even if you're asked about them.
|
||||
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
### LLM ###
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
)
|
||||
|
||||
# Create a standard OpenAI LLM context object using the normal messages format. The
|
||||
# OpenAIRealtimeLLMService will convert this internally to messages that the
|
||||
# openai WebSocket API can understand.
|
||||
context = LLMContext(
|
||||
[{"role": "user", "content": "Say hello!"}],
|
||||
tools,
|
||||
)
|
||||
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
### PIPELINE ###
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
### TASK ###
|
||||
turn_detector = TurnDetectorObserver()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[turn_detector],
|
||||
)
|
||||
|
||||
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -1,188 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from turn_detector_observer import TurnDetectorObserver
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
### STT ###
|
||||
stt = OpenAISTTService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o-transcribe",
|
||||
prompt="Expect normal helpful conversation.",
|
||||
)
|
||||
|
||||
### LLM ###
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
### TTS ###
|
||||
tts = OpenAITTSService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
voice="ballad",
|
||||
params=OpenAITTSService.InputParams(instructions="Please speak clearly and at a moderate pace."),
|
||||
)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
### PIPELINE ###
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
### TASK ###
|
||||
turn_detector = TurnDetectorObserver()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[turn_detector],
|
||||
)
|
||||
|
||||
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -1,11 +0,0 @@
|
||||
```bash
|
||||
uv sync
|
||||
uv pip install -e '.[cartesia,daily,elevenlabs,local-smart-turn-v3,openai,runner,webrtc]'
|
||||
```
|
||||
|
||||
```bash
|
||||
python examples/foundational/trace/001-trace.py
|
||||
```
|
||||
|
||||
- open [http://localhost:7860](http://localhost:7860)
|
||||
- click `connect` button in top right
|
||||
@@ -1,5 +0,0 @@
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
ELEVENLABS_API_KEY=...
|
||||
ELEVENLABS_VOICE_ID=...
|
||||
CARTESIA_API_KEY=...
|
||||
@@ -1,181 +0,0 @@
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
EndFrame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallsStartedFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
StartFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.openai.base_llm import LLMService
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
|
||||
|
||||
class TurnDetectorObserver(BaseObserver):
|
||||
"""Observer ... of turns."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self._turn_observer = None
|
||||
self._arrow = "→"
|
||||
|
||||
self._turn_number = 1
|
||||
self._endframe_queued = False
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
Set ...
|
||||
"""
|
||||
pass
|
||||
|
||||
def set_turn_observer_event_handlers(self, turn_observer):
|
||||
self._turn_observer = turn_observer
|
||||
self.set_turn_observer_event_handlers(self._turn_observer)
|
||||
|
||||
def get_turn_observer(self):
|
||||
return self._turn_observer
|
||||
|
||||
def set_turn_observer_event_handlers(self, turn_observer):
|
||||
"""Sets the Turn Observer event handlers `on_turn_started` and `on_turn_ended`.
|
||||
|
||||
Args:
|
||||
turn_observer: The turn tracking observer of the pipeline task
|
||||
"""
|
||||
|
||||
@turn_observer.event_handler("on_turn_started")
|
||||
async def on_turn_started(observer, turn_number):
|
||||
self._turn_number = turn_number
|
||||
current_time = time.time()
|
||||
logger.info(f"🔄 Turn {turn_number} started")
|
||||
|
||||
#
|
||||
# code to start conversation turn here
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
@turn_observer.event_handler("on_turn_ended")
|
||||
async def on_turn_ended(observer, turn_number, duration, was_interrupted):
|
||||
current_time = time.time()
|
||||
|
||||
if was_interrupted:
|
||||
logger.info(f"🔄 Turn {turn_number} interrupted after {duration:.2f}s")
|
||||
else:
|
||||
logger.info(f"🏁 Turn {turn_number} completed in {duration:.2f}s")
|
||||
|
||||
#
|
||||
# code to end conversation turn here
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
########
|
||||
# everything past here isn't needed, just nice to have logging
|
||||
########
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Runs when any frame is pushed through pipeline.
|
||||
Determines based on what type of frame and where it came from
|
||||
what metrics to update.
|
||||
|
||||
Args:
|
||||
data: the pushed frame
|
||||
"""
|
||||
src = data.source
|
||||
dst = data.destination
|
||||
frame = data.frame
|
||||
direction = data.direction
|
||||
timestamp = data.timestamp
|
||||
|
||||
# Convert timestamp to milliseconds for readability
|
||||
time_sec = timestamp / 1_000_000
|
||||
# Convert timestamp to seconds for readability
|
||||
# time_sec = timestamp / 1_000_000_000
|
||||
|
||||
# only log downstream frames
|
||||
if direction == FrameDirection.UPSTREAM:
|
||||
return
|
||||
|
||||
if isinstance(src, Pipeline) or isinstance(dst, Pipeline):
|
||||
if isinstance(frame, StartFrame):
|
||||
self._handle_StartFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, EndFrame):
|
||||
self._handle_EndFrame(src, dst, frame, time_sec)
|
||||
|
||||
if isinstance(src, BaseOutputTransport):
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
self._handle_BotStartedSpeakingFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
self._handle_BotStoppedSpeakingFrame(src, dst, frame, time_sec)
|
||||
|
||||
elif isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._handle_UserStartedSpeakingFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._handle_UserStoppedSpeakingFrame(src, dst, frame, time_sec)
|
||||
|
||||
if isinstance(src, LLMService):
|
||||
if isinstance(frame, LLMFullResponseStartFrame):
|
||||
self._handle_LLMFullResponseStartFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, LLMFullResponseEndFrame):
|
||||
self._handle_LLMFullResponseEndFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, FunctionCallsStartedFrame):
|
||||
self._handle_FunctionCallsStartedFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
self._handle_FunctionCallResultFrame(src, dst, frame, time_sec)
|
||||
|
||||
# ------------ FRAME HANDLERS ------------
|
||||
|
||||
def _handle_StartFrame(self, src, dst, frame, time_sec):
|
||||
if isinstance(dst, Pipeline):
|
||||
logger.info(f"🟢🟢🟢 StartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_EndFrame(self, src, dst, frame, time_sec):
|
||||
if isinstance(dst, Pipeline):
|
||||
logger.info(f"Queueing 🔴🔴🔴 EndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
self._endframe_queued = True
|
||||
|
||||
if isinstance(src, Pipeline):
|
||||
logger.info(f"🔴🔴🔴 EndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
current_time = time.time()
|
||||
end_state_info = {
|
||||
"turn_number": self._turn_number,
|
||||
}
|
||||
|
||||
def _handle_BotStartedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🤖🟢 BotStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_BotStoppedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🤖🔴 BotStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_LLMFullResponseStartFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🧠🟢 LLMFullResponseStartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_LLMFullResponseEndFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🧠🔴 LLMFullResponseEndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_UserStartedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🙂🟢 UserStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_UserStoppedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🙂🔴 UserStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_FunctionCallsStartedFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(
|
||||
f"📐🟢 {frame.function_calls[0].function_name} FunctionCallsStartedFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
|
||||
)
|
||||
|
||||
def _handle_FunctionCallResultFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(
|
||||
f"📐🔴 {frame.function_name} FunctionCallResultFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
|
||||
)
|
||||
@@ -807,11 +807,13 @@ class ErrorFrame(SystemFrame):
|
||||
error: Description of the error that occurred.
|
||||
fatal: Whether the error is fatal and requires bot shutdown.
|
||||
processor: The frame processor that generated the error.
|
||||
exception: The exception that occurred.
|
||||
"""
|
||||
|
||||
error: str
|
||||
fatal: bool = False
|
||||
processor: Optional["FrameProcessor"] = None
|
||||
exception: Optional[Exception] = None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(error: {self.error}, fatal: {self.fatal})"
|
||||
|
||||
@@ -126,6 +126,4 @@ class WakeCheckFilter(FrameProcessor):
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
except Exception as e:
|
||||
error_msg = f"Error in wake word filter: {e}"
|
||||
logger.exception(error_msg)
|
||||
await self.push_error(ErrorFrame(error_msg))
|
||||
await self.push_error(error_msg=f"Error in wake word filter: {e}", exception=e)
|
||||
|
||||
@@ -142,6 +142,7 @@ class FrameProcessor(BaseObject):
|
||||
- on_after_process_frame: Called after a frame is processed
|
||||
- on_before_push_frame: Called before a frame is pushed
|
||||
- on_after_push_frame: Called after a frame is pushed
|
||||
- on_error: Called when an error is raised in the frame processing.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -234,6 +235,7 @@ class FrameProcessor(BaseObject):
|
||||
self._register_event_handler("on_after_process_frame", sync=True)
|
||||
self._register_event_handler("on_before_push_frame", sync=True)
|
||||
self._register_event_handler("on_after_push_frame", sync=True)
|
||||
self._register_event_handler("on_error", sync=True)
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
@@ -630,7 +632,45 @@ class FrameProcessor(BaseObject):
|
||||
elif isinstance(frame, (FrameProcessorResumeFrame, FrameProcessorResumeUrgentFrame)):
|
||||
await self.__resume(frame)
|
||||
|
||||
async def push_error(self, error: ErrorFrame):
|
||||
async def push_error(
|
||||
self,
|
||||
error_msg: Optional[str] = None,
|
||||
exception: Optional[Exception] = None,
|
||||
fatal: bool = False,
|
||||
):
|
||||
"""Creates and pushes an ErrorFrame upstream.
|
||||
|
||||
Creates and pushes an ErrorFrame upstream to notify other processors in the
|
||||
pipeline about an error condition. The error frame will include context about
|
||||
which processor generated the error.
|
||||
|
||||
Args:
|
||||
error_msg: Optional descriptive message explaining the error condition.
|
||||
exception: Optional exception object that caused the error, if available.
|
||||
This provides additional context for debugging and error handling.
|
||||
fatal: Whether this error should be considered fatal to the pipeline.
|
||||
Fatal errors typically cause the entire pipeline to stop processing.
|
||||
Defaults to False for non-fatal errors.
|
||||
|
||||
Example:
|
||||
```python
|
||||
# Non-fatal error
|
||||
await self.push_error("Failed to process audio chunk, skipping")
|
||||
|
||||
# Fatal error with exception context
|
||||
try:
|
||||
result = some_critical_operation()
|
||||
except Exception as e:
|
||||
await self.push_error("Critical operation failed", exception=e, fatal=True)
|
||||
```
|
||||
"""
|
||||
error_message = error_msg or f"{self} exception: {exception}"
|
||||
error_frame = ErrorFrame(
|
||||
error=error_message, fatal=fatal, exception=exception, processor=self
|
||||
)
|
||||
await self.push_error_frame(error=error_frame)
|
||||
|
||||
async def push_error_frame(self, error: ErrorFrame):
|
||||
"""Push an error frame upstream.
|
||||
|
||||
Args:
|
||||
@@ -638,6 +678,8 @@ class FrameProcessor(BaseObject):
|
||||
"""
|
||||
if not error.processor:
|
||||
error.processor = self
|
||||
await self._call_event_handler("on_error", error)
|
||||
logger.error(error.error)
|
||||
await self.push_frame(error, FrameDirection.UPSTREAM)
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
@@ -759,8 +801,10 @@ class FrameProcessor(BaseObject):
|
||||
await self.__cancel_process_task()
|
||||
self.__create_process_task()
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
await self.push_error(
|
||||
error_msg=f"Uncaught exception in {self} when handling _start_interruption: {e}",
|
||||
exception=e,
|
||||
)
|
||||
|
||||
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Internal method to push frames to adjacent processors.
|
||||
@@ -797,8 +841,7 @@ class FrameProcessor(BaseObject):
|
||||
await self._observer.on_push_frame(data)
|
||||
await self._prev.queue_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
def _check_started(self, frame: Frame):
|
||||
"""Check if the processor has been started.
|
||||
@@ -874,8 +917,7 @@ class FrameProcessor(BaseObject):
|
||||
|
||||
await self._call_event_handler("on_after_process_frame", frame)
|
||||
except Exception as e:
|
||||
logger.exception(f"{self}: error processing frame: {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
await self.push_error(error_msg=f"{self}: error processing frame: {e}", exception=e)
|
||||
|
||||
async def __input_frame_task_handler(self):
|
||||
"""Handle frames from the input queue.
|
||||
|
||||
@@ -24,7 +24,7 @@ try:
|
||||
from langchain_core.messages import AIMessageChunk
|
||||
from langchain_core.runnables import Runnable
|
||||
except ModuleNotFoundError as e:
|
||||
logger.exception("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
|
||||
logger.error("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
@@ -113,6 +113,6 @@ class LangchainProcessor(FrameProcessor):
|
||||
except GeneratorExit:
|
||||
logger.warning(f"{self} generator was closed prematurely")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} an unknown error occurred: {e}")
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -23,7 +23,7 @@ try:
|
||||
from strands import Agent
|
||||
from strands.multiagent.graph import Graph
|
||||
except ModuleNotFoundError as e:
|
||||
logger.exception("In order to use Strands Agents, you need to `pip install strands-agents`.")
|
||||
logger.error("In order to use Strands Agents, you need to `pip install strands-agents`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
@@ -143,7 +143,7 @@ class StrandsAgentsProcessor(FrameProcessor):
|
||||
except GeneratorExit:
|
||||
logger.warning(f"{self} generator was closed prematurely")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} an unknown error occurred: {e}")
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
if ttfb_tracking:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -199,7 +199,7 @@ class PlivoFrameSerializer(FrameSerializer):
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to hang up Plivo call: {e}")
|
||||
logger.error(f"Failed to hang up Plivo call: {e}")
|
||||
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Deserializes Plivo WebSocket data to Pipecat frames.
|
||||
|
||||
@@ -225,7 +225,7 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to hang up Telnyx call: {e}")
|
||||
logger.error(f"Failed to hang up Telnyx call: {e}")
|
||||
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Deserializes Telnyx WebSocket data to Pipecat frames.
|
||||
|
||||
@@ -236,7 +236,7 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to hang up Twilio call: {e}")
|
||||
logger.error(f"Failed to hang up Twilio call: {e}")
|
||||
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Deserializes Twilio WebSocket data to Pipecat frames.
|
||||
|
||||
@@ -166,6 +166,6 @@ class AIService(FrameProcessor):
|
||||
async for f in generator:
|
||||
if f:
|
||||
if isinstance(f, ErrorFrame):
|
||||
await self.push_error(f)
|
||||
await self.push_error_frame(f)
|
||||
else:
|
||||
await self.push_frame(f)
|
||||
|
||||
@@ -458,8 +458,7 @@ class AnthropicLLMService(LLMService):
|
||||
except httpx.TimeoutException:
|
||||
await self._call_event_handler("on_completion_timeout")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(f"{e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -206,9 +206,8 @@ class AssemblyAISTTService(STTService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
self._connected = False
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
raise
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -233,8 +232,7 @@ class AssemblyAISTTService(STTService):
|
||||
logger.warning("Timed out waiting for termination message from server")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
@@ -242,8 +240,7 @@ class AssemblyAISTTService(STTService):
|
||||
await self._websocket.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
finally:
|
||||
self._websocket = None
|
||||
@@ -262,13 +259,11 @@ class AssemblyAISTTService(STTService):
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
def _parse_message(self, message: Dict[str, Any]) -> BaseMessage:
|
||||
"""Parse a raw message into the appropriate message type."""
|
||||
@@ -297,8 +292,7 @@ class AssemblyAISTTService(STTService):
|
||||
elif isinstance(parsed_message, TerminationMessage):
|
||||
await self._handle_termination(parsed_message)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def _handle_termination(self, message: TerminationMessage):
|
||||
"""Handle termination message."""
|
||||
|
||||
@@ -228,8 +228,7 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -241,8 +240,7 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
logger.debug("Disconnecting from Async")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
self._websocket = None
|
||||
self._started = False
|
||||
@@ -287,12 +285,11 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
elif msg.get("error_code"):
|
||||
logger.error(f"{self} error: {msg}")
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self.stop_all_metrics()
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
|
||||
await self.push_error(error_msg=f"{self} error: {msg['message']}")
|
||||
else:
|
||||
logger.error(f"{self} error, unknown message type: {msg}")
|
||||
await self.push_error(error_msg=f"{self} error, unknown message type: {msg}")
|
||||
|
||||
async def _keepalive_task_handler(self):
|
||||
"""Send periodic keepalive messages to maintain WebSocket connection."""
|
||||
@@ -335,7 +332,6 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
await self._get_websocket().send(msg)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -343,7 +339,6 @@ class AsyncAITTSService(InterruptibleTTSService):
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -477,8 +472,7 @@ class AsyncAIHttpTTSService(TTSService):
|
||||
async with self._session.post(url, json=payload, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.error(f"Async API error: {error_text}")
|
||||
await self.push_error(ErrorFrame(error=f"Async API error: {error_text}"))
|
||||
yield ErrorFrame(error=f"Async API error: {error_text}")
|
||||
raise Exception(f"Async API returned status {response.status}: {error_text}")
|
||||
|
||||
audio_data = await response.read()
|
||||
@@ -494,8 +488,7 @@ class AsyncAIHttpTTSService(TTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -1136,7 +1136,7 @@ class AWSBedrockLLMService(LLMService):
|
||||
except (ReadTimeoutError, asyncio.TimeoutError):
|
||||
await self._call_event_handler("on_completion_timeout")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -452,7 +452,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._ready_to_send_context = True
|
||||
await self._finish_connecting_if_context_available()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self.push_error(exception=e)
|
||||
await self._disconnect()
|
||||
|
||||
async def _process_completed_function_calls(self, send_new_results: bool):
|
||||
@@ -576,7 +576,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.info("Finished disconnecting")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error disconnecting: {e}")
|
||||
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
|
||||
|
||||
def _create_client(self) -> BedrockRuntimeClient:
|
||||
config = Config(
|
||||
@@ -884,7 +884,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
# Errors are kind of expected while disconnecting, so just
|
||||
# ignore them and do nothing
|
||||
return
|
||||
logger.error(f"{self} error processing responses: {e}")
|
||||
await self.push_error(exception=e)
|
||||
if self._wants_connection:
|
||||
await self.reset_conversation()
|
||||
|
||||
|
||||
@@ -140,8 +140,7 @@ class AWSTranscribeSTTService(STTService):
|
||||
return
|
||||
logger.warning("WebSocket connection not established after connect")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
retry_count += 1
|
||||
if retry_count < max_retries:
|
||||
await asyncio.sleep(1) # Wait before retrying
|
||||
@@ -182,7 +181,6 @@ class AWSTranscribeSTTService(STTService):
|
||||
try:
|
||||
await self._connect()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
return
|
||||
|
||||
@@ -200,12 +198,10 @@ class AWSTranscribeSTTService(STTService):
|
||||
await self._disconnect()
|
||||
# Don't yield error here - we'll retry on next frame
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
await self._disconnect()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
await self._disconnect()
|
||||
|
||||
@@ -289,8 +285,7 @@ class AWSTranscribeSTTService(STTService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
await self._disconnect()
|
||||
raise
|
||||
|
||||
@@ -310,8 +305,7 @@ class AWSTranscribeSTTService(STTService):
|
||||
await self._ws_client.send(json.dumps(end_stream))
|
||||
await self._ws_client.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
self._ws_client = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
@@ -529,15 +523,15 @@ class AWSTranscribeSTTService(STTService):
|
||||
)
|
||||
elif headers.get(":message-type") == "exception":
|
||||
error_msg = payload.get("Message", "Unknown error")
|
||||
logger.error(f"{self} Exception from AWS: {error_msg}")
|
||||
await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}"))
|
||||
await self.push_error(error_msg=f"AWS Transcribe error: {error_msg}")
|
||||
else:
|
||||
logger.debug(f"{self} Other message type received: {headers}")
|
||||
logger.debug(f"{self} Payload: {payload}")
|
||||
except websockets.exceptions.ConnectionClosed as e:
|
||||
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
|
||||
await self.push_error(
|
||||
error_msg=f"WebSocket connection closed in receive loop", exception=e
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
break
|
||||
|
||||
@@ -312,7 +312,6 @@ class AWSPollyTTSService(TTSService):
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
except (BotoCoreError, ClientError) as error:
|
||||
logger.exception(f"{self} error generating TTS: {error}")
|
||||
error_message = f"AWS Polly TTS error: {str(error)}"
|
||||
yield ErrorFrame(error=error_message)
|
||||
|
||||
|
||||
@@ -91,7 +91,6 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
while status != "succeeded":
|
||||
attempts_left -= 1
|
||||
if attempts_left == 0:
|
||||
logger.error(f"{self} error: image generation timed out")
|
||||
yield ErrorFrame("Image generation timed out")
|
||||
return
|
||||
|
||||
@@ -104,7 +103,6 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
image_url = json_response["result"]["data"][0]["url"] if json_response else None
|
||||
if not image_url:
|
||||
logger.error(f"{self} error: image generation failed")
|
||||
yield ErrorFrame("Image generation failed")
|
||||
return
|
||||
|
||||
|
||||
@@ -61,5 +61,5 @@ class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
|
||||
)
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
|
||||
@@ -121,7 +121,6 @@ class AzureSTTService(STTService):
|
||||
self._audio_stream.write(audio)
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
@@ -151,8 +150,9 @@ class AzureSTTService(STTService):
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_recognizer.start_continuous_recognition_async()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception during initialization: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(
|
||||
error_msg=f"{self} exception during initialization: {e}", exception=e
|
||||
)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Stop the speech recognition service.
|
||||
|
||||
@@ -327,7 +327,6 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
try:
|
||||
if self._speech_synthesizer is None:
|
||||
error_msg = "Speech synthesizer not initialized."
|
||||
logger.error(error_msg)
|
||||
yield ErrorFrame(error=error_msg)
|
||||
return
|
||||
|
||||
@@ -355,14 +354,12 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
# Could add reconnection logic here if needed
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -440,5 +437,4 @@ class AzureHttpTTSService(AzureBaseTTSService):
|
||||
cancellation_details = result.cancellation_details
|
||||
logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
|
||||
if cancellation_details.reason == CancellationReason.Error:
|
||||
logger.error(f"{self} error: {cancellation_details.error_details}")
|
||||
yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}")
|
||||
|
||||
@@ -276,8 +276,7 @@ class CartesiaSTTService(WebsocketSTTService):
|
||||
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def _disconnect_websocket(self):
|
||||
try:
|
||||
@@ -285,8 +284,7 @@ class CartesiaSTTService(WebsocketSTTService):
|
||||
logger.debug("Disconnecting from Cartesia STT")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"{self} error closing websocket: {e}", exception=e)
|
||||
finally:
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
@@ -319,8 +317,7 @@ class CartesiaSTTService(WebsocketSTTService):
|
||||
|
||||
elif data["type"] == "error":
|
||||
error_msg = data.get("message", "Unknown error")
|
||||
logger.error(f"Cartesia error: {error_msg}")
|
||||
await self.push_error(ErrorFrame(error=error_msg))
|
||||
await self.push_error(error_msg=error_msg)
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription(
|
||||
|
||||
@@ -397,8 +397,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
)
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -410,8 +409,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
logger.debug("Disconnecting from Cartesia")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
@@ -464,13 +462,12 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
)
|
||||
await self.append_to_audio_context(msg["context_id"], frame)
|
||||
elif msg["type"] == "error":
|
||||
logger.error(f"{self} error: {msg}")
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self.stop_all_metrics()
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
|
||||
await self.push_error(error_msg=f"{self} error: {msg}")
|
||||
self._context_id = None
|
||||
else:
|
||||
logger.error(f"{self} error, unknown message type: {msg}")
|
||||
await self.push_error(error_msg=f"{self} error, unknown message type: {msg}")
|
||||
|
||||
async def _receive_messages(self):
|
||||
while True:
|
||||
@@ -508,7 +505,6 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
await self._get_websocket().send(msg)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -516,7 +512,6 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -708,8 +703,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
async with session.post(url, json=payload, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.error(f"Cartesia API error: {error_text}")
|
||||
await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}"))
|
||||
yield ErrorFrame(error=f"Cartesia API error: {error_text}")
|
||||
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
|
||||
|
||||
audio_data = await response.read()
|
||||
@@ -725,8 +719,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -192,8 +192,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
try:
|
||||
await self._disconnect_websocket()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
# Reset state only after everything is cleaned up
|
||||
self._websocket = None
|
||||
@@ -251,8 +250,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
logger.debug("Connected to Deepgram Flux Websocket")
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -280,8 +278,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
logger.debug("Disconnecting from Deepgram Flux Websocket")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"{self} error closing websocket: {e}", exception=e)
|
||||
finally:
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
@@ -381,7 +378,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
are issues sending the audio data.
|
||||
"""
|
||||
if not self._websocket:
|
||||
logger.error("Not connected to Deepgram Flux.")
|
||||
yield ErrorFrame("Not connected to Deepgram Flux.")
|
||||
return
|
||||
|
||||
@@ -389,7 +385,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
self._last_stt_time = time.monotonic()
|
||||
await self.send_with_retry(audio, self._report_error)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
return
|
||||
|
||||
@@ -467,8 +462,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
# Skip malformed messages
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
# Error will be handled inside WebsocketService->_receive_task_handler
|
||||
raise
|
||||
else:
|
||||
|
||||
@@ -233,7 +233,7 @@ class DeepgramSTTService(STTService):
|
||||
)
|
||||
|
||||
if not await self._connection.start(options=self._settings, addons=self._addons):
|
||||
logger.error(f"{self}: unable to connect to Deepgram")
|
||||
await self.push_error(error_msg=f"Unable to connect to Deepgram")
|
||||
|
||||
async def _disconnect(self):
|
||||
if await self._connection.is_connected():
|
||||
@@ -256,7 +256,7 @@ class DeepgramSTTService(STTService):
|
||||
async def _on_error(self, *args, **kwargs):
|
||||
error: ErrorResponse = kwargs["error"]
|
||||
logger.warning(f"{self} connection error, will retry: {error}")
|
||||
await self.push_error(ErrorFrame(error=f"{error}"))
|
||||
await self.push_error(error_msg=f"{error}")
|
||||
await self.stop_all_metrics()
|
||||
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
|
||||
# because this triggers more errors internally in the Deepgram SDK. So,
|
||||
|
||||
@@ -116,7 +116,6 @@ class DeepgramTTSService(TTSService):
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -227,5 +226,4 @@ class DeepgramHttpTTSService(TTSService):
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
yield ErrorFrame(f"Error getting audio: {str(e)}")
|
||||
|
||||
@@ -351,7 +351,6 @@ class ElevenLabsSTTService(SegmentedSTTService):
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -586,7 +585,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
}
|
||||
await self._websocket.send(json.dumps(message))
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending audio: {e}")
|
||||
yield ErrorFrame(f"ElevenLabs Realtime STT error: {str(e)}")
|
||||
|
||||
yield None
|
||||
@@ -645,8 +643,9 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
await self._call_event_handler("on_connected")
|
||||
logger.debug("Connected to ElevenLabs Realtime STT")
|
||||
except Exception as e:
|
||||
logger.error(f"{self}: unable to connect to ElevenLabs Realtime STT: {e}")
|
||||
await self.push_error(ErrorFrame(f"Connection error: {str(e)}"))
|
||||
await self.push_error(
|
||||
error_msg=f"{self}: unable to connect to ElevenLabs Realtime STT: {e}", exception=e
|
||||
)
|
||||
|
||||
async def _disconnect_websocket(self):
|
||||
"""Disconnect from ElevenLabs Realtime STT WebSocket."""
|
||||
@@ -655,7 +654,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
logger.debug("Disconnecting from ElevenLabs Realtime STT")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
|
||||
finally:
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
@@ -714,13 +713,11 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
|
||||
|
||||
elif message_type == "input_error":
|
||||
error_msg = data.get("error", "Unknown input error")
|
||||
logger.error(f"ElevenLabs input error: {error_msg}")
|
||||
await self.push_error(ErrorFrame(f"Input error: {error_msg}"))
|
||||
await self.push_error(error_msg=f"ElevenLabs input error: {error_msg}")
|
||||
|
||||
elif message_type in ["auth_error", "quota_exceeded", "transcriber_error", "error"]:
|
||||
error_msg = data.get("error", data.get("message", "Unknown error"))
|
||||
logger.error(f"ElevenLabs error ({message_type}): {error_msg}")
|
||||
await self.push_error(ErrorFrame(f"{message_type}: {error_msg}"))
|
||||
await self.push_error(error_msg=f"ElevenLabs error ({message_type}): {error_msg}")
|
||||
|
||||
else:
|
||||
logger.debug(f"Unknown message type: {message_type}")
|
||||
|
||||
@@ -424,8 +424,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
json.dumps({"context_id": self._context_id, "close_context": True})
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._context_id = None
|
||||
self._started = False
|
||||
|
||||
@@ -536,9 +535,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
self._websocket = None
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
async def _disconnect_websocket(self):
|
||||
@@ -553,8 +551,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
await self._websocket.close()
|
||||
logger.debug("Disconnected from ElevenLabs")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
self._started = False
|
||||
self._context_id = None
|
||||
@@ -584,8 +581,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
json.dumps({"context_id": self._context_id, "close_context": True})
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._context_id = None
|
||||
self._started = False
|
||||
self._partial_word = ""
|
||||
@@ -740,14 +736,12 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
else:
|
||||
await self._send_text(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
self._started = False
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -1043,7 +1037,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.error(f"{self} error: {error_text}")
|
||||
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
|
||||
return
|
||||
|
||||
@@ -1091,7 +1084,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
logger.warning(f"Failed to parse JSON from stream: {e}")
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
continue
|
||||
|
||||
@@ -1116,7 +1108,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
self._previous_text = text
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -110,7 +110,6 @@ class FalImageGenService(ImageGenService):
|
||||
image_url = response["images"][0]["url"] if response else None
|
||||
|
||||
if not image_url:
|
||||
logger.error(f"{self} error: image generation failed")
|
||||
yield ErrorFrame("Image generation failed")
|
||||
return
|
||||
|
||||
|
||||
@@ -290,5 +290,4 @@ class FalSTTService(SegmentedSTTService):
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
@@ -228,8 +228,7 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -243,8 +242,7 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
await self._websocket.send(ormsgpack.packb(stop_message))
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
self._request_id = None
|
||||
self._started = False
|
||||
@@ -286,8 +284,7 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
@traced_tts
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
@@ -323,7 +320,6 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
flush_message = {"event": "flush"}
|
||||
await self._get_websocket().send(ormsgpack.packb(flush_message))
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -332,5 +328,4 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
yield None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
@@ -468,8 +468,7 @@ class GladiaSTTService(STTService):
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._connection_active = False
|
||||
|
||||
if not self._should_reconnect:
|
||||
@@ -559,8 +558,7 @@ class GladiaSTTService(STTService):
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
logger.debug("Connection closed during keepalive")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
@@ -623,8 +621,7 @@ class GladiaSTTService(STTService):
|
||||
# Expected when closing the connection
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def _maybe_reconnect(self) -> bool:
|
||||
"""Handle exponential backoff reconnection logic."""
|
||||
@@ -632,7 +629,9 @@ class GladiaSTTService(STTService):
|
||||
return False
|
||||
self._reconnection_attempts += 1
|
||||
if self._reconnection_attempts > self._max_reconnection_attempts:
|
||||
logger.error(f"Max reconnection attempts ({self._max_reconnection_attempts}) reached")
|
||||
await self.push_error(
|
||||
error_msg=f"Max reconnection attempts ({self._max_reconnection_attempts}) reached"
|
||||
)
|
||||
self._should_reconnect = False
|
||||
return False
|
||||
delay = self._reconnection_delay * (2 ** (self._reconnection_attempts - 1))
|
||||
|
||||
@@ -1174,7 +1174,7 @@ class GeminiLiveLLMService(LLMService):
|
||||
self._connection_task = self.create_task(self._connection_task_handler(config=config))
|
||||
|
||||
except Exception as e:
|
||||
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def _connection_task_handler(self, config: LiveConnectConfig):
|
||||
async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
|
||||
@@ -1251,11 +1251,11 @@ class GeminiLiveLLMService(LLMService):
|
||||
)
|
||||
|
||||
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
|
||||
logger.error(
|
||||
error_msg = (
|
||||
f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
|
||||
"treating as fatal error"
|
||||
)
|
||||
await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}"))
|
||||
await self.push_error(error_msg=error_msg, exception=error)
|
||||
return False
|
||||
else:
|
||||
logger.info(
|
||||
@@ -1283,7 +1283,7 @@ class GeminiLiveLLMService(LLMService):
|
||||
self._completed_tool_calls = set()
|
||||
self._disconnecting = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error disconnecting: {e}")
|
||||
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
|
||||
|
||||
async def _send_user_audio(self, frame):
|
||||
"""Send user audio frame to Gemini Live API."""
|
||||
@@ -1742,7 +1742,7 @@ class GeminiLiveLLMService(LLMService):
|
||||
# state management, and that exponential backoff for retries can have
|
||||
# cost/stability implications for a service cluster, let's just treat a
|
||||
# send-side error as fatal.
|
||||
await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True))
|
||||
await self.push_error(ErrorFrame(error=f"{self} Send error: {error}"))
|
||||
|
||||
def create_context_aggregator(
|
||||
self,
|
||||
|
||||
@@ -110,7 +110,6 @@ class GoogleImageGenService(ImageGenService):
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
if not response or not response.generated_images:
|
||||
logger.error(f"{self} error: image generation failed")
|
||||
yield ErrorFrame("Image generation failed")
|
||||
return
|
||||
|
||||
@@ -128,5 +127,4 @@ class GoogleImageGenService(ImageGenService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error generating image: {e}")
|
||||
yield ErrorFrame(f"Image generation error: {str(e)}")
|
||||
|
||||
@@ -793,7 +793,7 @@ class GoogleLLMService(LLMService):
|
||||
return
|
||||
generation_params.setdefault("thinking_config", {})["thinking_budget"] = 0
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to unset thinking budget: {e}")
|
||||
logger.error(f"Failed to unset thinking budget: {e}")
|
||||
|
||||
async def _stream_content(
|
||||
self, params_from_context: GeminiLLMInvocationParams
|
||||
@@ -983,7 +983,7 @@ class GoogleLLMService(LLMService):
|
||||
except DeadlineExceeded:
|
||||
await self._call_event_handler("on_completion_timeout")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
if grounding_metadata and isinstance(grounding_metadata, dict):
|
||||
llm_search_frame = LLMSearchResponseFrame(
|
||||
|
||||
@@ -774,8 +774,7 @@ class GoogleSTTService(STTService):
|
||||
yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
raise
|
||||
|
||||
async def _stream_audio(self):
|
||||
@@ -806,15 +805,13 @@ class GoogleSTTService(STTService):
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
await asyncio.sleep(1) # Brief delay before reconnecting
|
||||
self._stream_start_time = int(time.time() * 1000)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Process an audio chunk for STT transcription.
|
||||
@@ -902,8 +899,7 @@ class GoogleSTTService(STTService):
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
# Re-raise the exception to let it propagate (e.g. in the case of a
|
||||
# timeout, propagate to _stream_audio to reconnect)
|
||||
raise
|
||||
|
||||
@@ -737,7 +737,6 @@ class GoogleHttpTTSService(TTSService):
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
error_message = f"TTS generation error: {str(e)}"
|
||||
yield ErrorFrame(error=error_message)
|
||||
|
||||
@@ -996,9 +995,7 @@ class GoogleTTSService(GoogleBaseTTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
error_message = f"TTS generation error: {str(e)}"
|
||||
yield ErrorFrame(error=error_message)
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
class GeminiTTSService(GoogleBaseTTSService):
|
||||
@@ -1248,6 +1245,5 @@ class GeminiTTSService(GoogleBaseTTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
error_message = f"Gemini TTS generation error: {str(e)}"
|
||||
yield ErrorFrame(error=error_message)
|
||||
|
||||
@@ -146,7 +146,6 @@ class GroqTTSService(TTSService):
|
||||
bytes = w.readframes(num_frames)
|
||||
yield TTSAudioRawFrame(bytes, frame_rate, channels)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -179,7 +179,7 @@ class HeyGenClient:
|
||||
await self._task_manager.cancel_task(self._event_task)
|
||||
self._event_task = None
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception during cleanup: {e}")
|
||||
logger.error(f"Exception during cleanup: {e}")
|
||||
|
||||
async def start(self, frame: StartFrame, audio_chunk_size: int) -> None:
|
||||
"""Start the client and establish all necessary connections.
|
||||
|
||||
@@ -287,8 +287,7 @@ class HumeTTSService(WordTTSService):
|
||||
self._cumulative_time = utterance_duration
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
# Ensure TTFB timer is stopped even on early failures
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -146,8 +146,6 @@ class InworldTTSService(TTSService):
|
||||
Parameters:
|
||||
temperature: Voice temperature control for synthesis variability (e.g., 1.1).
|
||||
Valid range: [0, 2]. Higher values increase variability.
|
||||
speaking_rate: Speaking speed control (range: [0.5, 1.5]). Defaults to 1.0 when
|
||||
unset.
|
||||
|
||||
Note:
|
||||
Language is automatically inferred from the input text by Inworld's TTS models,
|
||||
@@ -155,7 +153,6 @@ class InworldTTSService(TTSService):
|
||||
"""
|
||||
|
||||
temperature: Optional[float] = None # optional temperature control (range: [0, 2])
|
||||
speaking_rate: Optional[float] = None # optional speaking rate control (range: [0.5, 1.5])
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -201,7 +198,6 @@ class InworldTTSService(TTSService):
|
||||
- Other formats as supported by Inworld API
|
||||
params: Optional input parameters for additional configuration. Use this to specify:
|
||||
- temperature: Voice temperature control for variability (range: [0, 2], e.g., 1.1, optional)
|
||||
- speaking_rate: Set desired speaking speed (range: [0.5, 1.5], optional)
|
||||
Language is automatically inferred from input text.
|
||||
**kwargs: Additional arguments passed to the parent TTSService class.
|
||||
|
||||
@@ -232,18 +228,15 @@ class InworldTTSService(TTSService):
|
||||
self._settings = {
|
||||
"voiceId": voice_id, # Voice selection from direct parameter
|
||||
"modelId": model, # TTS model selection from direct parameter
|
||||
"audioConfig": { # Audio format configuration
|
||||
"audioEncoding": encoding, # Format: LINEAR16, MP3, etc.
|
||||
"sampleRateHertz": 0, # Will be set in start() from parent service
|
||||
"audio_config": { # Audio format configuration
|
||||
"audio_encoding": encoding, # Format: LINEAR16, MP3, etc.
|
||||
"sample_rate_hertz": 0, # Will be set in start() from parent service
|
||||
},
|
||||
}
|
||||
|
||||
# Add optional temperature parameter if provided (valid range: [0, 2])
|
||||
if params and params.temperature is not None:
|
||||
self._settings["temperature"] = params.temperature
|
||||
# Add optional speaking rate if provided (valid range: [0.5, 1.5])
|
||||
if params and params.speaking_rate is not None:
|
||||
self._settings["audioConfig"]["speakingRate"] = params.speaking_rate
|
||||
|
||||
# Register voice and model with parent service for metrics and tracking
|
||||
self.set_voice(voice_id) # Used for logging and metrics
|
||||
@@ -264,7 +257,7 @@ class InworldTTSService(TTSService):
|
||||
frame: The start frame containing initialization parameters.
|
||||
"""
|
||||
await super().start(frame)
|
||||
self._settings["audioConfig"]["sampleRateHertz"] = self.sample_rate
|
||||
self._settings["audio_config"]["sample_rate_hertz"] = self.sample_rate
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Stop the Inworld TTS service.
|
||||
@@ -330,7 +323,9 @@ class InworldTTSService(TTSService):
|
||||
"text": text, # Text to synthesize
|
||||
"voiceId": self._settings["voiceId"], # Voice selection (Ashley, Hades, etc.)
|
||||
"modelId": self._settings["modelId"], # TTS model (inworld-tts-1)
|
||||
"audioConfig": self._settings["audioConfig"], # Audio format settings (LINEAR16, 48kHz)
|
||||
"audio_config": self._settings[
|
||||
"audio_config"
|
||||
], # Audio format settings (LINEAR16, 48kHz)
|
||||
}
|
||||
|
||||
# Add optional temperature parameter if configured (valid range: [0, 2])
|
||||
@@ -397,8 +392,7 @@ class InworldTTSService(TTSService):
|
||||
# STEP 7: ERROR HANDLING
|
||||
# ================================================================================
|
||||
# Log any unexpected errors and notify the pipeline
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
# ================================================================================
|
||||
# STEP 8: CLEANUP AND COMPLETION
|
||||
|
||||
@@ -214,8 +214,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -231,8 +230,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
# await self._websocket.send(json.dumps({"eof": True}))
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error disconnecting from LMNT: {e}", exception=e)
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
@@ -266,10 +264,9 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
try:
|
||||
msg = json.loads(message)
|
||||
if "error" in msg:
|
||||
logger.error(f"{self} error: {msg['error']}")
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self.stop_all_metrics()
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
|
||||
await self.push_error(error_msg=f"{self} error: {msg['error']}")
|
||||
return
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Invalid JSON message: {message}")
|
||||
@@ -302,7 +299,6 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
await self._get_websocket().send(json.dumps({"flush": True}))
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -310,5 +306,4 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
@@ -176,7 +176,6 @@ class MCPClient(BaseObject):
|
||||
except Exception as e:
|
||||
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
logger.exception("Full exception details:")
|
||||
await params.result_callback(error_msg)
|
||||
|
||||
async def _stdio_list_tools(self) -> ToolsSchema:
|
||||
@@ -207,7 +206,6 @@ class MCPClient(BaseObject):
|
||||
except Exception as e:
|
||||
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
logger.exception("Full exception details:")
|
||||
await params.result_callback(error_msg)
|
||||
|
||||
async def _streamable_http_list_tools(self) -> ToolsSchema:
|
||||
@@ -246,7 +244,6 @@ class MCPClient(BaseObject):
|
||||
except Exception as e:
|
||||
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
logger.exception("Full exception details:")
|
||||
await params.result_callback(error_msg)
|
||||
|
||||
async def _call_tool(self, session, function_name, arguments, result_callback):
|
||||
@@ -302,7 +299,6 @@ class MCPClient(BaseObject):
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read tool '{tool_name}': {str(e)}")
|
||||
logger.exception("Full exception details:")
|
||||
continue
|
||||
|
||||
logger.debug(f"Completed reading {len(tool_schemas)} tools")
|
||||
|
||||
@@ -253,8 +253,7 @@ class Mem0MemoryService(FrameProcessor):
|
||||
# Otherwise, pass the enhanced context frame downstream
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing with Mem0: {str(e)}")
|
||||
await self.push_frame(ErrorFrame(f"Error processing with Mem0: {str(e)}"))
|
||||
await self.push_error(exception=e)
|
||||
await self.push_frame(frame) # Still pass the original frame through
|
||||
else:
|
||||
# For non-context frames, just pass them through
|
||||
|
||||
@@ -264,7 +264,6 @@ class MiniMaxHttpTTSService(TTSService):
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_message = f"MiniMax TTS error: HTTP {response.status}"
|
||||
logger.error(error_message)
|
||||
yield ErrorFrame(error=error_message)
|
||||
return
|
||||
|
||||
@@ -338,7 +337,6 @@ class MiniMaxHttpTTSService(TTSService):
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -110,7 +110,6 @@ class MoondreamService(VisionService):
|
||||
if analysis fails.
|
||||
"""
|
||||
if not self._model:
|
||||
logger.error(f"{self} error: Moondream model not available ({self.model_name})")
|
||||
yield ErrorFrame("Moondream model not available")
|
||||
return
|
||||
|
||||
|
||||
@@ -285,8 +285,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -299,8 +298,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
logger.debug("Disconnecting from Neuphonic")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
@@ -365,7 +363,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
await self._send_text(text)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -373,7 +370,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -539,7 +535,6 @@ class NeuphonicHttpTTSService(TTSService):
|
||||
error_text = await response.text()
|
||||
error_message = f"Neuphonic API error: HTTP {response.status} - {error_text}"
|
||||
logger.error(error_message)
|
||||
yield ErrorFrame(error=error_message)
|
||||
return
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
@@ -568,7 +563,6 @@ class NeuphonicHttpTTSService(TTSService):
|
||||
yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
# Don't yield error frame for individual message failures
|
||||
continue
|
||||
@@ -577,7 +571,6 @@ class NeuphonicHttpTTSService(TTSService):
|
||||
logger.debug("TTS generation cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -76,7 +76,6 @@ class OpenAIImageGenService(ImageGenService):
|
||||
image_url = image.data[0].url
|
||||
|
||||
if not image_url:
|
||||
logger.error(f"{self} No image provided in response: {image}")
|
||||
yield ErrorFrame("Image generation failed")
|
||||
return
|
||||
|
||||
|
||||
@@ -443,7 +443,7 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
)
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
|
||||
self._websocket = None
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -460,7 +460,7 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
self._completed_tool_calls = set()
|
||||
self._disconnecting = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error disconnecting: {e}")
|
||||
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
|
||||
|
||||
async def _ws_send(self, realtime_message):
|
||||
try:
|
||||
@@ -473,12 +473,11 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
# somehow *started* the websocket send attempt while we still
|
||||
# had a connection)
|
||||
return
|
||||
logger.error(f"Error sending message to websocket: {e}")
|
||||
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
|
||||
# it is to recover from a send-side error with proper state management, and that exponential
|
||||
# backoff for retries can have cost/stability implications for a service cluster, let's just
|
||||
# treat a send-side error as fatal.
|
||||
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
|
||||
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
|
||||
|
||||
async def _update_settings(self):
|
||||
settings = self._session_properties
|
||||
@@ -759,7 +758,7 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
|
||||
async def _handle_evt_error(self, evt):
|
||||
# Errors are fatal to this connection. Send an ErrorFrame.
|
||||
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
|
||||
await self.push_error(error_msg=f"Error: {evt}")
|
||||
|
||||
#
|
||||
# state and client events for the current conversation
|
||||
|
||||
@@ -206,5 +206,4 @@ class OpenAITTSService(TTSService):
|
||||
yield frame
|
||||
yield TTSStoppedFrame()
|
||||
except BadRequestError as e:
|
||||
logger.exception(f"{self} error generating TTS: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
@@ -79,5 +79,5 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
|
||||
)
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
|
||||
self._websocket = None
|
||||
|
||||
@@ -424,7 +424,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
)
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
|
||||
self._websocket = None
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -440,7 +440,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
self._receive_task = None
|
||||
self._disconnecting = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error disconnecting: {e}")
|
||||
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
|
||||
|
||||
async def _ws_send(self, realtime_message):
|
||||
try:
|
||||
@@ -449,12 +449,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
except Exception as e:
|
||||
if self._disconnecting:
|
||||
return
|
||||
logger.error(f"Error sending message to websocket: {e}")
|
||||
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
|
||||
# it is to recover from a send-side error with proper state management, and that exponential
|
||||
# backoff for retries can have cost/stability implications for a service cluster, let's just
|
||||
# treat a send-side error as fatal.
|
||||
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
|
||||
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
|
||||
|
||||
async def _update_settings(self):
|
||||
settings = self._session_properties
|
||||
@@ -685,7 +684,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
|
||||
async def _handle_evt_error(self, evt):
|
||||
# Errors are fatal to this connection. Send an ErrorFrame.
|
||||
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
|
||||
await self.push_error(error_msg=f"Error: {evt}")
|
||||
|
||||
async def _handle_assistant_output(self, output):
|
||||
# We haven't seen intermixed audio and function_call items in the same response. But let's
|
||||
|
||||
@@ -88,9 +88,6 @@ class PiperTTSService(TTSService):
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error = await response.text()
|
||||
logger.error(
|
||||
f"{self} error getting audio (status: {response.status}, error: {error})"
|
||||
)
|
||||
yield ErrorFrame(
|
||||
error=f"Error getting audio (status: {response.status}, error: {error})"
|
||||
)
|
||||
|
||||
@@ -266,8 +266,7 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -280,8 +279,7 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
logger.debug("Disconnecting from PlayHT")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
|
||||
finally:
|
||||
self._request_id = None
|
||||
self._websocket = None
|
||||
@@ -351,8 +349,7 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
self._request_id = None
|
||||
elif "error" in msg:
|
||||
logger.error(f"{self} error: {msg}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
|
||||
await self.push_error(error_msg=f"{self} error: {msg['error']}")
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Invalid JSON message: {message}")
|
||||
|
||||
@@ -394,7 +391,6 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
await self._get_websocket().send(json.dumps(tts_command))
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -405,7 +401,6 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
yield None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -626,7 +621,6 @@ class PlayHTHttpTTSService(TTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -259,8 +259,7 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -272,8 +271,7 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
await self._websocket.send(json.dumps(self._build_eos_msg()))
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
|
||||
finally:
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
@@ -366,10 +364,9 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
logger.debug(f"Updated cumulative time to: {self._cumulative_time}")
|
||||
|
||||
elif msg["type"] == "error":
|
||||
logger.error(f"{self} error: {msg}")
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self.stop_all_metrics()
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
|
||||
await self.push_error(error_msg=f"{self} error: {msg['message']}")
|
||||
self._context_id = None
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
@@ -411,7 +408,6 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
await self._get_websocket().send(json.dumps(msg))
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -419,7 +415,6 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
@@ -551,7 +546,6 @@ class RimeHttpTTSService(TTSService):
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_message = f"Rime TTS error: HTTP {response.status}"
|
||||
logger.error(error_message)
|
||||
yield ErrorFrame(error=error_message)
|
||||
return
|
||||
|
||||
@@ -569,7 +563,6 @@ class RimeHttpTTSService(TTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
@@ -655,11 +655,9 @@ class RivaSegmentedSTTService(SegmentedSTTService):
|
||||
logger.debug("No transcription results found in Riva response")
|
||||
|
||||
except AttributeError as ae:
|
||||
logger.error(f"Unexpected response structure from Riva: {ae}")
|
||||
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
|
||||
|
||||
@@ -180,7 +180,6 @@ class RivaTTSService(TTSService):
|
||||
yield frame
|
||||
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"{self} timeout waiting for audio response")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
@@ -275,8 +275,7 @@ class SarvamSTTService(STTService):
|
||||
await self._socket_client.translate(**method_kwargs)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending audio to Sarvam: {e}")
|
||||
await self.push_error(ErrorFrame(f"Failed to send audio: {e}"))
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
yield None
|
||||
|
||||
@@ -332,13 +331,11 @@ class SarvamSTTService(STTService):
|
||||
logger.info("Connected to Sarvam successfully")
|
||||
|
||||
except ApiError as e:
|
||||
logger.error(f"Sarvam API error: {e}")
|
||||
await self.push_error(ErrorFrame(f"Sarvam API error: {e}"))
|
||||
await self.push_error(error_msg=f"Sarvam API error: {e}", exception=e)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Sarvam: {e}")
|
||||
self._socket_client = None
|
||||
self._websocket_context = None
|
||||
await self.push_error(ErrorFrame(f"Failed to connect to Sarvam: {e}"))
|
||||
await self.push_error(error_msg=f"Failed to connect to Sarvam: {e}", exception=e)
|
||||
|
||||
async def _disconnect(self):
|
||||
"""Disconnect from Sarvam WebSocket API using SDK."""
|
||||
@@ -351,7 +348,9 @@ class SarvamSTTService(STTService):
|
||||
# Exit the async context manager
|
||||
await self._websocket_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing WebSocket connection: {e}")
|
||||
await self.push_error(
|
||||
error_msg=f"Error closing WebSocket connection: {e}", exception=e
|
||||
)
|
||||
finally:
|
||||
logger.debug("Disconnected from Sarvam WebSocket")
|
||||
self._socket_client = None
|
||||
@@ -371,8 +370,7 @@ class SarvamSTTService(STTService):
|
||||
# Messages will be handled via the _message_handler callback
|
||||
await self._socket_client.start_listening()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Sarvam receive task: {e}")
|
||||
await self.push_error(ErrorFrame(f"Sarvam receive task error: {e}"))
|
||||
await self.push_error(error_msg=f"Sarvam receive task error: {e}", exception=e)
|
||||
|
||||
async def _handle_message(self, message):
|
||||
"""Handle incoming WebSocket message from Sarvam SDK.
|
||||
@@ -427,8 +425,7 @@ class SarvamSTTService(STTService):
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling Sarvam message: {e}")
|
||||
await self.push_error(ErrorFrame(f"Failed to handle message: {e}"))
|
||||
await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e)
|
||||
await self.stop_all_metrics()
|
||||
|
||||
@traced_stt
|
||||
|
||||
@@ -254,8 +254,7 @@ class SarvamHttpTTSService(TTSService):
|
||||
async with self._session.post(url, json=payload, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.error(f"Sarvam API error: {error_text}")
|
||||
await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}"))
|
||||
yield ErrorFrame(error=f"Sarvam API error: {error_text}")
|
||||
return
|
||||
|
||||
response_data = await response.json()
|
||||
@@ -264,8 +263,7 @@ class SarvamHttpTTSService(TTSService):
|
||||
|
||||
# Decode base64 audio data
|
||||
if "audios" not in response_data or not response_data["audios"]:
|
||||
logger.error("No audio data received from Sarvam API")
|
||||
await self.push_error(ErrorFrame(error="No audio data received"))
|
||||
yield ErrorFrame(error="No audio data received")
|
||||
return
|
||||
|
||||
# Get the first audio (there should be only one for single text input)
|
||||
@@ -286,8 +284,7 @@ class SarvamHttpTTSService(TTSService):
|
||||
yield frame
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
@@ -560,8 +557,7 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
await self._disconnect_websocket()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
finally:
|
||||
# Reset state only after everything is cleaned up
|
||||
self._started = False
|
||||
@@ -585,8 +581,9 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(
|
||||
error_msg=f"Error connecting to Sarvam TTS Websocket: {e}", exception=e
|
||||
)
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
@@ -602,8 +599,7 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
await self._websocket.send(json.dumps(config_message))
|
||||
logger.debug("Configuration sent successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
raise
|
||||
|
||||
async def _disconnect_websocket(self):
|
||||
@@ -615,8 +611,7 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
logger.debug("Disconnecting from Sarvam")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
@@ -640,7 +635,7 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
await self.push_frame(frame)
|
||||
elif msg.get("type") == "error":
|
||||
error_msg = msg["data"]["message"]
|
||||
logger.error(f"TTS Error: {error_msg}")
|
||||
await self.push_error(error_msg=f"TTS Error: {error_msg}")
|
||||
|
||||
# If it's a timeout error, the connection might need to be reset
|
||||
if "too long" in error_msg.lower() or "timeout" in error_msg.lower():
|
||||
@@ -702,7 +697,6 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
await self._send_text(text)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
yield TTSStoppedFrame()
|
||||
await self._disconnect()
|
||||
@@ -710,5 +704,4 @@ class SarvamTTSService(InterruptibleTTSService):
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
@@ -178,7 +178,7 @@ class SimliVideoService(FrameProcessor):
|
||||
self._audio_task = self.create_task(self._consume_and_process_audio())
|
||||
self._video_task = self.create_task(self._consume_and_process_video())
|
||||
except Exception as e:
|
||||
logger.error(f"{self}: unable to start connection: {e}")
|
||||
await self.push_error(error_msg=f"Unable to start connection: {e}", exception=e)
|
||||
|
||||
async def _consume_and_process_audio(self):
|
||||
"""Consume audio frames from Simli and push them downstream."""
|
||||
@@ -256,7 +256,7 @@ class SimliVideoService(FrameProcessor):
|
||||
await self._simli_client.send(audioBytes)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
await self.push_error(error_msg=f"Error sending audio: {e}", exception=e)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
try:
|
||||
if self._previously_interrupted and len(self._audio_buffer) > 0:
|
||||
@@ -264,7 +264,7 @@ class SimliVideoService(FrameProcessor):
|
||||
self._previously_interrupted = False
|
||||
self._audio_buffer = bytearray()
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
await self.push_error(error_msg=f"Error stopping TTS: {e}", exception=e)
|
||||
return
|
||||
elif isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self._stop()
|
||||
|
||||
@@ -194,7 +194,7 @@ class SonioxSTTService(STTService):
|
||||
self._websocket = await websocket_connect(self._url)
|
||||
|
||||
if not self._websocket:
|
||||
logger.error(f"Unable to connect to Soniox API at {self._url}")
|
||||
await self.push_error(error_msg=f"Unable to connect to Soniox API at {self._url}")
|
||||
|
||||
# If vad_force_turn_endpoint is not enabled, we need to enable endpoint detection.
|
||||
# Either one or the other is required.
|
||||
@@ -327,8 +327,7 @@ class SonioxSTTService(STTService):
|
||||
# Expected when closing the connection
|
||||
logger.debug("WebSocket connection closed, keepalive task stopped.")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
if not self._websocket:
|
||||
@@ -404,13 +403,8 @@ class SonioxSTTService(STTService):
|
||||
if error_code or error_message:
|
||||
# In case of error, still send the final transcript (if any remaining in the buffer).
|
||||
await send_endpoint_transcript()
|
||||
logger.error(
|
||||
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
|
||||
)
|
||||
await self.push_error(
|
||||
ErrorFrame(
|
||||
error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
|
||||
)
|
||||
error_msg=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
|
||||
)
|
||||
|
||||
finished = content.get("finished")
|
||||
@@ -425,5 +419,4 @@ class SonioxSTTService(STTService):
|
||||
# Expected when closing the connection.
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error receiving message: {e}", exception=e)
|
||||
|
||||
@@ -467,7 +467,6 @@ class SpeechmaticsSTTService(STTService):
|
||||
await self._client.send_audio(audio)
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
await self._disconnect()
|
||||
|
||||
@@ -514,8 +513,7 @@ class SpeechmaticsSTTService(STTService):
|
||||
self._client.send_message(payload), self.get_event_loop()
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
raise RuntimeError(f"error sending message to STT: {e}")
|
||||
|
||||
async def _connect(self) -> None:
|
||||
@@ -581,8 +579,7 @@ class SpeechmaticsSTTService(STTService):
|
||||
logger.debug(f"{self} Connected to Speechmatics STT service")
|
||||
await self._call_event_handler("on_connected")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(error_msg=f"Error connecting to Speechmatics: {e}", exception=e)
|
||||
self._client = None
|
||||
|
||||
async def _disconnect(self) -> None:
|
||||
@@ -596,8 +593,9 @@ class SpeechmaticsSTTService(STTService):
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(
|
||||
error_msg=f"Error disconnecting from Speechmatics: {e}", exception=e
|
||||
)
|
||||
finally:
|
||||
self._client = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
@@ -174,16 +174,13 @@ class SpeechmaticsTTSService(TTSService):
|
||||
|
||||
except (ValueError, ArithmeticError):
|
||||
yield ErrorFrame(
|
||||
error=f"{self} Service unavailable [503] (attempts {attempt})",
|
||||
fatal=True,
|
||||
error=f"{self} Service unavailable [503] (attempts {attempt})"
|
||||
)
|
||||
return
|
||||
|
||||
# != 200 : Error
|
||||
if response.status != 200:
|
||||
yield ErrorFrame(
|
||||
error=f"{self} Service unavailable [{response.status}]", fatal=True
|
||||
)
|
||||
yield ErrorFrame(error=f"{self} Service unavailable [{response.status}]")
|
||||
return
|
||||
|
||||
# Update Pipecat metrics
|
||||
@@ -225,7 +222,7 @@ class SpeechmaticsTTSService(TTSService):
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
yield ErrorFrame(error=f"{self}: Error generating TTS: {e}", fatal=True)
|
||||
yield ErrorFrame(error=f"{self}: Error generating TTS: {e}")
|
||||
finally:
|
||||
# Emit the TTS stopped frame
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -38,7 +38,7 @@ class STTService(AIService):
|
||||
|
||||
Event handlers:
|
||||
on_connected: Called when connected to the STT service.
|
||||
on_disconnected: Called when disconnected from the STT service.
|
||||
on_connected: Called when disconnected from the STT service.
|
||||
on_connection_error: Called when a connection to the STT service error occurs.
|
||||
|
||||
Example::
|
||||
@@ -329,4 +329,4 @@ class WebsocketSTTService(STTService, WebsocketService):
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
await self.push_error(error)
|
||||
await self.push_error_frame(error)
|
||||
|
||||
@@ -671,7 +671,7 @@ class WebsocketTTSService(TTSService, WebsocketService):
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
await self.push_error(error)
|
||||
await self.push_error_frame(error)
|
||||
|
||||
|
||||
class InterruptibleTTSService(WebsocketTTSService):
|
||||
@@ -733,7 +733,7 @@ class WebsocketWordTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
await self.push_error(error)
|
||||
await self.push_error_frame(error)
|
||||
|
||||
|
||||
class InterruptibleWordTTSService(WebsocketWordTTSService):
|
||||
|
||||
@@ -246,8 +246,7 @@ class UltravoxSTTService(AIService):
|
||||
|
||||
logger.info("Model warm-up completed successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
await self.push_error(exception=e)
|
||||
|
||||
def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0):
|
||||
"""Generate silent audio as a numpy array.
|
||||
@@ -437,17 +436,11 @@ class UltravoxSTTService(AIService):
|
||||
yield LLMFullResponseEndFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
else:
|
||||
logger.error("No model available for text generation")
|
||||
yield ErrorFrame("No model available for text generation")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
import traceback
|
||||
|
||||
logger.error(traceback.format_exc())
|
||||
yield ErrorFrame(f"Error processing audio: {str(e)}")
|
||||
finally:
|
||||
self._buffer.is_processing = False
|
||||
|
||||
@@ -226,7 +226,6 @@ class BaseWhisperSTTService(SegmentedSTTService):
|
||||
logger.warning("Received empty transcription from API")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
async def _transcribe(self, audio: bytes) -> Transcription:
|
||||
|
||||
@@ -285,7 +285,6 @@ class WhisperSTTService(SegmentedSTTService):
|
||||
The service will normalize it to float32 in the range [-1, 1].
|
||||
"""
|
||||
if not self._model:
|
||||
logger.error(f"{self} error: Whisper model not available")
|
||||
yield ErrorFrame("Whisper model not available")
|
||||
return
|
||||
|
||||
@@ -428,5 +427,4 @@ class WhisperSTTServiceMLX(WhisperSTTService):
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
yield ErrorFrame(error=f"{self} error: {e}")
|
||||
|
||||
@@ -141,13 +141,8 @@ class XTTSService(TTSService):
|
||||
async with self._aiohttp_session.get(self._settings["base_url"] + "/studio_speakers") as r:
|
||||
if r.status != 200:
|
||||
text = await r.text()
|
||||
logger.error(
|
||||
f"{self} error getting studio speakers (status: {r.status}, error: {text})"
|
||||
)
|
||||
await self.push_error(
|
||||
ErrorFrame(
|
||||
error=f"Error getting studio speakers (status: {r.status}, error: {text})"
|
||||
)
|
||||
error_msg=f"{self} error getting studio speakers (status: {r.status}, error: {text})"
|
||||
)
|
||||
return
|
||||
self._studio_speakers = await r.json()
|
||||
@@ -186,7 +181,6 @@ class XTTSService(TTSService):
|
||||
async with self._aiohttp_session.post(url, json=payload) as r:
|
||||
if r.status != 200:
|
||||
text = await r.text()
|
||||
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
|
||||
yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})")
|
||||
return
|
||||
|
||||
|
||||
@@ -2506,13 +2506,10 @@ class DailyTransport(BaseTransport):
|
||||
async def _on_error(self, error):
|
||||
"""Handle error events and push error frames."""
|
||||
await self._call_event_handler("on_error", error)
|
||||
# Push error frame to notify the pipeline
|
||||
error_frame = ErrorFrame(error)
|
||||
|
||||
if self._input:
|
||||
await self._input.push_error(error_frame)
|
||||
await self._input.push_error(error_msg=error)
|
||||
elif self._output:
|
||||
await self._output.push_error(error_frame)
|
||||
await self._output.push_error(error_msg=error)
|
||||
else:
|
||||
logger.error("Both input and output are None while trying to push error")
|
||||
raise Exception("No valid input or output channel to push error")
|
||||
@@ -2568,7 +2565,7 @@ class DailyTransport(BaseTransport):
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Timeout handling dialin-ready event ({url})")
|
||||
except Exception as e:
|
||||
logger.exception(f"Error handling dialin-ready event ({url}): {e}")
|
||||
logger.error(f"Error handling dialin-ready event ({url}): {e}")
|
||||
|
||||
async def _on_dialin_connected(self, data):
|
||||
"""Handle dial-in connected events."""
|
||||
|
||||
@@ -316,7 +316,7 @@ class SmallWebRTCConnection(BaseObject):
|
||||
logger.debug("Client not connected. Queuing app-message.")
|
||||
self._pending_app_messages.append(json_message)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error parsing JSON message {message}, {e}")
|
||||
logger.error(f"Error parsing JSON message {message}, {e}")
|
||||
|
||||
# Despite the fact that aiortc provides this listener, they don't have a status for "disconnected"
|
||||
# So, in case we loose connection, this event will not be triggered
|
||||
|
||||
@@ -265,7 +265,7 @@ class TavusTransportClient:
|
||||
try:
|
||||
await self._client.cleanup()
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception during cleanup: {e}")
|
||||
logger.error(f"Exception during cleanup: {e}")
|
||||
|
||||
async def _on_joined(self, data):
|
||||
"""Handle joined event."""
|
||||
|
||||
@@ -162,7 +162,7 @@ class TaskManager(BaseTaskManager):
|
||||
# Re-raise the exception to ensure the task is cancelled.
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(f"{name}: unexpected exception: {e}")
|
||||
logger.error(f"{name}: unexpected exception: {e}")
|
||||
|
||||
if not self._params:
|
||||
raise Exception("TaskManager is not setup: unable to get event loop")
|
||||
@@ -197,7 +197,7 @@ class TaskManager(BaseTaskManager):
|
||||
# Here are sure the task is cancelled properly.
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{name}: unexpected exception while cancelling task: {e}")
|
||||
logger.error(f"{name}: unexpected exception while cancelling task: {e}")
|
||||
except BaseException as e:
|
||||
logger.critical(f"{name}: fatal base exception while cancelling task: {e}")
|
||||
raise
|
||||
|
||||
@@ -187,7 +187,7 @@ class BaseObject(ABC):
|
||||
else:
|
||||
handler(self, *args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception in event handler {event_name}: {e}")
|
||||
logger.error(f"Exception in event handler {event_name}: {e}")
|
||||
|
||||
def _event_task_finished(self, task: asyncio.Task):
|
||||
"""Clean up completed event handler tasks.
|
||||
|
||||
Reference in New Issue
Block a user