Files
ZNJJ-api-server/src/api/endpoints.py
2026-05-22 14:29:42 +08:00

558 lines
21 KiB
Python

from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session
from fastgpt_client import AsyncChatClient
from fastgpt_client.exceptions import (
APIError, AuthenticationError, RateLimitError, ValidationError
)
from ..core.fastgpt_client import get_fastgpt_client
from ..core.config import Config
from loguru import logger
import json
import re
router = APIRouter()
STATUS_CODE_MAP = {
'0000': '结束通话',
'0001': '转接人工',
'0002': '语义无法识别转接人工',
'0003': '有人伤转接人工',
'1001': '未准备好通话',
'1002': '通话中',
'2000': '进入单车拍照',
'2001': '请对准车辆碰撞部位拍摄照片',
'2002': '请对准被撞物品拍摄照片',
'2003': '请切换摄像头对准本人拍摄一张正面照片',
'2004': '确认单车车牌',
'2005': '请确认车损位置是在车辆前方、后方还是侧面',
'2010': '进入双车拍照',
'2011': '请对准第一辆车碰撞部位拍摄',
'2012': '请对准第二辆车碰撞部位拍摄',
'2013': '请对准第二方车辆侧后方,看清车牌拍摄',
'2014': '请拍摄另一方驾驶人的正面照片',
'2015': '请切换前置摄像头对准本人拍摄一张正面照片',
'2016': '确认双车中的车牌'
}
def extract_state_and_content(data1: str) -> dict | None:
"""
Extracts the state and content from a string in the format <state>STATE</state>content.
Args:
data1: The input string.
Returns:
A dictionary with 'state' and 'content' keys if a match is found,
otherwise None.
"""
data1 = data1.strip()
regex = r"<state>(.*?)</state>(.*)"
match = re.search(regex, data1)
if match:
return {
"state": match.group(1),
"content": match.group(2),
}
return None
async def delete_last_two_chat_records(
client: AsyncChatClient,
session_id: str
) -> None:
"""Delete the last two chat records."""
try:
# Get chat records using SDK
response = await client.get_chat_records(
appId=Config.FASTGPT_APP_ID,
chatId=session_id,
offset=0,
pageSize=10
)
response.raise_for_status()
data = response.json()
records = data.get('data', {}).get('list', [])
if len(records) < 2:
logger.warning(f"Less than 2 records found for session {session_id}")
return
last_two_data_ids = [record['dataId'] for record in records[-2:]]
logger.info(f"last_two_data_ids: {last_two_data_ids}")
# Delete records using SDK
for content_id in last_two_data_ids:
delete_response = await client.delete_chat_record(
appId=Config.FASTGPT_APP_ID,
chatId=session_id,
contentId=content_id
)
delete_response.raise_for_status()
except Exception as e:
logger.error(f"Error deleting chat records: {e}")
raise
# @app.exception_handler(RequestValidationError)
# async def validation_exception_handler(request: Request, exc: RequestValidationError):
# logging.error(f"Validation Error: {exc.errors()}")
# raise HTTPException(status_code=422, detail=exc.errors())
def create_sse_event(event: str, data: dict) -> str:
"""Format data as an SSE event."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@router.post("/chat", response_model=ProcessResponse_chat)
async def chat(
request: ProcessRequest_chat,
stream: bool = False,
client: AsyncChatClient = Depends(get_fastgpt_client)
):
"""Handle chat completion request."""
json_data = request.model_dump()
logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}")
if stream:
async def event_generator():
try:
# Use SDK's create_chat_completion with stream=True
response = await client.create_chat_completion(
messages=[{"role": "user", "content": json_data['text']}],
chatId=json_data['sessionId'],
stream=True,
detail=True
)
buffer = ""
state_code_found = False
async for chunk in response.aiter_lines():
if chunk.startswith('data: '):
data_str = chunk[6:].strip()
if data_str == '[DONE]':
break
try:
data = json.loads(data_str)
try:
delta_content = data['choices'][0]['delta'].get('content', '')
except (KeyError, IndexError):
delta_content = ''
if delta_content:
buffer += delta_content
if not state_code_found:
# Check for <state>XXXX</state> pattern
match = re.search(r"<state>(.*?)</state>", buffer)
if match:
state_code = match.group(1)
# Apply logic to map/adjust state code
nextStageCode = state_code
if nextStageCode in ['3001', '3002', '1002']:
nextStageCode = '1002'
elif nextStageCode == '2006':
nextStageCode = '2004'
elif nextStageCode == '2017':
nextStageCode = '2016'
elif nextStageCode == '2020':
nextStageCode = '0002'
nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
# Send stage code event
yield create_sse_event("stage_code", {
"nextStageCode": nextStageCode,
"nextStage": nextStage
})
state_code_found = True
# Send remaining content as text_delta
remaining_content = buffer[match.end():]
if remaining_content:
yield create_sse_event("text_delta", {"text": remaining_content})
buffer = "" # Clear buffer after extracting state
else:
# State code already found, just stream text
yield create_sse_event("text_delta", {"text": delta_content})
buffer = "" # Do not buffer text after state found
except json.JSONDecodeError:
continue
except Exception as e:
print(data)
logger.error(f"Error processing chunk: {e}")
continue
# If stream ends and no state code found (unlikely if format is strict),
# we might want to send what we have
if not state_code_found and buffer:
yield create_sse_event("text_delta", {"text": buffer})
yield create_sse_event("done", {"status": "completed"})
except Exception as e:
logger.error(f"Streaming error: {e}")
yield create_sse_event("error", {"msg": str(e), "code": "500"})
return StreamingResponse(event_generator(), media_type="text/event-stream")
try:
# Use SDK's create_chat_completion
response = await client.create_chat_completion(
messages=[{"role": "user", "content": json_data['text']}],
chatId=json_data['sessionId'],
stream=False,
detail=True
)
response.raise_for_status()
data = response.json()
except AuthenticationError as e:
logger.error(f"Authentication error: {e}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
outputText="",
nextStage="",
nextStageCode="",
code="401",
msg="认证失败"
)
except RateLimitError as e:
logger.error(f"Rate limit error: {e}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
outputText="",
nextStage="",
nextStageCode="",
code="429",
msg="请求过于频繁,请稍后重试"
)
except APIError as e:
logger.error(f"API error: {e}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
outputText="",
nextStage="",
nextStageCode="",
code="500",
msg="大模型服务器无响应"
)
except Exception as e:
logger.error(f"Unexpected error: {e}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
outputText="",
nextStage="",
nextStageCode="",
code="500",
msg="大模型服务器无响应"
)
try:
# Extract content from FastGPT response
content = data['choices'][0]['message']['content']
logger.info(f"FastGPT服务返回信息content: {content}")
finish_reason = data['choices'][0]['finish_reason']
# Extract state variables
state = data.get('newVariables', {}).get('state', {})
if isinstance(state, str):
state = json.loads(state)
transfer_to_human = state.get("transfer_to_human", False)
ywrysw = state.get("ywrysw", False)
ywfjdc = state.get("ywfjdc", False)
ywmtc = state.get("ywmtc", False)
jdcsl = state.get("jdcsl", 0)
accident_info_complete = state.get("accident_info_complete", False)
user_is_ready = state.get("user_is_ready", False)
if isinstance(user_is_ready, str):
user_is_ready = user_is_ready.lower() == 'true'
driver_info_complete = state.get("driver_info_complete", False)
drivers_info_complete = state.get("drivers_info_complete", False)
driver_info_check = state.get("drivers_info_check", False)
drivers_info_check = state.get("drivers_info_check", False)
logger.debug(f"State variables: {data.get('newVariables', {})}")
nextStageCode = data['newVariables']['status_code']
# 有一些情况需要调整nextStageCode
if nextStageCode in ['3001', '3002', '1002']:
nextStageCode = '1002'
elif nextStageCode == '2006':
nextStageCode = '2004'
elif nextStageCode == '2017':
nextStageCode = '2016'
elif nextStageCode == '2020':
nextStageCode = '0002'
nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
# Parse content - sometimes content is a string, sometimes it is a list
if isinstance(content, list):
logger.debug("content是一个list")
content = content[0]['text']['content']
elif isinstance(content, str):
logger.debug("content是一个str")
state_and_content = extract_state_and_content(content)
if state_and_content:
logger.debug(f"解析后的state和content为: {state_and_content}")
content = state_and_content['content']
else:
raise ValueError("大模型回复中的state解析失败")
else:
logger.error(f"content既不是list也不是str, type: {type(content)}")
raise ValueError("大模型回复不是list也不是str")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
outputText=content,
nextStage=nextStage,
nextStageCode=nextStageCode,
code="200",
msg="",
)
except Exception as e:
logger.error(f"解析信息发生错误: {e}")
logger.error(f"content: {content}, type: {type(content)}")
return ProcessResponse_chat(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
outputText="",
nextStage="",
nextStageCode="",
code="500",
msg="大模型服务返回消息不完整"
)
@router.post("/set_info", response_model=ProcessResponse_set)
async def set_info(
request: ProcessRequest_set,
client: AsyncChatClient = Depends(get_fastgpt_client)
):
"""Set information in chat state."""
json_data = request.model_dump()
try:
# Get current state
response = await client.create_chat_completion(
messages=[{"role": "user", "content": ""}],
chatId=json_data['sessionId'],
stream=False,
detail=True
)
response.raise_for_status()
data = response.json()
current_state = data['newVariables']['state']
if isinstance(current_state, str):
current_state = json.loads(current_state)
logger.debug(f"Current state: {current_state}")
except Exception as e:
logger.error(f"Error getting current state: {e}")
return ProcessResponse_set(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="500",
msg="大模型服务器无响应"
)
try:
await delete_last_two_chat_records(client, json_data['sessionId'])
except Exception as e:
logger.error(f"Error deleting chat records: {e}")
return ProcessResponse_set(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="500",
msg="大模型后台无响应"
)
try:
# Update state
key = json_data['key']
value = json_data['value']
current_state[key] = value
logger.info(f'即将设置 {key}{value}')
logger.info(f'即将上传 {current_state}')
# Update state using SDK
response = await client.create_chat_completion(
messages=[{"role": "user", "content": ""}],
chatId=json_data['sessionId'],
stream=False,
detail=True,
variables={'state': current_state}
)
response.raise_for_status()
# Delete records again after update
await delete_last_two_chat_records(client, json_data['sessionId'])
return ProcessResponse_set(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="200",
msg=""
)
except Exception as e:
logger.error(f"Error setting info: {e}")
return ProcessResponse_set(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="500",
msg="大模型后台无响应"
)
@router.post("/get_info", response_model=ProcessResponse_get)
async def get_info(
request: ProcessRequest_get,
client: AsyncChatClient = Depends(get_fastgpt_client)
):
"""Get information from chat state."""
json_data = request.model_dump()
try:
# Get current state
response = await client.create_chat_completion(
messages=[{"role": "user", "content": ""}],
chatId=json_data['sessionId'],
stream=False,
detail=True
)
response.raise_for_status()
data = response.json()
current_state = data['newVariables']['state']
if isinstance(current_state, str):
current_state = json.loads(current_state)
logger.debug(f"Current state: {current_state}")
except Exception as e:
logger.error(f"Error getting state: {e}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
value="",
code="500",
msg="大模型服务器无响应"
)
try:
await delete_last_two_chat_records(client, json_data['sessionId'])
except Exception as e:
logger.error(f"Error deleting records: {e}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
value="",
code="500",
msg="大模型后台无响应"
)
try:
key = json_data['key']
acd_keys = ['ywrysw', 'ywfjdc', 'ywmtc', 'bjrjs', 'sgfssj', 'sfsgxc', 'jdcsl', 'sgyy']
human1_keys = ['xm1', 'hpzl1', 'hphm1', 'sfzmhm1', 'sfzmwh1', 'sjhm1', 'sjwh1', 'csbw1']
human2_keys = ['xm2', 'hpzl2', 'hphm2', 'sfzmhm2', 'sfzmwh2', 'sjhm2', 'sjwh2', 'csbw2']
def bool_to_str(v):
"""Convert boolean to string representation."""
return '1' if v is True else '0' if v is False else v
if key == 'all':
acd_values = {k: bool_to_str(current_state.get(k, '')) for k in acd_keys}
human1_values = {k: bool_to_str(current_state.get(k, '')) for k in human1_keys}
human2_values = {k: bool_to_str(current_state.get(k, '')) for k in human2_keys}
value = json.dumps({
'acdinfo': acd_values,
'acdhuman1': human1_values,
'acdhuman2': human2_values
})
elif key == "acdinfo":
acd_values = {k: bool_to_str(current_state.get(k, '')) for k in acd_keys}
value = json.dumps(acd_values)
elif key == 'acdhuman1':
human1_values = {k: bool_to_str(current_state.get(k, '')) for k in human1_keys}
value = json.dumps(human1_values)
elif key == 'acdhuman2':
human2_values = {k: bool_to_str(current_state.get(k, '')) for k in human2_keys}
value = json.dumps(human2_values)
else:
value = json.dumps(current_state.get(key, ''))
logger.debug(f"Returning value for key '{key}': {value}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
value=value,
code="200",
msg=""
)
except Exception as e:
logger.error(f"Error getting info: {e}")
return ProcessResponse_get(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
value="",
code="500",
msg="入参不规范"
)
@router.delete("/delete_session", response_model=ProcessResponse_delete_session)
async def delete_session(
request: ProcessRequest_delete_session,
client: AsyncChatClient = Depends(get_fastgpt_client)
):
"""Delete a chat session."""
json_data = request.model_dump()
chat_id = json_data.get('sessionId')
if not chat_id:
return ProcessResponse_delete_session(
sessionId="",
timeStamp=json_data['timeStamp'],
code="400",
msg="sessionId is required"
)
try:
# Use SDK's delete_chat_history
response = await client.delete_chat_history(
appId=Config.FASTGPT_APP_ID,
chatId=chat_id
)
response.raise_for_status()
data = response.json()
if data.get('code') == 200:
return ProcessResponse_delete_session(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="200",
msg=""
)
else:
raise ValueError("删除会话失败")
except Exception as e:
logger.error(f"Error deleting session: {e}")
return ProcessResponse_delete_session(
sessionId=json_data['sessionId'],
timeStamp=json_data['timeStamp'],
code="500",
msg="删除会话失败"
)