From 30306addee820abf3c80991696e498bf889713a3 Mon Sep 17 00:00:00 2001
From: Xin Wang
Date: Mon, 2 Feb 2026 17:54:55 +0800
Subject: [PATCH] Add stream text output

---
 examples/stream_chat.py | 78 ++++++++++++++++++++++++++++++++++
 src/api/endpoints.py    | 92 ++++++++++++++++++++++++++++++++++++++++-
 test/api/fastapi.http   | 18 ++++----
 3 files changed, 178 insertions(+), 10 deletions(-)
 create mode 100644 examples/stream_chat.py

diff --git a/examples/stream_chat.py b/examples/stream_chat.py
new file mode 100644
index 0000000..0264912
--- /dev/null
+++ b/examples/stream_chat.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+"""
+Simple CLI script to interact with the /chat endpoint in stream mode.
+"""
+
+import asyncio
+import aiohttp
+import json
+import sys
+from datetime import datetime
+
+
+API_BASE_URL = "http://localhost:8000"
+
+
+async def stream_chat(session_id: str, text: str):
+    """Send a streaming chat request and print each SSE event."""
+    timestamp = datetime.now().isoformat()
+
+    payload = {
+        "sessionId": session_id,
+        "timeStamp": timestamp,
+        "text": text
+    }
+
+    async with aiohttp.ClientSession() as http_session:
+        async with http_session.post(
+            f"{API_BASE_URL}/chat",
+            json=payload,
+            params={"stream": "true"},
+        ) as response:
+            print(f"Status: {response.status}")
+            print("-" * 50)
+
+            buffer = ""
+            # Use async iterator on response.content
+            async for chunk in response.content.iter_chunked(1024):
+                chunk_str = chunk.decode("utf-8")
+                buffer += chunk_str
+
+                # Process complete SSE messages
+                while "\n\n" in buffer:
+                    message, buffer = buffer.split("\n\n", 1)
+                    if message.startswith("event: "):
+                        event_type = message[7:].split("\n")[0]
+                        data_line = message.split("data: ", 1)[-1]
+                        try:
+                            data = json.loads(data_line)
+                            print(f"[{event_type}] {json.dumps(data, ensure_ascii=False)}")
+                        except json.JSONDecodeError:
+                            print(f"[{event_type}] {data_line}")
+                    elif message.startswith("data: "):
+                        data_str = message[6:]
+                        try:
+                            data = json.loads(data_str)
+                            print(f"[data] {json.dumps(data, ensure_ascii=False)}")
+                        except json.JSONDecodeError:
+                            print(f"[data] {data_str}")
+
+
+async def main():
+    if len(sys.argv) < 3:
+        print("Usage: python stream_chat.py <session_id> <message>")
+        print("Example: python stream_chat.py test-session-123 '发生了交通事故'")
+        sys.exit(1)
+
+    session_id = sys.argv[1]
+    text = " ".join(sys.argv[2:])
+
+    print(f"Session ID: {session_id}")
+    print(f"Message: {text}")
+    print("-" * 50)
+
+    await stream_chat(session_id, text)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/api/endpoints.py b/src/api/endpoints.py
index 72e053a..2522f65 100644
--- a/src/api/endpoints.py
+++ b/src/api/endpoints.py
@@ -1,4 +1,5 @@
 from fastapi import APIRouter, HTTPException, Depends
