diff --git a/src/voice/config.py b/src/voice/config.py
index d451da9..d1d71f3 100644
--- a/src/voice/config.py
+++ b/src/voice/config.py
@@ -131,6 +131,7 @@ class LLMConfig:
     variables: dict[str, str] = field(default_factory=dict)
     detail: bool = False
     timeout_sec: float = 60.0
+    image_input_mode: str = "base64"
 
     @property
     def is_fastgpt(self) -> bool:
@@ -236,6 +237,18 @@ def config_from_dict(data: dict) -> EngineConfig:
     if llm.get("chat_id") == "":
         llm["chat_id"] = None
     llm.pop("send_system_prompt", None)
+    # A missing, null, or blank value means "use the default", mirroring how
+    # empty chat_id/app_id are normalized here.
+    raw_image_input_mode = llm.get("image_input_mode")
+    image_input_mode = (
+        "" if raw_image_input_mode is None else str(raw_image_input_mode)
+    ).strip().lower() or LLMConfig().image_input_mode
+    if image_input_mode not in {"base64", "upload"}:
+        raise ValueError(
+            "services.llm.image_input_mode must be 'base64' or 'upload', "
+            f"got {raw_image_input_mode!r}"
+        )
+    llm["image_input_mode"] = image_input_mode
     if llm.get("app_id") == "":
         llm["app_id"] = None
     if not isinstance(llm.get("variables"), dict):
diff --git a/src/voice/fastgpt_llm.py b/src/voice/fastgpt_llm.py
index 73f4e2b..2b66266 100644
--- a/src/voice/fastgpt_llm.py
+++ b/src/voice/fastgpt_llm.py
@@ -1,5 +1,10 @@
 from __future__ import annotations
 
+import asyncio
+import base64
+import binascii
+import os
+import tempfile
 import uuid
 from dataclasses import dataclass, field
 from typing import Any
@@ -19,6 +24,7 @@ from pipecat.frames.frames import (
     LLMFullResponseStartFrame,
     LLMTextFrame,
     OutputTransportMessageFrame,
+    OutputTransportMessageUrgentFrame,
 )
 from pipecat.processors.aggregators.llm_context import LLMContext
 from pipecat.processors.frame_processor import FrameDirection
@@ -129,6 +135,50 @@ def _interactive_spoken_prompt(event: FastGPTInteractiveEvent) -> str:
     return "请继续。"
 
 
+IMAGE_INPUT_MODE_BASE64 = "base64"
+IMAGE_INPUT_MODE_UPLOAD = "upload"
+SUPPORTED_IMAGE_INPUT_MODES = frozenset({IMAGE_INPUT_MODE_BASE64, IMAGE_INPUT_MODE_UPLOAD})
+
+_MIME_TO_EXT = {
+    "image/jpeg": ".jpg",
+    "image/png": ".png",
+    "image/webp": ".webp",
+}
+
+
+def _message_has_image(message: dict[str, Any]) -> bool:
+    content = message.get("content")
+    if not isinstance(content, list):
+        return False
+    return any(
+        isinstance(part, dict) and part.get("type") == "image_url"
+        for part in content
+    )
+
+
+def _redact_messages_for_log(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    """Replace each image part's URL with a ``<N chars>`` placeholder for logging."""
+    redacted: list[dict[str, Any]] = []
+    for message in messages:
+        content = message.get("content")
+        if not isinstance(content, list):
+            redacted.append(message)
+            continue
+        parts: list[Any] = []
+        for part in content:
+            if (
+                isinstance(part, dict)
+                and part.get("type") == "image_url"
+                and isinstance(part.get("image_url"), dict)
+            ):
+                url = str(part["image_url"].get("url") or "")
+                parts.append({"type": "image_url", "image_url": {"url": f"<{len(url)} chars>"}})
+            else:
+                parts.append(part)
+        redacted.append({**message, "content": parts})
+    return redacted
+
+
 @dataclass
 class FastGPTLLMSettings(LLMSettings):
     variables: dict[str, Any] = field(default_factory=dict)
@@ -167,6 +217,7 @@ class FastGPTLLMService(LLMService):
         app_id: str | None = None,
         greeting_prompt: str | None = None,
         timeout: float = 60.0,
+        image_input_mode: str = IMAGE_INPUT_MODE_BASE64,
         settings: FastGPTLLMSettings | None = None,
         **kwargs,
     ) -> None:
