Add Ultravox service (#1)

Adds support for using Ultravox Realtime as a speech-to-speech service.

Also removes the deprecated Ultravox speech-to-text vllm model integration to avoid confusion.
This commit is contained in:
Mike Depinet
2025-12-12 10:16:15 -08:00
committed by GitHub
parent 645e1802f8
commit 4b81be7acf
16 changed files with 775 additions and 1548 deletions

View File

@@ -50,7 +50,6 @@ jobs:
run: |
uv sync --group dev --all-extras \
--no-extra krisp \
--no-extra ultravox \
--no-extra local-smart-turn \
--no-extra moondream \
--no-extra mlx-whisper

View File

@@ -11,7 +11,7 @@ build:
jobs:
post_install:
- pip install uv
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra ultravox --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
sphinx:
configuration: docs/api/conf.py

View File

@@ -73,10 +73,10 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova), [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai), Ultravox |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
@@ -153,7 +153,6 @@ You can get started with Pipecat running on your local machine, then move your a
--no-extra gstreamer \
--no-extra krisp \
--no-extra local \
--no-extra ultravox # (ultravox not fully supported on macOS)
```
3. Install the git pre-commit hooks:

4
changelog/XXX.added.md Normal file
View File

@@ -0,0 +1,4 @@
- [Ultravox Realtime](https://docs.ultravox.ai) is now a supported speech-to-speech
service.
- Added `UltravoxRealtimeLLMService` for the integration.
- Added `49-ultravox-realtime.py` example (with tool calling).

1
changelog/XXX.removed.md Normal file
View File

@@ -0,0 +1 @@
- Removed the deprecated VLLM-based open source Ultravox STT service.

View File

@@ -2,7 +2,7 @@
# Build docs using uv
echo "Installing dependencies with uv..."
uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra ultravox --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
# Check if sphinx-build is available
if ! uv run sphinx-build --version &> /dev/null; then
@@ -24,4 +24,4 @@ if [ $? -eq 0 ]; then
else
echo "Documentation build failed!" >&2
exit 1
fi
fi

View File

@@ -61,9 +61,6 @@ autodoc_mock_imports = [
# OpenCV - sometimes has import issues during docs build
"cv2",
# Heavy ML packages excluded from ReadTheDocs
# ultravox dependencies
"vllm",
"vllm.engine.arg_utils",
# local-smart-turn dependencies
"coremltools",
"coremltools.models",

View File

@@ -190,6 +190,9 @@ TOGETHER_API_KEY=...
TWILIO_ACCOUNT_SID=...
TWILIO_AUTH_TOKEN=...
# Ultravox Realtime
ULTRAVOX_API_KEY=...
# WhatsApp
WHATSAPP_TOKEN=...
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...

View File

@@ -1,117 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.ultravox.stt import UltravoxSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# NOTE: This example requires GPU resources to run efficiently.
# The Ultravox model is compute-intensive and performs best with GPU acceleration.
# This can be deployed on cloud GPU providers like Cerebrium.ai for optimal performance.
# Want to initialize the ultravox processor since it takes time to load the model and dont
# want to load it every time the pipeline is run
ultravox_processor = UltravoxSTTService(
model_name="fixie-ai/ultravox-v0_5-llama-3_1-8b",
hf_token=os.getenv("HF_TOKEN"),
)
# Transport parameters keyed by transport name ("daily", "twilio", "webrtc").
# Each value is a zero-argument factory (lambda) rather than a ready-made
# object, so the analyzers it builds (e.g. SileroVADAnalyzer) don't get
# instantiated up front. The factory is only called once the desired
# transport gets selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build the example pipeline on ``transport`` and run it to completion.

    The pipeline feeds transport input into the module-level
    ``ultravox_processor``, then into Cartesia TTS, and finally back out
    through the transport.

    Args:
        transport: Transport providing the input and output processors.
        runner_args: Runner settings; supplies the pipeline idle timeout and
            the SIGINT handling flag.
    """
    logger.info(f"Starting bot")

    cartesia_tts = CartesiaTTSService(
        api_key=os.environ.get("CARTESIA_API_KEY"),
        voice_id="97f4b8fb-f2fe-444b-bb9a-c109783a857a",
    )

    # Transport user input -> Ultravox processor -> TTS -> transport bot output
    processors = [
        transport.input(),
        ultravox_processor,
        cartesia_tts,
        transport.output(),
    ]
    bot_pipeline = Pipeline(processors)

    metrics_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    pipeline_task = PipelineTask(
        bot_pipeline,
        params=metrics_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        # Cancel the pipeline task once the client has left.
        await pipeline_task.cancel()

    await PipelineRunner(handle_sigint=runner_args.handle_sigint).run(pipeline_task)
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport for ``runner_args`` from ``transport_params`` and
    hands it to ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,224 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import datetime
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
# Load environment variables
load_dotenv(override=True)
# Transport parameters keyed by transport name ("daily", "twilio", "webrtc").
# We store zero-argument functions (lambdas) so the params objects don't get
# instantiated up front. The function will be called when the desired
# transport gets selected.
# Every transport enables audio in/out and sets vad_enabled=False.
# NOTE(review): presumably VAD is off because Ultravox handles turn detection
# itself — confirm.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=False,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=False,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=False,
),
}
async def get_secret_menu(params: FunctionCallParams):
    """Tool handler that reports today's secret menu items.

    Reads the optional ``category`` argument ("donuts", "drinks" or "both";
    "both" when absent) and passes a dict with today's ISO date and the
    matching items to ``params.result_callback``.

    Args:
        params: Function call parameters carrying the tool arguments and the
            result callback.
    """
    category = params.arguments.get("category", "both")
    logger.debug(f"Fetching secret menu with category: {category}")

    # (category, item) pairs, in the order they are reported.
    secret_items = (
        ("donuts", {"name": "Butter Pecan Ice Cream (one scoop)", "price": "$2.99"}),
        ("drinks", {"name": "Banana Smoothie", "price": "$4.99"}),
    )
    items = [item for group, item in secret_items if category in {group, "both"}]

    menu = {
        "date": datetime.date.today().isoformat(),
        "items": items,
    }
    await params.result_callback(menu)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build the Ultravox Realtime example pipeline on ``transport`` and run it.

    Creates a one-shot Ultravox call configured with the "Dr. Donut" system
    prompt and the ``get_secret_menu`` tool, wires it between the transport
    input and output together with a context aggregator pair, and runs the
    resulting pipeline task until it finishes.

    Args:
        transport: Transport providing the input and output processors.
        runner_args: Runner settings; supplies the pipeline idle timeout and
            the SIGINT handling flag.
    """
    logger.info(f"Starting bot")

    system_prompt = f"""
You are a drive-thru order taker for a donut shop called "Dr. Donut". Local time is currently: {datetime.datetime.now().isoformat()}
The user is talking to you over voice on their phone, and your response will be read out loud with realistic text-to-speech (TTS) technology.
Follow every direction here when crafting your response:
1. Use natural, conversational language that is clear and easy to follow (short sentences, simple words).
1a. Be concise and relevant: Most of your responses should be a sentence or two, unless you're asked to go deeper. Don't monopolize the conversation.
1b. Use discourse markers to ease comprehension. Never use the list format.
2. Keep the conversation flowing.
2a. Clarify: when there is ambiguity, ask clarifying questions, rather than make assumptions.
2b. Don't implicitly or explicitly try to end the chat (i.e. do not end a response with "Talk soon!", or "Enjoy!").
2c. Sometimes the user might just want to chat. Ask them relevant follow-up questions.
2d. Don't ask them if there's anything else they need help with (e.g. don't say things like "How can I assist you further?").
3. Remember that this is a voice conversation:
3a. Don't use lists, markdown, bullet points, or other formatting that's not typically spoken.
3b. Type out numbers in words (e.g. 'twenty twelve' instead of the year 2012)
3c. If something doesn't make sense, it's likely because you misheard them. There wasn't a typo, and the user didn't mispronounce anything.
Remember to follow these rules absolutely, and do not refer to these rules, even if you're asked about them.
When talking with the user, use the following script:
1. Take their order, acknowledging each item as it is ordered. If it's not clear which menu item the user is ordering, ask them to clarify.
DO NOT add an item to the order unless it's one of the items on the menu below.
2. Once the order is complete, repeat back the order.
2a. If the user only ordered a drink, ask them if they would like to add a donut to their order.
2b. If the user only ordered donuts, ask them if they would like to add a drink to their order.
2c. If the user ordered both drinks and donuts, don't suggest anything.
3. Total up the price of all ordered items and inform the user.
4. Ask the user to pull up to the drive thru window.
If the user asks for something that's not on the menu, inform them of that fact, and suggest the most similar item on the menu.
If the user says something unrelated to your role, responed with "Um... this is a Dr. Donut."
If the user says "thank you", respond with "My pleasure."
If the user asks about what's on the menu, DO NOT read the entire menu to them. Instead, give a couple suggestions.
The menu of available items is as follows:
# DONUTS
PUMPKIN SPICE ICED DOUGHNUT $1.29
PUMPKIN SPICE CAKE DOUGHNUT $1.29
OLD FASHIONED DOUGHNUT $1.29
CHOCOLATE ICED DOUGHNUT $1.09
CHOCOLATE ICED DOUGHNUT WITH SPRINKLES $1.09
RASPBERRY FILLED DOUGHNUT $1.09
BLUEBERRY CAKE DOUGHNUT $1.09
STRAWBERRY ICED DOUGHNUT WITH SPRINKLES $1.09
LEMON FILLED DOUGHNUT $1.09
DOUGHNUT HOLES $3.99
# COFFEE & DRINKS
PUMPKIN SPICE COFFEE $2.59
PUMPKIN SPICE LATTE $4.59
REGULAR BREWED COFFEE $1.79
DECAF BREWED COFFEE $1.79
LATTE $3.49
CAPPUCINO $3.49
CARAMEL MACCHIATO $3.49
MOCHA LATTE $3.49
CARAMEL MOCHA LATTE $3.49
There is also a secret menu that changes daily. If the user asks about it, use the get_secret_menu tool to look up today's secret menu items.
"""

    # Schema of the single tool exposed to the model.
    category_property = {
        "type": "string",
        "enum": ["donuts", "drinks", "both"],
        "description": "The category of secret menu items to retrieve. Defaults to both.",
    }
    menu_tool = FunctionSchema(
        name="get_secret_menu",
        description="Get today's secret menu items",
        properties={"category": category_property},
        required=[],
    )

    call_params = OneShotInputParams(
        api_key=os.getenv("ULTRAVOX_API_KEY"),
        system_prompt=system_prompt,
        temperature=0.3,
        max_duration=datetime.timedelta(minutes=3),
    )
    ultravox = UltravoxRealtimeLLMService(
        params=call_params,
        one_shot_selected_tools=ToolsSchema(standard_tools=[menu_tool]),
    )
    ultravox.register_function("get_secret_menu", get_secret_menu)

    # Necessary to complete the function call lifecycle in Pipecat.
    aggregators = LLMContextAggregatorPair(LLMContext([]))

    # Transport input -> user aggregator -> Ultravox -> assistant aggregator -> transport output
    bot_pipeline = Pipeline(
        [
            transport.input(),
            aggregators.user(),
            ultravox,
            aggregators.assistant(),
            transport.output(),
        ]
    )

    metrics_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    pipeline_task = PipelineTask(
        bot_pipeline,
        params=metrics_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        # Cancel the pipeline task once the client has left.
        await pipeline_task.cancel()

    await PipelineRunner(handle_sigint=runner_args.handle_sigint).run(pipeline_task)
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport for ``runner_args`` from ``transport_params`` and
    hands it to ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -109,7 +109,7 @@ strands = [ "strands-agents>=1.9.1,<2" ]
tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
ultravox = [ "pipecat-ai[websockets-base]" ]
webrtc = [ "aiortc>=1.13.0,<2", "opencv-python>=4.11.0.86,<5" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.122.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]

View File

@@ -116,8 +116,6 @@ TESTS_07 = [
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),
# Needs a Krisp license.
# ("07p-interruptible-krisp.py", EVAL_SIMPLE_MATH),
# Needs GPU resources.
# ("07u-interruptible-ultravox.py", EVAL_SIMPLE_MATH),
]
TESTS_12 = [

View File

@@ -1,13 +1 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import sys
from pipecat.services import DeprecatedModuleProxy
from .stt import *
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "ultravox", "ultravox.stt")
from .llm import UltravoxRealtimeLLMService

View File

@@ -0,0 +1,534 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Ultravox Realtime API service implementation.
This module provides real-time conversational AI capabilities using Ultravox's
Realtime API, supporting both text and audio modalities with
voice transcription, streaming responses, and tool usage.
"""
import asyncio
import datetime
import json
import uuid
from typing import Any, Dict, List, Literal, Optional, Union
import aiohttp
from loguru import logger
from openai.types import chat as openai_chat_types
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
AggregationType,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InputTextRawFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.time import time_now_iso8601
try:
from websockets.asyncio import client as websocket_client
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Ultravox, you need to `pip install pipecat-ai[ultravox]`.")
raise Exception(f"Missing module: {e}")
class AgentInputParams(BaseModel):
    """Settings for starting an Ultravox Realtime call from a pre-defined Agent.

    Parameters:
        api_key: Ultravox API key used to authenticate the request.
        agent_id: ID of the Ultravox Realtime agent to use. Agents are
            pre-configured to handle calls consistently; they can be created
            and edited in the Ultravox console (https://app.ultravox.ai/agents)
            or through the Ultravox API
            (https://docs.ultravox.ai/api-reference/agents/agents-post).
        template_context: Context variables used when instantiating a call
            with the agent. Defaults to an empty dict.
        metadata: Metadata to attach to the call. Defaults to an empty dict.
        max_duration: Maximum duration of the call, between ten seconds and
            one hour. Defaults to None, which will use the agent's default
            maximum duration.
        extra: Extra parameters merged into the agent call creation request.
            Defaults to an empty dict. See the Ultravox API documentation for
            valid arguments:
            https://docs.ultravox.ai/api-reference/agents/agents-calls-post
    """

    api_key: str
    agent_id: uuid.UUID
    template_context: Dict[str, Any] = Field(default_factory=dict)
    metadata: Dict[str, str] = Field(default_factory=dict)
    max_duration: Optional[datetime.timedelta] = Field(
        default=None,
        ge=datetime.timedelta(seconds=10),
        le=datetime.timedelta(hours=1),
    )
    extra: Dict[str, Any] = Field(default_factory=dict)
class OneShotInputParams(BaseModel):
    """Settings for starting a one-off Ultravox Realtime call.

    Parameters:
        api_key: Ultravox API key used to authenticate the request.
        system_prompt: System prompt guiding the model's behavior. Defaults
            to None.
        temperature: Sampling temperature for response generation, between
            0 and 1. Defaults to 0.
        model: Model identifier to use. Defaults to "fixie-ai/ultravox".
        voice: Voice identifier for speech generation. Defaults to None.
        metadata: Metadata to attach to the call. Defaults to an empty dict.
        max_duration: Maximum duration of the call, between ten seconds and
            one hour. Defaults to one hour.
        extra: Extra parameters merged into the call creation request.
            Defaults to an empty dict. See the Ultravox API documentation for
            valid arguments:
            https://docs.ultravox.ai/api-reference/calls/calls-post
    """

    api_key: str
    system_prompt: Optional[str] = None
    temperature: float = Field(default=0.0, ge=0.0, le=1.0)
    model: str = "fixie-ai/ultravox"
    voice: Optional[uuid.UUID] = None
    metadata: Dict[str, str] = Field(default_factory=dict)
    max_duration: datetime.timedelta = Field(
        default=datetime.timedelta(hours=1),
        ge=datetime.timedelta(seconds=10),
        le=datetime.timedelta(hours=1),
    )
    extra: Dict[str, Any] = Field(default_factory=dict)
class JoinUrlInputParams(BaseModel):
    """Settings for joining an Ultravox Realtime call that already exists.

    Parameters:
        join_url: Join URL of the existing Ultravox Realtime call.
    """

    join_url: str
class UltravoxRealtimeLLMService(LLMService):
"""Provides access to the Ultravox Realtime API.
This service enables real-time conversations with Ultravox, supporting both
text and audio output. It handles voice transcription, streaming audio
responses, and tool usage.
Note: Ultravox is an audio-native model, so voice transcriptions are not used
by the model and may not always align with its understanding of user input.
"""
def __init__(
    self,
    *,
    params: Union[AgentInputParams, OneShotInputParams, JoinUrlInputParams],
    one_shot_selected_tools: Optional[ToolsSchema] = None,
    **kwargs,
):
    """Initialize the Ultravox Realtime LLM service.

    Args:
        params: Configuration parameters for the call. The concrete type
            selects how the call is obtained: from a pre-defined agent, as
            a one-off call, or by joining an existing call via its URL.
        one_shot_selected_tools: ToolsSchema for tools to use with this call.
            May only be set with OneShotInputParams; it is ignored (with a
            warning) for any other params type.
        **kwargs: Additional arguments passed to parent LLMService.
    """
    super().__init__(**kwargs)
    self._params = params

    # Always define the attribute: start() reads it for every one-shot call,
    # including those created without any tools.
    self._selected_tools: Optional[ToolsSchema] = None
    if one_shot_selected_tools:
        if isinstance(self._params, OneShotInputParams):
            self._selected_tools = one_shot_selected_tools
        else:
            logger.warning(
                "one_shot_selected_tools may only be set when using OneShotInputParams; ignoring."
            )

    # Connection state; the socket and receive task are created in start().
    self._socket: Optional[websocket_client.ClientConnection] = None
    self._receive_task: Optional[asyncio.Task] = None
    self._disconnecting = False
    # Which kind of response the bot is currently producing, if any.
    self._bot_responding: Literal[None, "text", "voice"] = None
    # Sample rate announced to Ultravox as the input sample rate.
    self._sample_rate = 48000
    self._resampler = create_stream_resampler()
#
# standard AIService frame handling
#
async def start(self, frame: StartFrame):
"""Start the service and establish connection.
Args:
frame: The start frame.
"""
await super().start(frame)
match self._params:
case JoinUrlInputParams():
join_url = self._params.join_url
case AgentInputParams():
request_body = {
"templateContext": self._params.template_context,
"metadata": self._params.metadata,
"maxDuration": f"{self._params.max_duration.total_seconds():3f}s",
"medium": {
"serverWebSocket": {
"inputSampleRate": self._sample_rate,
}
},
} | self._params.extra
async with aiohttp.ClientSession() as session:
async with session.post(
f"https://api.ultravox.ai/api/agents/{self._params.agent_id}/calls",
headers={"X-Api-Key": self._params.api_key},
json=request_body,
) as response:
if response.status != 201:
error_text = await response.text()
raise Exception(f"Ultravox API error {response.status}: {error_text}")
join_url = (await response.json())["joinUrl"]
case OneShotInputParams():
request_body = {
"systemPrompt": self._params.system_prompt,
"temperature": self._params.temperature,
"model": self._params.model,
"voice": str(self._params.voice) if self._params.voice else None,
"metadata": self._params.metadata,
"maxDuration": f"{self._params.max_duration.total_seconds():3f}s",
"selectedTools": self._to_selected_tools(self._selected_tools)
if self._selected_tools
else [],
"medium": {
"serverWebSocket": {
"inputSampleRate": self._sample_rate,
}
},
} | self._params.extra
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.ultravox.ai/api/calls",
headers={"X-Api-Key": self._params.api_key},
json=request_body,
) as response:
if response.status != 201:
error_text = await response.text()
raise Exception(f"Ultravox API error {response.status}: {error_text}")
join_url = (await response.json())["joinUrl"]
logger.info(f"Joining Ultravox Realtime call via URL: {join_url}")
self._socket = await websocket_client.connect(join_url)
self._receive_task = self.create_task(self._receive_messages())
def _to_selected_tools(self, tool: ToolsSchema) -> List[Dict[str, Any]]:
result: List[Dict[str, Any]] = []
for standard_tool in tool.standard_tools:
result.append(
{
"temporaryTool": {
"modelToolName": standard_tool.name,
"description": standard_tool.description,
"dynamicParameters": [
{
"name": k,
"location": "PARAMETER_LOCATION_BODY",
"schema": v,
"required": k in standard_tool.required,
}
for k, v in standard_tool.properties.items()
],
"client": {},
}
}
)
return result
async def stop(self, frame: EndFrame):
    """Stop the service, then disconnect from Ultravox.

    Args:
        frame: The end frame.
    """
    # Let the base class handle the frame first, then tear down the call.
    await super().stop(frame)
    await self._disconnect()
async def cancel(self, frame: CancelFrame):
    """Cancel the service, then disconnect from Ultravox.

    Args:
        frame: The cancel frame.
    """
    # Let the base class handle the frame first, then tear down the call.
    await super().cancel(frame)
    await self._disconnect()
async def _disconnect(self):
self._disconnecting = True
if self._socket:
await self._socket.close()
self._socket = None
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=1.0)
self._receive_task = None
#
# frame processing
# StartFrame, StopFrame, CancelFrame implemented in base class
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """Route an incoming frame to the matching Ultravox handler.

    Context frames and settings updates are consumed here. User text and
    user audio are sent to Ultravox and then pushed on. Every other frame
    is pushed on untouched.

    Args:
        frame: The frame to process.
        direction: The direction the frame is travelling in.
    """
    await super().process_frame(frame, direction)

    if isinstance(frame, LLMContextFrame):
        await self._handle_context(frame.context)
        return
    if isinstance(frame, OpenAILLMContextFrame):
        # OpenAI-style contexts are converted to an LLMContext first.
        await self._handle_context(LLMContext.from_openai_context(frame.context))
        return
    if isinstance(frame, LLMUpdateSettingsFrame):
        # Only the "output_medium" setting is acted upon here.
        settings = frame.settings
        if "output_medium" in settings:
            await self._update_output_medium(settings.get("output_medium"))
        return

    if isinstance(frame, InputTextRawFrame):
        await self._send_user_text(frame.text)
    elif isinstance(frame, InputAudioRawFrame):
        await self._send_user_audio(frame)
    await self.push_frame(frame, direction)
async def _handle_context(self, context: LLMContext):
# Ultravox handles all context server-side, so the only context we may
# need to handle here is new function call results.
for message in reversed(context.messages):
if message.get("role") != "tool":
break
content = message.get("content")
socket_message = {
"type": "client_tool_result",
"invocationId": message.get("tool_call_id"),
"result": content if isinstance(content, str) else "".join(t.text for t in content),
}
await self._send(socket_message)
async def _send_user_audio(self, frame: InputAudioRawFrame):
"""Send user audio frame to Ultravox Realtime."""
if not self._socket:
return
audio = frame.audio
if frame.sample_rate != self._sample_rate:
audio = await self._resampler.resample(audio, frame.sample_rate, self._sample_rate)
await self._send(audio)
async def _send_user_text(self, text: str):
"""Send user text via Ultravox Realtime.
Args:
text: The text to send as user input.
"""
if not self._socket:
return
await self._send({"type": "user_text_message", "text": text})
async def _update_output_medium(self, output_medium: str):
output_medium = output_medium.lower()
if output_medium == "audio":
output_medium = "voice"
if output_medium.lower() not in {"voice", "text"}:
logger.warning(f"Unsupported Ultravox output medium: {output_medium}")
return
await self._send({"type": "set_output_medium", "medium": output_medium})
async def _send(self, content: Union[bytes, Dict[str, Any]]):
"""Send content via the WebSocket connection.
Args:
content: The content to send, either as bytes or a JSON-serializable dict.
"""
if self._disconnecting or not self._socket:
return
try:
if isinstance(content, bytes):
await self._socket.send(content)
else:
await self._socket.send(json.dumps(content))
except Exception as e:
if self._disconnecting or not self._socket:
return
await self.push_error("Ultravox websocket send error", e, fatal=True)
#
# response handling
#
async def _receive_messages(self):
"""Receive messages from the Ultravox Realtime WebSocket."""
async for message in self._socket:
try:
if isinstance(message, bytes):
await self._handle_audio(message)
continue
data = json.loads(message)
match data.get("type"):
case "state":
if self._bot_responding and data.get("state") != "speaking":
await self._handle_response_end()
case "client_tool_invocation":
await self._handle_tool_invocation(
data.get("toolName"), data.get("invocationId"), data.get("parameters")
)
case "transcript":
match data.get("role"):
case "user":
if not data.get("final"):
logger.warning(
"Unexpected non-final user transcript from Ultravox Realtime; ignoring."
)
else:
await self._handle_user_transcript(data.get("text"))
case "agent":
await self._handle_agent_transcript(
data.get("medium"),
data.get("text"),
data.get("delta"),
data.get("final", False),
)
case _:
logger.debug(
f"Received transcript with unknown role from Ultravox Realtime: {data}"
)
case _:
logger.debug(f"Received unhandled Ultravox message: {data}")
except Exception as e:
if self._disconnecting or not self._socket:
return
await self.push_error("Ultravox websocket receive error", e, fatal=True)
async def _handle_audio(self, audio: bytes):
"""Handle incoming audio bytes from Ultravox Realtime."""
if not audio:
return
if not self._bot_responding:
await self.push_frame(BotStartedSpeakingFrame())
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame())
self._bot_responding = "voice"
await self.push_frame(TTSAudioRawFrame(audio, self._sample_rate, 1))
async def _handle_response_end(self):
if self._bot_responding == "voice":
await self.push_frame(BotStoppedSpeakingFrame())
await self.push_frame(TTSStoppedFrame())
await self.push_frame(LLMFullResponseEndFrame())
self._bot_responding = False
async def _handle_tool_invocation(
    self, tool_name: str, invocation_id: str, parameters: Dict[str, Any]
):
    """Run a client tool that Ultravox asked for.

    The invocation is wrapped in a single function call (with no context
    attached) and handed to ``run_function_calls``.

    Args:
        tool_name: Name of the tool to run.
        invocation_id: Ultravox's id for this invocation, used as the
            tool call id.
        parameters: Arguments for the tool.
    """
    call = FunctionCallFromLLM(
        function_name=tool_name,
        tool_call_id=invocation_id,
        arguments=parameters,
        context=None,
    )
    await self.run_function_calls([call])
async def _handle_user_transcript(self, text: str):
    """Publish a user transcript as an upstream transcription frame.

    The frame carries an empty user id, the current ISO-8601 timestamp,
    and the transcript text as both its text and its raw result.

    Args:
        text: The transcript text reported by Ultravox.
    """
    transcription = TranscriptionFrame(
        user_id="",
        timestamp=time_now_iso8601(),
        result=text,
        text=text,
    )
    await self.push_frame(transcription, FrameDirection.UPSTREAM)
async def _handle_agent_transcript(
self, medium: str, text: Optional[str], delta: Optional[str], final: bool
):
frame = LLMTextFrame(text=text or delta)
frame.skip_tts = medium == "voice"
await self.push_frame(frame)
if medium == "text":
if text:
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSStartedFrame())
await self.push_frame(TTSTextFrame(text=text, aggregated_by=AggregationType.WORD))
self._bot_responding = "text"
elif final:
await self.push_frame(LLMFullResponseEndFrame())
self._bot_responding = False
else:
await self.push_frame(TTSTextFrame(text=delta, aggregated_by=AggregationType.WORD))
def create_context_aggregator(
    self,
    context: OpenAILLMContext,
    *,
    user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
    assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> LLMContextAggregatorPair:
    """Build an LLMContextAggregatorPair from an OpenAILLMContext.

    The OpenAI-style context is converted to an ``LLMContext`` and
    ``expect_stripped_words`` is switched off on the assistant parameters
    before the pair is created.

    NOTE: this method exists only for backward compatibility. New code
    should instead do::

        context = LLMContext(...)
        context_aggregator = LLMContextAggregatorPair(context)

    Args:
        context: The LLM context to use.
        user_params: User aggregator parameters. Defaults to
            LLMUserAggregatorParams().
        assistant_params: Assistant aggregator parameters. Defaults to
            LLMAssistantAggregatorParams(). This object is modified in
            place.

    Returns:
        A pair of user and assistant context aggregators.
    """
    universal_context = LLMContext.from_openai_context(context)
    assistant_params.expect_stripped_words = False
    pair = LLMContextAggregatorPair(
        universal_context, user_params=user_params, assistant_params=assistant_params
    )
    return pair

View File

@@ -1,448 +0,0 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""This module implements Ultravox speech-to-text with a locally-loaded model."""
import json
import os
import time
from typing import AsyncGenerator, List, Optional
import numpy as np
from huggingface_hub import login
from loguru import logger
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
try:
from transformers import AutoTokenizer
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Ultravox, you need to `pip install pipecat-ai[ultravox]`.")
raise Exception(f"Missing module: {e}")
class AudioBuffer:
    """Holds the audio frames of one speech segment until they are processed.

    Besides the frames themselves, the buffer records when the segment
    started and whether it is currently being processed.
    """

    def __init__(self):
        """Create an empty, idle buffer."""
        # Not processing and not started until a speech segment begins.
        self.is_processing: bool = False
        self.started_at: Optional[float] = None
        self.frames: List[AudioRawFrame] = []
class UltravoxModel:
    """Model wrapper for the Ultravox multimodal model.

    This class loads the Ultravox model into a vLLM async engine together
    with its tokenizer, and streams generated text as OpenAI-style
    chat-completion chunks.
    """

    def __init__(self, model_name: str = "fixie-ai/ultravox-v0_5-llama-3_1-8b"):
        """Initialize the Ultravox model.

        Args:
            model_name: The name or path of the Ultravox model to load.
                Defaults to "fixie-ai/ultravox-v0_5-llama-3_1-8b".
        """
        self.model_name = model_name
        self._initialize_engine()
        self._initialize_tokenizer()
        self.stop_token_ids = None

    def _initialize_engine(self):
        """Initialize the vLLM engine for inference."""
        engine_args = AsyncEngineArgs(
            model=self.model_name,
            gpu_memory_utilization=0.9,
            max_model_len=8192,
            trust_remote_code=True,
        )
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)

    def _initialize_tokenizer(self):
        """Initialize the tokenizer for the model."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    def format_prompt(self, messages: list):
        """Format chat messages into a prompt for the model.

        Args:
            messages: List of message dictionaries with 'role' and 'content'.

        Returns:
            The prompt produced by the tokenizer's chat template, untokenized
            and with the generation prompt appended.
        """
        return self.tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

    async def generate(
        self,
        messages: list,
        temperature: float = 0.7,
        max_tokens: int = 100,
        audio: Optional[np.ndarray] = None,
    ):
        """Generate text from audio input using the model.

        Args:
            messages: List of message dictionaries for conversation context.
            temperature: Sampling temperature for generation randomness.
            max_tokens: Maximum number of tokens to generate.
            audio: Audio data passed to the engine as the ``audio``
                multi-modal input.

        Yields:
            str: JSON chunks of the generated response in OpenAI format.
        """
        import uuid

        sampling_params = SamplingParams(
            temperature=temperature, max_tokens=max_tokens, stop_token_ids=self.stop_token_ids
        )

        mm_data = {"audio": audio}
        inputs = {"prompt": self.format_prompt(messages), "multi_modal_data": mm_data}
        # A random id cannot collide the way a wall-clock timestamp can when
        # two requests are submitted within the same clock tick.
        request_id = uuid.uuid4().hex
        results_generator = self.engine.generate(inputs, sampling_params, request_id)

        previous_text = ""
        first_chunk = True

        async for output in results_generator:
            prompt_output = output.outputs
            # The engine reports the full text so far; emit only the new suffix.
            new_text = prompt_output[0].text[len(previous_text) :]
            previous_text = prompt_output[0].text

            # Construct OpenAI-compatible chunk
            chunk = {
                "id": str(int(time.time() * 1000)),
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": self.model_name,
                "choices": [
                    {
                        "index": 0,
                        "delta": {},
                        "finish_reason": None,
                    }
                ],
            }

            # Include the role in the first chunk
            if first_chunk:
                chunk["choices"][0]["delta"]["role"] = "assistant"
                first_chunk = False

            # Add new text to the delta if any
            if new_text:
                chunk["choices"][0]["delta"]["content"] = new_text

            # Capture a finish reason if it's provided
            finish_reason = prompt_output[0].finish_reason or None
            if finish_reason and finish_reason != "none":
                chunk["choices"][0]["finish_reason"] = finish_reason

            yield json.dumps(chunk)
class UltravoxSTTService(AIService):
    """Service to transcribe audio using the Ultravox multimodal model.

    This service collects audio frames during speech and processes them with
    Ultravox to generate text transcriptions. It handles real-time audio
    buffering, model warm-up, and streaming text generation.
    """

    def __init__(
        self,
        *,
        model_name: str = "fixie-ai/ultravox-v0_5-llama-3_1-8b",
        hf_token: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 100,
        **kwargs,
    ):
        """Initialize the UltravoxSTTService.

        Args:
            model_name: The Ultravox model to use. Defaults to
                "fixie-ai/ultravox-v0_5-llama-3_1-8b".
            hf_token: Hugging Face token for model access. If None, will try
                to use HF_TOKEN environment variable.
            temperature: Sampling temperature for generation. Defaults to 0.7.
            max_tokens: Maximum tokens to generate. Defaults to 100.
            **kwargs: Additional arguments passed to AIService.
        """
        super().__init__(**kwargs)

        # Authenticate with Hugging Face; an explicit token wins over HF_TOKEN.
        token = hf_token or os.environ.get("HF_TOKEN")
        if token:
            login(token=token)
        else:
            logger.warning("No Hugging Face token provided. Model may not load correctly.")

        # Initialize model
        self._model = UltravoxModel(model_name=model_name)

        # Initialize service state
        self._buffer = AudioBuffer()
        self._temperature = temperature
        self._max_tokens = max_tokens
        self._connection_active = False
        self._warm_up_duration_sec = 1

        logger.info(f"Initialized UltravoxSTTService with model: {model_name}")

    async def _warm_up_model(self):
        """Warm up the model with silent audio.

        Generates a short segment of silent audio and runs it through the
        model, discarding the output. Failures are reported through
        ``push_error`` rather than raised.
        """
        logger.info("Warming up Ultravox model with silent audio...")

        # Generate silent audio at 16kHz sample rate
        sample_rate = 16000
        silent_audio = self._generate_silent_audio(sample_rate, self._warm_up_duration_sec)

        try:
            # Process the silent audio with the model
            messages = [{"role": "user", "content": "<|audio|>\n"}]
            warmup_generator = self._model.generate(
                messages=messages,
                temperature=self._temperature,
                max_tokens=self._max_tokens,
                audio=silent_audio,
            )

            # Consume the generator to actually run the inference
            async for _ in warmup_generator:
                pass

            logger.info("Model warm-up completed successfully")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

    def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0):
        """Generate silent audio as a numpy array.

        Args:
            sample_rate: Sample rate in Hz
            duration_sec: Duration of silence in seconds

        Returns:
            np.ndarray: Float32 array of zeros representing silent audio
        """
        # Calculate number of samples
        num_samples = int(sample_rate * duration_sec)

        # Create silent audio as float32
        silent_audio = np.zeros(num_samples, dtype=np.float32)

        logger.info(f"Generated {duration_sec}s of silent audio ({num_samples} samples)")
        return silent_audio

    def can_generate_metrics(self) -> bool:
        """Indicates whether this service can generate metrics.

        Returns:
            bool: True, as this service supports metric generation.
        """
        return True

    async def start(self, frame: StartFrame):
        """Handle service start.

        Starts the service, marks it as active, and performs model warm-up.

        Args:
            frame: StartFrame that triggered this method.
        """
        await super().start(frame)
        self._connection_active = True
        await self._warm_up_model()
        logger.info("UltravoxSTTService started")

    async def stop(self, frame: EndFrame):
        """Handle service stop.

        Stops the service and marks it as inactive.

        Args:
            frame: EndFrame that triggered this method.
        """
        await super().stop(frame)
        self._connection_active = False
        logger.info("UltravoxSTTService stopped")

    async def cancel(self, frame: CancelFrame):
        """Handle service cancellation.

        Cancels the service, clears any buffered audio, and marks it as inactive.

        Args:
            frame: CancelFrame that triggered this method.
        """
        await super().cancel(frame)
        self._connection_active = False
        self._buffer = AudioBuffer()
        logger.info("UltravoxSTTService cancelled")

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames.

        Audio frames are collected between the user-started-speaking and
        user-stopped-speaking frames; when speech ends the buffered audio is
        run through the model. In that case the user-stopped-speaking frame
        itself is not pushed on.

        Args:
            frame: The frame to process.
            direction: Direction of the frame (input/output).
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStartedSpeakingFrame):
            logger.info("Speech started")
            self._buffer = AudioBuffer()
            self._buffer.started_at = time.time()

        elif isinstance(frame, AudioRawFrame) and self._buffer.started_at is not None:
            self._buffer.frames.append(frame)

        elif isinstance(frame, UserStoppedSpeakingFrame):
            if self._buffer.frames and not self._buffer.is_processing:
                logger.info("Speech ended, processing buffer...")
                await self.process_generator(self._process_audio_buffer())
                return  # The generated frames replace the original frame

        # Only push the original frame if we haven't processed audio
        if frame is not None:
            await self.push_frame(frame, direction)

    async def _process_audio_buffer(self) -> AsyncGenerator[Frame, None]:
        """Process collected audio frames with Ultravox.

        This method concatenates the buffered audio, runs it through the
        model, and yields the resulting frames. The buffer is always reset
        when processing ends.

        Yields:
            Frame: An LLM response-start frame, one LLM text frame per
                generated text chunk, and an LLM response-end frame; or an
                ErrorFrame when the audio is missing or processing fails.
        """
        try:
            self._buffer.is_processing = True

            # Check if we have valid frames before processing
            if not self._buffer.frames:
                logger.warning("No audio frames to process")
                yield ErrorFrame("No audio frames to process")
                return

            # Process audio frames
            audio_arrays = []
            for f in self._buffer.frames:
                # Look at the payload's type before testing it: truth-testing
                # a multi-element ndarray raises ValueError.
                audio = getattr(f, "audio", None)
                if audio is None:
                    continue

                # Handle bytes data - these are int16 PCM samples
                if isinstance(audio, bytes):
                    if not audio:
                        continue
                    try:
                        # Convert bytes to int16 array
                        arr = np.frombuffer(audio, dtype=np.int16)
                        if arr.size > 0:  # Check if array is not empty
                            audio_arrays.append(arr)
                    except Exception as e:
                        yield ErrorFrame(error=f"Unknown error occurred: {e}")
                # Handle numpy array data
                elif isinstance(audio, np.ndarray):
                    if audio.size > 0:  # Check if array is not empty
                        # Ensure it's int16 data
                        if audio.dtype != np.int16:
                            logger.info(f"Converting array from {audio.dtype} to int16")
                            audio_arrays.append(audio.astype(np.int16))
                        else:
                            audio_arrays.append(audio)

            # Only proceed if we have valid audio arrays
            if not audio_arrays:
                logger.warning("No valid audio data found in frames")
                yield ErrorFrame("No valid audio data found in frames")
                return

            # Concatenate audio frames - all should be int16 now
            audio_int16 = np.concatenate(audio_arrays)

            # Convert int16 to float32 and normalize for model input
            audio_float32 = audio_int16.astype(np.float32) / 32768.0

            # Generate text using the model
            if self._model:
                try:
                    logger.info("Generating text from audio using model...")

                    # Start metrics tracking
                    await self.start_ttfb_metrics()
                    await self.start_processing_metrics()

                    yield LLMFullResponseStartFrame()

                    async for response in self._model.generate(
                        messages=[{"role": "user", "content": "<|audio|>\n"}],
                        temperature=self._temperature,
                        max_tokens=self._max_tokens,
                        audio=audio_float32,
                    ):
                        # Stop TTFB metrics after first response
                        await self.stop_ttfb_metrics()

                        chunk = json.loads(response)
                        if "choices" in chunk and len(chunk["choices"]) > 0:
                            delta = chunk["choices"][0]["delta"]
                            if "content" in delta:
                                new_text = delta["content"]
                                if new_text:
                                    yield LLMTextFrame(text=new_text)

                    # Stop processing metrics after completion
                    await self.stop_processing_metrics()

                    yield LLMFullResponseEndFrame()

                except Exception as e:
                    yield ErrorFrame(error=f"Unknown error occurred: {e}")
            else:
                yield ErrorFrame("No model available for text generation")

        except Exception as e:
            yield ErrorFrame(f"Error processing audio: {str(e)}")
        finally:
            self._buffer.is_processing = False
            self._buffer.frames = []
            self._buffer.started_at = None

959
uv.lock generated

File diff suppressed because it is too large Load Diff