# Files
# fastgpt-python-sdk/examples/chat_cli.py
#
# 513 lines
# 17 KiB
# Python
#
"""Interactive chat CLI with FastGPT streaming events.
Run from the examples directory with .env configured:
python chat_cli.py
python chat_cli.py --chat-id my_existing_conversation
This example supports:
- streaming text output
- workflow/tool event logs
- interactive FastGPT nodes (`userSelect` and `userInput`)
Type your message and press Enter. Type /quit to exit.
During an interactive prompt, type /cancel to stop that prompt locally.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
from fastgpt_client import ChatClient, FastGPTInteractiveEvent, iter_stream_events
# Load environment configuration from the .env file next to this script.
load_dotenv(Path(__file__).with_name(".env"))
API_KEY = os.getenv("API_KEY")  # FastGPT API key (required to run)
BASE_URL = os.getenv("BASE_URL")  # FastGPT API base URL (required to run)
APP_ID = os.getenv("APP_ID")  # optional: app id used to fetch the welcome opener
# Best-effort: switch stdout/stderr to UTF-8 with replacement so streamed
# model output cannot crash on narrow console encodings (e.g. Windows cp1252).
for _std_stream in (sys.stdout, sys.stderr):
    _reconfigure = getattr(_std_stream, "reconfigure", None)
    if _reconfigure is None:
        continue
    try:
        _reconfigure(encoding="utf-8", errors="replace")
    except Exception:
        pass
@dataclass
class StreamTurnResult:
    """Outcome of one streamed FastGPT request."""

    # Concatenated assistant answer text received during the stream.
    text: str
    # Set when the stream stopped because FastGPT asked for user interaction
    # (userSelect / userInput); None when the turn completed normally.
    interactive: Optional[FastGPTInteractiveEvent] = None
def _write_text(text: str) -> None:
try:
sys.stdout.write(text)
except UnicodeEncodeError:
encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
safe_text = text.encode(encoding, errors="replace").decode(encoding, errors="replace")
sys.stdout.write(safe_text)
sys.stdout.flush()
def _extract_text_from_event(kind: str, payload: Any) -> str:
if not isinstance(payload, dict):
return ""
if kind in {"answer", "fastAnswer"}:
text = payload.get("text")
if isinstance(text, str) and text:
return text
choices = payload.get("choices") if isinstance(payload.get("choices"), list) else []
if not choices:
return str(payload.get("text") or "")
first_choice = choices[0] if isinstance(choices[0], dict) else {}
delta = first_choice.get("delta") if isinstance(first_choice.get("delta"), dict) else {}
content = delta.get("content")
if isinstance(content, str) and content:
return content
message = first_choice.get("message") if isinstance(first_choice.get("message"), dict) else {}
message_content = message.get("content")
if isinstance(message_content, str) and message_content:
return message_content
return ""
def _nested_tool_payload(payload: Any) -> Dict[str, Any]:
if not isinstance(payload, dict):
return {}
nested = payload.get("tool")
if isinstance(nested, dict):
return nested
return {}
def _tool_name_from_event(payload: Any) -> str:
    """Best-effort tool name lookup across flat and nested event shapes."""
    if not isinstance(payload, dict):
        return "?"
    nested_raw = payload.get("tool")
    nested = nested_raw if isinstance(nested_raw, dict) else {}
    for candidate in (
        payload.get("toolName"),
        nested.get("toolName"),
        payload.get("functionName"),
        nested.get("functionName"),
    ):
        if candidate:
            return str(candidate)
    return "?"
def _normalize_option(raw_option: Any, index: int) -> Optional[Dict[str, str]]:
if isinstance(raw_option, str):
value = raw_option.strip()
if not value:
return None
return {"id": f"option_{index}", "label": value, "value": value, "description": ""}
if not isinstance(raw_option, dict):
return None
label = str(raw_option.get("label") or raw_option.get("value") or raw_option.get("id") or "").strip()
value = str(raw_option.get("value") or raw_option.get("label") or raw_option.get("id") or "").strip()
option_id = str(raw_option.get("id") or value or f"option_{index}").strip()
if not label and not value:
return None
return {
"id": option_id or f"option_{index}",
"label": label or value,
"value": value or label,
"description": str(raw_option.get("description") or "").strip(),
}
def _resolve_option_token(token: str, options: List[Dict[str, str]]) -> Optional[str]:
normalized = token.strip()
if not normalized:
return None
if normalized.isdigit():
index = int(normalized) - 1
if 0 <= index < len(options):
return options[index]["value"]
lowered = normalized.lower()
for option in options:
if lowered in {
option["id"].lower(),
option["label"].lower(),
option["value"].lower(),
}:
return option["value"]
return None
def _coerce_text(value: Any) -> str:
return str(value or "").strip()
def _first_nonempty_text(*values: Any) -> str:
    """Return the first argument that stringifies to non-empty text, else ""."""
    for candidate in values:
        text = str(candidate or "").strip()
        if text:
            return text
    return ""
def _merge_prompt_parts(*values: Any) -> str:
    """Join distinct non-empty text values with newlines, preserving order."""
    unique: List[str] = []
    seen = set()
    for candidate in values:
        text = str(candidate or "").strip()
        if text and text not in seen:
            seen.add(text)
            unique.append(text)
    return "\n".join(unique)
def _interactive_prompt_text(payload: Dict[str, Any], default_text: str) -> str:
    """Build the text shown for an interactive node, falling back to *default_text*.

    Preference order: merged opener+prompt, then title, then description.
    Each piece is looked up on the payload first, then on payload["params"].
    """
    raw_params = payload.get("params")
    params = raw_params if isinstance(raw_params, dict) else {}
    opener = _first_nonempty_text(
        payload.get("opener"), params.get("opener"),
        payload.get("intro"), params.get("intro"),
    )
    prompt = _first_nonempty_text(
        payload.get("prompt"), params.get("prompt"),
        payload.get("text"), params.get("text"),
    )
    title = _first_nonempty_text(
        payload.get("title"), params.get("title"),
        payload.get("nodeName"), payload.get("label"),
    )
    description = _first_nonempty_text(
        payload.get("description"), payload.get("desc"),
        params.get("description"), params.get("desc"),
    )
    merged = _merge_prompt_parts(opener, prompt)
    return merged or title or description or default_text
def _extract_chat_init_opener(payload: Any) -> str:
    """Dig the app welcome/opener text out of a chat-init API response."""
    if not isinstance(payload, dict):
        return ""
    data = payload.get("data") if isinstance(payload.get("data"), dict) else payload
    raw_app = data.get("app")
    app = raw_app if isinstance(raw_app, dict) else {}
    raw_config = app.get("chatConfig")
    chat_config = raw_config if isinstance(raw_config, dict) else {}
    # Probe the known locations in priority order, most specific first.
    candidates = (
        chat_config.get("welcomeText"),
        app.get("welcomeText"),
        data.get("welcomeText"),
        data.get("opener"),
        app.get("opener"),
        data.get("intro"),
        app.get("intro"),
    )
    for candidate in candidates:
        text = str(candidate or "").strip()
        if text:
            return text
    return ""
def _get_initial_app_opener(client: ChatClient, chat_id: str) -> str:
    """Fetch the app-configured welcome message for *chat_id*.

    Returns "" when APP_ID is unset; raises on HTTP errors via
    raise_for_status().
    """
    if not APP_ID:
        return ""
    init_response = client.get_chat_init(appId=APP_ID, chatId=chat_id)
    init_response.raise_for_status()
    payload = init_response.json()
    return _extract_chat_init_opener(payload)
def _prompt_user_select(event: FastGPTInteractiveEvent) -> Optional[str]:
    """Prompt the user on the terminal for a FastGPT ``userSelect`` node.

    Returns the chosen value (comma-joined for multi-select), or None when
    the user types /cancel or FastGPT supplied no options.
    """
    payload = event.data
    raw_params = payload.get("params")
    params = raw_params if isinstance(raw_params, dict) else {}
    prompt_text = _interactive_prompt_text(payload, "Please select an option")
    multiple = bool(params.get("multiple") or payload.get("multiple"))
    raw_options = params.get("userSelectOptions")
    if not isinstance(raw_options, list):
        raw_options = []
    options: List[Dict[str, str]] = []
    for index, raw in enumerate(raw_options, start=1):
        normalized = _normalize_option(raw, index)
        if normalized:
            options.append(normalized)
    print()
    print("[INTERACTIVE]")
    print(prompt_text)
    for position, option in enumerate(options, start=1):
        print(f" {position}. {option['label']}")
        if option["description"]:
            print(f" {option['description']}")
    if not options:
        print("No selectable options were provided by FastGPT.")
        return None
    hint = "comma-separated indexes/values" if multiple else "an index or value"
    while True:
        answer = input(f"Select {hint} (/cancel to stop): ").strip()
        if not answer:
            print("Selection is required.")
            continue
        if answer.lower() == "/cancel":
            return None
        tokens = [part.strip() for part in answer.split(",")] if multiple else [answer]
        resolved = [_resolve_option_token(token, options) for token in tokens]
        # Report every token that failed to match before accepting anything.
        invalid = [token for token, value in zip(tokens, resolved) if value is None]
        if invalid:
            print(f"Invalid option(s): {', '.join(invalid)}")
            continue
        selected = [value for value in resolved if value is not None]
        if not selected:
            print("Selection is required.")
            continue
        if not multiple:
            return selected[0]
        return ", ".join(selected)
def _prompt_user_input(event: FastGPTInteractiveEvent) -> Optional[str]:
    """Prompt the user on the terminal for a FastGPT ``userInput`` node.

    With no ``inputForm`` schema, returns the raw line the user typed.
    With a schema, walks each field and returns the collected values as a
    JSON object string. Returns None when the user types /cancel.
    """
    payload = event.data
    params = payload.get("params") if isinstance(payload.get("params"), dict) else {}
    prompt_text = _interactive_prompt_text(payload, "Please provide the requested input")
    form_fields = params.get("inputForm") if isinstance(params.get("inputForm"), list) else []
    print()
    print("[INTERACTIVE]")
    print(prompt_text)
    if not form_fields:
        # Free-form input: a single line is the whole reply.
        value = input("Input (/cancel to stop): ").strip()
        if value.lower() == "/cancel":
            return None
        return value
    values: Dict[str, Any] = {}
    for index, field in enumerate(form_fields, start=1):
        if not isinstance(field, dict):
            continue
        # Field identity and label fall back to a synthetic name when absent.
        name = str(field.get("key") or field.get("name") or f"field_{index}").strip() or f"field_{index}"
        label = str(field.get("label") or field.get("name") or name).strip() or name
        placeholder = str(field.get("placeholder") or "").strip()
        default_value = field.get("defaultValue", field.get("default"))
        required = bool(field.get("required"))
        # Build a prompt like: "Label (placeholder) [default] [optional]: ".
        prompt_label = label
        if placeholder:
            prompt_label = f"{prompt_label} ({placeholder})"
        if default_value not in (None, ""):
            prompt_label = f"{prompt_label} [{default_value}]"
        if not required:
            prompt_label = f"{prompt_label} [optional]"
        prompt_label = f"{prompt_label}: "
        while True:
            raw_input_value = input(prompt_label).strip()
            if raw_input_value.lower() == "/cancel":
                return None
            # An empty answer takes the declared default when one exists.
            if not raw_input_value and default_value not in (None, ""):
                raw_input_value = str(default_value)
            if raw_input_value or not required:
                values[name] = raw_input_value
                break
            print("This field is required.")
    return json.dumps(values, ensure_ascii=False)
def prompt_interactive(event: FastGPTInteractiveEvent) -> Optional[str]:
    """Dispatch an interactive event to the matching terminal prompt."""
    handler = _prompt_user_input if event.interaction_type == "userInput" else _prompt_user_select
    return handler(event)
def stream_reply(client: ChatClient, messages: List[Dict[str, Any]], chat_id: str) -> StreamTurnResult:
    """Stream a single FastGPT request and stop when interactive input is required.

    Prints answer text incrementally and logs [FLOW]/[TOOL]/[VARS] events on
    their own lines.

    Args:
        client: Authenticated ChatClient used to issue the streaming request.
        messages: Chat messages for this turn (usually one user message).
        chat_id: Conversation id passed to FastGPT.

    Returns:
        StreamTurnResult with the accumulated answer text and, when FastGPT
        paused for user interaction, the interactive event that caused it.

    Raises:
        RuntimeError: when FastGPT emits an ``error`` event.
        HTTP errors: via ``response.raise_for_status()``.
    """
    response = client.create_chat_completion(
        messages=messages,
        stream=True,
        detail=True,
        chatId=chat_id,
    )
    response.raise_for_status()
    full_content: List[str] = []
    interactive_event: Optional[FastGPTInteractiveEvent] = None
    printed_text = False

    def _end_text_line() -> None:
        # Terminate an in-progress answer line before printing a log line.
        nonlocal printed_text
        if printed_text:
            print()
            printed_text = False

    try:
        for event in iter_stream_events(response):
            kind = event.kind
            data = event.data
            if kind in {"data", "answer", "fastAnswer"}:
                content = _extract_text_from_event(kind, data)
                if content:
                    printed_text = True
                    full_content.append(content)
                    _write_text(content)
                continue
            if kind == "flowNodeStatus":
                _end_text_line()
                if isinstance(data, dict):
                    status = str(data.get("status") or "?")
                    node_name = str(data.get("nodeName") or data.get("name") or data.get("node_id") or "Unknown node")
                    print(f"[FLOW] {status}: {node_name}")
                else:
                    print(f"[FLOW] {data}")
                continue
            if kind == "flowResponses":
                _end_text_line()
                if isinstance(data, dict):
                    module_name = str(data.get("moduleName") or data.get("nodeName") or "Unknown module")
                    print(f"[FLOW] response from: {module_name}")
                elif isinstance(data, list):
                    print(f"[FLOW] response details: {len(data)} module record(s)")
                else:
                    print(f"[FLOW] response details: {data}")
                continue
            if kind == "toolCall":
                _end_text_line()
                print(f"[TOOL] Calling: {_tool_name_from_event(data)}")
                continue
            if kind == "toolParams":
                _end_text_line()
                print(f"[TOOL] Params: {data}")
                continue
            if kind == "toolResponse":
                _end_text_line()
                print(f"[TOOL] Response: {data}")
                continue
            if kind == "updateVariables":
                _end_text_line()
                if isinstance(data, dict):
                    print(f"[VARS] Updated: {data.get('variables') or data}")
                else:
                    print(f"[VARS] Updated: {data}")
                continue
            if kind == "interactive":
                # Stop streaming; the caller must collect user input and resume.
                if printed_text:
                    print()
                interactive_event = event
                break
            if kind == "error":
                if printed_text:
                    print()
                # BUG FIX: error payloads are not guaranteed to be dicts; the
                # previous code called data.get(...) unconditionally and could
                # raise AttributeError, masking the real FastGPT error.
                if isinstance(data, dict):
                    message = str(data.get("message") or data.get("error") or "Unknown FastGPT error")
                else:
                    message = str(data or "Unknown FastGPT error")
                raise RuntimeError(message)
            if kind == "done":
                break
    finally:
        # Always release the HTTP stream, even on early break or exception.
        response.close()
    return StreamTurnResult(text="".join(full_content), interactive=interactive_event)
def _parse_args() -> argparse.Namespace:
    """Parse command-line options for the chat CLI (optional --chat-id)."""
    cli = argparse.ArgumentParser(description="FastGPT interactive chat CLI")
    cli.add_argument(
        "--chat-id",
        dest="chat_id",
        help="Reuse an existing FastGPT chatId. Defaults to a random chat_tui_* value.",
    )
    return cli.parse_args()
def main() -> None:
    """Run the interactive chat REPL against FastGPT."""
    args = _parse_args()
    if not API_KEY or not BASE_URL:
        print("Set API_KEY and BASE_URL in .env (see .env.example)")
        sys.exit(1)
    chat_id = args.chat_id or f"chat_tui_{uuid.uuid4().hex[:12]}"
    print("FastGPT Chat (interactive streaming). Type /quit to exit.\n")
    print(f"Using chatId: {chat_id}\n")
    with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client:
        # Best-effort: show the app's configured welcome message first.
        try:
            opener = _get_initial_app_opener(client, chat_id)
        except Exception as exc:
            print(f"[INIT] Failed to load app opener: {exc}\n")
            opener = ""
        if opener:
            print(f"Assistant: {opener}\n")
        while True:
            try:
                user_input = input("You: ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\nBye.")
                break
            if not user_input:
                continue
            if user_input.lower() in {"/quit", "/exit", "/q"}:
                print("Bye.")
                break
            pending_messages = [{"role": "user", "content": user_input}]
            assistant_parts: List[str] = []
            print("Assistant: ", end="", flush=True)
            # Inner loop: one user turn may need several streamed requests when
            # FastGPT pauses for interactive input (userSelect/userInput).
            while True:
                try:
                    turn_result = stream_reply(client, pending_messages, chat_id)
                except Exception as exc:
                    print(f"\nError: {exc}")
                    break
                if turn_result.text:
                    assistant_parts.append(turn_result.text)
                if turn_result.interactive is None:
                    # Turn finished normally.
                    print()
                    break
                follow_up = prompt_interactive(turn_result.interactive)
                if follow_up is None:
                    print("[INTERACTIVE] Prompt cancelled locally.")
                    break
                # Resume the workflow: the interactive answer becomes the next message.
                pending_messages = [{"role": "user", "content": follow_up}]
                print("Assistant (resume): ", end="", flush=True)
            if assistant_parts:
                print("-" * 40)
# Script entry point.
if __name__ == "__main__":
    main()