@@ -185,6 +236,20 @@ class FastGPTLLMService(LLMService):
         )
         self._active_response = None
 
+        mode = (image_input_mode or IMAGE_INPUT_MODE_BASE64).strip().lower()
+        if mode not in SUPPORTED_IMAGE_INPUT_MODES:
+            raise ValueError(
+                f"Unsupported image_input_mode {image_input_mode!r}; "
+                f"expected one of {sorted(SUPPORTED_IMAGE_INPUT_MODES)}"
+            )
+        if mode == IMAGE_INPUT_MODE_UPLOAD and not self._app_id:
+            logger.warning(
+                "FastGPT image_input_mode='upload' requires app_id; "
+                "falling back to inline base64"
+            )
+            mode = IMAGE_INPUT_MODE_BASE64
+        self._image_input_mode = mode
+
     @property
     def app_id(self) -> str:
         return self._app_id
@@ -305,26 +370,130 @@ class FastGPTLLMService(LLMService):
         if response is not None:
             await response.aclose()
 
-    def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, str]]:
+    def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, Any]]:
+        """Return the latest user turn (text or multimodal) or the greeting prompt."""
         raw_messages = context.get_messages()
         for message in reversed(raw_messages):
             if not isinstance(message, dict) or message.get("role") != "user":
                 continue
+            if _message_has_image(message):
+                # Multimodal turn: forward the OpenAI-style content list as-is
+                # (text parts + image_url with a base64 data URL). FastGPT's
+                # /chat/completions accepts this directly.
+                return [{"role": "user", "content": message["content"]}]
             text = _message_text(message)
             if text:
                 return [{"role": "user", "content": text}]
         return [{"role": "user", "content": self._greeting_prompt}]
 
+    async def _resolve_image_inputs(
+        self, messages: list[dict[str, Any]]
+    ) -> list[dict[str, Any]]:
+        """In ``upload`` mode, replace inline base64 image data URLs with uploaded URLs.
+
+        In ``base64`` mode the messages are returned untouched (inline data URLs).
+        New message/content objects are built so the shared ``LLMContext`` messages
+        are never mutated.
+        """
+        if self._image_input_mode != IMAGE_INPUT_MODE_UPLOAD:
+            return messages
+
+        resolved: list[dict[str, Any]] = []
+        for message in messages:
+            content = message.get("content")
+            if not isinstance(content, list):
+                resolved.append(message)
+                continue
+
+            new_content: list[Any] = []
+            for part in content:
+                image_url = (
+                    part.get("image_url")
+                    if isinstance(part, dict) and part.get("type") == "image_url"
+                    else None
+                )
+                # ``image_url`` can be malformed (None, bare string); only a
+                # dict can carry a data URL to upload.
+                url = image_url.get("url") if isinstance(image_url, dict) else None
+                if isinstance(url, str) and url.startswith("data:image/"):
+                    uploaded = await self._upload_data_url(url)
+                    # Copy rather than rebuild so sibling keys such as
+                    # ``detail`` survive, as they do in base64 mode.
+                    new_content.append(
+                        {**part, "image_url": {**image_url, "url": uploaded}}
+                    )
+                else:
+                    new_content.append(part)
+            resolved.append({**message, "content": new_content})
+
+        return resolved
+
+    async def _upload_data_url(self, data_url: str) -> str:
+        """Upload a ``data:image/...;base64,...`` URL via FastGPT and return its URL.
+
+        Falls back to the original data URL if parsing or upload fails so the turn
+        still proceeds with inline base64.
+        """
+        header, _, payload = data_url.partition(",")
+        meta = header[len("data:"):].split(";")
+        mime_type = meta[0].strip().lower() or "image/jpeg"
+        if "base64" not in (token.strip().lower() for token in meta[1:]):
+            logger.warning("FastGPT image upload skipped; data URL is not base64")
+            return data_url
+        try:
+            raw = base64.b64decode(payload, validate=True)
+        except (binascii.Error, ValueError) as exc:
+            logger.warning(f"FastGPT image upload skipped; invalid base64: {exc}")
+            return data_url
+        if not raw:
+            # A missing or empty payload would otherwise upload a zero-byte file.
+            logger.warning("FastGPT image upload skipped; empty image payload")
+            return data_url
+
+        suffix = _MIME_TO_EXT.get(mime_type, ".jpg")
+        tmp_path: str | None = None
+        try:
+            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
+                # Record the path before writing so a failed write is still
+                # cleaned up by the ``finally`` below.
+                tmp_path = tmp.name
+                tmp.write(raw)
+            result = await self._client.upload_chat_image(
+                appId=self._app_id,
+                chatId=self._chat_id,
+                file_path=tmp_path,
+            )
+            url = result.get("url") if isinstance(result, dict) else None
+            if isinstance(url, str) and url:
+                logger.info(
+                    f"FastGPT image uploaded chatId={self._chat_id} "
+                    f"bytes={len(raw)} url={url}"
+                )
+                return url
+            logger.warning("FastGPT image upload returned no url; using inline base64")
+            return data_url
+        except Exception as exc:
+            logger.warning(f"FastGPT image upload failed; using inline base64: {exc}")
+            return data_url
+        finally:
+            if tmp_path is not None:
+                try:
+                    os.unlink(tmp_path)
+                except OSError:
+                    pass
+
     async def _process_context(self, context: LLMContext) -> None:
         messages = self._build_fastgpt_messages(context)
