diff --git a/src/api/endpoints.py b/src/api/endpoints.py index 9e009ea..1b766dc 100644 --- a/src/api/endpoints.py +++ b/src/api/endpoints.py @@ -10,6 +10,7 @@ from ..core.config import Config from loguru import logger import json import re +import time router = APIRouter() FORM_EXTRACT_MODULE_NAME = "文本内容提取事故信息" @@ -171,12 +172,25 @@ async def chat( ): """Handle chat completion request.""" json_data = request.model_dump() - logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}") need_form_update = json_data.get('needFormUpdate', False) chat_variables = {'needFormUpdate': need_form_update} + request_started_at = time.perf_counter() + logger.info( + "Chat request received " + f"sessionId={json_data['sessionId']} stream={stream} " + f"needFormUpdate={need_form_update} text_len={len(json_data.get('text', ''))} " + f"input={json_data.get('text', '')!r}" + ) if stream: async def event_generator(): + stream_started_at = time.perf_counter() + first_event_logged = False + first_text_delta_logged = False + last_text_delta_at = None + text_delta_count = 0 + output_chunks = [] + form_update_payload = {} try: # Use SDK's create_chat_completion with stream=True response = await client.create_chat_completion( @@ -186,12 +200,29 @@ async def chat( detail=True, variables=chat_variables ) + logger.info( + "FastGPT stream response opened " + f"sessionId={json_data['sessionId']} " + f"open_latency_ms={(time.perf_counter() - stream_started_at) * 1000:.1f}" + ) buffer = "" state_code_found = False module_form_sent = False def flush_text_delta(text: str): + nonlocal first_text_delta_logged, last_text_delta_at, text_delta_count + now = time.perf_counter() + if not first_text_delta_logged: + first_text_delta_logged = True + logger.info( + "Chat stream first text_delta sent " + f"sessionId={json_data['sessionId']} " + f"text_delta_ttfb_ms={(now - stream_started_at) * 1000:.1f}" + ) + last_text_delta_at = now + text_delta_count += 1 + output_chunks.append(text) return create_sse_event("text_delta", {"text": text}) def flush_form_update(form_update): @@ -199,9 +230,24 @@ async def chat( async for event in aiter_stream_events(response): try: + if not first_event_logged: + first_event_logged = True + logger.info( + "FastGPT stream first event " + f"sessionId={json_data['sessionId']} kind={event.kind} " + f"ttfb_ms={(time.perf_counter() - stream_started_at) * 1000:.1f}" + ) + if event.kind == "flowResponses" and not module_form_sent: form_update = extract_form_update_from_flow_nodes(event.data) if form_update: + form_update_payload = form_update + logger.info( + "FastGPT stream formUpdate extracted " + f"sessionId={json_data['sessionId']} " + f"type={type(form_update).__name__} " + f"formUpdate={form_update!r}" + ) yield flush_form_update(form_update) module_form_sent = True continue @@ -231,6 +277,11 @@ async def chat( # Apply logic to map/adjust state code nextStageCode = normalize_stage_code(state_code) nextStage = STATUS_CODE_MAP.get(nextStageCode, '') + logger.info( + "FastGPT stream stage_code parsed " + f"sessionId={json_data['sessionId']} " + f"nextStageCode={nextStageCode} nextStage={nextStage}" + ) # Send stage code event yield create_sse_event("stage_code", { @@ -256,12 +307,32 @@ async def chat( # If stream ends and no state code found (unlikely if format is strict), # we might want to send what we have if not state_code_found and buffer: - yield create_sse_event("text_delta", {"text": buffer}) + yield flush_text_delta(buffer) + text_delta_end_ms = ( + f"{(last_text_delta_at - stream_started_at) * 1000:.1f}" + if last_text_delta_at is not None + else "-" + ) + logger.info( + "Chat stream completed " + f"sessionId={json_data['sessionId']} " + f"duration_ms={(time.perf_counter() - stream_started_at) * 1000:.1f} " + f"text_delta_end_ms={text_delta_end_ms} " + f"text_delta_count={text_delta_count} " + f"stage_code_found={state_code_found} formUpdate_sent={module_form_sent} " + f"output={''.join(output_chunks)!r} " + f"formUpdate={form_update_payload!r}" + ) yield create_sse_event("done", {"status": "completed"}) except Exception as e: - logger.error(f"Streaming error: {e}") + logger.error( + "Chat stream failed " + f"sessionId={json_data['sessionId']} " + f"duration_ms={(time.perf_counter() - stream_started_at) * 1000:.1f} " + f"error={e}" + ) yield create_sse_event("error", {"msg": str(e), "code": "500"}) return StreamingResponse(event_generator(), media_type="text/event-stream") @@ -277,6 +348,11 @@ async def chat( ) response.raise_for_status() data = response.json() + logger.info( + "FastGPT non-stream response received " + f"sessionId={json_data['sessionId']} " + f"latency_ms={(time.perf_counter() - request_started_at) * 1000:.1f}" + ) except AuthenticationError as e: logger.error(f"Authentication error: {e}") @@ -374,6 +450,14 @@ async def chat( nextStageCode = normalize_stage_code(nextStageCode) nextStage = STATUS_CODE_MAP.get(nextStageCode, '') form_update = extract_form_update_from_flow_nodes(data.get("responseData", [])) + logger.info( + "Chat non-stream completed " + f"sessionId={json_data['sessionId']} " + f"duration_ms={(time.perf_counter() - request_started_at) * 1000:.1f} " + f"nextStageCode={nextStageCode} nextStage={nextStage} " + f"output_len={len(content)} formUpdate_type={type(form_update).__name__} " + f"output={content!r} formUpdate={form_update!r}" + ) return ProcessResponse_chat( sessionId=json_data['sessionId'],