From d5f81ef79f634338a01c982b0be964417344632f Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Thu, 8 Jan 2026 16:22:25 +0800 Subject: [PATCH] use fastgpt python sdk --- .gitignore | 5 +- requirements.txt | 14 +- src/.env.example | 10 + src/api/endpoints.py | 628 ++++++++++++++++++------------------- src/core/config.py | 21 +- src/core/fastgpt_client.py | 39 +++ src/core/logging_config.py | 79 +++++ src/main.py | 8 +- 8 files changed, 472 insertions(+), 332 deletions(-) create mode 100644 src/.env.example create mode 100644 src/core/fastgpt_client.py create mode 100644 src/core/logging_config.py diff --git a/.gitignore b/.gitignore index 09bf5d6..e72cd47 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ *.pyc -__pycache__/ \ No newline at end of file +__pycache__/ +logs/ +*.log +.env \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 8fbd8d0..7ab0031 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,16 @@ python-dotenv>=1.0.0 httpx>=0.25.0 pytest>=7.4.0 pytest-asyncio>=0.21.0 -pytest-cov>=4.1.0 \ No newline at end of file +pytest-cov>=4.1.0 +pillow>=10.4.0 +paho-mqtt>=2.1.0 +pydantic-settings==2.1.0 +python-multipart==0.0.6 +python-jose[cryptography]==3.3.0 +passlib[bcrypt]==1.7.4 +openai==1.55.3 +loguru>=0.7.0 +pandas +requests +sqlalchemy +pymysql diff --git a/src/.env.example b/src/.env.example new file mode 100644 index 0000000..64320bc --- /dev/null +++ b/src/.env.example @@ -0,0 +1,10 @@ +DATABASE_URL=sqlite:///./test.db +SECRET_KEY=your_secret_key +DEBUG=True + +ANALYSIS_SERVICE_URL=http://127.0.0.1:3030 +ANALYSIS_AUTH_TOKEN=fastgpt-hSPnXMoBNGVAEpTLkQT3YfAnN26gQSyvLd4ABL1MRDoh68nL4RDlopFHXqmH8 +APP_ID=683ea1bc86197e19f71fc1ae +DELETE_SESSION_URL=http://127.0.0.1:3030/api/core/chat/delHistory?chatId={chatId}&appId={appId} +DELETE_CHAT_URL=http://127.0.0.1:3030/api/core/chat/item/delete?contentId={contentId}&chatId={chatId}&appId={appId} +GET_CHAT_RECORDS_URL=http://127.0.0.1:3030/api/core/chat/getPaginationRecords diff --git a/src/api/endpoints.py b/src/api/endpoints.py index 47a755d..046dc89 100644 --- a/src/api/endpoints.py +++ b/src/api/endpoints.py @@ -1,37 +1,21 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Depends from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session -import httpx -from datetime import datetime -import os -from dotenv import load_dotenv -from urllib import parse as parse +from fastgpt_client import AsyncChatClient +from fastgpt_client.exceptions import ( + APIError, AuthenticationError, RateLimitError, ValidationError +) +from ..core.fastgpt_client import get_fastgpt_client +from ..core.config import Config +from loguru import logger import json - -load_dotenv() +import re router = APIRouter() - -ANALYSIS_SERVICE_URL = os.getenv("ANALYSIS_SERVICE_URL", None) -if ANALYSIS_SERVICE_URL is None: - raise ValueError("ANALYSIS_SERVICE_URL environment variable not set") -ANALYSIS_AUTH_TOKEN = os.getenv("ANALYSIS_AUTH_TOKEN", None) -if ANALYSIS_AUTH_TOKEN is None: - raise ValueError("ANALYSIS_AUTH_TOKEN environment variable not set") -DELETE_SESSION_URL = os.getenv("DELETE_SESSION_URL", None) -if DELETE_SESSION_URL is None: - raise ValueError("DELETE_SESSION_URL environment variable not set") -APP_ID = os.getenv("APP_ID", None) -if APP_ID is None: - raise ValueError("APP_ID environment variable not set") -DELETE_CHAT_URL = os.getenv("DELETE_CHAT_URL", None) -if DELETE_CHAT_URL is None: - raise ValueError("DELETE_CHAT_URL environment variable not set") -GET_CHAT_RECORDS_URL = os.getenv("GET_CHAT_RECORDS_URL", None) -if GET_CHAT_RECORDS_URL is None: - raise ValueError("GET_CHAT_RECORDS_URL environment variable not set") STATUS_CODE_MAP = { '0000': '结束通话', '0001': '转接人工', + '0002': '语义无法识别转接人工', + '0003': '有人伤转接人工', '1001': '未准备好通话', '1002': '通话中', '2000': '进入单车拍照', @@ -49,44 +33,63 @@ STATUS_CODE_MAP = { '2016': '确认双车中的车牌' } +def extract_state_and_content(data1: str) -> dict | None: + """ + Extracts the state and content from a string in the format STATEcontent. -async def delete_last_two_chat_records(sessionId): - # 获取最后两条聊天记录 - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - chat_data = { - "appId": APP_ID, - "chatId": sessionId, - "offset": 0, - } - async with httpx.AsyncClient() as client: - response = await client.post( - GET_CHAT_RECORDS_URL, - json=chat_data, - headers=headers, - timeout=None + Args: + data1: The input string. + + Returns: + A dictionary with 'state' and 'content' keys if a match is found, + otherwise None. + """ + data1 = data1.strip() + regex = r"(.*?)(.*)" + match = re.search(regex, data1) + + if match: + return { + "state": match.group(1), + "content": match.group(2), + } + return None + +async def delete_last_two_chat_records( + client: AsyncChatClient, + session_id: str +) -> None: + """Delete the last two chat records.""" + try: + # Get chat records using SDK + response = await client.get_chat_records( + appId=Config.FASTGPT_APP_ID, + chatId=session_id, + offset=0, + pageSize=10 ) response.raise_for_status() data = response.json() - last_two_dataId = [x['dataId'] for x in data['data']['list'][-2:]] - - # 删除最后聊天聊天记录 - for contentId in last_two_dataId: - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - url = DELETE_CHAT_URL.format(contentId=contentId, chatId=sessionId, appId=APP_ID) - async with httpx.AsyncClient() as client: - response = await client.delete( - url, - headers=headers, - timeout=None + + records = data.get('data', {}).get('list', []) + if len(records) < 2: + logger.warning(f"Less than 2 records found for session {session_id}") + return + + last_two_data_ids = [record['dataId'] for record in records[-2:]] + + # Delete records using SDK + for content_id in last_two_data_ids: + delete_response = await client.delete_chat_record( + appId=Config.FASTGPT_APP_ID, + chatId=session_id, + contentId=content_id ) - response.raise_for_status() - data = response.json() + delete_response.raise_for_status() + + except Exception as e: + logger.error(f"Error deleting chat records: {e}") + raise # @app.exception_handler(RequestValidationError) @@ -95,37 +98,60 @@ async def delete_last_two_chat_records(sessionId): # raise HTTPException(status_code=422, detail=exc.errors()) @router.post("/chat", response_model=ProcessResponse_chat) -async def chat(request: ProcessRequest_chat): +async def chat( + request: ProcessRequest_chat, + client: AsyncChatClient = Depends(get_fastgpt_client) +): + """Handle chat completion request.""" + json_data = request.model_dump() + logger.info(f"用户请求信息ProcessRequest_chat: {json_data}") + try: - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - json_data = request.model_dump() - chat_data = { - "chatId": json_data['sessionId'], - "stream": False, - "detail": True, - "messages": [ - { - "role": "user", - "content": json_data['text'] - } - ] - } + # Use SDK's create_chat_completion + response = await client.create_chat_completion( + messages=[{"role": "user", "content": json_data['text']}], + chatId=json_data['sessionId'], + stream=False, + detail=True + ) + response.raise_for_status() + data = response.json() - # Call external analysis service - async with httpx.AsyncClient() as client: - response = await client.post( - ANALYSIS_SERVICE_URL, - json=chat_data, - headers=headers, - timeout=None - ) - response.raise_for_status() - data = response.json() + except AuthenticationError as e: + logger.error(f"Authentication error: {e}") + return ProcessResponse_chat( + sessionId=json_data['sessionId'], + timeStamp=json_data['timeStamp'], + outputText="", + nextStage="", + nextStageCode="", + code="401", + msg="认证失败" + ) + except RateLimitError as e: + logger.error(f"Rate limit error: {e}") + return ProcessResponse_chat( + sessionId=json_data['sessionId'], + timeStamp=json_data['timeStamp'], + outputText="", + nextStage="", + nextStageCode="", + code="429", + msg="请求过于频繁,请稍后重试" + ) + except APIError as e: + logger.error(f"API error: {e}") + return ProcessResponse_chat( + sessionId=json_data['sessionId'], + timeStamp=json_data['timeStamp'], + outputText="", + nextStage="", + nextStageCode="", + code="500", + msg="大模型服务器无响应" + ) except Exception as e: - print(e) + logger.error(f"Unexpected error: {e}") return ProcessResponse_chat( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -139,26 +165,59 @@ async def chat(request: ProcessRequest_chat): try: # Extract content from FastGPT response content = data['choices'][0]['message']['content'] + logger.info(f"FastGPT服务返回信息content: {content}") + finish_reason = data['choices'][0]['finish_reason'] - transfer_to_human = data['newVariables']['state'].get("transfer_to_human", False) - ywrysw = data['newVariables']['state'].get("ywrysw", False) - ywfjdc = data['newVariables']['state'].get("ywfjdc", False) - ywmtc = data['newVariables']['state'].get("ywmtc", False) - jdcsl = data['newVariables']['state'].get("jdcsl", 0) - accident_info_complete = data['newVariables']['state'].get("accident_info_complete", False) - user_is_ready = data['newVariables']['state'].get("user_is_ready", False) + # Extract state variables + state = data.get('newVariables', {}).get('state', {}) + if isinstance(state, str): + state = json.loads(state) + + transfer_to_human = state.get("transfer_to_human", False) + ywrysw = state.get("ywrysw", False) + ywfjdc = state.get("ywfjdc", False) + ywmtc = state.get("ywmtc", False) + jdcsl = state.get("jdcsl", 0) + accident_info_complete = state.get("accident_info_complete", False) + user_is_ready = state.get("user_is_ready", False) if isinstance(user_is_ready, str): user_is_ready = user_is_ready.lower() == 'true' - driver_info_complete = data['newVariables']['state'].get("driver_info_complete", False) - drivers_info_complete = data['newVariables']['state'].get("drivers_info_complete", False) - driver_info_check = data['newVariables']['state'].get("drivers_info_check", False) - drivers_info_check = data['newVariables']['state'].get("drivers_info_check", False) + driver_info_complete = state.get("driver_info_complete", False) + drivers_info_complete = state.get("drivers_info_complete", False) + driver_info_check = state.get("drivers_info_check", False) + drivers_info_check = state.get("drivers_info_check", False) - print(data['newVariables']) + logger.debug(f"State variables: {data.get('newVariables', {})}") - nextStageCode=data['newVariables']['status_code'] - nextStage=STATUS_CODE_MAP[nextStageCode] + nextStageCode = data['newVariables']['status_code'] + + # 有一些情况需要调整nextStageCode + if nextStageCode in ['3001', '3002', '1002']: + nextStageCode = '1002' + elif nextStageCode == '2006': + nextStageCode = '2004' + elif nextStageCode == '2017': + nextStageCode = '2016' + elif nextStageCode == '2020': + nextStageCode = '0002' + nextStage = STATUS_CODE_MAP.get(nextStageCode, '') + + # Parse content - sometimes content is a string, sometimes it is a list + if isinstance(content, list): + logger.debug("content是一个list") + content = content[0]['text']['content'] + elif isinstance(content, str): + logger.debug("content是一个str") + state_and_content = extract_state_and_content(content) + if state_and_content: + logger.debug(f"解析后的state和content为: {state_and_content}") + content = state_and_content['content'] + else: + raise ValueError("大模型回复中的state解析失败") + else: + logger.error(f"content既不是list也不是str, type: {type(content)}") + raise ValueError("大模型回复不是list也不是str") return ProcessResponse_chat( sessionId=json_data['sessionId'], @@ -170,7 +229,8 @@ async def chat(request: ProcessRequest_chat): msg="", ) except Exception as e: - print(e) + logger.error(f"解析信息发生错误: {e}") + logger.error(f"content: {content}, type: {type(content)}") return ProcessResponse_chat( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -182,121 +242,43 @@ async def chat(request: ProcessRequest_chat): ) -@router.post("/set_info", response_model=ProcessResponse_set) -async def set_info(request: ProcessRequest_set): - # 获取当前state +@router.post("/set_info", response_model=ProcessResponse_set) +async def set_info( + request: ProcessRequest_set, + client: AsyncChatClient = Depends(get_fastgpt_client) +): + """Set information in chat state.""" + json_data = request.model_dump() + try: - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - json_data = request.model_dump() - chat_data = { - "chatId": json_data['sessionId'], - "stream": False, - "detail": True, - "messages": [ - { - "role": "user", - "content": "" - } - ] - } + # Get current state + response = await client.create_chat_completion( + messages=[{"role": "user", "content": ""}], + chatId=json_data['sessionId'], + stream=False, + detail=True + ) + response.raise_for_status() + data = response.json() - # Call external analysis service - async with httpx.AsyncClient() as client: - response = await client.post( - ANALYSIS_SERVICE_URL, - json=chat_data, - headers=headers, - timeout=None - ) - response.raise_for_status() - data = response.json() - current_state = data['newVariables']['state'] - if isinstance(current_state, str): - current_state = json.loads(current_state) - print(current_state) + current_state = data['newVariables']['state'] + if isinstance(current_state, str): + current_state = json.loads(current_state) + logger.debug(f"Current state: {current_state}") + except Exception as e: - print(e) + logger.error(f"Error getting current state: {e}") return ProcessResponse_set( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], code="500", msg="大模型服务器无响应" ) - + try: - await delete_last_two_chat_records(json_data['sessionId']) + await delete_last_two_chat_records(client, json_data['sessionId']) except Exception as e: - print(e) - return ProcessResponse_set( - sessionId=json_data['sessionId'], - timeStamp=json_data['timeStamp'], - code="500", - msg="大模型后台无响应" - ) - - # 修改当前state - try: - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - json_data = request.model_dump() - # key_map = {"xm1": "xm1", "hpzl1": "hpzl1", "hphm1": "driver1License", "sfzmhm1": "driver1ID", "sfzmwh1": "sfzmwh1", "sjhm1": "driver1Phonenumber","sjwh1": "sjwh1", "xm2":"xm2", "hpzl2": "hpzl2", "hphm2": "driver2License", "sfzmhm2": "driver2ID", "sfzmwh2": "sfzmwh2", "sjhm2": "driver2Phonenumber", "sjwh2": "sjwh2"} - # if json_data['key'] not in key_map: - # raise Exception("check your key") - # key = key_map.get(json_data['key']) - key = json_data['key'] - value = json_data['value'] - current_state[json_data['key']] = value - print(f'即将上传 {current_state}') - # if key == 'driver1ID' or key == 'driver2ID' or key == 'driver1Phonenumber' or key == 'driver2Phonenumber': - # current_state[json_data['key'].replace('hm','wh')] = value[-4:] - - chat_data = { - "chatId": json_data['sessionId'], - "stream": False, - "detail": True, - "variables": {'state':current_state}, - "messages": [ - { - "role": "user", - "content": "" - } - ] - } - # Call external analysis service - async with httpx.AsyncClient() as client: - response = await client.post( - ANALYSIS_SERVICE_URL, - json=chat_data, - headers=headers, - timeout=None - ) - response.raise_for_status() - - return ProcessResponse_set( - sessionId=json_data['sessionId'], - timeStamp=json_data['timeStamp'], - code="200", - msg="" - ) - - except Exception as e: - print(e) - return ProcessResponse_set( - sessionId=json_data['sessionId'], - timeStamp=json_data['timeStamp'], - code="500", - msg="大模型后台无响应" - ) - - try: - await delete_last_two_chat_records(json_data['sessionId']) - except Exception as e: - print(e) + logger.error(f"Error deleting chat records: {e}") return ProcessResponse_set( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -304,43 +286,68 @@ async def set_info(request: ProcessRequest_set): msg="大模型后台无响应" ) -@router.post("/get_info", response_model=ProcessResponse_get) -async def get_info(request: ProcessRequest_get): - # 获取当前state try: - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - json_data = request.model_dump() - chat_data = { - "chatId": json_data['sessionId'], - "stream": False, - "detail": True, - "messages": [ - { - "role": "user", - "content": "" - } - ] - } + # Update state + key = json_data['key'] + value = json_data['value'] + current_state[key] = value + logger.info(f'即将上传 {current_state}') + + # Update state using SDK + response = await client.create_chat_completion( + messages=[{"role": "user", "content": ""}], + chatId=json_data['sessionId'], + stream=False, + detail=True, + variables={'state': current_state} + ) + response.raise_for_status() + + # Delete records again after update + await delete_last_two_chat_records(client, json_data['sessionId']) + + return ProcessResponse_set( + sessionId=json_data['sessionId'], + timeStamp=json_data['timeStamp'], + code="200", + msg="" + ) - # Call external analysis service - async with httpx.AsyncClient() as client: - response = await client.post( - ANALYSIS_SERVICE_URL, - json=chat_data, - headers=headers, - timeout=None - ) - response.raise_for_status() - data = response.json() - current_state = data['newVariables']['state'] - if isinstance(current_state, str): - current_state = json.loads(current_state) - print(current_state) except Exception as e: - print(e) + logger.error(f"Error setting info: {e}") + return ProcessResponse_set( + sessionId=json_data['sessionId'], + timeStamp=json_data['timeStamp'], + code="500", + msg="大模型后台无响应" + ) + +@router.post("/get_info", response_model=ProcessResponse_get) +async def get_info( + request: ProcessRequest_get, + client: AsyncChatClient = Depends(get_fastgpt_client) +): + """Get information from chat state.""" + json_data = request.model_dump() + + try: + # Get current state + response = await client.create_chat_completion( + messages=[{"role": "user", "content": ""}], + chatId=json_data['sessionId'], + stream=False, + detail=True + ) + response.raise_for_status() + data = response.json() + + current_state = data['newVariables']['state'] + if isinstance(current_state, str): + current_state = json.loads(current_state) + logger.debug(f"Current state: {current_state}") + + except Exception as e: + logger.error(f"Error getting state: {e}") return ProcessResponse_get( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -348,11 +355,11 @@ async def get_info(request: ProcessRequest_get): code="500", msg="大模型服务器无响应" ) - + try: - await delete_last_two_chat_records(json_data['sessionId']) + await delete_last_two_chat_records(client, json_data['sessionId']) except Exception as e: - print(e) + logger.error(f"Error deleting records: {e}") return ProcessResponse_get( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -360,69 +367,39 @@ async def get_info(request: ProcessRequest_get): code="500", msg="大模型后台无响应" ) - + try: key = json_data['key'] - # key_map = {"hpzl1": "hpzl1", "hphm1": "driver1License", "sfzmhm1": "driver1ID", "sfzmwh1": "sfzmwh1", "sjhm1": "driver1Phonenumber","sjwh1": "sjwh1", "hpzl2": "hpzl2", "hphm2": "driver2License", "sfzmhm2": "driver2ID", "sfzmwh2": "sfzmwh2", "sjhm2": "driver2Phonenumber", "sjwh2": "sjwh2"} acd_keys = ['ywrysw', 'ywfjdc', 'ywmtc', 'bjrjs', 'sgfssj', 'sfsgxc', 'jdcsl', 'sgyy'] human1_keys = ['xm1', 'hpzl1', 'hphm1', 'sfzmhm1', 'sfzmwh1', 'sjhm1', 'sjwh1', 'csbw1'] human2_keys = ['xm2', 'hpzl2', 'hphm2', 'sfzmhm2', 'sfzmwh2', 'sjhm2', 'sjwh2', 'csbw2'] + + def bool_to_str(v): + """Convert boolean to string representation.""" + return '1' if v is True else '0' if v is False else v + if key == 'all': - acd_values = {} - for k in acd_keys: - v = current_state.get(k, '') - if isinstance(v, bool): - v = '1' if v==True else '0' - acd_values[k] = v - human1_values = {} - for k in human1_keys: - v = current_state.get(k, '') - if isinstance(v, bool): - v = '1' if v==True else '0' - human1_values[k] = v - human2_values = {} - for k in human2_keys: - v = current_state.get(k, '') - if isinstance(v, bool): - v = '1' if v==True else '0' - human2_values[k] = v - value = {'acdinfo': acd_values, 'acdhuman1': human1_values, 'acdhuman2': human2_values} - value = json.dumps(value) + acd_values = {k: bool_to_str(current_state.get(k, '')) for k in acd_keys} + human1_values = {k: bool_to_str(current_state.get(k, '')) for k in human1_keys} + human2_values = {k: bool_to_str(current_state.get(k, '')) for k in human2_keys} + value = json.dumps({ + 'acdinfo': acd_values, + 'acdhuman1': human1_values, + 'acdhuman2': human2_values + }) elif key == "acdinfo": - acd_values = {} - for k in acd_keys: - v = current_state.get(k, '') - if isinstance(v, bool): - v = '1' if v==True else '0' - acd_values[k] = v + acd_values = {k: bool_to_str(current_state.get(k, '')) for k in acd_keys} value = json.dumps(acd_values) elif key == 'acdhuman1': - human1_values = {} - for k in human1_keys: - v = current_state.get(k, '') - if isinstance(v, bool): - v = '1' if v==True else '0' - human1_values[k] = v - value = json.dumps(human1_value) + human1_values = {k: bool_to_str(current_state.get(k, '')) for k in human1_keys} + value = json.dumps(human1_values) elif key == 'acdhuman2': - human2_values = {} - for k in human1_keys: - v = current_state.get(k, '') - if isinstance(v, bool): - v = '1' if v==True else '0' - human2_values[k] = v - value = json.dumps(human2_value) + human2_values = {k: bool_to_str(current_state.get(k, '')) for k in human2_keys} + value = json.dumps(human2_values) else: - # if key in key_map: - # real_key = key_map.get(key) - # else: - # real_key = key - value = current_state.get(key, '') - value = json.dumps(value) - # if value == 'null': - # raise Exception("check your key") - - print(json_data) + value = json.dumps(current_state.get(key, '')) + + logger.debug(f"Returning value for key '{key}': {value}") return ProcessResponse_get( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -430,8 +407,9 @@ async def get_info(request: ProcessRequest_get): code="200", msg="" ) + except Exception as e: - print(e) + logger.error(f"Error getting info: {e}") return ProcessResponse_get( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -442,39 +420,32 @@ async def get_info(request: ProcessRequest_get): @router.delete("/delete_session", response_model=ProcessResponse_delete_session) -async def delete_session(request: ProcessRequest_delete_session): - try: - headers = { - "Authorization": f"Bearer {ANALYSIS_AUTH_TOKEN}", - "Content-Type": "application/json" - } - json_data = request.model_dump() - chatId = json_data.get('sessionId', None) - if chatId is None: - raise ValueError(f'Error in chatId.') - url = DELETE_SESSION_URL.format(appId=APP_ID, chatId=chatId) - - # Call external analysis service - async with httpx.AsyncClient() as client: - response = await client.delete( - url, - headers=headers, - timeout=None - ) - response.raise_for_status() - data = response.json() - # print(data) - except Exception as e: - print(e) - return ProcessResponse_chat( - sessionId=json_data['sessionId'], +async def delete_session( + request: ProcessRequest_delete_session, + client: AsyncChatClient = Depends(get_fastgpt_client) +): + """Delete a chat session.""" + json_data = request.model_dump() + chat_id = json_data.get('sessionId') + + if not chat_id: + return ProcessResponse_delete_session( + sessionId="", timeStamp=json_data['timeStamp'], - code="500", - msg="大模型服务器无响应" + code="400", + msg="sessionId is required" ) - + try: - if data['code'] == 200: + # Use SDK's delete_chat_history + response = await client.delete_chat_history( + appId=Config.FASTGPT_APP_ID, + chatId=chat_id + ) + response.raise_for_status() + data = response.json() + + if data.get('code') == 200: return ProcessResponse_delete_session( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], @@ -483,8 +454,9 @@ async def delete_session(request: ProcessRequest_delete_session): ) else: raise ValueError("删除会话失败") + except Exception as e: - print(e) + logger.error(f"Error deleting session: {e}") return ProcessResponse_delete_session( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], diff --git a/src/core/config.py b/src/core/config.py index 14d15ab..3692aba 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -8,4 +8,23 @@ class Config: API_V1_STR = "/api/v1" SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key") DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./test.db") - DEBUG = os.getenv("DEBUG", "false").lower() in ("true", "1", "t") \ No newline at end of file + DEBUG = os.getenv("DEBUG", "false").lower() in ("true", "1", "t") + + # FastGPT Configuration + FASTGPT_API_KEY = os.getenv("ANALYSIS_AUTH_TOKEN") + FASTGPT_BASE_URL = os.getenv("ANALYSIS_SERVICE_URL") + FASTGPT_APP_ID = os.getenv("APP_ID") + DELETE_SESSION_URL = os.getenv("DELETE_SESSION_URL") + DELETE_CHAT_URL = os.getenv("DELETE_CHAT_URL") + GET_CHAT_RECORDS_URL = os.getenv("GET_CHAT_RECORDS_URL") + + @classmethod + def validate(cls): + """Validate required configuration.""" + required = [ + ("FASTGPT_API_KEY", cls.FASTGPT_API_KEY), + ("FASTGPT_APP_ID", cls.FASTGPT_APP_ID), + ] + missing = [name for name, value in required if not value] + if missing: + raise ValueError(f"Missing required environment variables: {', '.join(missing)}") \ No newline at end of file diff --git a/src/core/fastgpt_client.py b/src/core/fastgpt_client.py new file mode 100644 index 0000000..ee582d8 --- /dev/null +++ b/src/core/fastgpt_client.py @@ -0,0 +1,39 @@ +"""FastGPT client dependency injection.""" +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastgpt_client import AsyncChatClient +from .config import Config + +# Global client instance +_fastgpt_client: AsyncChatClient | None = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage FastGPT client lifecycle.""" + global _fastgpt_client + Config.validate() + + # Initialize client + _fastgpt_client = AsyncChatClient( + api_key=Config.FASTGPT_API_KEY, + base_url=Config.FASTGPT_BASE_URL, + timeout=60.0, + max_retries=3, + retry_delay=1.0, + enable_logging=Config.DEBUG, + ) + await _fastgpt_client.__aenter__() + + yield + + # Cleanup + if _fastgpt_client: + await _fastgpt_client.__aexit__(None, None, None) + + +def get_fastgpt_client() -> AsyncChatClient: + """Get the FastGPT client instance.""" + if _fastgpt_client is None: + raise RuntimeError("FastGPT client not initialized") + return _fastgpt_client diff --git a/src/core/logging_config.py b/src/core/logging_config.py new file mode 100644 index 0000000..981604c --- /dev/null +++ b/src/core/logging_config.py @@ -0,0 +1,79 @@ +"""Loguru logging configuration.""" +import sys +import logging +import inspect +from loguru import logger +from pathlib import Path +from .config import Config + +# Remove default logger +logger.remove() + +# Add console handler with color +logger.add( + sys.stderr, + format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", + level="DEBUG" if Config.DEBUG else "INFO", + colorize=True, +) + +# Add file handler with rotation +log_dir = Path("logs") +log_dir.mkdir(exist_ok=True) + +logger.add( + log_dir / "server_{time:YYYY-MM-DD}.log", + rotation="00:00", # Rotate at midnight + retention="30 days", # Keep logs for 30 days + compression="zip", # Compress old logs + format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", + level="DEBUG", + encoding="utf-8", +) + +# Add error file handler +logger.add( + log_dir / "error_{time:YYYY-MM-DD}.log", + rotation="00:00", + retention="90 days", + compression="zip", + format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", + level="ERROR", + encoding="utf-8", +) + +# Configure uvicorn logging to use loguru +class InterceptHandler(logging.Handler): + """Intercept standard logging messages toward loguru.""" + + def emit(self, record: logging.LogRecord) -> None: + # Get corresponding Loguru level if it exists + try: + level = logger.level(record.levelname).name + except ValueError: + level = str(record.levelno) + + # Find caller from where originated the logged message + frame, depth = inspect.currentframe(), 2 + while frame and frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + +def setup_logging(): + """Configure logging for the application.""" + # Intercept uvicorn and fastapi loggers + logging.getLogger("uvicorn").handlers = [InterceptHandler()] + logging.getLogger("uvicorn.access").handlers = [InterceptHandler()] + logging.getLogger("fastapi").handlers = [InterceptHandler()] + + # Set log levels + logging.getLogger("uvicorn").setLevel(logging.INFO) + logging.getLogger("uvicorn.access").setLevel(logging.INFO) + logging.getLogger("fastapi").setLevel(logging.INFO) + + # Intercept httpx logs if needed + logging.getLogger("httpx").handlers = [InterceptHandler()] + logging.getLogger("httpx").setLevel(logging.WARNING) diff --git a/src/main.py b/src/main.py index bcb2ebc..25f7d45 100644 --- a/src/main.py +++ b/src/main.py @@ -1,11 +1,17 @@ from fastapi import FastAPI import sys from .api.endpoints import router as api_router +from .core.fastgpt_client import lifespan +from .core.logging_config import setup_logging + +# Setup logging first +setup_logging() app = FastAPI( title="AI Accident Information Collection API", description="AI Accident Information Collection API", - version="1.0.0" + version="1.0.0", + lifespan=lifespan ) @app.get("/")