119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
"""Server-side tool execution helpers."""
|
|
|
|
import ast
|
|
import operator
|
|
from typing import Any, Dict
|
|
|
|
|
|
_BIN_OPS = {
|
|
ast.Add: operator.add,
|
|
ast.Sub: operator.sub,
|
|
ast.Mult: operator.mul,
|
|
ast.Div: operator.truediv,
|
|
ast.Mod: operator.mod,
|
|
}
|
|
|
|
_UNARY_OPS = {
|
|
ast.UAdd: operator.pos,
|
|
ast.USub: operator.neg,
|
|
}
|
|
|
|
|
|
def _safe_eval_expr(expression: str) -> float:
|
|
tree = ast.parse(expression, mode="eval")
|
|
|
|
def _eval(node: ast.AST) -> float:
|
|
if isinstance(node, ast.Expression):
|
|
return _eval(node.body)
|
|
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
|
|
return float(node.value)
|
|
if isinstance(node, ast.BinOp):
|
|
op = _BIN_OPS.get(type(node.op))
|
|
if not op:
|
|
raise ValueError("unsupported operator")
|
|
return float(op(_eval(node.left), _eval(node.right)))
|
|
if isinstance(node, ast.UnaryOp):
|
|
op = _UNARY_OPS.get(type(node.op))
|
|
if not op:
|
|
raise ValueError("unsupported unary operator")
|
|
return float(op(_eval(node.operand)))
|
|
raise ValueError("unsupported expression")
|
|
|
|
return _eval(tree)
|
|
|
|
|
|
def _extract_tool_name(tool_call: Dict[str, Any]) -> str:
|
|
function_payload = tool_call.get("function")
|
|
if isinstance(function_payload, dict):
|
|
return str(function_payload.get("name") or "").strip()
|
|
return ""
|
|
|
|
|
|
def _extract_tool_args(tool_call: Dict[str, Any]) -> Dict[str, Any]:
|
|
function_payload = tool_call.get("function")
|
|
if not isinstance(function_payload, dict):
|
|
return {}
|
|
raw = function_payload.get("arguments")
|
|
if isinstance(raw, dict):
|
|
return raw
|
|
if not isinstance(raw, str):
|
|
return {}
|
|
text = raw.strip()
|
|
if not text:
|
|
return {}
|
|
try:
|
|
import json
|
|
|
|
parsed = json.loads(text)
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
# Longest expression, in characters after stripping, that the calculator
# tool will attempt to evaluate.
_MAX_EXPRESSION_LENGTH = 200


def _tool_result(
    call_id: str,
    name: str,
    output: Dict[str, Any],
    code: int,
    message: str,
) -> Dict[str, Any]:
    """Build the normalized result payload shared by every tool outcome."""
    return {
        "tool_call_id": call_id,
        "name": name,
        "output": output,
        "status": {"code": code, "message": message},
    }


async def execute_server_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a server-side tool and return normalized result payload.

    Only the ``"calculator"`` tool is implemented. It evaluates the
    ``"expression"`` argument and reports the outcome through the ``status``
    field rather than by raising.

    Args:
        tool_call: Tool-call payload. ``"id"`` is echoed back as
            ``tool_call_id``; the tool name and arguments are taken from the
            ``"function"`` entry.

    Returns:
        A dict with the keys ``tool_call_id``, ``name``, ``output`` and
        ``status``, where ``status`` is ``{"code": ..., "message": ...}``:

        * 200 ``ok`` - ``output`` holds ``expression`` and ``result``; a
          result with no fractional part is returned as ``int``.
        * 400 ``bad_request`` - no expression was supplied.
        * 422 ``invalid_expression`` - the expression is longer than
          ``_MAX_EXPRESSION_LENGTH`` or could not be evaluated.
        * 501 ``not_implemented`` - any tool other than the calculator; a
          missing name is reported as ``"unknown_tool"``.
    """
    call_id = str(tool_call.get("id") or "").strip()
    tool_name = _extract_tool_name(tool_call)
    args = _extract_tool_args(tool_call)

    if tool_name != "calculator":
        return _tool_result(
            call_id,
            tool_name or "unknown_tool",
            {"message": "server tool not implemented"},
            501,
            "not_implemented",
        )

    raw_expression = args.get("expression")
    if isinstance(raw_expression, (int, float)) and not isinstance(raw_expression, bool):
        # A bare number is a valid expression. Going through the truthiness
        # test below would misreport a numeric 0 as "missing expression".
        expression = str(raw_expression)
    else:
        expression = str(raw_expression or "").strip()

    if not expression:
        return _tool_result(
            call_id, tool_name, {"error": "missing expression"}, 400, "bad_request"
        )

    if len(expression) > _MAX_EXPRESSION_LENGTH:
        return _tool_result(
            call_id,
            tool_name,
            {"expression": expression, "error": "expression too long"},
            422,
            "invalid_expression",
        )

    try:
        value = _safe_eval_expr(expression)
    except Exception as exc:
        # Tool boundary: every evaluation failure (syntax error, unsupported
        # node, division by zero, ...) is reported in the payload, not raised.
        return _tool_result(
            call_id,
            tool_name,
            {"expression": expression, "error": str(exc)},
            422,
            "invalid_expression",
        )

    result = int(value) if value.is_integer() else value
    return _tool_result(
        call_id, tool_name, {"expression": expression, "result": result}, 200, "ok"
    )
|