+from fastapi.responses import StreamingResponse
 from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session
 from fastgpt_client import AsyncChatClient
 from fastgpt_client.exceptions import (
@@ -98,15 +99,104 @@ async def delete_last_two_chat_records(
 #     logging.error(f"Validation Error: {exc.errors()}")
 #     raise HTTPException(status_code=422, detail=exc.errors())
 
+def create_sse_event(event: str, data: dict) -> str:
+    """Format data as an SSE event."""
+    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+
 @router.post("/chat", response_model=ProcessResponse_chat)
 async def chat(
     request: ProcessRequest_chat,
+    stream: bool = False,
     client: AsyncChatClient = Depends(get_fastgpt_client)
 ):
     """Handle chat completion request."""
     json_data = request.model_dump()
-    logger.info(f"用户请求信息ProcessRequest_chat: {json_data}")
+    logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}")
+
+    if stream:
+        async def event_generator():
+            try:
+                # Use SDK's create_chat_completion with stream=True
+                response = await client.create_chat_completion(
+                    messages=[{"role": "user", "content": json_data['text']}],
+                    chatId=json_data['sessionId'],
+                    stream=True,
+                    detail=True
+                )
+
+                buffer = ""
+                state_code_found = False
+
+                async for chunk in response.aiter_lines():
+                    if chunk.startswith('data: '):
+                        data_str = chunk[6:].strip()
+                        if data_str == '[DONE]':
+                            break
+                        try:
+                            data = json.loads(data_str)
+                            try:
+                                delta_content = data['choices'][0]['delta'].get('content', '')
+                            except (KeyError, IndexError):
+                                delta_content = ''
+                            if delta_content:
+                                buffer += delta_content
+
+                                if not state_code_found:
+                                    # Check for the <stateCode>XXXX</stateCode> marker (tag name assumed)
+                                    match = re.search(r"<stateCode>(.*?)</stateCode>", buffer)
+                                    if match:
+                                        state_code = match.group(1)
+
+                                        # Apply logic to map/adjust state code
+                                        nextStageCode = state_code
+                                        if nextStageCode in ['3001', '3002', '1002']:
+                                            nextStageCode = '1002'
+                                        elif nextStageCode == '2006':
+                                            nextStageCode = '2004'
+                                        elif nextStageCode == '2017':
+                                            nextStageCode = '2016'
+                                        elif nextStageCode == '2020':
+                                            nextStageCode = '0002'
+                                        nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
+
+                                        # Send stage code event
+                                        yield create_sse_event("stage_code", {
+                                            "nextStageCode": nextStageCode,
+                                            "nextStage": nextStage
+                                        })
+
+                                        state_code_found = True
+
+                                        # Send remaining content as text_delta
+                                        remaining_content = buffer[match.end():]
+                                        if remaining_content:
+                                            yield create_sse_event("text_delta", {"text": remaining_content})
+                                        buffer = ""  # Clear buffer after extracting state
+                                else:
+                                    # State code already found, just stream text
+                                    yield create_sse_event("text_delta", {"text": delta_content})
+                                    buffer = ""  # Do not buffer text after state found
+
+                        except json.JSONDecodeError:
+                            continue
+                        except Exception as e:
+                            logger.debug(f"Offending chunk payload: {data_str}")
+                            logger.error(f"Error processing chunk: {e}")
+                            continue
+
+                # If the stream ends and no state code was found (unlikely if the
+                # format is strict), send what we have
+                if not state_code_found and buffer:
+                    yield create_sse_event("text_delta", {"text": buffer})
+
+                yield create_sse_event("done", {"status": "completed"})
+
+            except Exception as e:
+                logger.error(f"Streaming error: {e}")
+                yield create_sse_event("error", {"msg": str(e), "code": "500"})
+
+        return StreamingResponse(event_generator(), media_type="text/event-stream")
 
     try:
         # Use SDK's create_chat_completion
         response = await client.create_chat_completion(
diff --git a/test/api/fastapi.http b/test/api/fastapi.http
index dd2ef42..41abd74 100644
--- a/test/api/fastapi.http
+++ b/test/api/fastapi.http
@@ -38,37 +38,37 @@ HTTP/1.1 200 - OK
 connection: close
 content-length: 97
 content-type: application/json
-date: Thu, 08 Jan 2026 09:26:31 GMT
+date: Thu, 08 Jan 2026 09:27:05 GMT
 server: uvicorn
 
 ###
 POST http://127.0.0.1:8080/set_info
 content-type: application/json
 
 {
-    "sessionId": "a0009",
+    "sessionId": "a1002",
     "timeStamp": "202503310303",
     "key": "hphm1",
-    "value": "沪A8938"
+    "value": "沪A8939"
 }
 HTTP/1.1 200 - OK
 connection: close
 content-length: 70
 content-type: application/json
-date: Thu, 08 Jan 2026 09:25:49 GMT
+date: Thu, 08 Jan 2026 09:27:00 GMT
 server: uvicorn
 
 ###
-DELETE http://127.0.0.1:8000/delete_session
+DELETE http://127.0.0.1:8080/delete_session
 content-type: application/json
 
 {
"sessionId": "a0009", + "sessionId": "a1002", "timeStamp": "202503310303" } HTTP/1.1 200 - OK -date: Fri, 20 Jun 2025 05:56:25 GMT -server: uvicorn +connection: close content-length: 70 content-type: application/json -connection: close \ No newline at end of file +date: Thu, 08 Jan 2026 09:27:24 GMT +server: uvicorn \ No newline at end of file