Compare commits

...

5 Commits

Author SHA1 Message Date
Xin Wang
c2ef76620e Enhance workflow engine and frontend components with transition speech support
- Introduce edge transition speech functionality in the WorkflowEngine to provide optional speech during node transitions.
- Update pipeline execution to utilize the new transition speech feature, enhancing user experience by masking delays during transitions.
- Modify frontend components to support transition speech in edge specifications, allowing users to define and edit transition speech for edges.
- Refactor edge handling logic in the WorkflowEditor to accommodate the new transition speech field, improving workflow management capabilities.
2026-06-15 15:57:05 +08:00
Xin Wang
09a5ffbdbc Update node specifications and enhance GenericNode component
- Change the 'addable' property of a specific node type to true, allowing for dynamic addition of nodes.
- Modify the GenericNode component to include a new icon and adjust styles for better visual representation.
- Update node handling logic to prevent deletion of 'startCall' nodes and improve node change handling in the workflow editor.
- Refactor layout and styling in the WorkflowEditor for a more polished user interface.
2026-06-15 15:49:58 +08:00
Xin Wang
aae0342a57 Enhance workflow engine and integration in backend and frontend
- Introduce a new WorkflowEngine class to manage workflow graphs, enabling dynamic node-based interactions.
- Update AssistantConfig to include a graph field for workflow definitions, allowing for flexible configuration.
- Modify pipeline execution to support workflow-driven dialogue, integrating node transitions and system prompts based on active nodes.
- Enhance frontend components to visualize active nodes and provide debugging capabilities, including highlighting the current node during interactions.
- Refactor existing components to accommodate new workflow functionalities and improve overall user experience.
2026-06-15 15:32:10 +08:00
Xin Wang
c2a39257ff Add workflow editor and node types support in frontend and backend
- Introduce a new workflow editor component for visualizing and managing workflows, allowing users to add nodes and define connections.
- Implement backend support for node types, including validation and constraints for workflow graphs.
- Add new API endpoints for retrieving node types and their specifications.
- Enhance the AssistantPage to integrate the workflow editor, enabling users to create and edit workflows directly.
- Update frontend components to support new workflow functionalities, including condition edges and generic nodes.
- Refactor existing code to accommodate the new workflow features and improve overall structure.
2026-06-15 10:12:41 +08:00
Xin Wang
0309c154b5 Implement StepFun Realtime service and enhance AssistantConfig
- Add new fields to AssistantConfig for realtime interface configuration, including types, values, and secrets.
- Introduce StepFunRealtimeService to handle speech-to-speech processing via WebSocket, integrating STT, LLM, and TTS functionalities.
- Refactor pipeline execution to support a new realtime mode, allowing direct text input processing and immediate responses.
- Update model resource testing to include validation for StepFun Realtime connections.
- Enhance service factory to create realtime services based on configuration settings.
- Modify README documentation to reflect new realtime capabilities and usage instructions.
2026-06-14 23:41:40 +08:00
28 changed files with 2867 additions and 81 deletions

View File

@@ -129,6 +129,6 @@ docker compose --profile remote up -d
- [ ] 联调 Pipecat 1.3.0 语音链路与各 OpenAI 兼容服务
- [ ] 起本地 SenseVoice / CosyVoice 的 OpenAI 兼容服务
- [ ] `realtime` 模式(目前只 `pipeline` 级联)
- [x] `realtime` 模式(StepFun StepAudio Realtime)
- [x] 前端 `DebugVoicePanel``/ws/voice`(参考 dograh `useWebSocketRTC.tsx`)
- [ ] 加 DB 后:助手配置入库(目前随请求内联)

View File

@@ -24,6 +24,7 @@ from routes import (
health,
knowledge_bases,
model_registry,
node_types,
voice_webrtc,
voice_ws,
)
@@ -49,6 +50,7 @@ app.include_router(health.router)
app.include_router(assistants.router)
app.include_router(knowledge_bases.router)
app.include_router(model_registry.router)
app.include_router(node_types.router)
app.include_router(voice_webrtc.router)
app.include_router(voice_ws.router)

View File

@@ -15,7 +15,6 @@ VALUES
('model_004', 'SiliconFlow-CosyVoice2-0.5B', 'TTS', 'openai-tts',
'{"modelId":"FunAudioLLM/CosyVoice2-0.5B","apiUrl":"https://api.siliconflow.cn/v1","voice":"FunAudioLLM/CosyVoice2-0.5B:anna","speed":1.0,"sourceSampleRate":24000}',
'{"apiKey":"replace-me"}', TRUE, FALSE),
'{"apiKey":"replace-me"}', TRUE, FALSE),
('model_005', '讯飞语音识别', 'ASR', 'xfyun-asr',
'{"apiUrl":"https://iat-api.xfyun.cn/v2/iat","language":"zh_cn","domain":"iat","accent":"mandarin","dynamicCorrection":false,"frameSize":1280}',
'{"appId":"replace-me","apiKey":"replace-me","apiSecret":"replace-me"}', TRUE, TRUE),
@@ -36,13 +35,9 @@ VALUES
'{"apiKey":"replace-me"}', TRUE, FALSE),
('model_011', 'text-embedding-3', 'Embedding', 'openai-embedding',
'{"modelId":"text-embedding-3-small","apiUrl":"https://api.openai.com/v1/embeddings"}',
'{"apiKey":"replace-me"}', TRUE, FALSE),
('model_012', 'StepAudio 2.5 Realtime', 'Realtime', 'stepfun-realtime',
'{"modelId":"stepaudio-2.5-realtime","apiUrl":"wss://api.stepfun.com/v1/realtime","voice":"linjiajiejie","inputSampleRate":24000,"outputSampleRate":24000,"prefixPaddingMs":500,"silenceDurationMs":300,"energyAwakenessThreshold":2500}',
'{"apiKey":"replace-me"}', TRUE, FALSE)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
capability = EXCLUDED.capability,
interface_type = EXCLUDED.interface_type,
values = EXCLUDED.values,
secrets = EXCLUDED.secrets,
enabled = EXCLUDED.enabled,
is_default = EXCLUDED.is_default,
updated_at = now();
-- Seed defaults must never overwrite resources configured through the UI.
ON CONFLICT (id) DO NOTHING;

View File

@@ -31,6 +31,9 @@ class AssistantConfig(BaseModel):
stt_language: str = ""
tts_speed: float = 1.0
realtimeModel: str = ""
realtime_interface_type: str = ""
realtime_values: dict = {}
realtime_secrets: dict = {}
llm_interface_type: str = "openai-llm"
stt_interface_type: str = "openai-asr"
tts_interface_type: str = "openai-tts"
@@ -43,6 +46,9 @@ class AssistantConfig(BaseModel):
enableInterrupt: bool = True
# workflow 类型:节点图(nodes/edges)。非 workflow 为空,引擎据此决定是否启用。
graph: dict = {}
# ---- 运行时连接信息(服务端注入,不来自浏览器) ----
# 为空时,service_factory 会回退到 config.py 的 .env 默认值。
llm_api_key: str = ""
@@ -51,6 +57,8 @@ class AssistantConfig(BaseModel):
stt_base_url: str = ""
tts_api_key: str = ""
tts_base_url: str = ""
realtime_api_key: str = ""
realtime_base_url: str = ""
class SignalingOffer(BaseModel):

View File

@@ -7,6 +7,7 @@ from db.session import get_session
from fastapi import APIRouter, Depends, HTTPException
from schemas import AssistantOut, AssistantUpsert
from services.masking import mask, resolve_incoming_key
from services.node_specs import validate_graph
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -14,6 +15,15 @@ router = APIRouter(prefix="/api/assistants", tags=["assistants"])
CAPABILITIES = ("LLM", "ASR", "TTS", "Realtime", "Embedding")
def _validate_workflow(body: AssistantUpsert) -> None:
"""workflow 类型:保存前校验图结构,不通过则 400。其他类型跳过。"""
if body.type != "workflow":
return
errors = validate_graph(body.graph or {})
if errors:
raise HTTPException(400, "工作流校验失败:" + ";".join(errors))
async def _sync_bindings(
session: AsyncSession, assistant_id: str, resource_ids: dict[str, str]
) -> None:
@@ -82,6 +92,7 @@ async def list_assistants(session: AsyncSession = Depends(get_session)):
async def create_assistant(
body: AssistantUpsert, session: AsyncSession = Depends(get_session)
):
_validate_workflow(body)
data = body.model_dump()
resource_ids = data.pop("model_resource_ids")
assistant = Assistant(id=f"asst_{uuid.uuid4().hex[:12]}", **data)
@@ -141,6 +152,7 @@ async def update_assistant(
assistant = await session.get(Assistant, assistant_id)
if not assistant:
raise HTTPException(404, "助手不存在")
_validate_workflow(body)
data = body.model_dump()
resource_ids = data.pop("model_resource_ids")
data["api_key"] = resolve_incoming_key(data["api_key"], assistant.api_key)

View File

@@ -59,6 +59,15 @@ def _resource_out(row: ModelResource) -> ModelResourceOut:
)
async def _commit_resource(
session: AsyncSession, row: ModelResource
) -> ModelResourceOut:
"""Commit and reload server-generated fields before serializing the row."""
await session.commit()
await session.refresh(row)
return _resource_out(row)
async def _definition(
session: AsyncSession, interface_type: str
) -> InterfaceDefinition:
@@ -141,8 +150,7 @@ async def create_model_resource(
.where(ModelResource.capability == row.capability, ModelResource.id != row.id)
.values(is_default=False)
)
await session.commit()
return _resource_out(row)
return await _commit_resource(session, row)
@router.post("/model-resources/test", response_model=ModelResourceTestResult)
@@ -196,8 +204,7 @@ async def duplicate_model_resource(
is_default=False,
)
session.add(row)
await session.commit()
return _resource_out(row)
return await _commit_resource(session, row)
@router.put("/model-resources/{resource_id}", response_model=ModelResourceOut)
@@ -224,8 +231,7 @@ async def update_model_resource(
.where(ModelResource.capability == row.capability, ModelResource.id != row.id)
.values(is_default=False)
)
await session.commit()
return _resource_out(row)
return await _commit_resource(session, row)
@router.delete("/model-resources/{resource_id}")

View File

@@ -0,0 +1,14 @@
"""工作流节点类型目录。前端画布据此渲染「添加节点」面板与节点编辑表单。
规格是只读的、与代码同生命周期(改了要重启后端 + 前端刷新),所以无需鉴权与缓存层。
"""
from fastapi import APIRouter
from services.node_specs import node_types_response
router = APIRouter(prefix="/api/node-types", tags=["workflow"])
@router.get("")
async def list_node_types():
return node_types_response()

View File

@@ -58,7 +58,9 @@ async def voice_signaling(websocket: WebSocket):
except Exception as e:
logger.error(f"WebRTC 信令出错: {e}")
finally:
for pc in peers.values():
# disconnect() triggers the registered closed callback, which removes
# the peer from this dict. Iterate over a snapshot to avoid mutation.
for pc in list(peers.values()):
await pc.disconnect()

View File

