Compare commits

...

1 Commits

Author SHA1 Message Date
Xin Wang
30306addee Add stream text output 2026-02-02 17:54:55 +08:00
3 changed files with 178 additions and 10 deletions

78
examples/stream_chat.py Normal file
View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
Simple CLI script to interact with /chat endpoint in stream mode.
"""
import asyncio
import aiohttp
import json
import sys
from datetime import datetime
API_BASE_URL = "http://localhost:8000"
async def stream_chat(session_id: str, text: str):
"""Send a streaming chat request."""
timestamp = datetime.now().isoformat()
payload = {
"sessionId": session_id,
"timeStamp": timestamp,
"text": text
}
async with aiohttp.ClientSession() as http_session:
async with http_session.post(
f"{API_BASE_URL}/chat",
json=payload,
params={"stream": "true"},
) as response:
print(f"Status: {response.status}")
print("-" * 50)
buffer = ""
# Use async iterator on response.content
async for chunk in response.content.iter_chunked(1024):
chunk_str = chunk.decode("utf-8")
buffer += chunk_str
# Process complete SSE messages
while "\n\n" in buffer:
message, buffer = buffer.split("\n\n", 1)
if message.startswith("event: "):
event_type = message[7:].split("\n")[0]
data_line = message.split("data: ", 1)[-1]
try:
data = json.loads(data_line)
print(f"[{event_type}] {json.dumps(data, ensure_ascii=False)}")
except json.JSONDecodeError:
print(f"[{event_type}] {data_line}")
elif message.startswith("data: "):
data_str = message[6:]
try:
data = json.loads(data_str)
print(f"[data] {json.dumps(data, ensure_ascii=False)}")
except json.JSONDecodeError:
print(f"[data] {data_str}")
async def main():
if len(sys.argv) < 3:
print("Usage: python stream_chat.py <session_id> <message>")
print("Example: python stream_chat.py test-session-123 '发生了交通事故'")
sys.exit(1)
session_id = sys.argv[1]
text = " ".join(sys.argv[2:])
print(f"Session ID: {session_id}")
print(f"Message: {text}")
print("-" * 50)
await stream_chat(session_id, text)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,4 +1,5 @@
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session
from fastgpt_client import AsyncChatClient
from fastgpt_client.exceptions import (
@@ -98,14 +99,103 @@ async def delete_last_two_chat_records(
# logging.error(f"Validation Error: {exc.errors()}")
# raise HTTPException(status_code=422, detail=exc.errors())
def create_sse_event(event: str, data: dict) -> str:
"""Format data as an SSE event."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@router.post("/chat", response_model=ProcessResponse_chat)
async def chat(
request: ProcessRequest_chat,
stream: bool = False,
client: AsyncChatClient = Depends(get_fastgpt_client)
):
"""Handle chat completion request."""
json_data = request.model_dump()
logger.info(f"用户请求信息ProcessRequest_chat: {json_data}")
logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}")
if stream:
async def event_generator():
try:
# Use SDK's create_chat_completion with stream=True
response = await client.create_chat_completion(
messages=[{"role": "user", "content": json_data['text']}],
chatId=json_data['sessionId'],
stream=True,
detail=True
)
buffer = ""
state_code_found = False
async for chunk in response.aiter_lines():
if chunk.startswith('data: '):
data_str = chunk[6:].strip()
if data_str == '[DONE]':
break
try:
data = json.loads(data_str)
try:
delta_content = data['choices'][0]['delta'].get('content', '')
except (KeyError, IndexError):
delta_content = ''
if delta_content:
buffer += delta_content
if not state_code_found:
# Check for <state>XXXX</state> pattern
match = re.search(r"<state>(.*?)</state>", buffer)
if match:
state_code = match.group(1)
# Apply logic to map/adjust state code
nextStageCode = state_code
if nextStageCode in ['3001', '3002', '1002']:
nextStageCode = '1002'
elif nextStageCode == '2006':
nextStageCode = '2004'
elif nextStageCode == '2017':
nextStageCode = '2016'
elif nextStageCode == '2020':
nextStageCode = '0002'
nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
# Send stage code event
yield create_sse_event("stage_code", {
"nextStageCode": nextStageCode,
"nextStage": nextStage
})
state_code_found = True
# Send remaining content as text_delta
remaining_content = buffer[match.end():]
if remaining_content:
yield create_sse_event("text_delta", {"text": remaining_content})
buffer = "" # Clear buffer after extracting state
else:
# State code already found, just stream text
yield create_sse_event("text_delta", {"text": delta_content})
buffer = "" # Do not buffer text after state found
except json.JSONDecodeError:
continue
except Exception as e:
print(data)
logger.error(f"Error processing chunk: {e}")
continue
# If stream ends and no state code found (unlikely if format is strict),
# we might want to send what we have
if not state_code_found and buffer:
yield create_sse_event("text_delta", {"text": buffer})
yield create_sse_event("done", {"status": "completed"})
except Exception as e:
logger.error(f"Streaming error: {e}")
yield create_sse_event("error", {"msg": str(e), "code": "500"})
return StreamingResponse(event_generator(), media_type="text/event-stream")
try:
# Use SDK's create_chat_completion

View File

@@ -38,37 +38,37 @@ HTTP/1.1 200 - OK
connection: close
content-length: 97
content-type: application/json
date: Thu, 08 Jan 2026 09:26:31 GMT
date: Thu, 08 Jan 2026 09:27:05 GMT
server: uvicorn
###
POST http://127.0.0.1:8080/set_info
content-type: application/json
{
"sessionId": "a0009",
"sessionId": "a1002",
"timeStamp": "202503310303",
"key": "hphm1",
"value": "沪A8938"
"value": "沪A8939"
}
HTTP/1.1 200 - OK
connection: close
content-length: 70
content-type: application/json
date: Thu, 08 Jan 2026 09:25:49 GMT
date: Thu, 08 Jan 2026 09:27:00 GMT
server: uvicorn
###
DELETE http://127.0.0.1:8000/delete_session
DELETE http://127.0.0.1:8080/delete_session
content-type: application/json
{
"sessionId": "a0009",
"sessionId": "a1002",
"timeStamp": "202503310303"
}
HTTP/1.1 200 - OK
date: Fri, 20 Jun 2025 05:56:25 GMT
server: uvicorn
connection: close
content-length: 70
content-type: application/json
connection: close
date: Thu, 08 Jan 2026 09:27:24 GMT
server: uvicorn