Skip server-known output items in previous_response_id optimization
When using previous_response_id, the server already knows its own output from the previous response. Store the raw response output and, on the next call, compare it against the items following the matched input prefix — checking role and text content for messages, and call_id for function calls. If the items match, skip them and send only truly new input (user messages, tool results). Falls back to full context if either the prefix or the output comparison fails.
This commit is contained in:
@@ -375,6 +375,7 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService):
|
||||
self._previous_response_id: Optional[str] = None
|
||||
self._previous_input_hash: Optional[str] = None
|
||||
self._previous_input_length: Optional[int] = None
|
||||
self._previous_response_output: Optional[list] = None
|
||||
|
||||
# -- lifecycle ------------------------------------------------------------
|
||||
|
||||
@@ -499,35 +500,112 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService):
|
||||
prefix = full_input[: self._previous_input_length]
|
||||
prefix_hash = self._hash_input_items(prefix)
|
||||
if prefix_hash == self._previous_input_hash:
|
||||
new_items = full_input[self._previous_input_length :]
|
||||
params["input"] = new_items
|
||||
params["previous_response_id"] = self._previous_response_id
|
||||
logger.debug(
|
||||
f"{self}: Using previous_response_id optimization "
|
||||
f"({len(new_items)} new items, "
|
||||
f"{self._previous_input_length} cached)"
|
||||
)
|
||||
return params
|
||||
items_after_prefix = full_input[self._previous_input_length :]
|
||||
response_output = self._previous_response_output or []
|
||||
|
||||
# Full context send (no optimization possible)
|
||||
if self._starts_with_response_output(items_after_prefix, response_output):
|
||||
# The server already knows its own output — skip those items
|
||||
items_to_send = items_after_prefix[len(response_output) :]
|
||||
cached = self._previous_input_length + len(response_output)
|
||||
params["input"] = items_to_send
|
||||
params["previous_response_id"] = self._previous_response_id
|
||||
logger.debug(
|
||||
f"{self}: Sending incremental context via previous_response_id "
|
||||
f"({len(items_to_send)} new items, {cached} cached)"
|
||||
)
|
||||
return params
|
||||
|
||||
logger.debug(f"{self}: Sending full context ({len(full_input)} items)")
|
||||
return params
|
||||
|
||||
def _store_previous_response_state(self, response_id: str, full_input: list):
|
||||
@staticmethod
|
||||
def _starts_with_response_output(items: list, response_output: list) -> bool:
|
||||
"""Check whether ``items`` begins with entries that match ``response_output``.
|
||||
|
||||
When using ``previous_response_id``, the server already knows its own
|
||||
output. After confirming that the input prefix matches what we
|
||||
previously sent, this method checks whether the items immediately
|
||||
following that prefix correspond to the server's response output.
|
||||
If they do, those items can be skipped so we send only the truly
|
||||
new items (user messages, tool results, etc.).
|
||||
|
||||
For messages, the comparison checks role and text content (extracting
|
||||
text from the output's ``output_text`` content parts and comparing
|
||||
against the input's content). For function calls, it matches by
|
||||
``call_id``. This avoids requiring exact format equality while
|
||||
still confirming the items represent the same data. If the match
|
||||
fails for any reason, the caller falls back to sending the full
|
||||
context.
|
||||
|
||||
Args:
|
||||
items: The input items following the matched prefix.
|
||||
response_output: Raw ``output`` array from the previous
|
||||
``response.completed`` event.
|
||||
|
||||
Returns:
|
||||
True if the leading items correspond to the response output.
|
||||
"""
|
||||
if len(items) < len(response_output):
|
||||
return False
|
||||
|
||||
for output_item, input_item in zip(response_output, items):
|
||||
output_type = output_item.get("type")
|
||||
if output_type == "message":
|
||||
if input_item.get("role") != output_item.get("role", "assistant"):
|
||||
return False
|
||||
# Extract text from the output's content array and compare
|
||||
# against the input's content (which the adapter stores as
|
||||
# a plain string for simple text responses).
|
||||
output_content = output_item.get("content", [])
|
||||
if isinstance(output_content, list):
|
||||
output_text = "".join(
|
||||
p.get("text", "") for p in output_content if p.get("type") == "output_text"
|
||||
)
|
||||
else:
|
||||
output_text = str(output_content)
|
||||
input_content = input_item.get("content", "")
|
||||
if isinstance(input_content, list):
|
||||
# Adapter may produce multimodal content parts
|
||||
input_text = "".join(
|
||||
p.get("text", "") for p in input_content if p.get("type") == "input_text"
|
||||
)
|
||||
else:
|
||||
input_text = str(input_content)
|
||||
if output_text != input_text:
|
||||
return False
|
||||
elif output_type == "function_call":
|
||||
if input_item.get("type") != "function_call" or input_item.get(
|
||||
"call_id"
|
||||
) != output_item.get("call_id"):
|
||||
return False
|
||||
else:
|
||||
# Unknown output type — can't confirm match
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _store_previous_response_state(
|
||||
self, response_id: str, full_input: list, response_output: list
|
||||
):
|
||||
"""Store state for the next call's previous_response_id optimization.
|
||||
|
||||
Args:
|
||||
response_id: The response ID returned by the server.
|
||||
full_input: The complete input items list that was used.
|
||||
full_input: The complete input items list that was sent.
|
||||
response_output: Raw ``output`` array from the ``response.completed``
|
||||
event, stored for loose comparison on the next call.
|
||||
"""
|
||||
self._previous_response_id = response_id
|
||||
self._previous_input_length = len(full_input)
|
||||
self._previous_input_hash = self._hash_input_items(full_input)
|
||||
self._previous_response_output = response_output
|
||||
|
||||
def _clear_previous_response_state(self):
|
||||
"""Clear stored previous_response_id state."""
|
||||
self._previous_response_id = None
|
||||
self._previous_input_length = None
|
||||
self._previous_input_hash = None
|
||||
self._previous_response_output = None
|
||||
|
||||
# -- frame processing -----------------------------------------------------
|
||||
|
||||
@@ -705,10 +783,13 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService):
|
||||
|
||||
self._full_model_name = response.get("model")
|
||||
|
||||
# Store state for next call's previous_response_id optimization
|
||||
# Store state for next call's previous_response_id optimization.
|
||||
# Include the response output so the hash covers the assistant's
|
||||
# reply — the server already knows it, so we won't resend it.
|
||||
response_id = response.get("id")
|
||||
if response_id:
|
||||
self._store_previous_response_state(response_id, full_input)
|
||||
response_output = response.get("output") or []
|
||||
self._store_previous_response_state(response_id, full_input, response_output)
|
||||
|
||||
break # Response complete
|
||||
|
||||
|
||||
@@ -61,6 +61,107 @@ class TestHashInputItems:
|
||||
assert h1 == h2
|
||||
|
||||
|
||||
class TestStartsWithResponseOutput:
|
||||
def test_text_message_matches_by_role(self):
|
||||
response_output = [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "Hello!"}],
|
||||
}
|
||||
]
|
||||
# Adapter produces a different format, but same role
|
||||
items = [{"role": "assistant", "content": "Hello!"}, {"role": "user", "content": "hi"}]
|
||||
assert OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_function_call_matches_by_call_id(self):
|
||||
response_output = [
|
||||
{
|
||||
"type": "function_call",
|
||||
"id": "fc_1",
|
||||
"call_id": "call_1",
|
||||
"name": "get_weather",
|
||||
"arguments": '{"location": "SF"}',
|
||||
}
|
||||
]
|
||||
# Adapter format (no "id" field)
|
||||
items = [
|
||||
{
|
||||
"type": "function_call",
|
||||
"call_id": "call_1",
|
||||
"name": "get_weather",
|
||||
"arguments": "{}",
|
||||
},
|
||||
{"type": "function_call_output", "call_id": "call_1", "output": "sunny"},
|
||||
]
|
||||
assert OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_mixed_output(self):
|
||||
response_output = [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "Let me check."}],
|
||||
},
|
||||
{
|
||||
"type": "function_call",
|
||||
"id": "fc_1",
|
||||
"call_id": "call_1",
|
||||
"name": "get_weather",
|
||||
"arguments": "{}",
|
||||
},
|
||||
]
|
||||
items = [
|
||||
{"role": "assistant", "content": "Let me check."},
|
||||
{
|
||||
"type": "function_call",
|
||||
"call_id": "call_1",
|
||||
"name": "get_weather",
|
||||
"arguments": "{}",
|
||||
},
|
||||
{"type": "function_call_output", "call_id": "call_1", "output": "sunny"},
|
||||
]
|
||||
assert OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_role_mismatch(self):
|
||||
response_output = [{"type": "message", "role": "assistant", "content": []}]
|
||||
items = [{"role": "user", "content": "hi"}]
|
||||
assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_text_content_mismatch(self):
|
||||
response_output = [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "Hello!"}],
|
||||
}
|
||||
]
|
||||
items = [{"role": "assistant", "content": "Something completely different"}]
|
||||
assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_call_id_mismatch(self):
|
||||
response_output = [{"type": "function_call", "call_id": "call_1", "name": "f"}]
|
||||
items = [{"type": "function_call", "call_id": "call_999", "name": "f"}]
|
||||
assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_too_few_items(self):
|
||||
response_output = [
|
||||
{"type": "message", "role": "assistant", "content": []},
|
||||
{"type": "function_call", "call_id": "call_1", "name": "f"},
|
||||
]
|
||||
items = [{"role": "assistant", "content": "hi"}]
|
||||
assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
def test_empty_output_always_matches(self):
|
||||
assert OpenAIResponsesLLMService._starts_with_response_output([], [])
|
||||
assert OpenAIResponsesLLMService._starts_with_response_output([{"role": "user"}], [])
|
||||
|
||||
def test_unknown_output_type_rejects(self):
|
||||
response_output = [{"type": "unknown_thing", "data": "something"}]
|
||||
items = [{"role": "assistant", "content": "hi"}]
|
||||
assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# previous_response_id optimization
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -79,9 +180,18 @@ class TestPreviousResponseOptimization:
|
||||
|
||||
def test_matching_prefix_sends_incremental(self):
|
||||
service = _make_service()
|
||||
# Simulate: sent [user_msg], got assistant reply "hello"
|
||||
prev_input = [{"role": "user", "content": "hi"}]
|
||||
service._store_previous_response_state("resp_123", prev_input)
|
||||
prev_output = [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "hello"}],
|
||||
}
|
||||
]
|
||||
service._store_previous_response_state("resp_123", prev_input, prev_output)
|
||||
|
||||
# Next call: adapter produces full context including assistant reply + new user msg
|
||||
full_input = [
|
||||
{"role": "user", "content": "hi"},
|
||||
{"role": "assistant", "content": "hello"},
|
||||
@@ -92,12 +202,13 @@ class TestPreviousResponseOptimization:
|
||||
result = service._apply_previous_response_optimization(params, full_input)
|
||||
|
||||
assert result["previous_response_id"] == "resp_123"
|
||||
assert result["input"] == full_input[1:]
|
||||
# Only the new user message should be sent
|
||||
assert result["input"] == [{"role": "user", "content": "how are you?"}]
|
||||
|
||||
def test_mismatched_prefix_sends_full(self):
|
||||
service = _make_service()
|
||||
prev_input = [{"role": "user", "content": "hi"}]
|
||||
service._store_previous_response_state("resp_123", prev_input)
|
||||
service._store_previous_response_state("resp_123", prev_input, [])
|
||||
|
||||
# Different first message
|
||||
full_input = [
|
||||
@@ -115,7 +226,7 @@ class TestPreviousResponseOptimization:
|
||||
"""When new input is same length as previous, no optimization."""
|
||||
service = _make_service()
|
||||
prev_input = [{"role": "user", "content": "hi"}]
|
||||
service._store_previous_response_state("resp_123", prev_input)
|
||||
service._store_previous_response_state("resp_123", prev_input, [])
|
||||
|
||||
full_input = [{"role": "user", "content": "hi"}]
|
||||
params = {"input": list(full_input), "model": "gpt-4.1"}
|
||||
@@ -124,9 +235,35 @@ class TestPreviousResponseOptimization:
|
||||
|
||||
assert "previous_response_id" not in result
|
||||
|
||||
def test_output_mismatch_sends_full_context(self):
|
||||
"""When prefix matches but output doesn't, fall back to full context."""
|
||||
service = _make_service()
|
||||
prev_input = [{"role": "user", "content": "hi"}]
|
||||
prev_output = [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "hello"}],
|
||||
}
|
||||
]
|
||||
service._store_previous_response_state("resp_123", prev_input, prev_output)
|
||||
|
||||
# Aggregator stored the output differently (e.g. different role)
|
||||
full_input = [
|
||||
{"role": "user", "content": "hi"},
|
||||
{"role": "developer", "content": "something unexpected"},
|
||||
{"role": "user", "content": "how are you?"},
|
||||
]
|
||||
params = {"input": list(full_input), "model": "gpt-4.1"}
|
||||
|
||||
result = service._apply_previous_response_optimization(params, full_input)
|
||||
|
||||
assert "previous_response_id" not in result
|
||||
assert result["input"] == full_input
|
||||
|
||||
def test_clear_state(self):
|
||||
service = _make_service()
|
||||
service._store_previous_response_state("resp_123", [{"role": "user", "content": "hi"}])
|
||||
service._store_previous_response_state("resp_123", [{"role": "user", "content": "hi"}], [])
|
||||
service._clear_previous_response_state()
|
||||
|
||||
assert service._previous_response_id is None
|
||||
@@ -188,6 +325,13 @@ class TestReceiveResponseEventsText:
|
||||
"response": {
|
||||
"id": "resp_42",
|
||||
"model": "gpt-4.1",
|
||||
"output": [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "Hello!"}],
|
||||
}
|
||||
],
|
||||
"usage": {
|
||||
"input_tokens": 10,
|
||||
"output_tokens": 5,
|
||||
@@ -207,6 +351,7 @@ class TestReceiveResponseEventsText:
|
||||
assert service._previous_response_id == "resp_42"
|
||||
assert service._previous_input_length == 1
|
||||
assert service._previous_input_hash is not None
|
||||
assert len(service._previous_response_output) == 1
|
||||
assert service.start_llm_usage_metrics.called
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -442,7 +587,7 @@ class TestConnectionLifecycle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnect_clears_previous_response_state(self):
|
||||
service = _make_service()
|
||||
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}])
|
||||
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}], [])
|
||||
service.stop_all_metrics = AsyncMock()
|
||||
|
||||
await service._disconnect()
|
||||
@@ -454,7 +599,7 @@ class TestConnectionLifecycle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_clears_state_and_reconnects(self):
|
||||
service = _make_service()
|
||||
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}])
|
||||
service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}], [])
|
||||
service.stop_all_metrics = AsyncMock()
|
||||
service.push_error = AsyncMock()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user