fastgpt-python-sdk/docs/advanced/streaming_events.md
2026-01-08 17:35:21 +08:00

Streaming Events

FastGPT uses Server-Sent Events (SSE) for streaming responses. This guide covers all event types you may encounter.

SSE Format

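FastGPT sends each event as an event: line naming the event type, followed by a data: line carrying a JSON payload and a blank line that ends the event. The space after the colon is optional, so strip it when parsing: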

event: eventType
data: {"key": "value"}

event: anotherEvent
data: {"key": "value"}
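
As a rough illustration of this framing, the sketch below groups raw lines into (event name, data) pairs. The helper name iter_sse_events is hypothetical and not part of the SDK. It assumes the lines are already decoded to str and that blank lines between events are preserved by whatever produces them.

```python
from typing import Iterable, Iterator, List, Optional, Tuple


def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[Optional[str], str]]:
    """Yield (event_name, data) pairs from decoded SSE lines.

    event_name is None when an event has no event: line.
    """
    event_name: Optional[str] = None
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event
            if data_parts:
                yield event_name, "\n".join(data_parts)
            event_name, data_parts = None, []
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].strip())
    # Flush a trailing event that was not followed by a blank line
    if data_parts:
        yield event_name, "\n".join(data_parts)
```

The data string is returned unparsed so the caller can decide how to treat the [DONE] marker before calling json.loads.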

Event Types

1. data Event

The main streaming event. It arrives as a bare data: line with no preceding event: line, and its payload follows OpenAI's chat completion chunk format:

import json

for line in response.iter_lines():
    if line.startswith("data:"):
        data = line[5:].strip()
        if data and data != "[DONE]":
            chunk = json.loads(data)
            # Process OpenAI-compatible response
            if "choices" in chunk and chunk["choices"]:
                delta = chunk["choices"][0].get("delta", {})
                content = delta.get("content", "")
                if content:
                    print(content, end="", flush=True)

2. answer Event

Main chat response content (alternative format):

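lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "answer":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if next_line.startswith("data:"):
            answer_data = json.loads(next_line[5:])
            print(answer_data.get("text", ""), end="", flush=True)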

3. fastAnswer Event

Quick reply content:

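lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "fastAnswer":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if next_line.startswith("data:"):
            fast_answer_data = json.loads(next_line[5:])
            print(f"Quick reply: {fast_answer_data}")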

4. flowNodeStatus Event

Workflow node status updates:

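lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "flowNodeStatus":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if next_line.startswith("data:"):
            status_data = json.loads(next_line[5:])
            # Defaults keep .upper() from failing when a field is missing
            status = status_data.get("status") or "unknown"  # "running", "completed", "error"
            node_name = status_data.get("nodeName", "Unknown")
            print(f"[{status.upper()}] {node_name}")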
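
One way to use these updates is to keep only the latest status reported for each node, for example to show which nodes are still running. The sketch below assumes the payload carries the status and nodeName fields shown above. NodeStatusTracker is a hypothetical helper, not an SDK class.

```python
from typing import Dict, List


class NodeStatusTracker:
    """Keep the most recent status reported for each workflow node."""

    def __init__(self) -> None:
        self.latest: Dict[str, str] = {}

    def update(self, status_data: dict) -> None:
        # Fall back to placeholders so a missing field never raises
        node = status_data.get("nodeName") or "Unknown"
        status = status_data.get("status") or "unknown"
        self.latest[node] = status

    def nodes_with(self, status: str) -> List[str]:
        """Return the names of nodes whose latest status equals status."""
        return [name for name, value in self.latest.items() if value == status]
```

Calling update() with each decoded flowNodeStatus payload and then nodes_with("running") gives the nodes that have started but not yet reported another status.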

5. flowResponses Event

Complete node response data (requires detail=True):

response = client.create_chat_completion(
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
    detail=True  # Enable detailed responses
)

# Then in the stream:
lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "flowResponses":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if next_line.startswith("data:"):
            response_data = json.loads(next_line[5:])
            # Contains module execution details
            module_name = response_data.get("moduleName")
            tokens = response_data.get("tokens")
            print(f"Module: {module_name}, Tokens: {tokens}")
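
If several flowResponses payloads arrive during one request, their token counts can be summed per module. This is a minimal sketch under the assumption that each payload is a dict with the moduleName and tokens fields used above. The function name total_tokens_by_module is hypothetical, and real payloads may be shaped differently.

```python
from collections import defaultdict
from typing import Dict, Iterable


def total_tokens_by_module(payloads: Iterable[dict]) -> Dict[str, int]:
    """Sum the tokens field of each payload, grouped by moduleName."""
    totals: Dict[str, int] = defaultdict(int)
    for payload in payloads:
        # Missing or null fields count as an unnamed module with zero tokens
        name = payload.get("moduleName") or "Unknown"
        tokens = payload.get("tokens") or 0
        totals[name] += int(tokens)
    return dict(totals)
```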

6. interactive Event

Interactive node (requires user input or selection):

lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "interactive":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if not next_line.startswith("data:"):
            continue
        interactive_data = json.loads(next_line[5:])
        interactive_type = interactive_data.get("type")

        if interactive_type == "userSelect":
            options = interactive_data["params"]["userSelectOptions"]
            print("Please select an option:")
            for i, option in enumerate(options):
                print(f"{i + 1}. {option['value']}")

        elif interactive_type == "userInput":
            form_fields = interactive_data["params"]["inputForm"]
            print("Please provide the following information:")
            for field in form_fields:
                print(f"- {field['label']}")
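
After printing a numbered menu like the one above, the user's input still has to be mapped back to an option. The sketch below shows one way to do that, assuming each option is a dict with a value key as in the userSelect example. resolve_selection is a hypothetical helper. How the chosen value is sent back to FastGPT is not covered here.

```python
from typing import List, Optional


def resolve_selection(options: List[dict], choice: str) -> Optional[str]:
    """Map 1-based menu input to the selected option's value.

    Returns None when the input is not a number or is out of range.
    """
    try:
        index = int(choice.strip()) - 1
    except ValueError:
        return None
    if 0 <= index < len(options):
        return options[index].get("value")
    return None
```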

7. updateVariables Event

Variable updates during execution:

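lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "updateVariables":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if next_line.startswith("data:"):
            var_data = json.loads(next_line[5:])
            variables = var_data.get("variables", {})
            print(f"Variables updated: {variables}")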
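
To keep a local copy of the workflow variables in sync, each update can be merged into a dict. This sketch assumes the variables field is a flat mapping of names to values, as the example above suggests. It cannot detect removed variables. apply_variable_update is a hypothetical name.

```python
def apply_variable_update(state: dict, var_data: dict) -> dict:
    """Return a new dict with the event's variables merged over state."""
    updated = dict(state)  # copy so the caller's dict is left untouched
    updated.update(var_data.get("variables") or {})
    return updated
```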

8. error Event

Reports an error that occurred while the request was being processed:

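lines = iter(response.iter_lines())
for line in lines:
    if line.startswith("event:") and line[6:].strip() == "error":
        # The payload is on the data: line that follows the event: line
        next_line = next(lines, "")
        if next_line.startswith("data:"):
            error_data = json.loads(next_line[5:])
            error_message = error_data.get("message", "Unknown error")
            error_type = error_data.get("type", "Error")
            print(f"Error [{error_type}]: {error_message}")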
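
Printing is enough for a demo, but calling code usually needs to stop and react. One option is to turn an error event into a Python exception. The class and function below are hypothetical and only illustrate the idea. The SDK may already provide its own exception types, which would be preferable where they exist.

```python
class FastGPTStreamError(Exception):
    """Raised when an error event is seen in the stream (illustrative only)."""

    def __init__(self, error_type: str, message: str) -> None:
        super().__init__(f"[{error_type}] {message}")
        self.error_type = error_type
        self.message = message


def raise_for_error_event(event_name, data_obj: dict) -> None:
    """Raise FastGPTStreamError if event_name is "error"; otherwise do nothing."""
    if event_name == "error":
        raise FastGPTStreamError(
            data_obj.get("type", "Error"),
            data_obj.get("message", "Unknown error"),
        )
```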

9. toolCall, toolParams, toolResponse Events

Tool/agent operation events:

lines = iter(response.iter_lines())
for line in lines:
    if not line.startswith("event:"):
        continue
    event_name = line[6:].strip()
    if event_name not in ("toolCall", "toolParams", "toolResponse"):
        continue
    # The payload is on the data: line that follows the event: line
    next_line = next(lines, "")
    if not next_line.startswith("data:"):
        continue
    tool_data = json.loads(next_line[5:])

    if event_name == "toolCall":
        print(f"Tool called: {tool_data.get('toolName')}")
    elif event_name == "toolParams":
        print(f"Tool parameters: {tool_data}")
    else:
        print(f"Tool response: {tool_data}")

Complete Event Handler

import json
from fastgpt_client import ChatClient

def handle_all_events(response):
    """Handle the most common streaming event types and return the accumulated text."""

    buffer = ""
    current_event = None

    for line in response.iter_lines():
        if not line:
            continue

        # Event type line
        if line.startswith("event:"):
            current_event = line[6:].strip()

        # Data line
        elif line.startswith("data:"):
            data = line[5:].strip()
            if not data or data == "[DONE]":
                continue

            try:
                data_obj = json.loads(data)
            except json.JSONDecodeError:
                continue

            # Handle based on event type
            if current_event is None:
                # Default: OpenAI-compatible format
                if "choices" in data_obj and data_obj["choices"]:
                    delta = data_obj["choices"][0].get("delta", {})
                    content = delta.get("content", "")
                    if content:
                        buffer += content
                        print(content, end="", flush=True)

            elif current_event == "answer":
                text = data_obj.get("text", "")
                if text:
                    buffer += text
                    print(text, end="", flush=True)

            elif current_event == "flowNodeStatus":
                # Default avoids AttributeError on .upper() if status is missing
                status = data_obj.get("status") or "unknown"
                node = data_obj.get("nodeName", "Unknown")
                print(f"\n[{status.upper()}] {node}")

            elif current_event == "interactive":
                interactive_type = data_obj.get("type")
                print(f"\n[INTERACTIVE] {interactive_type}")
                print(f"Details: {data_obj.get('params', {})}")

            elif current_event == "error":
                print(f"\n[ERROR] {data_obj.get('message', 'Unknown error')}")

            elif current_event == "toolCall":
                print(f"\n[TOOL] Calling: {data_obj.get('toolName')}")

            # Reset event
            current_event = None

    return buffer


# Usage
with ChatClient(api_key="fastgpt-xxxxx") as client:
    response = client.create_chat_completion(
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
        detail=True  # Enable flow responses
    )

    full_response = handle_all_events(response)
    print(f"\n\nFull response: {full_response}")
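
As the number of event types grows, the if/elif chain can be replaced by a lookup table that maps event names to callables. The sketch below is one possible design, not the SDK's own mechanism. dispatch_event and the Handler alias are hypothetical names, and the None key stands for bare data: lines that have no event name.

```python
import json
from typing import Callable, Dict, Optional

Handler = Callable[[dict], None]


def dispatch_event(
    handlers: Dict[Optional[str], Handler],
    event_name: Optional[str],
    data: str,
) -> bool:
    """Parse data as JSON and call the handler registered for event_name.

    Returns True if a handler ran, False for [DONE], empty or invalid
    data, or an event name with no registered handler.
    """
    if not data or data == "[DONE]":
        return False
    try:
        payload = json.loads(data)
    except json.JSONDecodeError:
        return False
    handler = handlers.get(event_name)
    if handler is None:
        return False
    handler(payload)
    return True
```

Adding support for another event then means registering one more entry in the handlers dict rather than editing the parsing loop.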

Event Flow Example

A typical streaming conversation might generate events like:

event:flowNodeStatus
data:{"status": "running", "nodeName": "Chat Node"}

event:answer
data:{"text": "Hello"}

event:answer
data:{"text": "! How"}

event:answer
data:{"text": " can I help"}

event:flowNodeStatus
data:{"status": "completed", "nodeName": "Chat Node"}

event:flowResponses
data:{"moduleName": "Chat Node", "tokens": 50}

data:{"choices": [{"delta": {"content": "Hello! How can I help"}}], "usage": {"total_tokens": 50}}

data:[DONE]
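
A sequence like this can be replayed offline to exercise parsing code without calling the API. The sketch below uses a hypothetical FakeResponse class that mimics only the iter_lines() method used throughout this guide. The sample text is a shortened copy of the events above.

```python
from collections import Counter


class FakeResponse:
    """Stand-in for a streaming response; only iter_lines() is provided."""

    def __init__(self, text: str) -> None:
        self._lines = text.splitlines()

    def iter_lines(self):
        yield from self._lines


SAMPLE_STREAM = '''event:flowNodeStatus
data:{"status": "running", "nodeName": "Chat Node"}

event:answer
data:{"text": "Hello"}

event:answer
data:{"text": "! How"}

data:[DONE]
'''


def count_event_types(response) -> Counter:
    """Count event: lines by type; bare data: lines are not counted."""
    counts: Counter = Counter()
    for line in response.iter_lines():
        if line.startswith("event:"):
            counts[line[6:].strip()] += 1
    return counts
```

The same FakeResponse object can be passed to any function that only relies on iter_lines(), such as the handle_all_events function defined earlier.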

Best Practices

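  1. Handle [DONE] - Treat data: [DONE] as the end of the stream and never pass it to json.loads
  2. Validate JSON - Wrap json.loads in try/except json.JSONDecodeError and skip malformed payloads
  3. Buffer content - Accumulate text chunks so the full response is available once the stream ends
  4. Handle errors - Watch for error events and surface the message to the caller
  5. Check event types - Read the name after event: to decide how to interpret the data: line that follows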
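
The first two practices can be combined in one small helper, sketched below. parse_data_line is a hypothetical name. It assumes lines are str and that every useful payload is a JSON object.

```python
import json
from typing import Optional, Tuple


def parse_data_line(line: str) -> Tuple[bool, Optional[dict]]:
    """Return (done, payload) for a single line of the stream.

    done is True only for the [DONE] marker. payload is None for
    non-data lines, empty data, [DONE], invalid JSON, or JSON that
    is not an object.
    """
    if not line.startswith("data:"):
        return False, None
    data = line[5:].strip()
    if data == "[DONE]":
        return True, None
    if not data:
        return False, None
    try:
        payload = json.loads(data)
    except json.JSONDecodeError:
        return False, None
    if not isinstance(payload, dict):
        return False, None
    return False, payload
```

A reading loop can then break as soon as done is True and ignore any line whose payload is None.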

See Also