diff --git a/backend/services/pipecat/pipeline.py b/backend/services/pipecat/pipeline.py index 8360dcf..3e150ae 100644 --- a/backend/services/pipecat/pipeline.py +++ b/backend/services/pipecat/pipeline.py @@ -18,6 +18,8 @@ from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, EndFrame, InputTransportMessageFrame, InterruptionFrame, @@ -222,6 +224,10 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: "current": engine.start_id if workflow_active else None, "ended": False, "turns_in_node": 0, + # 结束流程的精确计时:只在「结束节点自己的结束语」真正说完时挂断。 + "end_turn_id": None, # 结束节点回复的 turn_id(其 text_start 在 ended 之后) + "end_armed": False, # 结束语文本已生成完(已下发 data channel) + "end_speaking": False, # 结束语音频已开始播报 "end_frame_queued": False, } history: list[dict] = [] @@ -256,6 +262,35 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: assistant_aggregator = PassthroughLLMAssistantAggregator(context) text_input = TextInputProcessor() + # 结束节点:等结束语「说完」(BotStoppedSpeakingFrame)再挂断,确保结束语的 + # 文字(走 data channel)与音频都已下发,避免前端只听到声音、看不到文字。 + worker_holder: dict = {} + + class EndCallAfterSpeech(FrameProcessor): + async def process_frame(self, frame, direction: FrameDirection): + await super().process_frame(frame, direction) + await self.push_frame(frame, direction) + # 结束语文本生成完(end_armed)→ 其音频开始(end_speaking)→ 音频说完才挂断。 + # 配对 started/stopped,避免被结束节点之前的话(如先答一句再转移)的 + # stopped 事件提前触发,导致结束语被截断。 + if isinstance(frame, BotStartedSpeakingFrame) and wf_state["end_armed"]: + wf_state["end_speaking"] = True + elif ( + isinstance(frame, BotStoppedSpeakingFrame) + and wf_state["end_speaking"] + and not wf_state["end_frame_queued"] + and worker_holder.get("worker") is not None + ): + wf_state["end_frame_queued"] = True + logger.info("结束语播报完毕,挂断通话") + # 先告知前端这是正常结束(而非连接异常),再优雅挂断 + await worker_holder["worker"].queue_frame( + OutputTransportMessageUrgentFrame( + message={"type": "call-ended", "reason": "completed"} + ) + ) + await worker_holder["worker"].queue_frame(EndFrame()) + pipeline = Pipeline( [ transport.input(), @@ -268,6 +303,7 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: # waiting for a TTS provider to emit spoken-text/timestamp frames. assistant_aggregator, tts, + EndCallAfterSpeech(), transport.output(), ] ) @@ -279,6 +315,7 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: ), enable_rtvi=False, ) + worker_holder["worker"] = worker async def queue_transcript(role: str, content: str, timestamp: str) -> None: if content: @@ -357,7 +394,9 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: async def handler(params): logger.info(f"LLM 触发转移 → {engine.name(target)}") - await speak_transition(edge) + # 进结束节点不播过渡语(结束语本身就是收尾,避免打断挂断时序) + if not engine.is_end(target): + await speak_transition(edge) await go_to_node(target) # 返回工具结果,pipecat 随即在新节点的提示/工具下继续生成 await params.result_callback({"status": "ok"}) @@ -381,7 +420,8 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: ) if target and target != wf_state["current"]: logger.info(f"文本兜底触发转移 → {engine.name(target)}") - await speak_transition(engine.find_edge(wf_state["current"], target)) + if not engine.is_end(target): + await speak_transition(engine.find_edge(wf_state["current"], target)) # 仅切换节点提示/工具,下一轮用户输入即在新节点处理 await go_to_node(target) @@ -411,6 +451,14 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: @assistant_aggregator.event_handler("on_assistant_text_start") async def on_assistant_text_start(_aggregator, turn_id, timestamp): + # 进入结束节点后,第一条「开始生成」的回复就是结束节点自己的结束语 + # (其 text_start 发生在 ended 置位之后,不会误认转移前的那句)。 + if ( + workflow_active + and wf_state["ended"] + and wf_state["end_turn_id"] is None + ): + wf_state["end_turn_id"] = turn_id await worker.queue_frame( OutputTransportMessageUrgentFrame( message={ @@ -449,13 +497,11 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None: # 正常情况下转移由 LLM 直接调用转移工具完成(go_to_node),无需这里处理。 if content and not interrupted and workflow_active: history.append({"role": "assistant", "content": content}) - if wf_state["ended"]: - # 结束节点:说完结束语后挂断,不再继续多轮对话 - if not wf_state["end_frame_queued"]: - wf_state["end_frame_queued"] = True - logger.info("结束节点结束语已播报,挂断通话") - await worker.queue_frame(EndFrame()) - else: + if turn_id == wf_state["end_turn_id"]: + # 结束节点的结束语文本已生成完(也已下发 data channel),武装挂断; + # 真正的 EndFrame 由 EndCallAfterSpeech 在结束语「说完」时排入。 + wf_state["end_armed"] = True + elif not wf_state["ended"]: wf_state["turns_in_node"] += 1 await fallback_route() elif content and not interrupted: diff --git a/backend/services/pipecat/service_factory.py b/backend/services/pipecat/service_factory.py index fa261a9..df05c19 100644 --- a/backend/services/pipecat/service_factory.py +++ b/backend/services/pipecat/service_factory.py @@ -21,6 +21,11 @@ from services.pipecat.xfyun_super_tts import ( ) from services.pipecat.xfyun_tts import DEFAULT_XFYUN_TTS_URL, XfyunTTSService +# TTS「说完」判定的空闲时长:默认 3.0s 过长(导致工作流结束节点说完后还要等约 3s +# 才挂断,也拖慢日常轮次的交还)。设 1.0s 既能让结束语文字/音频送达,又更跟手。 +# 流式 TTS 句间音频间隔通常远小于 1s,不会把一段多句回复误判为结束。 +TTS_STOP_FRAME_TIMEOUT_S = 1.0 + def _language(value: str) -> Language | None: if not value: @@ -107,6 +112,7 @@ def create_tts(cfg: AssistantConfig): volume=int(cfg.tts_values.get("volume") or 50), pitch=int(cfg.tts_values.get("pitch") or 50), push_stop_frames=True, + stop_frame_timeout_s=TTS_STOP_FRAME_TIMEOUT_S, ) if cfg.tts_interface_type not in {"openai-tts", "dashscope-tts"}: raise ValueError(f"不支持的 TTS 接口类型: {cfg.tts_interface_type}") @@ -117,6 +123,7 @@ def create_tts(cfg: AssistantConfig): return OpenAITTSService( api_key=cfg.tts_api_key or config.TTS_API_KEY, base_url=cfg.tts_base_url or config.TTS_BASE_URL, + stop_frame_timeout_s=TTS_STOP_FRAME_TIMEOUT_S, settings=OpenAITTSService.Settings( model=cfg.tts_model or config.TTS_MODEL, voice=voice, diff --git a/frontend/src/hooks/use-voice-preview.ts b/frontend/src/hooks/use-voice-preview.ts index a28d493..93b3f0e 100644 --- a/frontend/src/hooks/use-voice-preview.ts +++ b/frontend/src/hooks/use-voice-preview.ts @@ -105,6 +105,8 @@ export function useVoicePreview( const localStreamRef = useRef(null); const startingRef = useRef(false); const messageSeqRef = useRef(0); + // 后端主动结束(工作流走到结束节点)标记:据此把随后的断开当作正常结束而非报错 + const endedByServerRef = useRef(false); // 工作流激活节点回调存进 ref,避免把它挂进 connect 依赖反复重建连接 const onNodeActiveRef = useRef(onNodeActive); useEffect(() => { @@ -201,6 +203,19 @@ export function useVoicePreview( [releaseResources], ); + // 连接断开时:若是后端主动收尾(call-ended),按正常结束处理(不报错); + // 否则按异常失败处理。 + const closeOnRemoteEnd = useCallback( + (message: string) => { + if (endedByServerRef.current) { + disconnect(); + } else { + fail(message); + } + }, + [disconnect, fail], + ); + const connect = useCallback(async () => { if (startingRef.current || pcRef.current || wsRef.current) return; if (!assistantId) { @@ -212,6 +227,7 @@ export function useVoicePreview( setError(null); setMicWarning(null); setMessages([]); // 新会话清空上一轮聊天记录 + endedByServerRef.current = false; setStatus("connecting"); // 麦克风是可选的:获取失败时继续建立仅接收后端音频的 WebRTC 会话。 @@ -282,7 +298,7 @@ export function useVoicePreview( if (wsRef.current === ws) fail("语音信令连接失败。"); }; ws.onclose = () => { - if (wsRef.current === ws) fail("语音信令连接已断开。"); + if (wsRef.current === ws) closeOnRemoteEnd("语音信令连接已断开。"); }; // 2) 建 PeerConnection(纯 STUN,本机/局域网够用) @@ -392,6 +408,10 @@ export function useVoicePreview( ) { // 工作流:后端报告当前激活节点,交给画布高亮 onNodeActiveRef.current?.(msg.nodeId); + } else if (msg?.type === "call-ended") { + // 后端走到结束节点、正常收尾:随后的断开按正常结束处理,不报错 + endedByServerRef.current = true; + onNodeActiveRef.current?.(null); } } catch { /* 非 JSON / 未知消息,忽略 */ @@ -412,15 +432,18 @@ export function useVoicePreview( if (pcRef.current !== pc) return; if (pc.connectionState === "connected") setStatus("connected"); else if (pc.connectionState === "failed") - fail("WebRTC 音频连接失败。"); + closeOnRemoteEnd("WebRTC 音频连接失败。"); + else if (pc.connectionState === "closed") + closeOnRemoteEnd("WebRTC 音频连接已断开。"); }; pc.oniceconnectionstatechange = () => { if (pcRef.current !== pc) return; const st = pc.iceConnectionState; if (st === "connected" || st === "completed") setStatus("connected"); - else if (st === "failed") fail("WebRTC 音频连接失败。"); - else if (st === "disconnected") fail("WebRTC 音频连接已断开。"); + else if (st === "failed") closeOnRemoteEnd("WebRTC 音频连接失败。"); + else if (st === "disconnected") + closeOnRemoteEnd("WebRTC 音频连接已断开。"); }; // 3) 有麦克风时双向音频;否则明确声明只接收后端音频。 @@ -453,7 +476,7 @@ export function useVoicePreview( } finally { startingRef.current = false; } - }, [assistantId, fail, refreshDevices]); + }, [assistantId, fail, closeOnRemoteEnd, refreshDevices]); // 选择麦克风:更新选择;若会话正在发送麦克风音频,则用 WebRTC replaceTrack // 热切换轨道(无需重新协商),并把波形可视化重新接到新流。