Add reasoning support to OpenAIRealtimeLLMService for gpt-realtime-2

This commit is contained in:
Paul Kompfner
2026-05-12 13:53:43 -04:00
parent 007fa3a3a8
commit a52bdef32b
5 changed files with 316 additions and 1 deletions

1
changelog/4470.added.md Normal file
View File

@@ -0,0 +1 @@
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.

View File

@@ -232,6 +232,20 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
# [LLMUpdateSettingsFrame(settings=SessionProperties(tools=new_tools).model_dump())]
# )
# Reasoning effort can be changed at runtime too. Only
# reasoning-capable Realtime models (e.g. gpt-realtime-2) support this.
# await task.queue_frames(
# [
# LLMUpdateSettingsFrame(
# delta=OpenAIRealtimeLLMService.Settings(
# session_properties=SessionProperties(
# reasoning=Reasoning(effort="xhigh"),
# ),
# )
# )
# ]
# )
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")

View File

@@ -164,6 +164,19 @@ class AudioConfiguration(BaseModel):
output: AudioOutput | None = None
class Reasoning(BaseModel):
    """Reasoning settings for reasoning-capable Realtime models (e.g. ``gpt-realtime-2``).

    Parameters:
        effort: Amount of reasoning effort the model should apply. The default,
            ``None``, leaves the field unset so the server chooses.
    """

    # The ``| str`` arm is deliberate forward compatibility: if OpenAI adds new
    # effort levels, callers can pass the new string without waiting for a
    # Pipecat release. The Literal documents the levels known today.
    effort: (
        Literal["minimal", "low", "medium", "high", "xhigh"] | str | None
    ) = None
class SessionProperties(BaseModel):
"""Configuration properties for an OpenAI Realtime session.
@@ -184,6 +197,8 @@ class SessionProperties(BaseModel):
prompt: Reference to a prompt template and its variables.
expires_at: Session expiration timestamp.
include: Additional fields to include in server outputs.
reasoning: Reasoning configuration. Only supported by reasoning-capable
Realtime models such as ``gpt-realtime-2``.
"""
# Needed to support ToolSchema in tools field.
@@ -206,6 +221,7 @@ class SessionProperties(BaseModel):
prompt: dict | None = None
expires_at: int | None = None
include: list[str] | None = None
reasoning: Reasoning | None = None
#

View File

@@ -321,6 +321,10 @@ class OpenAIRealtimeLLMService(LLMService[OpenAIRealtimeLLMAdapter]):
self._messages_added_manually = {}
self._pending_function_calls = {} # Track function calls by call_id
self._completed_tool_calls = set()
# Whether we've already emitted the "stripping `reasoning`" warning
# for this service instance. The Realtime API doesn't allow swapping
# the model mid-session, so once is enough.
self._reasoning_strip_warned = False
self._register_event_handler("on_conversation_item_created")
self._register_event_handler("on_conversation_item_updated")
@@ -670,6 +674,32 @@ class OpenAIRealtimeLLMService(LLMService[OpenAIRealtimeLLMAdapter]):
self._warn_unhandled_updated_settings(changed.keys() - handled)
return changed
# Markers used to recognize reasoning-capable Realtime models. A model name
# counts as reasoning-capable when it contains any of these substrings.
# Substring match (rather than exact equality) so date-versioned variants of
# the same base model also match without code changes. Extend this tuple as
# OpenAI ships more reasoning-capable Realtime models.
_REASONING_CAPABLE_MODEL_SUBSTRINGS = ("gpt-realtime-2",)
def _strip_unsupported_reasoning(
    self, settings: events.SessionProperties
) -> events.SessionProperties:
    """Remove ``reasoning`` from an outgoing session.update when the model can't use it.

    Sending ``reasoning`` to a model that doesn't support it makes the server
    reject the whole update and kill the session, so it is dropped client-side.

    Args:
        settings: The session properties about to be sent.

    Returns:
        ``settings`` itself when nothing needs stripping (no reasoning set, no
        model set, or the model name contains a known reasoning-capable
        substring). Otherwise a copy with ``reasoning`` set to ``None``; the
        object passed in is left as it was.
    """
    # Nothing to strip, or no model name to judge by: pass through untouched.
    if not (settings.reasoning is not None and settings.model):
        return settings

    # Known reasoning-capable model (or a variant of one): keep reasoning.
    for marker in self._REASONING_CAPABLE_MODEL_SUBSTRINGS:
        if marker in settings.model:
            return settings

    # Warn only the first time this service instance strips reasoning.
    if not self._reasoning_strip_warned:
        message = (
            f"{self} stripping `reasoning` from session.update: model={settings.model!r} "
            f"isn't a known reasoning-capable Realtime model."
        )
        logger.warning(message)
        self._reasoning_strip_warned = True

    stripped = settings.model_copy(update={"reasoning": None})
    return stripped
