diff --git a/.env b/.env index d4fae61..b431a13 100644 --- a/.env +++ b/.env @@ -2,11 +2,8 @@ DATABASE_URL=sqlite:///./test.db SECRET_KEY=your_secret_key DEBUG=True -ANALYSIS_SERVICE_URL=http://101.89.151.141:3000/api/v1/chat/completions -ANALYSIS_AUTH_TOKEN=fastgpt-hSPnXMoBNGVAEpTLkQT3YfAnN26gQSyvLd4ABL1MRDoh68nL4RDlopFHXqmH8 -APP_ID=683ea1bc86197e19f71fc1ae -DELETE_SESSION_URL=http://101.89.151.141:3000/api/core/chat/delHistory?chatId={chatId}&appId={appId} -DELETE_CHAT_URL=http://101.89.151.141:3000/api/core/chat/item/delete?contentId={contentId}&chatId={chatId}&appId={appId} -GET_CHAT_RECORDS_URL=http://101.89.151.141:3000/api/core/chat/getPaginationRecords +ANALYSIS_SERVICE_URL=http://127.0.0.1:3030 +ANALYSIS_AUTH_TOKEN=fastgpt-r13smJwPgXfGj1HDfc4SWAvIoNrL5Wc6o0BYnezqBs7hgzPdQ7Q34hVl2FJc0R +APP_ID=6a310def7132e9f7d592dabb VOICE_CONFIG=config/voice-fastgpt-state-xfyunSuperTTS.json diff --git a/src/api/endpoints.py b/src/api/endpoints.py index 35fde98..ab6688f 100644 --- a/src/api/endpoints.py +++ b/src/api/endpoints.py @@ -1,7 +1,7 @@ from fastapi import APIRouter, HTTPException, Depends from fastapi.responses import StreamingResponse from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session -from fastgpt_client import AsyncChatClient +from fastgpt_client import AsyncChatClient, aiter_stream_events from fastgpt_client.exceptions import ( APIError, AuthenticationError, RateLimitError, ValidationError ) @@ -12,6 +12,7 @@ import json import re router = APIRouter() +FORM_EXTRACT_MODULE_NAME = "文本内容提取事故信息" STATUS_CODE_MAP = { '0000': '结束通话', '0001': '转接人工', @@ -34,6 +35,19 @@ STATUS_CODE_MAP = { '2016': '确认双车中的车牌' } +def normalize_stage_code(stage_code: str) -> str: + """Normalize FastGPT stage codes to external API stage codes.""" + if stage_code in ['3001', '3002', '1002']: + return '1002' + if stage_code == '2006': + return '2004' + if stage_code == '2017': + return '2016' + if stage_code == '2020': + return '0002' + return stage_code + + def extract_state_and_content(data1: str) -> dict | None: """ Extracts the state and content from a string in the format STATEcontent. @@ -47,7 +61,7 @@ def extract_state_and_content(data1: str) -> dict | None: """ data1 = data1.strip() regex = r"(.*?)(.*)" - match = re.search(regex, data1) + match = re.search(regex, data1, flags=re.DOTALL) if match: return { @@ -56,6 +70,38 @@ def extract_state_and_content(data1: str) -> dict | None: } return None + +def extract_form_update_from_flow_nodes(nodes) -> str: + """Extract form update data from the configured FastGPT content-extract node.""" + if not isinstance(nodes, list): + return "" + + for node in nodes: + if not isinstance(node, dict): + continue + if node.get("moduleName") != FORM_EXTRACT_MODULE_NAME: + continue + + extract_result = node.get("extractResult", {}) + if not isinstance(extract_result, dict): + return "" + + form_update = extract_result.get("formUpdate", "") + if isinstance(form_update, str): + return form_update + if form_update: + return json.dumps(form_update, ensure_ascii=False) + return "" + + return "" + + +def format_set_info_input(payload: dict, include_input_info: bool) -> str: + """Build optional setInfo input for FastGPT helper calls.""" + if not include_input_info: + return "" + return f"{json.dumps(payload, ensure_ascii=False)}" + async def delete_last_two_chat_records( client: AsyncChatClient, session_id: str @@ -112,6 +158,8 @@ async def chat( """Handle chat completion request.""" json_data = request.model_dump() logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}") + need_form_update = json_data.get('needFormUpdate', False) + chat_variables = {'needFormUpdate': need_form_update} if stream: async def event_generator(): @@ -121,73 +169,80 @@ async def chat( messages=[{"role": "user", "content": json_data['text']}], chatId=json_data['sessionId'], stream=True, - detail=True + detail=True, + variables=chat_variables ) buffer = "" state_code_found = False + module_form_sent = False + + def flush_text_delta(text: str): + return create_sse_event("text_delta", {"text": text}) + + def flush_form_update(form_update: str): + return create_sse_event("formUpdate", {"formUpdate": form_update}) - async for chunk in response.aiter_lines(): - if chunk.startswith('data: '): - data_str = chunk[6:].strip() - if data_str == '[DONE]': - break + async for event in aiter_stream_events(response): + try: + if event.kind == "flowResponses" and not module_form_sent: + form_update = extract_form_update_from_flow_nodes(event.data) + if form_update: + yield flush_form_update(form_update) + module_form_sent = True + continue + + if event.kind not in {"answer", "fastAnswer", "data"}: + continue + + data = event.data + if not isinstance(data, dict): + continue + try: - data = json.loads(data_str) - try: - delta_content = data['choices'][0]['delta'].get('content', '') - except (KeyError, IndexError): - delta_content = '' - if delta_content: - buffer += delta_content - - if not state_code_found: - # Check for XXXX pattern - match = re.search(r"(.*?)", buffer) - if match: - state_code = match.group(1) - - # Apply logic to map/adjust state code - nextStageCode = state_code - if nextStageCode in ['3001', '3002', '1002']: - nextStageCode = '1002' - elif nextStageCode == '2006': - nextStageCode = '2004' - elif nextStageCode == '2017': - nextStageCode = '2016' - elif nextStageCode == '2020': - nextStageCode = '0002' - nextStage = STATUS_CODE_MAP.get(nextStageCode, '') - - # Send stage code event - yield create_sse_event("stage_code", { - "nextStageCode": nextStageCode, - "nextStage": nextStage - }) - - state_code_found = True - - # Send remaining content as text_delta - remaining_content = buffer[match.end():] - if remaining_content: - yield create_sse_event("text_delta", {"text": remaining_content}) - buffer = "" # Clear buffer after extracting state - else: - # State code already found, just stream text - yield create_sse_event("text_delta", {"text": delta_content}) - buffer = "" # Do not buffer text after state found - - except json.JSONDecodeError: - continue - except Exception as e: - print(data) - logger.error(f"Error processing chunk: {e}") + delta_content = data['choices'][0]['delta'].get('content', '') + except (KeyError, IndexError): + delta_content = '' + if not delta_content: continue + buffer += delta_content + + if not state_code_found: + # Check for XXXX pattern + match = re.search(r"(.*?)", buffer, flags=re.DOTALL) + if match: + state_code = match.group(1) + + # Apply logic to map/adjust state code + nextStageCode = normalize_stage_code(state_code) + nextStage = STATUS_CODE_MAP.get(nextStageCode, '') + + # Send stage code event + yield create_sse_event("stage_code", { + "nextStageCode": nextStageCode, + "nextStage": nextStage + }) + + state_code_found = True + + # Send remaining content as text_delta + remaining_content = buffer[match.end():] + if remaining_content: + yield flush_text_delta(remaining_content) + buffer = "" # Clear buffer after extracting state + else: + yield flush_text_delta(delta_content) + buffer = "" + + except Exception as e: + logger.error(f"Error processing stream event: {e}") + continue + # If stream ends and no state code found (unlikely if format is strict), # we might want to send what we have if not state_code_found and buffer: - yield create_sse_event("text_delta", {"text": buffer}) + yield create_sse_event("text_delta", {"text": buffer}) yield create_sse_event("done", {"status": "completed"}) @@ -203,7 +258,8 @@ async def chat( messages=[{"role": "user", "content": json_data['text']}], chatId=json_data['sessionId'], stream=False, - detail=True + detail=True, + variables=chat_variables ) response.raise_for_status() data = response.json() @@ -281,28 +337,18 @@ async def chat( logger.debug(f"State variables: {data.get('newVariables', {})}") - nextStageCode = data['newVariables']['status_code'] - - # 有一些情况需要调整nextStageCode - if nextStageCode in ['3001', '3002', '1002']: - nextStageCode = '1002' - elif nextStageCode == '2006': - nextStageCode = '2004' - elif nextStageCode == '2017': - nextStageCode = '2016' - elif nextStageCode == '2020': - nextStageCode = '0002' - nextStage = STATUS_CODE_MAP.get(nextStageCode, '') - # Parse content - sometimes content is a string, sometimes it is a list + content_stage_code = None if isinstance(content, list): logger.debug("content是一个list") content = content[0]['text']['content'] - elif isinstance(content, str): + + if isinstance(content, str): logger.debug("content是一个str") state_and_content = extract_state_and_content(content) if state_and_content: logger.debug(f"解析后的state和content为: {state_and_content}") + content_stage_code = state_and_content['state'] content = state_and_content['content'] else: raise ValueError("大模型回复中的state解析失败") @@ -310,10 +356,16 @@ async def chat( logger.error(f"content既不是list也不是str, type: {type(content)}") raise ValueError("大模型回复不是list也不是str") + nextStageCode = content_stage_code or data['newVariables']['status_code'] + nextStageCode = normalize_stage_code(nextStageCode) + nextStage = STATUS_CODE_MAP.get(nextStageCode, '') + form_update = extract_form_update_from_flow_nodes(data.get("responseData", [])) + return ProcessResponse_chat( sessionId=json_data['sessionId'], timeStamp=json_data['timeStamp'], outputText=content, + formUpdate=form_update, nextStage=nextStage, nextStageCode=nextStageCode, code="200", @@ -340,11 +392,16 @@ async def set_info( ): """Set information in chat state.""" json_data = request.model_dump() + set_info_payload = {'key': json_data['key'], 'value': json_data['value']} + set_info_input = format_set_info_input( + set_info_payload, + json_data.get('includeInputInfo', False) + ) try: # Get current state response = await client.create_chat_completion( - messages=[{"role": "user", "content": ""}], + messages=[{"role": "user", "content": set_info_input}], chatId=json_data['sessionId'], stream=False, detail=True @@ -387,7 +444,7 @@ async def set_info( # Update state using SDK response = await client.create_chat_completion( - messages=[{"role": "user", "content": ""}], + messages=[{"role": "user", "content": set_info_input}], chatId=json_data['sessionId'], stream=False, detail=True, @@ -421,11 +478,16 @@ async def get_info( ): """Get information from chat state.""" json_data = request.model_dump() + get_info_payload = {'key': json_data['key']} + get_info_input = format_set_info_input( + get_info_payload, + json_data.get('includeInputInfo', False) + ) try: # Get current state response = await client.create_chat_completion( - messages=[{"role": "user", "content": ""}], + messages=[{"role": "user", "content": get_info_input}], chatId=json_data['sessionId'], stream=False, detail=True diff --git a/src/schemas/models.py b/src/schemas/models.py index 88116f0..0335d5f 100644 --- a/src/schemas/models.py +++ b/src/schemas/models.py @@ -5,11 +5,13 @@ class ProcessRequest_chat(BaseModel): sessionId: str = Field(..., max_length=64) timeStamp: str = Field(..., max_length=32) text: str = Field(...) + needFormUpdate: bool = False class ProcessResponse_chat(BaseModel): sessionId: str = Field(..., max_length=64) timeStamp: str = Field(..., max_length=32) outputText: str = Field(...) + formUpdate: str = Field(default="") nextStage: str = Field(..., max_length=32) nextStageCode: str = Field(..., max_length=4) code: str = Field(..., max_length=4) @@ -19,6 +21,7 @@ class ProcessRequest_get(BaseModel): sessionId: str = Field(..., max_length=64) timeStamp: str = Field(..., max_length=32) key: str = Field(...) + includeInputInfo: bool = False class ProcessResponse_get(BaseModel): sessionId: str = Field(..., max_length=64) @@ -32,6 +35,7 @@ class ProcessRequest_set(BaseModel): timeStamp: str = Field(..., max_length=32) key: str = Field(...) value: str = Field(...) + includeInputInfo: bool = False class ProcessResponse_set(BaseModel): sessionId: str = Field(..., max_length=64) diff --git a/test/api/fastapi.http b/test/api/fastapi.http index 41abd74..20632f2 100644 --- a/test/api/fastapi.http +++ b/test/api/fastapi.http @@ -3,27 +3,28 @@ GET http://127.0.0.1:8080 HTTP/1.1 200 - OK -connection: close +date: Wed, 17 Jun 2026 00:37:02 GMT +server: uvicorn content-length: 32 content-type: application/json -date: Thu, 08 Jan 2026 08:58:09 GMT -server: uvicorn +connection: close ### POST http://127.0.0.1:8080/chat content-type: application/json { - "sessionId": "a1002", + "sessionId": "a1100", "timeStamp": "202503310303", - "text": "【拍摄完成】" + "text": "继续", + "needFormTags": true } HTTP/1.1 200 - OK -connection: close -content-length: 205 -content-type: application/json -date: Thu, 08 Jan 2026 08:59:37 GMT +date: Wed, 17 Jun 2026 00:37:26 GMT server: uvicorn +content-length: 274 +content-type: application/json +connection: close ### POST http://127.0.0.1:8080/get_info content-type: application/json @@ -35,11 +36,11 @@ content-type: application/json } HTTP/1.1 200 - OK -connection: close -content-length: 97 -content-type: application/json -date: Thu, 08 Jan 2026 09:27:05 GMT +date: Wed, 17 Jun 2026 00:27:12 GMT server: uvicorn +content-length: 108 +content-type: application/json +connection: close ### POST http://127.0.0.1:8080/set_info content-type: application/json