diff --git a/examples/foundational/07-interruptible-openai-responses-http.py b/examples/foundational/07-interruptible-openai-responses-http.py new file mode 100644 index 000000000..6960bf7b2 --- /dev/null +++ b/examples/foundational/07-interruptible-openai-responses-http.py @@ -0,0 +1,125 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.responses.llm import OpenAIResponsesHttpLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + +# We use lambdas to defer transport parameter creation until the transport +# type is selected at runtime. 
transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build and run the voice pipeline on top of the given transport.

    The pipeline is: transport input -> Deepgram STT -> user aggregator ->
    OpenAI Responses (HTTP) LLM -> Cartesia TTS -> transport output ->
    assistant aggregator. This coroutine returns when the runner finishes
    running the pipeline task.

    Args:
        transport: Transport providing the pipeline's input and output.
        runner_args: Runner settings; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read from it.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    voice_settings = CartesiaTTSService.Settings(
        voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=voice_settings,
    )

    system_instruction = (
        "You are a helpful assistant in a voice conversation. "
        "Your responses will be spoken aloud, so avoid emojis, bullet points, "
        "or other formatting that can't be spoken. "
        "Respond to what the user said in a creative, helpful, and brief way."
    )
    llm = OpenAIResponsesHttpLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAIResponsesHttpLLMService.Settings(
            system_instruction=system_instruction,
        ),
    )

    # One shared context feeds both aggregators; the user side gets the VAD.
    context = LLMContext()
    aggregator_pair = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )
    user_aggregator, assistant_aggregator = aggregator_pair

    processors = [
        transport.input(),  # Transport user input
        stt,  # STT
        user_aggregator,  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        assistant_aggregator,  # Assistant spoken responses
    ]
    pipeline = Pipeline(processors)

    metrics_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(
        pipeline,
        params=metrics_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        greeting_request = {
            "role": "developer",
            "content": "Please introduce yourself to the user.",
        }
        context.add_message(greeting_request)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from ``transport_params``
    and hands it to ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run a voice bot that opens the conversation by describing an image.

    The pipeline is: transport input -> Deepgram STT -> user aggregator ->
    OpenAI Responses (HTTP) LLM -> Cartesia TTS -> transport output ->
    assistant aggregator. When a client connects, an image message built from
    ``runner_args.body`` is added to the context and the LLM is run.

    Args:
        transport: Transport providing the pipeline's input and output.
        runner_args: Runner settings. ``body`` may carry ``image_path`` and
            ``question``; ``pipeline_idle_timeout_secs`` and ``handle_sigint``
            are also read.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=CartesiaTTSService.Settings(
            voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        ),
    )

    llm = OpenAIResponsesHttpLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAIResponsesHttpLLMService.Settings(
            system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way. You are also able to describe images.",
        ),
    )

    context = LLMContext()
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT
            user_aggregator,  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            assistant_aggregator,  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")

        # Defaults used when the request carries no body, or only part of one.
        default_request = {
            "image_path": os.path.join(os.path.dirname(__file__), "assets", "cat.jpg"),
            "question": "Describe this image",
        }
        if not runner_args.body:
            runner_args.body = default_request

        # NOTE(review): assumes the body is dict-like (the original indexed it
        # with string keys); a missing key now falls back to the default
        # instead of raising KeyError.
        image_path = runner_args.body.get("image_path", default_request["image_path"])
        question = runner_args.body.get("question", default_request["question"])

        # Kick off the conversation.
        # tobytes() returns pixels in the image's own mode, so convert to RGB
        # to match the format declared below (a PNG may be RGBA or palette
        # based). The context manager releases the file handle once the pixel
        # data has been converted.
        with Image.open(image_path) as source_image:
            rgb_image = source_image.convert("RGB")

        message = await LLMContext.create_image_message(
            image=rgb_image.tobytes(),
            format="RGB",
            size=rgb_image.size,
            text=question,
        )
        context.add_message(message)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from ``transport_params``
    and hands it to ``run_bot``.
    """
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
async def fetch_weather_from_api(params: FunctionCallParams):
    """Answer the ``get_current_weather`` tool call with a fixed result."""
    canned_weather = {"conditions": "nice", "temperature": "75"}
    await params.result_callback(canned_weather)


async def fetch_restaurant_recommendation(params: FunctionCallParams):
    """Answer the ``get_restaurant_recommendation`` tool call with a fixed result."""
    canned_restaurant = {"name": "The Golden Dragon"}
    await params.result_callback(canned_restaurant)


# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run a voice bot whose LLM can call the weather and restaurant tools.

    The pipeline is: transport input -> Deepgram STT -> user aggregator ->
    OpenAI Responses (HTTP) LLM -> Cartesia TTS -> transport output ->
    assistant aggregator. Both tool handlers are registered on the LLM and
    their schemas are attached to the context.

    Args:
        transport: Transport providing the pipeline's input and output.
        runner_args: Runner settings; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read from it.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    voice_settings = CartesiaTTSService.Settings(
        voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=voice_settings,
    )

    system_instruction = (
        "You are a helpful assistant in a voice conversation. "
        "Your responses will be spoken aloud, so avoid emojis, bullet points, "
        "or other formatting that can't be spoken. "
        "Respond to what the user said in a creative, helpful, and brief way."
    )
    llm = OpenAIResponsesHttpLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAIResponsesHttpLLMService.Settings(
            system_instruction=system_instruction,
        ),
    )

    # You can also register a function_name of None to get all functions
    # sent to the same callback with an additional function_name parameter.
    llm.register_function("get_current_weather", fetch_weather_from_api)
    llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)

    @llm.event_handler("on_function_calls_started")
    async def on_function_calls_started(service, function_calls):
        # Speak a short filler phrase when function calls start.
        filler = TTSSpeakFrame("Let me check on that.")
        await tts.queue_frame(filler)

    weather_properties = {
        "location": {
            "type": "string",
            "description": "The city and state, e.g. San Francisco, CA",
        },
        "format": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "description": "The temperature unit to use. Infer this from the user's location.",
        },
    }
    weather_function = FunctionSchema(
        name="get_current_weather",
        description="Get the current weather",
        properties=weather_properties,
        required=["location", "format"],
    )

    restaurant_properties = {
        "location": {
            "type": "string",
            "description": "The city and state, e.g. San Francisco, CA",
        },
    }
    restaurant_function = FunctionSchema(
        name="get_restaurant_recommendation",
        description="Get a restaurant recommendation",
        properties=restaurant_properties,
        required=["location"],
    )
    tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])

    context = LLMContext(tools=tools)
    aggregator_pair = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )
    user_aggregator, assistant_aggregator = aggregator_pair

    processors = [
        transport.input(),  # Transport user input
        stt,  # STT
        user_aggregator,  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        assistant_aggregator,  # Assistant spoken responses
    ]
    pipeline = Pipeline(processors)

    metrics_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(
        pipeline,
        params=metrics_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        greeting_request = {
            "role": "developer",
            "content": "Please introduce yourself to the user.",
        }
        context.add_message(greeting_request)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from ``transport_params``
    and hands it to ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
async def fetch_user_image(params: FunctionCallParams):
    """Ask the transport for a camera image of the user on behalf of the LLM.

    A UserImageRequestFrame is pushed upstream towards the transport. The
    transport then requests the user image and pushes a UserImageRawFrame
    downstream, which the LLM assistant aggregator adds to the context. The
    result_callback travels with the request so it can be invoked once the
    image has been retrieved and processed.

    Args:
        params: Function call parameters; ``arguments`` must contain
            ``user_id`` and ``question``.
    """
    user_id = params.arguments["user_id"]
    question = params.arguments["question"]
    logger.debug(f"Requesting image with user_id={user_id}, question={question}")

    # The request is tied to this function call (name + tool call id) and
    # flagged so the resulting image is appended to the context.
    image_request = UserImageRequestFrame(
        user_id=user_id,
        text=question,
        append_to_context=True,
        function_name=params.function_name,
        tool_call_id=params.tool_call_id,
        result_callback=params.result_callback,
    )
    await params.llm.push_frame(image_request, FrameDirection.UPSTREAM)


# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True, audio_out_enabled=True, video_in_enabled=True
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True, audio_out_enabled=True, video_in_enabled=True
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run a voice bot whose LLM can request an image from the user's camera.

    The pipeline is: transport input -> Deepgram STT -> user aggregator ->
    OpenAI Responses (HTTP) LLM -> Cartesia TTS -> transport output ->
    assistant aggregator. ``fetch_user_image`` is registered as the handler
    of the ``fetch_user_image`` tool.

    Args:
        transport: Transport providing the pipeline's input and output.
        runner_args: Runner settings; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read from it.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    voice_settings = CartesiaTTSService.Settings(
        voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=voice_settings,
    )

    system_instruction = (
        "You are a helpful assistant in a voice conversation. "
        "Your responses will be spoken aloud, so avoid emojis, bullet points, "
        "or other formatting that can't be spoken. "
        "Respond to what the user said in a creative, helpful, and brief way. "
        "You are able to describe images from the user camera."
    )
    llm = OpenAIResponsesHttpLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAIResponsesHttpLLMService.Settings(
            system_instruction=system_instruction,
        ),
    )
    llm.register_function("fetch_user_image", fetch_user_image)

    @llm.event_handler("on_function_calls_started")
    async def on_function_calls_started(service, function_calls):
        # Spoken filler; append_to_context=False is passed for this frame.
        filler = TTSSpeakFrame("Let me check on that.", append_to_context=False)
        await tts.queue_frame(filler)

    image_tool_properties = {
        "user_id": {
            "type": "string",
            "description": "The ID of the user to grab the image from",
        },
        "question": {
            "type": "string",
            "description": "The question that the user is asking about the image",
        },
    }
    fetch_image_function = FunctionSchema(
        name="fetch_user_image",
        description="Called when the user requests a description of their camera feed",
        properties=image_tool_properties,
        required=["user_id", "question"],
    )
    tools = ToolsSchema(standard_tools=[fetch_image_function])

    context = LLMContext(tools=tools)
    aggregator_pair = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )
    user_aggregator, assistant_aggregator = aggregator_pair

    processors = [
        transport.input(),  # Transport user input
        stt,  # STT
        user_aggregator,  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        assistant_aggregator,  # Assistant spoken responses
    ]
    pipeline = Pipeline(processors)

    metrics_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(
        pipeline,
        params=metrics_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")

        await maybe_capture_participant_camera(transport, client)

        client_id = get_transport_client_id(transport, client)

        # Kick off the conversation. The client id is handed to the model so
        # it can supply it as ``user_id`` when calling the image tool.
        greeting_request = {
            "role": "developer",
            "content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
        }
        context.add_message(greeting_request)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    @tts.event_handler("on_tts_request")
    async def on_tts_request(tts, context_id: str, text: str):
        logger.debug(f"On TTS request: {context_id}: {text}")

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from ``transport_params``
    and hands it to ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
# Saved conversations are written as "<BASE_FILENAME><timestamp>.json".
BASE_FILENAME = "/tmp/pipecat_conversation_"


async def fetch_weather_from_api(params: FunctionCallParams):
    """Answer the ``get_current_weather`` tool call with a fixed result.

    Args:
        params: Function call parameters; ``arguments["format"]`` selects the
            temperature unit ("fahrenheit" yields 75, anything else 24).
    """
    unit = params.arguments["format"]
    temperature = 75 if unit == "fahrenheit" else 24
    await params.result_callback(
        {
            "conditions": "nice",
            "temperature": temperature,
            "format": unit,
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
        }
    )


async def get_saved_conversation_filenames(params: FunctionCallParams):
    """Report every saved conversation file to the LLM.

    Args:
        params: Function call parameters; only ``result_callback`` is used.
    """
    # Construct the full pattern including the BASE_FILENAME
    full_pattern = f"{BASE_FILENAME}*.json"

    # Use glob to find all matching files
    matching_files = glob.glob(full_pattern)
    logger.debug(f"matching files: {matching_files}")

    await params.result_callback({"filenames": matching_files})


async def save_conversation(params: FunctionCallParams):
    """Write the current context messages to a timestamped JSON file.

    The outcome is reported through ``params.result_callback`` as
    ``{"success": True}`` or ``{"success": False, "error": ...}``.

    Args:
        params: Function call parameters; ``context`` supplies the messages.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
    filename = f"{BASE_FILENAME}{timestamp}.json"
    try:
        messages = params.context.get_messages()
        # Serialising for the log happens inside the try block so that a
        # non-serialisable message is reported via the callback rather than
        # escaping without any function-call result.
        logger.debug(f"writing conversation to {filename}\n{json.dumps(messages, indent=4)}")
        with open(filename, "w", encoding="utf-8") as file:
            # remove the last message, which is the instruction we just gave to save the conversation
            # NOTE(review): if get_messages() returns the live list, this also
            # removes the message from the running context - confirm intended.
            messages.pop()
            json.dump(messages, file, indent=2)
        await params.result_callback({"success": True})
    except Exception as e:
        await params.result_callback({"success": False, "error": str(e)})


