From 2d7fc2b70082d13dcee5cef8407e26751f09881c Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Tue, 10 Feb 2026 19:17:45 +0800 Subject: [PATCH] Add server tool timeout protection --- engine/core/duplex_pipeline.py | 14 ++++++++- engine/tests/test_tool_call_flow.py | 47 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index 51f76de..2fa2948 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -58,6 +58,7 @@ class DuplexPipeline: _SENTENCE_CLOSERS = frozenset({'"', "'", "”", "’", ")", "]", "}", ")", "】", "」", "』", "》"}) _MIN_SPLIT_SPOKEN_CHARS = 6 _TOOL_WAIT_TIMEOUT_SECONDS = 15.0 + _SERVER_TOOL_TIMEOUT_SECONDS = 15.0 _DEFAULT_TOOL_SCHEMAS: Dict[str, Dict[str, Any]] = { "search": { "name": "search", @@ -984,7 +985,18 @@ class DuplexPipeline: tool_results.append(result) continue - result = await execute_server_tool(call) + try: + result = await asyncio.wait_for( + execute_server_tool(call), + timeout=self._SERVER_TOOL_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + result = { + "tool_call_id": call_id, + "name": self._tool_name(call) or "unknown_tool", + "output": {"message": "server tool timeout"}, + "status": {"code": 504, "message": "server_tool_timeout"}, + } await self._emit_tool_result(result, source="server") tool_results.append(result) diff --git a/engine/tests/test_tool_call_flow.py b/engine/tests/test_tool_call_flow.py index 6f70828..d21e73a 100644 --- a/engine/tests/test_tool_call_flow.py +++ b/engine/tests/test_tool_call_flow.py @@ -252,3 +252,50 @@ async def test_server_calculator_emits_tool_result(monkeypatch): payload = tool_results[-1].get("result", {}) assert payload.get("status", {}).get("code") == 200 assert payload.get("output", {}).get("result") == 3 + + +@pytest.mark.asyncio +async def test_server_tool_timeout_emits_504_and_continues(monkeypatch): + async def _slow_execute(_call): + await asyncio.sleep(0.05) + return { + "tool_call_id": "call_slow", + "name": "weather", + "output": {"ok": True}, + "status": {"code": 200, "message": "ok"}, + } + + monkeypatch.setattr("core.duplex_pipeline.execute_server_tool", _slow_execute) + + pipeline, events = _build_pipeline( + monkeypatch, + [ + [ + LLMStreamEvent( + type="tool_call", + tool_call={ + "id": "call_slow", + "executor": "server", + "type": "function", + "function": {"name": "weather", "arguments": "{\"city\":\"hz\"}"}, + }, + ), + LLMStreamEvent(type="done"), + ], + [ + LLMStreamEvent(type="text_delta", text="timeout fallback."), + LLMStreamEvent(type="done"), + ], + ], + ) + pipeline._SERVER_TOOL_TIMEOUT_SECONDS = 0.01 + + await pipeline._handle_turn("weather?") + + tool_results = [e for e in events if e.get("type") == "assistant.tool_result"] + assert tool_results + payload = tool_results[-1].get("result", {}) + assert payload.get("status", {}).get("code") == 504 + finals = [e for e in events if e.get("type") == "assistant.response.final"] + assert finals + assert "timeout fallback" in finals[-1].get("text", "")