@@ -66,6 +66,8 @@ async def resolve_runtime_config(
prompt=assistant.prompt or "你是一个有帮助的助手。",
runtimeMode=assistant.runtime_mode, # type: ignore[arg-type]
enableInterrupt=assistant.enable_interrupt,
# workflow 图:仅 workflow 类型非空,引擎据此启用图驱动对话
graph=(assistant.graph or {}) if assistant.type == "workflow" else {},
# 模型/音色:模型资源中的配置优先
model=str(_value(llm_resource, "modelId", "")),
asr=str(_value(stt_resource, "modelId", "")),
@@ -83,6 +85,11 @@ async def resolve_runtime_config(
stt_secrets=(stt_resource.secrets or {}) if stt_resource else {},
tts_values=(tts_resource.values or {}) if tts_resource else {},
tts_secrets=(tts_resource.secrets or {}) if tts_resource else {},
realtime_interface_type=(
realtime_resource.interface_type if realtime_resource else ""
),
realtime_values=(realtime_resource.values or {}) if realtime_resource else {},
realtime_secrets=(realtime_resource.secrets or {}) if realtime_resource else {},
# 运行时连接信息(真 key + url):模型资源优先,否则 .env 兜底
llm_api_key=_secret(llm_resource, "apiKey", config.LLM_API_KEY),
llm_base_url=str(_value(llm_resource, "apiUrl", config.LLM_BASE_URL)),
@@ -90,4 +97,6 @@ async def resolve_runtime_config(
stt_base_url=str(_value(stt_resource, "apiUrl", config.STT_BASE_URL)),
tts_api_key=_secret(tts_resource, "apiKey", config.TTS_API_KEY),
tts_base_url=str(_value(tts_resource, "apiUrl", config.TTS_BASE_URL)),
realtime_api_key=_secret(realtime_resource, "apiKey", ""),
realtime_base_url=str(_value(realtime_resource, "apiUrl", "")),
)

View File

@@ -78,6 +78,25 @@ INTERFACE_DEFINITIONS: list[dict] = [
"capability": "Realtime",
"fields": OPENAI_COMMON + [field("voice", "Voice")],
},
{
"interface_type": "stepfun-realtime",
"name": "StepFun StepAudio Realtime",
"capability": "Realtime",
"fields": OPENAI_COMMON
+ [
field("voice", "Voice", required=True, default="linjiajiejie"),
field("inputSampleRate", "Input Sample Rate", type_="number", default=24000),
field("outputSampleRate", "Output Sample Rate", type_="number", default=24000),
field("prefixPaddingMs", "VAD Prefix Padding (ms)", type_="number", default=500),
field("silenceDurationMs", "VAD Silence Duration (ms)", type_="number", default=300),
field(
"energyAwakenessThreshold",
"VAD Energy Threshold",
type_="number",
default=2500,
),
],
},
{
"interface_type": "xfyun-asr",
"name": "Xfyun Streaming ASR",

View File

@@ -2,11 +2,15 @@
from __future__ import annotations
import asyncio
import io
import json
import time
import wave
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
import httpx
from websockets.asyncio.client import connect as websocket_connect
import config
from schemas import ModelResourceTestResult
@@ -57,6 +61,8 @@ async def test_model_resource(
message="讯飞连接参数有效",
detail="鉴权字段和连接参数完整,请在语音测试页验证签名及音频链路",
)
if interface_type == "stepfun-realtime":
return await _test_stepfun_realtime(values, secrets)
if capability == "Realtime":
return ModelResourceTestResult(
ok=False,
@@ -150,3 +156,61 @@ async def test_model_resource(
message="无法连接到模型服务",
detail=str(exc)[:300],
)
async def _test_stepfun_realtime(
values: dict, secrets: dict
) -> ModelResourceTestResult:
api_url = str(values.get("apiUrl") or "")
model_id = str(values.get("modelId") or "")
api_key = str(secrets.get("apiKey") or "")
parts = urlsplit(api_url)
query = dict(parse_qsl(parts.query))
query["model"] = model_id
url = urlunsplit(
(parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)
)
started = time.perf_counter()
try:
async with websocket_connect(
url,
additional_headers={"Authorization": f"Bearer {api_key}"},
open_timeout=TEST_TIMEOUT_SECONDS,
close_timeout=2,
) as websocket:
raw_message = await asyncio.wait_for(
websocket.recv(), timeout=TEST_TIMEOUT_SECONDS
)
event = json.loads(raw_message)
if event.get("type") != "session.created":
return ModelResourceTestResult(
ok=False,
latency_ms=round((time.perf_counter() - started) * 1000),
message="Realtime 连接返回了意外事件",
detail=str(event.get("type") or event)[:300],
)
return ModelResourceTestResult(
ok=True,
latency_ms=round((time.perf_counter() - started) * 1000),
message="Realtime 连接成功",
detail="StepFun 返回 session.created",
)
except TimeoutError:
return ModelResourceTestResult(
ok=False,
latency_ms=round((time.perf_counter() - started) * 1000),
message="Realtime 连接超时",
detail=f"服务未在 {TEST_TIMEOUT_SECONDS:g} 秒内创建 session",
)
except Exception as exc:
detail = str(exc)
for secret in secrets.values():
if secret:
detail = detail.replace(str(secret), "***")
return ModelResourceTestResult(
ok=False,
latency_ms=round((time.perf_counter() - started) * 1000),
message="无法连接到 StepFun Realtime",
detail=detail[:300],
)

View File

@@ -0,0 +1,161 @@
"""工作流节点规格 + 图校验(对齐 dograh 的 node-spec / GraphConstraints 思路)。
当前实现 3 个核心节点:开始(startCall)/智能体(agentNode)/结束(endCall)。
本模块是「节点类型」的唯一事实源:
- /api/node-types 接口直接吐这里的规格;
- 助手保存时用这里的约束校验 workflow 图。
新增节点类型只需在 NODE_SPECS 里加一条并补充约束。前端 specs.ts 与此保持一致。
"""
from __future__ import annotations
from typing import Any
# 规格版本号:节点定义有破坏性变更时 +1,前端可据此判断是否需要刷新缓存。
SPEC_VERSION = "1"
# 每个节点的图约束。None 表示不限制。
# min_incoming / max_incoming:入边数量
# min_outgoing / max_outgoing:出边数量
NODE_SPECS: list[dict[str, Any]] = [
{
"name": "startCall",
"displayName": "开始",
"category": "call_node",
"description": "工作流入口,每个流程有且仅有一个。播放开场白,并用自己的提示词进行多轮对话,满足出边条件后流转。",
"icon": "Play",
"accent": "mint",
"addable": False,
"constraints": {"minIncoming": 0, "maxIncoming": 0},
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "greeting", "label": "开场白", "type": "textarea"},
{"key": "prompt", "label": "节点提示词", "type": "textarea"},
],
},
{
"name": "agentNode",
"displayName": "智能体节点",
"category": "call_node",
"description": "对话处理单元。按提示词与用户多轮交互,可有多个并通过条件边流转。",
"icon": "Bot",
"accent": "sky",
"addable": True,
"constraints": {"minIncoming": 1},
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "prompt", "label": "节点提示词", "type": "textarea", "required": True},
{"key": "allowInterrupt", "label": "允许用户打断", "type": "switch"},
],
},
{
"name": "endCall",
"displayName": "结束",
"category": "call_node",
"description": "终止节点,礼貌结束对话。可有多个,均无出边。",
"icon": "Flag",
"accent": "rose",
"addable": True,
"constraints": {"minIncoming": 1, "minOutgoing": 0, "maxOutgoing": 0},
"fields": [
{"key": "name", "label": "节点名称", "type": "text"},
{"key": "prompt", "label": "结束语提示词", "type": "textarea"},
],
},
]
_SPEC_BY_NAME = {spec["name"]: spec for spec in NODE_SPECS}
def node_types_response() -> dict[str, Any]:
"""/api/node-types 的响应体(camelCase,直接喂前端)。"""
return {"specVersion": SPEC_VERSION, "nodeTypes": NODE_SPECS}
def validate_graph(graph: dict[str, Any]) -> list[str]:
"""校验 workflow 图,返回错误信息列表(空列表 = 通过)。
基础规则(对齐 dograh 的核心不变量):
1. 节点类型必须是已注册类型;
2. 有且仅有一个 startCall;
3. 至少有一个 endCall;
4. 边的 source/target 必须指向存在的节点;
5. 入边/出边数量满足各节点类型的约束。
空图(无节点)视为草稿,直接放行,方便先存后编排。
"""
errors: list[str] = []
nodes = graph.get("nodes") or []
edges = graph.get("edges") or []
if not nodes:
return errors # 草稿:放行
node_ids: set[str] = set()
type_counts: dict[str, int] = {}
node_type_by_id: dict[str, str] = {}
for node in nodes:
node_id = node.get("id")
node_type = node.get("type")
if not node_id:
errors.append("存在缺少 id 的节点")
continue
if node_id in node_ids:
errors.append(f"节点 id 重复:{node_id}")
node_ids.add(node_id)
if node_type not in _SPEC_BY_NAME:
errors.append(f"未知节点类型:{node_type}(节点 {node_id})")
continue
node_type_by_id[node_id] = node_type
type_counts[node_type] = type_counts.get(node_type, 0) + 1
start_count = type_counts.get("startCall", 0)
if start_count == 0:
errors.append("工作流必须有一个「开始」节点")
elif start_count > 1:
errors.append("工作流只能有一个「开始」节点")
if type_counts.get("endCall", 0) == 0:
errors.append("工作流至少需要一个「结束」节点")
# 统计入边/出边
incoming: dict[str, int] = {nid: 0 for nid in node_ids}
outgoing: dict[str, int] = {nid: 0 for nid in node_ids}
for edge in edges:
source = edge.get("source")
target = edge.get("target")
if source not in node_ids:
errors.append(f"连线指向了不存在的源节点:{source}")
continue
if target not in node_ids:
errors.append(f"连线指向了不存在的目标节点:{target}")
continue
outgoing[source] += 1
incoming[target] += 1
for node_id, node_type in node_type_by_id.items():
constraints = _SPEC_BY_NAME[node_type]["constraints"]
name = node_type
_check_count(errors, incoming[node_id], constraints, "Incoming", name, "入边")
_check_count(errors, outgoing[node_id], constraints, "Outgoing", name, "出边")
return errors
def _check_count(
errors: list[str],
actual: int,
constraints: dict[str, int],
suffix: str,
node_type: str,
label: str,
) -> None:
lo = constraints.get(f"min{suffix}")
hi = constraints.get(f"max{suffix}")
display = _SPEC_BY_NAME[node_type]["displayName"]
if lo is not None and actual < lo:
errors.append(f"{display}」节点{label}数量不能少于 {lo}(当前 {actual})")
if hi is not None and actual > hi:
errors.append(f"{display}」节点{label}数量不能多于 {hi}(当前 {actual})")

View File

