Streaming Example
Learn how to use streaming responses with the FastGPT Python SDK.
Why Use Streaming?
Streaming allows you to:
- Display real-time responses - Show text as it's generated
- Reduce perceived latency - Users see content immediately
- Better user experience - More interactive and engaging
Basic Streaming
import json
from fastgpt_client import ChatClient
# Open a client, request a streamed completion, and echo content as it arrives.
with ChatClient(api_key="fastgpt-xxxxx") as client:
    stream = client.create_chat_completion(
        messages=[{"role": "user", "content": "Tell me a short story"}],
        stream=True
    )

    print("Story: ", end="", flush=True)
    for raw_line in stream.iter_lines():
        # SSE data frames look like "data: {...}"; skip everything else.
        if not raw_line.startswith("data:"):
            continue
        payload = raw_line[5:].strip()
        if not payload or payload == "[DONE]":
            continue
        event = json.loads(payload)
        choices = event.get("choices")
        if choices:
            text = choices[0].get("delta", {}).get("content", "")
            if text:
                print(text, end="", flush=True)
    print()
Complete Streaming Example
"""Streaming chat completion example."""
import json
from fastgpt_client import ChatClient
from dotenv import load_dotenv
import os
load_dotenv()
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("BASE_URL")
def stream_chat():
    """Stream a chat completion and print each content delta as it arrives."""
    with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client:
        stream = client.create_chat_completion(
            messages=[{"role": "user", "content": "Tell me a short story about AI"}],
            stream=True
        )
        print("\n=== Streaming Response ===\n")
        for raw_line in stream.iter_lines():
            # Only "data: {...}" SSE frames carry JSON payloads.
            if not raw_line.startswith("data:"):
                continue
            payload = raw_line[5:].strip()
            if not payload or payload == "[DONE]":
                continue
            event = json.loads(payload)
            choices = event.get("choices")
            if not choices:
                continue
            text = choices[0].get("delta", {}).get("content", "")
            if text:
                print(text, end="", flush=True)
        print("\n")
def stream_with_progress():
    """Stream a completion while counting the content chunks received.

    NOTE: the counter tracks SSE content deltas, not model tokens — a
    single delta may contain more than one token.  The original summary
    line labeled this count "Total tokens", which was misleading.
    """
    with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client:
        response = client.create_chat_completion(
            messages=[{"role": "user", "content": "Explain quantum computing"}],
            stream=True
        )
        print("\n=== Streaming with Progress ===\n")
        chunk_count = 0
        for line in response.iter_lines():
            # Only "data: {...}" SSE frames carry JSON payloads.
            if not line.startswith("data:"):
                continue
            data = line[5:].strip()
            if not data or data == "[DONE]":
                continue
            chunk = json.loads(data)
            if "choices" in chunk and chunk["choices"]:
                content = chunk["choices"][0].get("delta", {}).get("content", "")
                if content:
                    print(content, end="", flush=True)
                    chunk_count += 1
        print(f"\n\nContent chunks received: {chunk_count}")
def stream_with_buffer():
    """Stream with word buffering (print complete words only)."""
    with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client:
        stream = client.create_chat_completion(
            messages=[{"role": "user", "content": "What is machine learning?"}],
            stream=True
        )
        print("\n=== Buffered Streaming ===\n")
        pending = ""
        for raw_line in stream.iter_lines():
            if not raw_line.startswith("data:"):
                continue
            payload = raw_line[5:].strip()
            if not payload or payload == "[DONE]":
                continue
            event = json.loads(payload)
            choices = event.get("choices")
            if not choices:
                continue
            text = choices[0].get("delta", {}).get("content", "")
            if not text:
                continue
            pending += text
            # Flush only up to the first space so partial words stay buffered.
            head, sep, tail = pending.partition(" ")
            if sep:
                print(head + " ", end="", flush=True)
                pending = tail
        # Flush whatever is left after the stream ends.
        if pending:
            print(pending)
        print()
if __name__ == "__main__":
    # Run each demo independently so one failure doesn't stop the rest.
    for demo in (stream_chat, stream_with_progress, stream_with_buffer):
        try:
            demo()
        except Exception as e:
            print(f"Error: {e}")
Async Streaming
import asyncio
import json
from fastgpt_client import AsyncChatClient
async def stream_async():
    """Async streaming example: print each content delta as it arrives."""
    async with AsyncChatClient(api_key="fastgpt-xxxxx") as client:
        stream = await client.create_chat_completion(
            messages=[{"role": "user", "content": "Tell me about async/await"}],
            stream=True
        )
        async for raw_line in stream.aiter_lines():
            # Only "data: {...}" SSE frames carry JSON payloads.
            if not raw_line.startswith("data:"):
                continue
            payload = raw_line[5:].strip()
            if not payload or payload == "[DONE]":
                continue
            event = json.loads(payload)
            choices = event.get("choices")
            if choices:
                text = choices[0].get("delta", {}).get("content", "")
                if text:
                    print(text, end="", flush=True)

asyncio.run(stream_async())
Streaming Event Types
FastGPT sends multiple SSE (Server-Sent Events) event types. The common ones are:
- `data` - Standard response chunks (OpenAI-compatible)
- `answer` - Main chat response content
- `fastAnswer` - Quick reply content
- `flowNodeStatus` - Workflow node status updates
- `interactive` - Interactive node prompts
- `error` - Error events
For more details on event types, see Streaming Events.
Best Practices
- Always flush output: Use `flush=True` when printing
- Handle connection errors: Streaming can fail mid-response
- Use context managers: Ensures proper cleanup
- Buffer for better formatting: Consider buffering for word boundaries
import time
def robust_stream():
    """Handle streaming errors gracefully.

    Fixes over the original: non-``data:`` SSE lines (e.g. ``event:``
    fields) were previously passed straight to ``json.loads`` and only
    survived via the JSONDecodeError handler; they are now skipped
    explicitly.  Malformed JSON payloads are still tolerated so a single
    bad chunk does not abort the stream.
    """
    try:
        with ChatClient(api_key=API_KEY) as client:
            response = client.create_chat_completion(
                messages=[{"role": "user", "content": "Hello"}],
                stream=True
            )
            for line in response.iter_lines():
                # Only "data:" frames carry JSON; other SSE fields
                # (event:, id:, comments) must not reach json.loads.
                if not line or not line.startswith("data:"):
                    continue
                data = line[5:].strip()
                if not data or data == "[DONE]":
                    continue
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    continue  # tolerate a malformed chunk, keep streaming
                if "choices" in chunk and chunk["choices"]:
                    content = chunk["choices"][0].get("delta", {}).get("content", "")
                    if content:
                        print(content, end="", flush=True)
    except Exception as e:
        # Streaming can fail mid-response; surface the error without crashing.
        print(f"\nStreaming error: {e}")
See Also
- Streaming Events - Advanced SSE event handling
- Async Usage - Async streaming examples
- Error Handling - Robust error handling