diff --git a/backend/models.py b/backend/models.py index 151c653..4a9cd1c 100644 --- a/backend/models.py +++ b/backend/models.py @@ -46,6 +46,9 @@ class AssistantConfig(BaseModel): enableInterrupt: bool = True + # workflow 类型:节点图(nodes/edges)。非 workflow 为空,引擎据此决定是否启用。 + graph: dict = {} + # ---- 运行时连接信息(服务端注入,不来自浏览器) ---- # 为空时,service_factory 会回退到 config.py 的 .env 默认值。 llm_api_key: str = "" diff --git a/backend/services/config_resolver.py b/backend/services/config_resolver.py index 739e90d..b68997d 100644 --- a/backend/services/config_resolver.py +++ b/backend/services/config_resolver.py @@ -66,6 +66,8 @@ async def resolve_runtime_config( prompt=assistant.prompt or "你是一个有帮助的助手。", runtimeMode=assistant.runtime_mode, # type: ignore[arg-type] enableInterrupt=assistant.enable_interrupt, + # workflow 图:仅 workflow 类型非空,引擎据此启用图驱动对话 + graph=(assistant.graph or {}) if assistant.type == "workflow" else {}, # 模型/音色:模型资源中的配置优先 model=str(_value(llm_resource, "modelId", "")), asr=str(_value(stt_resource, "modelId", "")), diff --git a/backend/services/node_specs.py b/backend/services/node_specs.py index 3f18bbd..fa7c359 100644 --- a/backend/services/node_specs.py +++ b/backend/services/node_specs.py @@ -23,7 +23,7 @@ NODE_SPECS: list[dict[str, Any]] = [ "name": "startCall", "displayName": "开始", "category": "call_node", - "description": "工作流入口,每个流程有且仅有一个。播放开场白并进入首个节点。", + "description": "工作流入口,每个流程有且仅有一个。播放开场白,并用自己的提示词进行多轮对话,满足出边条件后流转。", "icon": "Play", "accent": "mint", "addable": False, @@ -31,7 +31,7 @@ NODE_SPECS: list[dict[str, Any]] = [ "fields": [ {"key": "name", "label": "节点名称", "type": "text"}, {"key": "greeting", "label": "开场白", "type": "textarea"}, - {"key": "prompt", "label": "全局提示词", "type": "textarea"}, + {"key": "prompt", "label": "节点提示词", "type": "textarea"}, ], }, { diff --git a/backend/services/pipecat/pipeline.py b/backend/services/pipecat/pipeline.py index bf77bff..807c740 100644 --- a/backend/services/pipecat/pipeline.py +++ b/backend/services/pipecat/pipeline.py @@ -8,10 +8,14 @@ from uuid import uuid4 +import config from loguru import logger from models import AssistantConfig from services.pipecat.service_factory import create_realtime_service, create_services +from services.workflow_engine import WorkflowEngine +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( EndFrame, @@ -209,7 +213,32 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: stt, llm, tts = create_services(cfg) - context = LLMContext(messages=[{"role": "system", "content": cfg.prompt}]) + # ---- workflow 图引擎(可选)---- + # 有节点图时按图驱动:开场白/系统提示来自起始节点,每轮回复后按条件路由。 + engine = WorkflowEngine(cfg.graph or {}) + workflow_active = engine.has_graph() + wf_state = { + # 开始节点本身就是会话节点(有自己的 prompt,可多轮),从它开始 + "current": engine.start_id if workflow_active else None, + "ended": False, + "turns_in_node": 0, + "end_frame_queued": False, + } + history: list[dict] = [] + # 当前节点没有可调用转移工具(全是空条件)时,才启用文本兜底路由 + FALLBACK_AFTER_TURNS = 2 + + if workflow_active: + greeting = engine.greeting() or cfg.greeting + system_content = engine.system_prompt_for(wf_state["current"]) + logger.info( + f"工作流模式启用: 起始节点={engine.name(wf_state['current'])}" + ) + else: + greeting = cfg.greeting + system_content = cfg.prompt + + context = LLMContext(messages=[{"role": "system", "content": system_content}]) user_aggregator = LLMUserAggregator( context, params=LLMUserAggregatorParams( @@ -267,6 +296,96 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: greeting_transcript_sent = False pending_text_inputs: list[str] = [] + async def emit_node_active(node_id: str | None) -> None: + """通知前端当前激活的节点,画布据此高亮。""" + if node_id: + await worker.queue_frame( + OutputTransportMessageUrgentFrame( + message={"type": "node-active", "nodeId": node_id} + ) + ) + + def set_system_prompt(text: str) -> None: + """替换上下文里的系统提示(节点切换时整体替换,而非追加)。""" + messages = context.get_messages() + if messages and messages[0].get("role") == "system": + messages[0] = {"role": "system", "content": text} + else: + messages.insert(0, {"role": "system", "content": text}) + + def apply_node(node_id: str | None) -> None: + """进入节点:设置系统提示 + 把出边注册为可调用的转移工具。""" + set_system_prompt(engine.system_prompt_for(node_id)) + if engine.is_end(node_id): + context.set_tools() # 终止节点无工具 + return + schemas = [ + FunctionSchema( + name=engine.edge_fn_name(edge), + description=engine.edge_description(edge), + properties={}, + required=[], + ) + for edge in engine.outgoing(node_id) + ] + if schemas: + context.set_tools(ToolsSchema(standard_tools=schemas)) + else: + context.set_tools() # 无出边:清空工具 + + async def go_to_node(target: str) -> None: + """执行转移:切当前节点、重置计数、点亮画布、设置提示/工具。 + + 结束节点:设 ended 标记,apply_node 会清空工具,模型据结束语提示说完后, + on_assistant_text_end 里排入 EndFrame 挂断,不再多轮。 + """ + wf_state["current"] = target + wf_state["turns_in_node"] = 0 + if engine.is_end(target): + wf_state["ended"] = True + await emit_node_active(target) + apply_node(target) + + def make_transition_handler(target: str): + async def handler(params): + logger.info(f"LLM 触发转移 → {engine.name(target)}") + await go_to_node(target) + # 返回工具结果,pipecat 随即在新节点的提示/工具下继续生成 + await params.result_callback({"status": "ok"}) + + return handler + + async def fallback_route() -> None: + """文本兜底:模型迟迟不调用转移工具时,用一次轻量分类器判断是否转移。""" + if not workflow_active or wf_state["ended"]: + return + if wf_state["turns_in_node"] < FALLBACK_AFTER_TURNS: + return + if not engine.outgoing(wf_state["current"]): + return + target = await engine.route( + wf_state["current"], + history, + api_key=cfg.llm_api_key or config.LLM_API_KEY, + base_url=cfg.llm_base_url or config.LLM_BASE_URL, + model=cfg.model or config.LLM_MODEL, + ) + if target and target != wf_state["current"]: + logger.info(f"文本兜底触发转移 → {engine.name(target)}") + # 仅切换节点提示/工具,下一轮用户输入即在新节点处理 + await go_to_node(target) + + # 把每条边注册成 LLM 可调用的转移函数(按边唯一命名,处理器全局注册一次, + # 由各节点的 context.tools 控制当前可见哪些)。 + if workflow_active: + for edge in engine.edges: + target = edge.get("target") + if target: + llm.register_function( + engine.edge_fn_name(edge), make_transition_handler(target) + ) + apply_node(wf_state["current"]) # 设初始节点的提示与工具 + async def append_user_text_to_context(text: str, *, run_llm: bool) -> None: await worker.queue_frame( LLMMessagesAppendFrame( @@ -277,6 +396,8 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: @user_aggregator.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(_aggregator, _strategy, message): + if message.content: + history.append({"role": "user", "content": message.content}) await queue_transcript("user", message.content, message.timestamp) @assistant_aggregator.event_handler("on_assistant_text_start") @@ -315,10 +436,26 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: } ) ) + # 助手把话说完(未被打断)后:累加本节点轮次,必要时走文本兜底路由。 + # 正常情况下转移由 LLM 直接调用转移工具完成(go_to_node),无需这里处理。 + if content and not interrupted and workflow_active: + history.append({"role": "assistant", "content": content}) + if wf_state["ended"]: + # 结束节点:说完结束语后挂断,不再继续多轮对话 + if not wf_state["end_frame_queued"]: + wf_state["end_frame_queued"] = True + logger.info("结束节点结束语已播报,挂断通话") + await worker.queue_frame(EndFrame()) + else: + wf_state["turns_in_node"] += 1 + await fallback_route() + elif content and not interrupted: + history.append({"role": "assistant", "content": content}) @text_input.event_handler("on_text_input") async def on_text_input(_processor, text): pending_text_inputs.append(text) + history.append({"role": "user", "content": text}) # 前端显示不依赖 interruption 后续事件,必须在打断前先排入发送队列。 await queue_transcript("user", text, time_now_iso8601()) @@ -333,21 +470,25 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: @text_input.event_handler("on_text_append") async def on_text_append(_processor, text): # 静默追加:写进上下文但不打断、不触发推理;transcript 照常上报 + history.append({"role": "user", "content": text}) await queue_transcript("user", text, time_now_iso8601()) await append_user_text_to_context(text, run_llm=False) @text_input.event_handler("on_client_ready") async def on_client_ready(_processor): nonlocal greeting_transcript_sent - if cfg.greeting and not greeting_transcript_sent: + if greeting and not greeting_transcript_sent: greeting_transcript_sent = True - await queue_transcript("assistant", cfg.greeting, time_now_iso8601()) + await queue_transcript("assistant", greeting, time_now_iso8601()) @transport.event_handler("on_client_connected") async def on_client_connected(_transport, _client): - if cfg.greeting: - context.add_message({"role": "assistant", "content": cfg.greeting}) - await worker.queue_frame(TTSSpeakFrame(cfg.greeting, append_to_context=False)) + if greeting: + context.add_message({"role": "assistant", "content": greeting}) + await worker.queue_frame(TTSSpeakFrame(greeting, append_to_context=False)) + # 工作流:点亮当前(开始)节点。开始节点即首个会话节点。 + if workflow_active: + await emit_node_active(wf_state["current"]) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(_transport, _client): diff --git a/backend/services/workflow_engine.py b/backend/services/workflow_engine.py new file mode 100644 index 0000000..bf993eb --- /dev/null +++ b/backend/services/workflow_engine.py @@ -0,0 +1,138 @@ +"""工作流图引擎(第一版)。 + +对应 dograh 的 pipecat_engine.py,极简实现: + - 单个 startCall 入口,开场白来自该节点; + - agentNode 用各自的 prompt 驱动多轮对话; + - 每轮助手回复后,用一次轻量 LLM「路由」判断是否满足某条出边的 condition, + 满足则切换当前节点(linear = 单边;branching = 多边按条件分流); + - 到达 endCall 播放结束语并停止路由。 + +只读图结构,不持有对话状态(当前节点由 pipeline 维护),便于单测。 +""" + +from __future__ import annotations + +import re +from typing import Any + +from loguru import logger + + +class WorkflowEngine: + def __init__(self, graph: dict[str, Any]): + nodes = graph.get("nodes") or [] + self.nodes: dict[str, dict] = {n["id"]: n for n in nodes if n.get("id")} + self.edges: list[dict] = graph.get("edges") or [] + self.start_id: str | None = next( + (nid for nid, n in self.nodes.items() if n.get("type") == "startCall"), + None, + ) + + # ---- 结构查询 ---- + def node_type(self, nid: str | None) -> str | None: + return self.nodes.get(nid or "", {}).get("type") + + def data(self, nid: str | None) -> dict: + return self.nodes.get(nid or "", {}).get("data") or {} + + def name(self, nid: str | None) -> str: + return self.data(nid).get("name") or (self.node_type(nid) or "") + + def outgoing(self, nid: str | None) -> list[dict]: + return [e for e in self.edges if e.get("source") == nid] + + def edge_fn_name(self, edge: dict) -> str: + """每条边对应一个 LLM 函数名(稳定、合法标识符)。""" + raw = edge.get("id") or f"{edge.get('source')}_{edge.get('target')}" + slug = re.sub(r"[^a-z0-9]+", "_", str(raw).lower()).strip("_") + return f"goto_{slug or 'next'}" + + def edge_condition(self, edge: dict) -> str: + return (edge.get("data") or {}).get("condition") or "" + + def edge_description(self, edge: dict) -> str: + """作为转移函数的 description 交给 LLM:满足该条件时模型应调用此函数。""" + cond = self.edge_condition(edge) + target = self.name(edge.get("target")) + if cond: + return f"当满足以下条件时调用以转到节点「{target}」:{cond}" + return f"当当前节点任务完成、应继续推进对话时调用以转到节点「{target}」。" + + def is_end(self, nid: str | None) -> bool: + return self.node_type(nid) == "endCall" + + def has_graph(self) -> bool: + return self.start_id is not None + + def greeting(self) -> str: + return self.data(self.start_id).get("greeting") or "" + + def system_prompt_for(self, nid: str | None) -> str: + """节点系统提示:仅用该节点自己的 prompt(开始节点也是会话节点)。""" + header = f"[当前节点:{self.name(nid)}]" + prompt = str(self.data(nid).get("prompt") or "").strip() + return f"{header}\n{prompt}" if prompt else header + + # ---- 路由:决定下一节点 ---- + async def route( + self, + nid: str | None, + history: list[dict], + *, + api_key: str, + base_url: str, + model: str, + ) -> str | None: + """根据对话历史判断当前节点是否应转移。返回目标节点 id,或 None 表示停留。""" + outs = self.outgoing(nid) + if not outs: + return None + + options = [] + for i, edge in enumerate(outs, 1): + edata = edge.get("data") or {} + cond = edata.get("condition") or "(无明确条件,作为默认后继)" + tgt_name = self.name(edge.get("target")) + options.append(f"{i}. 条件:{cond} → 目标节点:{tgt_name}") + + convo = "\n".join( + f"{m['role']}: {m['content']}" for m in history[-8:] if m.get("content") + ) + system = ( + "你是语音对话的流程路由器。根据最近的对话,判断是否已满足某条转移条件。\n" + "规则:仅当某条件被明确满足时,返回其编号;若都不满足或不确定,返回 0 " + "(停留在当前节点继续对话)。只输出一个数字,不要任何解释。" + ) + user = ( + "可选转移:\n" + + "\n".join(options) + + f"\n\n对话记录:\n{convo}\n\n请只返回编号(0 表示停留):" + ) + + try: + from openai import AsyncOpenAI + + client = AsyncOpenAI(api_key=api_key, base_url=base_url or None) + resp = await client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + ], + temperature=0, + max_tokens=5, + ) + text = (resp.choices[0].message.content or "").strip() + except Exception as exc: # noqa: BLE001 - 路由失败不应中断通话 + logger.warning(f"工作流路由调用失败,停留当前节点: {exc}") + return None + + match = re.search(r"\d+", text) + if not match: + return None + idx = int(match.group()) + if idx < 1 or idx > len(outs): + return None + target = outs[idx - 1].get("target") + logger.info(f"工作流路由: {self.name(nid)} → {self.name(target)} (edge {idx})") + return target diff --git a/frontend/src/components/pages/AssistantPage.tsx b/frontend/src/components/pages/AssistantPage.tsx index a009d90..38e44c9 100644 --- a/frontend/src/components/pages/AssistantPage.tsx +++ b/frontend/src/components/pages/AssistantPage.tsx @@ -42,6 +42,7 @@ import { Switch } from "@/components/ui/switch"; import { Sheet, SheetContent, + SheetDescription, SheetHeader, SheetTitle, } from "@/components/ui/sheet"; @@ -372,6 +373,7 @@ export function AssistantPage(props: AssistantPageProps) { allowInterrupt: true, }); const [debugOpen, setDebugOpen] = useState(false); + const [activeNodeId, setActiveNodeId] = useState(null); const loadAssistants = useCallback(async () => { setListLoading(true); @@ -1235,6 +1237,7 @@ export function AssistantPage(props: AssistantPageProps) { onChange={setWorkflowGraph} settings={workflowSettings} onSettingsChange={setWorkflowSettings} + activeNodeId={activeNodeId} modelOptions={{ llm: credOptions("LLM"), asr: credOptions("ASR"), @@ -1243,7 +1246,14 @@ export function AssistantPage(props: AssistantPageProps) { /> - + { + setDebugOpen(open); + if (!open) setActiveNodeId(null); + }} + modal={false} + > 语音调试 + + 与当前助手进行语音对话调试,画布会高亮正在激活的节点。 + - + @@ -1859,9 +1876,11 @@ function SegmentedIconButton({ function DebugDrawer({ assistantId, asSheet = false, + onNodeActive, }: { assistantId: string | null; asSheet?: boolean; + onNodeActive?: (nodeId: string | null) => void; }) { const [showTranscript, setShowTranscript] = useState(false); const [vizStyle, setVizStyle] = useState("aura"); @@ -1915,6 +1934,7 @@ function DebugDrawer({ showTranscript={showTranscript} vizStyle={vizStyle} assistantId={assistantId} + onNodeActive={onNodeActive} /> ); @@ -1924,10 +1944,12 @@ function DebugVoicePanel({ showTranscript, vizStyle, assistantId, + onNodeActive, }: { showTranscript: boolean; vizStyle: VizStyle; assistantId: string | null; + onNodeActive?: (nodeId: string | null) => void; }) { const { status, @@ -1943,7 +1965,7 @@ function DebugVoicePanel({ connect, disconnect, audioRef, - } = useVoicePreview(assistantId); + } = useVoicePreview(assistantId, onNodeActive); // 连接中或已连通都视作"会话进行中" const recording = status === "connecting" || status === "connected"; const [textDraft, setTextDraft] = useState(""); diff --git a/frontend/src/components/workflow/GenericNode.tsx b/frontend/src/components/workflow/GenericNode.tsx index 1723c4b..9a562d1 100644 --- a/frontend/src/components/workflow/GenericNode.tsx +++ b/frontend/src/components/workflow/GenericNode.tsx @@ -10,12 +10,18 @@ import { Pencil, Trash2 } from "lucide-react"; import { useContext } from "react"; import { cn } from "@/lib/utils"; -import { NodeActionContext, NodeSpecsContext } from "./context"; +import { + ActiveNodeContext, + NodeActionContext, + NodeSpecsContext, +} from "./context"; import { accentVar, type WorkflowNodeData } from "./specs"; export function GenericNode({ id, type, data, selected }: NodeProps) { const specs = useContext(NodeSpecsContext); const actions = useContext(NodeActionContext); + const activeNodeId = useContext(ActiveNodeContext); + const isActive = activeNodeId === id; const spec = specs[type as string]; if (!spec) return null; @@ -30,9 +36,11 @@ export function GenericNode({ id, type, data, selected }: NodeProps) { data-node-id={id} className={cn( "group relative w-[228px] rounded-2xl border bg-card p-4 shadow-sm transition-all", - selected - ? "border-primary ring-2 ring-primary/30" - : "border-hairline hover:shadow-md", + isActive + ? "border-success ring-2 ring-success/60" + : selected + ? "border-primary ring-2 ring-primary/30" + : "border-hairline hover:shadow-md", )} > {spec.hasTarget && ( @@ -43,6 +51,13 @@ export function GenericNode({ id, type, data, selected }: NodeProps) { /> )} + {isActive && ( +
+ + 对话中 +
+ )} + {/* 悬停/选中时出现的操作按钮 */}
void; modelOptions: { llm: ModelOption[]; asr: ModelOption[]; tts: ModelOption[] }; + /** 调试通话中当前激活的节点 id(用于高亮)。 */ + activeNodeId?: string | null; }; let nodeSeq = 0; @@ -127,6 +130,7 @@ function Canvas({ settings, onSettingsChange, modelOptions, + activeNodeId, specsByType, }: WorkflowEditorProps & { specsByType: NodeSpecMap }) { const initial = useMemo(() => toFlow(value ?? defaultGraph()), [value]); @@ -252,6 +256,7 @@ function Canvas({ return ( +
@@ -446,6 +451,7 @@ function Canvas({
+
); } diff --git a/frontend/src/components/workflow/context.ts b/frontend/src/components/workflow/context.ts index a7fc7a0..6263d44 100644 --- a/frontend/src/components/workflow/context.ts +++ b/frontend/src/components/workflow/context.ts @@ -21,3 +21,6 @@ const noop: ElementActions = { edit: () => {}, remove: () => {} }; export const NodeActionContext = createContext(noop); export const EdgeActionContext = createContext(noop); + +/** 调试通话中当前激活的节点 id(画布据此高亮),无激活为 null。 */ +export const ActiveNodeContext = createContext(null); diff --git a/frontend/src/hooks/use-voice-preview.ts b/frontend/src/hooks/use-voice-preview.ts index a0bbfd1..a28d493 100644 --- a/frontend/src/hooks/use-voice-preview.ts +++ b/frontend/src/hooks/use-voice-preview.ts @@ -83,7 +83,10 @@ function microphoneErrorMessage(error: unknown): string { return errorMessage(error, "无法访问麦克风。"); } -export function useVoicePreview(assistantId: string | null) { +export function useVoicePreview( + assistantId: string | null, + onNodeActive?: (nodeId: string | null) => void, +) { const [status, setStatus] = useState("idle"); const [error, setError] = useState(null); const [micWarning, setMicWarning] = useState(null); @@ -102,6 +105,11 @@ export function useVoicePreview(assistantId: string | null) { const localStreamRef = useRef(null); const startingRef = useRef(false); const messageSeqRef = useRef(0); + // 工作流激活节点回调存进 ref,避免把它挂进 connect 依赖反复重建连接 + const onNodeActiveRef = useRef(onNodeActive); + useEffect(() => { + onNodeActiveRef.current = onNodeActive; + }, [onNodeActive]); // 枚举可用麦克风。未授权前 label 为空,授权(连接)后再刷新即可拿到名称。 const refreshDevices = useCallback(async () => { @@ -170,6 +178,7 @@ export function useVoicePreview(assistantId: string | null) { localStreamRef.current = null; if (audioRef.current) audioRef.current.srcObject = null; startingRef.current = false; + onNodeActiveRef.current?.(null); }, []); const disconnect = useCallback(() => { @@ -377,6 +386,12 @@ export function useVoicePreview(assistantId: string | null) { sequence: messageSeqRef.current, }; setMessages((prev) => sortMessages([...prev, next])); + } else if ( + msg?.type === "node-active" && + typeof msg.nodeId === "string" + ) { + // 工作流:后端报告当前激活节点,交给画布高亮 + onNodeActiveRef.current?.(msg.nodeId); } } catch { /* 非 JSON / 未知消息,忽略 */