Enhance workflow engine and integration in backend and frontend

- Introduce a new WorkflowEngine class to manage workflow graphs, enabling dynamic node-based interactions.
- Update AssistantConfig to include a graph field for workflow definitions, allowing for flexible configuration.
- Modify pipeline execution to support workflow-driven dialogue, integrating node transitions and system prompts based on active nodes.
- Enhance frontend components to visualize active nodes and provide debugging capabilities, including highlighting the current node during interactions.
- Refactor existing components to accommodate new workflow functionalities and improve overall user experience.
This commit is contained in:
Xin Wang
2026-06-15 15:32:10 +08:00
parent c2a39257ff
commit aae0342a57
10 changed files with 361 additions and 16 deletions

View File

@@ -46,6 +46,9 @@ class AssistantConfig(BaseModel):
enableInterrupt: bool = True
# workflow 类型:节点图(nodes/edges)。非 workflow 为空,引擎据此决定是否启用。
graph: dict = {}
# ---- 运行时连接信息(服务端注入,不来自浏览器) ----
# 为空时,service_factory 会回退到 config.py 的 .env 默认值。
llm_api_key: str = ""

View File

@@ -66,6 +66,8 @@ async def resolve_runtime_config(
prompt=assistant.prompt or "你是一个有帮助的助手。",
runtimeMode=assistant.runtime_mode, # type: ignore[arg-type]
enableInterrupt=assistant.enable_interrupt,
# workflow 图:仅 workflow 类型非空,引擎据此启用图驱动对话
graph=(assistant.graph or {}) if assistant.type == "workflow" else {},
# 模型/音色:模型资源中的配置优先
model=str(_value(llm_resource, "modelId", "")),
asr=str(_value(stt_resource, "modelId", "")),

View File

@@ -23,7 +23,7 @@ NODE_SPECS: list[dict[str, Any]] = [
"name": "startCall",
"displayName": "开始",
"category": "call_node",
"description": "工作流入口,每个流程有且仅有一个。播放开场白并进入首个节点",
"description": "工作流入口,每个流程有且仅有一个。播放开场白,并用自己的提示词进行多轮对话,满足出边条件后流转",
"icon": "Play",
"accent": "mint",
"addable": False,
@@ -31,7 +31,7 @@ NODE_SPECS: list[dict[str, Any]] = [
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "greeting", "label": "开场白", "type": "textarea"},
{"key": "prompt", "label": "全局提示词", "type": "textarea"},
{"key": "prompt", "label": "节点提示词", "type": "textarea"},
],
},
{

View File

@@ -8,10 +8,14 @@
from uuid import uuid4
import config
from loguru import logger
from models import AssistantConfig
from services.pipecat.service_factory import create_realtime_service, create_services
from services.workflow_engine import WorkflowEngine
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
EndFrame,
@@ -209,7 +213,32 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
stt, llm, tts = create_services(cfg)
context = LLMContext(messages=[{"role": "system", "content": cfg.prompt}])
# ---- workflow 图引擎(可选)----
# 有节点图时按图驱动:开场白/系统提示来自起始节点,每轮回复后按条件路由。
engine = WorkflowEngine(cfg.graph or {})
workflow_active = engine.has_graph()
wf_state = {
# 开始节点本身就是会话节点(有自己的 prompt,可多轮),从它开始
"current": engine.start_id if workflow_active else None,
"ended": False,
"turns_in_node": 0,
"end_frame_queued": False,
}
history: list[dict] = []
# 当前节点没有可调用转移工具(全是空条件)时,才启用文本兜底路由
FALLBACK_AFTER_TURNS = 2
if workflow_active:
greeting = engine.greeting() or cfg.greeting
system_content = engine.system_prompt_for(wf_state["current"])
logger.info(
f"工作流模式启用: 起始节点={engine.name(wf_state['current'])}"
)
else:
greeting = cfg.greeting
system_content = cfg.prompt
context = LLMContext(messages=[{"role": "system", "content": system_content}])
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
@@ -267,6 +296,96 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
greeting_transcript_sent = False
pending_text_inputs: list[str] = []
async def emit_node_active(node_id: str | None) -> None:
"""通知前端当前激活的节点,画布据此高亮。"""
if node_id:
await worker.queue_frame(
OutputTransportMessageUrgentFrame(
message={"type": "node-active", "nodeId": node_id}
)
)
def set_system_prompt(text: str) -> None:
"""替换上下文里的系统提示(节点切换时整体替换,而非追加)。"""
messages = context.get_messages()
if messages and messages[0].get("role") == "system":
messages[0] = {"role": "system", "content": text}
else:
messages.insert(0, {"role": "system", "content": text})
def apply_node(node_id: str | None) -> None:
"""进入节点:设置系统提示 + 把出边注册为可调用的转移工具。"""
set_system_prompt(engine.system_prompt_for(node_id))
if engine.is_end(node_id):
context.set_tools() # 终止节点无工具
return
schemas = [
FunctionSchema(
name=engine.edge_fn_name(edge),
description=engine.edge_description(edge),
properties={},
required=[],
)
for edge in engine.outgoing(node_id)
]
if schemas:
context.set_tools(ToolsSchema(standard_tools=schemas))
else:
context.set_tools() # 无出边:清空工具
async def go_to_node(target: str) -> None:
"""执行转移:切当前节点、重置计数、点亮画布、设置提示/工具。
结束节点:设 ended 标记,apply_node 会清空工具,模型据结束语提示说完后,
on_assistant_text_end 里排入 EndFrame 挂断,不再多轮。
"""
wf_state["current"] = target
wf_state["turns_in_node"] = 0
if engine.is_end(target):
wf_state["ended"] = True
await emit_node_active(target)
apply_node(target)
def make_transition_handler(target: str):
async def handler(params):
logger.info(f"LLM 触发转移 → {engine.name(target)}")
await go_to_node(target)
# 返回工具结果,pipecat 随即在新节点的提示/工具下继续生成
await params.result_callback({"status": "ok"})
return handler
async def fallback_route() -> None:
"""文本兜底:模型迟迟不调用转移工具时,用一次轻量分类器判断是否转移。"""
if not workflow_active or wf_state["ended"]:
return
if wf_state["turns_in_node"] < FALLBACK_AFTER_TURNS:
return
if not engine.outgoing(wf_state["current"]):
return
target = await engine.route(
wf_state["current"],
history,
api_key=cfg.llm_api_key or config.LLM_API_KEY,
base_url=cfg.llm_base_url or config.LLM_BASE_URL,
model=cfg.model or config.LLM_MODEL,
)
if target and target != wf_state["current"]:
logger.info(f"文本兜底触发转移 → {engine.name(target)}")
# 仅切换节点提示/工具,下一轮用户输入即在新节点处理
await go_to_node(target)
# 把每条边注册成 LLM 可调用的转移函数(按边唯一命名,处理器全局注册一次,
# 由各节点的 context.tools 控制当前可见哪些)。
if workflow_active:
for edge in engine.edges:
target = edge.get("target")
if target:
llm.register_function(
engine.edge_fn_name(edge), make_transition_handler(target)
)
apply_node(wf_state["current"]) # 设初始节点的提示与工具
async def append_user_text_to_context(text: str, *, run_llm: bool) -> None:
await worker.queue_frame(
LLMMessagesAppendFrame(
@@ -277,6 +396,8 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(_aggregator, _strategy, message):
if message.content:
history.append({"role": "user", "content": message.content})
await queue_transcript("user", message.content, message.timestamp)
@assistant_aggregator.event_handler("on_assistant_text_start")
@@ -315,10 +436,26 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
}
)
)
# 助手把话说完(未被打断)后:累加本节点轮次,必要时走文本兜底路由。
# 正常情况下转移由 LLM 直接调用转移工具完成(go_to_node),无需这里处理。
if content and not interrupted and workflow_active:
history.append({"role": "assistant", "content": content})
if wf_state["ended"]:
# 结束节点:说完结束语后挂断,不再继续多轮对话
if not wf_state["end_frame_queued"]:
wf_state["end_frame_queued"] = True
logger.info("结束节点结束语已播报,挂断通话")
await worker.queue_frame(EndFrame())
else:
wf_state["turns_in_node"] += 1
await fallback_route()
elif content and not interrupted:
history.append({"role": "assistant", "content": content})
@text_input.event_handler("on_text_input")
async def on_text_input(_processor, text):
pending_text_inputs.append(text)
history.append({"role": "user", "content": text})
# 前端显示不依赖 interruption 后续事件,必须在打断前先排入发送队列。
await queue_transcript("user", text, time_now_iso8601())
@@ -333,21 +470,25 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
@text_input.event_handler("on_text_append")
async def on_text_append(_processor, text):
# 静默追加:写进上下文但不打断、不触发推理;transcript 照常上报
history.append({"role": "user", "content": text})
await queue_transcript("user", text, time_now_iso8601())
await append_user_text_to_context(text, run_llm=False)
@text_input.event_handler("on_client_ready")
async def on_client_ready(_processor):
nonlocal greeting_transcript_sent
if cfg.greeting and not greeting_transcript_sent:
if greeting and not greeting_transcript_sent:
greeting_transcript_sent = True
await queue_transcript("assistant", cfg.greeting, time_now_iso8601())
await queue_transcript("assistant", greeting, time_now_iso8601())
@transport.event_handler("on_client_connected")
async def on_client_connected(_transport, _client):
if cfg.greeting:
context.add_message({"role": "assistant", "content": cfg.greeting})
await worker.queue_frame(TTSSpeakFrame(cfg.greeting, append_to_context=False))
if greeting:
context.add_message({"role": "assistant", "content": greeting})
await worker.queue_frame(TTSSpeakFrame(greeting, append_to_context=False))
# 工作流:点亮当前(开始)节点。开始节点即首个会话节点。
if workflow_active:
await emit_node_active(wf_state["current"])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(_transport, _client):

