From 4d548117fa8b0fa83c67397de5ca8b6d03228cd2 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 18 Mar 2026 11:45:23 -0400 Subject: [PATCH] feat: add OpenAI Responses API LLM service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OpenAIResponsesLLMService using the Responses API, with a dedicated adapter that converts LLMContext messages to Responses API input items (system→developer, tool_calls→function_call, tool→function_call_output, multimodal content conversion, and tools schema flattening). - New adapter: open_ai_responses_adapter.py - New service: openai/responses/llm.py - Examples: 07-interruptible and 14-function-calling variants - 19 unit tests for adapter conversion logic - Eval entries for both examples --- .../07-interruptible-openai-responses.py | 125 ++++++ .../14-function-calling-openai-responses.py | 175 ++++++++ scripts/evals/run-release-evals.py | 3 + .../services/open_ai_responses_adapter.py | 240 +++++++++++ src/pipecat/services/openai/__init__.py | 1 + .../services/openai/responses/__init__.py | 5 + src/pipecat/services/openai/responses/llm.py | 393 ++++++++++++++++++ tests/test_openai_responses_adapter.py | 349 ++++++++++++++++ 8 files changed, 1291 insertions(+) create mode 100644 examples/foundational/07-interruptible-openai-responses.py create mode 100644 examples/foundational/14-function-calling-openai-responses.py create mode 100644 src/pipecat/adapters/services/open_ai_responses_adapter.py create mode 100644 src/pipecat/services/openai/responses/__init__.py create mode 100644 src/pipecat/services/openai/responses/llm.py create mode 100644 tests/test_openai_responses_adapter.py diff --git a/examples/foundational/07-interruptible-openai-responses.py b/examples/foundational/07-interruptible-openai-responses.py new file mode 100644 index 000000000..baae3754a --- /dev/null +++ b/examples/foundational/07-interruptible-openai-responses.py @@ -0,0 +1,125 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + +# We use lambdas to defer transport parameter creation until the transport +# type is selected at runtime. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + settings=CartesiaTTSService.Settings( + voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ), + ) + + llm = OpenAIResponsesLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + settings=OpenAIResponsesLLMService.Settings( + system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.", + ), + ) + + context = LLMContext() + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + ) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + user_aggregator, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + assistant_aggregator, # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + context.add_message( + {"role": "developer", "content": "Please introduce yourself to the user."} + ) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/foundational/14-function-calling-openai-responses.py b/examples/foundational/14-function-calling-openai-responses.py new file mode 100644 index 000000000..58cac774a --- /dev/null +++ b/examples/foundational/14-function-calling-openai-responses.py @@ -0,0 +1,175 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +async def fetch_weather_from_api(params: FunctionCallParams): + await params.result_callback({"conditions": "nice", "temperature": "75"}) + + +async def fetch_restaurant_recommendation(params: FunctionCallParams): + await params.result_callback({"name": "The Golden Dragon"}) + + +# We use lambdas to defer transport parameter creation until the transport +# type is selected at runtime. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + settings=CartesiaTTSService.Settings( + voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ), + ) + + llm = OpenAIResponsesLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + settings=OpenAIResponsesLLMService.Settings( + system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.", + ), + ) + + # You can also register a function_name of None to get all functions + # sent to the same callback with an additional function_name parameter. + llm.register_function("get_current_weather", fetch_weather_from_api) + llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation) + + @llm.event_handler("on_function_calls_started") + async def on_function_calls_started(service, function_calls): + await tts.queue_frame(TTSSpeakFrame("Let me check on that.")) + + weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the user's location.", + }, + }, + required=["location", "format"], + ) + restaurant_function = FunctionSchema( + name="get_restaurant_recommendation", + description="Get a restaurant recommendation", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + }, + required=["location"], + ) + tools = ToolsSchema(standard_tools=[weather_function, restaurant_function]) + + context = LLMContext(tools=tools) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + ) + + pipeline = Pipeline( + [ + transport.input(), + stt, + user_aggregator, + llm, + tts, + transport.output(), + assistant_aggregator, + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + context.add_message( + {"role": "developer", "content": "Please introduce yourself to the user."} + ) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/scripts/evals/run-release-evals.py b/scripts/evals/run-release-evals.py index 625f33564..9671703ba 100644 --- a/scripts/evals/run-release-evals.py +++ b/scripts/evals/run-release-evals.py @@ -147,6 +147,7 @@ TESTS_07 = [ ("07zi-interruptible-piper.py", EVAL_SIMPLE_MATH), ("07zj-interruptible-kokoro.py", EVAL_SIMPLE_MATH), ("07zk-interruptible-resembleai.py", EVAL_SIMPLE_MATH), + ("07-interruptible-openai-responses.py", EVAL_SIMPLE_MATH), # Needs a local XTTS docker instance running. # ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH), ] @@ -184,6 +185,8 @@ TESTS_14 = [ ("14v-function-calling-openai.py", EVAL_WEATHER), ("14w-function-calling-mistral.py", EVAL_WEATHER), ("14x-function-calling-openpipe.py", EVAL_WEATHER), + ("14-function-calling-openai-responses.py", EVAL_WEATHER), + ("14-function-calling-openai-responses.py", EVAL_WEATHER_AND_RESTAURANT), # Video ("14d-function-calling-anthropic-video.py", EVAL_VISION_CAMERA), ("14d-function-calling-aws-video.py", EVAL_VISION_CAMERA), diff --git a/src/pipecat/adapters/services/open_ai_responses_adapter.py b/src/pipecat/adapters/services/open_ai_responses_adapter.py new file mode 100644 index 000000000..f55ba6435 --- /dev/null +++ b/src/pipecat/adapters/services/open_ai_responses_adapter.py @@ -0,0 +1,240 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""OpenAI Responses API adapter for Pipecat.""" + +import copy +from typing import Any, Dict, List, Optional, TypedDict + +from loguru import logger +from openai._types import NotGiven as OpenAINotGiven +from openai.types.responses import FunctionToolParam, ResponseInputItemParam + +from pipecat.adapters.base_llm_adapter import BaseLLMAdapter +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.processors.aggregators.llm_context import ( + LLMContext, + LLMContextMessage, + LLMSpecificMessage, + NotGiven, +) + + +class OpenAIResponsesLLMInvocationParams(TypedDict, total=False): + """Context-based parameters for invoking OpenAI Responses API.""" + + input: List[ResponseInputItemParam] + tools: List[FunctionToolParam] | OpenAINotGiven + instructions: str + + +class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParams]): + """OpenAI Responses API adapter for Pipecat. + + Handles: + + - Converting LLMContext messages to Responses API input items + - Converting Pipecat's standardized tools schema to Responses API function tool format + - Extracting and sanitizing messages from the LLM context for logging + """ + + def __init__(self): + """Initialize the adapter.""" + super().__init__() + self._warned_system_instruction = False + + @property + def id_for_llm_specific_messages(self) -> str: + """Get the identifier used in LLMSpecificMessage instances.""" + return "openai_responses" + + def get_llm_invocation_params( + self, + context: LLMContext, + *, + system_instruction: Optional[str] = None, + ) -> OpenAIResponsesLLMInvocationParams: + """Get Responses API invocation parameters from a universal LLM context. + + Args: + context: The LLM context containing messages, tools, etc. + system_instruction: Optional system instruction from service settings. + + Returns: + Dictionary of parameters for the Responses API. + """ + messages = self.get_messages(context) + input_items = self._convert_messages_to_input(messages) + + params: OpenAIResponsesLLMInvocationParams = { + "input": input_items, + "tools": self.from_standard_tools(context.tools), + } + + if system_instruction: + # Compatibility: The Responses API requires at least one input + # message when instructions are provided. Contexts that worked with + # OpenAILLMService (system_instruction + empty messages) need the + # instructions converted to an initial developer message. + if not input_items: + params["input"] = [{"role": "developer", "content": system_instruction}] + else: + params["instructions"] = system_instruction + + return params + + def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[FunctionToolParam]: + """Convert function schemas to Responses API function tool format. + + Args: + tools_schema: The Pipecat tools schema to convert. + + Returns: + List of Responses API function tool definitions. + """ + functions_schema = tools_schema.standard_tools + result = [] + for func in functions_schema: + d = func.to_default_dict() + tool: FunctionToolParam = { + "type": "function", + "name": d["name"], + "parameters": d.get("parameters", {}), + "strict": d.get("strict", None), + } + if "description" in d: + tool["description"] = d["description"] + result.append(tool) + return result + + def get_messages_for_logging(self, context: LLMContext) -> List[Dict[str, Any]]: + """Get messages from context in a format ready for logging. + + Removes or truncates sensitive data like image content for safe logging. + + Args: + context: The LLM context containing messages. + + Returns: + List of messages in a format ready for logging. + """ + msgs = [] + for message in self.get_messages(context): + msg = copy.deepcopy(message) + if "content" in msg: + if isinstance(msg["content"], list): + for item in msg["content"]: + if item.get("type") == "image_url": + if item["image_url"]["url"].startswith("data:image/"): + item["image_url"]["url"] = "data:image/..." + if item.get("type") == "input_audio": + item["input_audio"]["data"] = "..." + msgs.append(msg) + return msgs + + def _convert_messages_to_input( + self, messages: List[LLMContextMessage] + ) -> List[ResponseInputItemParam]: + """Convert LLMContext messages to Responses API input items. + + Args: + messages: Messages from the LLMContext. + + Returns: + List of Responses API input items. + """ + result: List[ResponseInputItemParam] = [] + is_first = True + + for message in messages: + if isinstance(message, LLMSpecificMessage): + result.append(message.message) + is_first = False + continue + + role = message.get("role") + + if role == "system": + if is_first and not self._warned_system_instruction: + logger.warning( + "System messages in LLMContext are converted to 'developer' role for the " + "Responses API. Consider using settings.system_instruction instead, which " + "maps to the 'instructions' parameter." + ) + self._warned_system_instruction = True + content = message.get("content", "") + if isinstance(content, list): + content = self._convert_multimodal_content(content) + result.append({"role": "developer", "content": content}) + + elif role == "user": + content = message.get("content", "") + if isinstance(content, list): + content = self._convert_multimodal_content(content) + result.append({"role": "user", "content": content}) + + elif role == "assistant": + tool_calls = message.get("tool_calls") + if tool_calls: + for tc in tool_calls: + func = tc.get("function", {}) + result.append( + { + "type": "function_call", + "call_id": tc.get("id", ""), + "name": func.get("name", ""), + "arguments": func.get("arguments", ""), + } + ) + else: + content = message.get("content", "") + if isinstance(content, list): + content = self._convert_multimodal_content(content) + result.append({"role": "assistant", "content": content}) + + elif role == "tool": + content = message.get("content", "") + if not isinstance(content, str): + content = str(content) + result.append( + { + "type": "function_call_output", + "call_id": message.get("tool_call_id", ""), + "output": content, + } + ) + + is_first = False + + return result + + def _convert_multimodal_content(self, content: list) -> list: + """Convert multimodal content parts to Responses API format. + + Args: + content: List of content parts from the LLMContext message. + + Returns: + List of content parts in Responses API format. + """ + result = [] + for part in content: + part_type = part.get("type") + if part_type == "text": + result.append({"type": "input_text", "text": part.get("text", "")}) + elif part_type == "image_url": + image_url_obj = part.get("image_url", {}) + result.append( + { + "type": "input_image", + "image_url": image_url_obj.get("url", ""), + "detail": image_url_obj.get("detail", "auto"), + } + ) + else: + # Pass through unknown types as-is + result.append(part) + return result diff --git a/src/pipecat/services/openai/__init__.py b/src/pipecat/services/openai/__init__.py index e182264b1..3caa3c3cb 100644 --- a/src/pipecat/services/openai/__init__.py +++ b/src/pipecat/services/openai/__init__.py @@ -11,6 +11,7 @@ from pipecat.services import DeprecatedModuleProxy from .image import * from .llm import * from .realtime import * +from .responses.llm import * from .stt import * from .tts import * diff --git a/src/pipecat/services/openai/responses/__init__.py b/src/pipecat/services/openai/responses/__init__.py new file mode 100644 index 000000000..c4d243b97 --- /dev/null +++ b/src/pipecat/services/openai/responses/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# diff --git a/src/pipecat/services/openai/responses/llm.py b/src/pipecat/services/openai/responses/llm.py new file mode 100644 index 000000000..e72b8acdb --- /dev/null +++ b/src/pipecat/services/openai/responses/llm.py @@ -0,0 +1,393 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""OpenAI Responses API LLM service implementation.""" + +import json +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from typing import Any, Dict, List, Mapping, Optional + +import httpx +from loguru import logger +from openai import NOT_GIVEN, AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient +from openai.types.responses import ( + ResponseCompletedEvent, + ResponseFunctionCallArgumentsDeltaEvent, + ResponseFunctionCallArgumentsDoneEvent, + ResponseOutputItemAddedEvent, + ResponseOutputItemDoneEvent, + ResponseStreamEvent, + ResponseTextDeltaEvent, +) + +from pipecat.adapters.services.open_ai_responses_adapter import ( + OpenAIResponsesLLMAdapter, + OpenAIResponsesLLMInvocationParams, +) +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, +) +from pipecat.metrics.metrics import LLMTokenUsage +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.llm_service import FunctionCallFromLLM, LLMService +from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN +from pipecat.services.settings import LLMSettings, _NotGiven +from pipecat.utils.tracing.service_decorators import traced_llm + + +@dataclass +class OpenAIResponsesLLMSettings(LLMSettings): + """Settings for OpenAIResponsesLLMService. + + Parameters: + max_completion_tokens: Maximum completion tokens to generate. + """ + + max_completion_tokens: int | _NotGiven = field(default_factory=lambda: _NOT_GIVEN) + + +class OpenAIResponsesLLMService(LLMService): + """OpenAI Responses API LLM service. + + This service works with the universal LLMContext and LLMContextAggregatorPair. + + Example:: + + llm = OpenAIResponsesLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + settings=OpenAIResponsesLLMService.Settings( + model="gpt-4.1", + system_instruction="You are a helpful assistant.", + ), + ) + """ + + Settings = OpenAIResponsesLLMSettings + _settings: Settings + + adapter_class = OpenAIResponsesLLMAdapter + + def __init__( + self, + *, + model: Optional[str] = None, + api_key=None, + base_url=None, + organization=None, + project=None, + default_headers: Optional[Mapping[str, str]] = None, + settings: Optional[Settings] = None, + **kwargs, + ): + """Initialize the OpenAI Responses API LLM service. + + Args: + model: The OpenAI model name to use. Defaults to "gpt-4.1". + api_key: OpenAI API key. If None, uses environment variable. + base_url: Custom base URL for OpenAI API. If None, uses default. + organization: OpenAI organization ID. + project: OpenAI project ID. + default_headers: Additional HTTP headers to include in requests. + settings: Runtime-updatable settings. When provided alongside + other parameters, ``settings`` values take precedence. + **kwargs: Additional arguments passed to the parent LLMService. + """ + default_settings = self.Settings( + model="gpt-4.1", + system_instruction=None, + frequency_penalty=None, + presence_penalty=None, + seed=None, + temperature=NOT_GIVEN, + top_p=NOT_GIVEN, + top_k=None, + max_tokens=None, + max_completion_tokens=NOT_GIVEN, + filter_incomplete_user_turns=False, + user_turn_completion_config=None, + extra={}, + ) + + if model is not None: + default_settings.model = model + + if settings is not None: + default_settings.apply_update(settings) + + super().__init__( + settings=default_settings, + **kwargs, + ) + + self._client = self._create_client( + api_key=api_key, + base_url=base_url, + organization=organization, + project=project, + default_headers=default_headers, + ) + + if self._settings.system_instruction: + logger.debug(f"{self}: Using system instruction: {self._settings.system_instruction}") + + def _create_client( + self, + api_key=None, + base_url=None, + organization=None, + project=None, + default_headers=None, + ) -> AsyncOpenAI: + """Create an AsyncOpenAI client instance. + + Args: + api_key: OpenAI API key. + base_url: Custom base URL for the API. + organization: OpenAI organization ID. + project: OpenAI project ID. + default_headers: Additional HTTP headers. + + Returns: + Configured AsyncOpenAI client instance. + """ + return AsyncOpenAI( + api_key=api_key, + base_url=base_url, + organization=organization, + project=project, + http_client=DefaultAsyncHttpxClient( + limits=httpx.Limits( + max_keepalive_connections=100, max_connections=1000, keepalive_expiry=None + ) + ), + default_headers=default_headers, + ) + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics.""" + return True + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames for LLM completion requests. + + Args: + frame: The frame to process. + direction: The direction of frame processing. + """ + await super().process_frame(frame, direction) + + context = None + if isinstance(frame, LLMContextFrame): + context = frame.context + else: + await self.push_frame(frame, direction) + + if context: + try: + await self.push_frame(LLMFullResponseStartFrame()) + await self.start_processing_metrics() + await self._process_context(context) + except httpx.TimeoutException as e: + await self._call_event_handler("on_completion_timeout") + await self.push_error(error_msg="LLM completion timeout", exception=e) + except Exception as e: + await self.push_error(error_msg=f"Error during completion: {e}", exception=e) + finally: + await self.stop_processing_metrics() + await self.push_frame(LLMFullResponseEndFrame()) + + @traced_llm + async def _process_context(self, context: LLMContext): + adapter = self.get_llm_adapter() + logger.debug( + f"{self}: Generating response from universal context " + f"{adapter.get_messages_for_logging(context)}" + ) + + invocation_params: OpenAIResponsesLLMInvocationParams = adapter.get_llm_invocation_params( + context, system_instruction=self._settings.system_instruction + ) + + params = self._build_response_params(invocation_params) + + await self.start_ttfb_metrics() + + stream: AsyncStream[ResponseStreamEvent] = await self._client.responses.create(**params) + + # Track function calls across stream events + function_calls: Dict[str, Dict[str, str]] = {} # item_id -> {name, call_id, arguments} + current_arguments: Dict[str, str] = {} # item_id -> accumulated arguments + + @asynccontextmanager + async def _closing(stream): + chunk_iter = stream.__aiter__() + try: + yield chunk_iter + finally: + if hasattr(chunk_iter, "aclose"): + await chunk_iter.aclose() + if hasattr(stream, "close"): + await stream.close() + elif hasattr(stream, "aclose"): + await stream.aclose() + + async with _closing(stream) as event_iter: + async for event in event_iter: + if isinstance(event, ResponseTextDeltaEvent): + await self.stop_ttfb_metrics() + await self._push_llm_text(event.delta) + + elif isinstance(event, ResponseOutputItemAddedEvent): + await self.stop_ttfb_metrics() + item = event.item + if getattr(item, "type", None) == "function_call": + item_id = getattr(item, "id", "") or "" + function_calls[item_id] = { + "name": getattr(item, "name", ""), + "call_id": getattr(item, "call_id", ""), + "arguments": "", + } + current_arguments[item_id] = "" + + elif isinstance(event, ResponseFunctionCallArgumentsDeltaEvent): + item_id = event.item_id + if item_id in current_arguments: + current_arguments[item_id] += event.delta + + elif isinstance(event, ResponseFunctionCallArgumentsDoneEvent): + item_id = event.item_id + if item_id in function_calls: + function_calls[item_id]["arguments"] = event.arguments + + elif isinstance(event, ResponseOutputItemDoneEvent): + item = event.item + if getattr(item, "type", None) == "function_call": + item_id = getattr(item, "id", "") or "" + if item_id in function_calls: + function_calls[item_id]["name"] = getattr(item, "name", "") + function_calls[item_id]["call_id"] = getattr(item, "call_id", "") + function_calls[item_id]["arguments"] = getattr(item, "arguments", "") + + elif isinstance(event, ResponseCompletedEvent): + response = event.response + usage = getattr(response, "usage", None) + if usage: + tokens = LLMTokenUsage( + prompt_tokens=getattr(usage, "input_tokens", 0), + completion_tokens=getattr(usage, "output_tokens", 0), + total_tokens=getattr(usage, "total_tokens", 0), + ) + await self.start_llm_usage_metrics(tokens) + + model = getattr(response, "model", None) + if model: + self._full_model_name = model + + # Process any function calls + if function_calls: + fc_list: List[FunctionCallFromLLM] = [] + for item_id, fc in function_calls.items(): + try: + arguments = json.loads(fc["arguments"]) if fc["arguments"] else {} + except json.JSONDecodeError: + logger.warning( + f"{self}: Failed to parse function call arguments: {fc['arguments']}" + ) + arguments = {} + fc_list.append( + FunctionCallFromLLM( + context=context, + tool_call_id=fc["call_id"], + function_name=fc["name"], + arguments=arguments, + ) + ) + await self.run_function_calls(fc_list) + + def _build_response_params(self, invocation_params: OpenAIResponsesLLMInvocationParams) -> dict: + """Build parameters for the responses.create() call. + + Args: + invocation_params: Parameters derived from the LLM context. + + Returns: + Dictionary of parameters for the Responses API call. + """ + params: Dict[str, Any] = { + "model": self._settings.model, + "stream": True, + "input": invocation_params["input"], + } + + # instructions (set by the adapter when input is non-empty) + if "instructions" in invocation_params: + params["instructions"] = invocation_params["instructions"] + + # Optional parameters - only include if given + if isinstance(self._settings.temperature, (int, float)): + params["temperature"] = self._settings.temperature + + if isinstance(self._settings.top_p, (int, float)): + params["top_p"] = self._settings.top_p + + if isinstance(self._settings.max_completion_tokens, int): + params["max_output_tokens"] = self._settings.max_completion_tokens + + # Tools + tools = invocation_params.get("tools") + if tools is not None and not isinstance(tools, type(NOT_GIVEN)): + params["tools"] = tools + + # Extra settings + params.update(self._settings.extra) + + return params + + async def run_inference( + self, + context: LLMContext, + max_tokens: Optional[int] = None, + system_instruction: Optional[str] = None, + ) -> Optional[str]: + """Run a one-shot, out-of-band inference with the given LLM context. + + Args: + context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. + system_instruction: Optional system instruction for this inference. + + Returns: + The LLM's response as a string, or None if no response is generated. + """ + adapter = self.get_llm_adapter() + effective_instruction = system_instruction or self._settings.system_instruction + invocation_params = adapter.get_llm_invocation_params( + context, system_instruction=effective_instruction + ) + + params = self._build_response_params(invocation_params) + + # Override for non-streaming + params["stream"] = False + + # Override instructions if caller provided one explicitly + if system_instruction is not None: + params["instructions"] = system_instruction + + if max_tokens is not None: + params["max_output_tokens"] = max_tokens + + response = await self._client.responses.create(**params) + + return response.output_text + + +__all__ = ["OpenAIResponsesLLMService", "OpenAIResponsesLLMSettings"] diff --git a/tests/test_openai_responses_adapter.py b/tests/test_openai_responses_adapter.py new file mode 100644 index 000000000..973c05c8c --- /dev/null +++ b/tests/test_openai_responses_adapter.py @@ -0,0 +1,349 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Unit tests for the OpenAI Responses API adapter. + +Tests the conversion from LLMContext messages to Responses API input items, including: + +1. Simple user/assistant text messages pass through (with correct role) +2. System role converted to developer role +3. First-message system role triggers a warning +4. Assistant messages with tool_calls produce function_call input items +5. Tool messages produce function_call_output input items +6. Mixed conversations with text + function calls convert correctly +7. Multimodal content conversion (text -> input_text, image_url -> input_image) +8. Tools schema flattening (nested function dict -> flat format) +9. Empty messages list +10. LLMSpecificMessage with llm="openai_responses" passes through +""" + +import unittest +from unittest.mock import patch + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.adapters.services.open_ai_responses_adapter import OpenAIResponsesLLMAdapter +from pipecat.processors.aggregators.llm_context import LLMContext, LLMStandardMessage + + +class TestOpenAIResponsesAdapter(unittest.TestCase): + def setUp(self): + self.adapter = OpenAIResponsesLLMAdapter() + + def test_simple_user_assistant_messages(self): + """Simple user/assistant text messages are converted correctly.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["input"]), 2) + self.assertEqual(params["input"][0], {"role": "user", "content": "Hello"}) + self.assertEqual(params["input"][1], {"role": "assistant", "content": "Hi there!"}) + + def test_system_role_converted_to_developer(self): + """System role messages are converted to developer role.""" + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(params["input"][0]["role"], "developer") + self.assertEqual(params["input"][0]["content"], "You are helpful.") + + def test_first_system_message_triggers_warning(self): + """First system message triggers a warning about using system_instruction.""" + # Use a fresh adapter so the warning hasn't been emitted yet + adapter = OpenAIResponsesLLMAdapter() + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + ] + context = LLMContext(messages=messages) + + with patch("pipecat.adapters.services.open_ai_responses_adapter.logger") as mock_logger: + adapter.get_llm_invocation_params(context) + mock_logger.warning.assert_called_once() + warning_msg = mock_logger.warning.call_args[0][0] + self.assertIn("system_instruction", warning_msg) + + def test_non_initial_system_message_no_warning(self): + """Non-initial system messages are converted without a warning.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "Hello"}, + {"role": "system", "content": "New instruction"}, + ] + context = LLMContext(messages=messages) + + adapter = OpenAIResponsesLLMAdapter() + with patch("pipecat.adapters.services.open_ai_responses_adapter.logger") as mock_logger: + params = adapter.get_llm_invocation_params(context) + mock_logger.warning.assert_not_called() + + self.assertEqual(params["input"][1]["role"], "developer") + self.assertEqual(params["input"][1]["content"], "New instruction") + + def test_first_system_message_warning_fires_only_once(self): + """The first-system-message warning fires only once per adapter instance.""" + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + ] + context = LLMContext(messages=messages) + + adapter = OpenAIResponsesLLMAdapter() + with patch("pipecat.adapters.services.open_ai_responses_adapter.logger") as mock_logger: + adapter.get_llm_invocation_params(context) + adapter.get_llm_invocation_params(context) + # Warning should have been emitted exactly once, not twice + mock_logger.warning.assert_called_once() + + def test_assistant_tool_calls_to_function_call(self): + """Assistant messages with tool_calls produce function_call input items.""" + messages = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_123", + "function": { + "name": "get_weather", + "arguments": '{"location": "SF"}', + }, + "type": "function", + } + ], + } + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["input"]), 1) + fc = params["input"][0] + self.assertEqual(fc["type"], "function_call") + self.assertEqual(fc["call_id"], "call_123") + self.assertEqual(fc["name"], "get_weather") + self.assertEqual(fc["arguments"], '{"location": "SF"}') + + def test_multiple_tool_calls(self): + """Multiple tool calls in one assistant message produce multiple function_call items.""" + messages = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_1", + "function": {"name": "get_weather", "arguments": '{"location": "SF"}'}, + "type": "function", + }, + { + "id": "call_2", + "function": {"name": "get_restaurant", "arguments": '{"location": "SF"}'}, + "type": "function", + }, + ], + } + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["input"]), 2) + self.assertEqual(params["input"][0]["name"], "get_weather") + self.assertEqual(params["input"][1]["name"], "get_restaurant") + + def test_tool_message_to_function_call_output(self): + """Tool role messages produce function_call_output input items.""" + messages = [ + { + "role": "tool", + "content": '{"temperature": "72"}', + "tool_call_id": "call_123", + } + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["input"]), 1) + fco = params["input"][0] + self.assertEqual(fco["type"], "function_call_output") + self.assertEqual(fco["call_id"], "call_123") + self.assertEqual(fco["output"], '{"temperature": "72"}') + + def test_mixed_conversation(self): + """Mixed conversation with text + function calls converts correctly.""" + messages = [ + {"role": "user", "content": "What's the weather in SF?"}, + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_abc", + "function": {"name": "get_weather", "arguments": '{"location": "SF"}'}, + "type": "function", + } + ], + }, + { + "role": "tool", + "content": '{"temp": "72"}', + "tool_call_id": "call_abc", + }, + {"role": "assistant", "content": "It's 72 degrees in SF."}, + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["input"]), 4) + self.assertEqual(params["input"][0]["role"], "user") + self.assertEqual(params["input"][1]["type"], "function_call") + self.assertEqual(params["input"][2]["type"], "function_call_output") + self.assertEqual(params["input"][3]["role"], "assistant") + + def test_multimodal_text_conversion(self): + """Multimodal text content parts are converted to input_text.""" + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What's in this image?"}, + ], + } + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + content = params["input"][0]["content"] + self.assertEqual(len(content), 1) + self.assertEqual(content[0]["type"], "input_text") + self.assertEqual(content[0]["text"], "What's in this image?") + + def test_multimodal_image_conversion(self): + """Multimodal image_url content parts are converted to input_image.""" + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Describe this:"}, + { + "type": "image_url", + "image_url": {"url": "data:image/jpeg;base64,abc123"}, + }, + ], + } + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + content = params["input"][0]["content"] + self.assertEqual(len(content), 2) + self.assertEqual(content[0]["type"], "input_text") + self.assertEqual(content[1]["type"], "input_image") + self.assertEqual(content[1]["image_url"], "data:image/jpeg;base64,abc123") + self.assertEqual(content[1]["detail"], "auto") + + def test_multimodal_image_with_detail(self): + """Image content parts preserve the detail setting when provided.""" + messages = [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": {"url": "https://example.com/img.png", "detail": "high"}, + }, + ], + } + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + content = params["input"][0]["content"] + self.assertEqual(content[0]["detail"], "high") + + def test_tools_schema_flattening(self): + """Tools schema with nested function dict is flattened to Responses API format.""" + weather_fn = FunctionSchema( + name="get_weather", + description="Get the current weather", + properties={ + "location": {"type": "string", "description": "The city"}, + }, + required=["location"], + ) + tools = ToolsSchema(standard_tools=[weather_fn]) + context = LLMContext(tools=tools) + params = self.adapter.get_llm_invocation_params(context) + + tool_list = params["tools"] + self.assertEqual(len(tool_list), 1) + tool = tool_list[0] + self.assertEqual(tool["type"], "function") + self.assertEqual(tool["name"], "get_weather") + self.assertEqual(tool["description"], "Get the current weather") + self.assertIn("properties", tool["parameters"]) + + def test_empty_messages(self): + """Empty messages list produces empty input list.""" + context = LLMContext(messages=[]) + params = self.adapter.get_llm_invocation_params(context) + self.assertEqual(params["input"], []) + + def test_llm_specific_message_passthrough(self): + """LLMSpecificMessage with llm='openai_responses' passes through.""" + specific_msg = self.adapter.create_llm_specific_message( + {"type": "function_call", "call_id": "x", "name": "foo", "arguments": "{}"} + ) + messages = [ + {"role": "user", "content": "Hello"}, + specific_msg, + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["input"]), 2) + self.assertEqual(params["input"][0]["role"], "user") + self.assertEqual(params["input"][1]["type"], "function_call") + + def test_id_for_llm_specific_messages(self): + """Adapter identifier is 'openai_responses'.""" + self.assertEqual(self.adapter.id_for_llm_specific_messages, "openai_responses") + + def test_system_instruction_with_messages_sets_instructions(self): + """When system_instruction is provided and input is non-empty, sets instructions.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "Hello"}, + ] + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context, system_instruction="Be helpful.") + + self.assertEqual(params["instructions"], "Be helpful.") + self.assertEqual(len(params["input"]), 1) + self.assertEqual(params["input"][0]["role"], "user") + + def test_system_instruction_with_empty_input_becomes_developer_message(self): + """When system_instruction is provided but input is empty, it becomes a developer message.""" + context = LLMContext(messages=[]) + params = self.adapter.get_llm_invocation_params(context, system_instruction="Be helpful.") + + self.assertNotIn("instructions", params) + self.assertEqual(len(params["input"]), 1) + self.assertEqual(params["input"][0]["role"], "developer") + self.assertEqual(params["input"][0]["content"], "Be helpful.") + + def test_no_system_instruction_omits_instructions(self): + """When no system_instruction is provided, instructions key is absent.""" + context = LLMContext(messages=[{"role": "user", "content": "Hi"}]) + params = self.adapter.get_llm_invocation_params(context) + + self.assertNotIn("instructions", params) + + +if __name__ == "__main__": + unittest.main()