feat: add OpenAI Responses API LLM service

Add OpenAIResponsesLLMService using the Responses API, with a dedicated
adapter that converts LLMContext messages to Responses API input items
(system→developer, tool_calls→function_call, tool→function_call_output,
multimodal content conversion, and tools schema flattening).

- New adapter: open_ai_responses_adapter.py
- New service: openai/responses/llm.py
- Examples: 07-interruptible and 14-function-calling variants
- 19 unit tests for adapter conversion logic
- Eval entries for both examples
This commit is contained in:
Paul Kompfner
2026-03-18 11:45:23 -04:00
committed by Mark Backman
parent b5c2d41ba3
commit 4d548117fa
8 changed files with 1291 additions and 0 deletions

View File

@@ -0,0 +1,125 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,175 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -147,6 +147,7 @@ TESTS_07 = [
("07zi-interruptible-piper.py", EVAL_SIMPLE_MATH),
("07zj-interruptible-kokoro.py", EVAL_SIMPLE_MATH),
("07zk-interruptible-resembleai.py", EVAL_SIMPLE_MATH),
("07-interruptible-openai-responses.py", EVAL_SIMPLE_MATH),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),
]
@@ -184,6 +185,8 @@ TESTS_14 = [
("14v-function-calling-openai.py", EVAL_WEATHER),
("14w-function-calling-mistral.py", EVAL_WEATHER),
("14x-function-calling-openpipe.py", EVAL_WEATHER),
("14-function-calling-openai-responses.py", EVAL_WEATHER),
("14-function-calling-openai-responses.py", EVAL_WEATHER_AND_RESTAURANT),
# Video
("14d-function-calling-anthropic-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-aws-video.py", EVAL_VISION_CAMERA),

View File

@@ -0,0 +1,240 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Responses API adapter for Pipecat."""
import copy
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from openai._types import NotGiven as OpenAINotGiven
from openai.types.responses import FunctionToolParam, ResponseInputItemParam
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
LLMSpecificMessage,
NotGiven,
)
class OpenAIResponsesLLMInvocationParams(TypedDict, total=False):
"""Context-based parameters for invoking OpenAI Responses API."""
input: List[ResponseInputItemParam]
tools: List[FunctionToolParam] | OpenAINotGiven
instructions: str
class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParams]):
"""OpenAI Responses API adapter for Pipecat.
Handles:
- Converting LLMContext messages to Responses API input items
- Converting Pipecat's standardized tools schema to Responses API function tool format
- Extracting and sanitizing messages from the LLM context for logging
"""
def __init__(self):
"""Initialize the adapter."""
super().__init__()
self._warned_system_instruction = False
@property
def id_for_llm_specific_messages(self) -> str:
"""Get the identifier used in LLMSpecificMessage instances."""
return "openai_responses"
def get_llm_invocation_params(
self,
context: LLMContext,
*,
system_instruction: Optional[str] = None,
) -> OpenAIResponsesLLMInvocationParams:
"""Get Responses API invocation parameters from a universal LLM context.
Args:
context: The LLM context containing messages, tools, etc.
system_instruction: Optional system instruction from service settings.
Returns:
Dictionary of parameters for the Responses API.
"""
messages = self.get_messages(context)
input_items = self._convert_messages_to_input(messages)
params: OpenAIResponsesLLMInvocationParams = {
"input": input_items,
"tools": self.from_standard_tools(context.tools),
}
if system_instruction:
# Compatibility: The Responses API requires at least one input
# message when instructions are provided. Contexts that worked with
# OpenAILLMService (system_instruction + empty messages) need the
# instructions converted to an initial developer message.
if not input_items:
params["input"] = [{"role": "developer", "content": system_instruction}]
else:
params["instructions"] = system_instruction
return params
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[FunctionToolParam]:
"""Convert function schemas to Responses API function tool format.
Args:
tools_schema: The Pipecat tools schema to convert.
Returns:
List of Responses API function tool definitions.
"""
functions_schema = tools_schema.standard_tools
result = []
for func in functions_schema:
d = func.to_default_dict()
tool: FunctionToolParam = {
"type": "function",
"name": d["name"],
"parameters": d.get("parameters", {}),
"strict": d.get("strict", None),
}
if "description" in d:
tool["description"] = d["description"]
result.append(tool)
return result
def get_messages_for_logging(self, context: LLMContext) -> List[Dict[str, Any]]:
"""Get messages from context in a format ready for logging.
Removes or truncates sensitive data like image content for safe logging.
Args:
context: The LLM context containing messages.
Returns:
List of messages in a format ready for logging.
"""
msgs = []
for message in self.get_messages(context):
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item.get("type") == "image_url":
if item["image_url"]["url"].startswith("data:image/"):
item["image_url"]["url"] = "data:image/..."
if item.get("type") == "input_audio":
item["input_audio"]["data"] = "..."
msgs.append(msg)
return msgs
def _convert_messages_to_input(
self, messages: List[LLMContextMessage]
) -> List[ResponseInputItemParam]:
"""Convert LLMContext messages to Responses API input items.
Args:
messages: Messages from the LLMContext.
Returns:
List of Responses API input items.
"""
result: List[ResponseInputItemParam] = []
is_first = True
for message in messages:
if isinstance(message, LLMSpecificMessage):
result.append(message.message)
is_first = False
continue
role = message.get("role")
if role == "system":
if is_first and not self._warned_system_instruction:
logger.warning(
"System messages in LLMContext are converted to 'developer' role for the "
"Responses API. Consider using settings.system_instruction instead, which "
"maps to the 'instructions' parameter."
)
self._warned_system_instruction = True
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "developer", "content": content})
elif role == "user":
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "user", "content": content})
elif role == "assistant":
tool_calls = message.get("tool_calls")
if tool_calls:
for tc in tool_calls:
func = tc.get("function", {})
result.append(
{
"type": "function_call",
"call_id": tc.get("id", ""),
"name": func.get("name", ""),
"arguments": func.get("arguments", ""),
}
)
else:
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "assistant", "content": content})
elif role == "tool":
content = message.get("content", "")
if not isinstance(content, str):
content = str(content)
result.append(
{
"type": "function_call_output",
"call_id": message.get("tool_call_id", ""),
"output": content,
}
)
is_first = False
return result
def _convert_multimodal_content(self, content: list) -> list:
"""Convert multimodal content parts to Responses API format.
Args:
content: List of content parts from the LLMContext message.
Returns:
List of content parts in Responses API format.
"""
result = []
for part in content:
part_type = part.get("type")
if part_type == "text":
result.append({"type": "input_text", "text": part.get("text", "")})
elif part_type == "image_url":
image_url_obj = part.get("image_url", {})
result.append(
{
"type": "input_image",
"image_url": image_url_obj.get("url", ""),
"detail": image_url_obj.get("detail", "auto"),
}
)
else:
# Pass through unknown types as-is
result.append(part)
return result

View File

@@ -11,6 +11,7 @@ from pipecat.services import DeprecatedModuleProxy
from .image import *
from .llm import *
from .realtime import *
from .responses.llm import *
from .stt import *
from .tts import *

View File

@@ -0,0 +1,5 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -0,0 +1,393 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Responses API LLM service implementation."""
import json
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping, Optional
import httpx
from loguru import logger
from openai import NOT_GIVEN, AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient
from openai.types.responses import (
ResponseCompletedEvent,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionCallArgumentsDoneEvent,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseStreamEvent,
ResponseTextDeltaEvent,
)
from pipecat.adapters.services.open_ai_responses_adapter import (
OpenAIResponsesLLMAdapter,
OpenAIResponsesLLMInvocationParams,
)
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN
from pipecat.services.settings import LLMSettings, _NotGiven
from pipecat.utils.tracing.service_decorators import traced_llm
@dataclass
class OpenAIResponsesLLMSettings(LLMSettings):
"""Settings for OpenAIResponsesLLMService.
Parameters:
max_completion_tokens: Maximum completion tokens to generate.
"""
max_completion_tokens: int | _NotGiven = field(default_factory=lambda: _NOT_GIVEN)
class OpenAIResponsesLLMService(LLMService):
"""OpenAI Responses API LLM service.
This service works with the universal LLMContext and LLMContextAggregatorPair.
Example::
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesLLMService.Settings(
model="gpt-4.1",
system_instruction="You are a helpful assistant.",
),
)
"""
Settings = OpenAIResponsesLLMSettings
_settings: Settings
adapter_class = OpenAIResponsesLLMAdapter
def __init__(
self,
*,
model: Optional[str] = None,
api_key=None,
base_url=None,
organization=None,
project=None,
default_headers: Optional[Mapping[str, str]] = None,
settings: Optional[Settings] = None,
**kwargs,
):
"""Initialize the OpenAI Responses API LLM service.
Args:
model: The OpenAI model name to use. Defaults to "gpt-4.1".
api_key: OpenAI API key. If None, uses environment variable.
base_url: Custom base URL for OpenAI API. If None, uses default.
organization: OpenAI organization ID.
project: OpenAI project ID.
default_headers: Additional HTTP headers to include in requests.
settings: Runtime-updatable settings. When provided alongside
other parameters, ``settings`` values take precedence.
**kwargs: Additional arguments passed to the parent LLMService.
"""
default_settings = self.Settings(
model="gpt-4.1",
system_instruction=None,
frequency_penalty=None,
presence_penalty=None,
seed=None,
temperature=NOT_GIVEN,
top_p=NOT_GIVEN,
top_k=None,
max_tokens=None,
max_completion_tokens=NOT_GIVEN,
filter_incomplete_user_turns=False,
user_turn_completion_config=None,
extra={},
)
if model is not None:
default_settings.model = model
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
settings=default_settings,
**kwargs,
)
self._client = self._create_client(
api_key=api_key,
base_url=base_url,
organization=organization,
project=project,
default_headers=default_headers,
)
if self._settings.system_instruction:
logger.debug(f"{self}: Using system instruction: {self._settings.system_instruction}")
def _create_client(
self,
api_key=None,
base_url=None,
organization=None,
project=None,
default_headers=None,
) -> AsyncOpenAI:
"""Create an AsyncOpenAI client instance.
Args:
api_key: OpenAI API key.
base_url: Custom base URL for the API.
organization: OpenAI organization ID.
project: OpenAI project ID.
default_headers: Additional HTTP headers.
Returns:
Configured AsyncOpenAI client instance.
"""
return AsyncOpenAI(
api_key=api_key,
base_url=base_url,
organization=organization,
project=project,
http_client=DefaultAsyncHttpxClient(
limits=httpx.Limits(
max_keepalive_connections=100, max_connections=1000, keepalive_expiry=None
)
),
default_headers=default_headers,
)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics."""
return True
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for LLM completion requests.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
context = None
if isinstance(frame, LLMContextFrame):
context = frame.context
else:
await self.push_frame(frame, direction)
if context:
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(context)
except httpx.TimeoutException as e:
await self._call_event_handler("on_completion_timeout")
await self.push_error(error_msg="LLM completion timeout", exception=e)
except Exception as e:
await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
@traced_llm
async def _process_context(self, context: LLMContext):
adapter = self.get_llm_adapter()
logger.debug(
f"{self}: Generating response from universal context "
f"{adapter.get_messages_for_logging(context)}"
)
invocation_params: OpenAIResponsesLLMInvocationParams = adapter.get_llm_invocation_params(
context, system_instruction=self._settings.system_instruction
)
params = self._build_response_params(invocation_params)
await self.start_ttfb_metrics()
stream: AsyncStream[ResponseStreamEvent] = await self._client.responses.create(**params)
# Track function calls across stream events
function_calls: Dict[str, Dict[str, str]] = {} # item_id -> {name, call_id, arguments}
current_arguments: Dict[str, str] = {} # item_id -> accumulated arguments
@asynccontextmanager
async def _closing(stream):
chunk_iter = stream.__aiter__()
try:
yield chunk_iter
finally:
if hasattr(chunk_iter, "aclose"):
await chunk_iter.aclose()
if hasattr(stream, "close"):
await stream.close()
elif hasattr(stream, "aclose"):
await stream.aclose()
async with _closing(stream) as event_iter:
async for event in event_iter:
if isinstance(event, ResponseTextDeltaEvent):
await self.stop_ttfb_metrics()
await self._push_llm_text(event.delta)
elif isinstance(event, ResponseOutputItemAddedEvent):
await self.stop_ttfb_metrics()
item = event.item
if getattr(item, "type", None) == "function_call":
item_id = getattr(item, "id", "") or ""
function_calls[item_id] = {
"name": getattr(item, "name", ""),
"call_id": getattr(item, "call_id", ""),
"arguments": "",
}
current_arguments[item_id] = ""
elif isinstance(event, ResponseFunctionCallArgumentsDeltaEvent):
item_id = event.item_id
if item_id in current_arguments:
current_arguments[item_id] += event.delta
elif isinstance(event, ResponseFunctionCallArgumentsDoneEvent):
item_id = event.item_id
if item_id in function_calls:
function_calls[item_id]["arguments"] = event.arguments
elif isinstance(event, ResponseOutputItemDoneEvent):
item = event.item
if getattr(item, "type", None) == "function_call":
item_id = getattr(item, "id", "") or ""
if item_id in function_calls:
function_calls[item_id]["name"] = getattr(item, "name", "")
function_calls[item_id]["call_id"] = getattr(item, "call_id", "")
function_calls[item_id]["arguments"] = getattr(item, "arguments", "")
elif isinstance(event, ResponseCompletedEvent):
response = event.response
usage = getattr(response, "usage", None)
if usage:
tokens = LLMTokenUsage(
prompt_tokens=getattr(usage, "input_tokens", 0),
completion_tokens=getattr(usage, "output_tokens", 0),
total_tokens=getattr(usage, "total_tokens", 0),
)
await self.start_llm_usage_metrics(tokens)
model = getattr(response, "model", None)
if model:
self._full_model_name = model
# Process any function calls
if function_calls:
fc_list: List[FunctionCallFromLLM] = []
for item_id, fc in function_calls.items():
try:
arguments = json.loads(fc["arguments"]) if fc["arguments"] else {}
except json.JSONDecodeError:
logger.warning(
f"{self}: Failed to parse function call arguments: {fc['arguments']}"
)
arguments = {}
fc_list.append(
FunctionCallFromLLM(
context=context,
tool_call_id=fc["call_id"],
function_name=fc["name"],
arguments=arguments,
)
)
await self.run_function_calls(fc_list)
def _build_response_params(self, invocation_params: OpenAIResponsesLLMInvocationParams) -> dict:
"""Build parameters for the responses.create() call.
Args:
invocation_params: Parameters derived from the LLM context.
Returns:
Dictionary of parameters for the Responses API call.
"""
params: Dict[str, Any] = {
"model": self._settings.model,
"stream": True,
"input": invocation_params["input"],
}
# instructions (set by the adapter when input is non-empty)
if "instructions" in invocation_params:
params["instructions"] = invocation_params["instructions"]
# Optional parameters - only include if given
if isinstance(self._settings.temperature, (int, float)):
params["temperature"] = self._settings.temperature
if isinstance(self._settings.top_p, (int, float)):
params["top_p"] = self._settings.top_p
if isinstance(self._settings.max_completion_tokens, int):
params["max_output_tokens"] = self._settings.max_completion_tokens
# Tools
tools = invocation_params.get("tools")
if tools is not None and not isinstance(tools, type(NOT_GIVEN)):
params["tools"] = tools
# Extra settings
params.update(self._settings.extra)
return params
async def run_inference(
self,
context: LLMContext,
max_tokens: Optional[int] = None,
system_instruction: Optional[str] = None,
) -> Optional[str]:
"""Run a one-shot, out-of-band inference with the given LLM context.
Args:
context: The LLM context containing conversation history.
max_tokens: Optional maximum number of tokens to generate.
system_instruction: Optional system instruction for this inference.
Returns:
The LLM's response as a string, or None if no response is generated.
"""
adapter = self.get_llm_adapter()
effective_instruction = system_instruction or self._settings.system_instruction
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=effective_instruction
)
params = self._build_response_params(invocation_params)
# Override for non-streaming
params["stream"] = False
# Override instructions if caller provided one explicitly
if system_instruction is not None:
params["instructions"] = system_instruction
if max_tokens is not None:
params["max_output_tokens"] = max_tokens
response = await self._client.responses.create(**params)
return response.output_text
__all__ = ["OpenAIResponsesLLMService", "OpenAIResponsesLLMSettings"]

View File

@@ -0,0 +1,349 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Unit tests for the OpenAI Responses API adapter.
Tests the conversion from LLMContext messages to Responses API input items, including:
1. Simple user/assistant text messages pass through (with correct role)
2. System role converted to developer role
3. First-message system role triggers a warning
4. Assistant messages with tool_calls produce function_call input items
5. Tool messages produce function_call_output input items
6. Mixed conversations with text + function calls convert correctly
7. Multimodal content conversion (text -> input_text, image_url -> input_image)
8. Tools schema flattening (nested function dict -> flat format)
9. Empty messages list
10. LLMSpecificMessage with llm="openai_responses" passes through
"""
import unittest
from unittest.mock import patch
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.open_ai_responses_adapter import OpenAIResponsesLLMAdapter
from pipecat.processors.aggregators.llm_context import LLMContext, LLMStandardMessage
class TestOpenAIResponsesAdapter(unittest.TestCase):
def setUp(self):
self.adapter = OpenAIResponsesLLMAdapter()
def test_simple_user_assistant_messages(self):
"""Simple user/assistant text messages are converted correctly."""
messages: list[LLMStandardMessage] = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(len(params["input"]), 2)
self.assertEqual(params["input"][0], {"role": "user", "content": "Hello"})
self.assertEqual(params["input"][1], {"role": "assistant", "content": "Hi there!"})
def test_system_role_converted_to_developer(self):
"""System role messages are converted to developer role."""
messages: list[LLMStandardMessage] = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hello"},
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(params["input"][0]["role"], "developer")
self.assertEqual(params["input"][0]["content"], "You are helpful.")
def test_first_system_message_triggers_warning(self):
"""First system message triggers a warning about using system_instruction."""
# Use a fresh adapter so the warning hasn't been emitted yet
adapter = OpenAIResponsesLLMAdapter()
messages: list[LLMStandardMessage] = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hello"},
]
context = LLMContext(messages=messages)
with patch("pipecat.adapters.services.open_ai_responses_adapter.logger") as mock_logger:
adapter.get_llm_invocation_params(context)
mock_logger.warning.assert_called_once()
warning_msg = mock_logger.warning.call_args[0][0]
self.assertIn("system_instruction", warning_msg)
def test_non_initial_system_message_no_warning(self):
"""Non-initial system messages are converted without a warning."""
messages: list[LLMStandardMessage] = [
{"role": "user", "content": "Hello"},
{"role": "system", "content": "New instruction"},
]
context = LLMContext(messages=messages)
adapter = OpenAIResponsesLLMAdapter()
with patch("pipecat.adapters.services.open_ai_responses_adapter.logger") as mock_logger:
params = adapter.get_llm_invocation_params(context)
mock_logger.warning.assert_not_called()
self.assertEqual(params["input"][1]["role"], "developer")
self.assertEqual(params["input"][1]["content"], "New instruction")
def test_first_system_message_warning_fires_only_once(self):
"""The first-system-message warning fires only once per adapter instance."""
messages: list[LLMStandardMessage] = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hello"},
]
context = LLMContext(messages=messages)
adapter = OpenAIResponsesLLMAdapter()
with patch("pipecat.adapters.services.open_ai_responses_adapter.logger") as mock_logger:
adapter.get_llm_invocation_params(context)
adapter.get_llm_invocation_params(context)
# Warning should have been emitted exactly once, not twice
mock_logger.warning.assert_called_once()
def test_assistant_tool_calls_to_function_call(self):
"""Assistant messages with tool_calls produce function_call input items."""
messages = [
{
"role": "assistant",
"tool_calls": [
{
"id": "call_123",
"function": {
"name": "get_weather",
"arguments": '{"location": "SF"}',
},
"type": "function",
}
],
}
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(len(params["input"]), 1)
fc = params["input"][0]
self.assertEqual(fc["type"], "function_call")
self.assertEqual(fc["call_id"], "call_123")
self.assertEqual(fc["name"], "get_weather")
self.assertEqual(fc["arguments"], '{"location": "SF"}')
def test_multiple_tool_calls(self):
"""Multiple tool calls in one assistant message produce multiple function_call items."""
messages = [
{
"role": "assistant",
"tool_calls": [
{
"id": "call_1",
"function": {"name": "get_weather", "arguments": '{"location": "SF"}'},
"type": "function",
},
{
"id": "call_2",
"function": {"name": "get_restaurant", "arguments": '{"location": "SF"}'},
"type": "function",
},
],
}
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(len(params["input"]), 2)
self.assertEqual(params["input"][0]["name"], "get_weather")
self.assertEqual(params["input"][1]["name"], "get_restaurant")
def test_tool_message_to_function_call_output(self):
"""Tool role messages produce function_call_output input items."""
messages = [
{
"role": "tool",
"content": '{"temperature": "72"}',
"tool_call_id": "call_123",
}
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(len(params["input"]), 1)
fco = params["input"][0]
self.assertEqual(fco["type"], "function_call_output")
self.assertEqual(fco["call_id"], "call_123")
self.assertEqual(fco["output"], '{"temperature": "72"}')
def test_mixed_conversation(self):
"""Mixed conversation with text + function calls converts correctly."""
messages = [
{"role": "user", "content": "What's the weather in SF?"},
{
"role": "assistant",
"tool_calls": [
{
"id": "call_abc",
"function": {"name": "get_weather", "arguments": '{"location": "SF"}'},
"type": "function",
}
],
},
{
"role": "tool",
"content": '{"temp": "72"}',
"tool_call_id": "call_abc",
},
{"role": "assistant", "content": "It's 72 degrees in SF."},
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(len(params["input"]), 4)
self.assertEqual(params["input"][0]["role"], "user")
self.assertEqual(params["input"][1]["type"], "function_call")
self.assertEqual(params["input"][2]["type"], "function_call_output")
self.assertEqual(params["input"][3]["role"], "assistant")
def test_multimodal_text_conversion(self):
"""Multimodal text content parts are converted to input_text."""
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "What's in this image?"},
],
}
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
content = params["input"][0]["content"]
self.assertEqual(len(content), 1)
self.assertEqual(content[0]["type"], "input_text")
self.assertEqual(content[0]["text"], "What's in this image?")
def test_multimodal_image_conversion(self):
"""Multimodal image_url content parts are converted to input_image."""
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "Describe this:"},
{
"type": "image_url",
"image_url": {"url": "data:image/jpeg;base64,abc123"},
},
],
}
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
content = params["input"][0]["content"]
self.assertEqual(len(content), 2)
self.assertEqual(content[0]["type"], "input_text")
self.assertEqual(content[1]["type"], "input_image")
self.assertEqual(content[1]["image_url"], "data:image/jpeg;base64,abc123")
self.assertEqual(content[1]["detail"], "auto")
def test_multimodal_image_with_detail(self):
"""Image content parts preserve the detail setting when provided."""
messages = [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": "https://example.com/img.png", "detail": "high"},
},
],
}
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
content = params["input"][0]["content"]
self.assertEqual(content[0]["detail"], "high")
def test_tools_schema_flattening(self):
"""Tools schema with nested function dict is flattened to Responses API format."""
weather_fn = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {"type": "string", "description": "The city"},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_fn])
context = LLMContext(tools=tools)
params = self.adapter.get_llm_invocation_params(context)
tool_list = params["tools"]
self.assertEqual(len(tool_list), 1)
tool = tool_list[0]
self.assertEqual(tool["type"], "function")
self.assertEqual(tool["name"], "get_weather")
self.assertEqual(tool["description"], "Get the current weather")
self.assertIn("properties", tool["parameters"])
def test_empty_messages(self):
"""Empty messages list produces empty input list."""
context = LLMContext(messages=[])
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(params["input"], [])
def test_llm_specific_message_passthrough(self):
"""LLMSpecificMessage with llm='openai_responses' passes through."""
specific_msg = self.adapter.create_llm_specific_message(
{"type": "function_call", "call_id": "x", "name": "foo", "arguments": "{}"}
)
messages = [
{"role": "user", "content": "Hello"},
specific_msg,
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context)
self.assertEqual(len(params["input"]), 2)
self.assertEqual(params["input"][0]["role"], "user")
self.assertEqual(params["input"][1]["type"], "function_call")
def test_id_for_llm_specific_messages(self):
"""Adapter identifier is 'openai_responses'."""
self.assertEqual(self.adapter.id_for_llm_specific_messages, "openai_responses")
def test_system_instruction_with_messages_sets_instructions(self):
"""When system_instruction is provided and input is non-empty, sets instructions."""
messages: list[LLMStandardMessage] = [
{"role": "user", "content": "Hello"},
]
context = LLMContext(messages=messages)
params = self.adapter.get_llm_invocation_params(context, system_instruction="Be helpful.")
self.assertEqual(params["instructions"], "Be helpful.")
self.assertEqual(len(params["input"]), 1)
self.assertEqual(params["input"][0]["role"], "user")
def test_system_instruction_with_empty_input_becomes_developer_message(self):
"""When system_instruction is provided but input is empty, it becomes a developer message."""
context = LLMContext(messages=[])
params = self.adapter.get_llm_invocation_params(context, system_instruction="Be helpful.")
self.assertNotIn("instructions", params)
self.assertEqual(len(params["input"]), 1)
self.assertEqual(params["input"][0]["role"], "developer")
self.assertEqual(params["input"][0]["content"], "Be helpful.")
def test_no_system_instruction_omits_instructions(self):
"""When no system_instruction is provided, instructions key is absent."""
context = LLMContext(messages=[{"role": "user", "content": "Hi"}])
params = self.adapter.get_llm_invocation_params(context)
self.assertNotIn("instructions", params)
if __name__ == "__main__":
unittest.main()