Compare commits
1 Commits
32c491cd3f
...
fastgpt-py
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
30306addee |
78
examples/stream_chat.py
Normal file
78
examples/stream_chat.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Simple CLI script to interact with /chat endpoint in stream mode.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
API_BASE_URL = "http://localhost:8000"
|
||||||
|
|
||||||
|
|
||||||
|
async def stream_chat(session_id: str, text: str):
|
||||||
|
"""Send a streaming chat request."""
|
||||||
|
timestamp = datetime.now().isoformat()
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"sessionId": session_id,
|
||||||
|
"timeStamp": timestamp,
|
||||||
|
"text": text
|
||||||
|
}
|
||||||
|
|
||||||
|
async with aiohttp.ClientSession() as http_session:
|
||||||
|
async with http_session.post(
|
||||||
|
f"{API_BASE_URL}/chat",
|
||||||
|
json=payload,
|
||||||
|
params={"stream": "true"},
|
||||||
|
) as response:
|
||||||
|
print(f"Status: {response.status}")
|
||||||
|
print("-" * 50)
|
||||||
|
|
||||||
|
buffer = ""
|
||||||
|
# Use async iterator on response.content
|
||||||
|
async for chunk in response.content.iter_chunked(1024):
|
||||||
|
chunk_str = chunk.decode("utf-8")
|
||||||
|
buffer += chunk_str
|
||||||
|
|
||||||
|
# Process complete SSE messages
|
||||||
|
while "\n\n" in buffer:
|
||||||
|
message, buffer = buffer.split("\n\n", 1)
|
||||||
|
if message.startswith("event: "):
|
||||||
|
event_type = message[7:].split("\n")[0]
|
||||||
|
data_line = message.split("data: ", 1)[-1]
|
||||||
|
try:
|
||||||
|
data = json.loads(data_line)
|
||||||
|
print(f"[{event_type}] {json.dumps(data, ensure_ascii=False)}")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"[{event_type}] {data_line}")
|
||||||
|
elif message.startswith("data: "):
|
||||||
|
data_str = message[6:]
|
||||||
|
try:
|
||||||
|
data = json.loads(data_str)
|
||||||
|
print(f"[data] {json.dumps(data, ensure_ascii=False)}")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"[data] {data_str}")
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
if len(sys.argv) < 3:
|
||||||
|
print("Usage: python stream_chat.py <session_id> <message>")
|
||||||
|
print("Example: python stream_chat.py test-session-123 '发生了交通事故'")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
session_id = sys.argv[1]
|
||||||
|
text = " ".join(sys.argv[2:])
|
||||||
|
|
||||||
|
print(f"Session ID: {session_id}")
|
||||||
|
print(f"Message: {text}")
|
||||||
|
print("-" * 50)
|
||||||
|
|
||||||
|
await stream_chat(session_id, text)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
from fastapi import APIRouter, HTTPException, Depends
|
from fastapi import APIRouter, HTTPException, Depends
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session
|
from ..schemas.models import ProcessRequest_chat, ProcessResponse_chat, ProcessRequest_get, ProcessResponse_get, ProcessRequest_set, ProcessResponse_set, ProcessResponse_delete_session, ProcessRequest_delete_session
|
||||||
from fastgpt_client import AsyncChatClient
|
from fastgpt_client import AsyncChatClient
|
||||||
from fastgpt_client.exceptions import (
|
from fastgpt_client.exceptions import (
|
||||||
@@ -98,15 +99,104 @@ async def delete_last_two_chat_records(
|
|||||||
# logging.error(f"Validation Error: {exc.errors()}")
|
# logging.error(f"Validation Error: {exc.errors()}")
|
||||||
# raise HTTPException(status_code=422, detail=exc.errors())
|
# raise HTTPException(status_code=422, detail=exc.errors())
|
||||||
|
|
||||||
|
def create_sse_event(event: str, data: dict) -> str:
|
||||||
|
"""Format data as an SSE event."""
|
||||||
|
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
||||||
|
|
||||||
@router.post("/chat", response_model=ProcessResponse_chat)
|
@router.post("/chat", response_model=ProcessResponse_chat)
|
||||||
async def chat(
|
async def chat(
|
||||||
request: ProcessRequest_chat,
|
request: ProcessRequest_chat,
|
||||||
|
stream: bool = False,
|
||||||
client: AsyncChatClient = Depends(get_fastgpt_client)
|
client: AsyncChatClient = Depends(get_fastgpt_client)
|
||||||
):
|
):
|
||||||
"""Handle chat completion request."""
|
"""Handle chat completion request."""
|
||||||
json_data = request.model_dump()
|
json_data = request.model_dump()
|
||||||
logger.info(f"用户请求信息ProcessRequest_chat: {json_data}")
|
logger.info(f"用户请求信息ProcessRequest_chat: {json_data}, stream={stream}")
|
||||||
|
|
||||||
|
if stream:
|
||||||
|
async def event_generator():
|
||||||
|
try:
|
||||||
|
# Use SDK's create_chat_completion with stream=True
|
||||||
|
response = await client.create_chat_completion(
|
||||||
|
messages=[{"role": "user", "content": json_data['text']}],
|
||||||
|
chatId=json_data['sessionId'],
|
||||||
|
stream=True,
|
||||||
|
detail=True
|
||||||
|
)
|
||||||
|
|
||||||
|
buffer = ""
|
||||||
|
state_code_found = False
|
||||||
|
|
||||||
|
async for chunk in response.aiter_lines():
|
||||||
|
if chunk.startswith('data: '):
|
||||||
|
data_str = chunk[6:].strip()
|
||||||
|
if data_str == '[DONE]':
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
data = json.loads(data_str)
|
||||||
|
try:
|
||||||
|
delta_content = data['choices'][0]['delta'].get('content', '')
|
||||||
|
except (KeyError, IndexError):
|
||||||
|
delta_content = ''
|
||||||
|
if delta_content:
|
||||||
|
buffer += delta_content
|
||||||
|
|
||||||
|
if not state_code_found:
|
||||||
|
# Check for <state>XXXX</state> pattern
|
||||||
|
match = re.search(r"<state>(.*?)</state>", buffer)
|
||||||
|
if match:
|
||||||
|
state_code = match.group(1)
|
||||||
|
|
||||||
|
# Apply logic to map/adjust state code
|
||||||
|
nextStageCode = state_code
|
||||||
|
if nextStageCode in ['3001', '3002', '1002']:
|
||||||
|
nextStageCode = '1002'
|
||||||
|
elif nextStageCode == '2006':
|
||||||
|
nextStageCode = '2004'
|
||||||
|
elif nextStageCode == '2017':
|
||||||
|
nextStageCode = '2016'
|
||||||
|
elif nextStageCode == '2020':
|
||||||
|
nextStageCode = '0002'
|
||||||
|
nextStage = STATUS_CODE_MAP.get(nextStageCode, '')
|
||||||
|
|
||||||
|
# Send stage code event
|
||||||
|
yield create_sse_event("stage_code", {
|
||||||
|
"nextStageCode": nextStageCode,
|
||||||
|
"nextStage": nextStage
|
||||||
|
})
|
||||||
|
|
||||||
|
state_code_found = True
|
||||||
|
|
||||||
|
# Send remaining content as text_delta
|
||||||
|
remaining_content = buffer[match.end():]
|
||||||
|
if remaining_content:
|
||||||
|
yield create_sse_event("text_delta", {"text": remaining_content})
|
||||||
|
buffer = "" # Clear buffer after extracting state
|
||||||
|
else:
|
||||||
|
# State code already found, just stream text
|
||||||
|
yield create_sse_event("text_delta", {"text": delta_content})
|
||||||
|
buffer = "" # Do not buffer text after state found
|
||||||
|
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(data)
|
||||||
|
logger.error(f"Error processing chunk: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If stream ends and no state code found (unlikely if format is strict),
|
||||||
|
# we might want to send what we have
|
||||||
|
if not state_code_found and buffer:
|
||||||
|
yield create_sse_event("text_delta", {"text": buffer})
|
||||||
|
|
||||||
|
yield create_sse_event("done", {"status": "completed"})
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Streaming error: {e}")
|
||||||
|
yield create_sse_event("error", {"msg": str(e), "code": "500"})
|
||||||
|
|
||||||
|
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Use SDK's create_chat_completion
|
# Use SDK's create_chat_completion
|
||||||
response = await client.create_chat_completion(
|
response = await client.create_chat_completion(
|
||||||
|
|||||||
@@ -38,37 +38,37 @@ HTTP/1.1 200 - OK
|
|||||||
connection: close
|
connection: close
|
||||||
content-length: 97
|
content-length: 97
|
||||||
content-type: application/json
|
content-type: application/json
|
||||||
date: Thu, 08 Jan 2026 09:26:31 GMT
|
date: Thu, 08 Jan 2026 09:27:05 GMT
|
||||||
server: uvicorn
|
server: uvicorn
|
||||||
###
|
###
|
||||||
POST http://127.0.0.1:8080/set_info
|
POST http://127.0.0.1:8080/set_info
|
||||||
content-type: application/json
|
content-type: application/json
|
||||||
|
|
||||||
{
|
{
|
||||||
"sessionId": "a0009",
|
"sessionId": "a1002",
|
||||||
"timeStamp": "202503310303",
|
"timeStamp": "202503310303",
|
||||||
"key": "hphm1",
|
"key": "hphm1",
|
||||||
"value": "沪A8938"
|
"value": "沪A8939"
|
||||||
}
|
}
|
||||||
|
|
||||||
HTTP/1.1 200 - OK
|
HTTP/1.1 200 - OK
|
||||||
connection: close
|
connection: close
|
||||||
content-length: 70
|
content-length: 70
|
||||||
content-type: application/json
|
content-type: application/json
|
||||||
date: Thu, 08 Jan 2026 09:25:49 GMT
|
date: Thu, 08 Jan 2026 09:27:00 GMT
|
||||||
server: uvicorn
|
server: uvicorn
|
||||||
###
|
###
|
||||||
DELETE http://127.0.0.1:8000/delete_session
|
DELETE http://127.0.0.1:8080/delete_session
|
||||||
content-type: application/json
|
content-type: application/json
|
||||||
|
|
||||||
{
|
{
|
||||||
"sessionId": "a0009",
|
"sessionId": "a1002",
|
||||||
"timeStamp": "202503310303"
|
"timeStamp": "202503310303"
|
||||||
}
|
}
|
||||||
|
|
||||||
HTTP/1.1 200 - OK
|
HTTP/1.1 200 - OK
|
||||||
date: Fri, 20 Jun 2025 05:56:25 GMT
|
connection: close
|
||||||
server: uvicorn
|
|
||||||
content-length: 70
|
content-length: 70
|
||||||
content-type: application/json
|
content-type: application/json
|
||||||
connection: close
|
date: Thu, 08 Jan 2026 09:27:24 GMT
|
||||||
|
server: uvicorn
|
||||||
Reference in New Issue
Block a user