View File

@@ -0,0 +1,138 @@
"""工作流图引擎(第一版)。
对应 dograh 的 pipecat_engine.py,极简实现:
- 单个 startCall 入口,开场白来自该节点;
- agentNode 用各自的 prompt 驱动多轮对话;
- 每轮助手回复后,用一次轻量 LLM「路由」判断是否满足某条出边的 condition,
满足则切换当前节点(linear = 单边;branching = 多边按条件分流);
- 到达 endCall 播放结束语并停止路由。
只读图结构,不持有对话状态(当前节点由 pipeline 维护),便于单测。
"""
from __future__ import annotations
import re
from typing import Any
from loguru import logger
class WorkflowEngine:
def __init__(self, graph: dict[str, Any]):
nodes = graph.get("nodes") or []
self.nodes: dict[str, dict] = {n["id"]: n for n in nodes if n.get("id")}
self.edges: list[dict] = graph.get("edges") or []
self.start_id: str | None = next(
(nid for nid, n in self.nodes.items() if n.get("type") == "startCall"),
None,
)
# ---- 结构查询 ----
def node_type(self, nid: str | None) -> str | None:
return self.nodes.get(nid or "", {}).get("type")
def data(self, nid: str | None) -> dict:
return self.nodes.get(nid or "", {}).get("data") or {}
def name(self, nid: str | None) -> str:
return self.data(nid).get("name") or (self.node_type(nid) or "")
def outgoing(self, nid: str | None) -> list[dict]:
return [e for e in self.edges if e.get("source") == nid]
def edge_fn_name(self, edge: dict) -> str:
"""每条边对应一个 LLM 函数名(稳定、合法标识符)。"""
raw = edge.get("id") or f"{edge.get('source')}_{edge.get('target')}"
slug = re.sub(r"[^a-z0-9]+", "_", str(raw).lower()).strip("_")
return f"goto_{slug or 'next'}"
def edge_condition(self, edge: dict) -> str:
return (edge.get("data") or {}).get("condition") or ""
def edge_description(self, edge: dict) -> str:
"""作为转移函数的 description 交给 LLM:满足该条件时模型应调用此函数。"""
cond = self.edge_condition(edge)
target = self.name(edge.get("target"))
if cond:
return f"当满足以下条件时调用以转到节点「{target}」:{cond}"
return f"当当前节点任务完成、应继续推进对话时调用以转到节点「{target}」。"
def is_end(self, nid: str | None) -> bool:
return self.node_type(nid) == "endCall"
def has_graph(self) -> bool:
return self.start_id is not None
def greeting(self) -> str:
return self.data(self.start_id).get("greeting") or ""
def system_prompt_for(self, nid: str | None) -> str:
"""节点系统提示:仅用该节点自己的 prompt(开始节点也是会话节点)。"""
header = f"[当前节点:{self.name(nid)}]"
prompt = str(self.data(nid).get("prompt") or "").strip()
return f"{header}\n{prompt}" if prompt else header
# ---- 路由:决定下一节点 ----
async def route(
self,
nid: str | None,
history: list[dict],
*,
api_key: str,
base_url: str,
model: str,
) -> str | None:
"""根据对话历史判断当前节点是否应转移。返回目标节点 id,或 None 表示停留。"""
outs = self.outgoing(nid)
if not outs:
return None
options = []
for i, edge in enumerate(outs, 1):
edata = edge.get("data") or {}
cond = edata.get("condition") or "(无明确条件,作为默认后继)"
tgt_name = self.name(edge.get("target"))
options.append(f"{i}. 条件:{cond} → 目标节点:{tgt_name}")
convo = "\n".join(
f"{m['role']}: {m['content']}" for m in history[-8:] if m.get("content")
)
system = (
"你是语音对话的流程路由器。根据最近的对话,判断是否已满足某条转移条件。\n"
"规则:仅当某条件被明确满足时,返回其编号;若都不满足或不确定,返回 0 "
"(停留在当前节点继续对话)。只输出一个数字,不要任何解释。"
)
user = (
"可选转移:\n"
+ "\n".join(options)
+ f"\n\n对话记录:\n{convo}\n\n请只返回编号(0 表示停留):"
)
try:
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key=api_key, base_url=base_url or None)
resp = await client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
temperature=0,
max_tokens=5,
)
text = (resp.choices[0].message.content or "").strip()
except Exception as exc: # noqa: BLE001 - 路由失败不应中断通话
logger.warning(f"工作流路由调用失败,停留当前节点: {exc}")
return None
match = re.search(r"\d+", text)
if not match:
return None
idx = int(match.group())
if idx < 1 or idx > len(outs):
return None
target = outs[idx - 1].get("target")
logger.info(f"工作流路由: {self.name(nid)}{self.name(target)} (edge {idx})")
return target

