Files
engine-v5-pipecat-core/engine/fastgpt_llm.py
2026-06-02 08:24:53 +08:00

688 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import base64
import binascii
import json
import os
import tempfile
import uuid
from dataclasses import dataclass, field
from typing import Any
import httpx
from fastgpt_client import AsyncChatClient, FastGPTInteractiveEvent, aiter_stream_events
from fastgpt_client.exceptions import FastGPTError
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import LLMSettings
from .state_info import FastGPTStateFlushRequestFrame
def _extract_text_from_event(kind: str, payload: Any) -> str:
    """Pull the streamed text out of one FastGPT stream event payload.

    For ``answer``/``fastAnswer`` events a non-empty top-level ``text`` string
    wins. Otherwise the payload is read as an OpenAI-style chunk:
    ``choices[0].delta.content`` first, then ``choices[0].message.content``.
    When ``choices`` is missing, not a list, or empty, the top-level ``text``
    is returned stringified (``""`` when falsy). Non-dict payloads, and chunks
    with nothing usable, yield ``""``.
    """
    if not isinstance(payload, dict):
        return ""

    top_level_text = payload.get("text")
    if kind in {"answer", "fastAnswer"} and isinstance(top_level_text, str) and top_level_text:
        return top_level_text

    raw_choices = payload.get("choices")
    if not isinstance(raw_choices, list) or not raw_choices:
        return str(top_level_text or "")

    head = raw_choices[0]
    if not isinstance(head, dict):
        return ""

    # Streaming chunks carry "delta"; complete messages carry "message".
    for section_key in ("delta", "message"):
        section = head.get(section_key)
        if not isinstance(section, dict):
            continue
        candidate = section.get("content")
        if isinstance(candidate, str) and candidate:
            return candidate
    return ""
