Sync voice chatId session handling

This commit is contained in:
Xin Wang
2026-06-01 10:08:15 +08:00
parent 6df6c16e1d
commit 00c1bbdc6b
8 changed files with 228 additions and 32 deletions

View File

@@ -115,6 +115,7 @@ class AgentConfig:
system_prompt: str = "You are a helpful, friendly voice assistant."
greeting: str | None = None
greeting_mode: str = "generated"
fastgpt_reconnect_greeting: str = "欢迎回来继续对话"
response_state: ResponseStateConfig = field(default_factory=ResponseStateConfig)
@@ -130,7 +131,6 @@ class LLMConfig:
variables: dict[str, str] = field(default_factory=dict)
detail: bool = False
timeout_sec: float = 60.0
send_system_prompt: bool = False
@property
def is_fastgpt(self) -> bool:
@@ -143,7 +143,7 @@ class LLMConfig:
@property
def uses_local_context_history(self) -> bool:
"""Whether the pipeline should seed and maintain local LLM context history."""
return not self.is_fastgpt or self.send_system_prompt
return not self.is_fastgpt
@dataclass(frozen=True)
@@ -219,7 +219,7 @@ def config_from_dict(data: dict) -> EngineConfig:
raise ValueError(
"agent.greeting_mode must be one of: generated, fixed, off, fastgpt_opener"
)
response_state = ResponseStateConfig(**_dict(agent.pop("response_state")))
response_state = ResponseStateConfig(**_dict(agent.pop("response_state", None)))
if response_state.max_prefix_chars < 1:
raise ValueError("agent.response_state.max_prefix_chars must be greater than 0")
if not response_state.tag:
@@ -235,6 +235,7 @@ def config_from_dict(data: dict) -> EngineConfig:
llm["provider"] = _normalize_llm_provider(llm.get("provider", LLMConfig().provider))
if llm.get("chat_id") == "":
llm["chat_id"] = None
llm.pop("send_system_prompt", None)
if llm.get("app_id") == "":
llm["app_id"] = None
if not isinstance(llm.get("variables"), dict):

View File

@@ -165,7 +165,6 @@ class FastGPTLLMService(LLMService):
base_url: str,
chat_id: str | None = None,
app_id: str | None = None,
send_system_prompt: bool = False,
greeting_prompt: str | None = None,
timeout: float = 60.0,
settings: FastGPTLLMSettings | None = None,
@@ -178,7 +177,6 @@ class FastGPTLLMService(LLMService):
self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}"
self._app_id = (app_id or "").strip()
self._send_system_prompt = send_system_prompt
self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好"
self._client = AsyncChatClient(
api_key=api_key,
@@ -241,6 +239,8 @@ class FastGPTLLMService(LLMService):
return _first_nonempty_text(
chat_config.get("welcomeText"),
app_payload.get("welcomeText"),
app_payload.get("opener"),
app_payload.get("intro"),
)
async def fetch_welcome_text(self) -> str | None:
@@ -256,7 +256,7 @@ class FastGPTLLMService(LLMService):
response.raise_for_status()
text = self._welcome_text_from_init_payload(response.json())
if text:
logger.info(f"FastGPT welcomeText loaded for appId={self._app_id}")
logger.info(f"FastGPT app opener loaded for appId={self._app_id}")
return text or None
except FastGPTError as exc:
logger.warning(f"FastGPT chat init failed: {exc}")
@@ -266,6 +266,39 @@ class FastGPTLLMService(LLMService):
logger.warning(f"FastGPT chat init error: {exc}")
return None
async def has_chat_history(self) -> bool:
"""Return whether FastGPT has persisted records for this chatId."""
if not self._app_id:
return False
try:
response = await self._client.get_chat_records(
appId=self._app_id,
chatId=self._chat_id,
offset=0,
pageSize=1,
)
response.raise_for_status()
data = response.json()
records = data.get("data", {}).get("list", [])
return isinstance(records, list) and bool(records)
except FastGPTError as exc:
logger.warning(f"FastGPT chat records failed: {exc}")
except httpx.HTTPError as exc:
logger.warning(f"FastGPT chat records HTTP error: {exc}")
except Exception as exc:
logger.warning(f"FastGPT chat records error: {exc}")
return False
async def fetch_session_greeting_text(self, reconnect_greeting: str) -> str | None:
"""Use opener for a new chatId and a fixed greeting for reconnects."""
if await self.has_chat_history():
logger.info(f"FastGPT chatId={self._chat_id} has history; using reconnect greeting")
return reconnect_greeting.strip() or None
logger.info(f"FastGPT chatId={self._chat_id} has no history; using app opener")
return await self.fetch_welcome_text()
async def _close_active_response(self) -> None:
response = self._active_response
self._active_response = None
@@ -274,26 +307,15 @@ class FastGPTLLMService(LLMService):
def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, str]]:
raw_messages = context.get_messages()
messages: list[dict[str, str]] = []
if self._send_system_prompt:
for message in raw_messages:
if not isinstance(message, dict) or message.get("role") != "system":
continue
text = _message_text(message)
if text:
messages.append({"role": "system", "content": text})
for message in reversed(raw_messages):
if not isinstance(message, dict) or message.get("role") != "user":
continue
text = _message_text(message)
if text:
messages.append({"role": "user", "content": text})
return messages
return [{"role": "user", "content": text}]
messages.append({"role": "user", "content": self._greeting_prompt})
return messages
return [{"role": "user", "content": self._greeting_prompt}]
async def _process_context(self, context: LLMContext) -> None:
messages = self._build_fastgpt_messages(context)

