Enhance logging and performance tracking in chat endpoint

- Added detailed logging for chat request processing, including session ID, input text length, and latency measurements.
- Implemented performance tracking for streaming events, capturing first event and text delta timings.
- Improved error handling logging to include session context and duration on failures.
- Updated non-stream response logging to include latency and output details for better debugging.
This commit is contained in:
Xin Wang
2026-06-17 13:46:32 +08:00
parent 7666759121
commit 084e13e03c

View File

@@ -10,6 +10,7 @@ from ..core.config import Config
from loguru import logger
import json
import re
import time
router = APIRouter()
FORM_EXTRACT_MODULE_NAME = "文本内容提取事故信息"
@@ -171,12 +172,25 @@ async def chat(
):
"""Handle chat completion request."""
json_data = request.model_dump()
logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}")
need_form_update = json_data.get('needFormUpdate', False)
chat_variables = {'needFormUpdate': need_form_update}
request_started_at = time.perf_counter()
logger.info(
"Chat request received "
f"sessionId={json_data['sessionId']} stream={stream} "
f"needFormUpdate={need_form_update} text_len={len(json_data.get('text', ''))} "
f"input={json_data.get('text', '')!r}"
)
if stream:
async def event_generator():
stream_started_at = time.perf_counter()
first_event_logged = False
first_text_delta_logged = False
last_text_delta_at = None
text_delta_count = 0
output_chunks = []
form_update_payload = {}
try:
# Use SDK's create_chat_completion with stream=True
response = await client.create_chat_completion(
@@ -186,12 +200,29 @@ async def chat(
detail=True,
variables=chat_variables
)
logger.info(
"FastGPT stream response opened "
f"sessionId={json_data['sessionId']} "
f"open_latency_ms={(time.perf_counter() - stream_started_at) * 1000:.1f}"
)
buffer = ""
state_code_found = False
module_form_sent = False
def flush_text_delta(text: str):
nonlocal first_text_delta_logged, last_text_delta_at, text_delta_count
now = time.perf_counter()
if not first_text_delta_logged:
first_text_delta_logged = True
logger.info(
"Chat stream first text_delta sent "
f"sessionId={json_data['sessionId']} "
f"text_delta_ttfb_ms={(now - stream_started_at) * 1000:.1f}"
)
last_text_delta_at = now
text_delta_count += 1
output_chunks.append(text)
return create_sse_event("text_delta", {"text": text})
def flush_form_update(form_update):
@@ -199,9 +230,24 @@ async def chat(
async for event in aiter_stream_events(response):
try:
if not first_event_logged:
first_event_logged = True
logger.info(
"FastGPT stream first event "
f"sessionId={json_data['sessionId']} kind={event.kind} "
f"ttfb_ms={(time.perf_counter() - stream_started_at) * 1000:.1f}"
)
if event.kind == "flowResponses" and not module_form_sent:
form_update = extract_form_update_from_flow_nodes(event.data)
if form_update:
form_update_payload = form_update
logger.info(
"FastGPT stream formUpdate extracted "
f"sessionId={json_data['sessionId']} "
f"type={type(form_update).__name__} "
f"formUpdate={form_update!r}"
)
yield flush_form_update(form_update)
module_form_sent = True
continue
@@ -231,6 +277,11 @@ async def chat(
# Apply logic to map/adjust state code
nextStageCode = normalize_stage_code(state_code)
nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
logger.info(
"FastGPT stream stage_code parsed "
f"sessionId={json_data['sessionId']} "
f"nextStageCode={nextStageCode} nextStage={nextStage}"
)
# Send stage code event
yield create_sse_event("stage_code", {
@@ -256,12 +307,32 @@ async def chat(
# If stream ends and no state code found (unlikely if format is strict),
# we might want to send what we have
if not state_code_found and buffer:
yield create_sse_event("text_delta", {"text": buffer})
yield flush_text_delta(buffer)
text_delta_end_ms = (
f"{(last_text_delta_at - stream_started_at) * 1000:.1f}"
if last_text_delta_at is not None
else "-"
)
logger.info(
"Chat stream completed "
f"sessionId={json_data['sessionId']} "
f"duration_ms={(time.perf_counter() - stream_started_at) * 1000:.1f} "
f"text_delta_end_ms={text_delta_end_ms} "
f"text_delta_count={text_delta_count} "
f"stage_code_found={state_code_found} formUpdate_sent={module_form_sent} "
f"output={''.join(output_chunks)!r} "
f"formUpdate={form_update_payload!r}"
)
yield create_sse_event("done", {"status": "completed"})
except Exception as e:
logger.error(f"Streaming error: {e}")
logger.error(
"Chat stream failed "
f"sessionId={json_data['sessionId']} "
f"duration_ms={(time.perf_counter() - stream_started_at) * 1000:.1f} "
f"error={e}"
)
yield create_sse_event("error", {"msg": str(e), "code": "500"})
return StreamingResponse(event_generator(), media_type="text/event-stream")
@@ -277,6 +348,11 @@ async def chat(
)
response.raise_for_status()
data = response.json()
logger.info(
"FastGPT non-stream response received "
f"sessionId={json_data['sessionId']} "
f"latency_ms={(time.perf_counter() - request_started_at) * 1000:.1f}"
)
except AuthenticationError as e:
logger.error(f"Authentication error: {e}")
@@ -374,6 +450,14 @@ async def chat(
nextStageCode = normalize_stage_code(nextStageCode)
nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
form_update = extract_form_update_from_flow_nodes(data.get("responseData", []))
logger.info(
"Chat non-stream completed "
f"sessionId={json_data['sessionId']} "
f"duration_ms={(time.perf_counter() - request_started_at) * 1000:.1f} "
f"nextStageCode={nextStageCode} nextStage={nextStage} "
f"output_len={len(content)} formUpdate_type={type(form_update).__name__} "
f"output={content!r} formUpdate={form_update!r}"
)
return ProcessResponse_chat(
sessionId=json_data['sessionId'],