async def _send_session_update(self):
settings = assert_given(self._settings.session_properties)
adapter = self.get_llm_adapter()
@@ -695,7 +725,9 @@ class OpenAIRealtimeLLMService(LLMService[OpenAIRealtimeLLMAdapter]):
if settings.tools and isinstance(settings.tools, ToolsSchema):
settings.tools = adapter.from_standard_tools(settings.tools)
await self.send_client_event(events.SessionUpdateEvent(session=settings))
outgoing = self._strip_unsupported_reasoning(settings)
await self.send_client_event(events.SessionUpdateEvent(session=outgoing))
#
# inbound server event handling

View File

@@ -0,0 +1,252 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Tests for OpenAI Realtime reasoning support (gpt-realtime-2).
Covers:
- ``SessionProperties.reasoning`` round-trips through Pydantic.
- Compatibility heuristic strips ``reasoning`` (and warns) when it is configured
  on a model that isn't known to support it, and stays quiet otherwise.
- Runtime ``LLMUpdateSettingsFrame`` carrying reasoning triggers a
``session.update`` and the outgoing event includes the new reasoning.
"""
import io
import pytest
from loguru import logger
from pipecat.frames.frames import LLMUpdateSettingsFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.realtime import events
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
# ---------------------------------------------------------------------------
# Pure data: SessionProperties round-trip
# ---------------------------------------------------------------------------
def test_session_properties_accepts_reasoning_object():
    """A ``Reasoning`` instance can be passed directly to ``SessionProperties``."""
    reasoning = events.Reasoning(effort="high")
    props = events.SessionProperties(reasoning=reasoning)

    assert props.reasoning is not None
    assert props.reasoning.effort == "high"
def test_session_properties_coerces_reasoning_dict():
    """A nested plain dict is validated into a ``Reasoning`` model by Pydantic."""
    payload = {"reasoning": {"effort": "low"}}
    props = events.SessionProperties.model_validate(payload)

    coerced = props.reasoning
    assert isinstance(coerced, events.Reasoning)
    assert coerced.effort == "low"
def test_reasoning_accepts_future_effort_strings():
    """Forward compat: effort strings outside the known set pass through unchanged."""
    # "ultra" is not in today's Literal set; the field also accepts ``| str``.
    unknown_effort = "ultra"
    reasoning = events.Reasoning(effort=unknown_effort)

    assert reasoning.effort == "ultra"
def test_reasoning_serializes_into_session_update():
    """The dumped ``session.update`` event carries reasoning as ``{"effort": ...}``."""
    props = events.SessionProperties(reasoning=events.Reasoning(effort="medium"))
    update_event = events.SessionUpdateEvent(session=props)

    wire = update_event.model_dump(exclude_none=True)

    assert wire["session"]["reasoning"] == {"effort": "medium"}
# ---------------------------------------------------------------------------
# Strip-on-the-client compatibility behavior
# ---------------------------------------------------------------------------
def _capture_warnings():
    """Register a new in-memory loguru sink for WARNING-and-above messages.

    Returns:
        A ``(buffer, handler_id)`` pair: the ``StringIO`` the messages are
        written to, and the id to pass to ``logger.remove`` when done.
    """
    buffer = io.StringIO()
    # Format is the bare message so tests can match on the text alone.
    return buffer, logger.add(buffer, level="WARNING", format="{message}")
class _EventRecorder:
    """Async callable that records every client event it is called with.

    Used in these tests as a replacement for ``service.send_client_event``.
    """

    def __init__(self):
        # Recorded events, in call order.
        self.events: list[events.ClientEvent] = []

    async def __call__(self, event: events.ClientEvent):
        self.events += [event]
async def _send_and_capture(service: OpenAIRealtimeLLMService) -> events.SessionProperties:
    """Call ``_send_session_update`` and return the session payload it sent.

    ``service.send_client_event`` is replaced with a recorder so the outgoing
    events are captured in memory. Asserts that exactly one
    ``SessionUpdateEvent`` was sent.
    """
    recorder = _EventRecorder()
    service.send_client_event = recorder
    await service._send_session_update()

    updates = list(
        filter(lambda sent: isinstance(sent, events.SessionUpdateEvent), recorder.events)
    )
    assert len(updates) == 1
    return updates[0].session
@pytest.mark.asyncio
async def test_outgoing_session_update_strips_reasoning_on_unsupported_model():
    """Reasoning on a non-reasoning model is removed from the wire, with a warning."""
    configured = OpenAIRealtimeLLMService.Settings(
        model="gpt-realtime-1.5",
        system_instruction="be helpful",
        session_properties=events.SessionProperties(
            reasoning=events.Reasoning(effort="high"),
        ),
    )
    service = OpenAIRealtimeLLMService(api_key="test-key", settings=configured)

    sink, handler_id = _capture_warnings()
    try:
        outgoing = await _send_and_capture(service)
    finally:
        logger.remove(handler_id)

    # Stripped on the wire.
    assert outgoing.reasoning is None

    # Warning surfaced for visibility, naming the offending model.
    warning_text = sink.getvalue()
    assert "stripping `reasoning`" in warning_text
    assert "gpt-realtime-1.5" in warning_text

    # Stored config preserved: the strip happens on a copy.
    stored = service._settings.session_properties
    assert stored.reasoning is not None
    assert stored.reasoning.effort == "high"
@pytest.mark.asyncio
async def test_outgoing_session_update_keeps_reasoning_on_supported_model():
    """Reasoning on ``gpt-realtime-2`` is sent unchanged and nothing is logged."""
    configured = OpenAIRealtimeLLMService.Settings(
        model="gpt-realtime-2",
        system_instruction="be helpful",
        session_properties=events.SessionProperties(
            reasoning=events.Reasoning(effort="high"),
        ),
    )
    service = OpenAIRealtimeLLMService(api_key="test-key", settings=configured)

    sink, handler_id = _capture_warnings()
    try:
        outgoing = await _send_and_capture(service)
    finally:
        logger.remove(handler_id)

    sent_reasoning = outgoing.reasoning
    assert sent_reasoning is not None
    assert sent_reasoning.effort == "high"
    # No warning (or anything above it) was emitted.
    assert sink.getvalue() == ""
@pytest.mark.asyncio
async def test_supported_model_variant_keeps_reasoning():
    """A model name that merely contains a supported base name keeps reasoning.

    Exercises the substring match, e.g. for date-suffixed variants.
    """
    configured = OpenAIRealtimeLLMService.Settings(
        model="gpt-realtime-2-some-variant",
        system_instruction="be helpful",
        session_properties=events.SessionProperties(
            reasoning=events.Reasoning(effort="high"),
        ),
    )
    service = OpenAIRealtimeLLMService(api_key="test-key", settings=configured)

    sink, handler_id = _capture_warnings()
    try:
        outgoing = await _send_and_capture(service)
    finally:
        logger.remove(handler_id)

    sent_reasoning = outgoing.reasoning
    assert sent_reasoning is not None
    assert sent_reasoning.effort == "high"
    assert sink.getvalue() == ""
@pytest.mark.asyncio
async def test_no_warning_when_reasoning_is_unset_on_unsupported_model():
    """With no reasoning configured, an unsupported model triggers no warning."""
    configured = OpenAIRealtimeLLMService.Settings(
        model="gpt-realtime-1.5",
        system_instruction="be helpful",
    )
    service = OpenAIRealtimeLLMService(api_key="test-key", settings=configured)

    sink, handler_id = _capture_warnings()
    try:
        outgoing = await _send_and_capture(service)
    finally:
        logger.remove(handler_id)

    assert outgoing.reasoning is None
    assert sink.getvalue() == ""
# ---------------------------------------------------------------------------
# Runtime updates via LLMUpdateSettingsFrame
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_runtime_settings_update_with_reasoning_triggers_session_update():
    """A runtime settings frame changing reasoning sends one updated session.update."""
    initial = OpenAIRealtimeLLMService.Settings(
        model="gpt-realtime-2",
        system_instruction="be helpful",
        session_properties=events.SessionProperties(
            reasoning=events.Reasoning(effort="low"),
        ),
    )
    service = OpenAIRealtimeLLMService(api_key="test-key", settings=initial)

    recorder = _EventRecorder()
    service.send_client_event = recorder

    # Runtime update that raises the reasoning effort from "low" to "high".
    delta = OpenAIRealtimeLLMService.Settings(
        session_properties=events.SessionProperties(
            reasoning=events.Reasoning(effort="high"),
        ),
    )
    frame = LLMUpdateSettingsFrame(delta=delta)
    await service.process_frame(frame, FrameDirection.DOWNSTREAM)

    updates = [
        sent for sent in recorder.events if isinstance(sent, events.SessionUpdateEvent)
    ]
    assert len(updates) == 1

    sent_reasoning = updates[0].session.reasoning
    assert sent_reasoning is not None
    assert sent_reasoning.effort == "high"
@pytest.mark.asyncio
async def test_runtime_settings_update_strips_reasoning_on_unsupported_model():
    """Runtime updates follow the same strip-on-the-client rule as init-time config."""
    initial = OpenAIRealtimeLLMService.Settings(
        model="gpt-realtime-1.5",
        system_instruction="be helpful",
    )
    service = OpenAIRealtimeLLMService(api_key="test-key", settings=initial)

    recorder = _EventRecorder()
    service.send_client_event = recorder

    # Runtime update that introduces reasoning on a non-reasoning model.
    delta = OpenAIRealtimeLLMService.Settings(
        session_properties=events.SessionProperties(
            reasoning=events.Reasoning(effort="high"),
        ),
    )
    frame = LLMUpdateSettingsFrame(delta=delta)

    sink, handler_id = _capture_warnings()
    try:
        await service.process_frame(frame, FrameDirection.DOWNSTREAM)
    finally:
        logger.remove(handler_id)

    updates = [
        sent for sent in recorder.events if isinstance(sent, events.SessionUpdateEvent)
    ]
    assert len(updates) == 1
    assert updates[0].session.reasoning is None
    assert "stripping `reasoning`" in sink.getvalue()