Files
fastgpt-python-sdk/fastgpt_client/async_client.py
Xin Wang 07f30af105 feat: support base64 and upload image input modes in chat
Add a unified way to send images in chat: inline base64 data URLs
(passed through natively) or auto-upload via image_input_mode="upload",
which replaces inline data URLs with hosted URLs using upload_chat_image.

- New fastgpt_client/images.py with content-part / data-URL helpers
- image_input_mode + appId/outLinkAuthData params on create_chat_completion
  (sync and async); upload failures fall back to inline base64
- Tests covering helpers, both modes, validation, and fallback

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 09:50:37 +08:00

977 lines
33 KiB
Python

"""FastGPT Async Client - Asynchronous client for FastGPT API."""
import asyncio
import logging
import mimetypes
import os
import tempfile
from pathlib import Path
import weakref
from typing import Any, Dict, Literal, Union
import httpx
from .base_client import BaseClientMixin
from .exceptions import APIError, AuthenticationError, RateLimitError, ValidationError
from .images import decode_data_url, image_url_part, is_data_url
class AsyncFastGPTClient(BaseClientMixin):
    """Asynchronous FastGPT API client.

    This client uses httpx.AsyncClient for efficient async connection pooling.
    The underlying HTTP client is created in ``__init__`` and released by
    ``close()``; using the client as an async context manager calls
    ``close()`` automatically on exit, which is the recommended usage.

    Example:
        async with AsyncFastGPTClient(api_key="your-key") as client:
            response = await client.get_app_info(app_id="app-123")
    """
def __init__(
    self,
    api_key: str,
    base_url: str = "http://localhost:3000",
    timeout: float = 60.0,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    enable_logging: bool = False,
):
    """Set up shared client state and the underlying ``httpx.AsyncClient``.

    Args:
        api_key: Your FastGPT API key
        base_url: Base URL for the FastGPT API
        timeout: Request timeout in seconds (default: 60.0)
        max_retries: Maximum number of retry attempts (default: 3)
        retry_delay: Delay between retries in seconds (default: 1.0)
        enable_logging: Whether to enable request logging (default: False)
    """
    # The base class stores the shared settings (key, URL, retry policy, logging).
    super().__init__(api_key, base_url, timeout, max_retries, retry_delay, enable_logging)

    # Connecting never waits longer than 15 seconds; a missing, zero or
    # negative timeout falls back to that same 15 second cap.
    if timeout and timeout > 0:
        connect_timeout = min(float(timeout), 15.0)
    else:
        connect_timeout = 15.0

    timeouts = httpx.Timeout(timeout, connect=connect_timeout)
    self._client = httpx.AsyncClient(base_url=self.base_url, timeout=timeouts)
async def __aenter__(self):
"""Support async context manager protocol."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Clean up resources when exiting async context."""
await self.close()
async def close(self):
    """Release the underlying HTTP client, if one was created."""
    # ``_client`` is absent when ``__init__`` failed before creating it.
    if not hasattr(self, "_client"):
        return
    await self._client.aclose()
