Files
ZNJJ-api-server/examples/stream_chat.py

91 lines
2.7 KiB
Python

#!/usr/bin/env python3
"""
Stream Chat CLI - 与 /chat 端点进行流式交互的脚本。
用法:
python stream_chat.py <session_id> <消息>
示例:
python stream_chat.py test-001 "发生了交通事故"
输出说明:
- [stage_code]: 阶段状态码,如 {"nextStageCode": "0000", "nextStage": "结束通话"}
- [text_delta]: 流式文本片段
- [done]: 流式结束
- [error]: 错误信息
"""
import asyncio
import codecs
import json
import sys
from datetime import datetime

import aiohttp
# Base URL of the API server; stream_chat() appends "/chat" to it.
API_BASE_URL = "http://localhost:8000"
def _format_sse_message(message: str) -> str:
    """Render one SSE message as a ``[label] payload`` line.

    The message is parsed field by field: ``event`` sets the label and
    every ``data`` field contributes one payload line (multiple data lines
    are joined with a newline). Comment lines (leading ``:``) and other
    fields such as ``id`` or ``retry`` are ignored. When no event type is
    present the label is ``data``.

    Args:
        message: Text of a single SSE message, lines separated by ``\\n``,
            without the terminating blank line.

    Returns:
        The formatted line, or an empty string when the message carries
        neither an event type nor any data.
    """
    event_type = ""
    data_lines = []
    for line in message.split("\n"):
        if not line or line.startswith(":"):
            continue
        field, _, value = line.partition(":")
        # A single space after the colon is a separator, not part of the value.
        if value.startswith(" "):
            value = value[1:]
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)

    if not event_type and not data_lines:
        return ""

    label = event_type or "data"
    raw_payload = "\n".join(data_lines)
    try:
        parsed = json.loads(raw_payload)
    except json.JSONDecodeError:
        # Not JSON (plain text or empty): show the payload verbatim.
        return f"[{label}] {raw_payload}"
    # Re-serialize so non-ASCII text is printed readably instead of \u-escaped.
    return f"[{label}] {json.dumps(parsed, ensure_ascii=False)}"


async def stream_chat(session_id: str, text: str):
    """Send a streaming chat request and print each SSE message as it arrives.

    POSTs ``sessionId``, ``timeStamp`` (current local time in ISO format)
    and ``text`` as JSON to ``{API_BASE_URL}/chat`` with ``stream=true``,
    prints the HTTP status, then prints one ``[label] payload`` line per
    server-sent message.

    Args:
        session_id: Value sent as ``sessionId``.
        text: Value sent as ``text``.
    """
    payload = {
        "sessionId": session_id,
        "timeStamp": datetime.now().isoformat(),
        "text": text,
    }
    # Chunks are cut at arbitrary byte offsets, so a multi-byte UTF-8
    # character can straddle two chunks. The incremental decoder keeps the
    # partial bytes until the rest arrives instead of raising
    # UnicodeDecodeError; invalid bytes become U+FFFD rather than aborting.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    buffer = ""

    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            f"{API_BASE_URL}/chat",
            json=payload,
            params={"stream": "true"},
        ) as response:
            print(f"Status: {response.status}")
            print("-" * 50)

            async for chunk in response.content.iter_chunked(1024):
                buffer += decoder.decode(chunk)
                # Normalize CRLF so "\n\n" also matches "\r\n\r\n"-delimited
                # streams. Done on the whole buffer because a "\r\n" pair
                # may itself be split across chunks.
                buffer = buffer.replace("\r\n", "\n")

                # A blank line terminates each SSE message.
                while "\n\n" in buffer:
                    message, buffer = buffer.split("\n\n", 1)
                    line = _format_sse_message(message)
                    if line:
                        print(line)

            # Deliberately lenient: if the server closes the stream without
            # the final blank line, still show the last message rather than
            # dropping it silently.
            buffer += decoder.decode(b"", final=True)
            tail = _format_sse_message(buffer.replace("\r\n", "\n"))
            if tail:
                print(tail)
async def main():
    """Parse the command line and run a single streaming chat request.

    The first argument is the session id; every remaining argument is
    joined with single spaces to form the message. With fewer than two
    arguments, usage help is printed and the process exits with status 1.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print(
            "Usage: python stream_chat.py <session_id> <message>",
            "Example: python stream_chat.py test-session-123 '发生了交通事故'",
            sep="\n",
        )
        sys.exit(1)

    session_id, *message_words = cli_args
    text = " ".join(message_words)

    # Echo the request parameters, followed by a separator rule.
    print(f"Session ID: {session_id}", f"Message: {text}", "-" * 50, sep="\n")

    await stream_chat(session_id, text)
if __name__ == "__main__":
    # Run the CLI only when executed directly, not when imported.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to abandon a long stream; exit quietly
        # with the conventional SIGINT status instead of dumping a traceback.
        sys.exit(130)