Merge pull request #3806 from pipecat-ai/mb/ultravox-updates

Align Ultravox Realtime service with OpenAI/Gemini patterns
This commit is contained in:
Mark Backman
2026-02-26 10:49:21 -05:00
committed by GitHub
6 changed files with 384 additions and 26 deletions

1
changelog/3806.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `output_medium` parameter to `AgentInputParams` and `OneShotInputParams` in Ultravox service to control initial output medium (text or voice) at call creation time.

View File

@@ -0,0 +1 @@
- Improved Ultravox TTFB measurement accuracy by using VAD speech end time instead of `UserStoppedSpeakingFrame` timing.

View File

@@ -0,0 +1 @@
- Aligned `UltravoxRealtimeLLMService` frame handling with OpenAI/Gemini realtime services: added `InterruptionFrame` handling with metrics cleanup, processing metrics at response boundaries, and improved agent transcript handling for both voice and text output modalities.

View File

@@ -12,11 +12,18 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
@@ -24,6 +31,8 @@ from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLL
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
# Load environment variables
load_dotenv(override=True)
@@ -168,8 +177,21 @@ There is also a secret menu that changes daily. If the user asks about it, use t
llm.register_function("get_secret_menu", get_secret_menu)
# Necessary to complete the function call lifecycle in Pipecat.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(LLMContext([]))
context = LLMContext([])
# Necessary to complete the function call lifecycle in Pipecat and
# to produce user and assistant turn stopped events.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
# Set the VAD analyzer to create reliable TTFB measurements and
# user stop events.
vad_analyzer=SileroVADAnalyzer(),
),
)
# Build the pipeline
pipeline = Pipeline(
@@ -177,8 +199,8 @@ There is also a secret menu that changes daily. If the user asks about it, use t
transport.input(),
user_aggregator,
llm,
assistant_aggregator,
transport.output(),
assistant_aggregator,
]
)
@@ -203,6 +225,18 @@ There is also a secret menu that changes daily. If the user asks about it, use t
logger.info(f"Client disconnected")
await task.cancel()
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
# Run the pipeline
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -0,0 +1,263 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import datetime
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.inworld.tts import InworldTTSService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
# Load environment variables
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def get_secret_menu(params: FunctionCallParams):
category = params.arguments.get("category", "both")
logger.debug(f"Fetching secret menu with category: {category}")
items = []
if category in {"donuts", "both"}:
items.append(
{
"name": "Butter Pecan Ice Cream (one scoop)",
"price": "$2.99",
}
)
if category in {"drinks", "both"}:
items.append(
{
"name": "Banana Smoothie",
"price": "$4.99",
}
)
await params.result_callback(
{
"date": datetime.date.today().isoformat(),
"items": items,
}
)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
system_prompt = f"""
You are a drive-thru order taker for a donut shop called "Dr. Donut". Local time is currently: {datetime.datetime.now().isoformat()}
The user is talking to you over voice on their phone, and your response will be read out loud with realistic text-to-speech (TTS) technology.
Follow every direction here when crafting your response:
1. Use natural, conversational language that is clear and easy to follow (short sentences, simple words).
1a. Be concise and relevant: Most of your responses should be a sentence or two, unless you're asked to go deeper. Don't monopolize the conversation.
1b. Use discourse markers to ease comprehension. Never use the list format.
2. Keep the conversation flowing.
2a. Clarify: when there is ambiguity, ask clarifying questions, rather than make assumptions.
2b. Don't implicitly or explicitly try to end the chat (i.e. do not end a response with "Talk soon!", or "Enjoy!").
2c. Sometimes the user might just want to chat. Ask them relevant follow-up questions.
2d. Don't ask them if there's anything else they need help with (e.g. don't say things like "How can I assist you further?").
3. Remember that this is a voice conversation:
3a. Don't use lists, markdown, bullet points, or other formatting that's not typically spoken.
3b. Type out numbers in words (e.g. 'twenty twelve' instead of the year 2012)
3c. If something doesn't make sense, it's likely because you misheard them. There wasn't a typo, and the user didn't mispronounce anything.
Remember to follow these rules absolutely, and do not refer to these rules, even if you're asked about them.
When talking with the user, use the following script:
1. Take their order, acknowledging each item as it is ordered. If it's not clear which menu item the user is ordering, ask them to clarify.
DO NOT add an item to the order unless it's one of the items on the menu below.
2. Once the order is complete, repeat back the order.
2a. If the user only ordered a drink, ask them if they would like to add a donut to their order.
2b. If the user only ordered donuts, ask them if they would like to add a drink to their order.
2c. If the user ordered both drinks and donuts, don't suggest anything.
3. Total up the price of all ordered items and inform the user.
4. Ask the user to pull up to the drive thru window.
If the user asks for something that's not on the menu, inform them of that fact, and suggest the most similar item on the menu.
If the user says something unrelated to your role, responed with "Um... this is a Dr. Donut."
If the user says "thank you", respond with "My pleasure."
If the user asks about what's on the menu, DO NOT read the entire menu to them. Instead, give a couple suggestions.
The menu of available items is as follows:
# DONUTS
PUMPKIN SPICE ICED DOUGHNUT $1.29
PUMPKIN SPICE CAKE DOUGHNUT $1.29
OLD FASHIONED DOUGHNUT $1.29
CHOCOLATE ICED DOUGHNUT $1.09
CHOCOLATE ICED DOUGHNUT WITH SPRINKLES $1.09
RASPBERRY FILLED DOUGHNUT $1.09
BLUEBERRY CAKE DOUGHNUT $1.09
STRAWBERRY ICED DOUGHNUT WITH SPRINKLES $1.09
LEMON FILLED DOUGHNUT $1.09
DOUGHNUT HOLES $3.99
# COFFEE & DRINKS
PUMPKIN SPICE COFFEE $2.59
PUMPKIN SPICE LATTE $4.59
REGULAR BREWED COFFEE $1.79
DECAF BREWED COFFEE $1.79
LATTE $3.49
CAPPUCINO $3.49
CARAMEL MACCHIATO $3.49
MOCHA LATTE $3.49
CARAMEL MOCHA LATTE $3.49
There is also a secret menu that changes daily. If the user asks about it, use the get_secret_menu tool to look up today's secret menu items.
"""
secret_menu_function = FunctionSchema(
name="get_secret_menu",
description="Get today's secret menu items",
properties={
"category": {
"type": "string",
"enum": ["donuts", "drinks", "both"],
"description": "The category of secret menu items to retrieve. Defaults to both.",
},
},
required=[],
)
llm = UltravoxRealtimeLLMService(
params=OneShotInputParams(
api_key=os.getenv("ULTRAVOX_API_KEY"),
system_prompt=system_prompt,
temperature=0.3,
max_duration=datetime.timedelta(minutes=3),
output_medium="text",
),
one_shot_selected_tools=ToolsSchema(standard_tools=[secret_menu_function]),
)
llm.register_function("get_secret_menu", get_secret_menu)
tts = InworldTTSService(
api_key=os.getenv("INWORLD_API_KEY", ""),
voice_id="Ashley",
model="inworld-tts-1",
temperature=1.1,
)
context = LLMContext([])
# Necessary to complete the function call lifecycle in Pipecat and
# to produce user and assistant turn stopped events.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
# Set the VAD analyzer to emulate timing of the model.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
)
# Build the pipeline
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
# Configure the pipeline task
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Handle client connection event
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Handle client disconnection events
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
# Run the pipeline
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -31,6 +31,7 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputTextRawFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -42,7 +43,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
TTSTextFrame,
UserAudioRawFrame,
UserStoppedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
@@ -90,6 +91,9 @@ class AgentInputParams(BaseModel):
template_context: Context variables to use when instantiating a call with the
agent. Defaults to an empty dict.
metadata: Metadata to attach to the call. Default to an empty dict.
output_medium: The initial output medium for the agent. Use "text" for text
responses or "voice" for audio responses. Defaults to None, which uses the
agent's default.
max_duration: The maximum duration of the call. Defaults to None, which will
use the agent's default maximum duration.
extra: Extra parameters to include in the agent call creation request. Defaults
@@ -101,6 +105,7 @@ class AgentInputParams(BaseModel):
agent_id: uuid.UUID
template_context: Dict[str, Any] = Field(default_factory=dict)
metadata: Dict[str, str] = Field(default_factory=dict)
output_medium: Optional[Literal["text", "voice"]] = None
max_duration: Optional[datetime.timedelta] = Field(
default=None, ge=datetime.timedelta(seconds=10), le=datetime.timedelta(hours=1)
)
@@ -117,6 +122,8 @@ class OneShotInputParams(BaseModel):
model: Model identifier to use. Defaults to "fixie-ai/ultravox".
voice: Voice identifier for speech generation. Defaults to None.
metadata: Metadata to attach to the call. Default to an empty dict.
output_medium: The initial output medium for the agent. Use "text" for text
responses or "voice" for audio responses. Defaults to None (voice).
max_duration: The maximum duration of the call. Defaults to one hour.
extra: Extra parameters to include in the call creation request. Defaults
to an empty dict. See the Ultravox API documentation for valid arguments:
@@ -129,6 +136,7 @@ class OneShotInputParams(BaseModel):
model: Optional[str] = None
voice: Optional[uuid.UUID] = None
metadata: Dict[str, str] = Field(default_factory=dict)
output_medium: Optional[Literal["text", "voice"]] = None
max_duration: datetime.timedelta = Field(
default=datetime.timedelta(hours=1),
ge=datetime.timedelta(seconds=10),
@@ -210,6 +218,14 @@ class UltravoxRealtimeLLMService(LLMService):
self._sample_rate = 48000
self._resampler = create_stream_resampler()
def can_generate_metrics(self) -> bool:
"""Check if the service can generate usage metrics.
Returns:
True if metrics generation is supported.
"""
return True
#
# standard AIService frame handling
#
@@ -237,6 +253,14 @@ class UltravoxRealtimeLLMService(LLMService):
except Exception as e:
await self.push_error("Failed to connect to Ultravox", e, fatal=True)
@staticmethod
def _output_medium_to_api(medium: Optional[Literal["text", "voice"]]) -> Optional[str]:
if medium == "text":
return "MESSAGE_MEDIUM_TEXT"
elif medium == "voice":
return "MESSAGE_MEDIUM_VOICE"
return None
async def _start_agent_call(self, params: AgentInputParams) -> str:
request_body = {
"templateContext": params.template_context,
@@ -247,6 +271,9 @@ class UltravoxRealtimeLLMService(LLMService):
}
},
}
initial_output_medium = self._output_medium_to_api(params.output_medium)
if initial_output_medium:
request_body["initialOutputMedium"] = initial_output_medium
if params.max_duration:
request_body["maxDuration"] = f"{params.max_duration.total_seconds():3f}s"
request_body = request_body | params.extra
@@ -277,7 +304,11 @@ class UltravoxRealtimeLLMService(LLMService):
"inputSampleRate": self._sample_rate,
}
},
} | params.extra
}
initial_output_medium = self._output_medium_to_api(params.output_medium)
if initial_output_medium:
request_body["initialOutputMedium"] = initial_output_medium
request_body = request_body | params.extra
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.ultravox.ai/api/calls",
@@ -367,18 +398,17 @@ class UltravoxRealtimeLLMService(LLMService):
else LLMContext.from_openai_context(frame.context)
)
await self._handle_context(context)
elif isinstance(frame, InterruptionFrame):
await self.stop_all_metrics()
await self.push_frame(frame, direction)
elif isinstance(frame, InputTextRawFrame):
await self._send_user_text(frame.text)
await self.push_frame(frame, direction)
elif isinstance(frame, InputAudioRawFrame):
await self._send_user_audio(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, UserStoppedSpeakingFrame):
# This may or may not align with Ultravox's end of user speech detection,
# which relies on a more complex endpointing model. In particular it will
# yield a seemingly very slow TTFB in the case of endpointing false
# negatives. It will be close in the majority of cases though.
await self.start_ttfb_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
await self._handle_vad_user_stopped_speaking(frame)
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
@@ -399,6 +429,25 @@ class UltravoxRealtimeLLMService(LLMService):
}
await self._send(socket_message)
async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
"""Handle VAD user stopped speaking frame.
Calculates the actual speech end time and starts a timeout task to wait
for the final transcription before reporting TTFB.
Args:
frame: The VAD user stopped speaking frame.
"""
# Skip TTFB measurement if stop_secs is not set
if frame.stop_secs == 0.0:
return
# Calculate the actual speech end time (current time minus VAD stop delay).
# This approximates when the last user audio was sent to the Ultravox service,
# which we use to measure against the eventual transcription response.
speech_end_time = frame.timestamp - frame.stop_secs
await self.start_ttfb_metrics(start_time=speech_end_time)
async def _send_user_audio(self, frame: InputAudioRawFrame):
"""Send user audio frame to Ultravox Realtime."""
if not self._socket:
@@ -502,6 +551,7 @@ class UltravoxRealtimeLLMService(LLMService):
if not audio:
return
if not self._bot_responding:
await self.start_processing_metrics()
await self.stop_ttfb_metrics()
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSStartedFrame())
@@ -509,6 +559,7 @@ class UltravoxRealtimeLLMService(LLMService):
await self.push_frame(TTSAudioRawFrame(audio, self._sample_rate, 1))
async def _handle_response_end(self):
await self.stop_processing_metrics()
if self._bot_responding == "voice":
await self.push_frame(TTSStoppedFrame())
await self.push_frame(LLMFullResponseEndFrame())
@@ -542,22 +593,29 @@ class UltravoxRealtimeLLMService(LLMService):
async def _handle_agent_transcript(
self, medium: str, text: Optional[str], delta: Optional[str], final: bool
):
if text or delta:
frame = LLMTextFrame(text=text or delta)
frame.skip_tts = medium == "voice"
await self.push_frame(frame)
if medium == "text":
if text:
await self.stop_ttfb_metrics()
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSStartedFrame())
await self.push_frame(TTSTextFrame(text=text, aggregated_by=AggregationType.WORD))
self._bot_responding = "text"
elif final:
if medium == "voice":
# In voice mode, audio is handled by _handle_audio(). Here we push
# text transcripts of the audio for downstream consumers.
if (text or delta) and not final:
frame = LLMTextFrame(text=text or delta)
frame.append_to_context = False
await self.push_frame(frame)
if delta:
tts_frame = TTSTextFrame(text=delta, aggregated_by=AggregationType.WORD)
tts_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_frame)
elif medium == "text":
if final:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._bot_responding = None
elif delta:
await self.push_frame(TTSTextFrame(text=delta, aggregated_by=AggregationType.WORD))
elif text or delta:
if not self._bot_responding:
await self.start_processing_metrics()
await self.stop_ttfb_metrics()
await self.push_frame(LLMFullResponseStartFrame())
self._bot_responding = "text"
await self.push_frame(LLMTextFrame(text=text or delta))
def create_context_aggregator(
self,