Add chat image upload support

This commit is contained in:
Xin Wang
2026-06-01 13:39:22 +08:00
parent a55ca37c39
commit 96d685da91
7 changed files with 951 additions and 4 deletions

View File

@@ -69,6 +69,39 @@ with ChatClient(api_key="fastgpt-xxxxx") as client:
) )
``` ```
### Chat with an Image
Upload the image to FastGPT's chat file storage first, then reference the returned URL in a chat message.
```python
from fastgpt_client import ChatClient
app_id = "your-app-id"
chat_id = "my_conversation_123"
with ChatClient(api_key="fastgpt-xxxxx", base_url="http://localhost:3000") as client:
image = client.upload_chat_image(
appId=app_id,
chatId=chat_id,
file_path="example.png",
)
response = client.create_chat_completion(
chatId=chat_id,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "Please describe this image."},
{"type": "image_url", "image_url": {"url": image["url"]}},
],
}
],
)
print(response.json()["choices"][0]["message"]["content"])
```
### Using Variables ### Using Variables
```python ```python

View File

@@ -58,6 +58,82 @@ print(result['choices'][0]['message']['content'])
--- ---
### upload_chat_image
Upload an image to FastGPT's chat file storage and return a normalized file reference.
```python
image = client.upload_chat_image(
appId="app-123",
chatId="chat-123",
file_path="example.png"
)
response = client.create_chat_completion(
chatId="chat-123",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "Describe this image"},
{"type": "image_url", "image_url": {"url": image["url"]}},
],
}
],
)
```
Returned file reference:
```python
{
"type": "img",
"name": "example.png",
"key": "chat/...",
"url": "https://...",
"previewUrl": "https://..."
}
```
---
### upload_chat_file
Upload a generic chat file. This is the lower-level helper used by `upload_chat_image`.
```python
file_ref = client.upload_chat_file(
appId="app-123",
chatId="chat-123",
file_path="document.pdf",
file_type="doc"
)
```
---
### presign_chat_file_post_url
Request a presigned upload URL for a chat file. The SDK supports both FastGPT response shapes:
- Presigned POST: `{ "url": "...", "fields": {...}, "maxSize": ... }`
- Presigned PUT: `{ "url": "...", "headers": {...}, "key": "...", "previewUrl": "..." }`
---
### presign_chat_file_get_url
Request a temporary preview/download URL by file `key`.
```python
preview_url = client.presign_chat_file_get_url(
appId="app-123",
key="chat/..."
)
```
---
### get_chat_histories ### get_chat_histories
Get chat histories for an application. Get chat histories for an application.

View File

