Files
engine-v5-pipecat-core/engine/fastgpt_llm.py

390 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from typing import Any
import httpx
from fastgpt_client import AsyncChatClient, FastGPTInteractiveEvent, aiter_stream_events
from fastgpt_client.exceptions import FastGPTError
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
OutputTransportMessageFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import LLMSettings
def _extract_text_from_event(kind: str, payload: Any) -> str:
if not isinstance(payload, dict):
return ""
if kind in {"answer", "fastAnswer"}:
text = payload.get("text")
if isinstance(text, str) and text:
return text
choices = payload.get("choices") if isinstance(payload.get("choices"), list) else []
if not choices:
return str(payload.get("text") or "")
first_choice = choices[0] if isinstance(choices[0], dict) else {}
delta = first_choice.get("delta") if isinstance(first_choice.get("delta"), dict) else {}
content = delta.get("content")
if isinstance(content, str) and content:
return content
message = first_choice.get("message") if isinstance(first_choice.get("message"), dict) else {}
message_content = message.get("content")
if isinstance(message_content, str) and message_content:
return message_content
return ""
def _message_text(message: dict[str, Any]) -> str:
content = message.get("content")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: list[str] = []
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text = part.get("text")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
return " ".join(parts)
return ""
def _first_nonempty_text(*values: Any) -> str:
for value in values:
if isinstance(value, str):
text = value.strip()
if text:
return text
return ""
def _interactive_spoken_prompt(event: FastGPTInteractiveEvent) -> str:
payload = event.data if isinstance(event.data, dict) else {}
params = payload.get("params") if isinstance(payload.get("params"), dict) else {}
prompt = _first_nonempty_text(
payload.get("opener"),
params.get("opener"),
payload.get("prompt"),
params.get("prompt"),
payload.get("text"),
params.get("text"),
payload.get("title"),
params.get("title"),
payload.get("description"),
params.get("description"),
)
if prompt:
return prompt
if event.interaction_type == "userSelect":
raw_options = (
params.get("userSelectOptions")
if isinstance(params.get("userSelectOptions"), list)
else []
)
labels: list[str] = []
for index, raw in enumerate(raw_options, start=1):
if isinstance(raw, str) and raw.strip():
labels.append(f"{index}. {raw.strip()}")
elif isinstance(raw, dict):
label = _first_nonempty_text(raw.get("label"), raw.get("value"))
if label:
labels.append(f"{index}. {label}")
if labels:
return "请选择:" + "".join(labels)
return "请选择一个选项。"
if event.interaction_type == "userInput":
input_form = params.get("inputForm") if isinstance(params.get("inputForm"), list) else []
labels = [
_first_nonempty_text(field.get("label"), field.get("name"))
for field in input_form
if isinstance(field, dict)
]
labels = [label for label in labels if label]
if labels:
return "请提供以下信息:" + "".join(labels)
return "请补充所需信息。"
return "请继续。"
@dataclass
class FastGPTLLMSettings(LLMSettings):
variables: dict[str, Any] = field(default_factory=dict)
detail: bool = False
def _default_fastgpt_settings(*, model: str = "fastgpt") -> FastGPTLLMSettings:
return FastGPTLLMSettings(
model=model,
system_instruction=None,
temperature=None,
max_tokens=None,
top_p=None,
top_k=None,
frequency_penalty=None,
presence_penalty=None,
seed=None,
filter_incomplete_user_turns=False,
user_turn_completion_config=None,
variables={},
detail=False,
)
class FastGPTLLMService(LLMService):
"""FastGPT LLM service using chatId server-side memory and workflow variables."""
Settings = FastGPTLLMSettings
def __init__(
self,
*,
api_key: str,
base_url: str,
chat_id: str | None = None,
app_id: str | None = None,
send_system_prompt: bool = False,
greeting_prompt: str | None = None,
timeout: float = 60.0,
settings: FastGPTLLMSettings | None = None,
**kwargs,
) -> None:
default_settings = _default_fastgpt_settings()
if settings is not None:
default_settings.apply_update(settings)
super().__init__(settings=default_settings, **kwargs)
self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}"
self._app_id = (app_id or "").strip()
self._send_system_prompt = send_system_prompt
self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好"
self._client = AsyncChatClient(
api_key=api_key,
base_url=base_url,
timeout=timeout,
)
self._active_response = None
@property
def app_id(self) -> str:
return self._app_id
@property
def chat_id(self) -> str:
return self._chat_id
def set_variables(self, variables: dict[str, Any]) -> None:
merged = dict(self._settings.variables)
merged.update(variables)
self._settings.variables = merged
async def stop(self, frame: EndFrame) -> None:
await self._close_active_response()
await self._client.close()
await super().stop(frame)
async def cancel(self, frame: CancelFrame) -> None:
await self._close_active_response()
await super().cancel(frame)
async def _handle_interruptions(self, _: InterruptionFrame) -> None:
await self._close_active_response()
await super()._handle_interruptions(_)
@staticmethod
def _welcome_text_from_init_payload(payload: Any) -> str:
if not isinstance(payload, dict):
return ""
for container in (payload.get("app"), payload.get("data"), payload):
if not isinstance(container, dict):
continue
nested_app = container.get("app")
if isinstance(nested_app, dict):
text = FastGPTLLMService._welcome_text_from_app(nested_app)
if text:
return text
text = FastGPTLLMService._welcome_text_from_app(container)
if text:
return text
return ""
@staticmethod
def _welcome_text_from_app(app_payload: dict[str, Any]) -> str:
chat_config = (
app_payload.get("chatConfig")
if isinstance(app_payload.get("chatConfig"), dict)
else {}
)
return _first_nonempty_text(
chat_config.get("welcomeText"),
app_payload.get("welcomeText"),
)
async def fetch_welcome_text(self) -> str | None:
"""Return FastGPT app welcome text from chat init when ``app_id`` is configured."""
if not self._app_id:
return None
try:
response = await self._client.get_chat_init(
appId=self._app_id,
chatId=self._chat_id,
)
response.raise_for_status()
text = self._welcome_text_from_init_payload(response.json())
if text:
logger.info(f"FastGPT welcomeText loaded for appId={self._app_id}")
return text or None
except FastGPTError as exc:
logger.warning(f"FastGPT chat init failed: {exc}")
except httpx.HTTPError as exc:
logger.warning(f"FastGPT chat init HTTP error: {exc}")
except Exception as exc:
logger.warning(f"FastGPT chat init error: {exc}")
return None
async def _close_active_response(self) -> None:
response = self._active_response
self._active_response = None
if response is not None:
await response.aclose()
def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, str]]:
raw_messages = context.get_messages()
messages: list[dict[str, str]] = []
if self._send_system_prompt:
for message in raw_messages:
if not isinstance(message, dict) or message.get("role") != "system":
continue
text = _message_text(message)
if text:
messages.append({"role": "system", "content": text})
for message in reversed(raw_messages):
if not isinstance(message, dict) or message.get("role") != "user":
continue
text = _message_text(message)
if text:
messages.append({"role": "user", "content": text})
return messages
messages.append({"role": "user", "content": self._greeting_prompt})
return messages
async def _process_context(self, context: LLMContext) -> None:
messages = self._build_fastgpt_messages(context)
variables = self._settings.variables or None
logger.info(
"FastGPT chat completion "
f"chatId={self._chat_id} appId={self._app_id or '-'} "
f"variables={sorted((variables or {}).keys())} messages={messages!r}"
)
await self.start_ttfb_metrics()
try:
response = await self._client.create_chat_completion(
messages=messages,
stream=True,
chatId=self._chat_id,
variables=variables,
detail=self._settings.detail,
)
except FastGPTError as exc:
await self.push_error(error_msg=f"FastGPT request failed: {exc}", exception=exc)
return
except httpx.HTTPError as exc:
await self.push_error(error_msg=f"FastGPT HTTP error: {exc}", exception=exc)
return
self._active_response = response
try:
async for event in aiter_stream_events(response):
if event.kind in {"data", "answer", "fastAnswer"}:
text = _extract_text_from_event(event.kind, event.data)
if text:
await self.stop_ttfb_metrics()
await self.push_frame(LLMTextFrame(text))
continue
if event.kind == "interactive" and isinstance(event, FastGPTInteractiveEvent):
await self._handle_interactive(event)
break
if event.kind == "error":
payload = event.data if isinstance(event.data, dict) else {}
message = _first_nonempty_text(
payload.get("message"),
payload.get("error"),
) or "FastGPT stream error"
await self.push_error(error_msg=message)
break
if event.kind == "done":
break
finally:
self._active_response = None
await response.aclose()
async def _handle_interactive(self, event: FastGPTInteractiveEvent) -> None:
prompt = _interactive_spoken_prompt(event)
if prompt:
await self.stop_ttfb_metrics()
await self.push_frame(LLMTextFrame(prompt))
await self.push_frame(
OutputTransportMessageFrame(
message={
"type": "response.interactive",
"interaction_type": event.interaction_type,
"data": event.data,
}
),
FrameDirection.DOWNSTREAM,
)
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(frame.context)
except httpx.TimeoutException as exc:
await self._call_event_handler("on_completion_timeout")
await self.push_error(error_msg="FastGPT completion timeout", exception=exc)
except Exception as exc:
await self.push_error(error_msg=f"FastGPT completion error: {exc}", exception=exc)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
else:
await self.push_frame(frame, direction)