Files
ai-video-fullstack/backend/services/pipecat/pipeline.py
Xin Wang 809b634420 Enhance AssistantConfig and pipeline for FastGPT integration
- Add new fields in AssistantConfig for FastGPT connection details, including `fastgpt_api_url`, `fastgpt_api_key`, and `fastgpt_app_id`.
- Update the pipeline to utilize the new FastGPT configuration, ensuring proper integration with external services.
- Introduce type handling for different assistant types, including support for realtime modes and external brain management.
- Refactor frontend components to include hints for FastGPT configuration inputs, improving user guidance during setup.
2026-06-16 16:55:51 +08:00

647 lines
26 KiB
Python

"""管线核心:给定一个 transport + 配置,跑完整的语音闭环。
关键设计:**transport 由调用方传入**,管线本身不关心是 WebRTC 还是 WS。
这就是"同时支持多种输出"的落点——加输出方式不用动这里。
对应 dograh 的 pipeline_builder.py + run_pipeline.py(已砍掉 workflow 引擎/DB/录音/指标)。
"""
from uuid import uuid4
import config
from loguru import logger
from models import AssistantConfig
from services.brains import build_brain
from services.pipecat.service_factory import (
create_realtime_service,
create_stt,
create_tts,
)
from services.workflow_engine import WorkflowEngine
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
InputTransportMessageFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
LLMMessagesAppendFrame,
OutputTransportMessageUrgentFrame,
TextFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregator,
LLMUserAggregator,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.turns.user_start import (
TranscriptionUserTurnStartStrategy,
VADUserTurnStartStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from pipecat.workers.runner import WorkerRunner
def _text_input(message) -> tuple[str, bool] | None:
"""解析现有 user-text 与 RTVI send-text 两种前端文字消息。"""
if not isinstance(message, dict):
return None
if message.get("type") == "user-text":
text = str(message.get("text") or "").strip()
return (text, True) if text else None
if message.get("type") == "send-text":
data = message.get("data")
if not isinstance(data, dict):
return None
text = str(data.get("content") or "").strip()
options = data.get("options")
run_immediately = not isinstance(options, dict) or options.get(
"run_immediately", True
)
return (text, bool(run_immediately)) if text else None
return None
class TextInputProcessor(FrameProcessor):
    """Translate transport text messages into events for the cascade pipeline.

    Immediate text (the default, with interruption semantics) first raises
    ``on_text_input`` so the caller can record the text, then calls
    ``broadcast_interruption()`` to cut off the current playback. The next LLM
    reply is expected to be started by whoever handles the interruption
    downstream, not by this processor.

    Text with ``run_immediately=False`` (RTVI ``send-text`` silent append) only
    raises ``on_text_append``: no interruption is broadcast here.

    A ``client-ready`` message raises ``on_client_ready``. Recognised messages
    are consumed; every other frame is pushed on unchanged.
    """

    def __init__(self):
        super().__init__()
        # Immediate text and silent appends are reported through separate events.
        for event_name in ("on_text_input", "on_text_append", "on_client_ready"):
            self._register_event_handler(event_name)

    async def process_frame(self, frame, direction: FrameDirection):
        """Handle transport text messages; forward everything else."""
        await super().process_frame(frame, direction)

        if isinstance(frame, InputTransportMessageFrame):
            payload = frame.message

            if isinstance(payload, dict) and payload.get("type") == "client-ready":
                await self._call_event_handler("on_client_ready")
                return

            request = _text_input(payload)
            if request:
                content, interrupt_now = request
                if not interrupt_now:
                    await self._call_event_handler("on_text_append", content)
                    return
                # Record the text before interrupting, so the handler of the
                # interruption already finds it pending.
                await self._call_event_handler("on_text_input", content)
                await self.broadcast_interruption()
                return

        # Not a message this processor understands: pass it along.
        await self.push_frame(frame, direction)
class RealtimeTextInputProcessor(FrameProcessor):
    """Hand front-end text straight to listeners, without cascade semantics.

    Recognised text messages are consumed and reported through
    ``on_text_input`` (immediate) or ``on_text_append`` (silent append); this
    processor never broadcasts an interruption itself. All other frames are
    pushed on unchanged.
    """

    def __init__(self):
        super().__init__()
        for event_name in ("on_text_input", "on_text_append"):
            self._register_event_handler(event_name)

    async def process_frame(self, frame, direction: FrameDirection):
        """Raise a text event for recognised messages; forward everything else."""
        await super().process_frame(frame, direction)

        request = None
        if isinstance(frame, InputTransportMessageFrame):
            request = _text_input(frame.message)

        if not request:
            await self.push_frame(frame, direction)
            return

        content, immediate = request
        if immediate:
            event_name = "on_text_input"
        else:
            event_name = "on_text_append"
        await self._call_event_handler(event_name, content)
class PassthroughLLMAssistantAggregator(LLMAssistantAggregator):
    """Assistant aggregator that also forwards the LLM reply downstream.

    Every frame is first given to the parent aggregator. On top of that this
    subclass:

    * re-pushes response start/end frames and text frames, so a processor
      placed after it (the TTS in this file's pipeline) still receives them;
    * raises ``on_assistant_text_start`` / ``on_assistant_text_delta`` /
      ``on_assistant_text_end`` for each streamed reply;
    * raises ``on_interruption_processed`` after handling an interruption.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        for event_name in (
            "on_interruption_processed",
            "on_assistant_text_start",
            "on_assistant_text_delta",
            "on_assistant_text_end",
        ):
            self._register_event_handler(event_name)
        # State of the reply currently being streamed (None/empty when idle).
        self._stream_turn_id: str | None = None
        self._stream_timestamp = ""
        self._stream_text = ""

    async def process_frame(self, frame, direction: FrameDirection):
        """Run the parent aggregation, emit stream events, then forward."""
        await super().process_frame(frame, direction)

        if isinstance(frame, InterruptionFrame):
            # Close the open reply (if any) as interrupted, then tell listeners
            # that the interruption has been dealt with.
            await self._finish_text_stream(interrupted=True)
            await self._call_event_handler("on_interruption_processed")
            return

        if isinstance(frame, LLMFullResponseStartFrame):
            turn_id = uuid4().hex
            started_at = time_now_iso8601()
            self._stream_turn_id = turn_id
            self._stream_timestamp = started_at
            self._stream_text = ""
            await self._call_event_handler(
                "on_assistant_text_start", turn_id, started_at
            )
        elif isinstance(frame, LLMFullResponseEndFrame):
            await self._finish_text_stream(interrupted=False)
        elif isinstance(frame, LLMTextFrame) and self._stream_turn_id:
            chunk = frame.text
            self._stream_text = self._stream_text + chunk
            await self._call_event_handler(
                "on_assistant_text_delta", self._stream_turn_id, chunk
            )

        # Per the original author's note, the parent aggregator consumes these
        # frames by default; without this explicit push the TTS would never
        # receive any LLM reply.
        forwarded = (LLMFullResponseStartFrame, LLMFullResponseEndFrame, TextFrame)
        if isinstance(frame, forwarded):
            await self.push_frame(frame, direction)

    async def _finish_text_stream(self, *, interrupted: bool):
        """Emit ``on_assistant_text_end`` for the open reply and reset state.

        Does nothing when no reply is currently being streamed.
        """
        turn_id = self._stream_turn_id
        if not turn_id:
            return
        await self._call_event_handler(
            "on_assistant_text_end", turn_id, self._stream_text, interrupted
        )
        self._stream_turn_id = None
        self._stream_timestamp = ""
        self._stream_text = ""
async def run_pipeline(transport, cfg: AssistantConfig) -> None:
    """Build and run the voice pipeline on the given transport until it ends.

    When ``cfg.runtimeMode`` is ``"realtime"`` and the brain supports it, the
    call is delegated to ``run_realtime_pipeline``. Otherwise the cascade
    pipeline is assembled: transport input -> text input -> STT -> user
    aggregator -> LLM -> assistant aggregator -> TTS -> end-of-call watcher ->
    transport output.

    Args:
        transport: Any pipecat transport (WebRTC / WS / telephony ...); it only
            needs ``.input()`` / ``.output()`` / ``event_handler``.
        cfg: Assistant configuration (passed inline with the request). Note
            that ``cfg.runtimeMode`` is overwritten with ``"pipeline"`` when
            realtime was requested but the brain type does not support it.
    """
    logger.info(f"启动管线: assistant={cfg.name} type={cfg.type} mode={cfg.runtimeMode}")
    # Brain: decides the LLM slot / greeting / context ownership by type.
    # One instance per call (it may hold session state).
    brain = build_brain(cfg)
    if (
        cfg.runtimeMode == "realtime"
        and "realtime" not in brain.spec.supported_runtime_modes
    ):
        logger.warning(f"类型 {cfg.type} 不支持 realtime,回退 cascade")
        cfg.runtimeMode = "pipeline"
    if cfg.runtimeMode == "realtime":
        await run_realtime_pipeline(transport, cfg)
        return
    stt = create_stt(cfg)
    tts = create_tts(cfg)
    # ---- workflow graph engine (optional) ----
    # With a node graph the call is graph-driven: greeting / system prompt come
    # from the start node, and routing happens after each reply.
    engine = WorkflowEngine(cfg.graph or {})
    workflow_active = engine.has_graph()
    wf_state = {
        # The start node is itself a conversation node (own prompt, multi-turn),
        # so the call begins there.
        "current": engine.start_id if workflow_active else None,
        "ended": False,
        "turns_in_node": 0,
        # Precise end-of-call timing: hang up only once the end node's own
        # closing line has actually been spoken.
        "end_turn_id": None,  # turn_id of the end node's reply (its text_start comes after `ended`)
        "end_armed": False,  # closing-line text fully generated (already sent on the data channel)
        "end_speaking": False,  # closing-line audio has started playing
        "end_frame_queued": False,
    }
    history: list[dict] = []
    # Number of completed assistant turns in a node before fallback_route()
    # consults the text classifier.
    # NOTE(review): the original comment said the fallback is only for nodes
    # without callable transition tools; that condition is not checked in this
    # function - confirm intent.
    FALLBACK_AFTER_TURNS = 2
    if workflow_active:
        greeting = engine.greeting() or cfg.greeting
        system_content = engine.system_prompt_for(wf_state["current"])
        logger.info(
            f"工作流模式启用: 起始节点={engine.name(wf_state['current'])}"
        )
    elif brain.spec.owns_context:
        greeting = cfg.greeting
        system_content = cfg.prompt
    else:
        # Externally hosted (fastgpt etc.): the greeting comes from the remote
        # backend; the system prompt / context is not maintained here.
        greeting = await brain.greeting(cfg)
        system_content = ""
    context = LLMContext(messages=[{"role": "system", "content": system_content}])
    # The brain supplies the LLM slot: internal types = OpenAI-compatible
    # service; fastgpt = a pseudo-LLM wrapping the SDK.
    llm = brain.build_llm(cfg, context)
    user_aggregator = LLMUserAggregator(
        context,
        params=LLMUserAggregatorParams(
            vad_analyzer=SileroVADAnalyzer(),
            user_turn_strategies=UserTurnStrategies(
                start=[
                    VADUserTurnStartStrategy(enable_interruptions=cfg.enableInterrupt),
                    TranscriptionUserTurnStartStrategy(
                        enable_interruptions=cfg.enableInterrupt
                    ),
                ]
            ),
        ),
    )
    assistant_aggregator = PassthroughLLMAssistantAggregator(context)
    text_input = TextInputProcessor()
    # End node: wait until the closing line has been spoken
    # (BotStoppedSpeakingFrame) before hanging up, so that both its text (sent
    # over the data channel) and its audio have been delivered; otherwise the
    # front end could hear the audio without seeing the text.
    # The holder lets EndCallAfterSpeech reach the worker, which is only
    # created after the pipeline that contains the processor.
    worker_holder: dict = {}

    class EndCallAfterSpeech(FrameProcessor):
        """Forward every frame and hang up once the closing line finished playing."""

        async def process_frame(self, frame, direction: FrameDirection):
            await super().process_frame(frame, direction)
            await self.push_frame(frame, direction)
            # Closing text generated (end_armed) -> its audio starts
            # (end_speaking) -> hang up only when that audio stops.
            # Pairing started/stopped avoids being triggered early by the
            # "stopped" event of something said before the end node (e.g. an
            # answer followed by a transition), which would cut the closing
            # line short.
            if isinstance(frame, BotStartedSpeakingFrame) and wf_state["end_armed"]:
                wf_state["end_speaking"] = True
            elif (
                isinstance(frame, BotStoppedSpeakingFrame)
                and wf_state["end_speaking"]
                and not wf_state["end_frame_queued"]
                and worker_holder.get("worker") is not None
            ):
                wf_state["end_frame_queued"] = True
                logger.info("结束语播报完毕,挂断通话")
                # First tell the front end this is a normal completion (not a
                # connection error), then hang up gracefully.
                await worker_holder["worker"].queue_frame(
                    OutputTransportMessageUrgentFrame(
                        message={"type": "call-ended", "reason": "completed"}
                    )
                )
                await worker_holder["worker"].queue_frame(EndFrame())

    pipeline = Pipeline(
        [
            transport.input(),
            text_input,
            stt,
            user_aggregator,
            llm,
            # Aggregate the streamed LLM text before TTS. On interruption,
            # Pipecat commits the generated prefix immediately instead of
            # waiting for a TTS provider to emit spoken-text/timestamp frames.
            assistant_aggregator,
            tts,
            EndCallAfterSpeech(),
            transport.output(),
        ]
    )
    worker = PipelineWorker(
        pipeline,
        params=PipelineParams(
            enable_metrics=False,
        ),
        enable_rtvi=False,
    )
    worker_holder["worker"] = worker

    async def queue_transcript(role: str, content: str, timestamp: str) -> None:
        """Queue a ``transcript`` message for the front end; empty content is skipped."""
        if content:
            await worker.queue_frame(
                OutputTransportMessageUrgentFrame(
                    message={
                        "type": "transcript",
                        "role": role,
                        "content": content,
                        "timestamp": timestamp,
                    },
                )
            )

    greeting_transcript_sent = False
    # Texts recorded by on_text_input, waiting for on_interruption_processed
    # to start the next LLM turn (first in, first out).
    pending_text_inputs: list[str] = []

    async def emit_node_active(node_id: str | None) -> None:
        """Tell the front end which node is active so the canvas can highlight it."""
        if node_id:
            await worker.queue_frame(
                OutputTransportMessageUrgentFrame(
                    message={"type": "node-active", "nodeId": node_id}
                )
            )

    def set_system_prompt(text: str) -> None:
        """Replace the system prompt in the context (whole replacement, not an append)."""
        # NOTE(review): relies on get_messages() returning the live list so the
        # in-place edit takes effect - confirm against the LLMContext API.
        messages = context.get_messages()
        if messages and messages[0].get("role") == "system":
            messages[0] = {"role": "system", "content": text}
        else:
            messages.insert(0, {"role": "system", "content": text})

    def apply_node(node_id: str | None) -> None:
        """Enter a node: set its system prompt and expose its outgoing edges as tools."""
        set_system_prompt(engine.system_prompt_for(node_id))
        if engine.is_end(node_id):
            context.set_tools()  # end node: no tools
            return
        schemas = [
            FunctionSchema(
                name=engine.edge_fn_name(edge),
                description=engine.edge_description(edge),
                properties={},
                required=[],
            )
            for edge in engine.outgoing(node_id)
        ]
        if schemas:
            context.set_tools(ToolsSchema(standard_tools=schemas))
        else:
            context.set_tools()  # no outgoing edges: clear the tools

    async def go_to_node(target: str) -> None:
        """Perform a transition: switch node, reset the turn counter, notify the canvas, apply prompt/tools.

        For an end node the ``ended`` flag is set and ``apply_node`` clears the
        tools. After that, ``on_assistant_text_end`` arms the hang-up when the
        closing reply finishes generating, and ``EndCallAfterSpeech`` queues
        the EndFrame once that reply has been spoken.
        """
        wf_state["current"] = target
        wf_state["turns_in_node"] = 0
        if engine.is_end(target):
            wf_state["ended"] = True
        await emit_node_active(target)
        apply_node(target)

    async def speak_transition(edge: dict | None) -> None:
        """Speak the optional transition phrase at switch time to mask latency; not written to the context."""
        speech = engine.edge_transition_speech(edge)
        if speech:
            await worker.queue_frame(TTSSpeakFrame(speech, append_to_context=False))

    def make_transition_handler(edge: dict):
        """Build the LLM function handler that follows ``edge`` to its target node."""
        target = edge.get("target")

        async def handler(params):
            logger.info(f"LLM 触发转移 → {engine.name(target)}")
            # No transition phrase when entering an end node (the closing line
            # is the wrap-up itself; avoids disturbing the hang-up timing).
            if not engine.is_end(target):
                await speak_transition(edge)
            await go_to_node(target)
            # Return the tool result; per the original note, pipecat then
            # continues generating under the new node's prompt/tools.
            await params.result_callback({"status": "ok"})

        return handler

    async def fallback_route() -> None:
        """Text fallback: when the model has not called a transition tool, ask a classifier whether to move on."""
        if not workflow_active or wf_state["ended"]:
            return
        if wf_state["turns_in_node"] < FALLBACK_AFTER_TURNS:
            return
        if not engine.outgoing(wf_state["current"]):
            return
        target = await engine.route(
            wf_state["current"],
            history,
            api_key=cfg.llm_api_key or config.LLM_API_KEY,
            base_url=cfg.llm_base_url or config.LLM_BASE_URL,
            model=cfg.model or config.LLM_MODEL,
        )
        if target and target != wf_state["current"]:
            logger.info(f"文本兜底触发转移 → {engine.name(target)}")
            if not engine.is_end(target):
                await speak_transition(engine.find_edge(wf_state["current"], target))
            # Only switch the node prompt/tools; the next user input is then
            # handled in the new node.
            await go_to_node(target)

    # Register every edge as an LLM-callable transition function (uniquely
    # named per edge, registered once on the processor); each node's
    # context tools control which ones are currently visible.
    if workflow_active:
        for edge in engine.edges:
            if edge.get("target"):
                llm.register_function(
                    engine.edge_fn_name(edge), make_transition_handler(edge)
                )
        apply_node(wf_state["current"])  # set the initial node's prompt and tools

    async def append_user_text_to_context(text: str, *, run_llm: bool) -> None:
        """Queue a frame that appends a user message to the context, optionally running the LLM."""
        await worker.queue_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "user", "content": text}],
                run_llm=run_llm,
            )
        )

    @user_aggregator.event_handler("on_user_turn_stopped")
    async def on_user_turn_stopped(_aggregator, _strategy, message):
        # Record the spoken user turn and mirror it to the front end.
        if message.content:
            history.append({"role": "user", "content": message.content})
            await queue_transcript("user", message.content, message.timestamp)

    @assistant_aggregator.event_handler("on_assistant_text_start")
    async def on_assistant_text_start(_aggregator, turn_id, timestamp):
        # After entering the end node, the first reply that *starts* generating
        # is the end node's own closing line (its text_start happens after
        # `ended` was set, so the pre-transition reply is not mistaken for it).
        if (
            workflow_active
            and wf_state["ended"]
            and wf_state["end_turn_id"] is None
        ):
            wf_state["end_turn_id"] = turn_id
        await worker.queue_frame(
            OutputTransportMessageUrgentFrame(
                message={
                    "type": "assistant-text-start",
                    "turn_id": turn_id,
                    "timestamp": timestamp,
                }
            )
        )

    @assistant_aggregator.event_handler("on_assistant_text_delta")
    async def on_assistant_text_delta(_aggregator, turn_id, delta):
        # Stream each text chunk of the reply to the front end.
        await worker.queue_frame(
            OutputTransportMessageUrgentFrame(
                message={
                    "type": "assistant-text-delta",
                    "turn_id": turn_id,
                    "delta": delta,
                }
            )
        )

    @assistant_aggregator.event_handler("on_assistant_text_end")
    async def on_assistant_text_end(_aggregator, turn_id, content, interrupted):
        await worker.queue_frame(
            OutputTransportMessageUrgentFrame(
                message={
                    "type": "assistant-text-end",
                    "turn_id": turn_id,
                    "content": content,
                    "interrupted": interrupted,
                }
            )
        )
        # After the assistant finished a reply (not interrupted): count the
        # turn for the current node and, if needed, run the text fallback.
        # Normally the LLM performs transitions itself by calling a transition
        # tool (go_to_node), so nothing else is required here.
        if content and not interrupted and workflow_active:
            history.append({"role": "assistant", "content": content})
            if turn_id == wf_state["end_turn_id"]:
                # The end node's closing text is fully generated (and already
                # sent on the data channel): arm the hang-up. The actual
                # EndFrame is queued by EndCallAfterSpeech once it was spoken.
                wf_state["end_armed"] = True
            elif not wf_state["ended"]:
                wf_state["turns_in_node"] += 1
                await fallback_route()
        elif content and not interrupted:
            history.append({"role": "assistant", "content": content})

    @text_input.event_handler("on_text_input")
    async def on_text_input(_processor, text):
        pending_text_inputs.append(text)
        history.append({"role": "user", "content": text})
        # Front-end display must not depend on events that follow the
        # interruption, so queue the transcript before the interruption.
        await queue_transcript("user", text, time_now_iso8601())

    @assistant_aggregator.event_handler("on_interruption_processed")
    async def on_interruption_processed(_aggregator):
        if not pending_text_inputs:
            return
        text = pending_text_inputs.pop(0)
        # The assistant aggregator has finished handling the interruption;
        # only now start the next LLM turn.
        await append_user_text_to_context(text, run_llm=True)

    @text_input.event_handler("on_text_append")
    async def on_text_append(_processor, text):
        # Silent append: write to the context without interrupting or running
        # inference; the transcript is still reported as usual.
        history.append({"role": "user", "content": text})
        await queue_transcript("user", text, time_now_iso8601())
        await append_user_text_to_context(text, run_llm=False)

    @text_input.event_handler("on_client_ready")
    async def on_client_ready(_processor):
        # Send the greeting transcript at most once per call.
        nonlocal greeting_transcript_sent
        if greeting and not greeting_transcript_sent:
            greeting_transcript_sent = True
            await queue_transcript("assistant", greeting, time_now_iso8601())

    @transport.event_handler("on_client_connected")
    async def on_client_connected(_transport, _client):
        if greeting:
            # For externally hosted types the context lives on the remote
            # side, so the greeting is not written to the local context.
            if brain.spec.owns_context:
                context.add_message({"role": "assistant", "content": greeting})
            await worker.queue_frame(TTSSpeakFrame(greeting, append_to_context=False))
        # Workflow: highlight the current (start) node, which is the first
        # conversation node.
        if workflow_active:
            await emit_node_active(wf_state["current"])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(_transport, _client):
        logger.info("对端断开,结束管线")
        await worker.queue_frame(EndFrame())

    runner = WorkerRunner(handle_sigint=False)
    await runner.add_workers(worker)
    await runner.run()
    logger.info("管线已结束")
async def run_realtime_pipeline(transport, cfg: AssistantConfig) -> None:
    """Run a speech-to-speech session on ``transport`` until it ends.

    The pipeline is transport input -> text input -> realtime service ->
    transport output; the realtime service is the only model stage.

    Args:
        transport: pipecat transport providing ``.input()`` / ``.output()`` /
            ``event_handler``.
        cfg: Assistant configuration. ``cfg.realtime_values`` may carry
            ``inputSampleRate`` / ``outputSampleRate``; each falls back to
            24000 when missing or falsy.
    """
    realtime = create_realtime_service(cfg)
    text_router = RealtimeTextInputProcessor()
    stages = [transport.input(), text_router, realtime, transport.output()]
    pipeline = Pipeline(stages)

    rate_in = int(cfg.realtime_values.get("inputSampleRate") or 24000)
    rate_out = int(cfg.realtime_values.get("outputSampleRate") or 24000)
    worker_params = PipelineParams(
        enable_metrics=False,
        audio_in_sample_rate=rate_in,
        audio_out_sample_rate=rate_out,
    )
    worker = PipelineWorker(pipeline, params=worker_params, enable_rtvi=False)

    async def queue_transcript(role: str, content: str) -> None:
        """Queue a ``transcript`` message stamped with the current time; skip empty content."""
        if not content:
            return
        payload = {
            "type": "transcript",
            "role": role,
            "content": content,
            "timestamp": time_now_iso8601(),
        }
        await worker.queue_frame(OutputTransportMessageUrgentFrame(message=payload))

    @text_router.event_handler("on_text_input")
    async def on_text_input(_processor, text):
        # Immediate text: report it, interrupt the service, then send it.
        await queue_transcript("user", text)
        await realtime.interrupt()
        await realtime.send_text(text, run_immediately=True)

    @text_router.event_handler("on_text_append")
    async def on_text_append(_processor, text):
        # Silent append: report it and send it without interrupting.
        await queue_transcript("user", text)
        await realtime.send_text(text, run_immediately=False)

    @transport.event_handler("on_client_connected")
    async def on_client_connected(_transport, _client):
        opening = cfg.greeting
        if opening:
            await realtime.speak(opening)

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(_transport, _client):
        logger.info("Realtime 对端断开,结束管线")
        await worker.queue_frame(EndFrame())

    runner = WorkerRunner(handle_sigint=False)
    await runner.add_workers(worker)
    await runner.run()
    logger.info("Realtime 管线已结束")