@@ -8,10 +8,14 @@
from uuid import uuid4
import config
from loguru import logger
from models import AssistantConfig
from services.pipecat.service_factory import create_services
from services.pipecat.service_factory import create_realtime_service, create_services
from services.workflow_engine import WorkflowEngine
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
EndFrame,
@@ -106,6 +110,33 @@ class TextInputProcessor(FrameProcessor):
await self._call_event_handler("on_text_append", text)
class RealtimeTextInputProcessor(FrameProcessor):
"""Route text input directly to a realtime service without cascade semantics."""
def __init__(self):
super().__init__()
self._register_event_handler("on_text_input")
self._register_event_handler("on_text_append")
async def process_frame(self, frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if not isinstance(frame, InputTransportMessageFrame):
await self.push_frame(frame, direction)
return
parsed = _text_input(frame.message)
if not parsed:
await self.push_frame(frame, direction)
return
text, run_immediately = parsed
await self._call_event_handler(
"on_text_input" if run_immediately else "on_text_append",
text,
)
class PassthroughLLMAssistantAggregator(LLMAssistantAggregator):
"""聚合 LLM 回复进上下文,同时继续把回复帧交给下游 TTS。"""
@@ -176,9 +207,38 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
"""
logger.info(f"启动管线: assistant={cfg.name} mode={cfg.runtimeMode}")
if cfg.runtimeMode == "realtime":
await run_realtime_pipeline(transport, cfg)
return
stt, llm, tts = create_services(cfg)
context = LLMContext(messages=[{"role": "system", "content": cfg.prompt}])
# ---- workflow 图引擎(可选)----
# 有节点图时按图驱动:开场白/系统提示来自起始节点,每轮回复后按条件路由。
engine = WorkflowEngine(cfg.graph or {})
workflow_active = engine.has_graph()
wf_state = {
# 开始节点本身就是会话节点(有自己的 prompt,可多轮),从它开始
"current": engine.start_id if workflow_active else None,
"ended": False,
"turns_in_node": 0,
"end_frame_queued": False,
}
history: list[dict] = []
# 当前节点没有可调用转移工具(全是空条件)时,才启用文本兜底路由
FALLBACK_AFTER_TURNS = 2
if workflow_active:
greeting = engine.greeting() or cfg.greeting
system_content = engine.system_prompt_for(wf_state["current"])
logger.info(
f"工作流模式启用: 起始节点={engine.name(wf_state['current'])}"
)
else:
greeting = cfg.greeting
system_content = cfg.prompt
context = LLMContext(messages=[{"role": "system", "content": system_content}])
user_aggregator = LLMUserAggregator(
context,
params=LLMUserAggregatorParams(
@@ -236,6 +296,105 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
greeting_transcript_sent = False
pending_text_inputs: list[str] = []
async def emit_node_active(node_id: str | None) -> None:
"""通知前端当前激活的节点,画布据此高亮。"""
if node_id:
await worker.queue_frame(
OutputTransportMessageUrgentFrame(
message={"type": "node-active", "nodeId": node_id}
)
)
def set_system_prompt(text: str) -> None:
"""替换上下文里的系统提示(节点切换时整体替换,而非追加)。"""
messages = context.get_messages()
if messages and messages[0].get("role") == "system":
messages[0] = {"role": "system", "content": text}
else:
messages.insert(0, {"role": "system", "content": text})
def apply_node(node_id: str | None) -> None:
"""进入节点:设置系统提示 + 把出边注册为可调用的转移工具。"""
set_system_prompt(engine.system_prompt_for(node_id))
if engine.is_end(node_id):
context.set_tools() # 终止节点无工具
return
schemas = [
FunctionSchema(
name=engine.edge_fn_name(edge),
description=engine.edge_description(edge),
properties={},
required=[],
)
for edge in engine.outgoing(node_id)
]
if schemas:
context.set_tools(ToolsSchema(standard_tools=schemas))
else:
context.set_tools() # 无出边:清空工具
async def go_to_node(target: str) -> None:
"""执行转移:切当前节点、重置计数、点亮画布、设置提示/工具。
结束节点:设 ended 标记,apply_node 会清空工具,模型据结束语提示说完后,
on_assistant_text_end 里排入 EndFrame 挂断,不再多轮。
"""
wf_state["current"] = target
wf_state["turns_in_node"] = 0
if engine.is_end(target):
wf_state["ended"] = True
await emit_node_active(target)
apply_node(target)
async def speak_transition(edge: dict | None) -> None:
"""切换瞬间播报过渡语(可选),掩盖切节点/新一轮生成的延迟。不写入上下文。"""
speech = engine.edge_transition_speech(edge)
if speech:
await worker.queue_frame(TTSSpeakFrame(speech, append_to_context=False))
def make_transition_handler(edge: dict):
target = edge.get("target")
async def handler(params):
logger.info(f"LLM 触发转移 → {engine.name(target)}")
await speak_transition(edge)
await go_to_node(target)
# 返回工具结果,pipecat 随即在新节点的提示/工具下继续生成
await params.result_callback({"status": "ok"})
return handler
async def fallback_route() -> None:
"""文本兜底:模型迟迟不调用转移工具时,用一次轻量分类器判断是否转移。"""
if not workflow_active or wf_state["ended"]:
return
if wf_state["turns_in_node"] < FALLBACK_AFTER_TURNS:
return
if not engine.outgoing(wf_state["current"]):
return
target = await engine.route(
wf_state["current"],
history,
api_key=cfg.llm_api_key or config.LLM_API_KEY,
base_url=cfg.llm_base_url or config.LLM_BASE_URL,
model=cfg.model or config.LLM_MODEL,
)
if target and target != wf_state["current"]:
logger.info(f"文本兜底触发转移 → {engine.name(target)}")
await speak_transition(engine.find_edge(wf_state["current"], target))
# 仅切换节点提示/工具,下一轮用户输入即在新节点处理
await go_to_node(target)
# 把每条边注册成 LLM 可调用的转移函数(按边唯一命名,处理器全局注册一次,
# 由各节点的 context.tools 控制当前可见哪些)。
if workflow_active:
for edge in engine.edges:
if edge.get("target"):
llm.register_function(
engine.edge_fn_name(edge), make_transition_handler(edge)
)
apply_node(wf_state["current"]) # 设初始节点的提示与工具
async def append_user_text_to_context(text: str, *, run_llm: bool) -> None:
await worker.queue_frame(
LLMMessagesAppendFrame(
@@ -246,6 +405,8 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(_aggregator, _strategy, message):
if message.content:
history.append({"role": "user", "content": message.content})
await queue_transcript("user", message.content, message.timestamp)
@assistant_aggregator.event_handler("on_assistant_text_start")
@@ -284,10 +445,26 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
}
)
)
# 助手把话说完(未被打断)后:累加本节点轮次,必要时走文本兜底路由。
# 正常情况下转移由 LLM 直接调用转移工具完成(go_to_node),无需这里处理。
if content and not interrupted and workflow_active:
history.append({"role": "assistant", "content": content})
if wf_state["ended"]:
# 结束节点:说完结束语后挂断,不再继续多轮对话
if not wf_state["end_frame_queued"]:
wf_state["end_frame_queued"] = True
logger.info("结束节点结束语已播报,挂断通话")
await worker.queue_frame(EndFrame())
else:
wf_state["turns_in_node"] += 1
await fallback_route()
elif content and not interrupted:
history.append({"role": "assistant", "content": content})
@text_input.event_handler("on_text_input")
async def on_text_input(_processor, text):
pending_text_inputs.append(text)
history.append({"role": "user", "content": text})
# 前端显示不依赖 interruption 后续事件,必须在打断前先排入发送队列。
await queue_transcript("user", text, time_now_iso8601())
@@ -302,21 +479,25 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
@text_input.event_handler("on_text_append")
async def on_text_append(_processor, text):
# 静默追加:写进上下文但不打断、不触发推理;transcript 照常上报
history.append({"role": "user", "content": text})
await queue_transcript("user", text, time_now_iso8601())
await append_user_text_to_context(text, run_llm=False)
@text_input.event_handler("on_client_ready")
async def on_client_ready(_processor):
nonlocal greeting_transcript_sent
if cfg.greeting and not greeting_transcript_sent:
if greeting and not greeting_transcript_sent:
greeting_transcript_sent = True
await queue_transcript("assistant", cfg.greeting, time_now_iso8601())
await queue_transcript("assistant", greeting, time_now_iso8601())
@transport.event_handler("on_client_connected")
async def on_client_connected(_transport, _client):
if cfg.greeting:
context.add_message({"role": "assistant", "content": cfg.greeting})
await worker.queue_frame(TTSSpeakFrame(cfg.greeting, append_to_context=False))
if greeting:
context.add_message({"role": "assistant", "content": greeting})
await worker.queue_frame(TTSSpeakFrame(greeting, append_to_context=False))
# 工作流:点亮当前(开始)节点。开始节点即首个会话节点。
if workflow_active:
await emit_node_active(wf_state["current"])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(_transport, _client):
@@ -327,3 +508,70 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
await runner.add_workers(worker)
await runner.run()
logger.info("管线已结束")
async def run_realtime_pipeline(transport, cfg: AssistantConfig) -> None:
"""Run a speech-to-speech model that owns ASR, reasoning, and synthesis."""
realtime = create_realtime_service(cfg)
text_input = RealtimeTextInputProcessor()
pipeline = Pipeline(
[
transport.input(),
text_input,
realtime,
transport.output(),
]
)
worker = PipelineWorker(
pipeline,
params=PipelineParams(
enable_metrics=False,
audio_in_sample_rate=int(
cfg.realtime_values.get("inputSampleRate") or 24000
),
audio_out_sample_rate=int(
cfg.realtime_values.get("outputSampleRate") or 24000
),
),
enable_rtvi=False,
)
async def queue_transcript(role: str, content: str) -> None:
if content:
await worker.queue_frame(
OutputTransportMessageUrgentFrame(
message={
"type": "transcript",
"role": role,
"content": content,
"timestamp": time_now_iso8601(),
},
)
)
@text_input.event_handler("on_text_input")
async def on_text_input(_processor, text):
await queue_transcript("user", text)
await realtime.interrupt()
await realtime.send_text(text, run_immediately=True)
@text_input.event_handler("on_text_append")
async def on_text_append(_processor, text):
await queue_transcript("user", text)
await realtime.send_text(text, run_immediately=False)
@transport.event_handler("on_client_connected")
async def on_client_connected(_transport, _client):
if cfg.greeting:
await realtime.speak(cfg.greeting)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(_transport, _client):
logger.info("Realtime 对端断开,结束管线")
await worker.queue_frame(EndFrame())
runner = WorkerRunner(handle_sigint=False)
await runner.add_workers(worker)
await runner.run()
logger.info("Realtime 管线已结束")

View File

@@ -133,3 +133,33 @@ def create_services(cfg: AssistantConfig):
f"voice={cfg.voice or config.TTS_VOICE}"
)
return create_stt(cfg), create_llm(cfg), create_tts(cfg)
def create_realtime_service(cfg: AssistantConfig):
"""Create a speech-to-speech service that owns STT, LLM, and TTS."""
if cfg.realtime_interface_type == "stepfun-realtime":
from services.pipecat.stepfun_realtime import StepFunRealtimeService
return StepFunRealtimeService(
api_key=cfg.realtime_api_key,
model=cfg.realtimeModel,
base_url=cfg.realtime_base_url,
instructions=cfg.prompt,
voice=str(cfg.realtime_values.get("voice") or "linjiajiejie"),
input_sample_rate=int(
cfg.realtime_values.get("inputSampleRate") or 24000
),
output_sample_rate=int(
cfg.realtime_values.get("outputSampleRate") or 24000
),
prefix_padding_ms=int(
cfg.realtime_values.get("prefixPaddingMs") or 500
),
silence_duration_ms=int(
cfg.realtime_values.get("silenceDurationMs") or 300
),
energy_awakeness_threshold=int(
cfg.realtime_values.get("energyAwakenessThreshold") or 2500
),
)
raise ValueError(f"不支持的 Realtime 接口类型: {cfg.realtime_interface_type}")

View File

@@ -0,0 +1,367 @@
"""StepFun StepAudio realtime speech-to-speech Pipecat service."""
from __future__ import annotations
import asyncio
import base64
import json
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from uuid import uuid4
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
LLMMessagesAppendFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
TTSAudioRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.settings import ServiceSettings
from pipecat.utils.time import time_now_iso8601
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
DEFAULT_STEPFUN_REALTIME_URL = "wss://api.stepfun.com/v1/realtime"
class StepFunRealtimeService(AIService):
"""Bridge Pipecat audio frames to StepFun's Realtime WebSocket events."""
def __init__(
self,
*,
api_key: str,
model: str,
base_url: str = DEFAULT_STEPFUN_REALTIME_URL,
instructions: str = "",
voice: str = "linjiajiejie",
input_sample_rate: int = 24000,
output_sample_rate: int = 24000,
prefix_padding_ms: int = 500,
silence_duration_ms: int = 300,
energy_awakeness_threshold: int = 2500,
**kwargs,
) -> None:
super().__init__(settings=ServiceSettings(model=model), **kwargs)
self._api_key = api_key
self._model = model
self._base_url = base_url or DEFAULT_STEPFUN_REALTIME_URL
self._instructions = instructions
self._voice = voice
self._input_sample_rate = input_sample_rate
self._output_sample_rate = output_sample_rate
self._prefix_padding_ms = prefix_padding_ms
self._silence_duration_ms = silence_duration_ms
self._energy_awakeness_threshold = energy_awakeness_threshold
self._warned_input_sample_rate = False
self._websocket = None
self._receive_task: asyncio.Task | None = None
self._session_ready = asyncio.Event()
self._pending_events: list[dict[str, Any]] = []
self._assistant_turn_id: str | None = None
self._assistant_text = ""
self._assistant_timestamp = ""
async def start(self, frame: StartFrame) -> None:
await super().start(frame)
if not self._api_key or not self._model:
await self.push_error(
"StepFun Realtime requires api_key and model", fatal=True
)
return
await self._connect()
async def stop(self, frame: EndFrame) -> None:
await self._disconnect()
await super().stop(frame)
async def cancel(self, frame: CancelFrame) -> None:
await self._disconnect()
await super().cancel(frame)
async def cleanup(self) -> None:
await self._disconnect()
await super().cleanup()
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
if (
frame.sample_rate != self._input_sample_rate
and not self._warned_input_sample_rate
):
self._warned_input_sample_rate = True
logger.warning(
"StepFun Realtime expected {} Hz input, received {} Hz",
self._input_sample_rate,
frame.sample_rate,
)
await self._send_event(
{
"type": "input_audio_buffer.append",
"audio": base64.b64encode(frame.audio).decode("ascii"),
}
)
return
if isinstance(frame, LLMMessagesAppendFrame):
for message in frame.messages:
text = self._message_text(message)
if text:
await self.send_text(text, run_immediately=frame.run_llm is not False)
return
if isinstance(frame, InterruptionFrame):
await self._send_event({"type": "response.cancel"}, wait_until_ready=False)
await self._finish_assistant_text(interrupted=True)
await self.push_frame(frame, direction)
async def send_text(self, text: str, *, run_immediately: bool = True) -> None:
await self._send_event(
{
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": text}],
},
}
)
if run_immediately:
await self._send_event({"type": "response.create"})
async def interrupt(self) -> None:
await self._send_event({"type": "response.cancel"}, wait_until_ready=False)
await self._finish_assistant_text(interrupted=True)
await self.broadcast_interruption()
async def speak(self, text: str) -> None:
"""Ask the realtime model to voice a fixed greeting."""
if not text:
return
await self._send_event(
{
"type": "response.create",
"session": {
"instructions": f"请原样无修改地输出下面的话:\n{text}",
},
}
)
async def _connect(self) -> None:
if self._websocket and self._websocket.state is State.OPEN:
return
try:
self._websocket = await websocket_connect(
self._connection_url(),
additional_headers={"Authorization": f"Bearer {self._api_key}"},
max_size=None,
open_timeout=10,
)
self._receive_task = self.create_task(
self._receive_messages(), name="stepfun_realtime_receive"
)
except Exception as exc:
self._websocket = None
await self.push_error(
f"StepFun Realtime connection failed: {exc}",
exception=exc,
fatal=True,
)
async def _disconnect(self) -> None:
current_task = asyncio.current_task()
task = self._receive_task
self._receive_task = None
if task and task is not current_task:
await self.cancel_task(task)
websocket = self._websocket
self._websocket = None
self._session_ready.clear()
if websocket and websocket.state is State.OPEN:
try:
await websocket.close()
except Exception:
pass
def _connection_url(self) -> str:
parts = urlsplit(self._base_url)
query = dict(parse_qsl(parts.query))
query["model"] = self._model
return urlunsplit(
(parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)
)
async def _receive_messages(self) -> None:
websocket = self._websocket
if not websocket:
return
try:
async for raw_message in websocket:
payload = json.loads(raw_message)
await self._handle_server_event(payload)
except Exception as exc:
if self._websocket is websocket:
await self.push_error(
f"StepFun Realtime receive failed: {exc}", exception=exc
)
finally:
if self._websocket is websocket:
self._websocket = None
self._session_ready.clear()
if self._receive_task is asyncio.current_task():
self._receive_task = None
async def _handle_server_event(self, event: dict[str, Any]) -> None:
event_type = event.get("type")
if event_type == "session.created":
await self._send_session_update()
elif event_type == "session.updated":
self._session_ready.set()
pending, self._pending_events = self._pending_events, []
for payload in pending:
await self._send_event(payload, wait_until_ready=False)
elif event_type == "response.audio.delta":
audio = event.get("delta")
if audio:
await self.push_frame(
TTSAudioRawFrame(
base64.b64decode(audio),
self._output_sample_rate,
1,
)
)
elif event_type in {"response.audio_transcript.delta", "response.text.delta"}:
await self._append_assistant_text(str(event.get("delta") or ""))
elif event_type in {"response.audio_transcript.done", "response.text.done"}:
transcript = str(event.get("transcript") or event.get("text") or "")
if transcript:
if not self._assistant_turn_id:
await self._append_assistant_text(transcript)
else:
self._assistant_text = transcript
await self._finish_assistant_text(interrupted=False)
elif event_type == "conversation.item.input_audio_transcription.completed":
await self._send_transcript("user", str(event.get("transcript") or ""))
elif event_type == "input_audio_buffer.speech_started":
await self._send_event({"type": "response.cancel"}, wait_until_ready=False)
await self.broadcast_interruption()
elif event_type == "response.done":
response = event.get("response")
interrupted = isinstance(response, dict) and response.get("status") in {
"cancelled",
"incomplete",
"interrupted",
}
await self._finish_assistant_text(interrupted=interrupted)
elif event_type == "error":
error = event.get("error")
message = error.get("message") if isinstance(error, dict) else str(error)
if "cancel" not in str(message).lower():
await self.push_error(f"StepFun Realtime error: {message}")
async def _send_session_update(self) -> None:
await self._send_event(
{
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"instructions": self._instructions,
"voice": self._voice,
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"turn_detection": {
"type": "server_vad",
"prefix_padding_ms": self._prefix_padding_ms,
"silence_duration_ms": self._silence_duration_ms,
"energy_awakeness_threshold": self._energy_awakeness_threshold,
},
},
},
wait_until_ready=False,
)
async def _send_event(
self, payload: dict[str, Any], *, wait_until_ready: bool = True
) -> None:
if wait_until_ready and not self._session_ready.is_set():
self._pending_events.append(payload)
return
if not self._websocket or self._websocket.state is not State.OPEN:
return
payload = {"event_id": uuid4().hex, **payload}
await self._websocket.send(json.dumps(payload, ensure_ascii=False))
async def _append_assistant_text(self, delta: str) -> None:
if not delta:
return
if not self._assistant_turn_id:
self._assistant_turn_id = uuid4().hex
self._assistant_timestamp = time_now_iso8601()
await self._send_transport_message(
{
"type": "assistant-text-start",
"turn_id": self._assistant_turn_id,
"timestamp": self._assistant_timestamp,
}
)
self._assistant_text += delta
await self._send_transport_message(
{
"type": "assistant-text-delta",
"turn_id": self._assistant_turn_id,
"delta": delta,
}
)
async def _finish_assistant_text(self, *, interrupted: bool) -> None:
if not self._assistant_turn_id:
return
await self._send_transport_message(
{
"type": "assistant-text-end",
"turn_id": self._assistant_turn_id,
"content": self._assistant_text,
"interrupted": interrupted,
}
)
self._assistant_turn_id = None
self._assistant_text = ""
self._assistant_timestamp = ""
async def _send_transcript(self, role: str, content: str) -> None:
if content:
await self._send_transport_message(
{
"type": "transcript",
"role": role,
"content": content,
"timestamp": time_now_iso8601(),
}
)
async def _send_transport_message(self, message: dict[str, Any]) -> None:
await self.push_frame(OutputTransportMessageUrgentFrame(message=message))
@staticmethod
def _message_text(message: Any) -> str:
if not isinstance(message, dict):
return ""
content = message.get("content")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
return "\n".join(
str(part.get("text") or "")
for part in content
if isinstance(part, dict) and part.get("type") == "text"
).strip()
return ""