View File

@@ -42,6 +42,7 @@ import { Switch } from "@/components/ui/switch";
import {
Sheet,
SheetContent,
SheetDescription,
SheetHeader,
SheetTitle,
} from "@/components/ui/sheet";
@@ -372,6 +373,7 @@ export function AssistantPage(props: AssistantPageProps) {
allowInterrupt: true,
});
const [debugOpen, setDebugOpen] = useState(false);
const [activeNodeId, setActiveNodeId] = useState<string | null>(null);
const loadAssistants = useCallback(async () => {
setListLoading(true);
@@ -1235,6 +1237,7 @@ export function AssistantPage(props: AssistantPageProps) {
onChange={setWorkflowGraph}
settings={workflowSettings}
onSettingsChange={setWorkflowSettings}
activeNodeId={activeNodeId}
modelOptions={{
llm: credOptions("LLM"),
asr: credOptions("ASR"),
@@ -1243,7 +1246,14 @@ export function AssistantPage(props: AssistantPageProps) {
/>
</div>
<Sheet open={debugOpen} onOpenChange={setDebugOpen} modal={false}>
<Sheet
open={debugOpen}
onOpenChange={(open) => {
setDebugOpen(open);
if (!open) setActiveNodeId(null);
}}
modal={false}
>
<SheetContent
side="right"
showOverlay={false}
@@ -1252,8 +1262,15 @@ export function AssistantPage(props: AssistantPageProps) {
>
<SheetHeader className="sr-only">
<SheetTitle></SheetTitle>
<SheetDescription>
,
</SheetDescription>
</SheetHeader>
<DebugDrawer assistantId={editingId} asSheet />
<DebugDrawer
assistantId={editingId}
asSheet
onNodeActive={setActiveNodeId}
/>
</SheetContent>
</Sheet>
</div>
@@ -1859,9 +1876,11 @@ function SegmentedIconButton({
function DebugDrawer({
assistantId,
asSheet = false,
onNodeActive,
}: {
assistantId: string | null;
asSheet?: boolean;
onNodeActive?: (nodeId: string | null) => void;
}) {
const [showTranscript, setShowTranscript] = useState(false);
const [vizStyle, setVizStyle] = useState<VizStyle>("aura");
@@ -1915,6 +1934,7 @@ function DebugDrawer({
showTranscript={showTranscript}
vizStyle={vizStyle}
assistantId={assistantId}
onNodeActive={onNodeActive}
/>
</aside>
);
@@ -1924,10 +1944,12 @@ function DebugVoicePanel({
showTranscript,
vizStyle,
assistantId,
onNodeActive,
}: {
showTranscript: boolean;
vizStyle: VizStyle;
assistantId: string | null;
onNodeActive?: (nodeId: string | null) => void;
}) {
const {
status,
@@ -1943,7 +1965,7 @@ function DebugVoicePanel({
connect,
disconnect,
audioRef,
} = useVoicePreview(assistantId);
} = useVoicePreview(assistantId, onNodeActive);
// 连接中或已连通都视作"会话进行中"
const recording = status === "connecting" || status === "connected";
const [textDraft, setTextDraft] = useState("");

View File

@@ -10,12 +10,18 @@ import { Pencil, Trash2 } from "lucide-react";
import { useContext } from "react";
import { cn } from "@/lib/utils";
import { NodeActionContext, NodeSpecsContext } from "./context";
import {
ActiveNodeContext,
NodeActionContext,
NodeSpecsContext,
} from "./context";
import { accentVar, type WorkflowNodeData } from "./specs";
export function GenericNode({ id, type, data, selected }: NodeProps) {
const specs = useContext(NodeSpecsContext);
const actions = useContext(NodeActionContext);
const activeNodeId = useContext(ActiveNodeContext);
const isActive = activeNodeId === id;
const spec = specs[type as string];
if (!spec) return null;
@@ -30,9 +36,11 @@ export function GenericNode({ id, type, data, selected }: NodeProps) {
data-node-id={id}
className={cn(
"group relative w-[228px] rounded-2xl border bg-card p-4 shadow-sm transition-all",
selected
? "border-primary ring-2 ring-primary/30"
: "border-hairline hover:shadow-md",
isActive
? "border-success ring-2 ring-success/60"
: selected
? "border-primary ring-2 ring-primary/30"
: "border-hairline hover:shadow-md",
)}
>
{spec.hasTarget && (
@@ -43,6 +51,13 @@ export function GenericNode({ id, type, data, selected }: NodeProps) {
/>
)}
{isActive && (
<div className="absolute -top-3 left-3 flex items-center gap-1.5 rounded-full bg-success px-2 py-0.5 text-[10px] font-medium text-on-primary shadow-sm">
<span className="h-1.5 w-1.5 animate-pulse rounded-full bg-on-primary" />
</div>
)}
{/* 悬停/选中时出现的操作按钮 */}
<div
className={cn(

View File

@@ -50,6 +50,7 @@ import { Textarea } from "@/components/ui/textarea";
import { edgeTypes } from "./ConditionEdge";
import {
ActiveNodeContext,
EdgeActionContext,
NodeActionContext,
NodeSpecsContext,
@@ -81,6 +82,8 @@ export type WorkflowEditorProps = {
settings: WorkflowSettings;
onSettingsChange: (settings: WorkflowSettings) => void;
modelOptions: { llm: ModelOption[]; asr: ModelOption[]; tts: ModelOption[] };
/** 调试通话中当前激活的节点 id(用于高亮)。 */
activeNodeId?: string | null;
};
let nodeSeq = 0;
@@ -127,6 +130,7 @@ function Canvas({
settings,
onSettingsChange,
modelOptions,
activeNodeId,
specsByType,
}: WorkflowEditorProps & { specsByType: NodeSpecMap }) {
const initial = useMemo(() => toFlow(value ?? defaultGraph()), [value]);
@@ -252,6 +256,7 @@ function Canvas({
return (
<NodeSpecsContext.Provider value={specsByType}>
<ActiveNodeContext.Provider value={activeNodeId ?? null}>
<NodeActionContext.Provider value={nodeActions}>
<EdgeActionContext.Provider value={edgeActions}>
<div className="h-full w-full overflow-hidden rounded-2xl border border-hairline bg-canvas-soft">
@@ -446,6 +451,7 @@ function Canvas({
</div>
</EdgeActionContext.Provider>
</NodeActionContext.Provider>
</ActiveNodeContext.Provider>
</NodeSpecsContext.Provider>
);
}

View File

@@ -21,3 +21,6 @@ const noop: ElementActions = { edit: () => {}, remove: () => {} };
export const NodeActionContext = createContext<ElementActions>(noop);
export const EdgeActionContext = createContext<ElementActions>(noop);
/** 调试通话中当前激活的节点 id(画布据此高亮),无激活为 null。 */
export const ActiveNodeContext = createContext<string | null>(null);

View File

@@ -83,7 +83,10 @@ function microphoneErrorMessage(error: unknown): string {
return errorMessage(error, "无法访问麦克风。");
}
export function useVoicePreview(assistantId: string | null) {
export function useVoicePreview(
assistantId: string | null,
onNodeActive?: (nodeId: string | null) => void,
) {
const [status, setStatus] = useState<VoicePreviewStatus>("idle");
const [error, setError] = useState<string | null>(null);
const [micWarning, setMicWarning] = useState<string | null>(null);
@@ -102,6 +105,11 @@ export function useVoicePreview(assistantId: string | null) {
const localStreamRef = useRef<MediaStream | null>(null);
const startingRef = useRef(false);
const messageSeqRef = useRef(0);
// 工作流激活节点回调存进 ref,避免把它挂进 connect 依赖反复重建连接
const onNodeActiveRef = useRef(onNodeActive);
useEffect(() => {
onNodeActiveRef.current = onNodeActive;
}, [onNodeActive]);
// 枚举可用麦克风。未授权前 label 为空,授权(连接)后再刷新即可拿到名称。
const refreshDevices = useCallback(async () => {
@@ -170,6 +178,7 @@ export function useVoicePreview(assistantId: string | null) {
localStreamRef.current = null;
if (audioRef.current) audioRef.current.srcObject = null;
startingRef.current = false;
onNodeActiveRef.current?.(null);
}, []);
const disconnect = useCallback(() => {
@@ -377,6 +386,12 @@ export function useVoicePreview(assistantId: string | null) {
sequence: messageSeqRef.current,
};
setMessages((prev) => sortMessages([...prev, next]));
} else if (
msg?.type === "node-active" &&
typeof msg.nodeId === "string"
) {
// 工作流:后端报告当前激活节点,交给画布高亮
onNodeActiveRef.current?.(msg.nodeId);
}
} catch {
/* 非 JSON / 未知消息,忽略 */