async def _send_request(
self,
method: str,
endpoint: str,
json: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
stream: bool = False,
**kwargs,
):
"""Send an HTTP request to the FastGPT API with retry logic.
Args:
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
endpoint: API endpoint path
json: JSON request body
params: Query parameters
stream: Whether to stream the response
**kwargs: Additional arguments to pass to httpx.request
Returns:
httpx.Response object
"""
# Validate parameters
if json:
self._validate_params(**json)
if params:
self._validate_params(**params)
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
async def make_request():
"""Inner function to perform the actual HTTP request."""
# Log request if logging is enabled
if self.enable_logging:
self.logger.info(f"Sending {method} request to {endpoint}")
# Debug logging for detailed information
if self.logger.isEnabledFor(logging.DEBUG):
if json:
self.logger.debug(f"Request body: {json}")
if params:
self.logger.debug(f"Request params: {params}")
# httpx.AsyncClient automatically prepends base_url
# For streaming, use stream() method; for non-streaming, use request()
if stream:
# httpx.stream() returns a context manager, enter it and return response
stream_context = self._client.stream(
method,
endpoint,
json=json,
params=params,
headers=headers,
**kwargs,
)
response = await stream_context.__aenter__()
# Store the stream context on the response
response._stream_context = stream_context
response._stream_context_closed = False
# Preserve the native async response closer and make both
# `await response.close()` and `await response.aclose()` safe.
original_aclose = response.aclose
async def close_with_context():
"""Close both the response and the stream context."""
if getattr(response, '_stream_context_closed', False):
return
try:
# Async streaming responses must use `aclose()`.
await original_aclose()
finally:
# Always close the stream context, even if response cleanup fails
if hasattr(response, '_stream_context') and response._stream_context is not None:
try:
await response._stream_context.__aexit__(None, None, None)
except Exception:
pass # Ignore errors during cleanup
finally:
response._stream_context = None
response._stream_context_closed = True
response.close = close_with_context
response.aclose = close_with_context
# Safety net: ensure cleanup on garbage collection
def cleanup_stream_context(stream_ctx_ref):
"""Finalizer to close stream context if response is GC'd without being closed."""
async def _cleanup():
stream_ctx = stream_ctx_ref()
if stream_ctx is not None:
try:
await stream_ctx.__aexit__(None, None, None)
except Exception:
pass # Ignore errors in finalizer
# Schedule cleanup on event loop
try:
loop = asyncio.get_running_loop()
loop.create_task(_cleanup())
except RuntimeError:
pass # No running loop
# Use weakref to avoid circular references
weakref.finalize(response, cleanup_stream_context, weakref.ref(stream_context))
else:
response = await self._client.request(
method,
endpoint,
json=json,
params=params,
headers=headers,
**kwargs,
)
# Log response if logging is enabled
if self.enable_logging:
self.logger.info(f"Received response: {response.status_code}")
return response
# Use the retry mechanism from base client
request_context = f"{method} {endpoint}"
response = await self._retry_request_async(make_request, request_context)
# Handle error responses (API errors don't retry)
self._handle_error_response(response)
return response
async def _retry_request_async(self, request_func, request_context: str):
"""Execute a request with retry logic (async version).
Args:
request_func: Async function that executes the HTTP request
request_context: Description of the request for logging
Returns:
Response from the request
Raises:
APIError: If all retries are exhausted
"""
last_exception = None
for attempt in range(self.max_retries):
try:
response = await request_func()
# Success on non-5xx responses
if response.status_code < 500:
return response
# Server error - will retry
if self.enable_logging:
self.logger.warning(
f"{request_context} failed with status {response.status_code} "
f"(attempt {attempt + 1}/{self.max_retries})"
)
if attempt < self.max_retries - 1:
# Exponential backoff
sleep_time = self.retry_delay * (2 ** attempt)
await asyncio.sleep(sleep_time)
except Exception as e:
last_exception = e
if self.enable_logging:
self.logger.warning(
f"{request_context} raised exception: {e} "
f"(attempt {attempt + 1}/{self.max_retries})"
)
if attempt < self.max_retries - 1:
sleep_time = self.retry_delay * (2 ** attempt)
await asyncio.sleep(sleep_time)
# All retries exhausted
if last_exception:
from .exceptions import APIError
raise APIError(f"Request failed after {self.max_retries} attempts: {last_exception}")
from .exceptions import APIError
raise APIError(f"Request failed after {self.max_retries} attempts")
def _handle_error_response(self, response) -> None:
"""Handle HTTP error responses and raise appropriate exceptions.
Args:
response: httpx.Response object
Raises:
AuthenticationError: If status code is 401
RateLimitError: If status code is 429
ValidationError: If status code is 422
APIError: For other 4xx and 5xx errors
"""
# Check status code first (doesn't consume response body)
if response.status_code < 400:
return # Success response
# Try to parse error message (this will consume the body, but that's OK for errors)
try:
error_data = response.json()
message = error_data.get("message", f"HTTP {response.status_code}")
except httpx.ResponseNotRead:
message = f"HTTP {response.status_code}"
error_data = None
except (ValueError, KeyError, AttributeError):
# If we can't parse JSON (e.g., streaming response or invalid JSON), use status code
message = f"HTTP {response.status_code}"
error_data = None
# Log error response if logging is enabled
if self.enable_logging:
self.logger.error(f"API error: {response.status_code} - {message}")
if response.status_code == 401:
raise AuthenticationError(message, response.status_code, error_data)
elif response.status_code == 429:
retry_after = response.headers.get("Retry-After")
raise RateLimitError(message, retry_after, response.status_code, error_data)
elif response.status_code == 422:
raise ValidationError(message, response.status_code, error_data)
elif response.status_code >= 400:
raise APIError(message, response.status_code, error_data)
class AsyncChatClient(AsyncFastGPTClient):
    """Async client for chat-related operations.

    Covers chat completions (optionally with image inputs), chat file and
    image uploads through presigned URLs, chat history and record
    management, user feedback and suggested questions.

    Example:
        async with AsyncChatClient(api_key="fastgpt-xxxxx") as client:
            response = await client.create_chat_completion(
                messages=[{"role": "user", "content": "Hello!"}],
                stream=False
            )
    """
