- Introduce edge transition speech functionality in the WorkflowEngine to provide optional speech during node transitions. - Update pipeline execution to utilize the new transition speech feature, enhancing user experience by masking delays during transitions. - Modify frontend components to support transition speech in edge specifications, allowing users to define and edit transition speech for edges. - Refactor edge handling logic in the WorkflowEditor to accommodate the new transition speech field, improving workflow management capabilities.
151 lines
5.8 KiB
Python
151 lines
5.8 KiB
Python
"""工作流图引擎(第一版)。
|
|
|
|
对应 dograh 的 pipecat_engine.py,极简实现:
|
|
- 单个 startCall 入口,开场白来自该节点;
|
|
- agentNode 用各自的 prompt 驱动多轮对话;
|
|
- 每轮助手回复后,用一次轻量 LLM「路由」判断是否满足某条出边的 condition,
|
|
满足则切换当前节点(linear = 单边;branching = 多边按条件分流);
|
|
- 到达 endCall 播放结束语并停止路由。
|
|
|
|
只读图结构,不持有对话状态(当前节点由 pipeline 维护),便于单测。
|
|
"""
|
|
|
|
from __future__ import annotations

import hashlib
import re
from typing import Any

from loguru import logger


class WorkflowEngine:
|
|
def __init__(self, graph: dict[str, Any]):
|
|
nodes = graph.get("nodes") or []
|
|
self.nodes: dict[str, dict] = {n["id"]: n for n in nodes if n.get("id")}
|
|
self.edges: list[dict] = graph.get("edges") or []
|
|
self.start_id: str | None = next(
|
|
(nid for nid, n in self.nodes.items() if n.get("type") == "startCall"),
|
|
None,
|
|
)
|
|
|
|
# ---- 结构查询 ----
|
|
def node_type(self, nid: str | None) -> str | None:
|
|
return self.nodes.get(nid or "", {}).get("type")
|
|
|
|
def data(self, nid: str | None) -> dict:
|
|
return self.nodes.get(nid or "", {}).get("data") or {}
|
|
|
|
def name(self, nid: str | None) -> str:
|
|
return self.data(nid).get("name") or (self.node_type(nid) or "")
|
|
|
|
def outgoing(self, nid: str | None) -> list[dict]:
|
|
return [e for e in self.edges if e.get("source") == nid]
|
|
|
|
def edge_fn_name(self, edge: dict) -> str:
|
|
"""每条边对应一个 LLM 函数名(稳定、合法标识符)。"""
|
|
raw = edge.get("id") or f"{edge.get('source')}_{edge.get('target')}"
|
|
slug = re.sub(r"[^a-z0-9]+", "_", str(raw).lower()).strip("_")
|
|
return f"goto_{slug or 'next'}"
|
|
|
|
def edge_condition(self, edge: dict) -> str:
|
|
return (edge.get("data") or {}).get("condition") or ""
|
|
|
|
def edge_transition_speech(self, edge: dict | None) -> str:
|
|
"""命中该边、切换节点瞬间播报的过渡语(可选,掩盖延迟,不写入上下文)。"""
|
|
if not edge:
|
|
return ""
|
|
return (edge.get("data") or {}).get("transition_speech") or ""
|
|
|
|
def find_edge(self, source: str | None, target: str | None) -> dict | None:
|
|
for edge in self.edges:
|
|
if edge.get("source") == source and edge.get("target") == target:
|
|
return edge
|
|
return None
|
|
|
|
def edge_description(self, edge: dict) -> str:
|
|
"""作为转移函数的 description 交给 LLM:满足该条件时模型应调用此函数。"""
|
|
cond = self.edge_condition(edge)
|
|
target = self.name(edge.get("target"))
|
|
if cond:
|
|
return f"当满足以下条件时调用以转到节点「{target}」:{cond}"
|
|
return f"当当前节点任务完成、应继续推进对话时调用以转到节点「{target}」。"
|
|
|
|
def is_end(self, nid: str | None) -> bool:
|
|
return self.node_type(nid) == "endCall"
|
|
|
|
def has_graph(self) -> bool:
|
|
return self.start_id is not None
|
|
|
|
def greeting(self) -> str:
|
|
return self.data(self.start_id).get("greeting") or ""
|
|
|
|
def system_prompt_for(self, nid: str | None) -> str:
|
|
"""节点系统提示:仅用该节点自己的 prompt(开始节点也是会话节点)。"""
|
|
header = f"[当前节点:{self.name(nid)}]"
|
|
prompt = str(self.data(nid).get("prompt") or "").strip()
|
|
return f"{header}\n{prompt}" if prompt else header
|
|
|
|
# ---- 路由:决定下一节点 ----
|
|
async def route(
|
|
self,
|
|
nid: str | None,
|
|
history: list[dict],
|
|
*,
|
|
api_key: str,
|
|
base_url: str,
|
|
model: str,
|
|
) -> str | None:
|
|
"""根据对话历史判断当前节点是否应转移。返回目标节点 id,或 None 表示停留。"""
|
|
outs = self.outgoing(nid)
|
|
if not outs:
|
|
return None
|
|
|
|
options = []
|
|
for i, edge in enumerate(outs, 1):
|
|
edata = edge.get("data") or {}
|
|
cond = edata.get("condition") or "(无明确条件,作为默认后继)"
|
|
tgt_name = self.name(edge.get("target"))
|
|
options.append(f"{i}. 条件:{cond} → 目标节点:{tgt_name}")
|
|
|
|
convo = "\n".join(
|
|
f"{m['role']}: {m['content']}" for m in history[-8:] if m.get("content")
|
|
)
|
|
system = (
|
|
"你是语音对话的流程路由器。根据最近的对话,判断是否已满足某条转移条件。\n"
|
|
"规则:仅当某条件被明确满足时,返回其编号;若都不满足或不确定,返回 0 "
|
|
"(停留在当前节点继续对话)。只输出一个数字,不要任何解释。"
|
|
)
|
|
user = (
|
|
"可选转移:\n"
|
|
+ "\n".join(options)
|
|
+ f"\n\n对话记录:\n{convo}\n\n请只返回编号(0 表示停留):"
|
|
)
|
|
|
|
try:
|
|
from openai import AsyncOpenAI
|
|
|
|
client = AsyncOpenAI(api_key=api_key, base_url=base_url or None)
|
|
resp = await client.chat.completions.create(
|
|
model=model,
|
|
messages=[
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": user},
|
|
],
|
|
temperature=0,
|
|
max_tokens=5,
|
|
)
|
|
text = (resp.choices[0].message.content or "").strip()
|
|
except Exception as exc: # noqa: BLE001 - 路由失败不应中断通话
|
|
logger.warning(f"工作流路由调用失败,停留当前节点: {exc}")
|
|
return None
|
|
|
|
match = re.search(r"\d+", text)
|
|
if not match:
|
|
return None
|
|
idx = int(match.group())
|
|
if idx < 1 or idx > len(outs):
|
|
return None
|
|
target = outs[idx - 1].get("target")
|
|
logger.info(f"工作流路由: {self.name(nid)} → {self.name(target)} (edge {idx})")
|
|
return target
|