From 907ff58d41ce77da6b6b9270fc2969f1d6106edb Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 23 Feb 2026 17:11:08 -0500 Subject: [PATCH] Align Ultravox Realtime service with OpenAI/Gemini patterns - Add InterruptionFrame handling with stop_all_metrics() - Add processing metrics (start/stop) at response boundaries - Fix agent transcript handling for voice and text modalities: - Voice mode: push LLMTextFrame (append_to_context=False) and TTSTextFrame for deltas, skip duplicated final text - Text mode: push LLMTextFrame with proper response lifecycle, no TTSTextFrame (downstream TTS handles audio) - Add output_medium parameter to AgentInputParams and OneShotInputParams - Improve TTFB measurement using VAD speech end time - Update example with user turn strategies and transcript events - Add text-only output example (50a-ultravox-realtime-text.py) --- changelog/3806.added.md | 1 + changelog/3806.changed.2.md | 1 + changelog/3806.changed.md | 1 + examples/foundational/50-ultravox-realtime.py | 42 ++- .../50a-ultravox-realtime-text.py | 263 ++++++++++++++++++ src/pipecat/services/ultravox/llm.py | 102 +++++-- 6 files changed, 384 insertions(+), 26 deletions(-) create mode 100644 changelog/3806.added.md create mode 100644 changelog/3806.changed.2.md create mode 100644 changelog/3806.changed.md create mode 100644 examples/foundational/50a-ultravox-realtime-text.py diff --git a/changelog/3806.added.md b/changelog/3806.added.md new file mode 100644 index 000000000..eeddc9825 --- /dev/null +++ b/changelog/3806.added.md @@ -0,0 +1 @@ +- Added `output_medium` parameter to `AgentInputParams` and `OneShotInputParams` in Ultravox service to control initial output medium (text or voice) at call creation time. diff --git a/changelog/3806.changed.2.md b/changelog/3806.changed.2.md new file mode 100644 index 000000000..9d6dfdf76 --- /dev/null +++ b/changelog/3806.changed.2.md @@ -0,0 +1 @@ +- Improved Ultravox TTFB measurement accuracy by using VAD speech end time instead of `UserStoppedSpeakingFrame` timing. diff --git a/changelog/3806.changed.md b/changelog/3806.changed.md new file mode 100644 index 000000000..c8e2fb68c --- /dev/null +++ b/changelog/3806.changed.md @@ -0,0 +1 @@ +- Aligned `UltravoxRealtimeLLMService` frame handling with OpenAI/Gemini realtime services: added `InterruptionFrame` handling with metrics cleanup, processing metrics at response boundaries, and improved agent transcript handling for both voice and text output modalities. diff --git a/examples/foundational/50-ultravox-realtime.py b/examples/foundational/50-ultravox-realtime.py index 5038cbb4c..0908c518c 100644 --- a/examples/foundational/50-ultravox-realtime.py +++ b/examples/foundational/50-ultravox-realtime.py @@ -12,11 +12,18 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + LLMUserAggregatorParams, + UserTurnStoppedMessage, +) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.llm_service import FunctionCallParams @@ -24,6 +31,8 @@ from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLL from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy +from pipecat.turns.user_turn_strategies import UserTurnStrategies # Load environment variables load_dotenv(override=True) @@ -168,8 +177,21 @@ There is also a secret menu that changes daily. If the user asks about it, use t llm.register_function("get_secret_menu", get_secret_menu) - # Necessary to complete the function call lifecycle in Pipecat. - user_aggregator, assistant_aggregator = LLMContextAggregatorPair(LLMContext([])) + context = LLMContext([]) + + # Necessary to complete the function call lifecycle in Pipecat and + # to produce user and assistant turn stopped events. + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy()], + ), + # Set the VAD analyzer to create reliable TTFB measurements and + # user stop events. + vad_analyzer=SileroVADAnalyzer(), + ), + ) # Build the pipeline pipeline = Pipeline( @@ -177,8 +199,8 @@ There is also a secret menu that changes daily. If the user asks about it, use t transport.input(), user_aggregator, llm, - assistant_aggregator, transport.output(), + assistant_aggregator, ] ) @@ -203,6 +225,18 @@ There is also a secret menu that changes daily. If the user asks about it, use t logger.info(f"Client disconnected") await task.cancel() + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}user: {message.content}" + logger.info(f"Transcript: {line}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}assistant: {message.content}" + logger.info(f"Transcript: {line}") + # Run the pipeline runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/examples/foundational/50a-ultravox-realtime-text.py b/examples/foundational/50a-ultravox-realtime-text.py new file mode 100644 index 000000000..8b876048a --- /dev/null +++ b/examples/foundational/50a-ultravox-realtime-text.py @@ -0,0 +1,263 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import datetime +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + LLMUserAggregatorParams, + UserTurnStoppedMessage, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.inworld.tts import InworldTTSService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy +from pipecat.turns.user_turn_strategies import UserTurnStrategies + +# Load environment variables +load_dotenv(override=True) + + +# We use lambdas to defer transport parameter creation until the transport +# type is selected at runtime. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def get_secret_menu(params: FunctionCallParams): + category = params.arguments.get("category", "both") + logger.debug(f"Fetching secret menu with category: {category}") + items = [] + if category in {"donuts", "both"}: + items.append( + { + "name": "Butter Pecan Ice Cream (one scoop)", + "price": "$2.99", + } + ) + if category in {"drinks", "both"}: + items.append( + { + "name": "Banana Smoothie", + "price": "$4.99", + } + ) + await params.result_callback( + { + "date": datetime.date.today().isoformat(), + "items": items, + } + ) + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + system_prompt = f""" +You are a drive-thru order taker for a donut shop called "Dr. Donut". Local time is currently: {datetime.datetime.now().isoformat()} +The user is talking to you over voice on their phone, and your response will be read out loud with realistic text-to-speech (TTS) technology. + +Follow every direction here when crafting your response: + +1. Use natural, conversational language that is clear and easy to follow (short sentences, simple words). +1a. Be concise and relevant: Most of your responses should be a sentence or two, unless you're asked to go deeper. Don't monopolize the conversation. +1b. Use discourse markers to ease comprehension. Never use the list format. + +2. Keep the conversation flowing. +2a. Clarify: when there is ambiguity, ask clarifying questions, rather than make assumptions. +2b. Don't implicitly or explicitly try to end the chat (i.e. do not end a response with "Talk soon!", or "Enjoy!"). +2c. Sometimes the user might just want to chat. Ask them relevant follow-up questions. +2d. Don't ask them if there's anything else they need help with (e.g. don't say things like "How can I assist you further?"). + +3. Remember that this is a voice conversation: +3a. Don't use lists, markdown, bullet points, or other formatting that's not typically spoken. +3b. Type out numbers in words (e.g. 'twenty twelve' instead of the year 2012) +3c. If something doesn't make sense, it's likely because you misheard them. There wasn't a typo, and the user didn't mispronounce anything. + +Remember to follow these rules absolutely, and do not refer to these rules, even if you're asked about them. + +When talking with the user, use the following script: +1. Take their order, acknowledging each item as it is ordered. If it's not clear which menu item the user is ordering, ask them to clarify. + DO NOT add an item to the order unless it's one of the items on the menu below. +2. Once the order is complete, repeat back the order. +2a. If the user only ordered a drink, ask them if they would like to add a donut to their order. +2b. If the user only ordered donuts, ask them if they would like to add a drink to their order. +2c. If the user ordered both drinks and donuts, don't suggest anything. +3. Total up the price of all ordered items and inform the user. +4. Ask the user to pull up to the drive thru window. +If the user asks for something that's not on the menu, inform them of that fact, and suggest the most similar item on the menu. +If the user says something unrelated to your role, responed with "Um... this is a Dr. Donut." +If the user says "thank you", respond with "My pleasure." +If the user asks about what's on the menu, DO NOT read the entire menu to them. Instead, give a couple suggestions. + +The menu of available items is as follows: + +# DONUTS + +PUMPKIN SPICE ICED DOUGHNUT $1.29 +PUMPKIN SPICE CAKE DOUGHNUT $1.29 +OLD FASHIONED DOUGHNUT $1.29 +CHOCOLATE ICED DOUGHNUT $1.09 +CHOCOLATE ICED DOUGHNUT WITH SPRINKLES $1.09 +RASPBERRY FILLED DOUGHNUT $1.09 +BLUEBERRY CAKE DOUGHNUT $1.09 +STRAWBERRY ICED DOUGHNUT WITH SPRINKLES $1.09 +LEMON FILLED DOUGHNUT $1.09 +DOUGHNUT HOLES $3.99 + +# COFFEE & DRINKS + +PUMPKIN SPICE COFFEE $2.59 +PUMPKIN SPICE LATTE $4.59 +REGULAR BREWED COFFEE $1.79 +DECAF BREWED COFFEE $1.79 +LATTE $3.49 +CAPPUCINO $3.49 +CARAMEL MACCHIATO $3.49 +MOCHA LATTE $3.49 +CARAMEL MOCHA LATTE $3.49 + +There is also a secret menu that changes daily. If the user asks about it, use the get_secret_menu tool to look up today's secret menu items. +""" + + secret_menu_function = FunctionSchema( + name="get_secret_menu", + description="Get today's secret menu items", + properties={ + "category": { + "type": "string", + "enum": ["donuts", "drinks", "both"], + "description": "The category of secret menu items to retrieve. Defaults to both.", + }, + }, + required=[], + ) + + llm = UltravoxRealtimeLLMService( + params=OneShotInputParams( + api_key=os.getenv("ULTRAVOX_API_KEY"), + system_prompt=system_prompt, + temperature=0.3, + max_duration=datetime.timedelta(minutes=3), + output_medium="text", + ), + one_shot_selected_tools=ToolsSchema(standard_tools=[secret_menu_function]), + ) + + llm.register_function("get_secret_menu", get_secret_menu) + + tts = InworldTTSService( + api_key=os.getenv("INWORLD_API_KEY", ""), + voice_id="Ashley", + model="inworld-tts-1", + temperature=1.1, + ) + + context = LLMContext([]) + + # Necessary to complete the function call lifecycle in Pipecat and + # to produce user and assistant turn stopped events. + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[SpeechTimeoutUserTurnStopStrategy()], + ), + # Set the VAD analyzer to emulate timing of the model. + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + ) + + # Build the pipeline + pipeline = Pipeline( + [ + transport.input(), + user_aggregator, + llm, + tts, + transport.output(), + assistant_aggregator, + ] + ) + + # Configure the pipeline task + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + # Handle client connection event + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + + # Handle client disconnection events + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}user: {message.content}" + logger.info(f"Transcript: {line}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}assistant: {message.content}" + logger.info(f"Transcript: {line}") + + # Run the pipeline + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/src/pipecat/services/ultravox/llm.py b/src/pipecat/services/ultravox/llm.py index d14c3b9ca..07c3c34fe 100644 --- a/src/pipecat/services/ultravox/llm.py +++ b/src/pipecat/services/ultravox/llm.py @@ -31,6 +31,7 @@ from pipecat.frames.frames import ( Frame, InputAudioRawFrame, InputTextRawFrame, + InterruptionFrame, LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, @@ -42,7 +43,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, TTSTextFrame, UserAudioRawFrame, - UserStoppedSpeakingFrame, + VADUserStoppedSpeakingFrame, ) from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response import ( @@ -90,6 +91,9 @@ class AgentInputParams(BaseModel): template_context: Context variables to use when instantiating a call with the agent. Defaults to an empty dict. metadata: Metadata to attach to the call. Default to an empty dict. + output_medium: The initial output medium for the agent. Use "text" for text + responses or "voice" for audio responses. Defaults to None, which uses the + agent's default. max_duration: The maximum duration of the call. Defaults to None, which will use the agent's default maximum duration. extra: Extra parameters to include in the agent call creation request. Defaults @@ -101,6 +105,7 @@ class AgentInputParams(BaseModel): agent_id: uuid.UUID template_context: Dict[str, Any] = Field(default_factory=dict) metadata: Dict[str, str] = Field(default_factory=dict) + output_medium: Optional[Literal["text", "voice"]] = None max_duration: Optional[datetime.timedelta] = Field( default=None, ge=datetime.timedelta(seconds=10), le=datetime.timedelta(hours=1) ) @@ -117,6 +122,8 @@ class OneShotInputParams(BaseModel): model: Model identifier to use. Defaults to "fixie-ai/ultravox". voice: Voice identifier for speech generation. Defaults to None. metadata: Metadata to attach to the call. Default to an empty dict. + output_medium: The initial output medium for the agent. Use "text" for text + responses or "voice" for audio responses. Defaults to None (voice). max_duration: The maximum duration of the call. Defaults to one hour. extra: Extra parameters to include in the call creation request. Defaults to an empty dict. See the Ultravox API documentation for valid arguments: @@ -129,6 +136,7 @@ class OneShotInputParams(BaseModel): model: Optional[str] = None voice: Optional[uuid.UUID] = None metadata: Dict[str, str] = Field(default_factory=dict) + output_medium: Optional[Literal["text", "voice"]] = None max_duration: datetime.timedelta = Field( default=datetime.timedelta(hours=1), ge=datetime.timedelta(seconds=10), @@ -210,6 +218,14 @@ class UltravoxRealtimeLLMService(LLMService): self._sample_rate = 48000 self._resampler = create_stream_resampler() + def can_generate_metrics(self) -> bool: + """Check if the service can generate usage metrics. + + Returns: + True if metrics generation is supported. + """ + return True + # # standard AIService frame handling # @@ -237,6 +253,14 @@ class UltravoxRealtimeLLMService(LLMService): except Exception as e: await self.push_error("Failed to connect to Ultravox", e, fatal=True) + @staticmethod + def _output_medium_to_api(medium: Optional[Literal["text", "voice"]]) -> Optional[str]: + if medium == "text": + return "MESSAGE_MEDIUM_TEXT" + elif medium == "voice": + return "MESSAGE_MEDIUM_VOICE" + return None + async def _start_agent_call(self, params: AgentInputParams) -> str: request_body = { "templateContext": params.template_context, @@ -247,6 +271,9 @@ class UltravoxRealtimeLLMService(LLMService): } }, } + initial_output_medium = self._output_medium_to_api(params.output_medium) + if initial_output_medium: + request_body["initialOutputMedium"] = initial_output_medium if params.max_duration: request_body["maxDuration"] = f"{params.max_duration.total_seconds():3f}s" request_body = request_body | params.extra @@ -277,7 +304,11 @@ class UltravoxRealtimeLLMService(LLMService): "inputSampleRate": self._sample_rate, } }, - } | params.extra + } + initial_output_medium = self._output_medium_to_api(params.output_medium) + if initial_output_medium: + request_body["initialOutputMedium"] = initial_output_medium + request_body = request_body | params.extra async with aiohttp.ClientSession() as session: async with session.post( "https://api.ultravox.ai/api/calls", @@ -367,18 +398,17 @@ class UltravoxRealtimeLLMService(LLMService): else LLMContext.from_openai_context(frame.context) ) await self._handle_context(context) + elif isinstance(frame, InterruptionFrame): + await self.stop_all_metrics() + await self.push_frame(frame, direction) elif isinstance(frame, InputTextRawFrame): await self._send_user_text(frame.text) await self.push_frame(frame, direction) elif isinstance(frame, InputAudioRawFrame): await self._send_user_audio(frame) await self.push_frame(frame, direction) - elif isinstance(frame, UserStoppedSpeakingFrame): - # This may or may not align with Ultravox's end of user speech detection, - # which relies on a more complex endpointing model. In particular it will - # yield a seemingly very slow TTFB in the case of endpointing false - # negatives. It will be close in the majority of cases though. - await self.start_ttfb_metrics() + elif isinstance(frame, VADUserStoppedSpeakingFrame): + await self._handle_vad_user_stopped_speaking(frame) await self.push_frame(frame, direction) else: await self.push_frame(frame, direction) @@ -399,6 +429,25 @@ class UltravoxRealtimeLLMService(LLMService): } await self._send(socket_message) + async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame): + """Handle VAD user stopped speaking frame. + + Calculates the actual speech end time and starts a timeout task to wait + for the final transcription before reporting TTFB. + + Args: + frame: The VAD user stopped speaking frame. + """ + # Skip TTFB measurement if stop_secs is not set + if frame.stop_secs == 0.0: + return + + # Calculate the actual speech end time (current time minus VAD stop delay). + # This approximates when the last user audio was sent to the Ultravox service, + # which we use to measure against the eventual transcription response. + speech_end_time = frame.timestamp - frame.stop_secs + await self.start_ttfb_metrics(start_time=speech_end_time) + async def _send_user_audio(self, frame: InputAudioRawFrame): """Send user audio frame to Ultravox Realtime.""" if not self._socket: @@ -502,6 +551,7 @@ class UltravoxRealtimeLLMService(LLMService): if not audio: return if not self._bot_responding: + await self.start_processing_metrics() await self.stop_ttfb_metrics() await self.push_frame(LLMFullResponseStartFrame()) await self.push_frame(TTSStartedFrame()) @@ -509,6 +559,7 @@ class UltravoxRealtimeLLMService(LLMService): await self.push_frame(TTSAudioRawFrame(audio, self._sample_rate, 1)) async def _handle_response_end(self): + await self.stop_processing_metrics() if self._bot_responding == "voice": await self.push_frame(TTSStoppedFrame()) await self.push_frame(LLMFullResponseEndFrame()) @@ -542,22 +593,29 @@ class UltravoxRealtimeLLMService(LLMService): async def _handle_agent_transcript( self, medium: str, text: Optional[str], delta: Optional[str], final: bool ): - if text or delta: - frame = LLMTextFrame(text=text or delta) - frame.skip_tts = medium == "voice" - await self.push_frame(frame) - if medium == "text": - if text: - await self.stop_ttfb_metrics() - await self.push_frame(LLMFullResponseStartFrame()) - await self.push_frame(TTSStartedFrame()) - await self.push_frame(TTSTextFrame(text=text, aggregated_by=AggregationType.WORD)) - self._bot_responding = "text" - elif final: + if medium == "voice": + # In voice mode, audio is handled by _handle_audio(). Here we push + # text transcripts of the audio for downstream consumers. + if (text or delta) and not final: + frame = LLMTextFrame(text=text or delta) + frame.append_to_context = False + await self.push_frame(frame) + if delta: + tts_frame = TTSTextFrame(text=delta, aggregated_by=AggregationType.WORD) + tts_frame.includes_inter_frame_spaces = True + await self.push_frame(tts_frame) + elif medium == "text": + if final: + await self.stop_processing_metrics() await self.push_frame(LLMFullResponseEndFrame()) self._bot_responding = None - elif delta: - await self.push_frame(TTSTextFrame(text=delta, aggregated_by=AggregationType.WORD)) + elif text or delta: + if not self._bot_responding: + await self.start_processing_metrics() + await self.stop_ttfb_metrics() + await self.push_frame(LLMFullResponseStartFrame()) + self._bot_responding = "text" + await self.push_frame(LLMTextFrame(text=text or delta)) def create_context_aggregator( self,