From 809b634420633cf87cee77cce67e4379731869c7 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Tue, 16 Jun 2026 16:55:51 +0800 Subject: [PATCH] Enhance AssistantConfig and pipeline for FastGPT integration - Add new fields in AssistantConfig for FastGPT connection details, including `fastgpt_api_url`, `fastgpt_api_key`, and `fastgpt_app_id`. - Update the pipeline to utilize the new FastGPT configuration, ensuring proper integration with external services. - Introduce type handling for different assistant types, including support for realtime modes and external brain management. - Refactor frontend components to include hints for FastGPT configuration inputs, improving user guidance during setup. --- backend/models.py | 7 + backend/requirements.txt | 3 + backend/schemas.py | 8 + backend/services/brains/__init__.py | 6 + backend/services/brains/base.py | 46 +++++ backend/services/brains/fastgpt_brain.py | 67 ++++++++ backend/services/brains/fastgpt_llm.py | 161 ++++++++++++++++++ backend/services/brains/internal_brain.py | 37 ++++ backend/services/brains/registry.py | 25 +++ backend/services/config_resolver.py | 5 + backend/services/pipecat/pipeline.py | 33 +++- backend/services/pipecat/service_factory.py | 10 -- .../src/components/pages/AssistantPage.tsx | 25 ++- 13 files changed, 415 insertions(+), 18 deletions(-) create mode 100644 backend/services/brains/__init__.py create mode 100644 backend/services/brains/base.py create mode 100644 backend/services/brains/fastgpt_brain.py create mode 100644 backend/services/brains/fastgpt_llm.py create mode 100644 backend/services/brains/internal_brain.py create mode 100644 backend/services/brains/registry.py diff --git a/backend/models.py b/backend/models.py index 4a9cd1c..ba8aca4 100644 --- a/backend/models.py +++ b/backend/models.py @@ -19,6 +19,8 @@ class AssistantConfig(BaseModel): """运行时配置:前端可见部分(name/prompt/...) + 服务端注入部分(*_api_key/*_base_url)。""" name: str = "未命名助手" + # prompt|workflow|dify|fastgpt|opencode;决定由哪种「大脑」驱动对话 + type: str = "prompt" greeting: str = "您好,我是 AI 视频助手,请问有什么可以帮您?" prompt: str = "你是一个有帮助的助手。" runtimeMode: RuntimeMode = "pipeline" @@ -49,6 +51,11 @@ class AssistantConfig(BaseModel): # workflow 类型:节点图(nodes/edges)。非 workflow 为空,引擎据此决定是否启用。 graph: dict = {} + # 外部托管类型(fastgpt/dify/opencode)的连接信息:context/KB/tools 由对方服务端接管。 + fastgpt_api_url: str = "" + fastgpt_api_key: str = "" + fastgpt_app_id: str = "" + # ---- 运行时连接信息(服务端注入,不来自浏览器) ---- # 为空时,service_factory 会回退到 config.py 的 .env 默认值。 llm_api_key: str = "" diff --git a/backend/requirements.txt b/backend/requirements.txt index 1280ac6..0296877 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -4,6 +4,9 @@ # openai -> OpenAI 兼容的 LLM/STT/TTS 客户端(DeepSeek、SenseVoice、CosyVoice 都走它) pipecat-ai[webrtc,websocket,silero,openai]==1.3.0 +# FastGPT 类型助手:本地 SDK(包 /api/v1/chat/completions 流式 + chatId 会话) +fastgpt-client @ file:///Users/wangx/Code/AI-VideoAssistant-Project/fastgpt-python-sdk + fastapi httpx uvicorn[standard] diff --git a/backend/schemas.py b/backend/schemas.py index 4aebe4c..51a055d 100644 --- a/backend/schemas.py +++ b/backend/schemas.py @@ -19,6 +19,11 @@ AssistantType = Literal["prompt", "workflow", "dify", "fastgpt", "opencode"] # 外部应用类型:其 config.apiKey 是该助手私有密钥,读时打码 / 写时哨兵 EXTERNAL_TYPES = {"dify", "fastgpt", "opencode"} +# 支持 realtime(语音到语音)的类型;外部托管大脑只能走 cascade。 +# 与 services.brains 各 BrainSpec.supported_runtime_modes 对齐(此处独立声明, +# 避免 HTTP schema 层为做校验而引入 pipecat 重依赖)。 +REALTIME_CAPABLE_TYPES = {"prompt", "workflow"} + class CamelModel(BaseModel): """JSON camelCase ↔ Python snake_case。protected_namespaces 关掉以允许 model_id。""" @@ -67,6 +72,9 @@ class AssistantUpsert(CamelModel): setattr(self, field, "") if "graph" not in allowed: self.graph = {} + # 外部托管大脑只能 cascade,拦住不兼容的 realtime + if self.runtime_mode == "realtime" and self.type not in REALTIME_CAPABLE_TYPES: + raise ValueError(f"类型 {self.type} 不支持 realtime 运行模式") return self diff --git a/backend/services/brains/__init__.py b/backend/services/brains/__init__.py new file mode 100644 index 0000000..8613af0 --- /dev/null +++ b/backend/services/brains/__init__.py @@ -0,0 +1,6 @@ +"""可插拔的「大脑」:把不同助手类型在运行时的差异收口到各自的 Brain 实现。""" + +from services.brains.base import Brain, BrainSpec +from services.brains.registry import SPECS, build_brain + +__all__ = ["Brain", "BrainSpec", "SPECS", "build_brain"] diff --git a/backend/services/brains/base.py b/backend/services/brains/base.py new file mode 100644 index 0000000..44bd293 --- /dev/null +++ b/backend/services/brains/base.py @@ -0,0 +1,46 @@ +"""「大脑」抽象:把不同助手类型(prompt/workflow/fastgpt/…)在运行时的差异收口。 + +cascade 管线骨架对所有类型一致(STT → LLM 槽 → TTS),变化的只有: + - 谁产出助手文本(LLM 槽里塞什么)——build_llm + - 开场白来源(静态 / 外部异步拉取)——greeting + - 对话上下文归谁维护——spec.owns_context + - 是否支持 realtime——spec.supported_runtime_modes + +阶段 1 只抽到「够 fastgpt 用」的程度;workflow 编排仍内联在 pipeline.py, +待阶段 2 再搬进 WorkflowBrain 收口。 +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + +from models import AssistantConfig +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameProcessor + + +@dataclass(frozen=True) +class BrainSpec: + """类型元数据。单一来源,供运行时门控与上下文归属决策复用。""" + + type: str + supported_runtime_modes: frozenset[str] + # True:由本服务维护 LLMContext(prompt/workflow); + # False:上下文/知识库/工具由外部服务端接管(fastgpt/dify),本地不写 context。 + owns_context: bool + + +@runtime_checkable +class Brain(Protocol): + """每通电话 new 一个实例(可持有 chatId / 当前节点等会话状态)。""" + + spec: BrainSpec + + async def greeting(self, cfg: AssistantConfig) -> str: + """开场白。内部类型通常直接用 cfg.greeting;外部类型异步拉取后端配置。""" + ... + + def build_llm(self, cfg: AssistantConfig, context: LLMContext) -> FrameProcessor: + """返回丢进管线 LLM 槽位的帧处理器(标准 LLMService 或外部托管的伪 LLM)。""" + ... diff --git a/backend/services/brains/fastgpt_brain.py b/backend/services/brains/fastgpt_brain.py new file mode 100644 index 0000000..b3e8c96 --- /dev/null +++ b/backend/services/brains/fastgpt_brain.py @@ -0,0 +1,67 @@ +"""FastGPT 大脑:外部托管,context/KB/tools 全在 FastGPT 服务端。 + +cascade-only(realtime 不兼容外部大脑)。每通电话持有一个稳定 chatId: +greeting(get_chat_init)与后续每轮推理共用它,保证服务端上下文连续。 +""" + +from __future__ import annotations + +from typing import Any +from uuid import uuid4 + +from fastgpt_client import AsyncChatClient +from loguru import logger +from models import AssistantConfig +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameProcessor + +from services.brains.base import BrainSpec +from services.brains.fastgpt_llm import FastGPTLLMService, normalize_base_url + + +def _extract_welcome(payload: Any) -> str: + """从 get_chat_init 响应里取开场白(welcomeText),多层兜底。""" + if not isinstance(payload, dict): + return "" + data = payload.get("data") if isinstance(payload.get("data"), dict) else payload + app = data.get("app") if isinstance(data.get("app"), dict) else {} + chat_config = app.get("chatConfig") if isinstance(app.get("chatConfig"), dict) else {} + for value in ( + chat_config.get("welcomeText"), + app.get("welcomeText"), + data.get("welcomeText"), + data.get("opener"), + app.get("opener"), + ): + if isinstance(value, str) and value.strip(): + return value.strip() + return "" + + +class FastGPTBrain: + def __init__(self): + self.spec = BrainSpec( + type="fastgpt", + supported_runtime_modes=frozenset({"pipeline"}), + owns_context=False, + ) + self._chat_id = uuid4().hex + + async def greeting(self, cfg: AssistantConfig) -> str: + """优先用 FastGPT 后台配置的开场白;无 app_id 或取不到时回退 cfg.greeting。""" + if not cfg.fastgpt_app_id: + return cfg.greeting + try: + client = AsyncChatClient( + api_key=cfg.fastgpt_api_key, + base_url=normalize_base_url(cfg.fastgpt_api_url), + ) + response = await client.get_chat_init(cfg.fastgpt_app_id, self._chat_id) + welcome = _extract_welcome(response.json()) + return welcome or cfg.greeting + except Exception as exc: # noqa: BLE001 - 拉取失败不应阻断通话 + logger.warning(f"FastGPT get_chat_init 失败,回退 cfg.greeting: {exc}") + return cfg.greeting + + def build_llm(self, cfg: AssistantConfig, context: LLMContext) -> FrameProcessor: + return FastGPTLLMService(cfg, chat_id=self._chat_id) diff --git a/backend/services/brains/fastgpt_llm.py b/backend/services/brains/fastgpt_llm.py new file mode 100644 index 0000000..f1c662d --- /dev/null +++ b/backend/services/brains/fastgpt_llm.py @@ -0,0 +1,161 @@ +"""FastGPT 作为 pipecat LLM 槽位。 + +与普通 LLM 的关键不同:context / 知识库 / 工具全在 FastGPT 服务端,靠 chatId +维持会话。所以本服务只发「最后一条 user 文本」+ 稳定 chatId,把流式 answer +事件转成 LLMTextFrame 交给下游 TTS;不消费/不依赖本地 LLMContext 的历史。 +""" + +from __future__ import annotations + +from typing import Any + +from fastgpt_client import AsyncChatClient, aiter_stream_events +from loguru import logger +from models import AssistantConfig + +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMTextFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.llm_service import LLMService +from pipecat.services.settings import LLMSettings + +# 承载回复文本的事件种类。detail=False 时 FastGPT 走 OpenAI 兼容流,文本以裸 +# data: 块下发(无 event 名 → kind="data");detail=True / 旧版则用 answer/fastAnswer。 +_ANSWER_KINDS = {"data", "answer", "fastAnswer"} + +# SDK 会自动在 base_url 后拼 /api/v1/chat/completions(并去掉末尾 /api)。 +# 用户常把「完整接口地址」填进 api_url,这里剥掉这些后缀,归一成主机根地址, +# 避免路径重复导致 404。 +_ENDPOINT_SUFFIXES = ( + "/api/v1/chat/completions", + "/v1/chat/completions", + "/chat/completions", +) + + +def normalize_base_url(url: str) -> str: + base = (url or "").strip().rstrip("/") + for suffix in _ENDPOINT_SUFFIXES: + if base.endswith(suffix): + base = base[: -len(suffix)] + break + return base or "http://localhost:3000" + + +def _last_user_text(messages: list[dict]) -> str: + """取最近一条 user 消息的纯文本(兼容多模态分片)。""" + for message in reversed(messages or []): + if message.get("role") != "user": + continue + content = message.get("content") + if isinstance(content, str): + return content + if isinstance(content, list): + return "".join( + str(part.get("text") or "") + for part in content + if isinstance(part, dict) + ) + return "" + + +def _event_text(data: Any) -> str: + """从一个流事件里取增量文本。 + + 兼容两种形态(对齐 SDK examples 的解析): + - 直接 text 字段(answer/fastAnswer 详情流); + - OpenAI 兼容块 choices[0].delta.content / message.content(detail=False)。 + """ + if not isinstance(data, dict): + return "" + + text = data.get("text") + if isinstance(text, str) and text: + return text + + choices = data.get("choices") + if not isinstance(choices, list) or not choices: + return "" + first = choices[0] if isinstance(choices[0], dict) else {} + + delta = first.get("delta") + if isinstance(delta, dict): + content = delta.get("content") + if isinstance(content, str) and content: + return content + + message = first.get("message") + if isinstance(message, dict): + content = message.get("content") + if isinstance(content, str) and content: + return content + + return "" + + +class FastGPTLLMService(LLMService): + """包 FastGPT OpenAPI 的伪 LLM 服务。""" + + def __init__(self, cfg: AssistantConfig, chat_id: str): + # FastGPT 自管 model/温度等参数,这里把所有 LLM 设置初始化为 None, + # 满足基类 validate_complete(否则启动期会报 NOT_GIVEN)。 + super().__init__( + settings=LLMSettings( + model=None, + system_instruction=None, + temperature=None, + max_tokens=None, + top_p=None, + top_k=None, + frequency_penalty=None, + presence_penalty=None, + seed=None, + filter_incomplete_user_turns=None, + user_turn_completion_config=None, + ) + ) + self._chat_id = chat_id + self._base_url = normalize_base_url(cfg.fastgpt_api_url) + self._client = AsyncChatClient( + api_key=cfg.fastgpt_api_key, + base_url=self._base_url, + ) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if not isinstance(frame, LLMContextFrame): + await self.push_frame(frame, direction) + return + + user_text = _last_user_text(frame.context.get_messages()) + if not user_text: + return + + await self.push_frame(LLMFullResponseStartFrame()) + try: + response = await self._client.create_chat_completion( + messages=[{"role": "user", "content": user_text}], + stream=True, + chatId=self._chat_id, + detail=False, + ) + async for event in aiter_stream_events(response): + if event.kind in _ANSWER_KINDS: + text = _event_text(event.data) + if text: + await self.push_frame(LLMTextFrame(text)) + elif event.kind == "error": + logger.error(f"FastGPT 流式错误: {event.data}") + except Exception as exc: # noqa: BLE001 - 单轮失败不应中断通话 + logger.error( + f"FastGPT 调用失败: {exc} " + f"(base_url={self._base_url},拼接后应为 {self._base_url}/api/v1/chat/completions)" + ) + finally: + await self.push_frame(LLMFullResponseEndFrame()) diff --git a/backend/services/brains/internal_brain.py b/backend/services/brains/internal_brain.py new file mode 100644 index 0000000..605b026 --- /dev/null +++ b/backend/services/brains/internal_brain.py @@ -0,0 +1,37 @@ +"""内部 LLM 大脑:prompt 与 workflow。 + +二者都用本地维护的 LLMContext + OpenAI 兼容 LLM,支持 cascade 与 realtime。 +workflow 的图编排(切提示/转移工具/node-active)阶段 1 仍内联在 pipeline.py, +这里只负责提供 LLM 槽位与元数据,行为与改造前完全一致。 +""" + +from __future__ import annotations + +from models import AssistantConfig +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameProcessor + +from services.brains.base import BrainSpec + +_CASCADE_AND_REALTIME = frozenset({"pipeline", "realtime"}) + + +class InternalBrain: + """prompt / workflow 共用。""" + + def __init__(self, brain_type: str): + self.spec = BrainSpec( + type=brain_type, + supported_runtime_modes=_CASCADE_AND_REALTIME, + owns_context=True, + ) + + async def greeting(self, cfg: AssistantConfig) -> str: + # 内部类型的开场白由 pipeline.py 现有逻辑(workflow 起始节点 / cfg.greeting)决定, + # 该方法仅为满足 Brain 协议,实际不在内部路径上被调用。 + return cfg.greeting + + def build_llm(self, cfg: AssistantConfig, context: LLMContext) -> FrameProcessor: + from services.pipecat.service_factory import create_llm + + return create_llm(cfg) diff --git a/backend/services/brains/registry.py b/backend/services/brains/registry.py new file mode 100644 index 0000000..f8a3a0e --- /dev/null +++ b/backend/services/brains/registry.py @@ -0,0 +1,25 @@ +"""类型 → Brain 工厂。新增一种大脑 = 加一个 brain 文件 + 在此注册一行。""" + +from __future__ import annotations + +from models import AssistantConfig + +from services.brains.base import Brain, BrainSpec +from services.brains.fastgpt_brain import FastGPTBrain +from services.brains.internal_brain import InternalBrain + +# 各类型的元数据(供 schema 校验 / realtime 门控复用,无需实例化 Brain)。 +SPECS: dict[str, BrainSpec] = { + "prompt": InternalBrain("prompt").spec, + "workflow": InternalBrain("workflow").spec, + "fastgpt": FastGPTBrain().spec, +} + + +def build_brain(cfg: AssistantConfig) -> Brain: + """按 cfg.type 构造每通电话的 Brain 实例(未知类型回退 prompt)。""" + if cfg.type == "fastgpt": + return FastGPTBrain() + if cfg.type in ("prompt", "workflow"): + return InternalBrain(cfg.type) + return InternalBrain("prompt") diff --git a/backend/services/config_resolver.py b/backend/services/config_resolver.py index b68997d..6599086 100644 --- a/backend/services/config_resolver.py +++ b/backend/services/config_resolver.py @@ -61,6 +61,7 @@ async def resolve_runtime_config( return AssistantConfig( name=assistant.name, + type=assistant.type, greeting=assistant.greeting, # prompt 现在是真列;外部类型由其平台编排,这里给个兜底 prompt=assistant.prompt or "你是一个有帮助的助手。", @@ -68,6 +69,10 @@ async def resolve_runtime_config( enableInterrupt=assistant.enable_interrupt, # workflow 图:仅 workflow 类型非空,引擎据此启用图驱动对话 graph=(assistant.graph or {}) if assistant.type == "workflow" else {}, + # 外部托管类型连接信息(DB 存真 key,直接注入) + fastgpt_api_url=assistant.api_url, + fastgpt_api_key=assistant.api_key, + fastgpt_app_id=assistant.app_id, # 模型/音色:模型资源中的配置优先 model=str(_value(llm_resource, "modelId", "")), asr=str(_value(stt_resource, "modelId", "")), diff --git a/backend/services/pipecat/pipeline.py b/backend/services/pipecat/pipeline.py index 3e150ae..5c37e7e 100644 --- a/backend/services/pipecat/pipeline.py +++ b/backend/services/pipecat/pipeline.py @@ -11,7 +11,12 @@ from uuid import uuid4 import config from loguru import logger from models import AssistantConfig -from services.pipecat.service_factory import create_realtime_service, create_services +from services.brains import build_brain +from services.pipecat.service_factory import ( + create_realtime_service, + create_stt, + create_tts, +) from services.workflow_engine import WorkflowEngine from pipecat.adapters.schemas.function_schema import FunctionSchema @@ -207,13 +212,23 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: 只要有 .input() / .output() / event_handler 即可。 cfg: 助手配置(随请求内联传入)。 """ - logger.info(f"启动管线: assistant={cfg.name} mode={cfg.runtimeMode}") + logger.info(f"启动管线: assistant={cfg.name} type={cfg.type} mode={cfg.runtimeMode}") + + # 大脑:按类型决定 LLM 槽/开场白/上下文归属。每通电话一个实例(可持会话状态)。 + brain = build_brain(cfg) + if ( + cfg.runtimeMode == "realtime" + and "realtime" not in brain.spec.supported_runtime_modes + ): + logger.warning(f"类型 {cfg.type} 不支持 realtime,回退 cascade") + cfg.runtimeMode = "pipeline" if cfg.runtimeMode == "realtime": await run_realtime_pipeline(transport, cfg) return - stt, llm, tts = create_services(cfg) + stt = create_stt(cfg) + tts = create_tts(cfg) # ---- workflow 图引擎(可选)---- # 有节点图时按图驱动:开场白/系统提示来自起始节点,每轮回复后按条件路由。 @@ -240,11 +255,17 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: logger.info( f"工作流模式启用: 起始节点={engine.name(wf_state['current'])}" ) - else: + elif brain.spec.owns_context: greeting = cfg.greeting system_content = cfg.prompt + else: + # 外部托管(fastgpt 等):开场白来自对方后台,系统提示/上下文不归我们维护 + greeting = await brain.greeting(cfg) + system_content = "" context = LLMContext(messages=[{"role": "system", "content": system_content}]) + # LLM 槽由大脑提供:内部类型=OpenAI 兼容服务;fastgpt=包 SDK 的伪 LLM。 + llm = brain.build_llm(cfg, context) user_aggregator = LLMUserAggregator( context, params=LLMUserAggregatorParams( @@ -539,7 +560,9 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: @transport.event_handler("on_client_connected") async def on_client_connected(_transport, _client): if greeting: - context.add_message({"role": "assistant", "content": greeting}) + # 外部托管类型的上下文由对方服务端维护,开场白不写入本地 context + if brain.spec.owns_context: + context.add_message({"role": "assistant", "content": greeting}) await worker.queue_frame(TTSSpeakFrame(greeting, append_to_context=False)) # 工作流:点亮当前(开始)节点。开始节点即首个会话节点。 if workflow_active: diff --git a/backend/services/pipecat/service_factory.py b/backend/services/pipecat/service_factory.py index df05c19..227e3bf 100644 --- a/backend/services/pipecat/service_factory.py +++ b/backend/services/pipecat/service_factory.py @@ -132,16 +132,6 @@ def create_tts(cfg: AssistantConfig): ) -def create_services(cfg: AssistantConfig): - logger.info( - f"创建服务: stt={cfg.stt_interface_type}/{cfg.asr or config.STT_MODEL} " - f"llm={cfg.model or config.LLM_MODEL} " - f"tts={cfg.tts_interface_type}/{cfg.tts_model or config.TTS_MODEL} " - f"voice={cfg.voice or config.TTS_VOICE}" - ) - return create_stt(cfg), create_llm(cfg), create_tts(cfg) - - def create_realtime_service(cfg: AssistantConfig): """Create a speech-to-speech service that owns STT, LLM, and TTS.""" if cfg.realtime_interface_type == "stepfun-realtime": diff --git a/frontend/src/components/pages/AssistantPage.tsx b/frontend/src/components/pages/AssistantPage.tsx index 38e44c9..8522aac 100644 --- a/frontend/src/components/pages/AssistantPage.tsx +++ b/frontend/src/components/pages/AssistantPage.tsx @@ -1419,18 +1419,27 @@ export function AssistantPage(props: AssistantPageProps) { > updateFastGptForm("appId", value)} placeholder="请输入 FastGPT 应用 ID" /> updateFastGptForm("apiUrl", value)} - placeholder="https://api.fastgpt.in/api/v1/chat/completions" + placeholder="http://localhost:3000" /> updateFastGptForm("apiKey", value)} placeholder="请输入 FastGPT API Key" @@ -2529,12 +2538,14 @@ function SectionCard({ function InputField({ label, + hint, value, placeholder, type = "text", onChange, }: { label?: string; + hint?: string; value: string; placeholder?: string; type?: string; @@ -2543,7 +2554,10 @@ function InputField({ return (