From 00c1bbdc6bde077f511ec4672efb35281bdba978 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Mon, 1 Jun 2026 10:08:15 +0800 Subject: [PATCH] Sync voice chatId session handling --- src/voice/config.py | 7 ++-- src/voice/fastgpt_llm.py | 54 ++++++++++++++++++++--------- src/voice/pipeline.py | 26 ++++++++++---- src/voice/protocol.py | 11 +++--- src/voice/services.py | 4 +-- static/voice-demo/app.js | 66 +++++++++++++++++++++++++++++++++++- static/voice-demo/index.html | 28 +++++++++++++++ static/voice-demo/styles.css | 64 ++++++++++++++++++++++++++++++++++ 8 files changed, 228 insertions(+), 32 deletions(-) diff --git a/src/voice/config.py b/src/voice/config.py index 8807e6e..d451da9 100644 --- a/src/voice/config.py +++ b/src/voice/config.py @@ -115,6 +115,7 @@ class AgentConfig: system_prompt: str = "You are a helpful, friendly voice assistant." greeting: str | None = None greeting_mode: str = "generated" + fastgpt_reconnect_greeting: str = "欢迎回来继续对话" response_state: ResponseStateConfig = field(default_factory=ResponseStateConfig) @@ -130,7 +131,6 @@ class LLMConfig: variables: dict[str, str] = field(default_factory=dict) detail: bool = False timeout_sec: float = 60.0 - send_system_prompt: bool = False @property def is_fastgpt(self) -> bool: @@ -143,7 +143,7 @@ class LLMConfig: @property def uses_local_context_history(self) -> bool: """Whether the pipeline should seed and maintain local LLM context history.""" - return not self.is_fastgpt or self.send_system_prompt + return not self.is_fastgpt @dataclass(frozen=True) @@ -219,7 +219,7 @@ def config_from_dict(data: dict) -> EngineConfig: raise ValueError( "agent.greeting_mode must be one of: generated, fixed, off, fastgpt_opener" ) - response_state = ResponseStateConfig(**_dict(agent.pop("response_state"))) + response_state = ResponseStateConfig(**_dict(agent.pop("response_state", None))) if response_state.max_prefix_chars < 1: raise ValueError("agent.response_state.max_prefix_chars must be greater than 0") if not response_state.tag: @@ -235,6 +235,7 @@ def config_from_dict(data: dict) -> EngineConfig: llm["provider"] = _normalize_llm_provider(llm.get("provider", LLMConfig().provider)) if llm.get("chat_id") == "": llm["chat_id"] = None + llm.pop("send_system_prompt", None) if llm.get("app_id") == "": llm["app_id"] = None if not isinstance(llm.get("variables"), dict): diff --git a/src/voice/fastgpt_llm.py b/src/voice/fastgpt_llm.py index b05a0f2..73f4e2b 100644 --- a/src/voice/fastgpt_llm.py +++ b/src/voice/fastgpt_llm.py @@ -165,7 +165,6 @@ class FastGPTLLMService(LLMService): base_url: str, chat_id: str | None = None, app_id: str | None = None, - send_system_prompt: bool = False, greeting_prompt: str | None = None, timeout: float = 60.0, settings: FastGPTLLMSettings | None = None, @@ -178,7 +177,6 @@ class FastGPTLLMService(LLMService): self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}" self._app_id = (app_id or "").strip() - self._send_system_prompt = send_system_prompt self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好" self._client = AsyncChatClient( api_key=api_key, @@ -241,6 +239,8 @@ class FastGPTLLMService(LLMService): return _first_nonempty_text( chat_config.get("welcomeText"), app_payload.get("welcomeText"), + app_payload.get("opener"), + app_payload.get("intro"), ) async def fetch_welcome_text(self) -> str | None: @@ -256,7 +256,7 @@ class FastGPTLLMService(LLMService): response.raise_for_status() text = self._welcome_text_from_init_payload(response.json()) if text: - logger.info(f"FastGPT welcomeText loaded for appId={self._app_id}") + logger.info(f"FastGPT app opener loaded for appId={self._app_id}") return text or None except FastGPTError as exc: logger.warning(f"FastGPT chat init failed: {exc}") @@ -266,6 +266,39 @@ class FastGPTLLMService(LLMService): logger.warning(f"FastGPT chat init error: {exc}") return None + async def has_chat_history(self) -> bool: + """Return whether FastGPT has persisted records for this chatId.""" + if not self._app_id: + return False + + try: + response = await self._client.get_chat_records( + appId=self._app_id, + chatId=self._chat_id, + offset=0, + pageSize=1, + ) + response.raise_for_status() + data = response.json() + records = data.get("data", {}).get("list", []) + return isinstance(records, list) and bool(records) + except FastGPTError as exc: + logger.warning(f"FastGPT chat records failed: {exc}") + except httpx.HTTPError as exc: + logger.warning(f"FastGPT chat records HTTP error: {exc}") + except Exception as exc: + logger.warning(f"FastGPT chat records error: {exc}") + return False + + async def fetch_session_greeting_text(self, reconnect_greeting: str) -> str | None: + """Use opener for a new chatId and a fixed greeting for reconnects.""" + if await self.has_chat_history(): + logger.info(f"FastGPT chatId={self._chat_id} has history; using reconnect greeting") + return reconnect_greeting.strip() or None + + logger.info(f"FastGPT chatId={self._chat_id} has no history; using app opener") + return await self.fetch_welcome_text() + async def _close_active_response(self) -> None: response = self._active_response self._active_response = None @@ -274,26 +307,15 @@ class FastGPTLLMService(LLMService): def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, str]]: raw_messages = context.get_messages() - messages: list[dict[str, str]] = [] - - if self._send_system_prompt: - for message in raw_messages: - if not isinstance(message, dict) or message.get("role") != "system": - continue - text = _message_text(message) - if text: - messages.append({"role": "system", "content": text}) for message in reversed(raw_messages): if not isinstance(message, dict) or message.get("role") != "user": continue text = _message_text(message) if text: - messages.append({"role": "user", "content": text}) - return messages + return [{"role": "user", "content": text}] - messages.append({"role": "user", "content": self._greeting_prompt}) - return messages + return [{"role": "user", "content": self._greeting_prompt}] async def _process_context(self, context: LLMContext) -> None: messages = self._build_fastgpt_messages(context) diff --git a/src/voice/pipeline.py b/src/voice/pipeline.py index a9b0bfe..1d25cb5 100644 --- a/src/voice/pipeline.py +++ b/src/voice/pipeline.py @@ -44,6 +44,18 @@ from .transcript_stream import ProductTranscriptStreamProcessor from .turn_start import InterruptionGateUserTurnStartStrategy +def _chat_id_from_websocket(websocket) -> str | None: + query_params = getattr(websocket, "query_params", None) + if not query_params: + return None + + for name in ("chatId", "chat_id"): + value = query_params.get(name) + if isinstance(value, str) and value.strip(): + return value.strip() + return None + + async def run_product_voice_pipeline(websocket, config: EngineConfig) -> None: await run_pipeline_with_serializer( websocket, @@ -80,7 +92,7 @@ async def run_pipeline_with_serializer( stt = create_stt_service(config.services.stt, config.audio) llm_config = config.services.llm - chat_id = llm_config.chat_id or f"voice_{uuid.uuid4().hex[:16]}" + chat_id = _chat_id_from_websocket(websocket) or f"voice_{uuid.uuid4().hex[:16]}" llm = create_llm_service( llm_config, chat_id=chat_id, @@ -108,6 +120,8 @@ async def run_pipeline_with_serializer( stop_secs=config.turn.vad.stop_secs, min_volume=config.turn.vad.min_volume, ) + # Use a simple silence-timeout strategy for streaming ASR so short Chinese + # pauses do not split one logical utterance into multiple LLM calls. user_turn_strategies = UserTurnStrategies( start=[ InterruptionGateUserTurnStartStrategy( @@ -179,15 +193,15 @@ async def run_pipeline_with_serializer( await task.queue_frames([TTSSpeakFrame(config.agent.greeting)]) elif config.agent.greeting_mode == "fastgpt_opener": if isinstance(llm, FastGPTLLMService): - welcome = await llm.fetch_welcome_text() + welcome = await llm.fetch_session_greeting_text( + config.agent.fastgpt_reconnect_greeting + ) if welcome: await task.queue_frames([TTSSpeakFrame(welcome)]) else: logger.warning("FastGPT opener requested but no opener text was returned") else: - raise RuntimeError( - "agent.greeting_mode='fastgpt_opener' requires FastGPT LLM service" - ) + raise RuntimeError("agent.greeting_mode='fastgpt_opener' requires FastGPT LLM service") elif config.agent.greeting_mode == "generated": await task.queue_frames([LLMRunFrame()]) @@ -233,7 +247,7 @@ async def run_pipeline_with_serializer( text = (message.content or "").strip() if not text: return - await task.queue_frame( + await _aggregator.push_frame( OutputTransportMessageUrgentFrame( message={ "type": "input.transcript.final", diff --git a/src/voice/protocol.py b/src/voice/protocol.py index 6ee3633..9bca779 100644 --- a/src/voice/protocol.py +++ b/src/voice/protocol.py @@ -18,7 +18,6 @@ from pipecat.frames.frames import ( OutputAudioRawFrame, OutputTransportMessageFrame, OutputTransportMessageUrgentFrame, - TextFrame, TranscriptionFrame, UserImageRawFrame, ) @@ -64,13 +63,15 @@ class ProductWebsocketSerializer(FrameSerializer): timestamp=frame.timestamp, ) - if isinstance(frame, TextFrame): - return self._event("response.text.delta", text=frame.text) - + # ProductTextStreamProcessor owns response.text.* events. TTS can also + # emit TextFrame subclasses internally, so serializing them here would + # make clients render duplicate assistant text. if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)): if self.should_ignore_frame(frame): return None message = frame.message + # Allow callers to emit a named protocol event by pushing a + # transport-message frame whose payload already carries a `type`. if isinstance(message, dict) and isinstance(message.get("type"), str): event_type = message["type"] payload = {k: v for k, v in message.items() if k != "type"} @@ -99,10 +100,12 @@ class ProductWebsocketSerializer(FrameSerializer): message_type = message.get("type") if message_type == "session.start": + chat_id = message.get("chatId") or message.get("chat_id") return InputTransportMessageFrame( message={ "type": "session.started", "protocol": self.protocol, + "chatId": chat_id if isinstance(chat_id, str) else None, "audio": { "encoding": "pcm_s16le", "sample_rate": self._sample_rate, diff --git a/src/voice/services.py b/src/voice/services.py index ae8652c..8fbd916 100644 --- a/src/voice/services.py +++ b/src/voice/services.py @@ -61,9 +61,8 @@ def create_llm_service( return FastGPTLLMService( api_key=config.api_key, base_url=config.base_url or "http://localhost:3000", - chat_id=chat_id or config.chat_id, + chat_id=chat_id, app_id=config.app_id, - send_system_prompt=config.send_system_prompt, greeting_prompt=greeting_prompt, timeout=config.timeout_sec, settings=FastGPTLLMSettings( @@ -107,6 +106,7 @@ def create_tts_service(config: TTSConfig, audio: AudioConfig): volume=config.volume, pitch=config.pitch, timeout=config.timeout_sec, + push_stop_frames=True, ) if config.provider in ("xfyun_super", "xfyun_super_tts"): diff --git a/static/voice-demo/app.js b/static/voice-demo/app.js index ea6010a..defd4bd 100644 --- a/static/voice-demo/app.js +++ b/static/voice-demo/app.js @@ -44,6 +44,8 @@ function defaultWsUrl() { const els = { url: document.getElementById("ws-url"), + chatId: document.getElementById("chat-id"), + copyChatIdBtn: document.getElementById("copy-chat-id-btn"), connectBtn: document.getElementById("connect-btn"), statusDot: document.getElementById("status-dot"), statusText: document.getElementById("status-text"), @@ -69,10 +71,38 @@ const els = { sendBtn: document.getElementById("send-btn"), }; +function generateChatId() { + if (typeof crypto !== "undefined" && crypto.randomUUID) { + return `voice_${crypto.randomUUID().replaceAll("-", "").slice(0, 16)}`; + } + return `voice_${Date.now().toString(36)}${Math.random() + .toString(36) + .slice(2, 10)}`; +} + +function currentChatIdInput() { + return (els.chatId.value || "").trim(); +} + +function wsUrlWithChatId(chatId) { + const rawUrl = (els.url.value || "").trim(); + if (!rawUrl || !chatId) return rawUrl; + + try { + const url = new URL(rawUrl, location.href); + url.searchParams.set("chatId", chatId); + return url.href; + } catch (_) { + const separator = rawUrl.includes("?") ? "&" : "?"; + return `${rawUrl}${separator}chatId=${encodeURIComponent(chatId)}`; + } +} + const state = { ws: null, connected: false, connecting: false, + chatId: "", audioContext: null, micStream: null, @@ -110,6 +140,8 @@ function setStatus(kind, text) { } function setConnectButton() { + els.chatId.disabled = state.connected || state.connecting; + els.copyChatIdBtn.disabled = !state.connected || !state.chatId; if (state.connecting) { els.connectBtn.textContent = "Connecting…"; els.connectBtn.disabled = true; @@ -125,6 +157,26 @@ function setConnectButton() { } } +async function copyChatId() { + if (!state.connected || !state.chatId) return; + try { + await navigator.clipboard.writeText(state.chatId); + } catch (_) { + const selectionStart = els.chatId.selectionStart; + const selectionEnd = els.chatId.selectionEnd; + els.chatId.disabled = false; + els.chatId.select(); + document.execCommand("copy"); + els.chatId.setSelectionRange(selectionStart, selectionEnd); + els.chatId.disabled = true; + } + + els.copyChatIdBtn.classList.add("copied"); + window.setTimeout(() => { + els.copyChatIdBtn.classList.remove("copied"); + }, 1200); +} + function setMicButton() { els.micBtn.disabled = !state.connected; els.micBtn.setAttribute("aria-pressed", state.micEnabled ? "true" : "false"); @@ -874,13 +926,17 @@ function handleEvent(event) { async function connect() { if (state.connected || state.connecting) return; - const url = (els.url.value || "").trim(); + const inputChatId = currentChatIdInput(); + const chatId = inputChatId || generateChatId(); + const url = wsUrlWithChatId(chatId); if (!url) { setStatus("error", "Missing URL"); return; } state.connecting = true; + state.chatId = chatId; + els.chatId.value = chatId; setStatus("connecting", "Connecting…"); setConnectButton(); addWsLog("system", `connecting ${url}`); @@ -891,6 +947,8 @@ async function connect() { } catch (err) { console.error("AudioContext failed", err); state.connecting = false; + state.chatId = ""; + if (!inputChatId) els.chatId.value = ""; setStatus("error", "Audio init failed"); setConnectButton(); addWsLog("error", `audio init failed: ${err.message || err}`, "error"); @@ -903,6 +961,8 @@ async function connect() { } catch (err) { console.error("WebSocket constructor failed", err); state.connecting = false; + state.chatId = ""; + if (!inputChatId) els.chatId.value = ""; setStatus("error", "Bad URL"); setConnectButton(); addWsLog("error", `bad websocket URL: ${err.message || err}`, "error"); @@ -921,6 +981,7 @@ async function connect() { channels: CHANNELS, }, }; + startMessage.chatId = state.chatId; state.connecting = false; state.connected = true; @@ -974,6 +1035,7 @@ async function connect() { state.ws = null; state.connected = false; state.connecting = false; + state.chatId = ""; setAssistantState(""); if (state.micEnabled) stopMic(); stopPlaybackQueue(); @@ -1026,6 +1088,8 @@ els.connectBtn.addEventListener("click", () => { else connect(); }); +els.copyChatIdBtn.addEventListener("click", copyChatId); + els.micBtn.addEventListener("click", async () => { if (!state.connected) return; els.micBtn.disabled = true; diff --git a/static/voice-demo/index.html b/static/voice-demo/index.html index 0d0f76a..677f03a 100644 --- a/static/voice-demo/index.html +++ b/static/voice-demo/index.html @@ -25,6 +25,34 @@ autocomplete="off" /> + diff --git a/static/voice-demo/styles.css b/static/voice-demo/styles.css index 0854611..11abd69 100644 --- a/static/voice-demo/styles.css +++ b/static/voice-demo/styles.css @@ -304,6 +304,10 @@ body { flex: 1; } +.connection__field--chat { + flex: 0 1 220px; +} + .connection__field span { font-size: 11px; color: var(--text-dim); @@ -324,11 +328,67 @@ body { min-width: 0; } +.connection__field input:disabled { + color: var(--text-dim); + background: rgba(148, 163, 184, 0.12); + cursor: not-allowed; +} + .connection__field input:focus { border-color: var(--accent); box-shadow: 0 0 0 3px rgba(79, 140, 255, 0.18); } +.chat-id-control { + display: flex; + min-width: 0; +} + +.chat-id-control input { + border-top-right-radius: 0; + border-bottom-right-radius: 0; +} + +.chat-id-control__copy { + flex: 0 0 auto; + display: flex; + align-items: center; + justify-content: center; + width: 34px; + border: 1px solid var(--border); + border-left: 0; + border-radius: 0 10px 10px 0; + background: var(--bg-soft); + color: var(--text-dim); + padding: 0; + cursor: pointer; + transition: color 0.15s, border-color 0.15s, background 0.15s; +} + +.chat-id-control__copy:disabled { + opacity: 0.45; + cursor: not-allowed; +} + +.chat-id-control__copy:not(:disabled):hover { + color: var(--accent-strong); + border-color: var(--accent); + background: rgba(79, 140, 255, 0.08); +} + +.chat-id-control__copy .copy-icon--check { + display: none; + color: var(--success); +} + +.chat-id-control__copy.copied .copy-icon--default { + display: none; +} + +.chat-id-control__copy.copied .copy-icon--check { + display: block; +} + .status { display: flex; align-items: center; @@ -1013,6 +1073,10 @@ body { align-items: stretch; } + .connection__field--chat { + flex-basis: auto; + } + .ws-log__entry, .ws-log__group-header { grid-template-columns: 54px 38px minmax(0, 1fr);