+        messages = await self._resolve_image_inputs(messages)
         variables = self._settings.variables or None
 
         logger.info(
             "FastGPT chat completion "
             f"chatId={self._chat_id} appId={self._app_id or '-'} "
-            f"variables={sorted((variables or {}).keys())} messages={messages!r}"
+            f"variables={sorted((variables or {}).keys())} "
+            f"messages={_redact_messages_for_log(messages)!r}"
         )
 
         await self.start_ttfb_metrics()
diff --git a/src/voice/pipeline.py b/src/voice/pipeline.py
index 1d25cb5..bfc0529 100644
--- a/src/voice/pipeline.py
+++ b/src/voice/pipeline.py
@@ -23,6 +23,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
     UserTurnStoppedMessage,
 )
 from pipecat.serializers.base_serializer import FrameSerializer
+from pipecat.serializers.protobuf import ProtobufFrameSerializer
 from pipecat.transports.websocket.fastapi import (
     FastAPIWebsocketParams,
     FastAPIWebsocketTransport,
@@ -68,6 +69,15 @@ async def run_product_voice_pipeline(websocket, config: EngineConfig) -> None:
     )
 
 
+async def run_voice_pipeline(websocket, config: EngineConfig) -> None:
+    await run_pipeline_with_serializer(
+        websocket,
+        config,
+        serializer=ProtobufFrameSerializer(),
+        client_label="Pipecat protobuf",
+    )
+
+
 async def run_pipeline_with_serializer(
     websocket,
     config: EngineConfig,
@@ -120,8 +130,13 @@ async def run_pipeline_with_serializer(
         stop_secs=config.turn.vad.stop_secs,
         min_volume=config.turn.vad.min_volume,
     )
-    # Use a simple silence-timeout strategy for streaming ASR so short Chinese
-    # pauses do not split one logical utterance into multiple LLM calls.
+    # Replace pipecat's default stop strategy (Smart Turn v3) with a simple
+    # silence-timeout strategy.
Smart Turn v3 was finalizing every short + # Chinese phrase as a complete turn, which caused one logical utterance + # to become several LLM calls and several user bubbles in the UI. The + # timeout strategy waits for `user_speech_timeout_sec` of silence + # (re-armed every time the user resumes speaking) before declaring the + # turn finished — which is what we actually want for streaming ASRs. user_turn_strategies = UserTurnStrategies( start=[ InterruptionGateUserTurnStartStrategy( @@ -225,22 +240,6 @@ async def run_pipeline_with_serializer( nonlocal idle_prompt_count idle_prompt_count = 0 - @user_aggregator.event_handler("on_user_turn_idle") - async def on_user_turn_idle(aggregator): - nonlocal idle_prompt_count - text = config.turn.idle_prompt_text.strip() - if not text or config.turn.idle_prompt_max_count <= 0: - return - if idle_prompt_count >= config.turn.idle_prompt_max_count: - return - - idle_prompt_count += 1 - logger.info( - "User idle prompt triggered " - f"count={idle_prompt_count}/{config.turn.idle_prompt_max_count}" - ) - await aggregator.push_frame(TTSSpeakFrame(text)) - @user_aggregator.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(_aggregator, _strategy, message: UserTurnStoppedMessage): logger.info(f"User: {message.content}") @@ -268,5 +267,25 @@ async def run_pipeline_with_serializer( ) text_stream.take_interrupted_stream_text() + @user_aggregator.event_handler("on_user_turn_idle") + async def on_user_turn_idle(aggregator): + nonlocal idle_prompt_count + text = config.turn.idle_prompt_text.strip() + if not text or config.turn.idle_prompt_max_count <= 0: + return + if idle_prompt_count >= config.turn.idle_prompt_max_count: + return + + idle_prompt_count += 1 + logger.info( + "User idle prompt triggered " + f"count={idle_prompt_count}/{config.turn.idle_prompt_max_count}" + ) + await aggregator.push_frame(TTSSpeakFrame(text)) + + # NOTE: assistant turn started/final events are emitted by + # ProductTextStreamProcessor, 
upstream of TTS, so text streams to the + # client ahead of audio. This logger is kept for server-side visibility. + runner = PipelineRunner(handle_sigint=False) await runner.run(task) diff --git a/src/voice/services.py b/src/voice/services.py index 8fbd916..142cc25 100644 --- a/src/voice/services.py +++ b/src/voice/services.py @@ -65,6 +65,7 @@ def create_llm_service( app_id=config.app_id, greeting_prompt=greeting_prompt, timeout=config.timeout_sec, + image_input_mode=config.image_input_mode, settings=FastGPTLLMSettings( model=config.model or "fastgpt", variables=variables, diff --git a/src/voice/text_input.py b/src/voice/text_input.py index f9b5c4e..2cc8007 100644 --- a/src/voice/text_input.py +++ b/src/voice/text_input.py @@ -6,6 +6,7 @@ from pipecat.frames.frames import ( Frame, InputTransportMessageFrame, LLMMessagesAppendFrame, + UserImageRawFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -13,11 +14,17 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class ProductTextInputProcessor(FrameProcessor): - """Converts product text-input transport messages into LLM turns.""" + """Converts product text-input transport messages and marks image input as user activity.""" async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) + if isinstance(frame, UserImageRawFrame): + await self.broadcast_frame(UserStartedSpeakingFrame) + await self.push_frame(frame, direction) + await self.broadcast_frame(UserStoppedSpeakingFrame) + return + if not isinstance(frame, InputTransportMessageFrame): await self.push_frame(frame, direction) return diff --git a/src/voice/text_stream.py b/src/voice/text_stream.py index 2cd60ee..0074931 100644 --- a/src/voice/text_stream.py +++ b/src/voice/text_stream.py @@ -154,6 +154,8 @@ class ProductTextStreamProcessor(FrameProcessor): await self.push_frame(frame, direction) await self._handle_interrupt() elif isinstance(frame, TTSSpeakFrame): + # 
Fixed-text / direct-speech path: there's no LLM cycle, so + # synthesize one started/delta/final sequence for the spoken text. text = frame.text or "" await self.push_frame(frame, direction) await self._start_turn() @@ -172,6 +174,8 @@ class ProductTextStreamProcessor(FrameProcessor): async def _delta(self, text: str) -> None: if not self._turn_active: + # A text frame outside a turn shouldn't happen, but if it does, + # synthesize a started boundary so the client renders sensibly. await self._start_turn() self._aggregation.append(text) await self._emit("response.text.delta", text=text) diff --git a/src/voice/turn_start.py b/src/voice/turn_start.py index cdc5643..929ac75 100644 --- a/src/voice/turn_start.py +++ b/src/voice/turn_start.py @@ -18,7 +18,12 @@ _COUNTABLE_TEXT_RE = re.compile(r"[\w\u4e00-\u9fff]", re.UNICODE) class InterruptionGateUserTurnStartStrategy(BaseUserTurnStartStrategy): - """Starts user turns only after likely intentional speech.""" + """Starts user turns only after likely intentional speech. + + When the assistant is speaking, short background speech should not barge in + unless it is a common answer to a yes/no style question. When the assistant + is not speaking, any non-empty transcript can start a normal user turn. + """ def __init__( self, diff --git a/static/voice-demo/app.js b/static/voice-demo/app.js index defd4bd..f8bcfd8 100644 --- a/static/voice-demo/app.js +++ b/static/voice-demo/app.js @@ -24,6 +24,19 @@ const WS_LOG_GROUP_KEYS = { AUDIO_SEND: "send:input.audio", }; const CAMERA_DONE_TEXT = "【拍摄完成】"; +// Sample images shown as thumbnails under the camera preview. Same-origin files +// so they can be drawn to a canvas (for base64 + dimensions) without tainting. 
+const SAMPLE_IMAGES = [ + { src: "./samples/damage1.png", label: "车辆前部" }, + { src: "./samples/damage2.png", label: "车辆后部" }, + { src: "./samples/plate1.jpg", label: "车牌 1" }, + { src: "./samples/plate2.jpg", label: "车牌 2" }, + { src: "./samples/user1.jpg", label: "人物 1" }, + { src: "./samples/user2.jpg", label: "人物 2" }, +]; +// Cap the longer edge before JPEG-encoding so payloads stay small. +const IMAGE_MAX_DIM = 1280; +const IMAGE_JPEG_QUALITY = 0.85; const CAMERA_STATE_PROMPTS = { 2000: "请对准车辆碰撞部位拍摄照片。", 2001: "请对准车辆碰撞部位拍摄照片。", @@ -62,6 +75,15 @@ const els = { cameraState: document.getElementById("camera-state"), cameraQuestion: document.getElementById("camera-question"), cameraDoneBtn: document.getElementById("camera-done-btn"), + cameraPreview: document.getElementById("camera-preview"), + cameraVideo: document.getElementById("camera-video"), + cameraPhoto: document.getElementById("camera-photo"), + cameraCanvas: document.getElementById("camera-canvas"), + cameraStartBtn: document.getElementById("camera-start-btn"), + cameraDeviceRow: document.getElementById("camera-device-row"), + cameraDeviceSelect: document.getElementById("camera-device-select"), + cameraUpload: document.getElementById("camera-upload"), + cameraSamples: document.getElementById("camera-samples"), clearBtn: document.getElementById("clear-btn"), clearWsLogBtn: document.getElementById("clear-ws-log-btn"), wsLog: document.getElementById("ws-log"), @@ -125,6 +147,14 @@ const state = { assistantState: "", cameraState: "", + // Camera / image input. + cameraStream: null, + cameraActive: false, + cameraFacing: "environment", + videoDevices: [], + pendingImage: null, + samplesRendered: false, + // VU meter smoothing. 
meterLevel: 0, @@ -143,15 +173,15 @@ function setConnectButton() { els.chatId.disabled = state.connected || state.connecting; els.copyChatIdBtn.disabled = !state.connected || !state.chatId; if (state.connecting) { - els.connectBtn.textContent = "Connecting…"; + els.connectBtn.textContent = "连接中…"; els.connectBtn.disabled = true; els.connectBtn.classList.remove("is-disconnect"); } else if (state.connected) { - els.connectBtn.textContent = "Disconnect"; + els.connectBtn.textContent = "断开连接"; els.connectBtn.disabled = false; els.connectBtn.classList.add("is-disconnect"); } else { - els.connectBtn.textContent = "Connect"; + els.connectBtn.textContent = "连接"; els.connectBtn.disabled = false; els.connectBtn.classList.remove("is-disconnect"); } @@ -180,8 +210,8 @@ async function copyChatId() { function setMicButton() { els.micBtn.disabled = !state.connected; els.micBtn.setAttribute("aria-pressed", state.micEnabled ? "true" : "false"); - els.micBtn.title = state.micEnabled ? "Mute mic" : "Unmute mic"; - els.micLabel.textContent = state.micEnabled ? "Mute mic" : "Enable mic"; + els.micBtn.title = state.micEnabled ? "关闭麦克风" : "开启麦克风"; + els.micLabel.textContent = state.micEnabled ? "关闭麦克风" : "开启麦克风"; els.micIndicator.classList.toggle("is-active", state.micEnabled); } @@ -204,41 +234,40 @@ function setAssistantState(value) { const label = text.length > 32 ? `${text.slice(0, 31)}…` : text; state.assistantState = text; els.stateIndicator.classList.toggle("is-active", Boolean(text)); - els.stateLabel.textContent = label ? `State ${label}` : "State -"; - els.stateIndicator.title = label ? `Assistant state: ${text}` : "Assistant state"; + els.stateLabel.textContent = label ? `状态 ${label}` : "状态 -"; + els.stateIndicator.title = label ? 
`助手状态:${text}` : "助手状态"; syncCameraDrawer(text); } function setCameraButtonEnabled() { if (!els.cameraDoneBtn) return; - els.cameraDoneBtn.disabled = - !state.connected || !state.cameraState || - !state.ws || state.ws.readyState !== WebSocket.OPEN; + const wsReady = + state.connected && state.ws && state.ws.readyState === WebSocket.OPEN; + const hasImageSource = state.cameraActive || Boolean(state.pendingImage); + els.cameraDoneBtn.disabled = !wsReady || !state.cameraState || !hasImageSource; } function syncCameraDrawer(value) { const prompt = CAMERA_STATE_PROMPTS[value]; const open = Boolean(prompt); + const wasOpen = Boolean(state.cameraState); state.cameraState = open ? value : ""; els.cameraDrawer.classList.toggle("is-open", open); els.conversation.classList.toggle("has-camera", open); els.cameraDrawer.setAttribute("aria-hidden", open ? "false" : "true"); if (open) { - els.cameraState.textContent = `State ${value}`; + els.cameraState.textContent = `状态 ${value}`; els.cameraQuestion.textContent = prompt; + renderSampleThumbnails(); + selectDefaultImage(); } else { - els.cameraState.textContent = "State -"; + els.cameraState.textContent = "状态 -"; els.cameraQuestion.textContent = ""; + if (wasOpen) resetCameraInput(); } setCameraButtonEnabled(); } -function updateCameraQuestion(text) { - const value = typeof text === "string" ? text.trim() : ""; - if (!state.cameraState || !value) return; - els.cameraQuestion.textContent = value; -} - function addBubble(role, text) { if (els.chatLog.querySelector(".chat__empty")) { els.chatLog.innerHTML = ""; @@ -248,7 +277,7 @@ function addBubble(role, text) { if (role !== "system") { const tag = document.createElement("span"); tag.className = "bubble__role"; - tag.textContent = role === "user" ? "You" : "Assistant"; + tag.textContent = role === "user" ? 
"你" : "助手"; bubble.appendChild(tag); } const body = document.createElement("span"); @@ -260,6 +289,35 @@ function addBubble(role, text) { return bubble; } +// Render a single chat bubble holding an image and (optionally) text together. +function addImageBubble(role, imageUrl, text) { + if (els.chatLog.querySelector(".chat__empty")) { + els.chatLog.innerHTML = ""; + } + const bubble = document.createElement("div"); + bubble.className = `bubble bubble--${role}`; + if (role !== "system") { + const tag = document.createElement("span"); + tag.className = "bubble__role"; + tag.textContent = role === "user" ? "你" : "助手"; + bubble.appendChild(tag); + } + const img = document.createElement("img"); + img.className = "bubble__image"; + img.src = imageUrl; + img.alt = text || "image"; + bubble.appendChild(img); + + const body = document.createElement("span"); + body.className = "bubble__text"; + body.textContent = text || ""; + bubble.appendChild(body); + + els.chatLog.appendChild(bubble); + scrollChatToBottom(); + return bubble; +} + function appendToBubble(bubble, text) { const body = bubble.querySelector(".bubble__text"); body.textContent += text; @@ -276,7 +334,7 @@ function clearChat() { setAssistantState(""); const empty = document.createElement("div"); empty.className = "chat__empty"; - empty.innerHTML = "
Chat cleared.
"; + empty.innerHTML = "对话已清空。
"; els.chatLog.appendChild(empty); } @@ -499,6 +557,9 @@ function compactWsPayload(payload) { if (typeof compact.audio === "string") { compact.audio = `