diff --git a/src/pipecat/services/openai/responses/llm.py b/src/pipecat/services/openai/responses/llm.py index 7249772cf..3745e40b9 100644 --- a/src/pipecat/services/openai/responses/llm.py +++ b/src/pipecat/services/openai/responses/llm.py @@ -375,6 +375,7 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService): self._previous_response_id: Optional[str] = None self._previous_input_hash: Optional[str] = None self._previous_input_length: Optional[int] = None + self._previous_response_output: Optional[list] = None # -- lifecycle ------------------------------------------------------------ @@ -499,35 +500,112 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService): prefix = full_input[: self._previous_input_length] prefix_hash = self._hash_input_items(prefix) if prefix_hash == self._previous_input_hash: - new_items = full_input[self._previous_input_length :] - params["input"] = new_items - params["previous_response_id"] = self._previous_response_id - logger.debug( - f"{self}: Using previous_response_id optimization " - f"({len(new_items)} new items, " - f"{self._previous_input_length} cached)" - ) - return params + items_after_prefix = full_input[self._previous_input_length :] + response_output = self._previous_response_output or [] - # Full context send (no optimization possible) + if self._starts_with_response_output(items_after_prefix, response_output): + # The server already knows its own output — skip those items + items_to_send = items_after_prefix[len(response_output) :] + cached = self._previous_input_length + len(response_output) + params["input"] = items_to_send + params["previous_response_id"] = self._previous_response_id + logger.debug( + f"{self}: Sending incremental context via previous_response_id " + f"({len(items_to_send)} new items, {cached} cached)" + ) + return params + + logger.debug(f"{self}: Sending full context ({len(full_input)} items)") return params - def _store_previous_response_state(self, response_id: str, full_input: list): + @staticmethod + def _starts_with_response_output(items: list, response_output: list) -> bool: + """Check whether ``items`` begins with entries that match ``response_output``. + + When using ``previous_response_id``, the server already knows its own + output. After confirming that the input prefix matches what we + previously sent, this method checks whether the items immediately + following that prefix correspond to the server's response output. + If they do, those items can be skipped so we send only the truly + new items (user messages, tool results, etc.). + + For messages, the comparison checks role and text content (extracting + text from the output's ``output_text`` content parts and comparing + against the input's content). For function calls, it matches by + ``call_id``. This avoids requiring exact format equality while + still confirming the items represent the same data. If the match + fails for any reason, the caller falls back to sending the full + context. + + Args: + items: The input items following the matched prefix. + response_output: Raw ``output`` array from the previous + ``response.completed`` event. + + Returns: + True if the leading items correspond to the response output. + """ + if len(items) < len(response_output): + return False + + for output_item, input_item in zip(response_output, items): + output_type = output_item.get("type") + if output_type == "message": + if input_item.get("role") != output_item.get("role", "assistant"): + return False + # Extract text from the output's content array and compare + # against the input's content (which the adapter stores as + # a plain string for simple text responses). + output_content = output_item.get("content", []) + if isinstance(output_content, list): + output_text = "".join( + p.get("text", "") for p in output_content if p.get("type") == "output_text" + ) + else: + output_text = str(output_content) + input_content = input_item.get("content", "") + if isinstance(input_content, list): + # Adapter may produce multimodal content parts + input_text = "".join( + p.get("text", "") for p in input_content if p.get("type") == "input_text" + ) + else: + input_text = str(input_content) + if output_text != input_text: + return False + elif output_type == "function_call": + if input_item.get("type") != "function_call" or input_item.get( + "call_id" + ) != output_item.get("call_id"): + return False + else: + # Unknown output type — can't confirm match + return False + + return True + + def _store_previous_response_state( + self, response_id: str, full_input: list, response_output: list + ): """Store state for the next call's previous_response_id optimization. Args: response_id: The response ID returned by the server. - full_input: The complete input items list that was used. + full_input: The complete input items list that was sent. + response_output: Raw ``output`` array from the ``response.completed`` + event, stored for loose comparison on the next call. """ self._previous_response_id = response_id self._previous_input_length = len(full_input) self._previous_input_hash = self._hash_input_items(full_input) + self._previous_response_output = response_output def _clear_previous_response_state(self): """Clear stored previous_response_id state.""" self._previous_response_id = None self._previous_input_length = None self._previous_input_hash = None + self._previous_response_output = None # -- frame processing ----------------------------------------------------- @@ -705,10 +783,13 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService): self._full_model_name = response.get("model") - # Store state for next call's previous_response_id optimization + # Store state for next call's previous_response_id optimization. + # Include the response output so the hash covers the assistant's + # reply — the server already knows it, so we won't resend it. response_id = response.get("id") if response_id: - self._store_previous_response_state(response_id, full_input) + response_output = response.get("output") or [] + self._store_previous_response_state(response_id, full_input, response_output) break # Response complete diff --git a/tests/test_openai_responses_websocket.py b/tests/test_openai_responses_websocket.py index 6828578de..ef69ec2bd 100644 --- a/tests/test_openai_responses_websocket.py +++ b/tests/test_openai_responses_websocket.py @@ -61,6 +61,107 @@ class TestHashInputItems: assert h1 == h2 +class TestStartsWithResponseOutput: + def test_text_message_matches_by_role(self): + response_output = [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Hello!"}], + } + ] + # Adapter produces a different format, but same role + items = [{"role": "assistant", "content": "Hello!"}, {"role": "user", "content": "hi"}] + assert OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_function_call_matches_by_call_id(self): + response_output = [ + { + "type": "function_call", + "id": "fc_1", + "call_id": "call_1", + "name": "get_weather", + "arguments": '{"location": "SF"}', + } + ] + # Adapter format (no "id" field) + items = [ + { + "type": "function_call", + "call_id": "call_1", + "name": "get_weather", + "arguments": "{}", + }, + {"type": "function_call_output", "call_id": "call_1", "output": "sunny"}, + ] + assert OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_mixed_output(self): + response_output = [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Let me check."}], + }, + { + "type": "function_call", + "id": "fc_1", + "call_id": "call_1", + "name": "get_weather", + "arguments": "{}", + }, + ] + items = [ + {"role": "assistant", "content": "Let me check."}, + { + "type": "function_call", + "call_id": "call_1", + "name": "get_weather", + "arguments": "{}", + }, + {"type": "function_call_output", "call_id": "call_1", "output": "sunny"}, + ] + assert OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_role_mismatch(self): + response_output = [{"type": "message", "role": "assistant", "content": []}] + items = [{"role": "user", "content": "hi"}] + assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_text_content_mismatch(self): + response_output = [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Hello!"}], + } + ] + items = [{"role": "assistant", "content": "Something completely different"}] + assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_call_id_mismatch(self): + response_output = [{"type": "function_call", "call_id": "call_1", "name": "f"}] + items = [{"type": "function_call", "call_id": "call_999", "name": "f"}] + assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_too_few_items(self): + response_output = [ + {"type": "message", "role": "assistant", "content": []}, + {"type": "function_call", "call_id": "call_1", "name": "f"}, + ] + items = [{"role": "assistant", "content": "hi"}] + assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + def test_empty_output_always_matches(self): + assert OpenAIResponsesLLMService._starts_with_response_output([], []) + assert OpenAIResponsesLLMService._starts_with_response_output([{"role": "user"}], []) + + def test_unknown_output_type_rejects(self): + response_output = [{"type": "unknown_thing", "data": "something"}] + items = [{"role": "assistant", "content": "hi"}] + assert not OpenAIResponsesLLMService._starts_with_response_output(items, response_output) + + # --------------------------------------------------------------------------- # previous_response_id optimization # --------------------------------------------------------------------------- @@ -79,9 +180,18 @@ class TestPreviousResponseOptimization: def test_matching_prefix_sends_incremental(self): service = _make_service() + # Simulate: sent [user_msg], got assistant reply "hello" prev_input = [{"role": "user", "content": "hi"}] - service._store_previous_response_state("resp_123", prev_input) + prev_output = [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hello"}], + } + ] + service._store_previous_response_state("resp_123", prev_input, prev_output) + # Next call: adapter produces full context including assistant reply + new user msg full_input = [ {"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}, @@ -92,12 +202,13 @@ class TestPreviousResponseOptimization: result = service._apply_previous_response_optimization(params, full_input) assert result["previous_response_id"] == "resp_123" - assert result["input"] == full_input[1:] + # Only the new user message should be sent + assert result["input"] == [{"role": "user", "content": "how are you?"}] def test_mismatched_prefix_sends_full(self): service = _make_service() prev_input = [{"role": "user", "content": "hi"}] - service._store_previous_response_state("resp_123", prev_input) + service._store_previous_response_state("resp_123", prev_input, []) # Different first message full_input = [ @@ -115,7 +226,7 @@ class TestPreviousResponseOptimization: """When new input is same length as previous, no optimization.""" service = _make_service() prev_input = [{"role": "user", "content": "hi"}] - service._store_previous_response_state("resp_123", prev_input) + service._store_previous_response_state("resp_123", prev_input, []) full_input = [{"role": "user", "content": "hi"}] params = {"input": list(full_input), "model": "gpt-4.1"} @@ -124,9 +235,35 @@ class TestPreviousResponseOptimization: assert "previous_response_id" not in result + def test_output_mismatch_sends_full_context(self): + """When prefix matches but output doesn't, fall back to full context.""" + service = _make_service() + prev_input = [{"role": "user", "content": "hi"}] + prev_output = [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hello"}], + } + ] + service._store_previous_response_state("resp_123", prev_input, prev_output) + + # Aggregator stored the output differently (e.g. different role) + full_input = [ + {"role": "user", "content": "hi"}, + {"role": "developer", "content": "something unexpected"}, + {"role": "user", "content": "how are you?"}, + ] + params = {"input": list(full_input), "model": "gpt-4.1"} + + result = service._apply_previous_response_optimization(params, full_input) + + assert "previous_response_id" not in result + assert result["input"] == full_input + def test_clear_state(self): service = _make_service() - service._store_previous_response_state("resp_123", [{"role": "user", "content": "hi"}]) + service._store_previous_response_state("resp_123", [{"role": "user", "content": "hi"}], []) service._clear_previous_response_state() assert service._previous_response_id is None @@ -188,6 +325,13 @@ class TestReceiveResponseEventsText: "response": { "id": "resp_42", "model": "gpt-4.1", + "output": [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Hello!"}], + } + ], "usage": { "input_tokens": 10, "output_tokens": 5, @@ -207,6 +351,7 @@ class TestReceiveResponseEventsText: assert service._previous_response_id == "resp_42" assert service._previous_input_length == 1 assert service._previous_input_hash is not None + assert len(service._previous_response_output) == 1 assert service.start_llm_usage_metrics.called @pytest.mark.asyncio @@ -442,7 +587,7 @@ class TestConnectionLifecycle: @pytest.mark.asyncio async def test_disconnect_clears_previous_response_state(self): service = _make_service() - service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}]) + service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}], []) service.stop_all_metrics = AsyncMock() await service._disconnect() @@ -454,7 +599,7 @@ class TestConnectionLifecycle: @pytest.mark.asyncio async def test_reconnect_clears_state_and_reconnects(self): service = _make_service() - service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}]) + service._store_previous_response_state("resp_1", [{"role": "user", "content": "hi"}], []) service.stop_all_metrics = AsyncMock() service.push_error = AsyncMock()