Files
ai-video-fullstack/backend/services/node_specs.py
Xin Wang c2a39257ff Add workflow editor and node types support in frontend and backend
- Introduce a new workflow editor component for visualizing and managing workflows, allowing users to add nodes and define connections.
- Implement backend support for node types, including validation and constraints for workflow graphs.
- Add new API endpoints for retrieving node types and their specifications.
- Enhance the AssistantPage to integrate the workflow editor, enabling users to create and edit workflows directly.
- Update frontend components to support new workflow functionalities, including condition edges and generic nodes.
- Refactor existing code to accommodate the new workflow features and improve overall structure.
2026-06-15 10:12:41 +08:00

162 lines
5.9 KiB
Python

"""工作流节点规格 + 图校验(对齐 dograh 的 node-spec / GraphConstraints 思路)。
当前实现 3 个核心节点:开始(startCall)/智能体(agentNode)/结束(endCall)。
本模块是「节点类型」的唯一事实源:
- /api/node-types 接口直接吐这里的规格;
- 助手保存时用这里的约束校验 workflow 图。
新增节点类型只需在 NODE_SPECS 里加一条并补充约束。前端 specs.ts 与此保持一致。
"""
from __future__ import annotations
from typing import Any
# 规格版本号:节点定义有破坏性变更时 +1,前端可据此判断是否需要刷新缓存。
SPEC_VERSION = "1"
# 每个节点的图约束。None 表示不限制。
# min_incoming / max_incoming:入边数量
# min_outgoing / max_outgoing:出边数量
NODE_SPECS: list[dict[str, Any]] = [
{
"name": "startCall",
"displayName": "开始",
"category": "call_node",
"description": "工作流入口,每个流程有且仅有一个。播放开场白并进入首个节点。",
"icon": "Play",
"accent": "mint",
"addable": False,
"constraints": {"minIncoming": 0, "maxIncoming": 0},
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "greeting", "label": "开场白", "type": "textarea"},
{"key": "prompt", "label": "全局提示词", "type": "textarea"},
],
},
{
"name": "agentNode",
"displayName": "智能体节点",
"category": "call_node",
"description": "对话处理单元。按提示词与用户多轮交互,可有多个并通过条件边流转。",
"icon": "Bot",
"accent": "sky",
"addable": True,
"constraints": {"minIncoming": 1},
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "prompt", "label": "节点提示词", "type": "textarea", "required": True},
{"key": "allowInterrupt", "label": "允许用户打断", "type": "switch"},
],
},
{
"name": "endCall",
"displayName": "结束",
"category": "call_node",
"description": "终止节点,礼貌结束对话。可有多个,均无出边。",
"icon": "Flag",
"accent": "rose",
"addable": False,
"constraints": {"minIncoming": 1, "minOutgoing": 0, "maxOutgoing": 0},
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "prompt", "label": "结束语提示词", "type": "textarea"},
],
},
]
_SPEC_BY_NAME = {spec["name"]: spec for spec in NODE_SPECS}
def node_types_response() -> dict[str, Any]:
"""/api/node-types 的响应体(camelCase,直接喂前端)。"""
return {"specVersion": SPEC_VERSION, "nodeTypes": NODE_SPECS}
def validate_graph(graph: dict[str, Any]) -> list[str]:
"""校验 workflow 图,返回错误信息列表(空列表 = 通过)。
基础规则(对齐 dograh 的核心不变量):
1. 节点类型必须是已注册类型;
2. 有且仅有一个 startCall;
3. 至少有一个 endCall;
4. 边的 source/target 必须指向存在的节点;
5. 入边/出边数量满足各节点类型的约束。
空图(无节点)视为草稿,直接放行,方便先存后编排。
"""
errors: list[str] = []
nodes = graph.get("nodes") or []
edges = graph.get("edges") or []
if not nodes:
return errors # 草稿:放行
node_ids: set[str] = set()
type_counts: dict[str, int] = {}
node_type_by_id: dict[str, str] = {}
for node in nodes:
node_id = node.get("id")
node_type = node.get("type")
if not node_id:
errors.append("存在缺少 id 的节点")
continue
if node_id in node_ids:
errors.append(f"节点 id 重复:{node_id}")
node_ids.add(node_id)
if node_type not in _SPEC_BY_NAME:
errors.append(f"未知节点类型:{node_type}(节点 {node_id})")
continue
node_type_by_id[node_id] = node_type
type_counts[node_type] = type_counts.get(node_type, 0) + 1
start_count = type_counts.get("startCall", 0)
if start_count == 0:
errors.append("工作流必须有一个「开始」节点")
elif start_count > 1:
errors.append("工作流只能有一个「开始」节点")
if type_counts.get("endCall", 0) == 0:
errors.append("工作流至少需要一个「结束」节点")
# 统计入边/出边
incoming: dict[str, int] = {nid: 0 for nid in node_ids}
outgoing: dict[str, int] = {nid: 0 for nid in node_ids}
for edge in edges:
source = edge.get("source")
target = edge.get("target")
if source not in node_ids:
errors.append(f"连线指向了不存在的源节点:{source}")
continue
if target not in node_ids:
errors.append(f"连线指向了不存在的目标节点:{target}")
continue
outgoing[source] += 1
incoming[target] += 1
for node_id, node_type in node_type_by_id.items():
constraints = _SPEC_BY_NAME[node_type]["constraints"]
name = node_type
_check_count(errors, incoming[node_id], constraints, "Incoming", name, "入边")
_check_count(errors, outgoing[node_id], constraints, "Outgoing", name, "出边")
return errors
def _check_count(
errors: list[str],
actual: int,
constraints: dict[str, int],
suffix: str,
node_type: str,
label: str,
) -> None:
lo = constraints.get(f"min{suffix}")
hi = constraints.get(f"max{suffix}")
display = _SPEC_BY_NAME[node_type]["displayName"]
if lo is not None and actual < lo:
errors.append(f"{display}」节点{label}数量不能少于 {lo}(当前 {actual})")
if hi is not None and actual > hi:
errors.append(f"{display}」节点{label}数量不能多于 {hi}(当前 {actual})")