@@ -7,6 +7,7 @@ Run from the examples directory with .env configured:
This example provides: This example provides:
- a full-screen Textual interface - a full-screen Textual interface
- streaming chat updates - streaming chat updates
- image turns with /image <path> [prompt]
- workflow / tool event logging - workflow / tool event logging
- modal handling for FastGPT interactive nodes - modal handling for FastGPT interactive nodes
""" """
@@ -15,6 +16,7 @@ from __future__ import annotations
import argparse import argparse
import json import json
import shlex
import sys import sys
import uuid import uuid
from pathlib import Path from pathlib import Path
@@ -44,6 +46,42 @@ from chat_cli import (
from fastgpt_client import ChatClient, FastGPTInteractiveEvent, iter_stream_events from fastgpt_client import ChatClient, FastGPTInteractiveEvent, iter_stream_events
DEFAULT_IMAGE_PROMPT = "Please describe this image."
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp"}
def parse_image_command(content: str) -> Optional[tuple[Path, str]]:
"""Parse `/image <path> [prompt]`, including terminal-dragged quoted paths."""
stripped = content.strip()
if not stripped.startswith("/image"):
return None
command, _, rest = stripped.partition(" ")
if command != "/image":
return None
if not rest.strip():
raise ValueError("Usage: /image <path> [prompt]")
try:
parts = shlex.split(rest)
except ValueError as exc:
raise ValueError(f"Could not parse image command: {exc}") from exc
if not parts:
raise ValueError("Usage: /image <path> [prompt]")
path = Path(parts[0]).expanduser()
if not path.exists():
raise ValueError(f"Image file not found: {path}")
if not path.is_file():
raise ValueError(f"Image path is not a file: {path}")
if path.suffix.lower() not in IMAGE_EXTENSIONS:
raise ValueError(f"Unsupported image extension: {path.suffix or '(none)'}")
prompt = " ".join(parts[1:]).strip() or DEFAULT_IMAGE_PROMPT
return path, prompt
class MessageCard(Static): class MessageCard(Static):
"""Lightweight message block used in the transcript pane.""" """Lightweight message block used in the transcript pane."""
@@ -435,7 +473,7 @@ class FastGPTWorkbench(App[None]):
yield Static("FastGPT Workbench", id="brand") yield Static("FastGPT Workbench", id="brand")
yield Static("", id="session_panel", classes="panel") yield Static("", id="session_panel", classes="panel")
yield Static("", id="status_panel", classes="panel") yield Static("", id="status_panel", classes="panel")
yield Static("Ctrl+J send\nCtrl+N new chat\nEsc closes modal prompts", classes="panel") yield Static("Ctrl+J send\nCtrl+N new chat\n/image <path> [prompt]", classes="panel")
yield RichLog(id="event_log", wrap=True, highlight=False, markup=False) yield RichLog(id="event_log", wrap=True, highlight=False, markup=False)
with Vertical(id="main_panel"): with Vertical(id="main_panel"):
yield Static("Claude-style FastGPT Console", id="chat_title") yield Static("Claude-style FastGPT Console", id="chat_title")
@@ -486,7 +524,7 @@ class FastGPTWorkbench(App[None]):
return content return content
def _default_session_message(self) -> str: def _default_session_message(self) -> str:
return "Start typing below. FastGPT workflow events will appear in the left rail." return "Start typing below. Use /image <path> [prompt] to attach a local image."
def _initial_session_message(self) -> str: def _initial_session_message(self) -> str:
if not APP_ID: if not APP_ID:
@@ -517,7 +555,14 @@ class FastGPTWorkbench(App[None]):
self.query_one(f"#{card_id}", MessageCard).set_text("Thinking…") self.query_one(f"#{card_id}", MessageCard).set_text("Thinking…")
return card_id return card_id
def _start_turn(self, content: str, *, title: str = "You", role: str = "user") -> None: def _start_turn(
self,
content: str,
*,
title: str = "You",
role: str = "user",
messages: Optional[List[Dict[str, Any]]] = None,
) -> None:
if self._busy: if self._busy:
self._log_event("[local] Busy streaming. Wait for the current turn to finish.") self._log_event("[local] Busy streaming. Wait for the current turn to finish.")
return return
@@ -528,7 +573,27 @@ class FastGPTWorkbench(App[None]):
self._busy = True self._busy = True
self._set_status("Streaming", "Receiving FastGPT output") self._set_status("Streaming", "Receiving FastGPT output")
self._stream_turn( self._stream_turn(
messages=[{"role": "user", "content": content}], messages=messages or [{"role": "user", "content": content}],
assistant_card_id=assistant_card_id,
)
def _start_image_turn(self, image_path: Path, prompt: str) -> None:
if self._busy:
self._log_event("[local] Busy streaming. Wait for the current turn to finish.")
return
if not APP_ID:
self._log_event("[local] APP_ID is required for image upload.")
self._set_status("Error", "APP_ID is required for image upload")
return
display_content = f"{prompt}\n\n[image] {image_path}"
self._append_message(role="user", title="You", content=display_content)
assistant_card_id = self._assistant_card()
self._busy = True
self._set_status("Uploading", image_path.name)
self._stream_image_turn(
image_path=image_path,
prompt=prompt,
assistant_card_id=assistant_card_id, assistant_card_id=assistant_card_id,
) )
@@ -580,8 +645,19 @@ class FastGPTWorkbench(App[None]):
content = composer.text.strip() content = composer.text.strip()
if not content: if not content:
return return
try:
image_command = parse_image_command(content)
except ValueError as exc:
self._log_event(f"[local] {exc}")
self._set_status("Error", str(exc))
return
composer.text = "" composer.text = ""
composer.focus() composer.focus()
if image_command is not None:
image_path, prompt = image_command
self._start_image_turn(image_path, prompt)
return
self._start_turn(content) self._start_turn(content)
def action_new_chat(self) -> None: def action_new_chat(self) -> None:
@@ -687,6 +763,104 @@ class FastGPTWorkbench(App[None]):
waiting_interactive=interactive_event is not None, waiting_interactive=interactive_event is not None,
) )
@work(thread=True, exclusive=True)
def _stream_image_turn(self, image_path: Path, prompt: str, assistant_card_id: str) -> None:
try:
with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client:
self.call_from_thread(self._log_event, f"[image] uploading: {image_path}")
image = client.upload_chat_image(
appId=APP_ID,
chatId=self.chat_id,
file_path=image_path,
)
image_url = image.get("url")
if not image_url:
raise RuntimeError("FastGPT did not return an image preview URL")
self.call_from_thread(self._log_event, f"[image] uploaded: {image_path.name}")
self.call_from_thread(self._set_status, "Streaming", "Receiving FastGPT output")
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": image_url}},
],
}
]
response = client.create_chat_completion(
messages=messages,
stream=True,
detail=True,
chatId=self.chat_id,
)
response.raise_for_status()
try:
for event in iter_stream_events(response):
if event.kind in {"data", "answer", "fastAnswer"}:
content = _extract_text_from_event(event.kind, event.data)
if content:
self.call_from_thread(self._append_assistant_chunk, assistant_card_id, content)
continue
if event.kind == "flowNodeStatus":
if isinstance(event.data, dict):
status = str(event.data.get("status") or "?")
node_name = str(event.data.get("nodeName") or event.data.get("name") or event.data.get("node_id") or "Unknown node")
self.call_from_thread(self._log_event, f"[flow] {status}: {node_name}")
else:
self.call_from_thread(self._log_event, f"[flow] {event.data}")
continue
if event.kind == "flowResponses":
if isinstance(event.data, dict):
module_name = str(event.data.get("moduleName") or event.data.get("nodeName") or "Unknown module")
self.call_from_thread(self._log_event, f"[flow] response from: {module_name}")
elif isinstance(event.data, list):
self.call_from_thread(self._log_event, f"[flow] response details: {len(event.data)} module record(s)")
else:
self.call_from_thread(self._log_event, f"[flow] response details: {event.data}")
continue
if event.kind == "toolCall":
tool_name = _tool_name_from_event(event.data)
self.call_from_thread(self._log_event, f"[tool] calling: {tool_name}")
continue
if event.kind == "toolParams":
self.call_from_thread(self._log_event, f"[tool] params: {event.data}")
continue
if event.kind == "toolResponse":
self.call_from_thread(self._log_event, f"[tool] response: {event.data}")
continue
if event.kind == "updateVariables":
self.call_from_thread(self._log_event, f"[vars] updated: {event.data}")
continue
if event.kind == "interactive":
self.call_from_thread(
self._log_event,
"[interactive] Image turns do not support workflow prompts yet.",
)
break
if event.kind == "error":
message = str(event.data.get("message") or event.data.get("error") or "Unknown FastGPT error")
raise RuntimeError(message)
if event.kind == "done":
break
finally:
response.close()
except Exception as exc:
self.call_from_thread(self._mark_turn_failed, assistant_card_id, str(exc))
return
self.call_from_thread(self._complete_turn, assistant_card_id, waiting_interactive=False)
def _parse_args() -> argparse.Namespace: def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Textual FastGPT chat workbench") parser = argparse.ArgumentParser(description="Textual FastGPT chat workbench")