View File

@@ -44,6 +44,18 @@ from .transcript_stream import ProductTranscriptStreamProcessor
from .turn_start import InterruptionGateUserTurnStartStrategy
def _chat_id_from_websocket(websocket) -> str | None:
query_params = getattr(websocket, "query_params", None)
if not query_params:
return None
for name in ("chatId", "chat_id"):
value = query_params.get(name)
if isinstance(value, str) and value.strip():
return value.strip()
return None
async def run_product_voice_pipeline(websocket, config: EngineConfig) -> None:
await run_pipeline_with_serializer(
websocket,
@@ -80,7 +92,7 @@ async def run_pipeline_with_serializer(
stt = create_stt_service(config.services.stt, config.audio)
llm_config = config.services.llm
chat_id = llm_config.chat_id or f"voice_{uuid.uuid4().hex[:16]}"
chat_id = _chat_id_from_websocket(websocket) or f"voice_{uuid.uuid4().hex[:16]}"
llm = create_llm_service(
llm_config,
chat_id=chat_id,
@@ -108,6 +120,8 @@ async def run_pipeline_with_serializer(
stop_secs=config.turn.vad.stop_secs,
min_volume=config.turn.vad.min_volume,
)
# Use a simple silence-timeout strategy for streaming ASR so short Chinese
# pauses do not split one logical utterance into multiple LLM calls.
user_turn_strategies = UserTurnStrategies(
start=[
InterruptionGateUserTurnStartStrategy(
@@ -179,15 +193,15 @@ async def run_pipeline_with_serializer(
await task.queue_frames([TTSSpeakFrame(config.agent.greeting)])
elif config.agent.greeting_mode == "fastgpt_opener":
if isinstance(llm, FastGPTLLMService):
welcome = await llm.fetch_welcome_text()
welcome = await llm.fetch_session_greeting_text(
config.agent.fastgpt_reconnect_greeting
)
if welcome:
await task.queue_frames([TTSSpeakFrame(welcome)])
else:
logger.warning("FastGPT opener requested but no opener text was returned")
else:
raise RuntimeError(
"agent.greeting_mode='fastgpt_opener' requires FastGPT LLM service"
)
raise RuntimeError("agent.greeting_mode='fastgpt_opener' requires FastGPT LLM service")
elif config.agent.greeting_mode == "generated":
await task.queue_frames([LLMRunFrame()])
@@ -233,7 +247,7 @@ async def run_pipeline_with_serializer(
text = (message.content or "").strip()
if not text:
return
await task.queue_frame(
await _aggregator.push_frame(
OutputTransportMessageUrgentFrame(
message={
"type": "input.transcript.final",

View File

@@ -18,7 +18,6 @@ from pipecat.frames.frames import (
OutputAudioRawFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
TextFrame,
TranscriptionFrame,
UserImageRawFrame,
)
@@ -64,13 +63,15 @@ class ProductWebsocketSerializer(FrameSerializer):
timestamp=frame.timestamp,
)
if isinstance(frame, TextFrame):
return self._event("response.text.delta", text=frame.text)
# ProductTextStreamProcessor owns response.text.* events. TTS can also
# emit TextFrame subclasses internally, so serializing them here would
# make clients render duplicate assistant text.
if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
if self.should_ignore_frame(frame):
return None
message = frame.message
# Allow callers to emit a named protocol event by pushing a
# transport-message frame whose payload already carries a `type`.
if isinstance(message, dict) and isinstance(message.get("type"), str):
event_type = message["type"]
payload = {k: v for k, v in message.items() if k != "type"}
@@ -99,10 +100,12 @@ class ProductWebsocketSerializer(FrameSerializer):
message_type = message.get("type")
if message_type == "session.start":
chat_id = message.get("chatId") or message.get("chat_id")
return InputTransportMessageFrame(
message={
"type": "session.started",
"protocol": self.protocol,
"chatId": chat_id if isinstance(chat_id, str) else None,
"audio": {
"encoding": "pcm_s16le",
"sample_rate": self._sample_rate,

View File

@@ -61,9 +61,8 @@ def create_llm_service(
return FastGPTLLMService(
api_key=config.api_key,
base_url=config.base_url or "http://localhost:3000",
chat_id=chat_id or config.chat_id,
chat_id=chat_id,
app_id=config.app_id,
send_system_prompt=config.send_system_prompt,
greeting_prompt=greeting_prompt,
timeout=config.timeout_sec,
settings=FastGPTLLMSettings(
@@ -107,6 +106,7 @@ def create_tts_service(config: TTSConfig, audio: AudioConfig):
volume=config.volume,
pitch=config.pitch,
timeout=config.timeout_sec,
push_stop_frames=True,
)
if config.provider in ("xfyun_super", "xfyun_super_tts"):

View File

@@ -44,6 +44,8 @@ function defaultWsUrl() {
const els = {
url: document.getElementById("ws-url"),
chatId: document.getElementById("chat-id"),
copyChatIdBtn: document.getElementById("copy-chat-id-btn"),
connectBtn: document.getElementById("connect-btn"),
statusDot: document.getElementById("status-dot"),
statusText: document.getElementById("status-text"),
@@ -69,10 +71,38 @@ const els = {
sendBtn: document.getElementById("send-btn"),
};
function generateChatId() {
if (typeof crypto !== "undefined" && crypto.randomUUID) {
return `voice_${crypto.randomUUID().replaceAll("-", "").slice(0, 16)}`;
}
return `voice_${Date.now().toString(36)}${Math.random()
.toString(36)
.slice(2, 10)}`;
}
function currentChatIdInput() {
return (els.chatId.value || "").trim();
}
function wsUrlWithChatId(chatId) {
const rawUrl = (els.url.value || "").trim();
if (!rawUrl || !chatId) return rawUrl;
try {
const url = new URL(rawUrl, location.href);
url.searchParams.set("chatId", chatId);
return url.href;
} catch (_) {
const separator = rawUrl.includes("?") ? "&" : "?";
return `${rawUrl}${separator}chatId=${encodeURIComponent(chatId)}`;
}
}
const state = {
ws: null,
connected: false,
connecting: false,
chatId: "",
audioContext: null,
micStream: null,
@@ -110,6 +140,8 @@ function setStatus(kind, text) {
}
function setConnectButton() {
els.chatId.disabled = state.connected || state.connecting;
els.copyChatIdBtn.disabled = !state.connected || !state.chatId;
if (state.connecting) {
els.connectBtn.textContent = "Connecting…";
els.connectBtn.disabled = true;
@@ -125,6 +157,26 @@ function setConnectButton() {
}
}
async function copyChatId() {
if (!state.connected || !state.chatId) return;
try {
await navigator.clipboard.writeText(state.chatId);
} catch (_) {
const selectionStart = els.chatId.selectionStart;
const selectionEnd = els.chatId.selectionEnd;
els.chatId.disabled = false;
els.chatId.select();
document.execCommand("copy");
els.chatId.setSelectionRange(selectionStart, selectionEnd);
els.chatId.disabled = true;
}
els.copyChatIdBtn.classList.add("copied");
window.setTimeout(() => {
els.copyChatIdBtn.classList.remove("copied");
}, 1200);
}
function setMicButton() {
els.micBtn.disabled = !state.connected;
els.micBtn.setAttribute("aria-pressed", state.micEnabled ? "true" : "false");
@@ -874,13 +926,17 @@ function handleEvent(event) {
async function connect() {
if (state.connected || state.connecting) return;
const url = (els.url.value || "").trim();
const inputChatId = currentChatIdInput();
const chatId = inputChatId || generateChatId();
const url = wsUrlWithChatId(chatId);
if (!url) {
setStatus("error", "Missing URL");
return;
}
state.connecting = true;
state.chatId = chatId;
els.chatId.value = chatId;
setStatus("connecting", "Connecting…");
setConnectButton();
addWsLog("system", `connecting ${url}`);
@@ -891,6 +947,8 @@ async function connect() {
} catch (err) {
console.error("AudioContext failed", err);
state.connecting = false;
state.chatId = "";
if (!inputChatId) els.chatId.value = "";
setStatus("error", "Audio init failed");
setConnectButton();
addWsLog("error", `audio init failed: ${err.message || err}`, "error");
@@ -903,6 +961,8 @@ async function connect() {
} catch (err) {
console.error("WebSocket constructor failed", err);
state.connecting = false;
state.chatId = "";
if (!inputChatId) els.chatId.value = "";
setStatus("error", "Bad URL");
setConnectButton();
addWsLog("error", `bad websocket URL: ${err.message || err}`, "error");
@@ -921,6 +981,7 @@ async function connect() {
channels: CHANNELS,
},
};
startMessage.chatId = state.chatId;
state.connecting = false;
state.connected = true;
@@ -974,6 +1035,7 @@ async function connect() {
state.ws = null;
state.connected = false;
state.connecting = false;
state.chatId = "";
setAssistantState("");
if (state.micEnabled) stopMic();
stopPlaybackQueue();
@@ -1026,6 +1088,8 @@ els.connectBtn.addEventListener("click", () => {
else connect();
});
els.copyChatIdBtn.addEventListener("click", copyChatId);
els.micBtn.addEventListener("click", async () => {
if (!state.connected) return;
els.micBtn.disabled = true;

View File

@@ -25,6 +25,34 @@
autocomplete="off"
/>
</label>
<label class="connection__field connection__field--chat">
<span>Chat ID</span>
<div class="chat-id-control">
<input
id="chat-id"
type="text"
placeholder="optional chatId"
spellcheck="false"
autocomplete="off"
/>
<button
id="copy-chat-id-btn"
class="chat-id-control__copy"
type="button"
disabled
title="Copy Chat ID"
aria-label="Copy Chat ID"
>
<svg class="copy-icon copy-icon--default" viewBox="0 0 16 16" width="14" height="14" fill="none" aria-hidden="true">
<rect x="5" y="5" width="8" height="9" rx="1.5" stroke="currentColor" stroke-width="1.4"/>
<path d="M3 11V3.5A1.5 1.5 0 0 1 4.5 2H11" stroke="currentColor" stroke-width="1.4" stroke-linecap="round"/>
</svg>
<svg class="copy-icon copy-icon--check" viewBox="0 0 16 16" width="14" height="14" fill="none" aria-hidden="true">
<path d="M3 8.5l3.5 3.5 6.5-7" stroke="currentColor" stroke-width="1.6" stroke-linecap="round" stroke-linejoin="round"/>
</svg>
</button>
</div>
</label>
<button id="connect-btn" class="btn btn--primary" type="button">
Connect
</button>

View File

@@ -304,6 +304,10 @@ body {
flex: 1;
}
.connection__field--chat {
flex: 0 1 220px;
}
.connection__field span {
font-size: 11px;
color: var(--text-dim);
@@ -324,11 +328,67 @@ body {
min-width: 0;
}
.connection__field input:disabled {
color: var(--text-dim);
background: rgba(148, 163, 184, 0.12);
cursor: not-allowed;
}
.connection__field input:focus {
border-color: var(--accent);
box-shadow: 0 0 0 3px rgba(79, 140, 255, 0.18);
}
.chat-id-control {
display: flex;
min-width: 0;
}
.chat-id-control input {
border-top-right-radius: 0;
border-bottom-right-radius: 0;
}
.chat-id-control__copy {
flex: 0 0 auto;
display: flex;
align-items: center;
justify-content: center;
width: 34px;
border: 1px solid var(--border);
border-left: 0;
border-radius: 0 10px 10px 0;
background: var(--bg-soft);
color: var(--text-dim);
padding: 0;
cursor: pointer;
transition: color 0.15s, border-color 0.15s, background 0.15s;
}
.chat-id-control__copy:disabled {
opacity: 0.45;
cursor: not-allowed;
}
.chat-id-control__copy:not(:disabled):hover {
color: var(--accent-strong);
border-color: var(--accent);
background: rgba(79, 140, 255, 0.08);
}
.chat-id-control__copy .copy-icon--check {
display: none;
color: var(--success);
}
.chat-id-control__copy.copied .copy-icon--default {
display: none;
}
.chat-id-control__copy.copied .copy-icon--check {
display: block;
}
.status {
display: flex;
align-items: center;
@@ -1013,6 +1073,10 @@ body {
align-items: stretch;
}
.connection__field--chat {
flex-basis: auto;
}
.ws-log__entry,
.ws-log__group-header {
grid-template-columns: 54px 38px minmax(0, 1fr);