diff --git a/CHANGELOG.md b/CHANGELOG.md index 776041481..deda5de9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added new `UltravoxSTTService`. + (see https://github.com/fixie-ai/ultravox) + - Added `on_frame_reached_upstream` and `on_frame_reached_downstream` event handlers to `PipelineTask`. Those events will be called when a frame reaches the beginning or end of the pipeline respectively. diff --git a/README.md b/README.md index ec09c8f72..599412673 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,8 @@ pip install "pipecat-ai[option,...]" ### Available services | Category | Services | Install Command Example | -| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | -| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), 
[Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` | +|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------| +| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` | | LLMs | 
[Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` | | Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` | | Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[google]"` | diff --git 
a/examples/foundational/07u-interruptible-ultravox.py b/examples/foundational/07u-interruptible-ultravox.py index 195eaf197..3ae4540f0 100644 --- a/examples/foundational/07u-interruptible-ultravox.py +++ b/examples/foundational/07u-interruptible-ultravox.py @@ -17,9 +17,9 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.services.ultravox import UltravoxSTTService from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.ultravox import UltravoxSTTService +from pipecat.transports.services.daily import DailyParams, DailyTransport load_dotenv(override=True) @@ -30,13 +30,14 @@ load_dotenv(override=True) logger.remove(0) logger.add(sys.stderr, level="DEBUG") -#Want to initialize the ultravox processor since it takes time to load the model and dont -#want to load it every time the pipeline is run +# Initialize the Ultravox processor once, up front: loading the model takes time and we don't +# want to load it every time the pipeline is run ultravox_processor = UltravoxSTTService( model_size="fixie-ai/ultravox-v0_4_1-llama-3_1-8b", hf_token=os.getenv("HF_TOKEN"), ) + async def main(): async with aiohttp.ClientSession() as session: (room_url, token) = await configure(session) @@ -56,11 +57,9 @@ async def main(): tts = CartesiaTTSService( api_key=os.environ.get("CARTESIA_API_KEY"), - voice_id='97f4b8fb-f2fe-444b-bb9a-c109783a857a', - + voice_id="97f4b8fb-f2fe-444b-bb9a-c109783a857a", ) - pipeline = Pipeline( [ transport.input(), # Transport user input @@ -89,5 +88,3 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) - - diff --git a/pyproject.toml b/pyproject.toml index dd58b1bfe..eab7b60f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ silero = [ 
"onnxruntime~=1.20.1" ] simli = [ "simli-ai~=0.1.10"] soundfile = [ "soundfile~=0.13.0" ] together = [] +ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ] websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ] whisper = [ "faster-whisper~=1.1.1" ] openrouter = [] diff --git a/src/pipecat/services/ultravox.py b/src/pipecat/services/ultravox.py index f39dc348f..40029e673 100644 --- a/src/pipecat/services/ultravox.py +++ b/src/pipecat/services/ultravox.py @@ -1,132 +1,140 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + """This module implements Ultravox speech-to-text with a locally-loaded model.""" import json -import time import os +import time +from typing import AsyncGenerator, List, Optional + import numpy as np -from enum import Enum -from typing import AsyncGenerator, Optional, List -from loguru import logger -from pydantic import BaseModel from huggingface_hub import login +from loguru import logger from pipecat.frames.frames import ( - Frame, AudioRawFrame, - TranscriptionFrame, - TextFrame, - StartFrame, - EndFrame, CancelFrame, + EndFrame, + ErrorFrame, + Frame, + StartFrame, + TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, - ErrorFrame ) -from pipecat.services.ai_services import AIService from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import AIService from pipecat.utils.time import time_now_iso8601 try: - from vllm import SamplingParams, AsyncLLMEngine - from vllm.engine.arg_utils import AsyncEngineArgs from transformers import AutoTokenizer + from vllm import AsyncLLMEngine, SamplingParams + from vllm.engine.arg_utils import AsyncEngineArgs except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use Ultravox, you need to `pip install pipecat-ai[ultravox]`.") raise Exception(f"Missing module: {e}") + class AudioBuffer: """Buffer to collect audio frames before processing. 
- + Attributes: frames: List of AudioRawFrames to process started_at: Timestamp when speech started is_processing: Flag to prevent concurrent processing """ + def __init__(self): self.frames: List[AudioRawFrame] = [] self.started_at: Optional[float] = None self.is_processing: bool = False + class UltravoxModel: """Model wrapper for the Ultravox multimodal model. - + This class handles loading and running the Ultravox model for speech-to-text. - + Args: model_name: The name or path of the Ultravox model to load - + Attributes: model_name: The name of the loaded model engine: The vLLM engine for model inference tokenizer: The tokenizer for the model stop_token_ids: Optional token IDs to stop generation """ + def __init__(self, model_name: str = "fixie-ai/ultravox-v0_4_1-llama-3_1-8b"): self.model_name = model_name self._initialize_engine() self._initialize_tokenizer() self.stop_token_ids = None - + def _initialize_engine(self): """Initialize the vLLM engine for inference.""" engine_args = AsyncEngineArgs( model=self.model_name, gpu_memory_utilization=0.9, max_model_len=8192, - trust_remote_code=True + trust_remote_code=True, ) self.engine = AsyncLLMEngine.from_engine_args(engine_args) - + def _initialize_tokenizer(self): """Initialize the tokenizer for the model.""" self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) - + def format_prompt(self, messages: list): """Format chat messages into a prompt for the model. 
- + Args: messages: List of message dictionaries with 'role' and 'content' - + Returns: str: Formatted prompt string """ return self.tokenizer.apply_chat_template( - messages, - tokenize=False, - add_generation_prompt=True + messages, tokenize=False, add_generation_prompt=True ) - async def generate(self, messages: list, temperature: float = 0.7, max_tokens: int = 100, audio: np.ndarray = None): + async def generate( + self, + messages: list, + temperature: float = 0.7, + max_tokens: int = 100, + audio: np.ndarray = None, + ): """Generate text from audio input using the model. - + Args: messages: List of message dictionaries temperature: Sampling temperature max_tokens: Maximum tokens to generate audio: Audio data as numpy array - + Yields: str: JSON chunks of the generated response """ sampling_params = SamplingParams( - temperature=temperature, - max_tokens=max_tokens, - stop_token_ids=self.stop_token_ids + temperature=temperature, max_tokens=max_tokens, stop_token_ids=self.stop_token_ids ) - - mm_data = { - "audio": audio - } + + mm_data = {"audio": audio} inputs = {"prompt": self.format_prompt(messages), "multi_modal_data": mm_data} results_generator = self.engine.generate(inputs, sampling_params, str(time.time())) - + previous_text = "" first_chunk = True async for output in results_generator: prompt_output = output.outputs - new_text = prompt_output[0].text[len(previous_text):] + new_text = prompt_output[0].text[len(previous_text) :] previous_text = prompt_output[0].text # Construct OpenAI-compatible chunk @@ -160,19 +168,20 @@ class UltravoxModel: yield json.dumps(chunk) + class UltravoxSTTService(AIService): """Service to transcribe audio using the Ultravox multimodal model. - + This service collects audio frames and processes them with Ultravox to generate text transcriptions. 
- + Args: model_size: The Ultravox model to use (ModelSize enum or string) hf_token: Hugging Face token for model access temperature: Sampling temperature for generation max_tokens: Maximum tokens to generate **kwargs: Additional arguments passed to AIService - + Attributes: model: The UltravoxModel instance buffer: Buffer to collect audio frames @@ -180,17 +189,18 @@ class UltravoxSTTService(AIService): max_tokens: Maximum tokens to generate _connection_active: Flag indicating if service is active """ + def __init__( self, *, - model_size: str = "fixie-ai/ultravox-v0_4_1-llama-3_1-8b", + model_size: str = "fixie-ai/ultravox-v0_4_1-llama-3_1-8b", hf_token: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 100, **kwargs, ): super().__init__(**kwargs) - + # Authenticate with Hugging Face if token provided if hf_token: login(token=hf_token) @@ -198,22 +208,22 @@ class UltravoxSTTService(AIService): login(token=os.environ.get("HF_TOKEN")) else: logger.warning("No Hugging Face token provided. Model may not load correctly.") - + # Initialize model model_name = model_size if isinstance(model_size, str) else model_size.value - self.model = UltravoxModel(model_name=model_name) - + self._model = UltravoxModel(model_name=model_name) + # Initialize service state - self.buffer = AudioBuffer() - self.temperature = temperature - self.max_tokens = max_tokens + self._buffer = AudioBuffer() + self._temperature = temperature + self._max_tokens = max_tokens self._connection_active = False - + logger.info(f"Initialized UltravoxSTTService with model: {model_name}") - + def can_generate_metrics(self) -> bool: """Indicates whether this service can generate metrics. - + Returns: bool: True, as this service supports metric generation. """ @@ -221,7 +231,7 @@ class UltravoxSTTService(AIService): async def start(self, frame: StartFrame): """Handle service start. 
- + Args: frame: StartFrame that triggered this method """ @@ -231,7 +241,7 @@ class UltravoxSTTService(AIService): async def stop(self, frame: EndFrame): """Handle service stop. - + Args: frame: EndFrame that triggered this method """ @@ -241,20 +251,20 @@ class UltravoxSTTService(AIService): async def cancel(self, frame: CancelFrame): """Handle service cancellation. - + Args: frame: CancelFrame that triggered this method """ await super().cancel(frame) self._connection_active = False - self.buffer = AudioBuffer() + self._buffer = AudioBuffer() logger.info("UltravoxSTTService cancelled") async def process_frame(self, frame: Frame, direction: FrameDirection): """Process incoming frames. - + This method collects audio frames and processes them when speech ends. - + Args: frame: The frame to process direction: Direction of the frame (input/output) @@ -263,44 +273,44 @@ class UltravoxSTTService(AIService): if isinstance(frame, UserStartedSpeakingFrame): logger.info("Speech started") - self.buffer = AudioBuffer() - self.buffer.started_at = time.time() - - elif isinstance(frame, AudioRawFrame) and self.buffer.started_at is not None: - self.buffer.frames.append(frame) - + self._buffer = AudioBuffer() + self._buffer.started_at = time.time() + + elif isinstance(frame, AudioRawFrame) and self._buffer.started_at is not None: + self._buffer.frames.append(frame) + elif isinstance(frame, UserStoppedSpeakingFrame): - if self.buffer.frames and not self.buffer.is_processing: + if self._buffer.frames and not self._buffer.is_processing: logger.info("Speech ended, processing buffer...") await self.process_generator(self._process_audio_buffer()) return # Return early to avoid pushing None frame - + # Only push the original frame if we haven't processed audio if frame is not None: await self.push_frame(frame, direction) async def _process_audio_buffer(self) -> AsyncGenerator[Frame, None]: """Process collected audio frames with Ultravox. 
- + This method concatenates audio frames, processes them with the model, and yields the resulting text frames. - + Yields: Frame: TextFrame containing the transcribed text """ try: - self.buffer.is_processing = True - + self._buffer.is_processing = True + # Check if we have valid frames before processing - if not self.buffer.frames: + if not self._buffer.frames: logger.warning("No audio frames to process") yield ErrorFrame("No audio frames to process") return - + # Process audio frames audio_arrays = [] - for f in self.buffer.frames: - if hasattr(f, 'audio') and f.audio: + for f in self._buffer.frames: + if hasattr(f, "audio") and f.audio: # Handle bytes data - these are int16 PCM samples if isinstance(f.audio, bytes): try: @@ -319,75 +329,73 @@ class UltravoxSTTService(AIService): audio_arrays.append(f.audio.astype(np.int16)) else: audio_arrays.append(f.audio) - + # Only proceed if we have valid audio arrays if not audio_arrays: logger.warning("No valid audio data found in frames") yield ErrorFrame("No valid audio data found in frames") return - + # Concatenate audio frames - all should be int16 now audio_data = np.concatenate(audio_arrays) - + # Generate text using the model - if self.model: + if self._model: try: logger.info("Generating text from audio using model...") full_response = "" - + # Start metrics tracking await self.start_ttfb_metrics() await self.start_processing_metrics() - - async for response in self.model.generate( - messages=[{ - 'role': 'user', - 'content': "<|audio|>\n" - }], - temperature=self.temperature, - max_tokens=self.max_tokens, - audio=audio_data + + async for response in self._model.generate( + messages=[{"role": "user", "content": "<|audio|>\n"}], + temperature=self._temperature, + max_tokens=self._max_tokens, + audio=audio_data, ): # Stop TTFB metrics after first response await self.stop_ttfb_metrics() - + chunk = json.loads(response) if "choices" in chunk and len(chunk["choices"]) > 0: delta = chunk["choices"][0]["delta"] if 
"content" in delta: new_text = delta["content"] full_response += new_text - + # Stop processing metrics after completion await self.stop_processing_metrics() - + logger.info(f"Generated text: {full_response}") - + # Create a transcription frame with the generated text transcription = full_response.strip() if transcription: yield TranscriptionFrame( + user_id="", text=transcription, - interim_text="", - timestamp=time_now_iso8601() + timestamp=time_now_iso8601(), ) else: logger.warning("Empty transcription result") yield ErrorFrame("Empty transcription result") - + except Exception as e: logger.error(f"Error generating text from model: {e}") yield ErrorFrame(f"Error generating text: {str(e)}") else: logger.warning("No model available for text generation") yield ErrorFrame("No model available for text generation") - + except Exception as e: logger.error(f"Error processing audio buffer: {e}") import traceback + logger.error(traceback.format_exc()) yield ErrorFrame(f"Error processing audio: {str(e)}") finally: - self.buffer.is_processing = False - self.buffer.frames = [] - self.buffer.started_at = None + self._buffer.is_processing = False + self._buffer.frames = [] + self._buffer.started_at = None