async def create_chat_completion(
self,
messages: list[dict],
stream: bool = False,
chatId: str | None = None,
detail: bool = False,
variables: dict[str, Any] | None = None,
responseChatItemId: str | None = None,
*,
image_input_mode: Literal["base64", "upload"] = "base64",
appId: str | None = None,
outLinkAuthData: dict[str, Any] | None = None,
):
"""Create a chat completion.
Args:
messages: Array of message objects with role and content
stream: Whether to stream the response
chatId: Chat ID for conversation context (optional)
detail: Whether to return detailed response data
variables: Template variables for substitution
responseChatItemId: Custom ID for the response message
image_input_mode: How to deliver inline base64 ``image_url`` parts.
``"base64"`` (default) sends the data URL as-is. ``"upload"``
uploads each inline data URL via :meth:`upload_chat_image` and
replaces it with the hosted URL (requires ``appId`` and
``chatId``). Image parts that already reference a plain URL are
left untouched in both modes.
appId: Application ID, required when ``image_input_mode="upload"``.
outLinkAuthData: Optional share-link auth payload forwarded to the
upload requests in ``"upload"`` mode.
Returns:
httpx.Response object
"""
self._validate_params(messages=messages)
if image_input_mode == "upload":
if not appId or not chatId:
raise ValidationError(
"image_input_mode='upload' requires both appId and chatId"
)
messages = await self._resolve_image_inputs(
messages,
appId=appId,
chatId=chatId,
outLinkAuthData=outLinkAuthData,
)
elif image_input_mode != "base64":
raise ValidationError("image_input_mode must be 'base64' or 'upload'")
data = {
"messages": messages,
"stream": stream,
"detail": detail,
}
if chatId:
data["chatId"] = chatId
if variables:
data["variables"] = variables
if responseChatItemId:
data["responseChatItemId"] = responseChatItemId
return await self._send_request(
"POST",
"/api/v1/chat/completions",
json=data,
stream=stream,
)
async def _send_first_available_request(
self,
method: str,
endpoints: list[str],
json: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
):
"""Try compatible FastGPT endpoint variants and return the first non-404 response."""
last_error = None
for endpoint in endpoints:
try:
return await self._send_request(method, endpoint, json=json, params=params)
except APIError as exc:
last_error = exc
if exc.status_code != 404:
raise
if last_error is not None:
raise last_error
raise APIError("No endpoint candidates provided")
@staticmethod
def _unwrap_response_data(data: Any) -> Any:
"""Unwrap FastGPT's standard envelope while preserving raw OpenAI-style responses."""
if isinstance(data, dict) and "data" in data and {"code", "message", "statusText"} & set(data):
return data.get("data")
return data
async def _get_chat_file_select_config(self, appId: str, chatId: str) -> dict[str, Any] | None:
"""Best-effort lookup for the app's chat file selection config."""
try:
response = await self.get_chat_init(appId=appId, chatId=chatId)
data = self._unwrap_response_data(response.json())
except Exception:
return None
if not isinstance(data, dict):
return None
app = data.get("app")
if not isinstance(app, dict):
return None
chat_config = app.get("chatConfig")
if not isinstance(chat_config, dict):
return None
file_select_config = chat_config.get("fileSelectConfig")
return file_select_config if isinstance(file_select_config, dict) else None
@staticmethod
def _default_file_select_config(file_type: str) -> dict[str, Any]:
"""Fallback config for servers that require fileSelectConfig in presign requests."""
is_image = file_type == "img"
return {
"canSelectFile": not is_image,
"canSelectImg": is_image,
"canSelectVideo": False,
"canSelectAudio": False,
"canSelectCustomFileExtension": False,
"customFileExtensionList": [],
}
async def presign_chat_file_post_url(
self,
*,
appId: str,
chatId: str,
filename: str,
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Request a presigned URL for uploading a chat file."""
payload: dict[str, Any] = {
"appId": appId,
"chatId": chatId,
"filename": filename,
"fileSelectConfig": fileSelectConfig or self._default_file_select_config("img"),
}
if outLinkAuthData:
payload["outLinkAuthData"] = outLinkAuthData
response = await self._send_first_available_request(
"POST",
[
"/api/core/chat/presignChatFilePostUrl",
"/api/core/chat/file/presignChatFilePostUrl",
],
json=payload,
)
data = self._unwrap_response_data(response.json())
if not isinstance(data, dict):
raise APIError("Invalid presigned upload response")
return data
async def presign_chat_file_get_url(
self,
*,
appId: str,
key: str,
outLinkAuthData: dict[str, Any] | None = None,
) -> str:
"""Request a temporary preview/download URL for an uploaded chat file."""
payload: dict[str, Any] = {
"appId": appId,
"key": key,
}
if outLinkAuthData:
payload["outLinkAuthData"] = outLinkAuthData
response = await self._send_first_available_request(
"POST",
[
"/api/core/chat/presignChatFileGetUrl",
"/api/core/chat/file/presignChatFileGetUrl",
],
json=payload,
)
data = self._unwrap_response_data(response.json())
if not isinstance(data, str):
raise APIError("Invalid presigned preview response")
return data
async def upload_to_presigned_url(
    self,
    *,
    upload_url: str,
    file_path: str | Path,
    headers: dict[str, str] | None = None,
    fields: dict[str, Any] | None = None,
    method: Literal["POST", "PUT"] | None = None,
) -> httpx.Response:
    """Upload a file to a presigned S3/MinIO URL.

    Args:
        upload_url: Presigned URL to send the file to
        file_path: Local path of the file to upload
        headers: Extra headers for a PUT upload; ``Content-Type`` is filled
            in from the file name when not supplied
        fields: Form fields for a multipart POST upload
        method: ``"POST"`` or ``"PUT"``; when omitted, POST is used if
            ``fields`` is truthy and PUT otherwise

    Returns:
        httpx.Response object of the upload request

    Raises:
        ValueError: If ``method`` is neither ``"POST"`` nor ``"PUT"``
        httpx.HTTPStatusError: If the storage server answers with an error
            status (raised by ``raise_for_status``)
    """
    path = Path(file_path)
    content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
    upload_method = method or ("POST" if fields else "PUT")

    # Reject bad input before opening a connection pool.
    if upload_method not in ("POST", "PUT"):
        raise ValueError("method must be 'POST' or 'PUT'")

    # A separate short-lived client is used so the API base URL and auth
    # headers are not applied to the storage URL.
    async with httpx.AsyncClient(timeout=self.timeout) as client:
        if upload_method == "POST":
            with path.open("rb") as file_obj:
                response = await client.post(
                    upload_url,
                    data=fields or {},
                    files={"file": (path.name, file_obj, content_type)},
                )
        else:
            upload_headers = dict(headers or {})
            upload_headers.setdefault("Content-Type", content_type)
            # httpx.AsyncClient only accepts bytes or an *async* byte
            # iterator as ``content``; a plain file object is a sync iterator
            # and is rejected when the request is sent. Read the file off the
            # event loop and send it as bytes, which also gives the request a
            # Content-Length instead of chunked encoding.
            body = await asyncio.to_thread(path.read_bytes)
            response = await client.put(
                upload_url,
                content=body,
                headers=upload_headers,
            )

    response.raise_for_status()
    return response
async def upload_chat_file(
self,
*,
appId: str,
chatId: str,
file_path: str | Path,
file_type: Literal["img", "doc"] = "doc",
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Upload a chat file and return a normalized file reference."""
path = Path(file_path)
selected_config = (
fileSelectConfig
or await self._get_chat_file_select_config(appId=appId, chatId=chatId)
or self._default_file_select_config(file_type)
)
presigned = await self.presign_chat_file_post_url(
appId=appId,
chatId=chatId,
filename=path.name,
fileSelectConfig=selected_config,
outLinkAuthData=outLinkAuthData,
)
max_size = presigned.get("maxSize")
if max_size is not None and path.stat().st_size > max_size:
raise ValueError(f"File too large: {path.stat().st_size} bytes > {max_size} bytes")
await self.upload_to_presigned_url(
upload_url=presigned["url"],
file_path=path,
headers=presigned.get("headers"),
fields=presigned.get("fields"),
)
fields = presigned.get("fields") if isinstance(presigned.get("fields"), dict) else {}
key = presigned.get("key") or fields.get("key")
preview_url = presigned.get("previewUrl")
if not preview_url and key:
preview_url = await self.presign_chat_file_get_url(
appId=appId,
key=key,
outLinkAuthData=outLinkAuthData,
)
return {
"type": file_type,
"name": path.name,
"key": key,
"url": preview_url,
"previewUrl": preview_url,
}
async def upload_chat_image(
self,
*,
appId: str,
chatId: str,
file_path: str | Path,
fileSelectConfig: dict[str, Any] | None = None,
outLinkAuthData: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Upload an image for use in chat `image_url` content parts."""
return await self.upload_chat_file(
appId=appId,
chatId=chatId,
file_path=file_path,
file_type="img",
fileSelectConfig=fileSelectConfig,
outLinkAuthData=outLinkAuthData,
)
async def _resolve_image_inputs(
self,
messages: list[dict],
*,
appId: str,
chatId: str,
outLinkAuthData: dict[str, Any] | None = None,
) -> list[dict]:
"""Upload inline base64 ``image_url`` parts and swap in the hosted URLs.
Returns new message/content objects; the input ``messages`` are never
mutated. Parts whose URL is not an inline data URL are passed through.
"""
resolved: list[dict] = []
for message in messages:
content = message.get("content")
if not isinstance(content, list):
resolved.append(message)
continue
new_content: list[Any] = []
for part in content:
url = (
part.get("image_url", {}).get("url")
if isinstance(part, dict) and part.get("type") == "image_url"
else None
)
if is_data_url(url):
new_content.append(
image_url_part(
await self._upload_data_url(
url,
appId=appId,
chatId=chatId,
outLinkAuthData=outLinkAuthData,
)
)
)
else:
new_content.append(part)
resolved.append({**message, "content": new_content})
return resolved
async def _upload_data_url(
    self,
    data_url: str,
    *,
    appId: str,
    chatId: str,
    outLinkAuthData: dict[str, Any] | None = None,
) -> str:
    """Upload a ``data:image/...;base64,...`` URL and return the hosted URL.

    The decoded bytes are written to a temporary file, uploaded with
    ``upload_chat_image`` and the temporary file is removed afterwards.
    Falls back to the original data URL if decoding or upload fails, or if
    the upload returns no usable URL, so the request can still proceed with
    inline base64.

    Args:
        data_url: Inline base64 data URL to upload
        appId: Application ID used for the upload
        chatId: Chat ID used for the upload
        outLinkAuthData: Optional share-link auth payload for the upload

    Returns:
        The hosted URL on success, otherwise ``data_url`` unchanged
    """
    try:
        mime_type, raw = decode_data_url(data_url)
    except ValueError as exc:
        self.logger.warning("Skipping image upload; invalid base64 data URL: %s", exc)
        return data_url

    # The file extension lets the uploader guess the content type again.
    suffix = mimetypes.guess_extension(mime_type) or ".jpg"
    tmp_path: str | None = None
    try:
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            # Record the path before writing: the file is created with
            # delete=False, so a failed write would otherwise leave it behind.
            tmp_path = tmp.name
            tmp.write(raw)
        result = await self.upload_chat_image(
            appId=appId,
            chatId=chatId,
            file_path=tmp_path,
            outLinkAuthData=outLinkAuthData,
        )
        url = result.get("url") if isinstance(result, dict) else None
        if isinstance(url, str) and url:
            return url
        self.logger.warning("Image upload returned no url; using inline base64")
        return data_url
    except Exception as exc:  # noqa: BLE001 - graceful fallback to inline base64
        self.logger.warning("Image upload failed; using inline base64: %s", exc)
        return data_url
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # Already gone or not removable; nothing more to do
async def get_chat_histories(
    self,
    appId: str,
    offset: int = 0,
    pageSize: int = 20,
    source: Literal["api", "online", "share", "test"] = "api",
):
    """Fetch one page of chat histories for an application.

    Args:
        appId: Application ID
        offset: Offset for pagination
        pageSize: Number of records per page
        source: Source filter (api, online, share, test)

    Returns:
        httpx.Response object
    """
    payload = dict(appId=appId, offset=offset, pageSize=pageSize, source=source)
    return await self._send_request("POST", "/api/core/chat/getHistories", json=payload)
async def get_chat_init(self, appId: str, chatId: str):
    """Fetch the initialization information of a chat.

    Args:
        appId: Application ID
        chatId: Chat ID

    Returns:
        httpx.Response object
    """
    query = dict(appId=appId, chatId=chatId)
    return await self._send_request("GET", "/api/core/chat/init", params=query)
async def get_chat_records(
    self,
    appId: str,
    chatId: str,
    offset: int = 0,
    pageSize: int = 10,
    loadCustomFeedbacks: bool = False,
):
    """Fetch one page of records belonging to a single chat.

    Args:
        appId: Application ID
        chatId: Chat ID
        offset: Offset for pagination
        pageSize: Number of records per page
        loadCustomFeedbacks: Whether to load custom feedbacks

    Returns:
        httpx.Response object
    """
    payload = dict(
        appId=appId,
        chatId=chatId,
        offset=offset,
        pageSize=pageSize,
        loadCustomFeedbacks=loadCustomFeedbacks,
    )
    return await self._send_request("POST", "/api/core/chat/getPaginationRecords", json=payload)
async def get_record_detail(self, appId: str, chatId: str, dataId: str):
    """Fetch the detailed execution data of one chat record.

    Args:
        appId: Application ID
        chatId: Chat ID
        dataId: Record ID

    Returns:
        httpx.Response object
    """
    query = dict(appId=appId, chatId=chatId, dataId=dataId)
    return await self._send_request("GET", "/api/core/chat/getResData", params=query)
async def update_chat_history(
self,
appId: str,
chatId: str,
customTitle: str | None = None,
top: bool | None = None,
):
"""Update chat history (title or pin status).
Args:
appId: Application ID
chatId: Chat ID
customTitle: Custom title for the chat
top: Whether to pin the chat
Returns:
httpx.Response object
"""
data = {
"appId": appId,
"chatId": chatId,
}
if customTitle is not None:
data["customTitle"] = customTitle
if top is not None:
data["top"] = top
return await self._send_request("POST", "/api/core/chat/updateHistory", json=data)
async def delete_chat_history(self, appId: str, chatId: str):
    """Delete one chat history.

    Args:
        appId: Application ID
        chatId: Chat ID

    Returns:
        httpx.Response object
    """
    query = dict(appId=appId, chatId=chatId)
    return await self._send_request("DELETE", "/api/core/chat/delHistory", params=query)
async def clear_chat_histories(self, appId: str):
    """Remove every chat history of an application.

    Args:
        appId: Application ID

    Returns:
        httpx.Response object
    """
    query = dict(appId=appId)
    return await self._send_request("DELETE", "/api/core/chat/clearHistories", params=query)
async def delete_chat_record(self, appId: str, chatId: str, contentId: str):
    """Delete a single record from a chat.

    Args:
        appId: Application ID
        chatId: Chat ID
        contentId: Content ID of the record

    Returns:
        httpx.Response object
    """
    # The identifiers travel in a JSON body rather than the query string.
    payload = dict(appId=appId, chatId=chatId, contentId=contentId)
    return await self._send_request("DELETE", "/api/core/chat/item/delete", json=payload)
async def send_feedback(
self,
appId: str,
chatId: str,
dataId: str,
userGoodFeedback: str | None = None,
userBadFeedback: str | None = None,
):
"""Send feedback for a chat message (like/dislike).
Args:
appId: Application ID
chatId: Chat ID
dataId: Message ID
userGoodFeedback: Positive feedback text (pass None to cancel like)
userBadFeedback: Negative feedback text (pass None to cancel dislike)
Returns:
httpx.Response object
"""
data = {
"appId": appId,
"chatId": chatId,
"dataId": dataId,
}
if userGoodFeedback is not None:
data["userGoodFeedback"] = userGoodFeedback
if userBadFeedback is not None:
data["userBadFeedback"] = userBadFeedback
return await self._send_request("POST", "/api/core/chat/feedback/updateUserFeedback", json=data)
async def get_suggested_questions(
self,
appId: str,
chatId: str,
questionGuide: dict[str, Any] | None = None,
):
"""Get suggested questions based on chat context.
Args:
appId: Application ID
chatId: Chat ID
questionGuide: Optional custom configuration for question guide
Returns:
httpx.Response object
"""
data = {
"appId": appId,
"chatId": chatId,
}
if questionGuide:
data["questionGuide"] = questionGuide
return await self._send_request("POST", "/api/core/ai/agent/v2/createQuestionGuide", json=data)
class AsyncAppClient(AsyncFastGPTClient):
    """Async client for application analytics and logs.

    Example:
        async with AsyncAppClient(api_key="fastgpt-xxxxx") as client:
            # dateStart and dateEnd are required arguments.
            logs = await client.get_app_logs_chart(
                appId="app-123",
                dateStart="2024-01-01T00:00:00.000Z",
                dateEnd="2024-01-31T23:59:59.999Z",
            )
    """
async def get_app_logs_chart(
self,
appId: str,
dateStart: str,
dateEnd: str,
offset: int = 1,
source: list[str] | None = None,
userTimespan: str = "day",
chatTimespan: str = "day",
appTimespan: str = "day",
):
"""Get application analytics chart data.
Args:
appId: Application ID
dateStart: Start date (ISO 8601 format)
dateEnd: End date (ISO 8601 format)
offset: Offset value
source: List of sources (test, online, share, api, etc.)
userTimespan: User data timespan (day, week, month)
chatTimespan: Chat data timespan (day, week, month)
appTimespan: App data timespan (day, week, month)
Returns:
httpx.Response object
"""
if source is None:
source = ["api"]
data = {
"appId": appId,
"dateStart": dateStart,
"dateEnd": dateEnd,
"offset": offset,
"source": source,
"userTimespan": userTimespan,
"chatTimespan": chatTimespan,
"appTimespan": appTimespan,
}
return await self._send_request("POST", "/api/proApi/core/app/logs/getChartData", json=data)