View File

@@ -0,0 +1,150 @@
"""工作流图引擎(第一版)。
对应 dograh 的 pipecat_engine.py,极简实现:
- 单个 startCall 入口,开场白来自该节点;
- agentNode 用各自的 prompt 驱动多轮对话;
- 每轮助手回复后,用一次轻量 LLM「路由」判断是否满足某条出边的 condition,
满足则切换当前节点(linear = 单边;branching = 多边按条件分流);
- 到达 endCall 播放结束语并停止路由。
只读图结构,不持有对话状态(当前节点由 pipeline 维护),便于单测。
"""
from __future__ import annotations
import re
from typing import Any
from loguru import logger
class WorkflowEngine:
def __init__(self, graph: dict[str, Any]):
nodes = graph.get("nodes") or []
self.nodes: dict[str, dict] = {n["id"]: n for n in nodes if n.get("id")}
self.edges: list[dict] = graph.get("edges") or []
self.start_id: str | None = next(
(nid for nid, n in self.nodes.items() if n.get("type") == "startCall"),
None,
)
# ---- 结构查询 ----
def node_type(self, nid: str | None) -> str | None:
return self.nodes.get(nid or "", {}).get("type")
def data(self, nid: str | None) -> dict:
return self.nodes.get(nid or "", {}).get("data") or {}
def name(self, nid: str | None) -> str:
return self.data(nid).get("name") or (self.node_type(nid) or "")
def outgoing(self, nid: str | None) -> list[dict]:
return [e for e in self.edges if e.get("source") == nid]
def edge_fn_name(self, edge: dict) -> str:
"""每条边对应一个 LLM 函数名(稳定、合法标识符)。"""
raw = edge.get("id") or f"{edge.get('source')}_{edge.get('target')}"
slug = re.sub(r"[^a-z0-9]+", "_", str(raw).lower()).strip("_")
return f"goto_{slug or 'next'}"
def edge_condition(self, edge: dict) -> str:
return (edge.get("data") or {}).get("condition") or ""
def edge_transition_speech(self, edge: dict | None) -> str:
"""命中该边、切换节点瞬间播报的过渡语(可选,掩盖延迟,不写入上下文)。"""
if not edge:
return ""
return (edge.get("data") or {}).get("transition_speech") or ""
def find_edge(self, source: str | None, target: str | None) -> dict | None:
for edge in self.edges:
if edge.get("source") == source and edge.get("target") == target:
return edge
return None
def edge_description(self, edge: dict) -> str:
"""作为转移函数的 description 交给 LLM:满足该条件时模型应调用此函数。"""
cond = self.edge_condition(edge)
target = self.name(edge.get("target"))
if cond:
return f"当满足以下条件时调用以转到节点「{target}」:{cond}"
return f"当当前节点任务完成、应继续推进对话时调用以转到节点「{target}」。"
def is_end(self, nid: str | None) -> bool:
return self.node_type(nid) == "endCall"
def has_graph(self) -> bool:
return self.start_id is not None
def greeting(self) -> str:
return self.data(self.start_id).get("greeting") or ""
def system_prompt_for(self, nid: str | None) -> str:
"""节点系统提示:仅用该节点自己的 prompt(开始节点也是会话节点)。"""
header = f"[当前节点:{self.name(nid)}]"
prompt = str(self.data(nid).get("prompt") or "").strip()
return f"{header}\n{prompt}" if prompt else header
# ---- 路由:决定下一节点 ----
async def route(
self,
nid: str | None,
history: list[dict],
*,
api_key: str,
base_url: str,
model: str,
) -> str | None:
"""根据对话历史判断当前节点是否应转移。返回目标节点 id,或 None 表示停留。"""
outs = self.outgoing(nid)
if not outs:
return None
options = []
for i, edge in enumerate(outs, 1):
edata = edge.get("data") or {}
cond = edata.get("condition") or "(无明确条件,作为默认后继)"
tgt_name = self.name(edge.get("target"))
options.append(f"{i}. 条件:{cond} → 目标节点:{tgt_name}")
convo = "\n".join(
f"{m['role']}: {m['content']}" for m in history[-8:] if m.get("content")
)
system = (
"你是语音对话的流程路由器。根据最近的对话,判断是否已满足某条转移条件。\n"
"规则:仅当某条件被明确满足时,返回其编号;若都不满足或不确定,返回 0 "
"(停留在当前节点继续对话)。只输出一个数字,不要任何解释。"
)
user = (
"可选转移:\n"
+ "\n".join(options)
+ f"\n\n对话记录:\n{convo}\n\n请只返回编号(0 表示停留):"
)
try:
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key=api_key, base_url=base_url or None)
resp = await client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
temperature=0,
max_tokens=5,
)
text = (resp.choices[0].message.content or "").strip()
except Exception as exc: # noqa: BLE001 - 路由失败不应中断通话
logger.warning(f"工作流路由调用失败,停留当前节点: {exc}")
return None
match = re.search(r"\d+", text)
if not match:
return None
idx = int(match.group())
if idx < 1 or idx > len(outs):
return None
target = outs[idx - 1].get("target")
logger.info(f"工作流路由: {self.name(nid)}{self.name(target)} (edge {idx})")
return target

View File

@@ -8,6 +8,7 @@
"name": "ai-video-admin-frontend",
"version": "0.1.0",
"dependencies": {
"@xyflow/react": "^12.11.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"lucide-react": "^1.17.0",
@@ -3776,6 +3777,55 @@
"tslib": "^2.4.0"
}
},
"node_modules/@types/d3-color": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/@types/d3-color/-/d3-color-3.1.3.tgz",
"integrity": "sha512-iO90scth9WAbmgv7ogoq57O9YpKmFBbmoEoCHDB2xMBY0+/KVrqAaCDyCE16dUspeOvIxFFRI+0sEtqDqy2b4A==",
"license": "MIT"
},
"node_modules/@types/d3-drag": {
"version": "3.0.7",
"resolved": "https://registry.npmjs.org/@types/d3-drag/-/d3-drag-3.0.7.tgz",
"integrity": "sha512-HE3jVKlzU9AaMazNufooRJ5ZpWmLIoc90A37WU2JMmeq28w1FQqCZswHZ3xR+SuxYftzHq6WU6KJHvqxKzTxxQ==",
"license": "MIT",
"dependencies": {
"@types/d3-selection": "*"
}
},
"node_modules/@types/d3-interpolate": {
"version": "3.0.4",
"resolved": "https://registry.npmjs.org/@types/d3-interpolate/-/d3-interpolate-3.0.4.tgz",
"integrity": "sha512-mgLPETlrpVV1YRJIglr4Ez47g7Yxjl1lj7YKsiMCb27VJH9W8NVM6Bb9d8kkpG/uAQS5AmbA48q2IAolKKo1MA==",
"license": "MIT",
"dependencies": {
"@types/d3-color": "*"
}
},
"node_modules/@types/d3-selection": {
"version": "3.0.11",
"resolved": "https://registry.npmjs.org/@types/d3-selection/-/d3-selection-3.0.11.tgz",
"integrity": "sha512-bhAXu23DJWsrI45xafYpkQ4NtcKMwWnAC/vKrd2l+nxMFuvOT3XMYTIj2opv8vq8AO5Yh7Qac/nSeP/3zjTK0w==",
"license": "MIT"
},
"node_modules/@types/d3-transition": {
"version": "3.0.9",
"resolved": "https://registry.npmjs.org/@types/d3-transition/-/d3-transition-3.0.9.tgz",
"integrity": "sha512-uZS5shfxzO3rGlu0cC3bjmMFKsXv+SmZZcgp0KD22ts4uGXp5EVYGzu/0YdwZeKmddhcAccYtREJKkPfXkZuCg==",
"license": "MIT",
"dependencies": {
"@types/d3-selection": "*"
}
},
"node_modules/@types/d3-zoom": {
"version": "3.0.8",
"resolved": "https://registry.npmjs.org/@types/d3-zoom/-/d3-zoom-3.0.8.tgz",
"integrity": "sha512-iqMC4/YlFCSlO8+2Ii1GGGliCAY4XdeG748w5vQUbevlbDu0zSjH/+jojorQVBK/se0j6DUFNPBGSqD3YWYnDw==",
"license": "MIT",
"dependencies": {
"@types/d3-interpolate": "*",
"@types/d3-selection": "*"
}
},
"node_modules/@types/estree": {
"version": "1.0.9",
"resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.9.tgz",
@@ -4455,6 +4505,48 @@
"win32"
]
},
"node_modules/@xyflow/react": {
"version": "12.11.0",
"resolved": "https://registry.npmjs.org/@xyflow/react/-/react-12.11.0.tgz",
"integrity": "sha512-na4IO33FSs2OS72hASgZDmTYwFAkef7Z74uBUVrong3ARmQQHfnRUVaCFn1kTt5LbS6pK03TbYjCPGLjLFfziA==",
"license": "MIT",
"dependencies": {
"@xyflow/system": "0.0.77",
"classcat": "^5.0.3",
"zustand": "^4.4.0"
},
"peerDependencies": {
"@types/react": ">=17",
"@types/react-dom": ">=17",
"react": ">=17",
"react-dom": ">=17"
},
"peerDependenciesMeta": {
"@types/react": {
"optional": true
},
"@types/react-dom": {
"optional": true
}
}
},
"node_modules/@xyflow/system": {
"version": "0.0.77",
"resolved": "https://registry.npmjs.org/@xyflow/system/-/system-0.0.77.tgz",
"integrity": "sha512-qCDCMCQAAgUu8yHnhloHG9F5mwPX5E+Wl8McpYIOPSSXfzFJJoZcwOcsDiAjitVKIg2de1WmJbCHfpcvxprsgg==",
"license": "MIT",
"dependencies": {
"@types/d3-drag": "^3.0.7",
"@types/d3-interpolate": "^3.0.4",
"@types/d3-selection": "^3.0.10",
"@types/d3-transition": "^3.0.8",
"@types/d3-zoom": "^3.0.8",
"d3-drag": "^3.0.0",
"d3-interpolate": "^3.0.1",
"d3-selection": "^3.0.0",
"d3-zoom": "^3.0.0"
}
},
"node_modules/accepts": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/accepts/-/accepts-2.0.0.tgz",
@@ -5071,6 +5163,12 @@
"url": "https://polar.sh/cva"
}
},
"node_modules/classcat": {
"version": "5.0.5",
"resolved": "https://registry.npmjs.org/classcat/-/classcat-5.0.5.tgz",
"integrity": "sha512-JhZUT7JFcQy/EzW605k/ktHtncoo9vnyW/2GspNYwFlN1C/WmjuV/xtS04e9SOkL2sTdw0VAZ2UGCcQ9lR6p6w==",
"license": "MIT"
},
"node_modules/cli-cursor": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/cli-cursor/-/cli-cursor-5.0.0.tgz",
@@ -5318,6 +5416,111 @@
"devOptional": true,
"license": "MIT"
},
"node_modules/d3-color": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/d3-color/-/d3-color-3.1.0.tgz",
"integrity": "sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA==",
"license": "ISC",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-dispatch": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/d3-dispatch/-/d3-dispatch-3.0.1.tgz",
"integrity": "sha512-rzUyPU/S7rwUflMyLc1ETDeBj0NRuHKKAcvukozwhshr6g6c5d8zh4c2gQjY2bZ0dXeGLWc1PF174P2tVvKhfg==",
"license": "ISC",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-drag": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/d3-drag/-/d3-drag-3.0.0.tgz",
"integrity": "sha512-pWbUJLdETVA8lQNJecMxoXfH6x+mO2UQo8rSmZ+QqxcbyA3hfeprFgIT//HW2nlHChWeIIMwS2Fq+gEARkhTkg==",
"license": "ISC",
"dependencies": {
"d3-dispatch": "1 - 3",
"d3-selection": "3"
},
"engines": {
"node": ">=12"
}
},
"node_modules/d3-ease": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/d3-ease/-/d3-ease-3.0.1.tgz",
"integrity": "sha512-wR/XK3D3XcLIZwpbvQwQ5fK+8Ykds1ip7A2Txe0yxncXSdq1L9skcG7blcedkOX+ZcgxGAmLX1FrRGbADwzi0w==",
"license": "BSD-3-Clause",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-interpolate": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/d3-interpolate/-/d3-interpolate-3.0.1.tgz",
"integrity": "sha512-3bYs1rOD33uo8aqJfKP3JWPAibgw8Zm2+L9vBKEHJ2Rg+viTR7o5Mmv5mZcieN+FRYaAOWX5SJATX6k1PWz72g==",
"license": "ISC",
"dependencies": {
"d3-color": "1 - 3"
},
"engines": {
"node": ">=12"
}
},
"node_modules/d3-selection": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz",
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"license": "ISC",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-timer": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/d3-timer/-/d3-timer-3.0.1.tgz",
"integrity": "sha512-ndfJ/JxxMd3nw31uyKoY2naivF+r29V+Lc0svZxe1JvvIRmi8hUsrMvdOwgS1o6uBHmiz91geQ0ylPP0aj1VUA==",
"license": "ISC",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-transition": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/d3-transition/-/d3-transition-3.0.1.tgz",
"integrity": "sha512-ApKvfjsSR6tg06xrL434C0WydLr7JewBB3V+/39RMHsaXTOG0zmt/OAXeng5M5LBm0ojmxJrpomQVZ1aPvBL4w==",
"license": "ISC",
"dependencies": {
"d3-color": "1 - 3",
"d3-dispatch": "1 - 3",
"d3-ease": "1 - 3",
"d3-interpolate": "1 - 3",
"d3-timer": "1 - 3"
},
"engines": {
"node": ">=12"
},
"peerDependencies": {
"d3-selection": "2 - 3"
}
},
"node_modules/d3-zoom": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/d3-zoom/-/d3-zoom-3.0.0.tgz",
"integrity": "sha512-b8AmV3kfQaqWAuacbPuNbL6vahnOJflOhexLzMMNLga62+/nh0JzvJ0aO/5a5MVgUFGS7Hu1P9P03o3fJkDCyw==",
"license": "ISC",
"dependencies": {
"d3-dispatch": "1 - 3",
"d3-drag": "2 - 3",
"d3-interpolate": "1 - 3",
"d3-selection": "2 - 3",
"d3-transition": "2 - 3"
},
"engines": {
"node": ">=12"
}
},
"node_modules/damerau-levenshtein": {
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/damerau-levenshtein/-/damerau-levenshtein-1.0.8.tgz",
@@ -11532,6 +11735,34 @@
"peerDependencies": {
"zod": "^3.25.0 || ^4.0.0"
}
},
"node_modules/zustand": {
"version": "4.5.7",
"resolved": "https://registry.npmjs.org/zustand/-/zustand-4.5.7.tgz",
"integrity": "sha512-CHOUy7mu3lbD6o6LJLfllpjkzhHXSBlX8B9+qPddUsIfeF5S/UZ5q0kmCsnRqT1UHFQZchNFDDzMbQsuesHWlw==",
"license": "MIT",
"dependencies": {
"use-sync-external-store": "^1.2.2"
},
"engines": {
"node": ">=12.7.0"
},
"peerDependencies": {
"@types/react": ">=16.8",
"immer": ">=9.0.6",
"react": ">=16.8"
},
"peerDependenciesMeta": {
"@types/react": {
"optional": true
},
"immer": {
"optional": true
},
"react": {
"optional": true
}
}
}
}
}

