Add WebSocket-based OpenAI Responses LLM service with previous_response_id optimization

Introduce a WebSocket variant of the OpenAI Responses API service that
maintains a persistent connection to wss://api.openai.com/v1/responses
for lower-latency inference. The WebSocket variant automatically uses
previous_response_id to send only incremental context when possible,
falling back to full context on reconnection or cache miss.

The WebSocket variant becomes the new default OpenAIResponsesLLMService,
and the HTTP variant is renamed to OpenAIResponsesHttpLLMService. Both
share a private base class with common settings, parameter building,
and run_inference (always HTTP) logic.
This commit is contained in:
Paul Kompfner
2026-03-25 12:06:59 -04:00
parent d1eb2699f3
commit f2a8a9e753
11 changed files with 2297 additions and 114 deletions

View File

@@ -0,0 +1,125 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,139 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way. You are also able to describe images.",
),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
if not runner_args.body:
script_dir = os.path.dirname(__file__)
runner_args.body = {
"image_path": os.path.join(script_dir, "assets", "cat.jpg"),
"question": "Describe this image",
}
image_path = runner_args.body["image_path"]
question = runner_args.body["question"]
# Kick off the conversation.
image = Image.open(image_path)
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,
text=question,
)
context.add_message(message)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,175 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,195 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image and push it to the LLM.
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator. The result_callback will be invoked once the image is
retrieved and processed.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request a user image frame and indicate that it should be added to the
# context. Also associate it to the function call. Pass the result_callback
# so it can be invoked when the image is actually retrieved.
await params.llm.push_frame(
UserImageRequestFrame(
user_id=user_id,
text=question,
append_to_context=True,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
result_callback=params.result_callback,
),
FrameDirection.UPSTREAM,
)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way. You are able to describe images from the user camera.",
),
)
llm.register_function("fetch_user_image", fetch_user_image)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that.", append_to_context=False))
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"user_id": {
"type": "string",
"description": "The ID of the user to grab the image from",
},
"question": {
"type": "string",
"description": "The question that the user is asking about the image",
},
},
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[fetch_image_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
await maybe_capture_participant_camera(transport, client)
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
context.add_message(
{
"role": "developer",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@tts.event_handler("on_tts_request")
async def on_tts_request(tts, context_id: str, text: str):
logger.debug(f"On TTS request: {context_id}: {text}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,249 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import glob
import json
import os
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
BASE_FILENAME = "/tmp/pipecat_conversation_"
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(params: FunctionCallParams):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await params.result_callback({"filenames": matching_files})
async def save_conversation(params: FunctionCallParams):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(
f"writing conversation to {filename}\n{json.dumps(params.context.get_messages(), indent=4)}"
)
try:
with open(filename, "w") as file:
messages = params.context.get_messages()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await params.result_callback({"success": True})
except Exception as e:
await params.result_callback({"success": False, "error": str(e)})
async def load_conversation(params: FunctionCallParams):
global tts
filename = params.arguments["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
params.context.set_messages(json.load(file))
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(params.context.get_messages(), indent=4)}"
)
await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation."))
except Exception as e:
await params.result_callback({"success": False, "error": str(e)})
system_instruction = "You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way."
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
save_conversation_function = FunctionSchema(
name="save_conversation",
description="Save the current conversation. Use this function to persist the current conversation to external storage.",
properties={},
required=[],
)
get_filenames_function = FunctionSchema(
name="get_saved_conversation_filenames",
description="Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
properties={},
required=[],
)
load_conversation_function = FunctionSchema(
name="load_conversation",
description="Load a conversation history. Use this function to load a conversation history into the current session.",
properties={
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
required=["filename"],
)
tools = ToolsSchema(
standard_tools=[
weather_function,
save_conversation_function,
get_filenames_function,
load_conversation_function,
]
)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
system_instruction=system_instruction,
),
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator,
llm, # LLM
tts,
transport.output(), # Transport bot output
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,129 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, LLMUpdateSettingsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
await asyncio.sleep(10)
logger.info("Updating OpenAI LLM settings: temperature=0.1")
await task.queue_frame(
LLMUpdateSettingsFrame(delta=OpenAIResponsesHttpLLMService.Settings(temperature=0.1))
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -149,6 +149,7 @@ TESTS_07 = [
("07zk-interruptible-resembleai.py", EVAL_SIMPLE_MATH),
("07zl-interruptible-smallest.py", EVAL_SIMPLE_MATH),
("07-interruptible-openai-responses.py", EVAL_SIMPLE_MATH),
("07-interruptible-openai-responses-http.py", EVAL_SIMPLE_MATH),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),
]
@@ -156,6 +157,7 @@ TESTS_07 = [
TESTS_12 = [
("12-describe-image-openai.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12-describe-image-openai-responses.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12-describe-image-openai-responses-http.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12a-describe-image-anthropic.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12b-describe-image-aws.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12c-describe-image-gemini-flash.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
@@ -192,6 +194,10 @@ TESTS_14 = [
("14w-function-calling-mistral.py", EVAL_WEATHER),
("14y-function-calling-sarvam.py", EVAL_WEATHER),
("14z-function-calling-novita.py", EVAL_WEATHER),
("14-function-calling-openai-responses.py", EVAL_WEATHER),
("14-function-calling-openai-responses.py", EVAL_WEATHER_AND_RESTAURANT),
("14-function-calling-openai-responses-http.py", EVAL_WEATHER),
("14-function-calling-openai-responses-http.py", EVAL_WEATHER_AND_RESTAURANT),
# Video
("14d-function-calling-anthropic-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-aws-video.py", EVAL_VISION_CAMERA),
@@ -199,6 +205,7 @@ TESTS_14 = [
("14d-function-calling-moondream-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-openai-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-openai-responses-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-openai-responses-video-http.py", EVAL_VISION_CAMERA),
# Currently not working.
# ("14c-function-calling-together.py", EVAL_WEATHER),
# ("14l-function-calling-deepseek.py", EVAL_WEATHER),

View File

@@ -85,16 +85,22 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
# OpenAILLMService (system_instruction + empty messages) need the
# instructions converted to an initial developer message.
#
# NOTE: if/when we support `previous_response_id` and/or
# `conversation_id`, we'll need to revisit this logic, as it'll
# NOTE: The service layer (OpenAIResponsesLLMService) internally
# manages `previous_response_id` for incremental context delivery
# over WebSocket. This runs post-adapter — the adapter always
# produces the full input list and the service determines what
# subset to send. This empty-input fallback is therefore only
# relevant for one-shot or initial calls.
#
# If we added support for user-provided explicit
# `previous_response_id` and/or `conversation_id` (overriding
# internal management), we'd need to revisit this logic, as it'd
# be legit to provide instructions without input items. Worth
# noting that OpenAI's docs suggest these parameters are primarily
# for development convenience rather than performance (the model
# still processes the full context), and come with the tradeoff
# of requiring OpenAI-side 30-day conversation storage, which may
# not be desirable for many users. But it could give folks an easy
# way to store/switch between conversations without needing to
# manage that storage themselves.
# not be desirable for many users.
if not input_items:
params["input"] = [{"role": "developer", "content": system_instruction}]
else:

View File

@@ -4,8 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Responses API LLM service implementation."""
"""OpenAI Responses API LLM service implementations (WebSocket and HTTP)."""
import hashlib
import json
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
@@ -30,10 +31,13 @@ from pipecat.adapters.services.open_ai_responses_adapter import (
OpenAIResponsesLLMInvocationParams,
)
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
StartFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
@@ -43,10 +47,46 @@ from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN
from pipecat.services.settings import LLMSettings, _NotGiven
from pipecat.utils.tracing.service_decorators import traced_llm
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.exceptions import ConnectionClosed
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.")
raise Exception(f"Missing module: {e}")
# ---------------------------------------------------------------------------
# Private retry exception classes
# ---------------------------------------------------------------------------
class _RetryableError(Exception):
"""Base for errors that should trigger a retry in _process_context."""
pass
class _PreviousResponseNotFoundError(_RetryableError):
"""Server could not find the previous response (connection-local cache miss)."""
pass
class _ConnectionLimitReachedError(_RetryableError):
"""WebSocket connection hit the 60-minute server-side limit."""
pass
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@dataclass
class OpenAIResponsesLLMSettings(LLMSettings):
"""Settings for OpenAIResponsesLLMService.
"""Settings for OpenAI Responses API LLM services.
Parameters:
max_completion_tokens: Maximum completion tokens to generate.
@@ -55,20 +95,17 @@ class OpenAIResponsesLLMSettings(LLMSettings):
max_completion_tokens: int | _NotGiven = field(default_factory=lambda: _NOT_GIVEN)
class OpenAIResponsesLLMService(LLMService):
"""OpenAI Responses API LLM service.
# ---------------------------------------------------------------------------
# Shared base class (private)
# ---------------------------------------------------------------------------
This service works with the universal LLMContext and LLMContextAggregatorPair.
Example::
class _BaseOpenAIResponsesLLMService(LLMService):
"""Shared base for HTTP and WebSocket OpenAI Responses API services.
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesLLMService.Settings(
model="gpt-4.1",
system_instruction="You are a helpful assistant.",
),
)
Contains settings, adapter reference, HTTP client creation, parameter
building, ``run_inference``, and metrics support. Subclasses implement
``process_frame`` and ``_process_context`` for their transport.
"""
Settings = OpenAIResponsesLLMSettings
@@ -124,6 +161,7 @@ class OpenAIResponsesLLMService(LLMService):
**kwargs,
)
self._api_key = api_key
self._service_tier = service_tier
self._client = self._create_client(
api_key=api_key,
@@ -173,6 +211,556 @@ class OpenAIResponsesLLMService(LLMService):
"""Check if this service can generate processing metrics."""
return True
def _build_response_params(self, invocation_params: OpenAIResponsesLLMInvocationParams) -> dict:
"""Build parameters for a Responses API call.
Args:
invocation_params: Parameters derived from the LLM context.
Returns:
Dictionary of parameters for the Responses API call.
"""
params: Dict[str, Any] = {
"model": self._settings.model,
"stream": True,
"store": False,
"input": invocation_params["input"],
}
# instructions (set by the adapter when input is non-empty)
if "instructions" in invocation_params:
params["instructions"] = invocation_params["instructions"]
# Optional parameters - only include if given
if isinstance(self._settings.temperature, (int, float)):
params["temperature"] = self._settings.temperature
if isinstance(self._settings.top_p, (int, float)):
params["top_p"] = self._settings.top_p
if isinstance(self._settings.max_completion_tokens, int):
params["max_output_tokens"] = self._settings.max_completion_tokens
if self._service_tier is not None:
params["service_tier"] = self._service_tier
# Tools
tools = invocation_params.get("tools")
if tools is not None and not isinstance(tools, type(NOT_GIVEN)):
params["tools"] = tools
# Extra settings
params.update(self._settings.extra)
return params
async def run_inference(
self,
context: LLMContext,
max_tokens: Optional[int] = None,
system_instruction: Optional[str] = None,
) -> Optional[str]:
"""Run a one-shot, out-of-band inference with the given LLM context.
Always uses the HTTP client regardless of transport variant.
Args:
context: The LLM context containing conversation history.
max_tokens: Optional maximum number of tokens to generate.
system_instruction: Optional system instruction for this inference.
Returns:
The LLM's response as a string, or None if no response is generated.
"""
adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter()
effective_instruction = system_instruction or self._settings.system_instruction
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=effective_instruction
)
params = self._build_response_params(invocation_params)
# Override for non-streaming
params["stream"] = False
if max_tokens is not None:
params["max_output_tokens"] = max_tokens
response = await self._client.responses.create(**params)
return response.output_text
def _process_function_calls(
self,
context: LLMContext,
function_calls: Dict[str, Dict[str, str]],
) -> List[FunctionCallFromLLM]:
"""Convert accumulated function call data into FunctionCallFromLLM list.
Args:
context: The LLM context for the current inference.
function_calls: Map of item_id to {name, call_id, arguments}.
Returns:
List of parsed function call objects.
"""
fc_list: List[FunctionCallFromLLM] = []
for item_id, fc in function_calls.items():
try:
arguments = json.loads(fc["arguments"]) if fc["arguments"] else {}
except json.JSONDecodeError:
logger.warning(
f"{self}: Failed to parse function call arguments: {fc['arguments']}"
)
arguments = {}
fc_list.append(
FunctionCallFromLLM(
context=context,
tool_call_id=fc["call_id"],
function_name=fc["name"],
arguments=arguments,
)
)
return fc_list
# ---------------------------------------------------------------------------
# WebSocket variant (default / recommended)
# ---------------------------------------------------------------------------
class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService):
"""OpenAI Responses API LLM service using WebSocket transport.
Maintains a persistent WebSocket connection to ``wss://api.openai.com/v1/responses``
for lower-latency inference, especially beneficial for tool-call-heavy workflows.
Automatically uses ``previous_response_id`` to send only incremental context when
possible, and falls back to full context on reconnection or cache miss.
This is the recommended variant for real-time / conversational use.
Example::
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesLLMService.Settings(
model="gpt-4.1",
system_instruction="You are a helpful assistant.",
),
)
"""
def __init__(
self,
*,
ws_url: str = "wss://api.openai.com/v1/responses",
**kwargs,
):
"""Initialize the WebSocket-based OpenAI Responses API LLM service.
Args:
ws_url: WebSocket endpoint URL.
Defaults to ``wss://api.openai.com/v1/responses``.
**kwargs: Additional arguments passed to the base class (api_key,
base_url, organization, project, default_headers, service_tier,
settings, etc.).
"""
super().__init__(**kwargs)
self._ws_url = ws_url
self._websocket = None
self._disconnecting = False
# State for previous_response_id optimization
self._previous_response_id: Optional[str] = None
self._previous_input_hash: Optional[str] = None
self._previous_input_length: Optional[int] = None
# -- lifecycle ------------------------------------------------------------
async def start(self, frame: StartFrame):
"""Start the service and establish WebSocket connection.
Args:
frame: The start frame triggering service initialization.
"""
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the service and close WebSocket connection.
Args:
frame: The end frame triggering service shutdown.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the service and close WebSocket connection.
Args:
frame: The cancel frame triggering service cancellation.
"""
await super().cancel(frame)
await self._disconnect()
# -- connection management ------------------------------------------------
async def _connect(self):
"""Establish the WebSocket connection."""
self._disconnecting = False
try:
if self._websocket:
return
self._websocket = await websocket_connect(
uri=self._ws_url,
additional_headers={
"Authorization": f"Bearer {self._api_key}",
},
)
except Exception as e:
await self.push_error(error_msg=f"Error connecting to WebSocket: {e}", exception=e)
self._websocket = None
async def _disconnect(self):
"""Close the WebSocket connection and clear state."""
try:
self._disconnecting = True
await self.stop_all_metrics()
if self._websocket:
await self._websocket.close()
self._websocket = None
self._clear_previous_response_state()
self._disconnecting = False
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting from WebSocket: {e}", exception=e)
async def _reconnect(self):
"""Reconnect to the WebSocket, clearing previous_response_id state."""
await self._disconnect()
await self._connect()
async def _ensure_connected(self):
"""Ensure a WebSocket connection is available, reconnecting if needed.
Raises:
_RetryableError: If the connection could not be established.
"""
if self._websocket is None:
await self._connect()
if self._websocket is None:
raise _RetryableError("Failed to establish WebSocket connection")
async def _ws_send(self, message: dict):
"""Send a JSON message over the WebSocket.
Args:
message: The message dict to serialize and send.
"""
if self._disconnecting or not self._websocket:
return
await self._websocket.send(json.dumps(message))
# -- previous_response_id optimization ------------------------------------
@staticmethod
def _hash_input_items(items: list) -> str:
"""Compute a deterministic hash of input items for comparison.
Args:
items: List of Responses API input items.
Returns:
Hex digest of the SHA-256 hash.
"""
return hashlib.sha256(json.dumps(items, sort_keys=True).encode()).hexdigest()
def _apply_previous_response_optimization(self, params: dict, full_input: list) -> dict:
"""Try to use previous_response_id to send only new input items.
If the prefix of ``full_input`` matches the stored hash from the
previous inference call, only new items are sent along with
``previous_response_id``. Otherwise the full input is sent.
Args:
params: The response params dict (modified in place).
full_input: The complete input items list from the adapter.
Returns:
The (possibly modified) params dict.
"""
if (
self._previous_response_id is not None
and self._previous_input_length is not None
and self._previous_input_hash is not None
and len(full_input) > self._previous_input_length
):
prefix = full_input[: self._previous_input_length]
prefix_hash = self._hash_input_items(prefix)
if prefix_hash == self._previous_input_hash:
new_items = full_input[self._previous_input_length :]
params["input"] = new_items
params["previous_response_id"] = self._previous_response_id
logger.debug(
f"{self}: Using previous_response_id optimization "
f"({len(new_items)} new items, "
f"{self._previous_input_length} cached)"
)
return params
# Full context send (no optimization possible)
return params
def _store_previous_response_state(self, response_id: str, full_input: list):
"""Store state for the next call's previous_response_id optimization.
Args:
response_id: The response ID returned by the server.
full_input: The complete input items list that was used.
"""
self._previous_response_id = response_id
self._previous_input_length = len(full_input)
self._previous_input_hash = self._hash_input_items(full_input)
def _clear_previous_response_state(self):
"""Clear stored previous_response_id state."""
self._previous_response_id = None
self._previous_input_length = None
self._previous_input_hash = None
# -- frame processing -----------------------------------------------------
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for LLM completion requests.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
context = None
if isinstance(frame, LLMContextFrame):
context = frame.context
else:
await self.push_frame(frame, direction)
if context:
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(context)
except Exception as e:
await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
# -- core inference -------------------------------------------------------
@traced_llm
async def _process_context(self, context: LLMContext):
"""Run inference over WebSocket with retry and previous_response_id.
Args:
context: The LLM context containing conversation history.
"""
adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter()
logger.debug(
f"{self}: Generating response from universal context "
f"{adapter.get_messages_for_logging(context)}"
)
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=self._settings.system_instruction
)
full_input = invocation_params["input"]
max_attempts = 2
for attempt in range(max_attempts):
params = self._build_response_params(invocation_params)
# WebSocket mode does not use the "stream" parameter
params.pop("stream", None)
# Apply previous_response_id optimization (skipped after a retry)
if attempt == 0:
params = self._apply_previous_response_optimization(params, full_input)
try:
await self._ensure_connected()
await self.start_ttfb_metrics()
await self._ws_send({"type": "response.create", **params})
await self._receive_response_events(context, full_input)
return # Success
except _PreviousResponseNotFoundError:
logger.warning(
f"{self}: previous_response_not_found — "
f"retrying with full context ({len(full_input)} items)"
)
self._clear_previous_response_state()
await self.stop_ttfb_metrics()
if attempt >= max_attempts - 1:
await self.push_error(
error_msg="previous_response_not_found: retry also failed"
)
return
except _ConnectionLimitReachedError:
logger.warning(
f"{self}: WebSocket connection limit reached — "
f"reconnecting and retrying with full context ({len(full_input)} items)"
)
self._clear_previous_response_state()
await self.stop_ttfb_metrics()
await self._reconnect()
if attempt >= max_attempts - 1:
await self.push_error(error_msg="WebSocket connection limit: retry also failed")
return
except ConnectionClosed as e:
logger.warning(
f"{self}: WebSocket connection closed during inference: {e}"
f"reconnecting and retrying with full context ({len(full_input)} items)"
)
self._clear_previous_response_state()
self._websocket = None
await self.stop_ttfb_metrics()
await self._reconnect()
if attempt >= max_attempts - 1:
await self.push_error(
error_msg=f"WebSocket connection closed: retry also failed: {e}",
exception=e,
)
return
async def _receive_response_events(self, context: LLMContext, full_input: list):
"""Receive and process WebSocket events until the response completes.
Args:
context: The LLM context for the current inference.
full_input: The complete input items list (for storing state on success).
Raises:
_PreviousResponseNotFoundError: Server couldn't find previous response.
_ConnectionLimitReachedError: 60-minute connection limit reached.
ConnectionClosed: WebSocket connection was closed unexpectedly.
"""
function_calls: Dict[str, Dict[str, str]] = {}
current_arguments: Dict[str, str] = {}
while True:
raw = await self._websocket.recv()
event = json.loads(raw)
event_type = event.get("type")
if event_type == "response.output_text.delta":
await self.stop_ttfb_metrics()
await self._push_llm_text(event.get("delta", ""))
elif event_type == "response.output_item.added":
await self.stop_ttfb_metrics()
item = event.get("item", {})
if item.get("type") == "function_call":
item_id = item.get("id", "")
function_calls[item_id] = {
"name": item.get("name", ""),
"call_id": item.get("call_id", ""),
"arguments": "",
}
current_arguments[item_id] = ""
elif event_type == "response.function_call_arguments.delta":
item_id = event.get("item_id", "")
if item_id in current_arguments:
current_arguments[item_id] += event.get("delta", "")
elif event_type == "response.function_call_arguments.done":
item_id = event.get("item_id", "")
if item_id in function_calls:
function_calls[item_id]["arguments"] = event.get("arguments", "")
elif event_type == "response.output_item.done":
item = event.get("item", {})
if item.get("type") == "function_call":
item_id = item.get("id", "")
if item_id in function_calls:
function_calls[item_id]["name"] = item.get("name", "")
function_calls[item_id]["call_id"] = item.get("call_id", "")
function_calls[item_id]["arguments"] = item.get("arguments", "")
elif event_type == "response.completed":
response = event.get("response", {})
usage = response.get("usage")
if usage:
input_details = usage.get("input_tokens_details") or {}
output_details = usage.get("output_tokens_details") or {}
tokens = LLMTokenUsage(
prompt_tokens=usage.get("input_tokens", 0),
completion_tokens=usage.get("output_tokens", 0),
total_tokens=usage.get("total_tokens", 0),
cache_read_input_tokens=input_details.get("cached_tokens", 0),
reasoning_tokens=output_details.get("reasoning_tokens", 0),
)
await self.start_llm_usage_metrics(tokens)
self._full_model_name = response.get("model")
# Store state for next call's previous_response_id optimization
response_id = response.get("id")
if response_id:
self._store_previous_response_state(response_id, full_input)
break # Response complete
elif event_type in ("response.failed", "response.incomplete"):
response = event.get("response", {})
status_details = response.get("status_details") or {}
error_info = status_details.get("error") or {}
error_msg = error_info.get("message", f"Response {event_type.split('.')[-1]}")
await self.push_error(error_msg=f"LLM response error: {error_msg}")
break
elif event_type == "error":
error = event.get("error", {})
code = error.get("code", "")
message = error.get("message", "Unknown error")
if code == "previous_response_not_found":
raise _PreviousResponseNotFoundError(message)
elif code == "websocket_connection_limit_reached":
raise _ConnectionLimitReachedError(message)
else:
await self.push_error(error_msg=f"WebSocket API error: {message}")
break
# Process any function calls
if function_calls:
fc_list = self._process_function_calls(context, function_calls)
await self.run_function_calls(fc_list)
# ---------------------------------------------------------------------------
# HTTP variant
# ---------------------------------------------------------------------------
class OpenAIResponsesHttpLLMService(_BaseOpenAIResponsesLLMService):
"""OpenAI Responses API LLM service using HTTP streaming transport.
Uses server-sent events (SSE) via the OpenAI Python SDK for streaming
inference. Each ``_process_context`` call opens a new HTTP connection.
Example::
llm = OpenAIResponsesHttpLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAIResponsesHttpLLMService.Settings(
model="gpt-4.1",
system_instruction="You are a helpful assistant.",
),
)
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for LLM completion requests.
@@ -300,101 +888,12 @@ class OpenAIResponsesLLMService(LLMService):
# Process any function calls
if function_calls:
fc_list: List[FunctionCallFromLLM] = []
for item_id, fc in function_calls.items():
try:
arguments = json.loads(fc["arguments"]) if fc["arguments"] else {}
except json.JSONDecodeError:
logger.warning(
f"{self}: Failed to parse function call arguments: {fc['arguments']}"
)
arguments = {}
fc_list.append(
FunctionCallFromLLM(
context=context,
tool_call_id=fc["call_id"],
function_name=fc["name"],
arguments=arguments,
)
)
fc_list = self._process_function_calls(context, function_calls)
await self.run_function_calls(fc_list)
def _build_response_params(self, invocation_params: OpenAIResponsesLLMInvocationParams) -> dict:
"""Build parameters for the responses.create() call.
Args:
invocation_params: Parameters derived from the LLM context.
Returns:
Dictionary of parameters for the Responses API call.
"""
params: Dict[str, Any] = {
"model": self._settings.model,
"stream": True,
"store": False,
"input": invocation_params["input"],
}
# instructions (set by the adapter when input is non-empty)
if "instructions" in invocation_params:
params["instructions"] = invocation_params["instructions"]
# Optional parameters - only include if given
if isinstance(self._settings.temperature, (int, float)):
params["temperature"] = self._settings.temperature
if isinstance(self._settings.top_p, (int, float)):
params["top_p"] = self._settings.top_p
if isinstance(self._settings.max_completion_tokens, int):
params["max_output_tokens"] = self._settings.max_completion_tokens
if self._service_tier is not None:
params["service_tier"] = self._service_tier
# Tools
tools = invocation_params.get("tools")
if tools is not None and not isinstance(tools, type(NOT_GIVEN)):
params["tools"] = tools
# Extra settings
params.update(self._settings.extra)
return params
async def run_inference(
self,
context: LLMContext,
max_tokens: Optional[int] = None,
system_instruction: Optional[str] = None,
) -> Optional[str]:
"""Run a one-shot, out-of-band inference with the given LLM context.
Args:
context: The LLM context containing conversation history.
max_tokens: Optional maximum number of tokens to generate.
system_instruction: Optional system instruction for this inference.
Returns:
The LLM's response as a string, or None if no response is generated.
"""
adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter()
effective_instruction = system_instruction or self._settings.system_instruction
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=effective_instruction
)
params = self._build_response_params(invocation_params)
# Override for non-streaming
params["stream"] = False
if max_tokens is not None:
params["max_output_tokens"] = max_tokens
response = await self._client.responses.create(**params)
return response.output_text
__all__ = ["OpenAIResponsesLLMService", "OpenAIResponsesLLMSettings"]
__all__ = [
"OpenAIResponsesLLMService",
"OpenAIResponsesHttpLLMService",
"OpenAIResponsesLLMSettings",
]

View File

@@ -0,0 +1,491 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Tests for the WebSocket variant of OpenAIResponsesLLMService."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
def _make_service(**kwargs):
"""Create a service with the client mocked out."""
with patch.object(OpenAIResponsesLLMService, "_create_client"):
service = OpenAIResponsesLLMService(
api_key="test-key",
**kwargs,
)
service._client = AsyncMock()
return service
def _ws_events(*events):
"""Build a mock WebSocket that yields the given events from recv()."""
ws = AsyncMock()
# .recv() returns each event in order, then raises StopAsyncIteration
ws.recv = AsyncMock(side_effect=[json.dumps(e) for e in events])
ws.send = AsyncMock()
ws.close = AsyncMock()
ws.close_code = None
return ws
# ---------------------------------------------------------------------------
# Hash determinism
# ---------------------------------------------------------------------------
class TestHashInputItems:
def test_same_input_same_hash(self):
items = [{"role": "user", "content": "hello"}]
h1 = OpenAIResponsesLLMService._hash_input_items(items)
h2 = OpenAIResponsesLLMService._hash_input_items(items)
assert h1 == h2
def test_different_input_different_hash(self):
h1 = OpenAIResponsesLLMService._hash_input_items([{"role": "user", "content": "hello"}])
h2 = OpenAIResponsesLLMService._hash_input_items([{"role": "user", "content": "world"}])
assert h1 != h2
def test_order_independent_keys(self):
"""Keys within a dict should not affect hash (sort_keys=True)."""
h1 = OpenAIResponsesLLMService._hash_input_items([{"a": 1, "b": 2}])
h2 = OpenAIResponsesLLMService._hash_input_items([{"b": 2, "a": 1}])
assert h1 == h2
# ---------------------------------------------------------------------------
# previous_response_id optimization
# ---------------------------------------------------------------------------
class TestPreviousResponseOptimization:
def test_no_previous_state_sends_full_input(self):
service = _make_service()
full_input = [{"role": "user", "content": "hi"}]
params = {"input": full_input, "model": "gpt-4.1"}
result = service._apply_previous_response_optimization(params, full_input)
assert result["input"] == full_input
assert "previous_response_id" not in result
def test_matching_prefix_sends_incremental(self):
service = _make_service()
prev_input = [{"role": "user", "content": "hi"}]
service._store_previous_response_state("resp_123", prev_input)
full_input = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
{"role": "user", "content": "how are you?"},
]
params = {"input": list(full_input), "model": "gpt-4.1"}
result = service._apply_previous_response_optimization(params, full_input)
assert result["previous_response_id"] == "resp_123"
assert result["input"] == full_input[1:]
def test_mismatched_prefix_sends_full(self):
service = _make_service()
prev_input = [{"role": "user", "content": "hi"}]
service._store_previous_response_state("resp_123", prev_input)
# Different first message
full_input = [
{"role": "user", "content": "different"},
{"role": "assistant", "content": "hello"},
]
params = {"input": list(full_input), "model": "gpt-4.1"}
result = service._apply_previous_response_optimization(params, full_input)
assert "previous_response_id" not in result
assert result["input"] == full_input
def test_same_length_sends_full(self):
"""When new input is same length as previous, no optimization."""
service = _make_service()
prev_input = [{"role": "user", "content": "hi"}]
service._store_previous_response_state("resp_123", prev_input)
full_input = [{"role": "user", "content": "hi"}]
params = {"input": list(full_input), "model": "gpt-4.1"}
result = service._apply_previous_response_optimization(params, full_input)
assert "previous_response_id" not in result
def test_clear_state(self):
service = _make_service()
service._store_previous_response_state("resp_123", [{"role": "user", "content": "hi"}])
service._clear_previous_response_state()
assert service._previous_response_id is None
assert service._previous_input_hash is None
assert service._previous_input_length is None
# ---------------------------------------------------------------------------
# _receive_response_events — text streaming
# ---------------------------------------------------------------------------
class TestReceiveResponseEventsText:
@pytest.mark.asyncio
async def test_text_deltas_pushed(self):
service = _make_service()
service._push_llm_text = AsyncMock()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
ws = _ws_events(
{"type": "response.output_text.delta", "delta": "Hello"},
{"type": "response.output_text.delta", "delta": " world"},
{
"type": "response.completed",
"response": {
"id": "resp_1",
"model": "gpt-4.1",
"usage": {
"input_tokens": 10,
"output_tokens": 5,
"total_tokens": 15,
"input_tokens_details": {"cached_tokens": 0},
"output_tokens_details": {"reasoning_tokens": 0},
},
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
full_input = [{"role": "user", "content": "hi"}]
await service._receive_response_events(context, full_input)
assert service._push_llm_text.call_count == 2
service._push_llm_text.assert_any_await("Hello")
service._push_llm_text.assert_any_await(" world")
@pytest.mark.asyncio
async def test_response_completed_stores_state(self):
service = _make_service()
service._push_llm_text = AsyncMock()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
ws = _ws_events(
{
"type": "response.completed",
"response": {
"id": "resp_42",
"model": "gpt-4.1",
"usage": {
"input_tokens": 10,
"output_tokens": 5,
"total_tokens": 15,
"input_tokens_details": {"cached_tokens": 2},
"output_tokens_details": {"reasoning_tokens": 1},
},
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
full_input = [{"role": "user", "content": "hi"}]
await service._receive_response_events(context, full_input)
assert service._previous_response_id == "resp_42"
assert service._previous_input_length == 1
assert service._previous_input_hash is not None
assert service.start_llm_usage_metrics.called
@pytest.mark.asyncio
async def test_token_usage_metrics(self):
service = _make_service()
service._push_llm_text = AsyncMock()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
ws = _ws_events(
{
"type": "response.completed",
"response": {
"id": "resp_1",
"model": "gpt-4.1",
"usage": {
"input_tokens": 100,
"output_tokens": 50,
"total_tokens": 150,
"input_tokens_details": {"cached_tokens": 20},
"output_tokens_details": {"reasoning_tokens": 10},
},
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
await service._receive_response_events(context, [])
tokens = service.start_llm_usage_metrics.call_args[0][0]
assert tokens.prompt_tokens == 100
assert tokens.completion_tokens == 50
assert tokens.total_tokens == 150
assert tokens.cache_read_input_tokens == 20
assert tokens.reasoning_tokens == 10
# ---------------------------------------------------------------------------
# _receive_response_events — function calls
# ---------------------------------------------------------------------------
class TestReceiveResponseEventsFunctionCalls:
@pytest.mark.asyncio
async def test_function_call_sequence(self):
service = _make_service()
service._push_llm_text = AsyncMock()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
service.run_function_calls = AsyncMock()
ws = _ws_events(
{
"type": "response.output_item.added",
"item": {
"type": "function_call",
"id": "fc_1",
"name": "get_weather",
"call_id": "call_1",
},
},
{
"type": "response.function_call_arguments.delta",
"item_id": "fc_1",
"delta": '{"loc',
},
{
"type": "response.function_call_arguments.delta",
"item_id": "fc_1",
"delta": 'ation": "SF"}',
},
{
"type": "response.function_call_arguments.done",
"item_id": "fc_1",
"arguments": '{"location": "SF"}',
},
{
"type": "response.output_item.done",
"item": {
"type": "function_call",
"id": "fc_1",
"name": "get_weather",
"call_id": "call_1",
"arguments": '{"location": "SF"}',
},
},
{
"type": "response.completed",
"response": {"id": "resp_1", "model": "gpt-4.1", "usage": None},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
await service._receive_response_events(context, [])
service.run_function_calls.assert_called_once()
fc_list = service.run_function_calls.call_args[0][0]
assert len(fc_list) == 1
assert fc_list[0].function_name == "get_weather"
assert fc_list[0].tool_call_id == "call_1"
assert fc_list[0].arguments == {"location": "SF"}
# ---------------------------------------------------------------------------
# _receive_response_events — errors
# ---------------------------------------------------------------------------
class TestReceiveResponseEventsErrors:
@pytest.mark.asyncio
async def test_response_failed_pushes_error(self):
service = _make_service()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
service.push_error = AsyncMock()
ws = _ws_events(
{
"type": "response.failed",
"response": {
"id": "resp_1",
"status_details": {
"error": {"message": "Content filter triggered"},
},
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
await service._receive_response_events(context, [])
service.push_error.assert_called_once()
assert "Content filter triggered" in service.push_error.call_args.kwargs["error_msg"]
@pytest.mark.asyncio
async def test_response_incomplete_pushes_error(self):
service = _make_service()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
service.push_error = AsyncMock()
ws = _ws_events(
{
"type": "response.incomplete",
"response": {"id": "resp_1", "status_details": None},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
await service._receive_response_events(context, [])
service.push_error.assert_called_once()
@pytest.mark.asyncio
async def test_previous_response_not_found_raises(self):
from pipecat.services.openai.responses.llm import _PreviousResponseNotFoundError
service = _make_service()
service.stop_ttfb_metrics = AsyncMock()
ws = _ws_events(
{
"type": "error",
"error": {
"code": "previous_response_not_found",
"message": "Previous response with id 'resp_abc' not found.",
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
with pytest.raises(_PreviousResponseNotFoundError):
await service._receive_response_events(context, [])
@pytest.mark.asyncio
async def test_connection_limit_reached_raises(self):
from pipecat.services.openai.responses.llm import _ConnectionLimitReachedError
service = _make_service()
service.stop_ttfb_metrics = AsyncMock()
ws = _ws_events(
{
"type": "error",
"error": {
"code": "websocket_connection_limit_reached",
"message": "Connection limit reached.",
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
with pytest.raises(_ConnectionLimitReachedError):
await service._receive_response_events(context, [])
@pytest.mark.asyncio
async def test_generic_error_pushes_error(self):
service = _make_service()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
service.push_error = AsyncMock()
ws = _ws_events(
{
"type": "error",
"error": {
"code": "server_error",
"message": "Internal server error",
},
},
)
service._websocket = ws
context = MagicMock(spec=LLMContext)
await service._receive_response_events(context, [])
service.push_error.assert_called_once()
assert "Internal server error" in service.push_error.call_args.kwargs["error_msg"]
# ---------------------------------------------------------------------------
# Connection lifecycle
# ---------------------------------------------------------------------------
class TestConnectionLifecycle:
@pytest.mark.asyncio
async def test_disconnect_clears_previous_response_state(self):
service = _make_service()
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}])
service.stop_all_metrics = AsyncMock()
await service._disconnect()
assert service._previous_response_id is None
assert service._previous_input_hash is None
assert service._previous_input_length is None
@pytest.mark.asyncio
async def test_reconnect_clears_state_and_reconnects(self):
service = _make_service()
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}])
service.stop_all_metrics = AsyncMock()
service.push_error = AsyncMock()
# Mock connect to set a websocket
mock_ws = AsyncMock()
mock_ws.close = AsyncMock()
service._websocket = mock_ws
with patch(
"pipecat.services.openai.responses.llm.websocket_connect",
new_callable=AsyncMock,
return_value=AsyncMock(),
):
await service._reconnect()
assert service._previous_response_id is None
mock_ws.close.assert_called_once()
@pytest.mark.asyncio
async def test_ensure_connected_raises_on_failure(self):
from pipecat.services.openai.responses.llm import _RetryableError
service = _make_service()
service._websocket = None
service.push_error = AsyncMock()
# Mock connect to fail
with patch(
"pipecat.services.openai.responses.llm.websocket_connect",
new_callable=AsyncMock,
side_effect=Exception("Connection refused"),
):
with pytest.raises(_RetryableError):
await service._ensure_connected()

View File

@@ -20,7 +20,10 @@ from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.aws.llm import AWSBedrockLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
from pipecat.services.openai.responses.llm import (
OpenAIResponsesHttpLLMService,
OpenAIResponsesLLMService,
)
@pytest.mark.asyncio
@@ -945,3 +948,168 @@ async def test_openai_responses_run_inference_system_instruction_param_with_empt
{"role": "developer", "content": "Summarize the conversation"}
]
assert "instructions" not in call_kwargs
# --- OpenAI Responses HTTP API tests ---
# These mirror the WebSocket variant tests above, verifying that the HTTP
# variant's run_inference (inherited from the shared base class) works
# identically.
@pytest.mark.asyncio
async def test_openai_responses_http_run_inference_with_llm_context():
"""Test run_inference with LLMContext returns expected response (HTTP variant)."""
with patch.object(OpenAIResponsesHttpLLMService, "_create_client"):
service = OpenAIResponsesHttpLLMService(
settings=OpenAIResponsesHttpLLMService.Settings(
model="gpt-4.1",
system_instruction="You are a helpful assistant",
temperature=0.7,
max_completion_tokens=100,
),
)
service._client = AsyncMock()
context = LLMContext(
messages=[
{"role": "user", "content": "Hello, world!"},
]
)
mock_response = MagicMock()
mock_response.output_text = "Hello! How can I help you today?"
service._client.responses.create = AsyncMock(return_value=mock_response)
result = await service.run_inference(context)
assert result == "Hello! How can I help you today?"
call_kwargs = service._client.responses.create.call_args.kwargs
assert call_kwargs["model"] == "gpt-4.1"
assert call_kwargs["stream"] is False
assert call_kwargs["store"] is False
assert call_kwargs["input"] == [{"role": "user", "content": "Hello, world!"}]
assert call_kwargs["instructions"] == "You are a helpful assistant"
assert call_kwargs["temperature"] == 0.7
assert call_kwargs["max_output_tokens"] == 100
@pytest.mark.asyncio
async def test_openai_responses_http_run_inference_client_exception():
"""Test that exceptions from the client are propagated (HTTP variant)."""
with patch.object(OpenAIResponsesHttpLLMService, "_create_client"):
service = OpenAIResponsesHttpLLMService()
service._client = AsyncMock()
context = LLMContext(messages=[{"role": "user", "content": "Hello"}])
service._client.responses.create = AsyncMock(side_effect=Exception("API Error"))
with pytest.raises(Exception, match="API Error"):
await service.run_inference(context)
@pytest.mark.asyncio
async def test_openai_responses_http_run_inference_system_instruction_overrides():
"""Test that system_instruction parameter overrides the settings instruction (HTTP variant)."""
with patch.object(OpenAIResponsesHttpLLMService, "_create_client"):
service = OpenAIResponsesHttpLLMService(
settings=OpenAIResponsesHttpLLMService.Settings(
model="gpt-4.1",
system_instruction="Original instruction",
),
)
service._client = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
)
mock_response = MagicMock()
mock_response.output_text = "Response"
service._client.responses.create = AsyncMock(return_value=mock_response)
result = await service.run_inference(context, system_instruction="New system instruction")
assert result == "Response"
call_kwargs = service._client.responses.create.call_args.kwargs
assert call_kwargs["instructions"] == "New system instruction"
assert call_kwargs["input"] == [{"role": "user", "content": "Hello"}]
@pytest.mark.asyncio
async def test_openai_responses_http_run_inference_empty_context_with_instruction():
"""Test that system_instruction becomes a developer message when context is empty (HTTP)."""
with patch.object(OpenAIResponsesHttpLLMService, "_create_client"):
service = OpenAIResponsesHttpLLMService(
settings=OpenAIResponsesHttpLLMService.Settings(
model="gpt-4.1",
system_instruction="You are helpful",
),
)
service._client = AsyncMock()
context = LLMContext(messages=[])
mock_response = MagicMock()
mock_response.output_text = "Response"
service._client.responses.create = AsyncMock(return_value=mock_response)
result = await service.run_inference(context)
assert result == "Response"
call_kwargs = service._client.responses.create.call_args.kwargs
assert call_kwargs["input"] == [{"role": "developer", "content": "You are helpful"}]
assert "instructions" not in call_kwargs
@pytest.mark.asyncio
async def test_openai_responses_http_run_inference_max_tokens_override():
"""Test that max_tokens parameter overrides max_output_tokens (HTTP variant)."""
with patch.object(OpenAIResponsesHttpLLMService, "_create_client"):
service = OpenAIResponsesHttpLLMService(
settings=OpenAIResponsesHttpLLMService.Settings(
model="gpt-4.1",
max_completion_tokens=500,
),
)
service._client = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Summarize this"}],
)
mock_response = MagicMock()
mock_response.output_text = "Summary"
service._client.responses.create = AsyncMock(return_value=mock_response)
result = await service.run_inference(context, max_tokens=200)
assert result == "Summary"
call_kwargs = service._client.responses.create.call_args.kwargs
assert call_kwargs["max_output_tokens"] == 200
@pytest.mark.asyncio
async def test_openai_responses_http_run_inference_system_instruction_param_with_empty_context():
"""Test system_instruction param becomes developer message for empty context (HTTP)."""
with patch.object(OpenAIResponsesHttpLLMService, "_create_client"):
service = OpenAIResponsesHttpLLMService(
settings=OpenAIResponsesHttpLLMService.Settings(model="gpt-4.1"),
)
service._client = AsyncMock()
context = LLMContext(messages=[])
mock_response = MagicMock()
mock_response.output_text = "Response"
service._client.responses.create = AsyncMock(return_value=mock_response)
result = await service.run_inference(
context, system_instruction="Summarize the conversation"
)
assert result == "Response"
call_kwargs = service._client.responses.create.call_args.kwargs
assert call_kwargs["input"] == [
{"role": "developer", "content": "Summarize the conversation"}
]
assert "instructions" not in call_kwargs