"""Interactive chat CLI with FastGPT streaming events. Run from the examples directory with .env configured: python chat_cli.py python chat_cli.py --chat-id my_existing_conversation This example supports: - streaming text output - workflow/tool event logs - interactive FastGPT nodes (`userSelect` and `userInput`) Type your message and press Enter. Type /quit to exit. During an interactive prompt, type /cancel to stop that prompt locally. """ from __future__ import annotations import argparse import json import os import sys import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional from dotenv import load_dotenv from fastgpt_client import ChatClient, FastGPTInteractiveEvent, iter_stream_events load_dotenv(Path(__file__).with_name(".env")) API_KEY = os.getenv("API_KEY") BASE_URL = os.getenv("BASE_URL") for stream in (sys.stdout, sys.stderr): if hasattr(stream, "reconfigure"): try: stream.reconfigure(encoding="utf-8", errors="replace") except Exception: pass @dataclass class StreamTurnResult: text: str interactive: Optional[FastGPTInteractiveEvent] = None def _write_text(text: str) -> None: try: sys.stdout.write(text) except UnicodeEncodeError: encoding = getattr(sys.stdout, "encoding", None) or "utf-8" safe_text = text.encode(encoding, errors="replace").decode(encoding, errors="replace") sys.stdout.write(safe_text) sys.stdout.flush() def _extract_text_from_event(kind: str, payload: Any) -> str: if not isinstance(payload, dict): return "" if kind in {"answer", "fastAnswer"}: text = payload.get("text") if isinstance(text, str) and text: return text choices = payload.get("choices") if isinstance(payload.get("choices"), list) else [] if not choices: return str(payload.get("text") or "") first_choice = choices[0] if isinstance(choices[0], dict) else {} delta = first_choice.get("delta") if isinstance(first_choice.get("delta"), dict) else {} content = delta.get("content") if isinstance(content, str) and content: return content message = first_choice.get("message") if isinstance(first_choice.get("message"), dict) else {} message_content = message.get("content") if isinstance(message_content, str) and message_content: return message_content return "" def _nested_tool_payload(payload: Any) -> Dict[str, Any]: if not isinstance(payload, dict): return {} nested = payload.get("tool") if isinstance(nested, dict): return nested return {} def _tool_name_from_event(payload: Any) -> str: if not isinstance(payload, dict): return "?" nested = _nested_tool_payload(payload) return str(payload.get("toolName") or nested.get("toolName") or payload.get("functionName") or nested.get("functionName") or "?") def _normalize_option(raw_option: Any, index: int) -> Optional[Dict[str, str]]: if isinstance(raw_option, str): value = raw_option.strip() if not value: return None return {"id": f"option_{index}", "label": value, "value": value, "description": ""} if not isinstance(raw_option, dict): return None label = str(raw_option.get("label") or raw_option.get("value") or raw_option.get("id") or "").strip() value = str(raw_option.get("value") or raw_option.get("label") or raw_option.get("id") or "").strip() option_id = str(raw_option.get("id") or value or f"option_{index}").strip() if not label and not value: return None return { "id": option_id or f"option_{index}", "label": label or value, "value": value or label, "description": str(raw_option.get("description") or "").strip(), } def _resolve_option_token(token: str, options: List[Dict[str, str]]) -> Optional[str]: normalized = token.strip() if not normalized: return None if normalized.isdigit(): index = int(normalized) - 1 if 0 <= index < len(options): return options[index]["value"] lowered = normalized.lower() for option in options: if lowered in { option["id"].lower(), option["label"].lower(), option["value"].lower(), }: return option["value"] return None def _interactive_prompt_text(payload: Dict[str, Any], default_text: str) -> str: params = payload.get("params") if isinstance(payload.get("params"), dict) else {} return str( payload.get("prompt") or payload.get("title") or payload.get("text") or payload.get("description") or params.get("description") or default_text ).strip() def _prompt_user_select(event: FastGPTInteractiveEvent) -> Optional[str]: payload = event.data params = payload.get("params") if isinstance(payload.get("params"), dict) else {} prompt_text = _interactive_prompt_text(payload, "Please select an option") multiple = bool(params.get("multiple") or payload.get("multiple")) raw_options = params.get("userSelectOptions") if isinstance(params.get("userSelectOptions"), list) else [] options = [item for index, raw in enumerate(raw_options, start=1) if (item := _normalize_option(raw, index))] print() print(f"[INTERACTIVE] {prompt_text}") for index, option in enumerate(options, start=1): print(f" {index}. {option['label']}") if option["description"]: print(f" {option['description']}") if not options: print("No selectable options were provided by FastGPT.") return None while True: hint = "comma-separated indexes/values" if multiple else "an index or value" raw_input_value = input(f"Select {hint} (/cancel to stop): ").strip() if not raw_input_value: print("Selection is required.") continue if raw_input_value.lower() == "/cancel": return None tokens = [part.strip() for part in raw_input_value.split(",")] if multiple else [raw_input_value] selected_values: List[str] = [] invalid_tokens: List[str] = [] for token in tokens: resolved = _resolve_option_token(token, options) if resolved is None: invalid_tokens.append(token) continue selected_values.append(resolved) if invalid_tokens: print(f"Invalid option(s): {', '.join(invalid_tokens)}") continue if not selected_values: print("Selection is required.") continue if not multiple: return selected_values[0] return ", ".join(selected_values) def _prompt_user_input(event: FastGPTInteractiveEvent) -> Optional[str]: payload = event.data params = payload.get("params") if isinstance(payload.get("params"), dict) else {} prompt_text = _interactive_prompt_text(payload, "Please provide the requested input") form_fields = params.get("inputForm") if isinstance(params.get("inputForm"), list) else [] print() print(f"[INTERACTIVE] {prompt_text}") if not form_fields: value = input("Input (/cancel to stop): ").strip() if value.lower() == "/cancel": return None return value values: Dict[str, Any] = {} for index, field in enumerate(form_fields, start=1): if not isinstance(field, dict): continue name = str(field.get("key") or field.get("name") or f"field_{index}").strip() or f"field_{index}" label = str(field.get("label") or field.get("name") or name).strip() or name placeholder = str(field.get("placeholder") or "").strip() default_value = field.get("defaultValue", field.get("default")) required = bool(field.get("required")) prompt_label = label if placeholder: prompt_label = f"{prompt_label} ({placeholder})" if default_value not in (None, ""): prompt_label = f"{prompt_label} [{default_value}]" if not required: prompt_label = f"{prompt_label} [optional]" prompt_label = f"{prompt_label}: " while True: raw_input_value = input(prompt_label).strip() if raw_input_value.lower() == "/cancel": return None if not raw_input_value and default_value not in (None, ""): raw_input_value = str(default_value) if raw_input_value or not required: values[name] = raw_input_value break print("This field is required.") return json.dumps(values, ensure_ascii=False) def prompt_interactive(event: FastGPTInteractiveEvent) -> Optional[str]: if event.interaction_type == "userInput": return _prompt_user_input(event) return _prompt_user_select(event) def stream_reply(client: ChatClient, messages: List[Dict[str, Any]], chat_id: str) -> StreamTurnResult: """Stream a single FastGPT request and stop when interactive input is required.""" response = client.create_chat_completion( messages=messages, stream=True, detail=True, chatId=chat_id, ) response.raise_for_status() full_content: List[str] = [] interactive_event: Optional[FastGPTInteractiveEvent] = None printed_text = False try: for event in iter_stream_events(response): if event.kind in {"data", "answer", "fastAnswer"}: content = _extract_text_from_event(event.kind, event.data) if content: printed_text = True full_content.append(content) _write_text(content) continue if event.kind == "flowNodeStatus": if printed_text: print() printed_text = False if isinstance(event.data, dict): status = str(event.data.get("status") or "?") node_name = str(event.data.get("nodeName") or event.data.get("name") or event.data.get("node_id") or "Unknown node") print(f"[FLOW] {status}: {node_name}") else: print(f"[FLOW] {event.data}") continue if event.kind == "flowResponses": if printed_text: print() printed_text = False if isinstance(event.data, dict): module_name = str(event.data.get("moduleName") or event.data.get("nodeName") or "Unknown module") print(f"[FLOW] response from: {module_name}") elif isinstance(event.data, list): print(f"[FLOW] response details: {len(event.data)} module record(s)") else: print(f"[FLOW] response details: {event.data}") continue if event.kind == "toolCall": if printed_text: print() printed_text = False tool_name = _tool_name_from_event(event.data) print(f"[TOOL] Calling: {tool_name}") continue if event.kind == "toolParams": if printed_text: print() printed_text = False print(f"[TOOL] Params: {event.data}") continue if event.kind == "toolResponse": if printed_text: print() printed_text = False print(f"[TOOL] Response: {event.data}") continue if event.kind == "updateVariables": if printed_text: print() printed_text = False if isinstance(event.data, dict): print(f"[VARS] Updated: {event.data.get('variables') or event.data}") else: print(f"[VARS] Updated: {event.data}") continue if event.kind == "interactive": if printed_text: print() interactive_event = event break if event.kind == "error": if printed_text: print() message = str(event.data.get("message") or event.data.get("error") or "Unknown FastGPT error") raise RuntimeError(message) if event.kind == "done": break finally: response.close() return StreamTurnResult(text="".join(full_content), interactive=interactive_event) def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="FastGPT interactive chat CLI") parser.add_argument( "--chat-id", dest="chat_id", help="Reuse an existing FastGPT chatId. Defaults to a random chat_tui_* value.", ) return parser.parse_args() def main() -> None: args = _parse_args() if not API_KEY or not BASE_URL: print("Set API_KEY and BASE_URL in .env (see .env.example)") sys.exit(1) chat_id = args.chat_id or f"chat_tui_{uuid.uuid4().hex[:12]}" print("FastGPT Chat (interactive streaming). Type /quit to exit.\n") print(f"Using chatId: {chat_id}\n") with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client: while True: try: user_input = input("You: ").strip() except (EOFError, KeyboardInterrupt): print("\nBye.") break if not user_input: continue if user_input.lower() in {"/quit", "/exit", "/q"}: print("Bye.") break pending_messages = [{"role": "user", "content": user_input}] assistant_parts: List[str] = [] print("Assistant: ", end="", flush=True) while True: try: turn_result = stream_reply(client, pending_messages, chat_id) except Exception as exc: print(f"\nError: {exc}") break if turn_result.text: assistant_parts.append(turn_result.text) if turn_result.interactive is None: print() break follow_up = prompt_interactive(turn_result.interactive) if follow_up is None: print("[INTERACTIVE] Prompt cancelled locally.") break pending_messages = [{"role": "user", "content": follow_up}] print("Assistant (resume): ", end="", flush=True) if assistant_parts: print("-" * 40) if __name__ == "__main__": main()