diff --git a/.gitignore b/.gitignore
index 09bf5d6..e72cd47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
*.pyc
-__pycache__/
\ No newline at end of file
+__pycache__/
+logs/
+*.log
+.env
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 8fbd8d0..7ab0031 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,16 @@ python-dotenv>=1.0.0
httpx>=0.25.0
pytest>=7.4.0
pytest-asyncio>=0.21.0
-pytest-cov>=4.1.0
\ No newline at end of file
+pytest-cov>=4.1.0
+pillow>=10.4.0
+paho-mqtt>=2.1.0
+pydantic-settings==2.1.0
+python-multipart==0.0.6
+python-jose[cryptography]==3.3.0
+passlib[bcrypt]==1.7.4
+openai==1.55.3
+loguru>=0.7.0
+pandas
+requests
+sqlalchemy
+pymysql
diff --git a/src/.env.example b/src/.env.example
new file mode 100644
index 0000000..64320bc
--- /dev/null
+++ b/src/.env.example
@@ -0,0 +1,10 @@
+DATABASE_URL=sqlite:///./test.db
+SECRET_KEY=your_secret_key
+DEBUG=True
+
+ANALYSIS_SERVICE_URL=http://127.0.0.1:3030
+ANALYSIS_AUTH_TOKEN=fastgpt-hSPnXMoBNGVAEpTLkQT3YfAnN26gQSyvLd4ABL1MRDoh68nL4RDlopFHXqmH8
+APP_ID=683ea1bc86197e19f71fc1ae
+DELETE_SESSION_URL=http://127.0.0.1:3030/api/core/chat/delHistory?chatId={chatId}&appId={appId}
+DELETE_CHAT_URL=http://127.0.0.1:3030/api/core/chat/item/delete?contentId={contentId}&chatId={chatId}&appId={appId}
+GET_CHAT_RECORDS_URL=http://127.0.0.1:3030/api/core/chat/getPaginationRecords
diff --git a/src/api/endpoints.py b/src/api/endpoints.py
index 47a755d..046dc89 100644
--- a/src/api/endpoints.py
+++ b/src/api/endpoints.py
@@ -1,37 +1,21 @@
-from fastapi import APIRouter, HTTPException
+from fastapi import APIRouter, HTTPException, Depends
from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session
-import httpx
-from datetime import datetime
-import os
-from dotenv import load_dotenv
-from urllib import parse as parse
+from fastgpt_client import AsyncChatClient
+from fastgpt_client.exceptions import (
+ APIError, AuthenticationError, RateLimitError, ValidationError
+)
+from ..core.fastgpt_client import get_fastgpt_client
+from ..core.config import Config
+from loguru import logger
import json
-
-load_dotenv()
+import re
router = APIRouter()
-
-ANALYSIS_SERVICE_URL = os.getenv("ANALYSIS_SERVICE_URL", None)
-if ANALYSIS_SERVICE_URL is None:
- raise ValueError("ANALYSIS_SERVICE_URL environment variable not set")
-ANALYSIS_AUTH_TOKEN = os.getenv("ANALYSIS_AUTH_TOKEN", None)
-if ANALYSIS_AUTH_TOKEN is None:
- raise ValueError("ANALYSIS_AUTH_TOKEN environment variable not set")
-DELETE_SESSION_URL = os.getenv("DELETE_SESSION_URL", None)
-if DELETE_SESSION_URL is None:
- raise ValueError("DELETE_SESSION_URL environment variable not set")
-APP_ID = os.getenv("APP_ID", None)
-if APP_ID is None:
- raise ValueError("APP_ID environment variable not set")
-DELETE_CHAT_URL = os.getenv("DELETE_CHAT_URL", None)
-if DELETE_CHAT_URL is None:
- raise ValueError("DELETE_CHAT_URL environment variable not set")
-GET_CHAT_RECORDS_URL = os.getenv("GET_CHAT_RECORDS_URL", None)
-if GET_CHAT_RECORDS_URL is None:
- raise ValueError("GET_CHAT_RECORDS_URL environment variable not set")
STATUS_CODE_MAP = {
'0000': '结束通话',
'0001': '转接人工',
+ '0002': '语义无法识别转接人工',
+ '0003': '有人伤转接人工',
'1001': '未准备好通话',
'1002': '通话中',
'2000': '进入单车拍照',
@@ -49,44 +33,63 @@ STATUS_CODE_MAP = {
'2016': '确认双车中的车牌'
}
+def extract_state_and_content(data1: str) -> dict | None:
+ """
+ Extracts the state and content from a string in the format STATEcontent.
-async def delete_last_two_chat_records(sessionId):
- # 获取最后两条聊天记录
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- chat_data = {
- "appId": APP_ID,
- "chatId": sessionId,
- "offset": 0,
- }
- async with httpx.AsyncClient() as client:
- response = await client.post(
- GET_CHAT_RECORDS_URL,
- json=chat_data,
- headers=headers,
- timeout=None
+ Args:
+ data1: The input string.
+
+ Returns:
+ A dictionary with 'state' and 'content' keys if a match is found,
+ otherwise None.
+ """
+ data1 = data1.strip()
+ regex = r"(.*?)(.*)"
+ match = re.search(regex, data1)
+
+ if match:
+ return {
+ "state": match.group(1),
+ "content": match.group(2),
+ }
+ return None
+
+async def delete_last_two_chat_records(
+ client: AsyncChatClient,
+ session_id: str
+) -> None:
+ """Delete the last two chat records."""
+ try:
+ # Get chat records using SDK
+ response = await client.get_chat_records(
+ appId=Config.FASTGPT_APP_ID,
+ chatId=session_id,
+ offset=0,
+ pageSize=10
)
response.raise_for_status()
data = response.json()
- last_two_dataId = [x['dataId'] for x in data['data']['list'][-2:]]
-
- # 删除最后聊天聊天记录
- for contentId in last_two_dataId:
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- url = DELETE_CHAT_URL.format(contentId=contentId, chatId=sessionId, appId=APP_ID)
- async with httpx.AsyncClient() as client:
- response = await client.delete(
- url,
- headers=headers,
- timeout=None
+
+ records = data.get('data', {}).get('list', [])
+ if len(records) < 2:
+ logger.warning(f"Less than 2 records found for session {session_id}")
+ return
+
+ last_two_data_ids = [record['dataId'] for record in records[-2:]]
+
+ # Delete records using SDK
+ for content_id in last_two_data_ids:
+ delete_response = await client.delete_chat_record(
+ appId=Config.FASTGPT_APP_ID,
+ chatId=session_id,
+ contentId=content_id
)
- response.raise_for_status()
- data = response.json()
+ delete_response.raise_for_status()
+
+ except Exception as e:
+ logger.error(f"Error deleting chat records: {e}")
+ raise
# @app.exception_handler(RequestValidationError)
@@ -95,37 +98,60 @@ async def delete_last_two_chat_records(sessionId):
# raise HTTPException(status_code=422, detail=exc.errors())
@router.post("/chat", response_model=ProcessResponse_chat)
-async def chat(request: ProcessRequest_chat):
+async def chat(
+ request: ProcessRequest_chat,
+ client: AsyncChatClient = Depends(get_fastgpt_client)
+):
+ """Handle chat completion request."""
+ json_data = request.model_dump()
+ logger.info(f"用户请求信息ProcessRequest_chat: {json_data}")
+
try:
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- json_data = request.model_dump()
- chat_data = {
- "chatId": json_data['sessionId'],
- "stream": False,
- "detail": True,
- "messages": [
- {
- "role": "user",
- "content": json_data['text']
- }
- ]
- }
+ # Use SDK's create_chat_completion
+ response = await client.create_chat_completion(
+ messages=[{"role": "user", "content": json_data['text']}],
+ chatId=json_data['sessionId'],
+ stream=False,
+ detail=True
+ )
+ response.raise_for_status()
+ data = response.json()
- # Call external analysis service
- async with httpx.AsyncClient() as client:
- response = await client.post(
- ANALYSIS_SERVICE_URL,
- json=chat_data,
- headers=headers,
- timeout=None
- )
- response.raise_for_status()
- data = response.json()
+ except AuthenticationError as e:
+ logger.error(f"Authentication error: {e}")
+ return ProcessResponse_chat(
+ sessionId=json_data['sessionId'],
+ timeStamp=json_data['timeStamp'],
+ outputText="",
+ nextStage="",
+ nextStageCode="",
+ code="401",
+ msg="认证失败"
+ )
+ except RateLimitError as e:
+ logger.error(f"Rate limit error: {e}")
+ return ProcessResponse_chat(
+ sessionId=json_data['sessionId'],
+ timeStamp=json_data['timeStamp'],
+ outputText="",
+ nextStage="",
+ nextStageCode="",
+ code="429",
+ msg="请求过于频繁,请稍后重试"
+ )
+ except APIError as e:
+ logger.error(f"API error: {e}")
+ return ProcessResponse_chat(
+ sessionId=json_data['sessionId'],
+ timeStamp=json_data['timeStamp'],
+ outputText="",
+ nextStage="",
+ nextStageCode="",
+ code="500",
+ msg="大模型服务器无响应"
+ )
except Exception as e:
- print(e)
+ logger.error(f"Unexpected error: {e}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -139,26 +165,59 @@ async def chat(request: ProcessRequest_chat):
try:
# Extract content from FastGPT response
content = data['choices'][0]['message']['content']
+ logger.info(f"FastGPT服务返回信息content: {content}")
+
finish_reason = data['choices'][0]['finish_reason']
- transfer_to_human = data['newVariables']['state'].get("transfer_to_human", False)
- ywrysw = data['newVariables']['state'].get("ywrysw", False)
- ywfjdc = data['newVariables']['state'].get("ywfjdc", False)
- ywmtc = data['newVariables']['state'].get("ywmtc", False)
- jdcsl = data['newVariables']['state'].get("jdcsl", 0)
- accident_info_complete = data['newVariables']['state'].get("accident_info_complete", False)
- user_is_ready = data['newVariables']['state'].get("user_is_ready", False)
+ # Extract state variables
+ state = data.get('newVariables', {}).get('state', {})
+ if isinstance(state, str):
+ state = json.loads(state)
+
+ transfer_to_human = state.get("transfer_to_human", False)
+ ywrysw = state.get("ywrysw", False)
+ ywfjdc = state.get("ywfjdc", False)
+ ywmtc = state.get("ywmtc", False)
+ jdcsl = state.get("jdcsl", 0)
+ accident_info_complete = state.get("accident_info_complete", False)
+ user_is_ready = state.get("user_is_ready", False)
if isinstance(user_is_ready, str):
user_is_ready = user_is_ready.lower() == 'true'
- driver_info_complete = data['newVariables']['state'].get("driver_info_complete", False)
- drivers_info_complete = data['newVariables']['state'].get("drivers_info_complete", False)
- driver_info_check = data['newVariables']['state'].get("drivers_info_check", False)
- drivers_info_check = data['newVariables']['state'].get("drivers_info_check", False)
+ driver_info_complete = state.get("driver_info_complete", False)
+ drivers_info_complete = state.get("drivers_info_complete", False)
+ driver_info_check = state.get("drivers_info_check", False)
+ drivers_info_check = state.get("drivers_info_check", False)
- print(data['newVariables'])
+ logger.debug(f"State variables: {data.get('newVariables', {})}")
- nextStageCode=data['newVariables']['status_code']
- nextStage=STATUS_CODE_MAP[nextStageCode]
+ nextStageCode = data['newVariables']['status_code']
+
+ # 有一些情况需要调整nextStageCode
+ if nextStageCode in ['3001', '3002', '1002']:
+ nextStageCode = '1002'
+ elif nextStageCode == '2006':
+ nextStageCode = '2004'
+ elif nextStageCode == '2017':
+ nextStageCode = '2016'
+ elif nextStageCode == '2020':
+ nextStageCode = '0002'
+ nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
+
+ # Parse content - sometimes content is a string, sometimes it is a list
+ if isinstance(content, list):
+ logger.debug("content是一个list")
+ content = content[0]['text']['content']
+ elif isinstance(content, str):
+ logger.debug("content是一个str")
+ state_and_content = extract_state_and_content(content)
+ if state_and_content:
+ logger.debug(f"解析后的state和content为: {state_and_content}")
+ content = state_and_content['content']
+ else:
+ raise ValueError("大模型回复中的state解析失败")
+ else:
+ logger.error(f"content既不是list也不是str, type: {type(content)}")
+ raise ValueError("大模型回复不是list也不是str")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
@@ -170,7 +229,8 @@ async def chat(request: ProcessRequest_chat):
msg="",
)
except Exception as e:
- print(e)
+ logger.error(f"解析信息发生错误: {e}")
+ logger.error(f"content: {content}, type: {type(content)}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -182,121 +242,43 @@ async def chat(request: ProcessRequest_chat):
)
-@router.post("/set_info", response_model=ProcessResponse_set)
-async def set_info(request: ProcessRequest_set):
- # 获取当前state
+@router.post("/set_info", response_model=ProcessResponse_set)
+async def set_info(
+ request: ProcessRequest_set,
+ client: AsyncChatClient = Depends(get_fastgpt_client)
+):
+ """Set information in chat state."""
+ json_data = request.model_dump()
+
try:
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- json_data = request.model_dump()
- chat_data = {
- "chatId": json_data['sessionId'],
- "stream": False,
- "detail": True,
- "messages": [
- {
- "role": "user",
- "content": ""
- }
- ]
- }
+ # Get current state
+ response = await client.create_chat_completion(
+ messages=[{"role": "user", "content": ""}],
+ chatId=json_data['sessionId'],
+ stream=False,
+ detail=True
+ )
+ response.raise_for_status()
+ data = response.json()
- # Call external analysis service
- async with httpx.AsyncClient() as client:
- response = await client.post(
- ANALYSIS_SERVICE_URL,
- json=chat_data,
- headers=headers,
- timeout=None
- )
- response.raise_for_status()
- data = response.json()
- current_state = data['newVariables']['state']
- if isinstance(current_state, str):
- current_state = json.loads(current_state)
- print(current_state)
+ current_state = data['newVariables']['state']
+ if isinstance(current_state, str):
+ current_state = json.loads(current_state)
+ logger.debug(f"Current state: {current_state}")
+
except Exception as e:
- print(e)
+ logger.error(f"Error getting current state: {e}")
return ProcessResponse_set(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="500",
msg="大模型服务器无响应"
)
-
+
try:
- await delete_last_two_chat_records(json_data['sessionId'])
+ await delete_last_two_chat_records(client, json_data['sessionId'])
except Exception as e:
- print(e)
- return ProcessResponse_set(
- sessionId=json_data['sessionId'],
- timeStamp=json_data['timeStamp'],
- code="500",
- msg="大模型后台无响应"
- )
-
- # 修改当前state
- try:
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- json_data = request.model_dump()
- # key_map = {"xm1": "xm1", "hpzl1": "hpzl1", "hphm1": "driver1License", "sfzmhm1": "driver1ID", "sfzmwh1": "sfzmwh1", "sjhm1": "driver1Phonenumber","sjwh1": "sjwh1", "xm2":"xm2", "hpzl2": "hpzl2", "hphm2": "driver2License", "sfzmhm2": "driver2ID", "sfzmwh2": "sfzmwh2", "sjhm2": "driver2Phonenumber", "sjwh2": "sjwh2"}
- # if json_data['key'] not in key_map:
- # raise Exception("check your key")
- # key = key_map.get(json_data['key'])
- key = json_data['key']
- value = json_data['value']
- current_state[json_data['key']] = value
- print(f'即将上传 {current_state}')
- # if key == 'driver1ID' or key == 'driver2ID' or key == 'driver1Phonenumber' or key == 'driver2Phonenumber':
- # current_state[json_data['key'].replace('hm','wh')] = value[-4:]
-
- chat_data = {
- "chatId": json_data['sessionId'],
- "stream": False,
- "detail": True,
- "variables": {'state':current_state},
- "messages": [
- {
- "role": "user",
- "content": ""
- }
- ]
- }
- # Call external analysis service
- async with httpx.AsyncClient() as client:
- response = await client.post(
- ANALYSIS_SERVICE_URL,
- json=chat_data,
- headers=headers,
- timeout=None
- )
- response.raise_for_status()
-
- return ProcessResponse_set(
- sessionId=json_data['sessionId'],
- timeStamp=json_data['timeStamp'],
- code="200",
- msg=""
- )
-
- except Exception as e:
- print(e)
- return ProcessResponse_set(
- sessionId=json_data['sessionId'],
- timeStamp=json_data['timeStamp'],
- code="500",
- msg="大模型后台无响应"
- )
-
- try:
- await delete_last_two_chat_records(json_data['sessionId'])
- except Exception as e:
- print(e)
+ logger.error(f"Error deleting chat records: {e}")
return ProcessResponse_set(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -304,43 +286,68 @@ async def set_info(request: ProcessRequest_set):
msg="大模型后台无响应"
)
-@router.post("/get_info", response_model=ProcessResponse_get)
-async def get_info(request: ProcessRequest_get):
- # 获取当前state
try:
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- json_data = request.model_dump()
- chat_data = {
- "chatId": json_data['sessionId'],
- "stream": False,
- "detail": True,
- "messages": [
- {
- "role": "user",
- "content": ""
- }
- ]
- }
+ # Update state
+ key = json_data['key']
+ value = json_data['value']
+ current_state[key] = value
+ logger.info(f'即将上传 {current_state}')
+
+ # Update state using SDK
+ response = await client.create_chat_completion(
+ messages=[{"role": "user", "content": ""}],
+ chatId=json_data['sessionId'],
+ stream=False,
+ detail=True,
+ variables={'state': current_state}
+ )
+ response.raise_for_status()
+
+ # Delete records again after update
+ await delete_last_two_chat_records(client, json_data['sessionId'])
+
+ return ProcessResponse_set(
+ sessionId=json_data['sessionId'],
+ timeStamp=json_data['timeStamp'],
+ code="200",
+ msg=""
+ )
- # Call external analysis service
- async with httpx.AsyncClient() as client:
- response = await client.post(
- ANALYSIS_SERVICE_URL,
- json=chat_data,
- headers=headers,
- timeout=None
- )
- response.raise_for_status()
- data = response.json()
- current_state = data['newVariables']['state']
- if isinstance(current_state, str):
- current_state = json.loads(current_state)
- print(current_state)
except Exception as e:
- print(e)
+ logger.error(f"Error setting info: {e}")
+ return ProcessResponse_set(
+ sessionId=json_data['sessionId'],
+ timeStamp=json_data['timeStamp'],
+ code="500",
+ msg="大模型后台无响应"
+ )
+
+@router.post("/get_info", response_model=ProcessResponse_get)
+async def get_info(
+ request: ProcessRequest_get,
+ client: AsyncChatClient = Depends(get_fastgpt_client)
+):
+ """Get information from chat state."""
+ json_data = request.model_dump()
+
+ try:
+ # Get current state
+ response = await client.create_chat_completion(
+ messages=[{"role": "user", "content": ""}],
+ chatId=json_data['sessionId'],
+ stream=False,
+ detail=True
+ )
+ response.raise_for_status()
+ data = response.json()
+
+ current_state = data['newVariables']['state']
+ if isinstance(current_state, str):
+ current_state = json.loads(current_state)
+ logger.debug(f"Current state: {current_state}")
+
+ except Exception as e:
+ logger.error(f"Error getting state: {e}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -348,11 +355,11 @@ async def get_info(request: ProcessRequest_get):
code="500",
msg="大模型服务器无响应"
)
-
+
try:
- await delete_last_two_chat_records(json_data['sessionId'])
+ await delete_last_two_chat_records(client, json_data['sessionId'])
except Exception as e:
- print(e)
+ logger.error(f"Error deleting records: {e}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -360,69 +367,39 @@ async def get_info(request: ProcessRequest_get):
code="500",
msg="大模型后台无响应"
)
-
+
try:
key = json_data['key']
- # key_map = {"hpzl1": "hpzl1", "hphm1": "driver1License", "sfzmhm1": "driver1ID", "sfzmwh1": "sfzmwh1", "sjhm1": "driver1Phonenumber","sjwh1": "sjwh1", "hpzl2": "hpzl2", "hphm2": "driver2License", "sfzmhm2": "driver2ID", "sfzmwh2": "sfzmwh2", "sjhm2": "driver2Phonenumber", "sjwh2": "sjwh2"}
acd_keys = ['ywrysw', 'ywfjdc', 'ywmtc', 'bjrjs', 'sgfssj', 'sfsgxc', 'jdcsl', 'sgyy']
human1_keys = ['xm1', 'hpzl1', 'hphm1', 'sfzmhm1', 'sfzmwh1', 'sjhm1', 'sjwh1', 'csbw1']
human2_keys = ['xm2', 'hpzl2', 'hphm2', 'sfzmhm2', 'sfzmwh2', 'sjhm2', 'sjwh2', 'csbw2']
+
+ def bool_to_str(v):
+ """Convert boolean to string representation."""
+ return '1' if v is True else '0' if v is False else v
+
if key == 'all':
- acd_values = {}
- for k in acd_keys:
- v = current_state.get(k, '')
- if isinstance(v, bool):
- v = '1' if v==True else '0'
- acd_values[k] = v
- human1_values = {}
- for k in human1_keys:
- v = current_state.get(k, '')
- if isinstance(v, bool):
- v = '1' if v==True else '0'
- human1_values[k] = v
- human2_values = {}
- for k in human2_keys:
- v = current_state.get(k, '')
- if isinstance(v, bool):
- v = '1' if v==True else '0'
- human2_values[k] = v
- value = {'acdinfo': acd_values, 'acdhuman1': human1_values, 'acdhuman2': human2_values}
- value = json.dumps(value)
+ acd_values = {k: bool_to_str(current_state.get(k, '')) for k in acd_keys}
+ human1_values = {k: bool_to_str(current_state.get(k, '')) for k in human1_keys}
+ human2_values = {k: bool_to_str(current_state.get(k, '')) for k in human2_keys}
+ value = json.dumps({
+ 'acdinfo': acd_values,
+ 'acdhuman1': human1_values,
+ 'acdhuman2': human2_values
+ })
elif key == "acdinfo":
- acd_values = {}
- for k in acd_keys:
- v = current_state.get(k, '')
- if isinstance(v, bool):
- v = '1' if v==True else '0'
- acd_values[k] = v
+ acd_values = {k: bool_to_str(current_state.get(k, '')) for k in acd_keys}
value = json.dumps(acd_values)
elif key == 'acdhuman1':
- human1_values = {}
- for k in human1_keys:
- v = current_state.get(k, '')
- if isinstance(v, bool):
- v = '1' if v==True else '0'
- human1_values[k] = v
- value = json.dumps(human1_value)
+ human1_values = {k: bool_to_str(current_state.get(k, '')) for k in human1_keys}
+ value = json.dumps(human1_values)
elif key == 'acdhuman2':
- human2_values = {}
- for k in human1_keys:
- v = current_state.get(k, '')
- if isinstance(v, bool):
- v = '1' if v==True else '0'
- human2_values[k] = v
- value = json.dumps(human2_value)
+ human2_values = {k: bool_to_str(current_state.get(k, '')) for k in human2_keys}
+ value = json.dumps(human2_values)
else:
- # if key in key_map:
- # real_key = key_map.get(key)
- # else:
- # real_key = key
- value = current_state.get(key, '')
- value = json.dumps(value)
- # if value == 'null':
- # raise Exception("check your key")
-
- print(json_data)
+ value = json.dumps(current_state.get(key, ''))
+
+ logger.debug(f"Returning value for key '{key}': {value}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -430,8 +407,9 @@ async def get_info(request: ProcessRequest_get):
code="200",
msg=""
)
+
except Exception as e:
- print(e)
+ logger.error(f"Error getting info: {e}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -442,39 +420,32 @@ async def get_info(request: ProcessRequest_get):
@router.delete("/delete_session", response_model=ProcessResponse_delete_session)
-async def delete_session(request: ProcessRequest_delete_session):
- try:
- headers = {
- "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}",
- "Content-Type": "application/json"
- }
- json_data = request.model_dump()
- chatId = json_data.get('sessionId', None)
- if chatId is None:
- raise ValueError(f'Error in chatId.')
- url = DELETE_SESSION_URL.format(appId=APP_ID, chatId=chatId)
-
- # Call external analysis service
- async with httpx.AsyncClient() as client:
- response = await client.delete(
- url,
- headers=headers,
- timeout=None
- )
- response.raise_for_status()
- data = response.json()
- # print(data)
- except Exception as e:
- print(e)
- return ProcessResponse_chat(
- sessionId=json_data['sessionId'],
+async def delete_session(
+ request: ProcessRequest_delete_session,
+ client: AsyncChatClient = Depends(get_fastgpt_client)
+):
+ """Delete a chat session."""
+ json_data = request.model_dump()
+ chat_id = json_data.get('sessionId')
+
+ if not chat_id:
+ return ProcessResponse_delete_session(
+ sessionId="",
timeStamp=json_data['timeStamp'],
- code="500",
- msg="大模型服务器无响应"
+ code="400",
+ msg="sessionId is required"
)
-
+
try:
- if data['code'] == 200:
+ # Use SDK's delete_chat_history
+ response = await client.delete_chat_history(
+ appId=Config.FASTGPT_APP_ID,
+ chatId=chat_id
+ )
+ response.raise_for_status()
+ data = response.json()
+
+ if data.get('code') == 200:
return ProcessResponse_delete_session(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
@@ -483,8 +454,9 @@ async def delete_session(request: ProcessRequest_delete_session):
)
else:
raise ValueError("删除会话失败")
+
except Exception as e:
- print(e)
+ logger.error(f"Error deleting session: {e}")
return ProcessResponse_delete_session(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
diff --git a/src/core/config.py b/src/core/config.py
index 14d15ab..3692aba 100644
--- a/src/core/config.py
+++ b/src/core/config.py
@@ -8,4 +8,23 @@ class Config:
API_V1_STR = "/api/v1"
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./test.db")
- DEBUG = os.getenv("DEBUG", "false").lower() in ("true", "1", "t")
\ No newline at end of file
+ DEBUG = os.getenv("DEBUG", "false").lower() in ("true", "1", "t")
+
+ # FastGPT Configuration
+ FASTGPT_API_KEY = os.getenv("ANALYSIS_AUTH_TOKEN")
+ FASTGPT_BASE_URL = os.getenv("ANALYSIS_SERVICE_URL")
+ FASTGPT_APP_ID = os.getenv("APP_ID")
+ DELETE_SESSION_URL = os.getenv("DELETE_SESSION_URL")
+ DELETE_CHAT_URL = os.getenv("DELETE_CHAT_URL")
+ GET_CHAT_RECORDS_URL = os.getenv("GET_CHAT_RECORDS_URL")
+
+ @classmethod
+ def validate(cls):
+ """Validate required configuration."""
+ required = [
+ ("FASTGPT_API_KEY", cls.FASTGPT_API_KEY),
+ ("FASTGPT_APP_ID", cls.FASTGPT_APP_ID),
+ ]
+ missing = [name for name, value in required if not value]
+ if missing:
+ raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
\ No newline at end of file
diff --git a/src/core/fastgpt_client.py b/src/core/fastgpt_client.py
new file mode 100644
index 0000000..ee582d8
--- /dev/null
+++ b/src/core/fastgpt_client.py
@@ -0,0 +1,39 @@
+"""FastGPT client dependency injection."""
+from contextlib import asynccontextmanager
+from fastapi import FastAPI
+from fastgpt_client import AsyncChatClient
+from .config import Config
+
+# Global client instance
+_fastgpt_client: AsyncChatClient | None = None
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Manage FastGPT client lifecycle."""
+ global _fastgpt_client
+ Config.validate()
+
+ # Initialize client
+ _fastgpt_client = AsyncChatClient(
+ api_key=Config.FASTGPT_API_KEY,
+ base_url=Config.FASTGPT_BASE_URL,
+ timeout=60.0,
+ max_retries=3,
+ retry_delay=1.0,
+ enable_logging=Config.DEBUG,
+ )
+ await _fastgpt_client.__aenter__()
+
+ yield
+
+ # Cleanup
+ if _fastgpt_client:
+ await _fastgpt_client.__aexit__(None, None, None)
+
+
+def get_fastgpt_client() -> AsyncChatClient:
+ """Get the FastGPT client instance."""
+ if _fastgpt_client is None:
+ raise RuntimeError("FastGPT client not initialized")
+ return _fastgpt_client
diff --git a/src/core/logging_config.py b/src/core/logging_config.py
new file mode 100644
index 0000000..981604c
--- /dev/null
+++ b/src/core/logging_config.py
@@ -0,0 +1,79 @@
+"""Loguru logging configuration."""
+import sys
+import logging
+import inspect
+from loguru import logger
+from pathlib import Path
+from .config import Config
+
+# Remove default logger
+logger.remove()
+
+# Add console handler with color
+logger.add(
+ sys.stderr,
+ format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
+ level="DEBUG" if Config.DEBUG else "INFO",
+ colorize=True,
+)
+
+# Add file handler with rotation
+log_dir = Path("logs")
+log_dir.mkdir(exist_ok=True)
+
+logger.add(
+ log_dir / "server_{time:YYYY-MM-DD}.log",
+ rotation="00:00", # Rotate at midnight
+ retention="30 days", # Keep logs for 30 days
+ compression="zip", # Compress old logs
+ format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
+ level="DEBUG",
+ encoding="utf-8",
+)
+
+# Add error file handler
+logger.add(
+ log_dir / "error_{time:YYYY-MM-DD}.log",
+ rotation="00:00",
+ retention="90 days",
+ compression="zip",
+ format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
+ level="ERROR",
+ encoding="utf-8",
+)
+
+# Configure uvicorn logging to use loguru
+class InterceptHandler(logging.Handler):
+ """Intercept standard logging messages toward loguru."""
+
+ def emit(self, record: logging.LogRecord) -> None:
+ # Get corresponding Loguru level if it exists
+ try:
+ level = logger.level(record.levelname).name
+ except ValueError:
+ level = str(record.levelno)
+
+ # Find caller from where originated the logged message
+ frame, depth = inspect.currentframe(), 2
+ while frame and frame.f_code.co_filename == logging.__file__:
+ frame = frame.f_back
+ depth += 1
+
+ logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
+
+
+def setup_logging():
+ """Configure logging for the application."""
+ # Intercept uvicorn and fastapi loggers
+ logging.getLogger("uvicorn").handlers = [InterceptHandler()]
+ logging.getLogger("uvicorn.access").handlers = [InterceptHandler()]
+ logging.getLogger("fastapi").handlers = [InterceptHandler()]
+
+ # Set log levels
+ logging.getLogger("uvicorn").setLevel(logging.INFO)
+ logging.getLogger("uvicorn.access").setLevel(logging.INFO)
+ logging.getLogger("fastapi").setLevel(logging.INFO)
+
+ # Intercept httpx logs if needed
+ logging.getLogger("httpx").handlers = [InterceptHandler()]
+ logging.getLogger("httpx").setLevel(logging.WARNING)
diff --git a/src/main.py b/src/main.py
index bcb2ebc..25f7d45 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,11 +1,17 @@
from fastapi import FastAPI
import sys
from .api.endpoints import router as api_router
+from .core.fastgpt_client import lifespan
+from .core.logging_config import setup_logging
+
+# Setup logging first
+setup_logging()
app = FastAPI(
title="AI Accident Information Collection API",
description="AI Accident Information Collection API",
- version="1.0.0"
+ version="1.0.0",
+ lifespan=lifespan
)
@app.get("/")