async def load_conversation(params: FunctionCallParams):
    """Replace the context messages with those stored in a saved file.

    On success a spoken confirmation is queued on the LLM and no result
    callback is invoked (unchanged from the original); on failure the error
    is reported through ``params.result_callback``.

    Args:
        params: Function call parameters; ``arguments["filename"]`` is the
            file to load.
    """
    # NOTE(review): the filename is supplied by the model and is not checked
    # against BASE_FILENAME, so any readable JSON file could be loaded.
    filename = params.arguments["filename"]
    logger.debug(f"loading conversation from {filename}")
    try:
        with open(filename, "r", encoding="utf-8") as file:
            saved_messages = json.load(file)
        params.context.set_messages(saved_messages)
        logger.debug(
            f"loaded conversation from {filename}\n{json.dumps(params.context.get_messages(), indent=4)}"
        )
        await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation."))
    except Exception as e:
        await params.result_callback({"success": False, "error": str(e)})


system_instruction = "You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way."

weather_function = FunctionSchema(
    name="get_current_weather",
    description="Get the current weather",
    properties={
        "location": {
            "type": "string",
            "description": "The city and state, e.g. San Francisco, CA",
        },
        "format": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "description": "The temperature unit to use. Infer this from the users location.",
        },
    },
    required=["location", "format"],
)

save_conversation_function = FunctionSchema(
    name="save_conversation",
    description="Save the current conversation. Use this function to persist the current conversation to external storage.",
    properties={},
    required=[],
)

get_filenames_function = FunctionSchema(
    name="get_saved_conversation_filenames",
    description="Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
    properties={},
    required=[],
)

load_conversation_function = FunctionSchema(
    name="load_conversation",
    description="Load a conversation history. Use this function to load a conversation history into the current session.",
    properties={
        "filename": {
            "type": "string",
            "description": "The filename of the conversation history to load.",
        }
    },
    required=["filename"],
)

tools = ToolsSchema(
    standard_tools=[
        weather_function,
        save_conversation_function,
        get_filenames_function,
        load_conversation_function,
    ]
)


# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run a voice bot that can save and reload its conversation history.

    The pipeline is: transport input -> Deepgram STT -> user aggregator ->
    OpenAI Responses (HTTP) LLM -> Cartesia TTS -> transport output ->
    assistant aggregator. The weather, save, list and load handlers are
    registered on the LLM and their schemas are attached to the context.

    Args:
        transport: Transport providing the pipeline's input and output.
        runner_args: Runner settings; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read from it.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=CartesiaTTSService.Settings(
            voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        ),
    )

    llm = OpenAIResponsesHttpLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAIResponsesHttpLLMService.Settings(
            system_instruction=system_instruction,
        ),
    )

    # you can either register a single function for all function calls, or specific functions
    # llm.register_function(None, fetch_weather_from_api)
    llm.register_function("get_current_weather", fetch_weather_from_api)
    llm.register_function("save_conversation", save_conversation)
    llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
    llm.register_function("load_conversation", load_conversation)

    context = LLMContext(tools=tools)
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT
            user_aggregator,  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            assistant_aggregator,  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from ``transport_params``
    and hands it to ``run_bot``.
    """
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
# Transport parameters are wrapped in lambdas so that only the parameters of
# the transport actually selected at runtime get created.
transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run a voice bot and update the LLM settings while it is running.

    The pipeline is: transport input -> Deepgram STT -> user aggregator ->
    OpenAI Responses (HTTP) LLM -> Cartesia TTS -> transport output ->
    assistant aggregator. Ten seconds after a client connects, an
    LLMUpdateSettingsFrame carrying ``temperature=0.1`` is queued.

    Args:
        transport: Transport providing the pipeline's input and output.
        runner_args: Runner settings; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read from it.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    voice_settings = CartesiaTTSService.Settings(
        voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=voice_settings,
    )

    system_instruction = (
        "You are a helpful assistant in a voice conversation. "
        "Your responses will be spoken aloud, so avoid emojis, bullet points, "
        "or other formatting that can't be spoken. "
        "Respond to what the user said in a creative, helpful, and brief way."
    )
    llm = OpenAIResponsesHttpLLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAIResponsesHttpLLMService.Settings(
            system_instruction=system_instruction,
        ),
    )

    context = LLMContext()
    aggregator_pair = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )
    user_aggregator, assistant_aggregator = aggregator_pair

    processors = [
        transport.input(),  # Transport user input
        stt,  # STT
        user_aggregator,  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        assistant_aggregator,  # Assistant spoken responses
    ]
    pipeline = Pipeline(processors)

    metrics_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(
        pipeline,
        params=metrics_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        greeting_request = {
            "role": "developer",
            "content": "Please introduce yourself to the user.",
        }
        context.add_message(greeting_request)
        await task.queue_frames([LLMRunFrame()])

        # Wait before sending the settings update.
        await asyncio.sleep(10)
        logger.info("Updating OpenAI LLM settings: temperature=0.1")
        settings_delta = OpenAIResponsesHttpLLMService.Settings(temperature=0.1)
        await task.queue_frame(LLMUpdateSettingsFrame(delta=settings_delta))

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport selected by ``runner_args`` from ``transport_params``
    and hands it to ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
("07-interruptible-openai-responses.py", EVAL_SIMPLE_MATH), + ("07-interruptible-openai-responses-http.py", EVAL_SIMPLE_MATH), # Needs a local XTTS docker instance running. # ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH), ] @@ -156,6 +157,7 @@ TESTS_07 = [ TESTS_12 = [ ("12-describe-image-openai.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), ("12-describe-image-openai-responses.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), + ("12-describe-image-openai-responses-http.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), ("12a-describe-image-anthropic.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), ("12b-describe-image-aws.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), ("12c-describe-image-gemini-flash.py", EVAL_VISION_IMAGE(eval_speaks_first=True)), @@ -192,6 +194,10 @@ TESTS_14 = [ ("14w-function-calling-mistral.py", EVAL_WEATHER), ("14y-function-calling-sarvam.py", EVAL_WEATHER), ("14z-function-calling-novita.py", EVAL_WEATHER), + ("14-function-calling-openai-responses.py", EVAL_WEATHER), + ("14-function-calling-openai-responses.py", EVAL_WEATHER_AND_RESTAURANT), + ("14-function-calling-openai-responses-http.py", EVAL_WEATHER), + ("14-function-calling-openai-responses-http.py", EVAL_WEATHER_AND_RESTAURANT), # Video ("14d-function-calling-anthropic-video.py", EVAL_VISION_CAMERA), ("14d-function-calling-aws-video.py", EVAL_VISION_CAMERA), @@ -199,6 +205,7 @@ TESTS_14 = [ ("14d-function-calling-moondream-video.py", EVAL_VISION_CAMERA), ("14d-function-calling-openai-video.py", EVAL_VISION_CAMERA), ("14d-function-calling-openai-responses-video.py", EVAL_VISION_CAMERA), + ("14d-function-calling-openai-responses-video-http.py", EVAL_VISION_CAMERA), # Currently not working. 
# ("14c-function-calling-together.py", EVAL_WEATHER), # ("14l-function-calling-deepseek.py", EVAL_WEATHER), diff --git a/src/pipecat/adapters/services/open_ai_responses_adapter.py b/src/pipecat/adapters/services/open_ai_responses_adapter.py index 57398a9f6..3abd22cfd 100644 --- a/src/pipecat/adapters/services/open_ai_responses_adapter.py +++ b/src/pipecat/adapters/services/open_ai_responses_adapter.py @@ -85,16 +85,22 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam # OpenAILLMService (system_instruction + empty messages) need the # instructions converted to an initial developer message. # - # NOTE: if/when we support `previous_response_id` and/or - # `conversation_id`, we'll need to revisit this logic, as it'll + # NOTE: The service layer (OpenAIResponsesLLMService) internally + # manages `previous_response_id` for incremental context delivery + # over WebSocket. This runs post-adapter — the adapter always + # produces the full input list and the service determines what + # subset to send. This empty-input fallback is therefore only + # relevant for one-shot or initial calls. + # + # If we added support for user-provided explicit + # `previous_response_id` and/or `conversation_id` (overriding + # internal management), we'd need to revisit this logic, as it'd # be legit to provide instructions without input items. Worth # noting that OpenAI's docs suggest these parameters are primarily # for development convenience rather than performance (the model # still processes the full context), and come with the tradeoff # of requiring OpenAI-side 30-day conversation storage, which may - # not be desirable for many users. But it could give folks an easy - # way to store/switch between conversations without needing to - # manage that storage themselves. + # not be desirable for many users. 
if not input_items: params["input"] = [{"role": "developer", "content": system_instruction}] else: diff --git a/src/pipecat/services/openai/responses/llm.py b/src/pipecat/services/openai/responses/llm.py index e9e5d3a1f..7249772cf 100644 --- a/src/pipecat/services/openai/responses/llm.py +++ b/src/pipecat/services/openai/responses/llm.py @@ -4,8 +4,9 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""OpenAI Responses API LLM service implementation.""" +"""OpenAI Responses API LLM service implementations (WebSocket and HTTP).""" +import hashlib import json from contextlib import asynccontextmanager from dataclasses import dataclass, field @@ -30,10 +31,13 @@ from pipecat.adapters.services.open_ai_responses_adapter import ( OpenAIResponsesLLMInvocationParams, ) from pipecat.frames.frames import ( + CancelFrame, + EndFrame, Frame, LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + StartFrame, ) from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_context import LLMContext @@ -43,10 +47,46 @@ from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN from pipecat.services.settings import LLMSettings, _NotGiven from pipecat.utils.tracing.service_decorators import traced_llm +try: + from websockets.asyncio.client import connect as websocket_connect + from websockets.exceptions import ConnectionClosed +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.") + raise Exception(f"Missing module: {e}") + + +# --------------------------------------------------------------------------- +# Private retry exception classes +# --------------------------------------------------------------------------- + + +class _RetryableError(Exception): + """Base for errors that should trigger a retry in _process_context.""" + + pass + + +class _PreviousResponseNotFoundError(_RetryableError): + """Server could not find the previous 
response (connection-local cache miss).""" + + pass + + +class _ConnectionLimitReachedError(_RetryableError): + """WebSocket connection hit the 60-minute server-side limit.""" + + pass + + +# --------------------------------------------------------------------------- +# Settings +# --------------------------------------------------------------------------- + @dataclass class OpenAIResponsesLLMSettings(LLMSettings): - """Settings for OpenAIResponsesLLMService. + """Settings for OpenAI Responses API LLM services. Parameters: max_completion_tokens: Maximum completion tokens to generate. @@ -55,20 +95,17 @@ class OpenAIResponsesLLMSettings(LLMSettings): max_completion_tokens: int | _NotGiven = field(default_factory=lambda: _NOT_GIVEN) -class OpenAIResponsesLLMService(LLMService): - """OpenAI Responses API LLM service. +# --------------------------------------------------------------------------- +# Shared base class (private) +# --------------------------------------------------------------------------- - This service works with the universal LLMContext and LLMContextAggregatorPair. - Example:: +class _BaseOpenAIResponsesLLMService(LLMService): + """Shared base for HTTP and WebSocket OpenAI Responses API services. - llm = OpenAIResponsesLLMService( - api_key=os.getenv("OPENAI_API_KEY"), - settings=OpenAIResponsesLLMService.Settings( - model="gpt-4.1", - system_instruction="You are a helpful assistant.", - ), - ) + Contains settings, adapter reference, HTTP client creation, parameter + building, ``run_inference``, and metrics support. Subclasses implement + ``process_frame`` and ``_process_context`` for their transport. 
""" Settings = OpenAIResponsesLLMSettings @@ -124,6 +161,7 @@ class OpenAIResponsesLLMService(LLMService): **kwargs, ) + self._api_key = api_key self._service_tier = service_tier self._client = self._create_client( api_key=api_key, @@ -173,6 +211,556 @@ class OpenAIResponsesLLMService(LLMService): """Check if this service can generate processing metrics.""" return True + def _build_response_params(self, invocation_params: OpenAIResponsesLLMInvocationParams) -> dict: + """Build parameters for a Responses API call. + + Args: + invocation_params: Parameters derived from the LLM context. + + Returns: + Dictionary of parameters for the Responses API call. + """ + params: Dict[str, Any] = { + "model": self._settings.model, + "stream": True, + "store": False, + "input": invocation_params["input"], + } + + # instructions (set by the adapter when input is non-empty) + if "instructions" in invocation_params: + params["instructions"] = invocation_params["instructions"] + + # Optional parameters - only include if given + if isinstance(self._settings.temperature, (int, float)): + params["temperature"] = self._settings.temperature + + if isinstance(self._settings.top_p, (int, float)): + params["top_p"] = self._settings.top_p + + if isinstance(self._settings.max_completion_tokens, int): + params["max_output_tokens"] = self._settings.max_completion_tokens + + if self._service_tier is not None: + params["service_tier"] = self._service_tier + + # Tools + tools = invocation_params.get("tools") + if tools is not None and not isinstance(tools, type(NOT_GIVEN)): + params["tools"] = tools + + # Extra settings + params.update(self._settings.extra) + + return params + + async def run_inference( + self, + context: LLMContext, + max_tokens: Optional[int] = None, + system_instruction: Optional[str] = None, + ) -> Optional[str]: + """Run a one-shot, out-of-band inference with the given LLM context. + + Always uses the HTTP client regardless of transport variant. 
+ + Args: + context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. + system_instruction: Optional system instruction for this inference. + + Returns: + The LLM's response as a string, or None if no response is generated. + """ + adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter() + effective_instruction = system_instruction or self._settings.system_instruction + invocation_params = adapter.get_llm_invocation_params( + context, system_instruction=effective_instruction + ) + + params = self._build_response_params(invocation_params) + + # Override for non-streaming + params["stream"] = False + + if max_tokens is not None: + params["max_output_tokens"] = max_tokens + + response = await self._client.responses.create(**params) + + return response.output_text + + def _process_function_calls( + self, + context: LLMContext, + function_calls: Dict[str, Dict[str, str]], + ) -> List[FunctionCallFromLLM]: + """Convert accumulated function call data into FunctionCallFromLLM list. + + Args: + context: The LLM context for the current inference. + function_calls: Map of item_id to {name, call_id, arguments}. + + Returns: + List of parsed function call objects. 
+ """ + fc_list: List[FunctionCallFromLLM] = [] + for item_id, fc in function_calls.items(): + try: + arguments = json.loads(fc["arguments"]) if fc["arguments"] else {} + except json.JSONDecodeError: + logger.warning( + f"{self}: Failed to parse function call arguments: {fc['arguments']}" + ) + arguments = {} + fc_list.append( + FunctionCallFromLLM( + context=context, + tool_call_id=fc["call_id"], + function_name=fc["name"], + arguments=arguments, + ) + ) + return fc_list + + +# --------------------------------------------------------------------------- +# WebSocket variant (default / recommended) +# --------------------------------------------------------------------------- + + +class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService): + """OpenAI Responses API LLM service using WebSocket transport. + + Maintains a persistent WebSocket connection to ``wss://api.openai.com/v1/responses`` + for lower-latency inference, especially beneficial for tool-call-heavy workflows. + Automatically uses ``previous_response_id`` to send only incremental context when + possible, and falls back to full context on reconnection or cache miss. + + This is the recommended variant for real-time / conversational use. + + Example:: + + llm = OpenAIResponsesLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + settings=OpenAIResponsesLLMService.Settings( + model="gpt-4.1", + system_instruction="You are a helpful assistant.", + ), + ) + """ + + def __init__( + self, + *, + ws_url: str = "wss://api.openai.com/v1/responses", + **kwargs, + ): + """Initialize the WebSocket-based OpenAI Responses API LLM service. + + Args: + ws_url: WebSocket endpoint URL. + Defaults to ``wss://api.openai.com/v1/responses``. + **kwargs: Additional arguments passed to the base class (api_key, + base_url, organization, project, default_headers, service_tier, + settings, etc.). 
+ """ + super().__init__(**kwargs) + + self._ws_url = ws_url + self._websocket = None + self._disconnecting = False + + # State for previous_response_id optimization + self._previous_response_id: Optional[str] = None + self._previous_input_hash: Optional[str] = None + self._previous_input_length: Optional[int] = None + + # -- lifecycle ------------------------------------------------------------ + + async def start(self, frame: StartFrame): + """Start the service and establish WebSocket connection. + + Args: + frame: The start frame triggering service initialization. + """ + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the service and close WebSocket connection. + + Args: + frame: The end frame triggering service shutdown. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the service and close WebSocket connection. + + Args: + frame: The cancel frame triggering service cancellation. 
+ """ + await super().cancel(frame) + await self._disconnect() + + # -- connection management ------------------------------------------------ + + async def _connect(self): + """Establish the WebSocket connection.""" + self._disconnecting = False + try: + if self._websocket: + return + self._websocket = await websocket_connect( + uri=self._ws_url, + additional_headers={ + "Authorization": f"Bearer {self._api_key}", + }, + ) + except Exception as e: + await self.push_error(error_msg=f"Error connecting to WebSocket: {e}", exception=e) + self._websocket = None + + async def _disconnect(self): + """Close the WebSocket connection and clear state.""" + try: + self._disconnecting = True + await self.stop_all_metrics() + if self._websocket: + await self._websocket.close() + self._websocket = None + self._clear_previous_response_state() + self._disconnecting = False + except Exception as e: + await self.push_error(error_msg=f"Error disconnecting from WebSocket: {e}", exception=e) + + async def _reconnect(self): + """Reconnect to the WebSocket, clearing previous_response_id state.""" + await self._disconnect() + await self._connect() + + async def _ensure_connected(self): + """Ensure a WebSocket connection is available, reconnecting if needed. + + Raises: + _RetryableError: If the connection could not be established. + """ + if self._websocket is None: + await self._connect() + if self._websocket is None: + raise _RetryableError("Failed to establish WebSocket connection") + + async def _ws_send(self, message: dict): + """Send a JSON message over the WebSocket. + + Args: + message: The message dict to serialize and send. + """ + if self._disconnecting or not self._websocket: + return + await self._websocket.send(json.dumps(message)) + + # -- previous_response_id optimization ------------------------------------ + + @staticmethod + def _hash_input_items(items: list) -> str: + """Compute a deterministic hash of input items for comparison. 
+ + Args: + items: List of Responses API input items. + + Returns: + Hex digest of the SHA-256 hash. + """ + return hashlib.sha256(json.dumps(items, sort_keys=True).encode()).hexdigest() + + def _apply_previous_response_optimization(self, params: dict, full_input: list) -> dict: + """Try to use previous_response_id to send only new input items. + + If the prefix of ``full_input`` matches the stored hash from the + previous inference call, only new items are sent along with + ``previous_response_id``. Otherwise the full input is sent. + + Args: + params: The response params dict (modified in place). + full_input: The complete input items list from the adapter. + + Returns: + The (possibly modified) params dict. + """ + if ( + self._previous_response_id is not None + and self._previous_input_length is not None + and self._previous_input_hash is not None + and len(full_input) > self._previous_input_length + ): + prefix = full_input[: self._previous_input_length] + prefix_hash = self._hash_input_items(prefix) + if prefix_hash == self._previous_input_hash: + new_items = full_input[self._previous_input_length :] + params["input"] = new_items + params["previous_response_id"] = self._previous_response_id + logger.debug( + f"{self}: Using previous_response_id optimization " + f"({len(new_items)} new items, " + f"{self._previous_input_length} cached)" + ) + return params + + # Full context send (no optimization possible) + return params + + def _store_previous_response_state(self, response_id: str, full_input: list): + """Store state for the next call's previous_response_id optimization. + + Args: + response_id: The response ID returned by the server. + full_input: The complete input items list that was used. 
+ """ + self._previous_response_id = response_id + self._previous_input_length = len(full_input) + self._previous_input_hash = self._hash_input_items(full_input) + + def _clear_previous_response_state(self): + """Clear stored previous_response_id state.""" + self._previous_response_id = None + self._previous_input_length = None + self._previous_input_hash = None + + # -- frame processing ----------------------------------------------------- + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames for LLM completion requests. + + Args: + frame: The frame to process. + direction: The direction of frame processing. + """ + await super().process_frame(frame, direction) + + context = None + if isinstance(frame, LLMContextFrame): + context = frame.context + else: + await self.push_frame(frame, direction) + + if context: + try: + await self.push_frame(LLMFullResponseStartFrame()) + await self.start_processing_metrics() + await self._process_context(context) + except Exception as e: + await self.push_error(error_msg=f"Error during completion: {e}", exception=e) + finally: + await self.stop_processing_metrics() + await self.push_frame(LLMFullResponseEndFrame()) + + # -- core inference ------------------------------------------------------- + + @traced_llm + async def _process_context(self, context: LLMContext): + """Run inference over WebSocket with retry and previous_response_id. + + Args: + context: The LLM context containing conversation history. 
+ """ + adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter() + logger.debug( + f"{self}: Generating response from universal context " + f"{adapter.get_messages_for_logging(context)}" + ) + + invocation_params = adapter.get_llm_invocation_params( + context, system_instruction=self._settings.system_instruction + ) + + full_input = invocation_params["input"] + + max_attempts = 2 + for attempt in range(max_attempts): + params = self._build_response_params(invocation_params) + # WebSocket mode does not use the "stream" parameter + params.pop("stream", None) + + # Apply previous_response_id optimization (skipped after a retry) + if attempt == 0: + params = self._apply_previous_response_optimization(params, full_input) + + try: + await self._ensure_connected() + await self.start_ttfb_metrics() + await self._ws_send({"type": "response.create", **params}) + await self._receive_response_events(context, full_input) + return # Success + except _PreviousResponseNotFoundError: + logger.warning( + f"{self}: previous_response_not_found — " + f"retrying with full context ({len(full_input)} items)" + ) + self._clear_previous_response_state() + await self.stop_ttfb_metrics() + if attempt >= max_attempts - 1: + await self.push_error( + error_msg="previous_response_not_found: retry also failed" + ) + return + except _ConnectionLimitReachedError: + logger.warning( + f"{self}: WebSocket connection limit reached — " + f"reconnecting and retrying with full context ({len(full_input)} items)" + ) + self._clear_previous_response_state() + await self.stop_ttfb_metrics() + await self._reconnect() + if attempt >= max_attempts - 1: + await self.push_error(error_msg="WebSocket connection limit: retry also failed") + return + except ConnectionClosed as e: + logger.warning( + f"{self}: WebSocket connection closed during inference: {e} — " + f"reconnecting and retrying with full context ({len(full_input)} items)" + ) + self._clear_previous_response_state() + self._websocket = None + await 
self.stop_ttfb_metrics() + await self._reconnect() + if attempt >= max_attempts - 1: + await self.push_error( + error_msg=f"WebSocket connection closed: retry also failed: {e}", + exception=e, + ) + return + + async def _receive_response_events(self, context: LLMContext, full_input: list): + """Receive and process WebSocket events until the response completes. + + Args: + context: The LLM context for the current inference. + full_input: The complete input items list (for storing state on success). + + Raises: + _PreviousResponseNotFoundError: Server couldn't find previous response. + _ConnectionLimitReachedError: 60-minute connection limit reached. + ConnectionClosed: WebSocket connection was closed unexpectedly. + """ + function_calls: Dict[str, Dict[str, str]] = {} + current_arguments: Dict[str, str] = {} + + while True: + raw = await self._websocket.recv() + event = json.loads(raw) + event_type = event.get("type") + + if event_type == "response.output_text.delta": + await self.stop_ttfb_metrics() + await self._push_llm_text(event.get("delta", "")) + + elif event_type == "response.output_item.added": + await self.stop_ttfb_metrics() + item = event.get("item", {}) + if item.get("type") == "function_call": + item_id = item.get("id", "") + function_calls[item_id] = { + "name": item.get("name", ""), + "call_id": item.get("call_id", ""), + "arguments": "", + } + current_arguments[item_id] = "" + + elif event_type == "response.function_call_arguments.delta": + item_id = event.get("item_id", "") + if item_id in current_arguments: + current_arguments[item_id] += event.get("delta", "") + + elif event_type == "response.function_call_arguments.done": + item_id = event.get("item_id", "") + if item_id in function_calls: + function_calls[item_id]["arguments"] = event.get("arguments", "") + + elif event_type == "response.output_item.done": + item = event.get("item", {}) + if item.get("type") == "function_call": + item_id = item.get("id", "") + if item_id in function_calls: + 
function_calls[item_id]["name"] = item.get("name", "") + function_calls[item_id]["call_id"] = item.get("call_id", "") + function_calls[item_id]["arguments"] = item.get("arguments", "") + + elif event_type == "response.completed": + response = event.get("response", {}) + usage = response.get("usage") + if usage: + input_details = usage.get("input_tokens_details") or {} + output_details = usage.get("output_tokens_details") or {} + tokens = LLMTokenUsage( + prompt_tokens=usage.get("input_tokens", 0), + completion_tokens=usage.get("output_tokens", 0), + total_tokens=usage.get("total_tokens", 0), + cache_read_input_tokens=input_details.get("cached_tokens", 0), + reasoning_tokens=output_details.get("reasoning_tokens", 0), + ) + await self.start_llm_usage_metrics(tokens) + + self._full_model_name = response.get("model") + + # Store state for next call's previous_response_id optimization + response_id = response.get("id") + if response_id: + self._store_previous_response_state(response_id, full_input) + + break # Response complete + + elif event_type in ("response.failed", "response.incomplete"): + response = event.get("response", {}) + status_details = response.get("status_details") or {} + error_info = status_details.get("error") or {} + error_msg = error_info.get("message", f"Response {event_type.split('.')[-1]}") + await self.push_error(error_msg=f"LLM response error: {error_msg}") + break + + elif event_type == "error": + error = event.get("error", {}) + code = error.get("code", "") + message = error.get("message", "Unknown error") + + if code == "previous_response_not_found": + raise _PreviousResponseNotFoundError(message) + elif code == "websocket_connection_limit_reached": + raise _ConnectionLimitReachedError(message) + else: + await self.push_error(error_msg=f"WebSocket API error: {message}") + break + + # Process any function calls + if function_calls: + fc_list = self._process_function_calls(context, function_calls) + await self.run_function_calls(fc_list) + + 
+# --------------------------------------------------------------------------- +# HTTP variant +# --------------------------------------------------------------------------- + + +class OpenAIResponsesHttpLLMService(_BaseOpenAIResponsesLLMService): + """OpenAI Responses API LLM service using HTTP streaming transport. + + Uses server-sent events (SSE) via the OpenAI Python SDK for streaming + inference. Each ``_process_context`` call opens a new HTTP connection. + + Example:: + + llm = OpenAIResponsesHttpLLMService( + api_key=os.getenv("OPENAI_API_KEY"), + settings=OpenAIResponsesHttpLLMService.Settings( + model="gpt-4.1", + system_instruction="You are a helpful assistant.", + ), + ) + """ + async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames for LLM completion requests. @@ -300,101 +888,12 @@ class OpenAIResponsesLLMService(LLMService): # Process any function calls if function_calls: - fc_list: List[FunctionCallFromLLM] = [] - for item_id, fc in function_calls.items(): - try: - arguments = json.loads(fc["arguments"]) if fc["arguments"] else {} - except json.JSONDecodeError: - logger.warning( - f"{self}: Failed to parse function call arguments: {fc['arguments']}" - ) - arguments = {} - fc_list.append( - FunctionCallFromLLM( - context=context, - tool_call_id=fc["call_id"], - function_name=fc["name"], - arguments=arguments, - ) - ) + fc_list = self._process_function_calls(context, function_calls) await self.run_function_calls(fc_list) - def _build_response_params(self, invocation_params: OpenAIResponsesLLMInvocationParams) -> dict: - """Build parameters for the responses.create() call. - Args: - invocation_params: Parameters derived from the LLM context. - - Returns: - Dictionary of parameters for the Responses API call. 
- """ - params: Dict[str, Any] = { - "model": self._settings.model, - "stream": True, - "store": False, - "input": invocation_params["input"], - } - - # instructions (set by the adapter when input is non-empty) - if "instructions" in invocation_params: - params["instructions"] = invocation_params["instructions"] - - # Optional parameters - only include if given - if isinstance(self._settings.temperature, (int, float)): - params["temperature"] = self._settings.temperature - - if isinstance(self._settings.top_p, (int, float)): - params["top_p"] = self._settings.top_p - - if isinstance(self._settings.max_completion_tokens, int): - params["max_output_tokens"] = self._settings.max_completion_tokens - - if self._service_tier is not None: - params["service_tier"] = self._service_tier - - # Tools - tools = invocation_params.get("tools") - if tools is not None and not isinstance(tools, type(NOT_GIVEN)): - params["tools"] = tools - - # Extra settings - params.update(self._settings.extra) - - return params - - async def run_inference( - self, - context: LLMContext, - max_tokens: Optional[int] = None, - system_instruction: Optional[str] = None, - ) -> Optional[str]: - """Run a one-shot, out-of-band inference with the given LLM context. - - Args: - context: The LLM context containing conversation history. - max_tokens: Optional maximum number of tokens to generate. - system_instruction: Optional system instruction for this inference. - - Returns: - The LLM's response as a string, or None if no response is generated. 
- """ - adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter() - effective_instruction = system_instruction or self._settings.system_instruction - invocation_params = adapter.get_llm_invocation_params( - context, system_instruction=effective_instruction - ) - - params = self._build_response_params(invocation_params) - - # Override for non-streaming - params["stream"] = False - - if max_tokens is not None: - params["max_output_tokens"] = max_tokens - - response = await self._client.responses.create(**params) - - return response.output_text - - -__all__ = ["OpenAIResponsesLLMService", "OpenAIResponsesLLMSettings"] +__all__ = [ + "OpenAIResponsesLLMService", + "OpenAIResponsesHttpLLMService", + "OpenAIResponsesLLMSettings", +] diff --git a/tests/test_openai_responses_websocket.py b/tests/test_openai_responses_websocket.py new file mode 100644 index 000000000..6828578de --- /dev/null +++ b/tests/test_openai_responses_websocket.py @@ -0,0 +1,491 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Tests for the WebSocket variant of OpenAIResponsesLLMService.""" + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService + + +def _make_service(**kwargs): + """Create a service with the client mocked out.""" + with patch.object(OpenAIResponsesLLMService, "_create_client"): + service = OpenAIResponsesLLMService( + api_key="test-key", + **kwargs, + ) + service._client = AsyncMock() + return service + + +def _ws_events(*events): + """Build a mock WebSocket that yields the given events from recv().""" + ws = AsyncMock() + # .recv() returns each event in order, then raises StopAsyncIteration + ws.recv = AsyncMock(side_effect=[json.dumps(e) for e in events]) + ws.send = AsyncMock() + ws.close = AsyncMock() + ws.close_code = None + return ws + + +# 
--------------------------------------------------------------------------- +# Hash determinism +# --------------------------------------------------------------------------- + + +class TestHashInputItems: + def test_same_input_same_hash(self): + items = [{"role": "user", "content": "hello"}] + h1 = OpenAIResponsesLLMService._hash_input_items(items) + h2 = OpenAIResponsesLLMService._hash_input_items(items) + assert h1 == h2 + + def test_different_input_different_hash(self): + h1 = OpenAIResponsesLLMService._hash_input_items([{"role": "user", "content": "hello"}]) + h2 = OpenAIResponsesLLMService._hash_input_items([{"role": "user", "content": "world"}]) + assert h1 != h2 + + def test_order_independent_keys(self): + """Keys within a dict should not affect hash (sort_keys=True).""" + h1 = OpenAIResponsesLLMService._hash_input_items([{"a": 1, "b": 2}]) + h2 = OpenAIResponsesLLMService._hash_input_items([{"b": 2, "a": 1}]) + assert h1 == h2 + + +# --------------------------------------------------------------------------- +# previous_response_id optimization +# --------------------------------------------------------------------------- + + +class TestPreviousResponseOptimization: + def test_no_previous_state_sends_full_input(self): + service = _make_service() + full_input = [{"role": "user", "content": "hi"}] + params = {"input": full_input, "model": "gpt-4.1"} + + result = service._apply_previous_response_optimization(params, full_input) + + assert result["input"] == full_input + assert "previous_response_id" not in result + + def test_matching_prefix_sends_incremental(self): + service = _make_service() + prev_input = [{"role": "user", "content": "hi"}] + service._store_previous_response_state("resp_123", prev_input) + + full_input = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + {"role": "user", "content": "how are you?"}, + ] + params = {"input": list(full_input), "model": "gpt-4.1"} + + result = 
service._apply_previous_response_optimization(params, full_input) + + assert result["previous_response_id"] == "resp_123" + assert result["input"] == full_input[1:] + + def test_mismatched_prefix_sends_full(self): + service = _make_service() + prev_input = [{"role": "user", "content": "hi"}] + service._store_previous_response_state("resp_123", prev_input) + + # Different first message + full_input = [ + {"role": "user", "content": "different"}, + {"role": "assistant", "content": "hello"}, + ] + params = {"input": list(full_input), "model": "gpt-4.1"} + + result = service._apply_previous_response_optimization(params, full_input) + + assert "previous_response_id" not in result + assert result["input"] == full_input + + def test_same_length_sends_full(self): + """When new input is same length as previous, no optimization.""" + service = _make_service() + prev_input = [{"role": "user", "content": "hi"}] + service._store_previous_response_state("resp_123", prev_input) + + full_input = [{"role": "user", "content": "hi"}] + params = {"input": list(full_input), "model": "gpt-4.1"} + + result = service._apply_previous_response_optimization(params, full_input) + + assert "previous_response_id" not in result + + def test_clear_state(self): + service = _make_service() + service._store_previous_response_state("resp_123", [{"role": "user", "content": "hi"}]) + service._clear_previous_response_state() + + assert service._previous_response_id is None + assert service._previous_input_hash is None + assert service._previous_input_length is None + + +# --------------------------------------------------------------------------- +# _receive_response_events — text streaming +# --------------------------------------------------------------------------- + + +class TestReceiveResponseEventsText: + @pytest.mark.asyncio + async def test_text_deltas_pushed(self): + service = _make_service() + service._push_llm_text = AsyncMock() + service.stop_ttfb_metrics = AsyncMock() + 
service.start_llm_usage_metrics = AsyncMock() + + ws = _ws_events( + {"type": "response.output_text.delta", "delta": "Hello"}, + {"type": "response.output_text.delta", "delta": " world"}, + { + "type": "response.completed", + "response": { + "id": "resp_1", + "model": "gpt-4.1", + "usage": { + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15, + "input_tokens_details": {"cached_tokens": 0}, + "output_tokens_details": {"reasoning_tokens": 0}, + }, + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + full_input = [{"role": "user", "content": "hi"}] + await service._receive_response_events(context, full_input) + + assert service._push_llm_text.call_count == 2 + service._push_llm_text.assert_any_await("Hello") + service._push_llm_text.assert_any_await(" world") + + @pytest.mark.asyncio + async def test_response_completed_stores_state(self): + service = _make_service() + service._push_llm_text = AsyncMock() + service.stop_ttfb_metrics = AsyncMock() + service.start_llm_usage_metrics = AsyncMock() + + ws = _ws_events( + { + "type": "response.completed", + "response": { + "id": "resp_42", + "model": "gpt-4.1", + "usage": { + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15, + "input_tokens_details": {"cached_tokens": 2}, + "output_tokens_details": {"reasoning_tokens": 1}, + }, + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + full_input = [{"role": "user", "content": "hi"}] + await service._receive_response_events(context, full_input) + + assert service._previous_response_id == "resp_42" + assert service._previous_input_length == 1 + assert service._previous_input_hash is not None + assert service.start_llm_usage_metrics.called + + @pytest.mark.asyncio + async def test_token_usage_metrics(self): + service = _make_service() + service._push_llm_text = AsyncMock() + service.stop_ttfb_metrics = AsyncMock() + service.start_llm_usage_metrics = AsyncMock() + + ws = _ws_events( + { + "type": 
"response.completed", + "response": { + "id": "resp_1", + "model": "gpt-4.1", + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "input_tokens_details": {"cached_tokens": 20}, + "output_tokens_details": {"reasoning_tokens": 10}, + }, + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + await service._receive_response_events(context, []) + + tokens = service.start_llm_usage_metrics.call_args[0][0] + assert tokens.prompt_tokens == 100 + assert tokens.completion_tokens == 50 + assert tokens.total_tokens == 150 + assert tokens.cache_read_input_tokens == 20 + assert tokens.reasoning_tokens == 10 + + +# --------------------------------------------------------------------------- +# _receive_response_events — function calls +# --------------------------------------------------------------------------- + + +class TestReceiveResponseEventsFunctionCalls: + @pytest.mark.asyncio + async def test_function_call_sequence(self): + service = _make_service() + service._push_llm_text = AsyncMock() + service.stop_ttfb_metrics = AsyncMock() + service.start_llm_usage_metrics = AsyncMock() + service.run_function_calls = AsyncMock() + + ws = _ws_events( + { + "type": "response.output_item.added", + "item": { + "type": "function_call", + "id": "fc_1", + "name": "get_weather", + "call_id": "call_1", + }, + }, + { + "type": "response.function_call_arguments.delta", + "item_id": "fc_1", + "delta": '{"loc', + }, + { + "type": "response.function_call_arguments.delta", + "item_id": "fc_1", + "delta": 'ation": "SF"}', + }, + { + "type": "response.function_call_arguments.done", + "item_id": "fc_1", + "arguments": '{"location": "SF"}', + }, + { + "type": "response.output_item.done", + "item": { + "type": "function_call", + "id": "fc_1", + "name": "get_weather", + "call_id": "call_1", + "arguments": '{"location": "SF"}', + }, + }, + { + "type": "response.completed", + "response": {"id": "resp_1", "model": "gpt-4.1", "usage": None}, + }, + 
) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + await service._receive_response_events(context, []) + + service.run_function_calls.assert_called_once() + fc_list = service.run_function_calls.call_args[0][0] + assert len(fc_list) == 1 + assert fc_list[0].function_name == "get_weather" + assert fc_list[0].tool_call_id == "call_1" + assert fc_list[0].arguments == {"location": "SF"} + + +# --------------------------------------------------------------------------- +# _receive_response_events — errors +# --------------------------------------------------------------------------- + + +class TestReceiveResponseEventsErrors: + @pytest.mark.asyncio + async def test_response_failed_pushes_error(self): + service = _make_service() + service.stop_ttfb_metrics = AsyncMock() + service.start_llm_usage_metrics = AsyncMock() + service.push_error = AsyncMock() + + ws = _ws_events( + { + "type": "response.failed", + "response": { + "id": "resp_1", + "status_details": { + "error": {"message": "Content filter triggered"}, + }, + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + await service._receive_response_events(context, []) + + service.push_error.assert_called_once() + assert "Content filter triggered" in service.push_error.call_args.kwargs["error_msg"] + + @pytest.mark.asyncio + async def test_response_incomplete_pushes_error(self): + service = _make_service() + service.stop_ttfb_metrics = AsyncMock() + service.start_llm_usage_metrics = AsyncMock() + service.push_error = AsyncMock() + + ws = _ws_events( + { + "type": "response.incomplete", + "response": {"id": "resp_1", "status_details": None}, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + await service._receive_response_events(context, []) + + service.push_error.assert_called_once() + + @pytest.mark.asyncio + async def test_previous_response_not_found_raises(self): + from pipecat.services.openai.responses.llm import 
_PreviousResponseNotFoundError + + service = _make_service() + service.stop_ttfb_metrics = AsyncMock() + + ws = _ws_events( + { + "type": "error", + "error": { + "code": "previous_response_not_found", + "message": "Previous response with id 'resp_abc' not found.", + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + with pytest.raises(_PreviousResponseNotFoundError): + await service._receive_response_events(context, []) + + @pytest.mark.asyncio + async def test_connection_limit_reached_raises(self): + from pipecat.services.openai.responses.llm import _ConnectionLimitReachedError + + service = _make_service() + service.stop_ttfb_metrics = AsyncMock() + + ws = _ws_events( + { + "type": "error", + "error": { + "code": "websocket_connection_limit_reached", + "message": "Connection limit reached.", + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + with pytest.raises(_ConnectionLimitReachedError): + await service._receive_response_events(context, []) + + @pytest.mark.asyncio + async def test_generic_error_pushes_error(self): + service = _make_service() + service.stop_ttfb_metrics = AsyncMock() + service.start_llm_usage_metrics = AsyncMock() + service.push_error = AsyncMock() + + ws = _ws_events( + { + "type": "error", + "error": { + "code": "server_error", + "message": "Internal server error", + }, + }, + ) + service._websocket = ws + + context = MagicMock(spec=LLMContext) + await service._receive_response_events(context, []) + + service.push_error.assert_called_once() + assert "Internal server error" in service.push_error.call_args.kwargs["error_msg"] + + +# --------------------------------------------------------------------------- +# Connection lifecycle +# --------------------------------------------------------------------------- + + +class TestConnectionLifecycle: + @pytest.mark.asyncio + async def test_disconnect_clears_previous_response_state(self): + service = _make_service() + 
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}]) + service.stop_all_metrics = AsyncMock() + + await service._disconnect() + + assert service._previous_response_id is None + assert service._previous_input_hash is None + assert service._previous_input_length is None + + @pytest.mark.asyncio + async def test_reconnect_clears_state_and_reconnects(self): + service = _make_service() + service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}]) + service.stop_all_metrics = AsyncMock() + service.push_error = AsyncMock() + + # Mock connect to set a websocket + mock_ws = AsyncMock() + mock_ws.close = AsyncMock() + service._websocket = mock_ws + + with patch( + "pipecat.services.openai.responses.llm.websocket_connect", + new_callable=AsyncMock, + return_value=AsyncMock(), + ): + await service._reconnect() + + assert service._previous_response_id is None + mock_ws.close.assert_called_once() + + @pytest.mark.asyncio + async def test_ensure_connected_raises_on_failure(self): + from pipecat.services.openai.responses.llm import _RetryableError + + service = _make_service() + service._websocket = None + service.push_error = AsyncMock() + + # Mock connect to fail + with patch( + "pipecat.services.openai.responses.llm.websocket_connect", + new_callable=AsyncMock, + side_effect=Exception("Connection refused"), + ): + with pytest.raises(_RetryableError): + await service._ensure_connected() diff --git a/tests/test_run_inference.py b/tests/test_run_inference.py index bb1868973..ae9d30f8f 100644 --- a/tests/test_run_inference.py +++ b/tests/test_run_inference.py @@ -20,7 +20,10 @@ from pipecat.services.anthropic.llm import AnthropicLLMService from pipecat.services.aws.llm import AWSBedrockLLMService from pipecat.services.google.llm import GoogleLLMService from pipecat.services.openai.llm import OpenAILLMService -from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService +from 
pipecat.services.openai.responses.llm import ( + OpenAIResponsesHttpLLMService, + OpenAIResponsesLLMService, +) @pytest.mark.asyncio @@ -945,3 +948,168 @@ async def test_openai_responses_run_inference_system_instruction_param_with_empt {"role": "developer", "content": "Summarize the conversation"} ] assert "instructions" not in call_kwargs + + +# --- OpenAI Responses HTTP API tests --- +# These mirror the WebSocket variant tests above, verifying that the HTTP +# variant's run_inference (inherited from the shared base class) works +# identically. + + +@pytest.mark.asyncio +async def test_openai_responses_http_run_inference_with_llm_context(): + """Test run_inference with LLMContext returns expected response (HTTP variant).""" + with patch.object(OpenAIResponsesHttpLLMService, "_create_client"): + service = OpenAIResponsesHttpLLMService( + settings=OpenAIResponsesHttpLLMService.Settings( + model="gpt-4.1", + system_instruction="You are a helpful assistant", + temperature=0.7, + max_completion_tokens=100, + ), + ) + service._client = AsyncMock() + + context = LLMContext( + messages=[ + {"role": "user", "content": "Hello, world!"}, + ] + ) + + mock_response = MagicMock() + mock_response.output_text = "Hello! How can I help you today?" + service._client.responses.create = AsyncMock(return_value=mock_response) + + result = await service.run_inference(context) + + assert result == "Hello! How can I help you today?" 
+ call_kwargs = service._client.responses.create.call_args.kwargs + assert call_kwargs["model"] == "gpt-4.1" + assert call_kwargs["stream"] is False + assert call_kwargs["store"] is False + assert call_kwargs["input"] == [{"role": "user", "content": "Hello, world!"}] + assert call_kwargs["instructions"] == "You are a helpful assistant" + assert call_kwargs["temperature"] == 0.7 + assert call_kwargs["max_output_tokens"] == 100 + + +@pytest.mark.asyncio +async def test_openai_responses_http_run_inference_client_exception(): + """Test that exceptions from the client are propagated (HTTP variant).""" + with patch.object(OpenAIResponsesHttpLLMService, "_create_client"): + service = OpenAIResponsesHttpLLMService() + service._client = AsyncMock() + + context = LLMContext(messages=[{"role": "user", "content": "Hello"}]) + service._client.responses.create = AsyncMock(side_effect=Exception("API Error")) + + with pytest.raises(Exception, match="API Error"): + await service.run_inference(context) + + +@pytest.mark.asyncio +async def test_openai_responses_http_run_inference_system_instruction_overrides(): + """Test that system_instruction parameter overrides the settings instruction (HTTP variant).""" + with patch.object(OpenAIResponsesHttpLLMService, "_create_client"): + service = OpenAIResponsesHttpLLMService( + settings=OpenAIResponsesHttpLLMService.Settings( + model="gpt-4.1", + system_instruction="Original instruction", + ), + ) + service._client = AsyncMock() + + context = LLMContext( + messages=[{"role": "user", "content": "Hello"}], + ) + + mock_response = MagicMock() + mock_response.output_text = "Response" + service._client.responses.create = AsyncMock(return_value=mock_response) + + result = await service.run_inference(context, system_instruction="New system instruction") + + assert result == "Response" + call_kwargs = service._client.responses.create.call_args.kwargs + assert call_kwargs["instructions"] == "New system instruction" + assert call_kwargs["input"] == 
[{"role": "user", "content": "Hello"}] + + +@pytest.mark.asyncio +async def test_openai_responses_http_run_inference_empty_context_with_instruction(): + """Test that system_instruction becomes a developer message when context is empty (HTTP).""" + with patch.object(OpenAIResponsesHttpLLMService, "_create_client"): + service = OpenAIResponsesHttpLLMService( + settings=OpenAIResponsesHttpLLMService.Settings( + model="gpt-4.1", + system_instruction="You are helpful", + ), + ) + service._client = AsyncMock() + + context = LLMContext(messages=[]) + + mock_response = MagicMock() + mock_response.output_text = "Response" + service._client.responses.create = AsyncMock(return_value=mock_response) + + result = await service.run_inference(context) + + assert result == "Response" + call_kwargs = service._client.responses.create.call_args.kwargs + assert call_kwargs["input"] == [{"role": "developer", "content": "You are helpful"}] + assert "instructions" not in call_kwargs + + +@pytest.mark.asyncio +async def test_openai_responses_http_run_inference_max_tokens_override(): + """Test that max_tokens parameter overrides max_output_tokens (HTTP variant).""" + with patch.object(OpenAIResponsesHttpLLMService, "_create_client"): + service = OpenAIResponsesHttpLLMService( + settings=OpenAIResponsesHttpLLMService.Settings( + model="gpt-4.1", + max_completion_tokens=500, + ), + ) + service._client = AsyncMock() + + context = LLMContext( + messages=[{"role": "user", "content": "Summarize this"}], + ) + + mock_response = MagicMock() + mock_response.output_text = "Summary" + service._client.responses.create = AsyncMock(return_value=mock_response) + + result = await service.run_inference(context, max_tokens=200) + + assert result == "Summary" + call_kwargs = service._client.responses.create.call_args.kwargs + assert call_kwargs["max_output_tokens"] == 200 + + +@pytest.mark.asyncio +async def test_openai_responses_http_run_inference_system_instruction_param_with_empty_context(): + """Test 
system_instruction param becomes developer message for empty context (HTTP).""" + with patch.object(OpenAIResponsesHttpLLMService, "_create_client"): + service = OpenAIResponsesHttpLLMService( + settings=OpenAIResponsesHttpLLMService.Settings(model="gpt-4.1"), + ) + service._client = AsyncMock() + + context = LLMContext(messages=[]) + + mock_response = MagicMock() + mock_response.output_text = "Response" + service._client.responses.create = AsyncMock(return_value=mock_response) + + result = await service.run_inference( + context, system_instruction="Summarize the conversation" + ) + + assert result == "Response" + call_kwargs = service._client.responses.create.call_args.kwargs + assert call_kwargs["input"] == [ + {"role": "developer", "content": "Summarize the conversation"} + ] + assert "instructions" not in call_kwargs