Merge pull request #1370 from pipecat-ai/aleix/minor-ultravox-updates

services(ultravox): CHANGELOG, formatting and minor changes
This commit is contained in:
Aleix Conchillo Flaqué
2025-03-13 12:05:13 -07:00
committed by GitHub
5 changed files with 120 additions and 111 deletions

View File

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added new `UltravoxSTTService`.
(see https://github.com/fixie-ai/ultravox)
- Added `on_frame_reached_upstream` and `on_frame_reached_downstream` event
handlers to `PipelineTask`. Those events will be called when a frame reaches
the beginning or end of the pipeline respectively.

View File

@@ -56,8 +56,8 @@ pip install "pipecat-ai[option,...]"
### Available services
| Category | Services | Install Command Example |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------|
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` |
| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[google]"` |

View File

@@ -17,9 +17,9 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.ultravox import UltravoxSTTService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.ultravox import UltravoxSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -30,13 +30,14 @@ load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
#Want to initialize the ultravox processor since it takes time to load the model and dont
#want to load it every time the pipeline is run
# Initialize the Ultravox processor up front: loading the model takes time and we
# don't want to load it every time the pipeline is run
ultravox_processor = UltravoxSTTService(
model_size="fixie-ai/ultravox-v0_4_1-llama-3_1-8b",
hf_token=os.getenv("HF_TOKEN"),
)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -56,11 +57,9 @@ async def main():
tts = CartesiaTTSService(
api_key=os.environ.get("CARTESIA_API_KEY"),
voice_id='97f4b8fb-f2fe-444b-bb9a-c109783a857a',
voice_id="97f4b8fb-f2fe-444b-bb9a-c109783a857a",
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
@@ -89,5 +88,3 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -78,6 +78,7 @@ silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]
together = []
ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ]
websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]
openrouter = []

View File