def _message_text(message: dict[str, Any]) -> str:
    """Flatten a chat message's ``content`` into plain text.

    A string content is returned stripped. A list content (OpenAI-style parts)
    yields its non-blank ``type == "text"`` parts, each stripped and joined
    with single spaces. Any other content yields ``""``.
    """
    content = message.get("content")
    if isinstance(content, list):
        candidates = (
            part.get("text")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
        return " ".join(
            piece.strip()
            for piece in candidates
            if isinstance(piece, str) and piece.strip()
        )
    if isinstance(content, str):
        return content.strip()
    return ""
# Image delivery modes accepted through the service's ``image_input_mode``:
#   "base64" - forward image_url parts with their inline data URLs as-is;
#   "upload" - upload each inline image first and forward the returned URL.
IMAGE_INPUT_MODE_BASE64 = "base64"
IMAGE_INPUT_MODE_UPLOAD = "upload"
SUPPORTED_IMAGE_INPUT_MODES = frozenset((IMAGE_INPUT_MODE_BASE64, IMAGE_INPUT_MODE_UPLOAD))

# Image MIME type -> file-name suffix.
_MIME_TO_EXT = {
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/webp": ".webp",
}
def _message_has_image(message: dict[str, Any]) -> bool:
    """Return True when ``message["content"]`` is a list holding an ``image_url`` part."""
    content = message.get("content")
    if isinstance(content, list):
        for part in content:
            if isinstance(part, dict) and part.get("type") == "image_url":
                return True
    return False
def _redact_messages_for_log(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return a log-safe copy of *messages* with image URLs replaced by their length.

    Every ``image_url`` part whose ``image_url`` value is a dict has its URL
    swapped for a ``"<N chars>"`` placeholder. This keeps inline base64 data
    URLs out of the log, and it applies to any other image URL as well.
    Messages whose content is not a list, and non-image parts, are passed
    through as the same objects. The input messages are never mutated.
    """

    def redact_part(part: Any) -> Any:
        is_redactable_image = (
            isinstance(part, dict)
            and part.get("type") == "image_url"
            and isinstance(part.get("image_url"), dict)
        )
        if not is_redactable_image:
            return part
        url_length = len(str(part["image_url"].get("url") or ""))
        return {"type": "image_url", "image_url": {"url": f"<{url_length} chars>"}}

    return [
        {**message, "content": [redact_part(part) for part in message["content"]]}
        if isinstance(message.get("content"), list)
        else message
        for message in messages
    ]
def _first_nonempty_text(*values: Any) -> str:
    """Return the first argument that is a string with visible content, stripped.

    Non-string arguments and whitespace-only strings are skipped; ``""`` is
    returned when nothing qualifies.
    """
    stripped = (value.strip() for value in values if isinstance(value, str))
    return next((text for text in stripped if text), "")
def _interactive_spoken_prompt(event: FastGPTInteractiveEvent) -> str:
    """Return the sentence to speak when a FastGPT workflow waits for the user.

    An explicit prompt in the event payload wins: the first non-empty of
    ``opener``, ``prompt``, ``text``, ``title`` and ``description``, each
    checked at the top level of ``event.data`` before ``params``. Otherwise a
    prompt is synthesised from the interaction type:

    * ``userSelect``: the usable options (strings, or dicts with ``label`` /
      ``value``), numbered consecutively and separated by "；" so TTS does not
      run them together (e.g. "请选择:1. 是；2. 否").
    * ``userInput``: the form field labels (falling back to ``name``),
      separated by "、".
    * anything else: a generic "please continue" prompt.

    Always returns a non-empty string.
    """
    payload = event.data if isinstance(event.data, dict) else {}
    params = payload.get("params") if isinstance(payload.get("params"), dict) else {}
    prompt = _first_nonempty_text(
        payload.get("opener"),
        params.get("opener"),
        payload.get("prompt"),
        params.get("prompt"),
        payload.get("text"),
        params.get("text"),
        payload.get("title"),
        params.get("title"),
        payload.get("description"),
        params.get("description"),
    )
    if prompt:
        return prompt

    if event.interaction_type == "userSelect":
        raw_options = params.get("userSelectOptions")
        if not isinstance(raw_options, list):
            raw_options = []
        option_labels: list[str] = []
        for raw in raw_options:
            if isinstance(raw, str):
                label = raw.strip()
            elif isinstance(raw, dict):
                label = _first_nonempty_text(raw.get("label"), raw.get("value"))
            else:
                label = ""
            if label:
                option_labels.append(label)
        if option_labels:
            # Number only the options actually spoken so the listener hears 1, 2, 3...
            numbered = (
                f"{index}. {label}" for index, label in enumerate(option_labels, start=1)
            )
            return "请选择:" + "；".join(numbered)
        return "请选择一个选项。"

    if event.interaction_type == "userInput":
        input_form = params.get("inputForm") if isinstance(params.get("inputForm"), list) else []
        # ``item`` (not ``field``) avoids shadowing the ``dataclasses.field`` import.
        field_labels = [
            _first_nonempty_text(item.get("label"), item.get("name"))
            for item in input_form
            if isinstance(item, dict)
        ]
        field_labels = [label for label in field_labels if label]
        if field_labels:
            return "请提供以下信息:" + "、".join(field_labels)
        return "请补充所需信息。"

    return "请继续。"
@dataclass
class FastGPTLLMSettings(LLMSettings):
    """LLM settings extended with FastGPT-specific request options."""

    # Workflow variables attached to chat completion requests; a fresh dict per instance.
    variables: dict[str, Any] = field(default_factory=dict)
    # Value passed as FastGPT's ``detail`` flag on the streamed chat completion.
    detail: bool = False
def _default_fastgpt_settings(*, model: str = "fastgpt") -> FastGPTLLMSettings:
    """Build baseline FastGPT settings with every generic LLM knob explicitly unset.

    Sampling and instruction fields are set to ``None``, and incomplete-turn
    filtering is disabled. The FastGPT extras start as an empty variables dict
    (a new one per call) and ``detail=False``.
    """
    unset_generic_fields = dict.fromkeys(
        (
            "system_instruction",
            "temperature",
            "max_tokens",
            "top_p",
            "top_k",
            "frequency_penalty",
            "presence_penalty",
            "seed",
            "user_turn_completion_config",
        )
    )
    return FastGPTLLMSettings(
        model=model,
        filter_incomplete_user_turns=False,
        variables={},
        detail=False,
        **unset_generic_fields,
    )
class FastGPTLLMService(LLMService):
    """FastGPT LLM service using chatId server-side memory and workflow variables.

    Conversation history lives on the FastGPT server under ``chat_id``. Each
    ``LLMContextFrame`` sends only the most recent user message that has text
    or an image, or the greeting prompt when there is none. Workflow variables
    from the settings are attached to every streamed completion.
    ``FastGPTStateFlushRequestFrame`` requests are written into the FastGPT
    ``state`` variable and acknowledged downstream.
    """

    Settings = FastGPTLLMSettings

    def __init__(
        self,
        *,
        api_key: str,
        base_url: str,
        chat_id: str | None = None,
        app_id: str | None = None,
        greeting_prompt: str | None = None,
        timeout: float = 60.0,
        image_input_mode: str = IMAGE_INPUT_MODE_BASE64,
        settings: FastGPTLLMSettings | None = None,
        **kwargs,
    ) -> None:
        """Create the service.

        Args:
            api_key: FastGPT API key for the chat client.
            base_url: FastGPT API base URL.
            chat_id: Server-side conversation id. A random ``voice_<16 hex>``
                id is generated when omitted.
            app_id: FastGPT app id. Without it the chat-init/records helpers
                return their empty defaults, ``upload`` image mode falls back to
                base64, and state flushes fail.
            greeting_prompt: User text sent when the context holds no usable
                user message (blank/None becomes "你好").
            timeout: HTTP timeout passed to the chat client.
            image_input_mode: ``"base64"`` or ``"upload"`` (case-insensitive).
            settings: Overrides applied on top of the FastGPT defaults.
            **kwargs: Forwarded to ``LLMService``.

        Raises:
            ValueError: If ``image_input_mode`` is not a supported mode.
        """
        default_settings = _default_fastgpt_settings()
        if settings is not None:
            default_settings.apply_update(settings)
        super().__init__(settings=default_settings, **kwargs)
        self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}"
        self._app_id = (app_id or "").strip()
        self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好"
        mode = (image_input_mode or IMAGE_INPUT_MODE_BASE64).strip().lower()
        if mode not in SUPPORTED_IMAGE_INPUT_MODES:
            raise ValueError(
                f"Unsupported image_input_mode {image_input_mode!r}; "
                f"expected one of {sorted(SUPPORTED_IMAGE_INPUT_MODES)}"
            )
        if mode == IMAGE_INPUT_MODE_UPLOAD and not self._app_id:
            # Image upload is addressed by appId, so it cannot work without one.
            logger.warning(
                "FastGPT image_input_mode='upload' requires app_id; "
                "falling back to inline base64"
            )
            mode = IMAGE_INPUT_MODE_BASE64
        self._image_input_mode = mode
        self._client = AsyncChatClient(
            api_key=api_key,
            base_url=base_url,
            timeout=timeout,
        )
        # Streaming response currently consumed by _process_context, kept so
        # interruptions and shutdown can close it early.
        self._active_response = None

    @property
    def app_id(self) -> str:
        """FastGPT app id (stripped; empty string when not configured)."""
        return self._app_id

    @property
    def chat_id(self) -> str:
        """Server-side conversation id used for every FastGPT request."""
        return self._chat_id

    def set_variables(self, variables: dict[str, Any]) -> None:
        """Merge *variables* into the workflow variables sent with later turns.

        A new dict is assigned instead of mutating the current one in place.
        """
        merged = dict(self._settings.variables)
        merged.update(variables)
        self._settings.variables = merged

    async def stop(self, frame: EndFrame) -> None:
        """Close the in-flight stream and the HTTP client, then stop the service."""
        try:
            await self._close_active_response()
            await self._client.close()
        finally:
            # The base-class shutdown must run even if closing our resources failed.
            await super().stop(frame)

    async def cancel(self, frame: CancelFrame) -> None:
        """Close the in-flight stream, then cancel the service."""
        try:
            await self._close_active_response()
        finally:
            await super().cancel(frame)

    async def _handle_interruptions(self, _: InterruptionFrame) -> None:
        """Abort the in-flight stream, then run the base interruption handling."""
        await self._close_active_response()
        await super()._handle_interruptions(_)

    @staticmethod
    def _welcome_text_from_init_payload(payload: Any) -> str:
        """Find the app welcome text in a chat-init response body.

        ``payload["app"]``, ``payload["data"]`` and the payload itself are
        searched in that order. For each dict container, a nested ``app`` dict
        is tried before the container itself. Returns ``""`` when nothing is
        found.
        """
        if not isinstance(payload, dict):
            return ""
        for container in (payload.get("app"), payload.get("data"), payload):
            if not isinstance(container, dict):
                continue
            nested_app = container.get("app")
            if isinstance(nested_app, dict):
                text = FastGPTLLMService._welcome_text_from_app(nested_app)
                if text:
                    return text
            text = FastGPTLLMService._welcome_text_from_app(container)
            if text:
                return text
        return ""

    @staticmethod
    def _welcome_text_from_app(app_payload: dict[str, Any]) -> str:
        """Return the first non-empty welcome-like text of one app dict, or ``""``.

        The fields tried are ``chatConfig.welcomeText``, ``welcomeText``,
        ``opener`` and ``intro``, in that order.
        """
        chat_config = (
            app_payload.get("chatConfig")
            if isinstance(app_payload.get("chatConfig"), dict)
            else {}
        )
        return _first_nonempty_text(
            chat_config.get("welcomeText"),
            app_payload.get("welcomeText"),
            app_payload.get("opener"),
            app_payload.get("intro"),
        )

    async def fetch_welcome_text(self) -> str | None:
        """Return FastGPT app welcome text from chat init when ``app_id`` is configured.

        Returns None when there is no app id, no welcome text, or any request
        or parse error (errors are logged as warnings, never raised).
        """
        if not self._app_id:
            return None
        try:
            response = await self._client.get_chat_init(
                appId=self._app_id,
                chatId=self._chat_id,
            )
            response.raise_for_status()
            text = self._welcome_text_from_init_payload(response.json())
            if text:
                logger.info(f"FastGPT app opener loaded for appId={self._app_id}")
            return text or None
        except FastGPTError as exc:
            logger.warning(f"FastGPT chat init failed: {exc}")
        except httpx.HTTPError as exc:
            logger.warning(f"FastGPT chat init HTTP error: {exc}")
        except Exception as exc:
            logger.warning(f"FastGPT chat init error: {exc}")
        return None

    async def has_chat_history(self) -> bool:
        """Return whether FastGPT has persisted records for this chatId.

        Returns False when there is no app id or on any error (logged as a
        warning). Only the first record (``pageSize=1``) is requested.
        """
        if not self._app_id:
            return False
        try:
            response = await self._client.get_chat_records(
                appId=self._app_id,
                chatId=self._chat_id,
                offset=0,
                pageSize=1,
            )
            response.raise_for_status()
            data = response.json()
            records = data.get("data", {}).get("list", [])
            return isinstance(records, list) and bool(records)
        except FastGPTError as exc:
            logger.warning(f"FastGPT chat records failed: {exc}")
        except httpx.HTTPError as exc:
            logger.warning(f"FastGPT chat records HTTP error: {exc}")
        except Exception as exc:
            logger.warning(f"FastGPT chat records error: {exc}")
        return False

    async def fetch_session_greeting_text(self, reconnect_greeting: str) -> str | None:
        """Use opener for a new chatId and a fixed greeting for reconnects.

        Returns the stripped *reconnect_greeting* (None if blank) when the chat
        already has records; otherwise the app welcome text (or None).
        """
        if await self.has_chat_history():
            logger.info(f"FastGPT chatId={self._chat_id} has history; using reconnect greeting")
            return reconnect_greeting.strip() or None
        logger.info(f"FastGPT chatId={self._chat_id} has no history; using app opener")
        return await self.fetch_welcome_text()

    async def _close_active_response(self) -> None:
        """Close and forget the in-flight streaming response, if any."""
        response = self._active_response
        # Clear the slot before awaiting so a concurrent caller does not close it twice.
        self._active_response = None
        if response is not None:
            await response.aclose()

    def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, Any]]:
        """Select the single user message to send for this turn.

        FastGPT keeps history server-side, so only one message is sent: the
        newest user message that has an image (forwarded as its content list)
        or non-empty text. The greeting prompt is used when none qualifies.
        NOTE(review): if the newest user message has no text, an older user
        message is picked instead — confirm that re-sending it is intended.
        """
        raw_messages = context.get_messages()
        for message in reversed(raw_messages):
            if not isinstance(message, dict) or message.get("role") != "user":
                continue
            if _message_has_image(message):
                # Multimodal turn: forward the OpenAI-style content list as-is
                # (text parts + image_url with a base64 data URL). FastGPT's
                # /chat/completions accepts this directly.
                return [{"role": "user", "content": message["content"]}]
            text = _message_text(message)
            if text:
                return [{"role": "user", "content": text}]
        return [{"role": "user", "content": self._greeting_prompt}]

    async def _resolve_image_inputs(
        self, messages: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """In ``upload`` mode, replace inline base64 image data URLs with uploaded URLs.

        In ``base64`` mode the messages are returned untouched (inline data
        URLs). New message, part and ``image_url`` objects are built so the
        shared ``LLMContext`` messages are never mutated. Any extra keys on an
        image part are preserved. Malformed image parts (``image_url`` not a
        dict) are forwarded unchanged.
        """
        if self._image_input_mode != IMAGE_INPUT_MODE_UPLOAD:
            return messages
        resolved: list[dict[str, Any]] = []
        for message in messages:
            content = message.get("content")
            if not isinstance(content, list):
                resolved.append(message)
                continue
            new_content: list[Any] = []
            for part in content:
                image_url = (
                    part.get("image_url")
                    if isinstance(part, dict) and part.get("type") == "image_url"
                    else None
                )
                # Guard the nested lookup: a non-dict image_url must not crash the turn.
                url = image_url.get("url") if isinstance(image_url, dict) else None
                if isinstance(url, str) and url.startswith("data:image/"):
                    uploaded = await self._upload_data_url(url)
                    new_content.append({**part, "image_url": {**image_url, "url": uploaded}})
                else:
                    new_content.append(part)
            resolved.append({**message, "content": new_content})
        return resolved

    async def _upload_data_url(self, data_url: str) -> str:
        """Upload a ``data:image/...;base64,...`` URL via FastGPT and return its URL.

        The decoded bytes are written to a temporary file. Its suffix comes
        from the MIME type (``.jpg`` when unknown). The file is uploaded with
        ``upload_chat_image`` and always deleted afterwards. Falls back to the
        original data URL when:

        * the URL is not base64-encoded;
        * the payload is invalid base64;
        * the upload fails or returns no URL.

        In all these cases the turn still proceeds with inline base64.
        """
        header, _, payload = data_url.partition(",")
        media_type, *parameters = header[len("data:") :].split(";")
        mime_type = media_type.strip() or "image/jpeg"
        # RFC 2397: only data URLs carrying the ";base64" parameter hold base64 text.
        if "base64" not in (param.strip().lower() for param in parameters):
            logger.warning("FastGPT image upload skipped; data URL is not base64-encoded")
            return data_url
        try:
            raw = base64.b64decode(payload, validate=True)
        except (binascii.Error, ValueError) as exc:
            logger.warning(f"FastGPT image upload skipped; invalid base64: {exc}")
            return data_url
        suffix = _MIME_TO_EXT.get(mime_type, ".jpg")
        tmp_path: str | None = None
        try:
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                # Record the path before writing so a failed write is still cleaned up.
                tmp_path = tmp.name
                tmp.write(raw)
            # Upload only after the ``with`` block has flushed and closed the file.
            result = await self._client.upload_chat_image(
                appId=self._app_id,
                chatId=self._chat_id,
                file_path=tmp_path,
            )
            url = result.get("url") if isinstance(result, dict) else None
            if isinstance(url, str) and url:
                logger.info(
                    f"FastGPT image uploaded chatId={self._chat_id} "
                    f"bytes={len(raw)} url={url}"
                )
                return url
            logger.warning("FastGPT image upload returned no url; using inline base64")
            return data_url
        except Exception as exc:
            logger.warning(f"FastGPT image upload failed; using inline base64: {exc}")
            return data_url
        finally:
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass

    async def _process_context(self, context: LLMContext) -> None:
        """Stream one FastGPT completion for *context* and push its text downstream.

        Text events become ``LLMTextFrame``s. An interactive event is spoken
        and forwarded as a transport message and ends the stream, as do
        ``error`` (pushed as an error) and ``done``. Request failures from the
        client are pushed as errors. The streaming response is always closed.
        """
        messages = self._build_fastgpt_messages(context)
        messages = await self._resolve_image_inputs(messages)
        variables = self._settings.variables or None
        logger.info(
            "FastGPT chat completion "
            f"chatId={self._chat_id} appId={self._app_id or '-'} "
            f"variables={sorted((variables or {}).keys())} "
            f"messages={_redact_messages_for_log(messages)!r}"
        )
        await self.start_ttfb_metrics()
        try:
            response = await self._client.create_chat_completion(
                messages=messages,
                stream=True,
                chatId=self._chat_id,
                variables=variables,
                detail=self._settings.detail,
            )
        except FastGPTError as exc:
            await self.push_error(error_msg=f"FastGPT request failed: {exc}", exception=exc)
            return
        except httpx.HTTPError as exc:
            await self.push_error(error_msg=f"FastGPT HTTP error: {exc}", exception=exc)
            return
        self._active_response = response
        try:
            async for event in aiter_stream_events(response):
                if event.kind in {"data", "answer", "fastAnswer"}:
                    text = _extract_text_from_event(event.kind, event.data)
                    if text:
                        await self.stop_ttfb_metrics()
                        await self.push_frame(LLMTextFrame(text))
                    continue
                if event.kind == "interactive" and isinstance(event, FastGPTInteractiveEvent):
                    await self._handle_interactive(event)
                    break
                if event.kind == "error":
                    payload = event.data if isinstance(event.data, dict) else {}
                    message = _first_nonempty_text(
                        payload.get("message"),
                        payload.get("error"),
                    ) or "FastGPT stream error"
                    await self.push_error(error_msg=message)
                    break
                if event.kind == "done":
                    break
        finally:
            self._active_response = None
            await response.aclose()

    async def _handle_interactive(self, event: FastGPTInteractiveEvent) -> None:
        """Speak the interaction prompt and forward the raw interaction to the client."""
        prompt = _interactive_spoken_prompt(event)
        if prompt:
            await self.stop_ttfb_metrics()
            await self.push_frame(LLMTextFrame(prompt))
        await self.push_frame(
            OutputTransportMessageFrame(
                message={
                    "type": "response.interactive",
                    "interaction_type": event.interaction_type,
                    "data": event.data,
                }
            ),
            FrameDirection.DOWNSTREAM,
        )

    async def _process_state_flush_request(self, frame: FastGPTStateFlushRequestFrame) -> None:
        """Persist one set_info request and acknowledge the outcome downstream.

        Failures are acked with ``ok=False, retryable=True`` and success with
        ``ok=True``. ``asyncio.CancelledError`` propagates; in that case
        ``_run_state_transaction`` has already sent the ack.
        """
        try:
            await self._run_state_transaction(frame)
        except Exception as exc:
            await self._ack_state_failure(frame, exc)
            return
        await self._push_state_ack(request_id=frame.request_id, ok=True)

    async def _ack_state_failure(
        self, frame: FastGPTStateFlushRequestFrame, exc: Exception
    ) -> None:
        """Log a failed set_info write and send a retryable failure ack."""
        logger.error(
            "FastGPT set_info failed "
            f"request_id={frame.request_id} key={frame.key!r}: {exc}"
        )
        await self._push_state_ack(
            request_id=frame.request_id,
            ok=False,
            error=str(exc) or "FastGPT state update failed",
            retryable=True,
        )

    async def _run_state_transaction(self, frame: FastGPTStateFlushRequestFrame) -> None:
        """Run the state write so that cancelling the caller cannot abort it midway.

        The write is a multi-step sequence (see ``_set_fastgpt_state``), so it
        runs in its own task behind ``asyncio.shield``. If the caller is
        cancelled meanwhile, this method:

        1. waits for the write to finish;
        2. acks its outcome itself, because the caller will not get to;
        3. re-raises ``CancelledError``.

        Swallowing the cancellation would leave the cancelling side waiting on
        a task that keeps running.

        Raises:
            Exception: Whatever the write raised, when not cancelled.
            asyncio.CancelledError: After the write settles, if cancelled.
        """
        task = asyncio.create_task(self._set_fastgpt_state(frame))
        try:
            await asyncio.shield(task)
        except asyncio.CancelledError:
            logger.info(
                "Waiting for in-flight FastGPT set_info to finish after cancellation "
                f"request_id={frame.request_id}"
            )
            try:
                await task
            except Exception as exc:
                await self._ack_state_failure(frame, exc)
            else:
                await self._push_state_ack(request_id=frame.request_id, ok=True)
            # Bare raise re-raises the outer CancelledError being handled.
            raise

    async def _set_fastgpt_state(self, frame: FastGPTStateFlushRequestFrame) -> None:
        """Write ``frame.key = frame.value`` into the FastGPT ``state`` variable.

        Steps:

        1. Read the current state via a synthetic completion.
        2. Delete the two newest chat records, presumably the user + AI pair
           that the read created (TODO confirm).
        3. Write the merged state via a second synthetic completion.
        4. Delete that completion's two records.
        """
        current_state = await self._read_fastgpt_state()
        await self._delete_last_two_chat_records()
        current_state[frame.key] = frame.value
        logger.info(
            "Writing FastGPT state "
            f"chatId={self._chat_id} request_id={frame.request_id} key={frame.key!r}"
        )
        response = await self._client.create_chat_completion(
            messages=[{"role": "user", "content": ""}],
            chatId=self._chat_id,
            stream=False,
            detail=True,
            variables={"state": current_state},
        )
        response.raise_for_status()
        await self._delete_last_two_chat_records()

    async def _read_fastgpt_state(self) -> dict[str, Any]:
        """Read the FastGPT ``state`` variable via a non-streaming detail completion.

        ``newVariables.state`` may be a dict or a JSON-object string. It is
        treated as ``{}`` when missing, None, or an empty string. A copy is
        returned.

        Raises:
            ValueError: If the state is neither an object nor a JSON object
                string (invalid JSON raises ``json.JSONDecodeError``, a
                ValueError subclass).
        """
        response = await self._client.create_chat_completion(
            messages=[{"role": "user", "content": ""}],
            chatId=self._chat_id,
            stream=False,
            detail=True,
        )
        response.raise_for_status()
        data = response.json()
        state = data.get("newVariables", {}).get("state", {})
        if isinstance(state, str):
            state = json.loads(state) if state else {}
        if state is None:
            return {}
        if not isinstance(state, dict):
            raise ValueError("FastGPT newVariables.state must be an object or JSON object string")
        return dict(state)

    async def _delete_last_two_chat_records(self) -> None:
        """Delete the last two entries of the first page (10) of chat records.

        NOTE(review): assumes the page is ordered oldest-to-newest, so these are
        the records just created by a synthetic completion — confirm against
        FastGPT's chat records API. Logs a warning and deletes nothing when
        fewer than two records exist.

        Raises:
            ValueError: If no app id is configured.
        """
        if not self._app_id:
            raise ValueError("FastGPT app_id is required to clean synthetic chat records")
        response = await self._client.get_chat_records(
            appId=self._app_id,
            chatId=self._chat_id,
            offset=0,
            pageSize=10,
        )
        response.raise_for_status()
        data = response.json()
        records = data.get("data", {}).get("list", [])
        if len(records) < 2:
            logger.warning(f"Less than 2 FastGPT records found for chatId={self._chat_id}")
            return
        data_ids = [record["dataId"] for record in records[-2:]]
        logger.info(f"Deleting FastGPT synthetic records chatId={self._chat_id} dataIds={data_ids}")
        for data_id in data_ids:
            delete_response = await self._client.delete_chat_record(
                appId=self._app_id,
                chatId=self._chat_id,
                contentId=data_id,
            )
            delete_response.raise_for_status()

    async def _push_state_ack(
        self,
        *,
        request_id: str,
        ok: bool,
        error: str | None = None,
        retryable: bool | None = None,
    ) -> None:
        """Send a ``session.set_info.ack`` transport message as an urgent frame.

        ``error`` and ``retryable`` are included only when not None.
        """
        payload: dict[str, Any] = {
            "type": "session.set_info.ack",
            "request_id": request_id,
            "ok": ok,
        }
        if error is not None:
            payload["error"] = error
        if retryable is not None:
            payload["retryable"] = retryable
        await self.push_frame(
            OutputTransportMessageUrgentFrame(message=payload),
            FrameDirection.DOWNSTREAM,
        )

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Route state-flush requests and LLM context frames; pass other frames on.

        State-flush frames are consumed, not forwarded. Each ``LLMContextFrame``
        produces one response bracketed by ``LLMFullResponseStartFrame`` and
        ``LLMFullResponseEndFrame``. The end frame and the processing-metrics
        stop are emitted even when the completion fails. Timeouts also fire the
        ``on_completion_timeout`` event handler.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, FastGPTStateFlushRequestFrame):
            await self._process_state_flush_request(frame)
        elif isinstance(frame, LLMContextFrame):
            try:
                await self.push_frame(LLMFullResponseStartFrame())
                await self.start_processing_metrics()
                await self._process_context(frame.context)
            except httpx.TimeoutException as exc:
                await self._call_event_handler("on_completion_timeout")
                await self.push_error(error_msg="FastGPT completion timeout", exception=exc)
            except Exception as exc:
                await self.push_error(error_msg=f"FastGPT completion error: {exc}", exception=exc)
            finally:
                await self.stop_processing_metrics()
                await self.push_frame(LLMFullResponseEndFrame())
        else:
            await self.push_frame(frame, direction)