"""工作流图引擎(第一版)。 对应 dograh 的 pipecat_engine.py,极简实现: - 单个 startCall 入口,开场白来自该节点; - agentNode 用各自的 prompt 驱动多轮对话; - 每轮助手回复后,用一次轻量 LLM「路由」判断是否满足某条出边的 condition, 满足则切换当前节点(linear = 单边;branching = 多边按条件分流); - 到达 endCall 播放结束语并停止路由。 只读图结构,不持有对话状态(当前节点由 pipeline 维护),便于单测。 """ from __future__ import annotations import re from typing import Any from loguru import logger class WorkflowEngine: def __init__(self, graph: dict[str, Any]): nodes = graph.get("nodes") or [] self.nodes: dict[str, dict] = {n["id"]: n for n in nodes if n.get("id")} self.edges: list[dict] = graph.get("edges") or [] self.start_id: str | None = next( (nid for nid, n in self.nodes.items() if n.get("type") == "startCall"), None, ) # ---- 结构查询 ---- def node_type(self, nid: str | None) -> str | None: return self.nodes.get(nid or "", {}).get("type") def data(self, nid: str | None) -> dict: return self.nodes.get(nid or "", {}).get("data") or {} def name(self, nid: str | None) -> str: return self.data(nid).get("name") or (self.node_type(nid) or "") def outgoing(self, nid: str | None) -> list[dict]: return [e for e in self.edges if e.get("source") == nid] def edge_fn_name(self, edge: dict) -> str: """每条边对应一个 LLM 函数名(稳定、合法标识符)。""" raw = edge.get("id") or f"{edge.get('source')}_{edge.get('target')}" slug = re.sub(r"[^a-z0-9]+", "_", str(raw).lower()).strip("_") return f"goto_{slug or 'next'}" def edge_condition(self, edge: dict) -> str: return (edge.get("data") or {}).get("condition") or "" def edge_transition_speech(self, edge: dict | None) -> str: """命中该边、切换节点瞬间播报的过渡语(可选,掩盖延迟,不写入上下文)。""" if not edge: return "" return (edge.get("data") or {}).get("transition_speech") or "" def find_edge(self, source: str | None, target: str | None) -> dict | None: for edge in self.edges: if edge.get("source") == source and edge.get("target") == target: return edge return None def edge_description(self, edge: dict) -> str: """作为转移函数的 description 交给 LLM:满足该条件时模型应调用此函数。""" cond = self.edge_condition(edge) target = self.name(edge.get("target")) if cond: return f"当满足以下条件时调用以转到节点「{target}」:{cond}" return f"当当前节点任务完成、应继续推进对话时调用以转到节点「{target}」。" def is_end(self, nid: str | None) -> bool: return self.node_type(nid) == "endCall" def has_graph(self) -> bool: return self.start_id is not None def greeting(self) -> str: return self.data(self.start_id).get("greeting") or "" def system_prompt_for(self, nid: str | None) -> str: """节点系统提示:仅用该节点自己的 prompt(开始节点也是会话节点)。""" header = f"[当前节点:{self.name(nid)}]" prompt = str(self.data(nid).get("prompt") or "").strip() return f"{header}\n{prompt}" if prompt else header # ---- 路由:决定下一节点 ---- async def route( self, nid: str | None, history: list[dict], *, api_key: str, base_url: str, model: str, ) -> str | None: """根据对话历史判断当前节点是否应转移。返回目标节点 id,或 None 表示停留。""" outs = self.outgoing(nid) if not outs: return None options = [] for i, edge in enumerate(outs, 1): edata = edge.get("data") or {} cond = edata.get("condition") or "(无明确条件,作为默认后继)" tgt_name = self.name(edge.get("target")) options.append(f"{i}. 条件:{cond} → 目标节点:{tgt_name}") convo = "\n".join( f"{m['role']}: {m['content']}" for m in history[-8:] if m.get("content") ) system = ( "你是语音对话的流程路由器。根据最近的对话,判断是否已满足某条转移条件。\n" "规则:仅当某条件被明确满足时,返回其编号;若都不满足或不确定,返回 0 " "(停留在当前节点继续对话)。只输出一个数字,不要任何解释。" ) user = ( "可选转移:\n" + "\n".join(options) + f"\n\n对话记录:\n{convo}\n\n请只返回编号(0 表示停留):" ) try: from openai import AsyncOpenAI client = AsyncOpenAI(api_key=api_key, base_url=base_url or None) resp = await client.chat.completions.create( model=model, messages=[ {"role": "system", "content": system}, {"role": "user", "content": user}, ], temperature=0, max_tokens=5, ) text = (resp.choices[0].message.content or "").strip() except Exception as exc: # noqa: BLE001 - 路由失败不应中断通话 logger.warning(f"工作流路由调用失败,停留当前节点: {exc}") return None match = re.search(r"\d+", text) if not match: return None idx = int(match.group()) if idx < 1 or idx > len(outs): return None target = outs[idx - 1].get("target") logger.info(f"工作流路由: {self.name(nid)} → {self.name(target)} (edge {idx})") return target