View File

@@ -2,6 +2,8 @@
import asyncio import asyncio
import logging import logging
import mimetypes
from pathlib import Path
import weakref import weakref
from typing import Any, Dict, Literal, Union from typing import Any, Dict, Literal, Union
@@ -351,6 +353,245 @@ class AsyncChatClient(AsyncFastGPTClient):
stream=stream, stream=stream,
) )
async def _send_first_available_request(
self,
method: str,
endpoints: list[str],
json: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
):
"""Try compatible FastGPT endpoint variants and return the first non-404 response."""
last_error = None
for endpoint in endpoints:
try:
return await self._send_request(method, endpoint, json=json, params=params)
except APIError as exc:
last_error = exc
if exc.status_code != 404:
raise
if last_error is not None:
raise last_error
raise APIError("No endpoint candidates provided")
@staticmethod
def _unwrap_response_data(data: Any) -> Any:
"""Unwrap FastGPT's standard envelope while preserving raw OpenAI-style responses."""
if isinstance(data, dict) and "data" in data and {"code", "message", "statusText"} & set(data):
return data.get("data")
return data
async def _get_chat_file_select_config(self, appId: str, chatId: str) -> dict[str, Any] | None:
"""Best-effort lookup for the app's chat file selection config."""
try:
response = await self.get_chat_init(appId=appId, chatId=chatId)
data = self._unwrap_response_data(response.json())
except Exception:
return None
if not isinstance(data, dict):
return None
app = data.get("app")
if not isinstance(app, dict):
return None
chat_config = app.get("chatConfig")
if not isinstance(chat_config, dict):
return None
file_select_config = chat_config.get("fileSelectConfig")
return file_select_config if isinstance(file_select_config, dict) else None
@staticmethod
def _default_file_select_config(file_type: str) -> dict[str, Any]:
"""Fallback config for servers that require fileSelectConfig in presign requests."""
is_image = file_type == "img"
return {
"canSelectFile": not is_image,
"canSelectImg": is_image,
"canSelectVideo": False,
"canSelectAudio": False,
"canSelectCustomFileExtension": False,
"customFileExtensionList": [],
}
async def presign_chat_file_post_url(
self,
*,
appId: str,
chatId: str,
filename: str,
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Request a presigned URL for uploading a chat file."""
payload: dict[str, Any] = {
"appId": appId,
"chatId": chatId,
"filename": filename,
"fileSelectConfig": fileSelectConfig or self._default_file_select_config("img"),
}
if outLinkAuthData:
payload["outLinkAuthData"] = outLinkAuthData
response = await self._send_first_available_request(
"POST",
[
"/api/core/chat/presignChatFilePostUrl",
"/api/core/chat/file/presignChatFilePostUrl",
],
json=payload,
)
data = self._unwrap_response_data(response.json())
if not isinstance(data, dict):
raise APIError("Invalid presigned upload response")
return data
async def presign_chat_file_get_url(
self,
*,
appId: str,
key: str,
outLinkAuthData: dict[str, Any] | None = None,
) -> str:
"""Request a temporary preview/download URL for an uploaded chat file."""
payload: dict[str, Any] = {
"appId": appId,
"key": key,
}
if outLinkAuthData:
payload["outLinkAuthData"] = outLinkAuthData
response = await self._send_first_available_request(
"POST",
[
"/api/core/chat/presignChatFileGetUrl",
"/api/core/chat/file/presignChatFileGetUrl",
],
json=payload,
)
data = self._unwrap_response_data(response.json())
if not isinstance(data, str):
raise APIError("Invalid presigned preview response")
return data
async def upload_to_presigned_url(
self,
*,
upload_url: str,
file_path: str | Path,
headers: dict[str, str] | None = None,
fields: dict[str, Any] | None = None,
method: Literal["POST", "PUT"] | None = None,
) -> httpx.Response:
"""Upload a file to a presigned S3/MinIO URL."""
path = Path(file_path)
content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
upload_method = method or ("POST" if fields else "PUT")
async with httpx.AsyncClient(timeout=self.timeout) as client:
if upload_method == "POST":
with path.open("rb") as file_obj:
response = await client.post(
upload_url,
data=fields or {},
files={"file": (path.name, file_obj, content_type)},
)
elif upload_method == "PUT":
upload_headers = dict(headers or {})
upload_headers.setdefault("Content-Type", content_type)
with path.open("rb") as file_obj:
response = await client.put(
upload_url,
content=file_obj,
headers=upload_headers,
)
else:
raise ValueError("method must be 'POST' or 'PUT'")
response.raise_for_status()
return response
async def upload_chat_file(
self,
*,
appId: str,
chatId: str,
file_path: str | Path,
file_type: Literal["img", "doc"] = "doc",
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Upload a chat file and return a normalized file reference."""
path = Path(file_path)
selected_config = (
fileSelectConfig
or await self._get_chat_file_select_config(appId=appId, chatId=chatId)
or self._default_file_select_config(file_type)
)
presigned = await self.presign_chat_file_post_url(
appId=appId,
chatId=chatId,
filename=path.name,
fileSelectConfig=selected_config,
outLinkAuthData=outLinkAuthData,
)
max_size = presigned.get("maxSize")
if max_size is not None and path.stat().st_size > max_size:
raise ValueError(f"File too large: {path.stat().st_size} bytes > {max_size} bytes")
await self.upload_to_presigned_url(
upload_url=presigned["url"],
file_path=path,
headers=presigned.get("headers"),
fields=presigned.get("fields"),
)
fields = presigned.get("fields") if isinstance(presigned.get("fields"), dict) else {}
key = presigned.get("key") or fields.get("key")
preview_url = presigned.get("previewUrl")
if not preview_url and key:
preview_url = await self.presign_chat_file_get_url(
appId=appId,
key=key,
outLinkAuthData=outLinkAuthData,
)
return {
"type": file_type,
"name": path.name,
"key": key,
"url": preview_url,
"previewUrl": preview_url,
}
async def upload_chat_image(
self,
*,
appId: str,
chatId: str,
file_path: str | Path,
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Upload an image for use in chat `image_url` content parts."""
return await self.upload_chat_file(
appId=appId,
chatId=chatId,
file_path=file_path,
file_type="img",
fileSelectConfig=fileSelectConfig,
outLinkAuthData=outLinkAuthData,
)
async def get_chat_histories( async def get_chat_histories(
self, self,
appId: str, appId: str,

View File

@@ -1,6 +1,8 @@
"""FastGPT Client - Main synchronous client.""" """FastGPT Client - Main synchronous client."""
import logging import logging
import mimetypes
from pathlib import Path
import weakref import weakref
from typing import Any, Dict, Literal, Union from typing import Any, Dict, Literal, Union
@@ -295,6 +297,255 @@ class ChatClient(FastGPTClient):
stream=stream, stream=stream,
) )
def _send_first_available_request(
self,
method: str,
endpoints: list[str],
json: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
):
"""Try compatible FastGPT endpoint variants and return the first non-404 response."""
last_error = None
for endpoint in endpoints:
try:
return self._send_request(method, endpoint, json=json, params=params)
except APIError as exc:
last_error = exc
if exc.status_code != 404:
raise
if last_error is not None:
raise last_error
raise APIError("No endpoint candidates provided")
@staticmethod
def _unwrap_response_data(data: Any) -> Any:
"""Unwrap FastGPT's standard envelope while preserving raw OpenAI-style responses."""
if isinstance(data, dict) and "data" in data and {"code", "message", "statusText"} & set(data):
return data.get("data")
return data
def _get_chat_file_select_config(self, appId: str, chatId: str) -> dict[str, Any] | None:
"""Best-effort lookup for the app's chat file selection config."""
try:
response = self.get_chat_init(appId=appId, chatId=chatId)
data = self._unwrap_response_data(response.json())
except Exception:
return None
if not isinstance(data, dict):
return None
app = data.get("app")
if not isinstance(app, dict):
return None
chat_config = app.get("chatConfig")
if not isinstance(chat_config, dict):
return None
file_select_config = chat_config.get("fileSelectConfig")
return file_select_config if isinstance(file_select_config, dict) else None
@staticmethod
def _default_file_select_config(file_type: str) -> dict[str, Any]:
"""Fallback config for servers that require fileSelectConfig in presign requests."""
is_image = file_type == "img"
return {
"canSelectFile": not is_image,
"canSelectImg": is_image,
"canSelectVideo": False,
"canSelectAudio": False,
"canSelectCustomFileExtension": False,
"customFileExtensionList": [],
}
def presign_chat_file_post_url(
self,
*,
appId: str,
chatId: str,
filename: str,
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Request a presigned URL for uploading a chat file.
FastGPT versions differ on the endpoint path and response shape. This
method supports both the current local route and the newer `/file/`
route, and returns the unwrapped presign payload.
"""
payload: dict[str, Any] = {
"appId": appId,
"chatId": chatId,
"filename": filename,
"fileSelectConfig": fileSelectConfig or self._default_file_select_config("img"),
}
if outLinkAuthData:
payload["outLinkAuthData"] = outLinkAuthData
response = self._send_first_available_request(
"POST",
[
"/api/core/chat/presignChatFilePostUrl",
"/api/core/chat/file/presignChatFilePostUrl",
],
json=payload,
)
data = self._unwrap_response_data(response.json())
if not isinstance(data, dict):
raise APIError("Invalid presigned upload response")
return data
def presign_chat_file_get_url(
self,
*,
appId: str,
key: str,
outLinkAuthData: dict[str, Any] | None = None,
) -> str:
"""Request a temporary preview/download URL for an uploaded chat file."""
payload: dict[str, Any] = {
"appId": appId,
"key": key,
}
if outLinkAuthData:
payload["outLinkAuthData"] = outLinkAuthData
response = self._send_first_available_request(
"POST",
[
"/api/core/chat/presignChatFileGetUrl",
"/api/core/chat/file/presignChatFileGetUrl",
],
json=payload,
)
data = self._unwrap_response_data(response.json())
if not isinstance(data, str):
raise APIError("Invalid presigned preview response")
return data
def upload_to_presigned_url(
self,
*,
upload_url: str,
file_path: str | Path,
headers: dict[str, str] | None = None,
fields: dict[str, Any] | None = None,
method: Literal["POST", "PUT"] | None = None,
) -> httpx.Response:
"""Upload a file to a presigned S3/MinIO URL.
FastGPT may return either presigned POST fields or PUT headers. This
method detects both formats and deliberately avoids FastGPT auth headers.
"""
path = Path(file_path)
content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
upload_method = method or ("POST" if fields else "PUT")
if upload_method == "POST":
with path.open("rb") as file_obj:
response = httpx.post(
upload_url,
data=fields or {},
files={"file": (path.name, file_obj, content_type)},
timeout=self.timeout,
)
elif upload_method == "PUT":
upload_headers = dict(headers or {})
upload_headers.setdefault("Content-Type", content_type)
with path.open("rb") as file_obj:
response = httpx.put(
upload_url,
content=file_obj,
headers=upload_headers,
timeout=self.timeout,
)
else:
raise ValueError("method must be 'POST' or 'PUT'")
response.raise_for_status()
return response
def upload_chat_file(
self,
*,
appId: str,
chatId: str,
file_path: str | Path,
file_type: Literal["img", "doc"] = "doc",
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Upload a chat file and return a normalized file reference."""
path = Path(file_path)
selected_config = (
fileSelectConfig
or self._get_chat_file_select_config(appId=appId, chatId=chatId)
or self._default_file_select_config(file_type)
)
presigned = self.presign_chat_file_post_url(
appId=appId,
chatId=chatId,
filename=path.name,
fileSelectConfig=selected_config,
outLinkAuthData=outLinkAuthData,
)
max_size = presigned.get("maxSize")
if max_size is not None and path.stat().st_size > max_size:
raise ValueError(f"File too large: {path.stat().st_size} bytes > {max_size} bytes")
self.upload_to_presigned_url(
upload_url=presigned["url"],
file_path=path,
headers=presigned.get("headers"),
fields=presigned.get("fields"),
)
fields = presigned.get("fields") if isinstance(presigned.get("fields"), dict) else {}
key = presigned.get("key") or fields.get("key")
preview_url = presigned.get("previewUrl")
if not preview_url and key:
preview_url = self.presign_chat_file_get_url(
appId=appId,
key=key,
outLinkAuthData=outLinkAuthData,
)
return {
"type": file_type,
"name": path.name,
"key": key,
"url": preview_url,
"previewUrl": preview_url,
}
def upload_chat_image(
self,
*,
appId: str,
chatId: str,
file_path: str | Path,
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Upload an image for use in chat `image_url` content parts."""
return self.upload_chat_file(
appId=appId,
chatId=chatId,
file_path=file_path,
file_type="img",
fileSelectConfig=fileSelectConfig,
outLinkAuthData=outLinkAuthData,
)
def get_chat_histories( def get_chat_histories(
self, self,
appId: str, appId: str,

View File

@@ -358,6 +358,64 @@ class TestAsyncChatClient:
assert response.status_code == 200 assert response.status_code == 200
await client.close() await client.close()
@pytest.mark.asyncio
async def test_presign_chat_file_post_url(self, api_key):
"""Test async presigned chat file upload URL creation."""
client = AsyncChatClient(api_key)
mock_response = Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.json = Mock(return_value={
"code": 200,
"message": "",
"data": {"url": "https://s3.example/upload", "fields": {"key": "chat/file.png"}},
})
with patch.object(client, "_send_request", AsyncMock(return_value=mock_response)) as mock_send:
result = await client.presign_chat_file_post_url(
appId="app-123",
chatId="chat-123",
filename="file.png",
fileSelectConfig={"canSelectImg": True},
)
assert result["fields"]["key"] == "chat/file.png"
assert mock_send.call_args[0][1] == "/api/core/chat/presignChatFilePostUrl"
await client.close()
@pytest.mark.asyncio
async def test_upload_chat_image_with_fields_gets_preview_url(self, api_key, tmp_path):
"""Test async upload_chat_image normalizes presigned POST uploads."""
client = AsyncChatClient(api_key)
file_path = tmp_path / "file.png"
file_path.write_bytes(b"png")
with patch.object(client, "_get_chat_file_select_config", AsyncMock(return_value={"canSelectImg": True})), \
patch.object(client, "presign_chat_file_post_url", AsyncMock(return_value={
"url": "https://s3.example/upload",
"fields": {"key": "chat/file.png"},
"maxSize": 100,
})) as mock_presign, \
patch.object(client, "upload_to_presigned_url", AsyncMock()) as mock_upload, \
patch.object(client, "presign_chat_file_get_url", AsyncMock(return_value="https://s3.example/preview.png")) as mock_get:
result = await client.upload_chat_image(
appId="app-123",
chatId="chat-123",
file_path=file_path,
)
assert result == {
"type": "img",
"name": "file.png",
"key": "chat/file.png",
"url": "https://s3.example/preview.png",
"previewUrl": "https://s3.example/preview.png",
}
assert mock_presign.call_args[1]["fileSelectConfig"] == {"canSelectImg": True}
mock_upload.assert_awaited_once()
mock_get.assert_awaited_once_with(appId="app-123", key="chat/file.png", outLinkAuthData=None)
await client.close()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_chat_histories_basic(self, api_key, sample_chat_histories_response): async def test_get_chat_histories_basic(self, api_key, sample_chat_histories_response):
"""Test getting chat histories with basic parameters.""" """Test getting chat histories with basic parameters."""

View File

@@ -162,6 +162,120 @@ class TestChatClientCreateChatCompletion:
mock_send.assert_called_once() mock_send.assert_called_once()
class TestChatClientFileUpload:
"""Test suite for ChatClient chat file upload helpers."""
def test_presign_chat_file_post_url_uses_local_path(self, api_key):
client = ChatClient(api_key)
mock_response = Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.json = Mock(return_value={
"code": 200,
"message": "",
"data": {"url": "https://s3.example/upload", "fields": {"key": "chat/file.png"}},
})
with patch.object(client, "_send_request", return_value=mock_response) as mock_send:
result = client.presign_chat_file_post_url(
appId="app-123",
chatId="chat-123",
filename="file.png",
fileSelectConfig={"canSelectImg": True},
)
assert result["fields"]["key"] == "chat/file.png"
assert mock_send.call_args[0][1] == "/api/core/chat/presignChatFilePostUrl"
assert mock_send.call_args[1]["json"]["fileSelectConfig"] == {"canSelectImg": True}
def test_presign_chat_file_get_url_unwraps_string(self, api_key):
client = ChatClient(api_key)
mock_response = Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.json = Mock(return_value={
"code": 200,
"message": "",
"data": "https://s3.example/preview.png",
})
with patch.object(client, "_send_request", return_value=mock_response) as mock_send:
result = client.presign_chat_file_get_url(appId="app-123", key="chat/file.png")
assert result == "https://s3.example/preview.png"
assert mock_send.call_args[0][1] == "/api/core/chat/presignChatFileGetUrl"
def test_upload_to_presigned_url_with_post_fields(self, api_key, tmp_path):
client = ChatClient(api_key)
file_path = tmp_path / "file.png"
file_path.write_bytes(b"png")
mock_response = Mock(spec=httpx.Response)
mock_response.raise_for_status = Mock()
with patch("fastgpt_client.client.httpx.post", return_value=mock_response) as mock_post:
response = client.upload_to_presigned_url(
upload_url="https://s3.example/upload",
file_path=file_path,
fields={"key": "chat/file.png", "policy": "abc"},
)
assert response is mock_response
mock_post.assert_called_once()
assert mock_post.call_args[1]["data"] == {"key": "chat/file.png", "policy": "abc"}
assert "file" in mock_post.call_args[1]["files"]
def test_upload_to_presigned_url_with_put_headers(self, api_key, tmp_path):
client = ChatClient(api_key)
file_path = tmp_path / "file.png"
file_path.write_bytes(b"png")
mock_response = Mock(spec=httpx.Response)
mock_response.raise_for_status = Mock()
with patch("fastgpt_client.client.httpx.put", return_value=mock_response) as mock_put:
response = client.upload_to_presigned_url(
upload_url="https://s3.example/upload",
file_path=file_path,
headers={"x-amz-meta-test": "1"},
)
assert response is mock_response
mock_put.assert_called_once()
assert mock_put.call_args[1]["headers"]["x-amz-meta-test"] == "1"
assert mock_put.call_args[1]["headers"]["Content-Type"] == "image/png"
def test_upload_chat_image_with_fields_gets_preview_url(self, api_key, tmp_path):
client = ChatClient(api_key)
file_path = tmp_path / "file.png"
file_path.write_bytes(b"png")
with patch.object(client, "_get_chat_file_select_config", return_value={"canSelectImg": True}), \
patch.object(client, "presign_chat_file_post_url", return_value={
"url": "https://s3.example/upload",
"fields": {"key": "chat/file.png"},
"maxSize": 100,
}) as mock_presign, \
patch.object(client, "upload_to_presigned_url") as mock_upload, \
patch.object(client, "presign_chat_file_get_url", return_value="https://s3.example/preview.png") as mock_get:
result = client.upload_chat_image(
appId="app-123",
chatId="chat-123",
file_path=file_path,
)
assert result == {
"type": "img",
"name": "file.png",
"key": "chat/file.png",
"url": "https://s3.example/preview.png",
"previewUrl": "https://s3.example/preview.png",
}
assert mock_presign.call_args[1]["fileSelectConfig"] == {"canSelectImg": True}
mock_upload.assert_called_once()
mock_get.assert_called_once_with(appId="app-123", key="chat/file.png", outLinkAuthData=None)
class TestChatClientGetChatHistories: class TestChatClientGetChatHistories:
"""Test suite for ChatClient.get_chat_histories method.""" """Test suite for ChatClient.get_chat_histories method."""