# Change notes:
# - Introduced a new example script `chat_tui.py` that provides a full-screen
#   Textual interface for interacting with FastGPT.
# - Implemented streaming chat updates, workflow logging, and modal handling
#   for interactive nodes.
# - Enhanced the FastGPT client with new streaming capabilities and structured
#   event types for better interaction handling.
# - Normalized base URL handling in the client to prevent duplicate `/api` paths.
# - Added tests for streaming event parsing and interaction handling.
"""Helpers for parsing FastGPT SSE responses."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, AsyncIterator, Dict, Iterator, Optional
|
|
|
|
from .exceptions import StreamParseError
|
|
from .stream_types import FastGPTInteractiveEvent, FastGPTStreamEvent
|
|
|
|
|
|
def _infer_interaction_type(payload: Dict[str, Any]) -> str:
|
|
interaction_type = str(payload.get("type") or "").strip()
|
|
if interaction_type in {"userSelect", "userInput"}:
|
|
return interaction_type
|
|
|
|
params = payload.get("params") if isinstance(payload.get("params"), dict) else {}
|
|
if isinstance(params.get("inputForm"), list):
|
|
return "userInput"
|
|
if isinstance(params.get("userSelectOptions"), list):
|
|
return "userSelect"
|
|
return "userSelect"
|
|
|
|
|
|
def _normalize_interactive_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
normalized = payload
|
|
wrapped = normalized.get("interactive")
|
|
if isinstance(wrapped, dict):
|
|
normalized = wrapped
|
|
|
|
interaction_type = str(normalized.get("type") or "").strip()
|
|
if interaction_type == "toolChildrenInteractive":
|
|
params = normalized.get("params") if isinstance(normalized.get("params"), dict) else {}
|
|
children_response = params.get("childrenResponse")
|
|
if isinstance(children_response, dict):
|
|
normalized = children_response
|
|
|
|
return normalized
|
|
|
|
|
|
def _coerce_line(line: Any) -> str:
|
|
if isinstance(line, bytes):
|
|
return line.decode("utf-8")
|
|
return str(line)
|
|
|
|
|
|
def _build_event(event_name: str, raw_data: str) -> FastGPTStreamEvent:
    """Turn one assembled SSE event (name + joined data) into a typed event.

    Raises:
        StreamParseError: when the payload is not valid JSON, or when an
            ``interactive`` event does not carry a JSON object.
    """
    name = event_name or "data"

    # The terminal sentinel carries no JSON body.
    if raw_data == "[DONE]":
        return FastGPTStreamEvent(kind="done", raw_event=name, raw_data=raw_data)

    if raw_data:
        try:
            payload = json.loads(raw_data)
        except json.JSONDecodeError as exc:
            raise StreamParseError(
                f"Failed to parse SSE payload for event '{name}': {exc}"
            ) from exc
    else:
        payload = {}

    if name == "interactive":
        if not isinstance(payload, dict):
            raise StreamParseError(
                f"Expected JSON object payload for event '{name}', got {type(payload).__name__}"
            )
        unwrapped = _normalize_interactive_payload(payload)
        if not isinstance(unwrapped, dict):
            raise StreamParseError(
                f"Expected JSON object payload for event '{name}', got {type(unwrapped).__name__}"
            )
        return FastGPTInteractiveEvent(
            kind="interactive",
            data=unwrapped,
            raw_event=name,
            raw_data=raw_data,
            interaction_type=_infer_interaction_type(unwrapped),
        )

    # Event names FastGPT emits that map one-to-one onto event kinds; anything
    # else is surfaced as a generic "data" event rather than rejected.
    known_kinds = frozenset((
        "data",
        "answer",
        "fastAnswer",
        "flowNodeStatus",
        "flowResponses",
        "toolCall",
        "toolParams",
        "toolResponse",
        "updateVariables",
        "error",
        "done",
    ))
    return FastGPTStreamEvent(
        kind=name if name in known_kinds else "data",
        data=payload,
        raw_event=name,
        raw_data=raw_data,
    )
|
|
|
|
|
|
def _parse_sse_lines(lines: Iterator[Any]) -> Iterator[FastGPTStreamEvent]:
    """Parse raw SSE lines into typed :class:`FastGPTStreamEvent` objects.

    Accumulates ``event:``/``data:`` fields until a blank line terminates the
    event, then yields the built event.  Comment lines (``:...``) and the
    standard ``id:``/``retry:`` fields are skipped, as the SSE specification
    requires unknown/unused fields to be ignored rather than fail the stream.

    Raises:
        StreamParseError: on malformed framing (a new ``event:`` before the
            previous event terminated, an event with no data, an unsupported
            line) or an unparseable JSON payload.
    """
    current_event: Optional[str] = None
    data_lines: list[str] = []

    def flush() -> Optional[FastGPTStreamEvent]:
        # Emit the accumulated event, if any, and reset the accumulator.
        nonlocal current_event, data_lines
        if not current_event and not data_lines:
            return None
        if not data_lines:
            raise StreamParseError(f"Missing data payload for SSE event '{current_event or 'data'}'")
        raw_data = "\n".join(data_lines)
        event = _build_event(current_event or "data", raw_data)
        current_event = None
        data_lines = []
        return event

    for raw_line in lines:
        line = _coerce_line(raw_line).rstrip("\r\n")
        if not line:
            # Blank line terminates the current event.
            event = flush()
            if event is not None:
                yield event
            continue
        if line.startswith(":"):
            continue  # SSE comment / keep-alive
        if line.startswith("event:"):
            if current_event is not None or data_lines:
                raise StreamParseError("Encountered new SSE event before previous event payload was terminated")
            current_event = line[6:].strip() or "data"
            continue
        if line.startswith("data:"):
            data_lines.append(line[5:].strip())
            continue
        if line.startswith(("id:", "retry:")):
            # Standard SSE fields we don't use; the spec says clients must
            # ignore fields they don't process, not abort the stream.
            continue
        raise StreamParseError(f"Unsupported SSE line: {line}")

    # Flush a trailing event that was not terminated by a blank line.
    event = flush()
    if event is not None:
        yield event
|
|
|
|
|
|
def iter_stream_events(response) -> Iterator[FastGPTStreamEvent]:
    """Yield typed SSE events from a synchronous response stream."""
    for event in _parse_sse_lines(iter(response.iter_lines())):
        yield event
|
|
|
|
|
|
async def aiter_stream_events(response) -> AsyncIterator[FastGPTStreamEvent]:
    """Yield typed SSE events from an asynchronous response stream.

    Async mirror of the sync path; keep the framing rules in lockstep with
    ``_parse_sse_lines``.  Comment lines (``:...``) and the standard
    ``id:``/``retry:`` fields are skipped, as the SSE specification requires
    unknown/unused fields to be ignored rather than fail the stream.

    Raises:
        StreamParseError: on malformed framing or an unparseable JSON payload.
    """
    current_event: Optional[str] = None
    data_lines: list[str] = []

    # Nothing is awaited while flushing, so a plain closure suffices.
    def flush() -> Optional[FastGPTStreamEvent]:
        nonlocal current_event, data_lines
        if not current_event and not data_lines:
            return None
        if not data_lines:
            raise StreamParseError(f"Missing data payload for SSE event '{current_event or 'data'}'")
        raw_data = "\n".join(data_lines)
        event = _build_event(current_event or "data", raw_data)
        current_event = None
        data_lines = []
        return event

    async for raw_line in response.aiter_lines():
        line = _coerce_line(raw_line).rstrip("\r\n")
        if not line:
            # Blank line terminates the current event.
            event = flush()
            if event is not None:
                yield event
            continue
        if line.startswith(":"):
            continue  # SSE comment / keep-alive
        if line.startswith("event:"):
            if current_event is not None or data_lines:
                raise StreamParseError("Encountered new SSE event before previous event payload was terminated")
            current_event = line[6:].strip() or "data"
            continue
        if line.startswith("data:"):
            data_lines.append(line[5:].strip())
            continue
        if line.startswith(("id:", "retry:")):
            # Standard SSE fields we don't use; ignore per the SSE spec.
            continue
        raise StreamParseError(f"Unsupported SSE line: {line}")

    # Flush a trailing event that was not terminated by a blank line.
    event = flush()
    if event is not None:
        yield event
|