Enhance pipeline execution and voice preview handling for graceful call termination

- Introduce mechanisms in the pipeline to ensure that the end call process waits for the completion of the end speech before hanging up, improving user experience during call termination.
- Update the useVoicePreview hook to handle server-initiated call endings gracefully, distinguishing between normal and error disconnections.
- Adjust TTS stop frame timeout settings to optimize the timing of call terminations, ensuring timely responses without unnecessary delays.
- Refactor related components to support the new end call logic, enhancing overall workflow management and user interaction.
This commit is contained in:
Xin Wang
2026-06-16 09:24:24 +08:00
parent c2ef76620e
commit b22a9e1045
3 changed files with 90 additions and 14 deletions

View File

@@ -18,6 +18,8 @@ from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
InputTransportMessageFrame,
InterruptionFrame,
@@ -222,6 +224,10 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
"current": engine.start_id if workflow_active else None,
"ended": False,
"turns_in_node": 0,
# 结束流程的精确计时:只在「结束节点自己的结束语」真正说完时挂断。
"end_turn_id": None, # 结束节点回复的 turn_id(其 text_start 在 ended 之后)
"end_armed": False, # 结束语文本已生成完(已下发 data channel)
"end_speaking": False, # 结束语音频已开始播报
"end_frame_queued": False,
}
history: list[dict] = []
@@ -256,6 +262,35 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
assistant_aggregator = PassthroughLLMAssistantAggregator(context)
text_input = TextInputProcessor()
# 结束节点:等结束语「说完」(BotStoppedSpeakingFrame)再挂断,确保结束语的
# 文字(走 data channel)与音频都已下发,避免前端只听到声音、看不到文字。
worker_holder: dict = {}
class EndCallAfterSpeech(FrameProcessor):
async def process_frame(self, frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
# 结束语文本生成完(end_armed)→ 其音频开始(end_speaking)→ 音频说完才挂断。
# 配对 started/stopped,避免被结束节点之前的话(如先答一句再转移)的
# stopped 事件提前触发,导致结束语被截断。
if isinstance(frame, BotStartedSpeakingFrame) and wf_state["end_armed"]:
wf_state["end_speaking"] = True
elif (
isinstance(frame, BotStoppedSpeakingFrame)
and wf_state["end_speaking"]
and not wf_state["end_frame_queued"]
and worker_holder.get("worker") is not None
):
wf_state["end_frame_queued"] = True
logger.info("结束语播报完毕,挂断通话")
# 先告知前端这是正常结束(而非连接异常),再优雅挂断
await worker_holder["worker"].queue_frame(
OutputTransportMessageUrgentFrame(
message={"type": "call-ended", "reason": "completed"}
)
)
await worker_holder["worker"].queue_frame(EndFrame())
pipeline = Pipeline(
[
transport.input(),
@@ -268,6 +303,7 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
# waiting for a TTS provider to emit spoken-text/timestamp frames.
assistant_aggregator,
tts,
EndCallAfterSpeech(),
transport.output(),
]
)
@@ -279,6 +315,7 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
),
enable_rtvi=False,
)
worker_holder["worker"] = worker
async def queue_transcript(role: str, content: str, timestamp: str) -> None:
if content:
@@ -357,7 +394,9 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
async def handler(params):
logger.info(f"LLM 触发转移 → {engine.name(target)}")
await speak_transition(edge)
# 进结束节点不播过渡语(结束语本身就是收尾,避免打断挂断时序)
if not engine.is_end(target):
await speak_transition(edge)
await go_to_node(target)
# 返回工具结果,pipecat 随即在新节点的提示/工具下继续生成
await params.result_callback({"status": "ok"})
@@ -381,7 +420,8 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
)
if target and target != wf_state["current"]:
logger.info(f"文本兜底触发转移 → {engine.name(target)}")
await speak_transition(engine.find_edge(wf_state["current"], target))
if not engine.is_end(target):
await speak_transition(engine.find_edge(wf_state["current"], target))
# 仅切换节点提示/工具,下一轮用户输入即在新节点处理
await go_to_node(target)
@@ -411,6 +451,14 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
@assistant_aggregator.event_handler("on_assistant_text_start")
async def on_assistant_text_start(_aggregator, turn_id, timestamp):
# 进入结束节点后,第一条「开始生成」的回复就是结束节点自己的结束语
# (其 text_start 发生在 ended 置位之后,不会误认转移前的那句)。
if (
workflow_active
and wf_state["ended"]
and wf_state["end_turn_id"] is None
):
wf_state["end_turn_id"] = turn_id
await worker.queue_frame(
OutputTransportMessageUrgentFrame(
message={
@@ -449,13 +497,11 @@ async def run_pipeline(transport, cfg: AssistantConfig) -> None:
# 正常情况下转移由 LLM 直接调用转移工具完成(go_to_node),无需这里处理。
if content and not interrupted and workflow_active:
history.append({"role": "assistant", "content": content})
if wf_state["ended"]:
# 结束节点:说完结束语后挂断,不再继续多轮对话
if not wf_state["end_frame_queued"]:
wf_state["end_frame_queued"] = True
logger.info("结束节点结束语已播报,挂断通话")
await worker.queue_frame(EndFrame())
else:
if turn_id == wf_state["end_turn_id"]:
# 结束节点的结束语文本已生成完(也已下发 data channel),武装挂断;
# 真正的 EndFrame 由 EndCallAfterSpeech 在结束语「说完」时排入。
wf_state["end_armed"] = True
elif not wf_state["ended"]:
wf_state["turns_in_node"] += 1
await fallback_route()
elif content and not interrupted:

View File

@@ -21,6 +21,11 @@ from services.pipecat.xfyun_super_tts import (
)
from services.pipecat.xfyun_tts import DEFAULT_XFYUN_TTS_URL, XfyunTTSService
# TTS「说完」判定的空闲时长:默认 3.0s 过长(导致工作流结束节点说完后还要等约 3s
# 才挂断,也拖慢日常轮次的交还)。设 1.0s 既能让结束语文字/音频送达,又更跟手。
# 流式 TTS 句间音频间隔通常远小于 1s,不会把一段多句回复误判为结束。
TTS_STOP_FRAME_TIMEOUT_S = 1.0
def _language(value: str) -> Language | None:
if not value:
@@ -107,6 +112,7 @@ def create_tts(cfg: AssistantConfig):
volume=int(cfg.tts_values.get("volume") or 50),
pitch=int(cfg.tts_values.get("pitch") or 50),
push_stop_frames=True,
stop_frame_timeout_s=TTS_STOP_FRAME_TIMEOUT_S,
)
if cfg.tts_interface_type not in {"openai-tts", "dashscope-tts"}:
raise ValueError(f"不支持的 TTS 接口类型: {cfg.tts_interface_type}")
@@ -117,6 +123,7 @@ def create_tts(cfg: AssistantConfig):
return OpenAITTSService(
api_key=cfg.tts_api_key or config.TTS_API_KEY,
base_url=cfg.tts_base_url or config.TTS_BASE_URL,
stop_frame_timeout_s=TTS_STOP_FRAME_TIMEOUT_S,
settings=OpenAITTSService.Settings(
model=cfg.tts_model or config.TTS_MODEL,
voice=voice,

View File

@@ -105,6 +105,8 @@ export function useVoicePreview(
const localStreamRef = useRef<MediaStream | null>(null);
const startingRef = useRef(false);
const messageSeqRef = useRef(0);
// 后端主动结束(工作流走到结束节点)标记:据此把随后的断开当作正常结束而非报错
const endedByServerRef = useRef(false);
// 工作流激活节点回调存进 ref,避免把它挂进 connect 依赖反复重建连接
const onNodeActiveRef = useRef(onNodeActive);
useEffect(() => {
@@ -201,6 +203,19 @@ export function useVoicePreview(
[releaseResources],
);
// 连接断开时:若是后端主动收尾(call-ended),按正常结束处理(不报错);
// 否则按异常失败处理。
const closeOnRemoteEnd = useCallback(
(message: string) => {
if (endedByServerRef.current) {
disconnect();
} else {
fail(message);
}
},
[disconnect, fail],
);
const connect = useCallback(async () => {
if (startingRef.current || pcRef.current || wsRef.current) return;
if (!assistantId) {
@@ -212,6 +227,7 @@ export function useVoicePreview(
setError(null);
setMicWarning(null);
setMessages([]); // 新会话清空上一轮聊天记录
endedByServerRef.current = false;
setStatus("connecting");
// 麦克风是可选的:获取失败时继续建立仅接收后端音频的 WebRTC 会话。
@@ -282,7 +298,7 @@ export function useVoicePreview(
if (wsRef.current === ws) fail("语音信令连接失败。");
};
ws.onclose = () => {
if (wsRef.current === ws) fail("语音信令连接已断开。");
if (wsRef.current === ws) closeOnRemoteEnd("语音信令连接已断开。");
};
// 2) 建 PeerConnection(纯 STUN,本机/局域网够用)
@@ -392,6 +408,10 @@ export function useVoicePreview(
) {
// 工作流:后端报告当前激活节点,交给画布高亮
onNodeActiveRef.current?.(msg.nodeId);
} else if (msg?.type === "call-ended") {
// 后端走到结束节点、正常收尾:随后的断开按正常结束处理,不报错
endedByServerRef.current = true;
onNodeActiveRef.current?.(null);
}
} catch {
/* 非 JSON / 未知消息,忽略 */
@@ -412,15 +432,18 @@ export function useVoicePreview(
if (pcRef.current !== pc) return;
if (pc.connectionState === "connected") setStatus("connected");
else if (pc.connectionState === "failed")
fail("WebRTC 音频连接失败。");
closeOnRemoteEnd("WebRTC 音频连接失败。");
else if (pc.connectionState === "closed")
closeOnRemoteEnd("WebRTC 音频连接已断开。");
};
pc.oniceconnectionstatechange = () => {
if (pcRef.current !== pc) return;
const st = pc.iceConnectionState;
if (st === "connected" || st === "completed") setStatus("connected");
else if (st === "failed") fail("WebRTC 音频连接失败。");
else if (st === "disconnected") fail("WebRTC 音频连接已断开。");
else if (st === "failed") closeOnRemoteEnd("WebRTC 音频连接失败。");
else if (st === "disconnected")
closeOnRemoteEnd("WebRTC 音频连接已断开。");
};
// 3) 有麦克风时双向音频;否则明确声明只接收后端音频。
@@ -453,7 +476,7 @@ export function useVoicePreview(
} finally {
startingRef.current = false;
}
}, [assistantId, fail, refreshDevices]);
}, [assistantId, fail, closeOnRemoteEnd, refreshDevices]);
// 选择麦克风:更新选择;若会话正在发送麦克风音频,则用 WebRTC replaceTrack
// 热切换轨道(无需重新协商),并把波形可视化重新接到新流。