#!/usr/bin/env python3
"""
Stream Chat CLI - interact with the /chat endpoint in streaming mode.

Usage:
    python stream_chat.py <session_id> <message>

Example:
    python stream_chat.py test-001 "发生了交通事故"

Output:
    - [stage_code]: stage status payload, e.g.
      {"nextStageCode": "0000", "nextStage": "结束通话"}
    - [text_delta]: a streamed text fragment
    - [done]: the stream has finished
    - [error]: an error message
    - [data]: an event that carried data but no event name
    - [raw]: trailing response content that was not a complete SSE event
"""

import asyncio
import codecs
import json
import sys
from datetime import datetime
from typing import Optional

import aiohttp

API_BASE_URL = "http://localhost:8000"


def _format_sse_message(message: str) -> Optional[str]:
    """Render one SSE message as ``[event] payload``.

    The message is parsed field by field (``event:`` and ``data:`` lines;
    comment lines starting with ``:`` and other fields are ignored).
    Multiple ``data:`` lines are joined with newlines. If the data is valid
    JSON it is re-serialized with ``ensure_ascii=False`` so non-ASCII text
    is printed readably; otherwise it is shown verbatim.

    Args:
        message: The text of a single event, with lines separated by "\\n"
            and without the terminating blank line.

    Returns:
        The formatted line, or None when the message contains neither an
        ``event`` nor a ``data`` field.
    """
    event_type = None
    data_lines = []

    for line in message.split("\n"):
        if not line or line.startswith(":"):
            continue  # blank line or SSE comment (e.g. keep-alive ping)
        field, _, value = line.partition(":")
        # A single space after the colon is part of the framing, not the value.
        if value.startswith(" "):
            value = value[1:]
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)

    if event_type is None and not data_lines:
        return None

    raw_data = "\n".join(data_lines)
    try:
        rendered = json.dumps(json.loads(raw_data), ensure_ascii=False)
    except json.JSONDecodeError:
        rendered = raw_data  # not JSON: show the data as received

    # Events without a name are labelled "data".
    return f"[{event_type or 'data'}] {rendered}"


async def stream_chat(session_id: str, text: str) -> None:
    """Send a streaming chat request and print each received event.

    POSTs ``{"sessionId", "timeStamp", "text"}`` as JSON to
    ``{API_BASE_URL}/chat?stream=true``, prints the HTTP status, then prints
    one line per server-sent event as it arrives.

    Args:
        session_id: Value sent as ``sessionId`` in the request body.
        text: Value sent as ``text`` in the request body.
    """
    payload = {
        "sessionId": session_id,
        "timeStamp": datetime.now().isoformat(),
        "text": text,
    }

    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            f"{API_BASE_URL}/chat",
            json=payload,
            params={"stream": "true"},
        ) as response:
            print(f"Status: {response.status}")
            print("-" * 50)

            # Chunk boundaries are arbitrary and may fall inside a multi-byte
            # UTF-8 character, so decode incrementally instead of per chunk.
            decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
            buffer = ""

            async for chunk in response.content.iter_chunked(1024):
                buffer += decoder.decode(chunk)
                # Accept CRLF-delimited streams as well as LF-delimited ones.
                # A lone trailing "\r" is left in place until its "\n" arrives.
                buffer = buffer.replace("\r\n", "\n")

                # A blank line terminates each SSE message.
                while "\n\n" in buffer:
                    message, buffer = buffer.split("\n\n", 1)
                    line = _format_sse_message(message)
                    if line is not None:
                        print(line)

            # Flush any bytes still held by the decoder, then surface whatever
            # is left (e.g. a plain JSON error body that is not SSE-framed)
            # instead of dropping it silently.
            buffer += decoder.decode(b"", final=True)
            tail = buffer.replace("\r\n", "\n").strip()
            if tail:
                line = _format_sse_message(tail)
                print(line if line is not None else f"[raw] {tail}")


async def main() -> None:
    """Parse command-line arguments and run a single streaming chat request.

    Expects a session id followed by the message; any additional arguments
    are joined to the message with spaces. Exits with status 1 and prints
    usage help when fewer than two arguments are given.
    """
    if len(sys.argv) < 3:
        print("Usage: python stream_chat.py <session_id> <message>")
        print("Example: python stream_chat.py test-session-123 '发生了交通事故'")
        sys.exit(1)

    session_id = sys.argv[1]
    text = " ".join(sys.argv[2:])

    print(f"Session ID: {session_id}")
    print(f"Message: {text}")
    print("-" * 50)

    await stream_chat(session_id, text)


if __name__ == "__main__":
    asyncio.run(main())