@@ -1,132 +1,140 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""This module implements Ultravox speech-to-text with a locally-loaded model."""
import json
import time
import os
import time
from typing import AsyncGenerator, List, Optional
import numpy as np
from enum import Enum
from typing import AsyncGenerator, Optional, List
from loguru import logger
from pydantic import BaseModel
from huggingface_hub import login
from loguru import logger
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
TranscriptionFrame,
TextFrame,
StartFrame,
EndFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
ErrorFrame
)
from pipecat.services.ai_services import AIService
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService
from pipecat.utils.time import time_now_iso8601
try:
from vllm import SamplingParams, AsyncLLMEngine
from vllm.engine.arg_utils import AsyncEngineArgs
from transformers import AutoTokenizer
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Ultravox, you need to `pip install pipecat-ai[ultravox]`.")
raise Exception(f"Missing module: {e}")
class AudioBuffer:
    """Holds the audio frames gathered for one utterance until they are processed.

    Attributes:
        frames: AudioRawFrames collected so far
        started_at: Timestamp at which speech started, or None when unset
        is_processing: Flag used to prevent concurrent processing
    """

    def __init__(self):
        # A new buffer starts idle: not processing, no start time, no audio.
        self.is_processing: bool = False
        self.started_at: Optional[float] = None
        self.frames: List[AudioRawFrame] = []
class UltravoxModel:
"""Model wrapper for the Ultravox multimodal model.
This class handles loading and running the Ultravox model for speech-to-text.
Args:
model_name: The name or path of the Ultravox model to load
Attributes:
model_name: The name of the loaded model
engine: The vLLM engine for model inference
tokenizer: The tokenizer for the model
stop_token_ids: Optional token IDs to stop generation
"""
def __init__(self, model_name: str = "fixie-ai/ultravox-v0_4_1-llama-3_1-8b"):
    """Record the model name, then build the engine and tokenizer for it.

    Args:
        model_name: The name or path of the Ultravox model to load
    """
    self.model_name = model_name
    # No stop token IDs are configured by default.
    self.stop_token_ids = None
    # Both initializers read self.model_name, so it is assigned before they run.
    self._initialize_engine()
    self._initialize_tokenizer()
def _initialize_engine(self):
"""Initialize the vLLM engine for inference."""
engine_args = AsyncEngineArgs(
model=self.model_name,
gpu_memory_utilization=0.9,
max_model_len=8192,
trust_remote_code=True
trust_remote_code=True,
)
self.engine = AsyncLLMEngine.from_engine_args(engine_args)
def _initialize_tokenizer(self):
    """Load the tokenizer for ``self.model_name`` and store it on the instance."""
    tokenizer = AutoTokenizer.from_pretrained(self.model_name)
    self.tokenizer = tokenizer
def format_prompt(self, messages: list):
"""Format chat messages into a prompt for the model.
Args:
messages: List of message dictionaries with 'role' and 'content'
Returns:
str: Formatted prompt string
"""
return self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
messages, tokenize=False, add_generation_prompt=True
)
async def generate(self, messages: list, temperature: float = 0.7, max_tokens: int = 100, audio: np.ndarray = None):
async def generate(
self,
messages: list,
temperature: float = 0.7,
max_tokens: int = 100,
audio: np.ndarray = None,
):
"""Generate text from audio input using the model.
Args:
messages: List of message dictionaries
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
audio: Audio data as numpy array
Yields:
str: JSON chunks of the generated response
"""
sampling_params = SamplingParams(
temperature=temperature,
max_tokens=max_tokens,
stop_token_ids=self.stop_token_ids
temperature=temperature, max_tokens=max_tokens, stop_token_ids=self.stop_token_ids
)
mm_data = {
"audio": audio
}
mm_data = {"audio": audio}
inputs = {"prompt": self.format_prompt(messages), "multi_modal_data": mm_data}
results_generator = self.engine.generate(inputs, sampling_params, str(time.time()))
previous_text = ""
first_chunk = True
async for output in results_generator:
prompt_output = output.outputs
new_text = prompt_output[0].text[len(previous_text):]
new_text = prompt_output[0].text[len(previous_text) :]
previous_text = prompt_output[0].text
# Construct OpenAI-compatible chunk
@@ -160,19 +168,20 @@ class UltravoxModel:
yield json.dumps(chunk)
class UltravoxSTTService(AIService):
"""Service to transcribe audio using the Ultravox multimodal model.
This service collects audio frames and processes them with Ultravox
to generate text transcriptions.
Args:
model_size: The Ultravox model to use (ModelSize enum or string)
hf_token: Hugging Face token for model access
temperature: Sampling temperature for generation
max_tokens: Maximum tokens to generate
**kwargs: Additional arguments passed to AIService
Attributes:
model: The UltravoxModel instance
buffer: Buffer to collect audio frames
@@ -180,17 +189,18 @@ class UltravoxSTTService(AIService):
max_tokens: Maximum tokens to generate
_connection_active: Flag indicating if service is active
"""
def __init__(
self,
*,
model_size: str = "fixie-ai/ultravox-v0_4_1-llama-3_1-8b",
model_size: str = "fixie-ai/ultravox-v0_4_1-llama-3_1-8b",
hf_token: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 100,
**kwargs,
):
super().__init__(**kwargs)
# Authenticate with Hugging Face if token provided
if hf_token:
login(token=hf_token)
@@ -198,22 +208,22 @@ class UltravoxSTTService(AIService):
login(token=os.environ.get("HF_TOKEN"))
else:
logger.warning("No Hugging Face token provided. Model may not load correctly.")
# Initialize model
model_name = model_size if isinstance(model_size, str) else model_size.value
self.model = UltravoxModel(model_name=model_name)
self._model = UltravoxModel(model_name=model_name)
# Initialize service state
self.buffer = AudioBuffer()
self.temperature = temperature
self.max_tokens = max_tokens
self._buffer = AudioBuffer()
self._temperature = temperature
self._max_tokens = max_tokens
self._connection_active = False
logger.info(f"Initialized UltravoxSTTService with model: {model_name}")
def can_generate_metrics(self) -> bool:
"""Indicates whether this service can generate metrics.
Returns:
bool: True, as this service supports metric generation.
"""
@@ -221,7 +231,7 @@ class UltravoxSTTService(AIService):
async def start(self, frame: StartFrame):
"""Handle service start.
Args:
frame: StartFrame that triggered this method
"""
@@ -231,7 +241,7 @@ class UltravoxSTTService(AIService):
async def stop(self, frame: EndFrame):
"""Handle service stop.
Args:
frame: EndFrame that triggered this method
"""
@@ -241,20 +251,20 @@ class UltravoxSTTService(AIService):
async def cancel(self, frame: CancelFrame):
"""Handle service cancellation.
Args:
frame: CancelFrame that triggered this method
"""
await super().cancel(frame)
self._connection_active = False
self.buffer = AudioBuffer()
self._buffer = AudioBuffer()
logger.info("UltravoxSTTService cancelled")
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames.
This method collects audio frames and processes them when speech ends.
Args:
frame: The frame to process
direction: Direction of the frame (input/output)
@@ -263,44 +273,44 @@ class UltravoxSTTService(AIService):
if isinstance(frame, UserStartedSpeakingFrame):
logger.info("Speech started")
self.buffer = AudioBuffer()
self.buffer.started_at = time.time()
elif isinstance(frame, AudioRawFrame) and self.buffer.started_at is not None:
self.buffer.frames.append(frame)
self._buffer = AudioBuffer()
self._buffer.started_at = time.time()
elif isinstance(frame, AudioRawFrame) and self._buffer.started_at is not None:
self._buffer.frames.append(frame)
elif isinstance(frame, UserStoppedSpeakingFrame):
if self.buffer.frames and not self.buffer.is_processing:
if self._buffer.frames and not self._buffer.is_processing:
logger.info("Speech ended, processing buffer...")
await self.process_generator(self._process_audio_buffer())
return # Return early to avoid pushing None frame
# Only push the original frame if we haven't processed audio
if frame is not None:
await self.push_frame(frame, direction)
async def _process_audio_buffer(self) -> AsyncGenerator[Frame, None]:
"""Process collected audio frames with Ultravox.
This method concatenates audio frames, processes them with the model,
and yields the resulting text frames.
Yields:
Frame: TextFrame containing the transcribed text
"""
try:
self.buffer.is_processing = True
self._buffer.is_processing = True
# Check if we have valid frames before processing
if not self.buffer.frames:
if not self._buffer.frames:
logger.warning("No audio frames to process")
yield ErrorFrame("No audio frames to process")
return
# Process audio frames
audio_arrays = []
for f in self.buffer.frames:
if hasattr(f, 'audio') and f.audio:
for f in self._buffer.frames:
if hasattr(f, "audio") and f.audio:
# Handle bytes data - these are int16 PCM samples
if isinstance(f.audio, bytes):
try:
@@ -319,75 +329,73 @@ class UltravoxSTTService(AIService):
audio_arrays.append(f.audio.astype(np.int16))
else:
audio_arrays.append(f.audio)
# Only proceed if we have valid audio arrays
if not audio_arrays:
logger.warning("No valid audio data found in frames")
yield ErrorFrame("No valid audio data found in frames")
return
# Concatenate audio frames - all should be int16 now
audio_data = np.concatenate(audio_arrays)
# Generate text using the model
if self.model:
if self._model:
try:
logger.info("Generating text from audio using model...")
full_response = ""
# Start metrics tracking
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async for response in self.model.generate(
messages=[{
'role': 'user',
'content': "<|audio|>\n"
}],
temperature=self.temperature,
max_tokens=self.max_tokens,
audio=audio_data
async for response in self._model.generate(
messages=[{"role": "user", "content": "<|audio|>\n"}],
temperature=self._temperature,
max_tokens=self._max_tokens,
audio=audio_data,
):
# Stop TTFB metrics after first response
await self.stop_ttfb_metrics()
chunk = json.loads(response)
if "choices" in chunk and len(chunk["choices"]) > 0:
delta = chunk["choices"][0]["delta"]
if "content" in delta:
new_text = delta["content"]
full_response += new_text
# Stop processing metrics after completion
await self.stop_processing_metrics()
logger.info(f"Generated text: {full_response}")
# Create a transcription frame with the generated text
transcription = full_response.strip()
if transcription:
yield TranscriptionFrame(
user_id="",
text=transcription,
interim_text="",
timestamp=time_now_iso8601()
timestamp=time_now_iso8601(),
)
else:
logger.warning("Empty transcription result")
yield ErrorFrame("Empty transcription result")
except Exception as e:
logger.error(f"Error generating text from model: {e}")
yield ErrorFrame(f"Error generating text: {str(e)}")
else:
logger.warning("No model available for text generation")
yield ErrorFrame("No model available for text generation")
except Exception as e:
logger.error(f"Error processing audio buffer: {e}")
import traceback
logger.error(traceback.format_exc())
yield ErrorFrame(f"Error processing audio: {str(e)}")
finally:
self.buffer.is_processing = False
self.buffer.frames = []
self.buffer.started_at = None
self._buffer.is_processing = False
self._buffer.frames = []
self._buffer.started_at = None