From d942c85eff1b8b08e917277cf79633afe7b55b8a Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Fri, 27 Feb 2026 13:59:37 +0800 Subject: [PATCH] Add new tools to DuplexPipeline: calculator, code_interpreter, turn_on_camera, turn_off_camera, increase_volume, and decrease_volume. Implement fallback schema for unknown string tools and assign default client executors for specific tools. Update tests to validate new functionality and ensure correct tool handling in the pipeline. --- engine/core/duplex_pipeline.py | 104 +++++++++++++++++++++++++- engine/tests/test_tool_call_flow.py | 25 ++++++- web/pages/Assistants.tsx | 112 ++++++++++++++++++++++------ 3 files changed, 214 insertions(+), 27 deletions(-) diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index 44d3cad..e879012 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -84,7 +84,75 @@ class DuplexPipeline: "required": [], }, }, + "calculator": { + "name": "calculator", + "description": "Execute a math expression", + "parameters": { + "type": "object", + "properties": { + "expression": {"type": "string", "description": "Math expression, e.g. 2 + 3 * 4"}, + }, + "required": ["expression"], + }, + }, + "code_interpreter": { + "name": "code_interpreter", + "description": "Safely evaluate a Python expression", + "parameters": { + "type": "object", + "properties": { + "code": {"type": "string", "description": "Python expression to evaluate"}, + }, + "required": ["code"], + }, + }, + "turn_on_camera": { + "name": "turn_on_camera", + "description": "Turn on client camera", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + "turn_off_camera": { + "name": "turn_off_camera", + "description": "Turn off client camera", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + "increase_volume": { + "name": "increase_volume", + "description": "Increase client volume", + "parameters": { + "type": "object", + "properties": { + "step": {"type": "integer", "description": "Volume increase step, default 1"}, + }, + "required": [], + }, + }, + "decrease_volume": { + "name": "decrease_volume", + "description": "Decrease client volume", + "parameters": { + "type": "object", + "properties": { + "step": {"type": "integer", "description": "Volume decrease step, default 1"}, + }, + "required": [], + }, + }, } + _DEFAULT_CLIENT_EXECUTORS = frozenset({ + "turn_on_camera", + "turn_off_camera", + "increase_volume", + "decrease_volume", + }) def __init__( self, @@ -1313,9 +1381,14 @@ class DuplexPipeline: def _resolved_tool_schemas(self) -> List[Dict[str, Any]]: schemas: List[Dict[str, Any]] = [] + seen: set[str] = set() for item in self._runtime_tools: if isinstance(item, str): - base = self._DEFAULT_TOOL_SCHEMAS.get(item) + tool_name = item.strip() + if not tool_name or tool_name in seen: + continue + seen.add(tool_name) + base = self._DEFAULT_TOOL_SCHEMAS.get(tool_name) if base: schemas.append( { @@ -1327,6 +1400,17 @@ class DuplexPipeline: }, } ) + else: + schemas.append( + { + "type": "function", + "function": { + "name": tool_name, + "description": f"Execute tool '{tool_name}'", + "parameters": {"type": "object", "properties": {}}, + }, + } + ) continue if not isinstance(item, dict): @@ -1334,12 +1418,15 @@ class DuplexPipeline: fn = item.get("function") if isinstance(fn, dict) and fn.get("name"): - fn_name = str(fn.get("name")) + fn_name = str(fn.get("name")).strip() + if not fn_name or fn_name in seen: + continue + seen.add(fn_name) schemas.append( { "type": "function", "function": { - "name": str(fn.get("name")), + "name": fn_name, "description": str(fn.get("description") or item.get("description") or ""), "parameters": fn.get("parameters") or {"type": "object", "properties": {}}, }, @@ -1348,11 +1435,15 @@ class DuplexPipeline: continue if item.get("name"): + item_name = str(item.get("name")).strip() + if not item_name or item_name in seen: + continue + seen.add(item_name) schemas.append( { "type": "function", "function": { - "name": str(item.get("name")), + "name": item_name, "description": str(item.get("description") or ""), "parameters": item.get("parameters") or {"type": "object", "properties": {}}, }, @@ -1363,6 +1454,11 @@ class DuplexPipeline: def _resolved_tool_executor_map(self) -> Dict[str, str]: result: Dict[str, str] = {} for item in self._runtime_tools: + if isinstance(item, str): + name = item.strip() + if name in self._DEFAULT_CLIENT_EXECUTORS: + result[name] = "client" + continue if not isinstance(item, dict): continue fn = item.get("function") diff --git a/engine/tests/test_tool_call_flow.py b/engine/tests/test_tool_call_flow.py index 6337edd..8e44815 100644 --- a/engine/tests/test_tool_call_flow.py +++ b/engine/tests/test_tool_call_flow.py @@ -97,6 +97,7 @@ def test_pipeline_uses_default_tools_from_settings(monkeypatch): "core.duplex_pipeline.settings.tools", [ "current_time", + "calculator", { "name": "weather", "description": "Get weather by city", @@ -112,14 +113,36 @@ def test_pipeline_uses_default_tools_from_settings(monkeypatch): pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]]) cfg = pipeline.resolved_runtime_config() - assert cfg["tools"]["allowlist"] == ["current_time", "weather"] + assert cfg["tools"]["allowlist"] == ["calculator", "current_time", "weather"] schemas = pipeline._resolved_tool_schemas() names = [s.get("function", {}).get("name") for s in schemas if isinstance(s, dict)] assert "current_time" in names + assert "calculator" in names assert "weather" in names +def test_pipeline_exposes_unknown_string_tools_with_fallback_schema(monkeypatch): + monkeypatch.setattr("core.duplex_pipeline.settings.tools", ["custom_system_cmd"]) + pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]]) + + schemas = pipeline._resolved_tool_schemas() + tool_schema = next((s for s in schemas if s.get("function", {}).get("name") == "custom_system_cmd"), None) + assert tool_schema is not None + assert tool_schema.get("function", {}).get("parameters", {}).get("type") == "object" + + +def test_pipeline_assigns_default_client_executor_for_system_string_tools(monkeypatch): + monkeypatch.setattr("core.duplex_pipeline.settings.tools", ["increase_volume"]) + pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]]) + + tool_call = { + "type": "function", + "function": {"name": "increase_volume", "arguments": "{}"}, + } + assert pipeline._tool_executor(tool_call) == "client" + + @pytest.mark.asyncio async def test_ws_message_parses_tool_call_results(): msg = parse_client_message( diff --git a/web/pages/Assistants.tsx b/web/pages/Assistants.tsx index dadae74..ea72b26 100644 --- a/web/pages/Assistants.tsx +++ b/web/pages/Assistants.tsx @@ -2042,6 +2042,7 @@ export const DebugDrawer: React.FC<{ const micFrameBufferRef = useRef(new Uint8Array(0)); const userDraftIndexRef = useRef(null); const lastUserFinalRef = useRef(''); + const debugVolumePercentRef = useRef(50); const selectedToolSchemas = useMemo(() => { const ids = assistant.tools || []; if (!ids.length) return []; @@ -2916,33 +2917,100 @@ export const DebugDrawer: React.FC<{ }, ]); if (executor === 'client' && toolCallId && ws.readyState === WebSocket.OPEN) { + let parsedArgs: Record = {}; + if (rawArgs) { + try { + const candidate = JSON.parse(rawArgs); + parsedArgs = candidate && typeof candidate === 'object' ? candidate : {}; + } catch { + parsedArgs = {}; + } + } const resultPayload: any = { tool_call_id: toolCallId, name: toolName, - output: { - message: 'Client tool execution is not implemented in debug web client', - }, + output: { message: `Unhandled client tool '${toolName}'` }, status: { code: 501, message: 'not_implemented' }, }; - ws.send( - JSON.stringify({ - type: 'tool_call.results', - results: [resultPayload], - }) - ); - const statusCode = Number(resultPayload?.status?.code || 500); - const statusMessage = String(resultPayload?.status?.message || 'error'); - const resultText = - statusCode === 200 && typeof resultPayload?.output?.result === 'number' - ? `result ${toolName} = ${resultPayload.output.result}` - : `result ${toolName} status=${statusCode} ${statusMessage}`; - setMessages((prev) => [ - ...prev, - { - role: 'tool', - text: resultText, - }, - ]); + const sendToolResult = () => { + ws.send( + JSON.stringify({ + type: 'tool_call.results', + results: [resultPayload], + }) + ); + const statusCode = Number(resultPayload?.status?.code || 500); + const statusMessage = String(resultPayload?.status?.message || 'error'); + const resultText = + statusCode === 200 && typeof resultPayload?.output?.result === 'number' + ? `result ${toolName} = ${resultPayload.output.result}` + : `result ${toolName} status=${statusCode} ${statusMessage}`; + setMessages((prev) => [ + ...prev, + { + role: 'tool', + text: resultText, + }, + ]); + }; + try { + if (toolName === 'turn_on_camera') { + navigator.mediaDevices + .getUserMedia({ + video: selectedCamera ? { deviceId: { exact: selectedCamera } } : true, + audio: false, + }) + .then((stream) => { + if (videoRef.current) videoRef.current.srcObject = stream; + streamRef.current = stream; + resultPayload.output = { + message: 'camera_on', + tracks: stream.getVideoTracks().length, + }; + resultPayload.status = { code: 200, message: 'ok' }; + sendToolResult(); + }) + .catch((err) => { + resultPayload.output = { + message: `Client tool '${toolName}' failed`, + error: err instanceof Error ? err.message : String(err), + }; + resultPayload.status = { code: 500, message: 'client_tool_failed' }; + sendToolResult(); + }); + return; + } else if (toolName === 'turn_off_camera') { + stopMedia(); + if (videoRef.current) videoRef.current.srcObject = null; + resultPayload.output = { message: 'camera_off' }; + resultPayload.status = { code: 200, message: 'ok' }; + } else if (toolName === 'increase_volume') { + const rawStep = Number(parsedArgs?.step); + const step = Number.isFinite(rawStep) ? Math.max(1, Math.floor(rawStep)) : 1; + debugVolumePercentRef.current = Math.min(100, debugVolumePercentRef.current + step); + resultPayload.output = { + message: 'volume_increased', + level: debugVolumePercentRef.current, + }; + resultPayload.status = { code: 200, message: 'ok' }; + } else if (toolName === 'decrease_volume') { + const rawStep = Number(parsedArgs?.step); + const step = Number.isFinite(rawStep) ? Math.max(1, Math.floor(rawStep)) : 1; + debugVolumePercentRef.current = Math.max(0, debugVolumePercentRef.current - step); + resultPayload.output = { + message: 'volume_decreased', + level: debugVolumePercentRef.current, + }; + resultPayload.status = { code: 200, message: 'ok' }; + } + } catch (err) { + resultPayload.output = { + message: `Client tool '${toolName}' failed`, + error: err instanceof Error ? err.message : String(err), + }; + resultPayload.status = { code: 500, message: 'client_tool_failed' }; + } + sendToolResult(); } return; }