View File

@@ -9,6 +9,7 @@
"lint": "eslint"
},
"dependencies": {
"@xyflow/react": "^12.11.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"lucide-react": "^1.17.0",

View File

@@ -31,6 +31,7 @@ import {
PhoneOff,
Orbit,
Waves,
Bug,
} from "lucide-react";
import { Button } from "@/components/ui/button";
@@ -38,6 +39,13 @@ import { Badge } from "@/components/ui/badge";
import { Input } from "@/components/ui/input";
import { Textarea } from "@/components/ui/textarea";
import { Switch } from "@/components/ui/switch";
import {
Sheet,
SheetContent,
SheetDescription,
SheetHeader,
SheetTitle,
} from "@/components/ui/sheet";
import {
DropdownMenu,
DropdownMenuContent,
@@ -89,6 +97,14 @@ import {
type ChatMessage,
type VoicePreviewStatus,
} from "@/hooks/use-voice-preview";
import {
WorkflowEditor,
type WorkflowSettings,
} from "@/components/workflow/WorkflowEditor";
import {
defaultGraph,
type WorkflowGraph,
} from "@/components/workflow/specs";
type RuntimeMode = "pipeline" | "realtime";
@@ -167,7 +183,7 @@ const typeToView = {
dify: "create-dify",
fastgpt: "create-fastgpt",
opencode: "create-opencode",
workflow: "placeholder",
workflow: "workflow",
} as const;
type View = "list" | "choose" | "loading" | (typeof typeToView)[ApiAssistantType];
@@ -348,6 +364,17 @@ export function AssistantPage(props: AssistantPageProps) {
// 已保存基线(当前类型表单的 JSON);与表单不一致时保存按钮才可点击
const [savedSnapshot, setSavedSnapshot] = useState<string | null>(null);
// workflow 编辑器:名称 + 图(nodes/edges)。graph 实时由画布回传。
const [workflowName, setWorkflowName] = useState("");
const [workflowGraph, setWorkflowGraph] = useState<WorkflowGraph>(() =>
defaultGraph(),
);
const [workflowSettings, setWorkflowSettings] = useState<WorkflowSettings>({
allowInterrupt: true,
});
const [debugOpen, setDebugOpen] = useState(false);
const [activeNodeId, setActiveNodeId] = useState<string | null>(null);
const loadAssistants = useCallback(async () => {
setListLoading(true);
setListError(null);
@@ -507,6 +534,14 @@ export function AssistantPage(props: AssistantPageProps) {
const next = { ...openCodeForm, apiKey: "" };
setOpenCodeForm(next);
setSavedSnapshot(JSON.stringify(next));
} else if (view === "workflow") {
setSavedSnapshot(
JSON.stringify({
name: workflowName,
graph: workflowGraph,
settings: workflowSettings,
}),
);
}
} catch (error) {
setSaveError(error instanceof Error ? error.message : "保存失败");
@@ -635,9 +670,29 @@ export function AssistantPage(props: AssistantPageProps) {
} else if (assistant.type === "opencode") {
setSavedSnapshot(JSON.stringify(fillOpenCodeForm(assistant)));
} else {
// 工作流:暂时显示占位页
setDraftName(assistant.name);
setDraftType(typeToLabel[assistant.type]);
// 工作流:回填名称与图(空图回落到默认 开始→智能体→结束)
const graph =
assistant.graph &&
Array.isArray((assistant.graph as WorkflowGraph).nodes) &&
(assistant.graph as WorkflowGraph).nodes.length > 0
? (assistant.graph as WorkflowGraph)
: defaultGraph();
const wfSettings: WorkflowSettings = {
llm: assistant.modelResourceIds.LLM,
asr: assistant.modelResourceIds.ASR,
tts: assistant.modelResourceIds.TTS,
allowInterrupt: assistant.enableInterrupt,
};
setWorkflowName(assistant.name);
setWorkflowGraph(graph);
setWorkflowSettings(wfSettings);
setSavedSnapshot(
JSON.stringify({
name: assistant.name,
graph,
settings: wfSettings,
}),
);
}
setView(typeToView[assistant.type]);
} catch (error) {
@@ -672,6 +727,22 @@ export function AssistantPage(props: AssistantPageProps) {
);
}
function handleSaveWorkflow() {
void save(
baseUpsert({
name: workflowName.trim(),
type: "workflow",
enableInterrupt: workflowSettings.allowInterrupt,
modelResourceIds: {
...(workflowSettings.llm ? { LLM: workflowSettings.llm } : {}),
...(workflowSettings.asr ? { ASR: workflowSettings.asr } : {}),
...(workflowSettings.tts ? { TTS: workflowSettings.tts } : {}),
},
graph: workflowGraph as unknown as Record<string, unknown>,
}),
);
}
// 当前编辑器表单相对已保存基线是否有改动(决定保存按钮是否可点)
const activeFormJson =
view === "create"
@@ -682,7 +753,13 @@ export function AssistantPage(props: AssistantPageProps) {
? JSON.stringify(fastGptForm)
: view === "create-opencode"
? JSON.stringify(openCodeForm)
: null;
: view === "workflow"
? JSON.stringify({
name: workflowName,
graph: workflowGraph,
settings: workflowSettings,
})
: null;
const dirty =
activeFormJson !== null &&
savedSnapshot !== null &&
@@ -1114,59 +1191,88 @@ export function AssistantPage(props: AssistantPageProps) {
);
}
if (view === "placeholder") {
if (view === "workflow") {
return (
<div className="mx-auto flex w-full max-w-[1180px] flex-col gap-6">
<div className="flex items-start justify-between gap-6">
<div>
<h1 className="font-display display-lg text-ink">
{draftName.trim() || "创建助手"}
</h1>
<p className="mt-3 max-w-2xl text-[15px] leading-7 text-muted-foreground">
{draftType}
</p>
<div className="-mt-6 flex h-full flex-col gap-4">
<div className="flex shrink-0 items-center justify-between gap-6 border-b border-hairline pb-3 pt-1">
<div className="flex min-w-0 items-center gap-2">
<EditorBackButton onClick={() => router.push("/assistants")} />
<EditableTitle value={workflowName} onChange={setWorkflowName} />
<AssistantIdentity assistantId={editingId} />
</div>
<Button
variant="outline"
size="lg"
className="shrink-0 gap-2 border-hairline-strong text-muted-foreground hover:text-foreground"
onClick={() => router.push("/assistants")}
>
<ChevronLeft size={16} />
</Button>
<div className="flex shrink-0 gap-2">
{saveError && (
<span className="self-center text-xs text-destructive">
{saveError}
</span>
)}
<Button
variant="outline"
className="gap-2 border-hairline-strong text-foreground hover:bg-surface-strong"
disabled={!editingId}
onClick={() => setDebugOpen(true)}
>
<Bug size={16} />
</Button>
<Button
className="gap-2"
disabled={saving || !dirty || !workflowName.trim()}
onClick={() => handleSaveWorkflow()}
>
{saving ? (
<Loader2 size={16} className="animate-spin" />
) : (
<Save size={16} />
)}
</Button>
</div>
</div>
<section className="relative overflow-hidden rounded-3xl border border-hairline bg-canvas-soft px-10 py-20 text-center">
<div
aria-hidden
className="pointer-events-none absolute -right-24 top-0 h-72 w-72 rounded-full opacity-50 blur-3xl"
style={{
backgroundImage:
"radial-gradient(circle, color-mix(in srgb, var(--gradient-sky) 50%, transparent), transparent 70%)",
<div className="min-h-0 flex-1">
<WorkflowEditor
value={workflowGraph}
onChange={setWorkflowGraph}
settings={workflowSettings}
onSettingsChange={setWorkflowSettings}
activeNodeId={activeNodeId}
modelOptions={{
llm: credOptions("LLM"),
asr: credOptions("ASR"),
tts: credOptions("TTS"),
}}
/>
<div
aria-hidden
className="pointer-events-none absolute -left-20 bottom-0 h-64 w-64 rounded-full opacity-45 blur-3xl"
style={{
backgroundImage:
"radial-gradient(circle, color-mix(in srgb, var(--gradient-lavender) 50%, transparent), transparent 70%)",
}}
/>
<div className="relative">
<div className="caption-label inline-flex rounded-full bg-surface-strong px-3 py-1 text-muted-foreground">
</div>
<p className="font-display display-sm mx-auto mt-5 max-w-md text-ink">
{draftType}
</p>
<p className="mx-auto mt-3 max-w-md text-sm leading-7 text-body">
</p>
</div>
</section>
</div>
<Sheet
open={debugOpen}
onOpenChange={(open) => {
setDebugOpen(open);
if (!open) setActiveNodeId(null);
}}
modal={false}
>
<SheetContent
side="right"
showOverlay={false}
onInteractOutside={(e) => e.preventDefault()}
className="w-[440px] gap-0 border-l border-hairline bg-card p-0 sm:max-w-[440px]"
>
<SheetHeader className="sr-only">
<SheetTitle></SheetTitle>
<SheetDescription>
,
</SheetDescription>
</SheetHeader>
<DebugDrawer
assistantId={editingId}
asSheet
onNodeActive={setActiveNodeId}
/>
</SheetContent>
</Sheet>
</div>
);
}
@@ -1767,12 +1873,25 @@ function SegmentedIconButton({
);
}
function DebugDrawer({ assistantId }: { assistantId: string | null }) {
function DebugDrawer({
assistantId,
asSheet = false,
onNodeActive,
}: {
assistantId: string | null;
asSheet?: boolean;
onNodeActive?: (nodeId: string | null) => void;
}) {
const [showTranscript, setShowTranscript] = useState(false);
const [vizStyle, setVizStyle] = useState<VizStyle>("aura");
// 内联(prompt 编辑器右栏)用 aside + 圆角边框;抽屉模式占满 Sheet。
const containerClass = asSheet
? "flex h-full min-w-0 flex-1 flex-col overflow-hidden"
: "hidden min-w-0 flex-1 flex-col overflow-hidden rounded-2xl border border-hairline bg-card shadow-sm lg:flex";
return (
<aside className="hidden min-w-0 flex-1 flex-col overflow-hidden rounded-2xl border border-hairline bg-card shadow-sm lg:flex">
<aside className={containerClass}>
<div className="flex shrink-0 items-center justify-between gap-3 border-b border-hairline px-5 py-3">
<div className="text-sm font-medium text-foreground"></div>
{SHOW_VOICE_VIZ && (
@@ -1815,6 +1934,7 @@ function DebugDrawer({ assistantId }: { assistantId: string | null }) {
showTranscript={showTranscript}
vizStyle={vizStyle}
assistantId={assistantId}
onNodeActive={onNodeActive}
/>
</aside>
);
@@ -1824,10 +1944,12 @@ function DebugVoicePanel({
showTranscript,
vizStyle,
assistantId,
onNodeActive,
}: {
showTranscript: boolean;
vizStyle: VizStyle;
assistantId: string | null;
onNodeActive?: (nodeId: string | null) => void;
}) {
const {
status,
@@ -1843,7 +1965,7 @@ function DebugVoicePanel({
connect,
disconnect,
audioRef,
} = useVoicePreview(assistantId);
} = useVoicePreview(assistantId, onNodeActive);
// 连接中或已连通都视作"会话进行中"
const recording = status === "connecting" || status === "connected";
const [textDraft, setTextDraft] = useState("");

View File

@@ -50,14 +50,16 @@ function SheetContent({
children,
side = "right",
showCloseButton = true,
showOverlay = true,
...props
}: React.ComponentProps<typeof SheetPrimitive.Content> & {
side?: "top" | "right" | "bottom" | "left"
showCloseButton?: boolean
showOverlay?: boolean
}) {
return (
<SheetPortal>
<SheetOverlay />
{showOverlay && <SheetOverlay />}
<SheetPrimitive.Content
data-slot="sheet-content"
data-side={side}

View File

@@ -0,0 +1,116 @@
"use client";
/**
* 条件边。边携带 condition(自然语言条件,LLM 据此决定是否走这条路径)与
* label(日志里识别该路径的短标签)。悬停/选中时在标签旁显示「编辑 / 删除」按钮。
*/
import {
BaseEdge,
EdgeLabelRenderer,
getSmoothStepPath,
type EdgeProps,
useReactFlow,
} from "@xyflow/react";
import { Pencil, Trash2 } from "lucide-react";
import { useContext, useState } from "react";
import { cn } from "@/lib/utils";
import { EdgeActionContext } from "./context";
export function ConditionEdge({
id,
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
data,
selected,
}: EdgeProps) {
const actions = useContext(EdgeActionContext);
const { setEdges } = useReactFlow();
const [hovered, setHovered] = useState(false);
// 点击标签:只选中这条边(露出编辑/删除按钮),不直接进入编辑。
const selectThisEdge = () =>
setEdges((eds) => eds.map((e) => ({ ...e, selected: e.id === id })));
const [path, labelX, labelY] = getSmoothStepPath({
sourceX,
sourceY,
sourcePosition,
targetX,
targetY,
targetPosition,
borderRadius: 8,
offset: 20,
});
const label = ((data?.label as string) || (data?.condition as string) || "")
.toString()
.trim();
const expanded = hovered || selected;
return (
<>
<BaseEdge
id={id}
path={path}
style={{
stroke: selected ? "var(--primary)" : "var(--muted-soft)",
strokeWidth: selected ? 2.5 : 1.5,
transition: "stroke 0.15s ease, stroke-width 0.15s ease",
}}
/>
<EdgeLabelRenderer>
<div
className="nodrag nopan pointer-events-auto absolute flex items-center gap-1"
style={{
transform: `translate(-50%, -50%) translate(${labelX}px, ${labelY}px)`,
}}
onMouseEnter={() => setHovered(true)}
onMouseLeave={() => setHovered(false)}
>
<button
type="button"
title="条件"
className={cn(
"max-w-[160px] truncate rounded-full border px-2.5 py-1 text-xs shadow-sm transition-colors",
label
? "border-hairline bg-card text-foreground hover:border-primary"
: "border-dashed border-hairline-strong bg-card/80 text-muted-soft hover:text-foreground",
selected && "border-primary ring-2 ring-primary/30",
)}
onClick={selectThisEdge}
>
{label || " 条件"}
</button>
{expanded && (
<button
type="button"
title="编辑条件"
className="flex h-6 w-6 items-center justify-center rounded-full border border-hairline bg-card text-muted-foreground shadow-sm transition-colors hover:text-foreground"
onClick={() => actions.edit(id)}
>
<Pencil size={12} />
</button>
)}
{expanded && (
<button
type="button"
title="删除连线"
className="flex h-6 w-6 items-center justify-center rounded-full border border-hairline bg-card text-muted-foreground shadow-sm transition-colors hover:text-destructive"
onClick={() => actions.remove(id)}
>
<Trash2 size={12} />
</button>
)}
</div>
</EdgeLabelRenderer>
</>
);
}
export const edgeTypes = { condition: ConditionEdge };

View File

@@ -0,0 +1,147 @@
"use client";
/**
* 通用节点渲染器。所有节点类型共用一个组件,规格来自 NodeSpecsContext
* (后端 /api/node-types)。悬停/选中时在右上角显示「编辑 / 删除」按钮。
*/
import { Handle, Position, type NodeProps } from "@xyflow/react";
import { MessageSquareText, Pencil, Trash2 } from "lucide-react";
import { useContext } from "react";
import { cn } from "@/lib/utils";
import {
ActiveNodeContext,
NodeActionContext,
NodeSpecsContext,
} from "./context";
import { accentVar, type WorkflowNodeData } from "./specs";
export function GenericNode({ id, type, data, selected }: NodeProps) {
const specs = useContext(NodeSpecsContext);
const actions = useContext(NodeActionContext);
const activeNodeId = useContext(ActiveNodeContext);
const isActive = activeNodeId === id;
const spec = specs[type as string];
if (!spec) return null;
const nodeData = data as WorkflowNodeData;
const Icon = spec.icon;
const preview = (nodeData.greeting || nodeData.prompt || "")
.toString()
.trim();
return (
<div
data-node-id={id}
className={cn(
"group relative w-[250px] rounded-2xl border bg-card p-4 text-card-foreground shadow-sm transition-[border-color,box-shadow,transform]",
isActive
? "border-success shadow-[0_12px_34px_color-mix(in_srgb,var(--success)_20%,transparent)] ring-2 ring-success/50"
: selected
? "border-primary shadow-[0_12px_34px_color-mix(in_srgb,var(--primary)_16%,transparent)]"
: "border-hairline hover:border-hairline-strong hover:shadow-md",
)}
>
{spec.hasTarget && (
<Handle
type="target"
position={Position.Left}
className="!h-3 !w-3 !border-[3px] !border-card !bg-muted-soft"
/>
)}
<div
aria-hidden
className="absolute left-5 right-5 top-0 h-px"
style={{
background: `linear-gradient(90deg, transparent, var(${accentVar(spec.accent)}), transparent)`,
}}
/>
{isActive && (
<div className="absolute -top-3 left-3 flex items-center gap-1.5 rounded-full bg-success px-2 py-0.5 text-[10px] font-medium text-on-primary shadow-sm">
<span className="h-1.5 w-1.5 animate-pulse rounded-full bg-on-primary" />
</div>
)}
{/* 悬停/选中时出现的操作按钮 */}
<div
className={cn(
"nodrag absolute -top-3 right-3 flex items-center gap-1 transition-opacity",
selected
? "opacity-100"
: "opacity-0 group-hover:opacity-100",
)}
>
<button
type="button"
title="编辑节点"
className="flex h-7 w-7 items-center justify-center rounded-full border border-hairline bg-card text-muted-foreground shadow-sm transition-colors hover:text-foreground"
onClick={(e) => {
e.stopPropagation();
actions.edit(id);
}}
>
<Pencil size={13} />
</button>
{type !== "startCall" && (
<button
type="button"
title="删除节点"
className="flex h-7 w-7 items-center justify-center rounded-full border border-hairline bg-card text-muted-foreground shadow-sm transition-colors hover:text-destructive"
onClick={(e) => {
e.stopPropagation();
actions.remove(id);
}}
>
<Trash2 size={13} />
</button>
)}
</div>
<div className="flex items-start gap-3">
<div
className="flex h-10 w-10 shrink-0 items-center justify-center rounded-full text-foreground"
style={{
background: `color-mix(in srgb, var(${accentVar(spec.accent)}) 28%, var(--surface-strong))`,
}}
>
<Icon size={17} />
</div>
<div className="min-w-0 flex-1">
<div className="caption-label text-muted-soft">{spec.displayName}</div>
<div className="mt-1 truncate text-sm font-medium text-foreground">
{nodeData.name || spec.displayName}
</div>
</div>
<MessageSquareText size={15} className="mt-1 text-muted-soft" />
</div>
{preview ? (
<p className="mt-4 line-clamp-2 border-t border-hairline-soft pt-3 text-xs leading-5 text-muted-foreground">
{preview}
</p>
) : (
<p className="mt-4 border-t border-hairline-soft pt-3 text-xs leading-5 text-muted-foreground">
{spec.description}
</p>
)}
{spec.hasSource && (
<Handle
type="source"
position={Position.Right}
className="!h-3 !w-3 !border-[3px] !border-card !bg-primary"
/>
)}
</div>
);
}
export const nodeTypes = {
startCall: GenericNode,
agentNode: GenericNode,
endCall: GenericNode,
};

View File

@@ -0,0 +1,807 @@
"use client";
/**
* 工作流可视化编辑器。基于 @xyflow/react,样式套用 ai-video 设计 token。
*
* 工具栏:添加节点(弹窗选类型)、工作流设置(ASR/LLM/TTS、是否允许打断)。
* 节点与条件边:悬停/选中出现「编辑 / 删除」按钮,编辑统一用弹出对话框。
* 节点目录来自后端 /api/node-types。数据通过 value/onChange 与外部 graph 同步。
*/
import {
addEdge,
Background,
BackgroundVariant,
type Connection,
Controls,
type Edge,
MiniMap,
type Node,
type NodeChange,
Panel,
ReactFlow,
ReactFlowProvider,
useEdgesState,
useNodesState,
useReactFlow,
} from "@xyflow/react";
import "@xyflow/react/dist/style.css";
import { Loader2, Plus, Settings2, Trash2 } from "lucide-react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { Button } from "@/components/ui/button";
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
} from "@/components/ui/dialog";
import { Input } from "@/components/ui/input";
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import { Switch } from "@/components/ui/switch";
import { Textarea } from "@/components/ui/textarea";
import { edgeTypes } from "./ConditionEdge";
import {
ActiveNodeContext,
EdgeActionContext,
NodeActionContext,
NodeSpecsContext,
} from "./context";
import { nodeTypes } from "./GenericNode";
import {
accentVar,
defaultGraph,
type NodeSpecMap,
type RuntimeNodeSpec,
type WorkflowGraph,
type WorkflowNodeData,
type WorkflowNodeType,
} from "./specs";
import { useNodeSpecs } from "./useNodeSpecs";
export type WorkflowSettings = {
llm?: string;
asr?: string;
tts?: string;
allowInterrupt: boolean;
};
export type ModelOption = { value: string; label: string };
export type WorkflowEditorProps = {
value?: WorkflowGraph;
onChange?: (graph: WorkflowGraph) => void;
settings: WorkflowSettings;
onSettingsChange: (settings: WorkflowSettings) => void;
modelOptions: { llm: ModelOption[]; asr: ModelOption[]; tts: ModelOption[] };
/** 调试通话中当前激活的节点 id(用于高亮)。 */
activeNodeId?: string | null;
};
let nodeSeq = 0;
const NONE = "__none__";
function toFlow(graph: WorkflowGraph): { nodes: Node[]; edges: Edge[] } {
return {
nodes: graph.nodes.map((n) => ({
id: n.id,
type: n.type,
position: n.position,
data: n.data,
})),
edges: graph.edges.map((e) => ({
id: e.id,
type: "condition",
source: e.source,
target: e.target,
data: e.data ?? {},
})),
};
}
function fromFlow(nodes: Node[], edges: Edge[]): WorkflowGraph {
return {
nodes: nodes.map((n) => ({
id: n.id,
type: n.type as WorkflowNodeType,
position: n.position,
data: n.data as WorkflowNodeData,
})),
edges: edges.map((e) => ({
id: e.id,
source: e.source,
target: e.target,
data:
(e.data as {
condition?: string;
label?: string;
transition_speech?: string;
}) ?? {},
})),
};
}
function Canvas({
value,
onChange,
settings,
onSettingsChange,
modelOptions,
activeNodeId,
specsByType,
}: WorkflowEditorProps & { specsByType: NodeSpecMap }) {
const initial = useMemo(() => toFlow(value ?? defaultGraph()), [value]);
const [nodes, setNodes, onNodesChange] = useNodesState(initial.nodes);
const [edges, setEdges, onEdgesChange] = useEdgesState(initial.edges);
const [editingId, setEditingId] = useState<string | null>(null);
const [editingEdgeId, setEditingEdgeId] = useState<string | null>(null);
const [addOpen, setAddOpen] = useState(false);
const [settingsOpen, setSettingsOpen] = useState(false);
const { screenToFlowPosition } = useReactFlow();
// 回传画布状态给外部(助手 graph)。用 ref 避免把 onChange 放进依赖导致循环。
const onChangeRef = useRef(onChange);
useEffect(() => {
onChangeRef.current = onChange;
}, [onChange]);
useEffect(() => {
onChangeRef.current?.(fromFlow(nodes, edges));
}, [nodes, edges]);
const onConnect = useCallback(
(connection: Connection) => {
setEdges((eds) =>
addEdge(
{
...connection,
id: `e-${connection.source}-${connection.target}-${Date.now()}`,
type: "condition",
animated: true,
data: {},
},
eds,
),
);
},
[setEdges],
);
// 连线约束:不能连入开始节点(无入边句柄),不能自连。
const isValidConnection = useCallback(
(c: Connection | Edge) => {
if (c.source === c.target) return false;
const target = nodes.find((n) => n.id === c.target);
if (!target) return false;
const spec = specsByType[target.type as string];
return !!spec?.hasTarget;
},
[nodes, specsByType],
);
const addNode = useCallback(
(spec: RuntimeNodeSpec) => {
nodeSeq += 1;
const id = `${spec.type}-${Date.now()}-${nodeSeq}`;
const position = screenToFlowPosition({
x: window.innerWidth / 2,
y: window.innerHeight / 2,
});
const data: WorkflowNodeData = { name: spec.displayName };
if (spec.type === "agentNode") {
data.allowInterrupt = true;
data.prompt = "";
}
setNodes((ns) => [...ns, { id, type: spec.type, position, data }]);
setAddOpen(false);
setEditingId(id);
},
[screenToFlowPosition, setNodes],
);
const updateNodeData = useCallback(
(id: string, patch: Partial<WorkflowNodeData>) => {
setNodes((ns) =>
ns.map((n) =>
n.id === id ? { ...n, data: { ...n.data, ...patch } } : n,
),
);
},
[setNodes],
);
const deleteNode = useCallback(
(id: string) => {
if (nodes.find((node) => node.id === id)?.type === "startCall") return;
setNodes((ns) => ns.filter((n) => n.id !== id));
setEdges((es) => es.filter((e) => e.source !== id && e.target !== id));
setEditingId((cur) => (cur === id ? null : cur));
},
[nodes, setNodes, setEdges],
);
const handleNodesChange = useCallback(
(changes: NodeChange[]) => {
const startIds = new Set(
nodes.filter((node) => node.type === "startCall").map((node) => node.id),
);
onNodesChange(
changes.filter(
(change) => change.type !== "remove" || !startIds.has(change.id),
),
);
},
[nodes, onNodesChange],
);
const updateEdgeData = useCallback(
(
id: string,
patch: {
condition?: string;
label?: string;
transition_speech?: string;
},
) => {
setEdges((es) =>
es.map((e) =>
e.id === id ? { ...e, data: { ...(e.data ?? {}), ...patch } } : e,
),
);
},
[setEdges],
);
const deleteEdge = useCallback(
(id: string) => {
setEdges((es) => es.filter((e) => e.id !== id));
setEditingEdgeId((cur) => (cur === id ? null : cur));
},
[setEdges],
);
const nodeActions = useMemo(
() => ({ edit: setEditingId, remove: deleteNode }),
[deleteNode],
);
const edgeActions = useMemo(
() => ({ edit: setEditingEdgeId, remove: deleteEdge }),
[deleteEdge],
);
const editingNode = nodes.find((n) => n.id === editingId);
const editingSpec = editingNode ? specsByType[editingNode.type as string] : null;
const editingEdge = edges.find((e) => e.id === editingEdgeId);
const addableSpecs = Object.values(specsByType).filter((s) => s.addable);
return (
<NodeSpecsContext.Provider value={specsByType}>
<ActiveNodeContext.Provider value={activeNodeId ?? null}>
<NodeActionContext.Provider value={nodeActions}>
<EdgeActionContext.Provider value={edgeActions}>
<div className="h-full w-full min-h-[560px]">
<section className="relative h-full w-full overflow-hidden rounded-2xl border border-hairline bg-canvas-soft shadow-sm">
<div
aria-hidden
className="pointer-events-none absolute -right-24 -top-24 z-0 h-80 w-80 rounded-full opacity-30 blur-3xl"
style={{
background:
"radial-gradient(circle, var(--gradient-sky), transparent 68%)",
}}
/>
<div
aria-hidden
className="pointer-events-none absolute -bottom-28 left-1/4 z-0 h-72 w-72 rounded-full opacity-25 blur-3xl"
style={{
background:
"radial-gradient(circle, var(--gradient-lavender), transparent 68%)",
}}
/>
<ReactFlow
nodes={nodes}
edges={edges}
nodeTypes={nodeTypes}
edgeTypes={edgeTypes}
onNodesChange={handleNodesChange}
onEdgesChange={onEdgesChange}
onConnect={onConnect}
isValidConnection={isValidConnection}
onPaneClick={() => {
setEditingId(null);
setEditingEdgeId(null);
}}
fitView
proOptions={{ hideAttribution: true }}
defaultEdgeOptions={{ type: "condition", animated: true }}
>
<Background
variant={BackgroundVariant.Dots}
gap={22}
size={1}
color="var(--hairline-strong)"
/>
<Controls
className="!rounded-xl !border !border-hairline !bg-card !shadow-sm [&_button]:!border-hairline [&_button]:!bg-card [&_button]:!text-foreground"
/>
<MiniMap
pannable
zoomable
className="!rounded-xl !border !border-hairline !bg-card"
maskColor="color-mix(in srgb, var(--canvas-soft) 70%, transparent)"
nodeColor="var(--surface-strong)"
/>
<Panel position="top-left" className="flex gap-2">
<Button
size="sm"
className="gap-2"
onClick={() => setAddOpen(true)}
>
<Plus size={15} />
</Button>
<Button
size="sm"
variant="outline"
className="gap-2 border-hairline-strong bg-card text-foreground hover:bg-surface-strong"
onClick={() => setSettingsOpen(true)}
>
<Settings2 size={15} />
</Button>
</Panel>
</ReactFlow>
</section>
{/* 添加节点弹窗 */}
<Dialog open={addOpen} onOpenChange={setAddOpen}>
<DialogContent className="gap-0 overflow-hidden border border-hairline bg-card p-0 shadow-2xl sm:max-w-[500px]">
<DialogHeader className="relative overflow-hidden border-b border-hairline px-6 py-6 pr-16">
<div
aria-hidden
className="pointer-events-none absolute -right-14 -top-16 h-40 w-40 rounded-full opacity-40 blur-3xl"
style={{
background:
"radial-gradient(circle, var(--gradient-sky), transparent 68%)",
}}
/>
<div className="relative flex items-start gap-4">
<div className="flex h-11 w-11 shrink-0 items-center justify-center rounded-full bg-surface-strong text-foreground">
<Plus size={18} />
</div>
<div>
<div className="caption-label text-muted-soft">
</div>
<DialogTitle className="font-display display-sm mt-1 text-ink">
</DialogTitle>
<DialogDescription className="mt-2 leading-6">
线
</DialogDescription>
</div>
</div>
</DialogHeader>
<div className="flex max-h-[440px] flex-col gap-3 overflow-y-auto bg-canvas-soft/70 p-4">
{addableSpecs.length === 0 ? (
<p className="rounded-2xl border border-dashed border-hairline-strong bg-card px-4 py-8 text-center text-sm text-muted-soft">
</p>
) : null}
{addableSpecs.map((spec) => {
const Icon = spec.icon;
return (
<button
key={spec.type}
type="button"
className="group relative flex items-start gap-4 overflow-hidden rounded-2xl border border-hairline bg-card p-4 text-left shadow-sm transition-[border-color,box-shadow,transform] hover:-translate-y-0.5 hover:border-hairline-strong hover:shadow-md"
onClick={() => addNode(spec)}
>
<span
aria-hidden
className="absolute left-5 right-5 top-0 h-px"
style={{
background: `linear-gradient(90deg, transparent, var(${accentVar(spec.accent)}), transparent)`,
}}
/>
<div
className="flex h-10 w-10 shrink-0 items-center justify-center rounded-full text-foreground transition-transform group-hover:scale-105"
style={{
background: `color-mix(in srgb, var(${accentVar(spec.accent)}) 28%, var(--surface-strong))`,
}}
>
<Icon size={17} />
</div>
<div className="min-w-0 flex-1">
<div className="text-sm font-medium text-foreground">
{spec.displayName}
</div>
<div className="mt-1 text-xs leading-5 text-muted-foreground">
{spec.description}
</div>
</div>
<Plus
size={15}
className="mt-1 shrink-0 text-muted-soft transition-colors group-hover:text-foreground"
/>
</button>
);
})}
</div>
</DialogContent>
</Dialog>
{/* 工作流设置弹窗 */}
<Dialog open={settingsOpen} onOpenChange={setSettingsOpen}>
<DialogContent className="gap-0 overflow-hidden border border-hairline bg-card p-0 shadow-2xl sm:max-w-[500px]">
<DialogHeader className="relative overflow-hidden border-b border-hairline px-6 py-6 pr-16">
<div
aria-hidden
className="pointer-events-none absolute -right-14 -top-16 h-40 w-40 rounded-full opacity-40 blur-3xl"
style={{
background:
"radial-gradient(circle, var(--gradient-lavender), transparent 68%)",
}}
/>
<div className="relative flex items-start gap-4">
<div className="flex h-11 w-11 shrink-0 items-center justify-center rounded-full bg-surface-strong text-foreground">
<Settings2 size={18} />
</div>
<div>
<div className="caption-label text-muted-soft">
</div>
<DialogTitle className="font-display display-sm mt-1 text-ink">
</DialogTitle>
<DialogDescription className="mt-2 leading-6">
使
</DialogDescription>
</div>
</div>
</DialogHeader>
<div className="flex max-h-[62vh] flex-col gap-3 overflow-y-auto bg-canvas-soft/70 p-4">
<ModelSelect
label="大模型(LLM)"
value={settings.llm}
options={modelOptions.llm}
onChange={(v) => onSettingsChange({ ...settings, llm: v })}
/>
<ModelSelect
label="语音识别(ASR)"
value={settings.asr}
options={modelOptions.asr}
onChange={(v) => onSettingsChange({ ...settings, asr: v })}
/>
<ModelSelect
label="语音合成(TTS)"
value={settings.tts}
options={modelOptions.tts}
onChange={(v) => onSettingsChange({ ...settings, tts: v })}
/>
<label className="flex items-center justify-between gap-4 rounded-2xl border border-hairline bg-card p-4 shadow-sm">
<span>
<span className="block text-sm font-medium text-foreground">
</span>
<span className="mt-1 block text-xs leading-5 text-muted-foreground">
</span>
</span>
<Switch
checked={settings.allowInterrupt}
onCheckedChange={(checked) =>
onSettingsChange({ ...settings, allowInterrupt: checked })
}
/>
</label>
</div>
<DialogFooter className="border-t border-hairline bg-card px-5 py-4">
<Button onClick={() => setSettingsOpen(false)}></Button>
</DialogFooter>
</DialogContent>
</Dialog>
{/* 节点编辑弹窗 */}
<Dialog
open={!!editingNode}
onOpenChange={(open) => !open && setEditingId(null)}
>
<DialogContent className="sm:max-w-[460px]">
{editingNode && editingSpec && (
<NodeForm
key={editingNode.id}
spec={editingSpec}
data={editingNode.data as WorkflowNodeData}
onSave={(patch) => {
updateNodeData(editingNode.id, patch);
setEditingId(null);
}}
onDelete={
editingNode.type !== "startCall"
? () => deleteNode(editingNode.id)
: undefined
}
/>
)}
</DialogContent>
</Dialog>
{/* 条件编辑弹窗 */}
<Dialog
open={!!editingEdge}
onOpenChange={(open) => !open && setEditingEdgeId(null)}
>
<DialogContent className="sm:max-w-[460px]">
{editingEdge && (
<EdgeForm
key={editingEdge.id}
edge={editingEdge}
onSave={(patch) => {
updateEdgeData(editingEdge.id, patch);
setEditingEdgeId(null);
}}
onDelete={() => deleteEdge(editingEdge.id)}
/>
)}
</DialogContent>
</Dialog>
</div>
</EdgeActionContext.Provider>
</NodeActionContext.Provider>
</ActiveNodeContext.Provider>
</NodeSpecsContext.Provider>
);
}
function ModelSelect({
label,
value,
options,
onChange,
}: {
label: string;
value?: string;
options: ModelOption[];
onChange: (value: string | undefined) => void;
}) {
return (
<div className="flex flex-col gap-2 rounded-2xl border border-hairline bg-card p-4 shadow-sm">
<label className="text-sm font-medium text-foreground">{label}</label>
<Select
value={value ?? NONE}
onValueChange={(v) => onChange(v === NONE ? undefined : v)}
>
<SelectTrigger className="w-full border-hairline-strong bg-canvas-soft text-foreground">
<SelectValue placeholder="选择模型资源" />
</SelectTrigger>
<SelectContent>
<SelectItem value={NONE}></SelectItem>
{options.map((o) => (
<SelectItem key={o.value} value={o.value}>
{o.label}
</SelectItem>
))}
</SelectContent>
</Select>
</div>
);
}
function NodeForm({
spec,
data,
onSave,
onDelete,
}: {
spec: RuntimeNodeSpec;
data: WorkflowNodeData;
onSave: (patch: WorkflowNodeData) => void;
onDelete?: () => void;
}) {
const [draft, setDraft] = useState<WorkflowNodeData>({ ...data });
const set = (key: string, val: unknown) =>
setDraft((d) => ({ ...d, [key]: val }));
return (
<>
<DialogHeader>
<DialogTitle className="font-display text-ink">
{spec.displayName}
</DialogTitle>
<DialogDescription>{spec.description}</DialogDescription>
</DialogHeader>
<div className="flex flex-col gap-5 py-2">
{spec.fields.map((field) => {
const raw = draft[field.key];
if (field.type === "switch") {
return (
<label
key={field.key}
className="flex items-center justify-between gap-3"
>
<span className="text-sm font-medium text-foreground">
{field.label}
</span>
<Switch
checked={Boolean(raw)}
onCheckedChange={(checked) => set(field.key, checked)}
/>
</label>
);
}
return (
<div key={field.key} className="flex flex-col gap-2">
<label className="text-sm font-medium text-foreground">
{field.label}
{field.required && <span className="text-destructive"> *</span>}
</label>
{field.type === "textarea" ? (
<Textarea
rows={4}
value={(raw as string) ?? ""}
onChange={(e) => set(field.key, e.target.value)}
className="resize-none border-hairline-strong bg-background text-foreground placeholder:text-muted-soft"
/>
) : (
<Input
value={(raw as string) ?? ""}
onChange={(e) => set(field.key, e.target.value)}
className="border-hairline-strong bg-background text-foreground placeholder:text-muted-soft"
/>
)}
</div>
);
})}
</div>
<DialogFooter className="flex-row justify-between sm:justify-between">
{onDelete ? (
<Button
variant="outline"
className="gap-2 border-hairline-strong text-muted-foreground hover:text-destructive"
onClick={onDelete}
>
<Trash2 size={15} />
</Button>
) : (
<span />
)}
<Button onClick={() => onSave(draft)}></Button>
</DialogFooter>
</>
);
}
function EdgeForm({
edge,
onSave,
onDelete,
}: {
edge: Edge;
onSave: (patch: {
condition: string;
label: string;
transition_speech: string;
}) => void;
onDelete: () => void;
}) {
const data = (edge.data ?? {}) as {
condition?: string;
label?: string;
transition_speech?: string;
};
const [label, setLabel] = useState(data.label ?? "");
const [condition, setCondition] = useState(data.condition ?? "");
const [transitionSpeech, setTransitionSpeech] = useState(
data.transition_speech ?? "",
);
return (
<>
<DialogHeader>
<DialogTitle className="font-display text-ink"></DialogTitle>
<DialogDescription>
沿
</DialogDescription>
</DialogHeader>
<div className="flex flex-col gap-5 py-2">
<div className="flex flex-col gap-2">
<label className="text-sm font-medium text-foreground"></label>
<Input
value={label}
maxLength={64}
placeholder="例如:用户想转人工"
onChange={(e) => setLabel(e.target.value)}
className="border-hairline-strong bg-background text-foreground placeholder:text-muted-soft"
/>
<span className="text-xs text-muted-soft">
,{label.length}/64
</span>
</div>
<div className="flex flex-col gap-2">
<label className="text-sm font-medium text-foreground"></label>
<Textarea
rows={4}
value={condition}
placeholder="描述触发这条路径的条件,例如:用户明确表示要找人工客服。"
onChange={(e) => setCondition(e.target.value)}
className="resize-none border-hairline-strong bg-background text-foreground placeholder:text-muted-soft"
/>
</div>
<div className="flex flex-col gap-2">
<label className="text-sm font-medium text-foreground">
()
</label>
<Textarea
rows={2}
value={transitionSpeech}
placeholder="切换到下一节点前先说的一句过渡语,用于掩盖延迟,例如:好的,我帮你转接。"
onChange={(e) => setTransitionSpeech(e.target.value)}
className="resize-none border-hairline-strong bg-background text-foreground placeholder:text-muted-soft"
/>
<span className="text-xs text-muted-soft">
,
</span>
</div>
</div>
<DialogFooter className="flex-row justify-between sm:justify-between">
<Button
variant="outline"
className="gap-2 border-hairline-strong text-muted-foreground hover:text-destructive"
onClick={onDelete}
>
<Trash2 size={15} />
线
</Button>
<Button
onClick={() =>
onSave({ condition, label, transition_speech: transitionSpeech })
}
>
</Button>
</DialogFooter>
</>
);
}
export function WorkflowEditor(props: WorkflowEditorProps) {
const { byType, loading, error } = useNodeSpecs();
if (loading) {
return (
<div className="flex h-full w-full items-center justify-center rounded-2xl border border-hairline bg-canvas-soft text-muted-foreground">
<Loader2 className="mr-2 animate-spin" size={18} />
</div>
);
}
if (error) {
return (
<div className="flex h-full w-full items-center justify-center rounded-2xl border border-hairline bg-canvas-soft px-6 text-center text-sm text-destructive">
:{error}
</div>
);
}
return (
<ReactFlowProvider>
<Canvas {...props} specsByType={byType} />
</ReactFlowProvider>
);
}

View File

@@ -0,0 +1,26 @@
"use client";
/**
* 画布内共享上下文。GenericNode / ConditionEdge 由 ReactFlow 渲染,拿不到外层
* 回调,统一通过这些 context 取节点规格与编辑/删除动作。
*/
import { createContext } from "react";
import type { NodeSpecMap } from "./specs";
/** 节点类型 → 运行期规格(来自 /api/node-types) */
export const NodeSpecsContext = createContext<NodeSpecMap>({});
export type ElementActions = {
edit: (id: string) => void;
remove: (id: string) => void;
};
const noop: ElementActions = { edit: () => {}, remove: () => {} };
export const NodeActionContext = createContext<ElementActions>(noop);
export const EdgeActionContext = createContext<ElementActions>(noop);
/** 调试通话中当前激活的节点 id(画布据此高亮),无激活为 null。 */
export const ActiveNodeContext = createContext<string | null>(null);

View File

@@ -0,0 +1,135 @@
/**
* 工作流节点的前端类型与运行期规格。
*
* 节点「目录」(有哪些类型、各自的字段与约束)由后端 /api/node-types 提供,
* 通过 useNodeSpecs 拉取后用 toRuntimeSpec 转成带 React 组件(图标)的运行期规格。
* 本文件只保留:类型定义、默认图、图标/配色解析。新增节点类型改后端即可。
*/
import * as LucideIcons from "lucide-react";
import { Circle, type LucideIcon } from "lucide-react";
import type { NodeSpecDto } from "@/lib/api";
export type WorkflowNodeType = "startCall" | "agentNode" | "endCall";
export type WorkflowNodeData = {
/** 节点显示名 */
name: string;
/** 开场白(仅 startCall) */
greeting?: string;
/** 节点提示词 */
prompt?: string;
/** 允许打断(仅 agentNode) */
allowInterrupt?: boolean;
[key: string]: unknown;
};
export type FieldSpec = {
key: string;
label: string;
type: "text" | "textarea" | "switch";
required?: boolean;
};
/** 解析后的运行期节点规格(DTO + 解析出的 React 图标 + 派生句柄) */
export type RuntimeNodeSpec = {
type: string;
displayName: string;
description: string;
icon: LucideIcon;
accent: string;
addable: boolean;
/** 入边句柄(开始节点没有) */
hasTarget: boolean;
/** 出边句柄(结束节点没有) */
hasSource: boolean;
fields: FieldSpec[];
};
/** 渐变 token → CSS 变量名(图标底色用),未知配色回落到 sky */
export const ACCENT_VAR: Record<string, string> = {
mint: "--gradient-mint",
sky: "--gradient-sky",
rose: "--gradient-rose",
peach: "--gradient-peach",
lavender: "--gradient-lavender",
};
export function accentVar(accent: string): string {
return ACCENT_VAR[accent] ?? ACCENT_VAR.sky;
}
/** 按名字解析 Lucide 图标,找不到回落到 Circle(对齐 dograh resolveIcon)。 */
export function resolveIcon(name: string): LucideIcon {
const icons = LucideIcons as unknown as Record<string, LucideIcon>;
return icons[name] ?? Circle;
}
/** 后端 DTO → 运行期规格。hasTarget/hasSource 由入/出边上限派生。 */
export function toRuntimeSpec(dto: NodeSpecDto): RuntimeNodeSpec {
return {
type: dto.name,
displayName: dto.displayName,
description: dto.description,
icon: resolveIcon(dto.icon),
accent: dto.accent,
addable: dto.addable,
hasTarget: dto.constraints.maxIncoming !== 0,
hasSource: dto.constraints.maxOutgoing !== 0,
fields: dto.fields.map((f) => ({
key: f.key,
label: f.label,
type: f.type,
required: f.required,
})),
};
}
export type NodeSpecMap = Record<string, RuntimeNodeSpec>;
export type WorkflowGraph = {
nodes: Array<{
id: string;
type: WorkflowNodeType;
position: { x: number; y: number };
data: WorkflowNodeData;
}>;
edges: Array<{
id: string;
source: string;
target: string;
data?: { condition?: string; label?: string; transition_speech?: string };
}>;
viewport?: { x: number; y: number; zoom: number };
};
/** 新建工作流的默认图:开始 → 智能体 → 结束 */
export function defaultGraph(): WorkflowGraph {
return {
nodes: [
{
id: "start",
type: "startCall",
position: { x: 80, y: 160 },
data: { name: "开始", greeting: "你好,我是 AI 视频助手,有什么可以帮你?" },
},
{
id: "agent-1",
type: "agentNode",
position: { x: 400, y: 160 },
data: { name: "智能体节点", prompt: "", allowInterrupt: true },
},
{
id: "end",
type: "endCall",
position: { x: 720, y: 160 },
data: { name: "结束", prompt: "感谢你的来电,再见!" },
},
],
edges: [
{ id: "e-start-agent", source: "start", target: "agent-1", data: {} },
{ id: "e-agent-end", source: "agent-1", target: "end", data: {} },
],
};
}

View File

@@ -0,0 +1,61 @@
"use client";
/**
* 拉取并缓存 /api/node-types 节点目录(每会话一次,模块级缓存)。
* 对齐 dograh 的 useNodeSpecs:规格随后端 + 刷新生效。
*/
import { useEffect, useRef, useState } from "react";
import { nodeTypesApi, type NodeTypesResponse } from "@/lib/api";
import {
toRuntimeSpec,
type NodeSpecMap,
type RuntimeNodeSpec,
} from "./specs";
let _cache: NodeTypesResponse | null = null;
type State = {
specs: RuntimeNodeSpec[];
byType: NodeSpecMap;
loading: boolean;
error: string | null;
};
function build(data: NodeTypesResponse | null): {
specs: RuntimeNodeSpec[];
byType: NodeSpecMap;
} {
const specs = (data?.nodeTypes ?? []).map(toRuntimeSpec);
const byType: NodeSpecMap = {};
for (const spec of specs) byType[spec.type] = spec;
return { specs, byType };
}
export function useNodeSpecs(): State {
const fetched = useRef(false);
const [state, setState] = useState<State>(() => {
const { specs, byType } = build(_cache);
return { specs, byType, loading: !_cache, error: null };
});
useEffect(() => {
if (_cache || fetched.current) return;
fetched.current = true;
nodeTypesApi
.list()
.then((data) => {
_cache = data;
const { specs, byType } = build(data);
setState({ specs, byType, loading: false, error: null });
})
.catch((err: unknown) => {
const message = err instanceof Error ? err.message : String(err);
setState((s) => ({ ...s, loading: false, error: message }));
});
}, []);
return state;
}

View File

@@ -83,7 +83,10 @@ function microphoneErrorMessage(error: unknown): string {
return errorMessage(error, "无法访问麦克风。");
}
export function useVoicePreview(assistantId: string | null) {
export function useVoicePreview(
assistantId: string | null,
onNodeActive?: (nodeId: string | null) => void,
) {
const [status, setStatus] = useState<VoicePreviewStatus>("idle");
const [error, setError] = useState<string | null>(null);
const [micWarning, setMicWarning] = useState<string | null>(null);
@@ -102,6 +105,11 @@ export function useVoicePreview(assistantId: string | null) {
const localStreamRef = useRef<MediaStream | null>(null);
const startingRef = useRef(false);
const messageSeqRef = useRef(0);
// 工作流激活节点回调存进 ref,避免把它挂进 connect 依赖反复重建连接
const onNodeActiveRef = useRef(onNodeActive);
useEffect(() => {
onNodeActiveRef.current = onNodeActive;
}, [onNodeActive]);
// 枚举可用麦克风。未授权前 label 为空,授权(连接)后再刷新即可拿到名称。
const refreshDevices = useCallback(async () => {
@@ -170,6 +178,7 @@ export function useVoicePreview(assistantId: string | null) {
localStreamRef.current = null;
if (audioRef.current) audioRef.current.srcObject = null;
startingRef.current = false;
onNodeActiveRef.current?.(null);
}, []);
const disconnect = useCallback(() => {
@@ -377,6 +386,12 @@ export function useVoicePreview(assistantId: string | null) {
sequence: messageSeqRef.current,
};
setMessages((prev) => sortMessages([...prev, next]));
} else if (
msg?.type === "node-active" &&
typeof msg.nodeId === "string"
) {
// 工作流:后端报告当前激活节点,交给画布高亮
onNodeActiveRef.current?.(msg.nodeId);
}
} catch {
/* 非 JSON / 未知消息,忽略 */

View File

@@ -172,3 +172,39 @@ export type KnowledgeBase = {
export const knowledgeBasesApi = {
list: () => request<KnowledgeBase[]>("/api/knowledge-bases"),
};
// ---------- 工作流节点类型目录 ----------
export type NodeFieldSpec = {
key: string;
label: string;
type: "text" | "textarea" | "switch";
required?: boolean;
};
export type NodeConstraints = {
minIncoming?: number;
maxIncoming?: number;
minOutgoing?: number;
maxOutgoing?: number;
};
export type NodeSpecDto = {
name: string;
displayName: string;
category: string;
description: string;
icon: string;
accent: string;
addable: boolean;
constraints: NodeConstraints;
fields: NodeFieldSpec[];
};
export type NodeTypesResponse = {
specVersion: string;
nodeTypes: NodeSpecDto[];
};
export const nodeTypesApi = {
list: () => request<NodeTypesResponse>("/api/node-types"),
};