diff --git a/examples/.env.example b/examples/.env.example index 5de5c59..6676dc9 100644 --- a/examples/.env.example +++ b/examples/.env.example @@ -1,4 +1,3 @@ API_KEY="" BASE_URL="" -CHAT_ID="" -APP_ID="" \ No newline at end of file +APP_ID="" diff --git a/examples/basic_usage.py b/examples/basic_usage.py index da917ca..3c236cb 100644 --- a/examples/basic_usage.py +++ b/examples/basic_usage.py @@ -153,29 +153,29 @@ def delete_chat_item(): if __name__ == "__main__": - # print("=== Simple Chat ===") - # try: - # simple_chat() - # except Exception as e: - # print(f"Error: {e}") + print("=== Simple Chat ===") + try: + simple_chat() + except Exception as e: + print(f"Error: {e}") - # print("\n=== Streaming Chat ===") - # try: - # streaming_chat() - # except Exception as e: - # print(f"Error: {e}") + print("\n=== Streaming Chat ===") + try: + streaming_chat() + except Exception as e: + print(f"Error: {e}") - # print("\n=== Chat with Context ===") - # try: - # chat_with_context() - # except Exception as e: - # print(f"Error: {e}") + print("\n=== Chat with Context ===") + try: + chat_with_context() + except Exception as e: + print(f"Error: {e}") - # print("\n=== Get Histories ===") - # try: - # get_histories() - # except Exception as e: - # print(f"Error: {e}") + print("\n=== Get Histories ===") + try: + get_histories() + except Exception as e: + print(f"Error: {e}") print("\n=== Delete Chat Item ===") try: diff --git a/examples/chat_cli.py b/examples/chat_cli.py new file mode 100644 index 0000000..0074ce4 --- /dev/null +++ b/examples/chat_cli.py @@ -0,0 +1,431 @@ +"""Interactive chat CLI with FastGPT streaming events. + +Run from the examples directory with .env configured: + python chat_cli.py + python chat_cli.py --chat-id my_existing_conversation + +This example supports: +- streaming text output +- workflow/tool event logs +- interactive FastGPT nodes (`userSelect` and `userInput`) + +Type your message and press Enter. Type /quit to exit. +During an interactive prompt, type /cancel to stop that prompt locally. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional + +from dotenv import load_dotenv + +from fastgpt_client import ChatClient, FastGPTInteractiveEvent, iter_stream_events + +load_dotenv(Path(__file__).with_name(".env")) + +API_KEY = os.getenv("API_KEY") +BASE_URL = os.getenv("BASE_URL") + +for stream in (sys.stdout, sys.stderr): + if hasattr(stream, "reconfigure"): + try: + stream.reconfigure(encoding="utf-8", errors="replace") + except Exception: + pass + + +@dataclass +class StreamTurnResult: + text: str + interactive: Optional[FastGPTInteractiveEvent] = None + + +def _write_text(text: str) -> None: + try: + sys.stdout.write(text) + except UnicodeEncodeError: + encoding = getattr(sys.stdout, "encoding", None) or "utf-8" + safe_text = text.encode(encoding, errors="replace").decode(encoding, errors="replace") + sys.stdout.write(safe_text) + sys.stdout.flush() + + +def _extract_text_from_event(kind: str, payload: Any) -> str: + if not isinstance(payload, dict): + return "" + + if kind in {"answer", "fastAnswer"}: + text = payload.get("text") + if isinstance(text, str) and text: + return text + + choices = payload.get("choices") if isinstance(payload.get("choices"), list) else [] + if not choices: + return str(payload.get("text") or "") + + first_choice = choices[0] if isinstance(choices[0], dict) else {} + delta = first_choice.get("delta") if isinstance(first_choice.get("delta"), dict) else {} + content = delta.get("content") + if isinstance(content, str) and content: + return content + + message = first_choice.get("message") if isinstance(first_choice.get("message"), dict) else {} + message_content = message.get("content") + if isinstance(message_content, str) and message_content: + return message_content + + return "" + + +def _nested_tool_payload(payload: Any) -> Dict[str, Any]: + if not isinstance(payload, dict): + return {} + nested = payload.get("tool") + if isinstance(nested, dict): + return nested + return {} + + +def _tool_name_from_event(payload: Any) -> str: + if not isinstance(payload, dict): + return "?" + nested = _nested_tool_payload(payload) + return str(payload.get("toolName") or nested.get("toolName") or payload.get("functionName") or nested.get("functionName") or "?") + + +def _normalize_option(raw_option: Any, index: int) -> Optional[Dict[str, str]]: + if isinstance(raw_option, str): + value = raw_option.strip() + if not value: + return None + return {"id": f"option_{index}", "label": value, "value": value, "description": ""} + + if not isinstance(raw_option, dict): + return None + + label = str(raw_option.get("label") or raw_option.get("value") or raw_option.get("id") or "").strip() + value = str(raw_option.get("value") or raw_option.get("label") or raw_option.get("id") or "").strip() + option_id = str(raw_option.get("id") or value or f"option_{index}").strip() + if not label and not value: + return None + return { + "id": option_id or f"option_{index}", + "label": label or value, + "value": value or label, + "description": str(raw_option.get("description") or "").strip(), + } + + +def _resolve_option_token(token: str, options: List[Dict[str, str]]) -> Optional[str]: + normalized = token.strip() + if not normalized: + return None + + if normalized.isdigit(): + index = int(normalized) - 1 + if 0 <= index < len(options): + return options[index]["value"] + + lowered = normalized.lower() + for option in options: + if lowered in { + option["id"].lower(), + option["label"].lower(), + option["value"].lower(), + }: + return option["value"] + return None + + +def _interactive_prompt_text(payload: Dict[str, Any], default_text: str) -> str: + params = payload.get("params") if isinstance(payload.get("params"), dict) else {} + return str( + payload.get("prompt") + or payload.get("title") + or payload.get("text") + or payload.get("description") + or params.get("description") + or default_text + ).strip() + + +def _prompt_user_select(event: FastGPTInteractiveEvent) -> Optional[str]: + payload = event.data + params = payload.get("params") if isinstance(payload.get("params"), dict) else {} + prompt_text = _interactive_prompt_text(payload, "Please select an option") + multiple = bool(params.get("multiple") or payload.get("multiple")) + raw_options = params.get("userSelectOptions") if isinstance(params.get("userSelectOptions"), list) else [] + options = [item for index, raw in enumerate(raw_options, start=1) if (item := _normalize_option(raw, index))] + + print() + print(f"[INTERACTIVE] {prompt_text}") + for index, option in enumerate(options, start=1): + print(f" {index}. {option['label']}") + if option["description"]: + print(f" {option['description']}") + + if not options: + print("No selectable options were provided by FastGPT.") + return None + + while True: + hint = "comma-separated indexes/values" if multiple else "an index or value" + raw_input_value = input(f"Select {hint} (/cancel to stop): ").strip() + if not raw_input_value: + print("Selection is required.") + continue + if raw_input_value.lower() == "/cancel": + return None + + tokens = [part.strip() for part in raw_input_value.split(",")] if multiple else [raw_input_value] + selected_values: List[str] = [] + invalid_tokens: List[str] = [] + for token in tokens: + resolved = _resolve_option_token(token, options) + if resolved is None: + invalid_tokens.append(token) + continue + selected_values.append(resolved) + + if invalid_tokens: + print(f"Invalid option(s): {', '.join(invalid_tokens)}") + continue + if not selected_values: + print("Selection is required.") + continue + if not multiple: + return selected_values[0] + return ", ".join(selected_values) + + +def _prompt_user_input(event: FastGPTInteractiveEvent) -> Optional[str]: + payload = event.data + params = payload.get("params") if isinstance(payload.get("params"), dict) else {} + prompt_text = _interactive_prompt_text(payload, "Please provide the requested input") + form_fields = params.get("inputForm") if isinstance(params.get("inputForm"), list) else [] + + print() + print(f"[INTERACTIVE] {prompt_text}") + + if not form_fields: + value = input("Input (/cancel to stop): ").strip() + if value.lower() == "/cancel": + return None + return value + + values: Dict[str, Any] = {} + for index, field in enumerate(form_fields, start=1): + if not isinstance(field, dict): + continue + + name = str(field.get("key") or field.get("name") or f"field_{index}").strip() or f"field_{index}" + label = str(field.get("label") or field.get("name") or name).strip() or name + placeholder = str(field.get("placeholder") or "").strip() + default_value = field.get("defaultValue", field.get("default")) + required = bool(field.get("required")) + + prompt_label = label + if placeholder: + prompt_label = f"{prompt_label} ({placeholder})" + if default_value not in (None, ""): + prompt_label = f"{prompt_label} [{default_value}]" + if not required: + prompt_label = f"{prompt_label} [optional]" + prompt_label = f"{prompt_label}: " + + while True: + raw_input_value = input(prompt_label).strip() + if raw_input_value.lower() == "/cancel": + return None + if not raw_input_value and default_value not in (None, ""): + raw_input_value = str(default_value) + if raw_input_value or not required: + values[name] = raw_input_value + break + print("This field is required.") + + return json.dumps(values, ensure_ascii=False) + + +def prompt_interactive(event: FastGPTInteractiveEvent) -> Optional[str]: + if event.interaction_type == "userInput": + return _prompt_user_input(event) + return _prompt_user_select(event) + + +def stream_reply(client: ChatClient, messages: List[Dict[str, Any]], chat_id: str) -> StreamTurnResult: + """Stream a single FastGPT request and stop when interactive input is required.""" + + response = client.create_chat_completion( + messages=messages, + stream=True, + detail=True, + chatId=chat_id, + ) + response.raise_for_status() + + full_content: List[str] = [] + interactive_event: Optional[FastGPTInteractiveEvent] = None + printed_text = False + + try: + for event in iter_stream_events(response): + if event.kind in {"data", "answer", "fastAnswer"}: + content = _extract_text_from_event(event.kind, event.data) + if content: + printed_text = True + full_content.append(content) + _write_text(content) + continue + + if event.kind == "flowNodeStatus": + if printed_text: + print() + printed_text = False + if isinstance(event.data, dict): + status = str(event.data.get("status") or "?") + node_name = str(event.data.get("nodeName") or event.data.get("name") or event.data.get("node_id") or "Unknown node") + print(f"[FLOW] {status}: {node_name}") + else: + print(f"[FLOW] {event.data}") + continue + + if event.kind == "flowResponses": + if printed_text: + print() + printed_text = False + if isinstance(event.data, dict): + module_name = str(event.data.get("moduleName") or event.data.get("nodeName") or "Unknown module") + print(f"[FLOW] response from: {module_name}") + elif isinstance(event.data, list): + print(f"[FLOW] response details: {len(event.data)} module record(s)") + else: + print(f"[FLOW] response details: {event.data}") + continue + + if event.kind == "toolCall": + if printed_text: + print() + printed_text = False + tool_name = _tool_name_from_event(event.data) + print(f"[TOOL] Calling: {tool_name}") + continue + + if event.kind == "toolParams": + if printed_text: + print() + printed_text = False + print(f"[TOOL] Params: {event.data}") + continue + + if event.kind == "toolResponse": + if printed_text: + print() + printed_text = False + print(f"[TOOL] Response: {event.data}") + continue + + if event.kind == "updateVariables": + if printed_text: + print() + printed_text = False + if isinstance(event.data, dict): + print(f"[VARS] Updated: {event.data.get('variables') or event.data}") + else: + print(f"[VARS] Updated: {event.data}") + continue + + if event.kind == "interactive": + if printed_text: + print() + interactive_event = event + break + + if event.kind == "error": + if printed_text: + print() + message = str(event.data.get("message") or event.data.get("error") or "Unknown FastGPT error") + raise RuntimeError(message) + + if event.kind == "done": + break + finally: + response.close() + + return StreamTurnResult(text="".join(full_content), interactive=interactive_event) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="FastGPT interactive chat CLI") + parser.add_argument( + "--chat-id", + dest="chat_id", + help="Reuse an existing FastGPT chatId. Defaults to a random chat_tui_* value.", + ) + return parser.parse_args() + + +def main() -> None: + args = _parse_args() + + if not API_KEY or not BASE_URL: + print("Set API_KEY and BASE_URL in .env (see .env.example)") + sys.exit(1) + + chat_id = args.chat_id or f"chat_tui_{uuid.uuid4().hex[:12]}" + print("FastGPT Chat (interactive streaming). Type /quit to exit.\n") + print(f"Using chatId: {chat_id}\n") + + with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client: + while True: + try: + user_input = input("You: ").strip() + except (EOFError, KeyboardInterrupt): + print("\nBye.") + break + + if not user_input: + continue + if user_input.lower() in {"/quit", "/exit", "/q"}: + print("Bye.") + break + + pending_messages = [{"role": "user", "content": user_input}] + assistant_parts: List[str] = [] + + print("Assistant: ", end="", flush=True) + while True: + try: + turn_result = stream_reply(client, pending_messages, chat_id) + except Exception as exc: + print(f"\nError: {exc}") + break + + if turn_result.text: + assistant_parts.append(turn_result.text) + + if turn_result.interactive is None: + print() + break + + follow_up = prompt_interactive(turn_result.interactive) + if follow_up is None: + print("[INTERACTIVE] Prompt cancelled locally.") + break + + pending_messages = [{"role": "user", "content": follow_up}] + print("Assistant (resume): ", end="", flush=True) + + if assistant_parts: + print("-" * 40) + + +if __name__ == "__main__": + main() diff --git a/examples/chat_tui.py b/examples/chat_tui.py new file mode 100644 index 0000000..6f2d9c2 --- /dev/null +++ b/examples/chat_tui.py @@ -0,0 +1,683 @@ +"""Textual FastGPT workbench styled like a coding assistant terminal. + +Run from the examples directory with .env configured: + python chat_tui.py + python chat_tui.py --chat-id my_existing_conversation + +This example provides: +- a full-screen Textual interface +- streaming chat updates +- workflow / tool event logging +- modal handling for FastGPT interactive nodes +""" + +from __future__ import annotations + +import argparse +import json +import sys +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional + +from textual import on, work +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Horizontal, Vertical, VerticalScroll +from textual.screen import ModalScreen +from textual.widgets import Button, Checkbox, Footer, Input, RichLog, Static, TextArea + +EXAMPLES_DIR = Path(__file__).resolve().parent +if str(EXAMPLES_DIR) not in sys.path: + sys.path.insert(0, str(EXAMPLES_DIR)) + +from chat_cli import ( + API_KEY, + BASE_URL, + _extract_text_from_event, + _interactive_prompt_text, + _normalize_option, + _tool_name_from_event, +) +from fastgpt_client import ChatClient, FastGPTInteractiveEvent, iter_stream_events + + +class MessageCard(Static): + """Lightweight message block used in the transcript pane.""" + + def __init__(self, role: str, title: str, content: str, widget_id: str) -> None: + super().__init__(content or " ", id=widget_id, classes=f"message {role}") + self.role = role + self.content_text = content + self.border_title = title + + def set_text(self, content: str) -> None: + self.content_text = content + self.update(content or " ") + + def append_text(self, chunk: str) -> None: + if self.content_text in {"", "Thinking…"}: + self.content_text = chunk + else: + self.content_text += chunk + self.update(self.content_text or " ") + + +class InteractiveInputScreen(ModalScreen[Optional[str]]): + """Modal used for FastGPT userInput interactions.""" + + CSS = """ + InteractiveInputScreen { + align: center middle; + background: rgba(4, 7, 11, 0.82); + } + + .dialog { + width: 84; + max-width: 110; + background: #101620; + border: round #47c6b3; + padding: 1 2; + } + + .dialog-title { + color: #f5d76e; + text-style: bold; + margin-bottom: 1; + } + + .field-label { + color: #c6d2df; + margin-top: 1; + } + + .dialog-actions { + margin-top: 1; + height: auto; + } + + .dialog-actions Button { + width: 1fr; + margin-right: 1; + } + + .validation { + color: #ffb4a2; + margin-top: 1; + } + """ + + def __init__(self, event: FastGPTInteractiveEvent) -> None: + super().__init__() + self.event = event + self.prompt_text = _interactive_prompt_text(event.data, "Please provide the requested input") + raw_fields = event.data.get("params", {}).get("inputForm", []) + self.fields: List[Dict[str, Any]] = [] + for index, raw_field in enumerate(raw_fields, start=1): + if not isinstance(raw_field, dict): + continue + key = str(raw_field.get("key") or raw_field.get("name") or f"field_{index}").strip() or f"field_{index}" + label = str(raw_field.get("label") or raw_field.get("name") or key).strip() or key + placeholder = str(raw_field.get("placeholder") or raw_field.get("description") or "").strip() + default_value = raw_field.get("defaultValue", raw_field.get("default")) + required = bool(raw_field.get("required")) + self.fields.append( + { + "key": key, + "label": label, + "placeholder": placeholder, + "default": "" if default_value in (None, "") else str(default_value), + "required": required, + } + ) + + def compose(self) -> ComposeResult: + with Container(classes="dialog"): + yield Static(self.prompt_text, classes="dialog-title") + if not self.fields: + yield Static("FastGPT did not provide structured fields. Enter a single value below.", classes="field-label") + yield Input(placeholder="Workflow input", id="input-freeform") + else: + for index, field in enumerate(self.fields, start=1): + suffix = "" if field["required"] else " [optional]" + yield Static(f"{index}. {field['label']}{suffix}", classes="field-label") + yield Input( + value=field["default"], + placeholder=field["placeholder"], + id=f"input-{index}", + ) + yield Static("", id="validation", classes="validation") + with Horizontal(classes="dialog-actions"): + yield Button("Cancel", id="cancel") + yield Button("Submit", id="submit", variant="primary") + + def on_mount(self) -> None: + inputs = list(self.query(Input)) + if inputs: + inputs[0].focus() + else: + self.query_one("#submit", Button).focus() + + @on(Button.Pressed) + def handle_button(self, event: Button.Pressed) -> None: + if event.button.id == "cancel": + self.dismiss(None) + return + + if not self.fields: + value = self.query_one("#input-freeform", Input).value.strip() + self.dismiss(None if not value else value) + return + + payload: Dict[str, Any] = {} + for index, field in enumerate(self.fields, start=1): + value = self.query_one(f"#input-{index}", Input).value.strip() + if not value and field["required"]: + self.query_one("#validation", Static).update(f"{field['label']} is required.") + return + payload[field["key"]] = value + + self.dismiss(json.dumps(payload, ensure_ascii=False)) + + +class InteractiveSelectScreen(ModalScreen[Optional[str]]): + """Modal used for FastGPT userSelect interactions.""" + + CSS = """ + InteractiveSelectScreen { + align: center middle; + background: rgba(4, 7, 11, 0.82); + } + + .dialog { + width: 84; + max-width: 110; + background: #101620; + border: round #f5d76e; + padding: 1 2; + } + + .dialog-title { + color: #f5d76e; + text-style: bold; + margin-bottom: 1; + } + + .option-description { + color: #8fa1b3; + margin-left: 2; + margin-bottom: 1; + } + + .dialog-actions { + margin-top: 1; + height: auto; + } + + .dialog-actions Button { + width: 1fr; + margin-right: 1; + } + + .choice-button { + width: 1fr; + margin-top: 1; + } + + .validation { + color: #ffb4a2; + margin-top: 1; + } + """ + + def __init__(self, event: FastGPTInteractiveEvent) -> None: + super().__init__() + self.event = event + payload = event.data + params = payload.get("params") if isinstance(payload.get("params"), dict) else {} + self.prompt_text = _interactive_prompt_text(payload, "Please select an option") + self.multiple = bool(params.get("multiple") or payload.get("multiple")) + raw_options = params.get("userSelectOptions") if isinstance(params.get("userSelectOptions"), list) else [] + self.options = [ + item + for index, raw_option in enumerate(raw_options, start=1) + if (item := _normalize_option(raw_option, index)) + ] + + def compose(self) -> ComposeResult: + with Container(classes="dialog"): + yield Static(self.prompt_text, classes="dialog-title") + if not self.options: + yield Static("FastGPT did not provide selectable options.", classes="option-description") + elif self.multiple: + for index, option in enumerate(self.options, start=1): + label = f"{index}. {option['label']}" + if option["description"]: + label = f"{label} - {option['description']}" + yield Checkbox(label, id=f"check-{index}") + else: + for index, option in enumerate(self.options, start=1): + yield Button(f"{index}. {option['label']}", id=f"choice-{index}", classes="choice-button") + if option["description"]: + yield Static(option["description"], classes="option-description") + yield Static("", id="validation", classes="validation") + with Horizontal(classes="dialog-actions"): + yield Button("Cancel", id="cancel") + if self.multiple: + yield Button("Submit", id="submit", variant="primary") + + def on_mount(self) -> None: + buttons = [button for button in self.query(Button) if button.id and button.id != "cancel"] + if buttons: + buttons[0].focus() + else: + self.query_one("#cancel", Button).focus() + + @on(Button.Pressed) + def handle_button(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + if button_id == "cancel": + self.dismiss(None) + return + + if button_id.startswith("choice-"): + index = int(button_id.split("-", 1)[1]) - 1 + self.dismiss(self.options[index]["value"]) + return + + if button_id == "submit": + selected: List[str] = [] + for index, option in enumerate(self.options, start=1): + if self.query_one(f"#check-{index}", Checkbox).value: + selected.append(option["value"]) + if not selected: + self.query_one("#validation", Static).update("Select at least one option.") + return + self.dismiss(", ".join(selected)) + + +class FastGPTWorkbench(App[None]): + """Claude Code style Textual workbench for FastGPT streaming chat.""" + + CSS = """ + Screen { + background: #0b0f14; + color: #e6edf3; + } + + #shell { + layout: grid; + grid-size: 2; + grid-columns: 32 1fr; + height: 1fr; + } + + #sidebar { + padding: 1 1 1 2; + background: #101620; + border-right: heavy #273244; + } + + #brand { + color: #f5d76e; + text-style: bold; + margin-bottom: 1; + } + + .panel { + background: #0d131d; + border: round #2b3547; + padding: 1; + margin-bottom: 1; + } + + #event_log { + height: 1fr; + background: #0d131d; + border: round #2b3547; + } + + #main_panel { + layout: vertical; + } + + #chat_title { + padding: 1 2; + background: #101620; + border-bottom: heavy #273244; + color: #f0f3f7; + text-style: bold; + } + + #messages { + padding: 1 2; + height: 1fr; + } + + .message { + width: 1fr; + padding: 1 2; + margin-bottom: 1; + background: #111723; + border: round #2b3547; + } + + .user { + border-left: tall #f5d76e; + background: #16161d; + } + + .assistant { + border-left: tall #47c6b3; + background: #0f1821; + } + + .system { + border-left: tall #7aa2f7; + background: #101824; + } + + .workflow { + border-left: tall #ff9e64; + background: #1b1712; + } + + #composer_shell { + layout: vertical; + height: 12; + padding: 1 2; + background: #101620; + border-top: heavy #273244; + } + + #composer { + height: 1fr; + background: #0d131d; + color: #eef4ff; + border: round #2b3547; + } + + #composer_actions { + height: auto; + margin-top: 1; + } + + #composer_actions Button { + width: 16; + margin-left: 1; + } + + #composer_spacer { + width: 1fr; + } + + """ + + TITLE = "FastGPT Workbench" + SUB_TITLE = "Claude-style Textual TUI" + BINDINGS = [ + Binding("ctrl+j", "send_message", "Send"), + Binding("ctrl+n", "new_chat", "New Chat"), + Binding("ctrl+c", "quit", "Quit"), + ] + + def __init__(self, chat_id: Optional[str] = None) -> None: + super().__init__() + self.chat_id = chat_id or self._new_chat_id() + self._message_counter = 0 + self._busy = False + + def compose(self) -> ComposeResult: + with Container(id="shell"): + with Vertical(id="sidebar"): + yield Static("FastGPT Workbench", id="brand") + yield Static("", id="session_panel", classes="panel") + yield Static("", id="status_panel", classes="panel") + yield Static("Ctrl+J send\nCtrl+N new chat\nEsc closes modal prompts", classes="panel") + yield RichLog(id="event_log", wrap=True, highlight=False, markup=False) + with Vertical(id="main_panel"): + yield Static("Claude-style FastGPT Console", id="chat_title") + yield VerticalScroll(id="messages") + with Vertical(id="composer_shell"): + yield TextArea("", id="composer") + with Horizontal(id="composer_actions"): + yield Static("", id="composer_spacer") + yield Button("New Chat", id="new_chat") + yield Button("Send", id="send", variant="primary") + yield Footer() + + def on_mount(self) -> None: + if not API_KEY or not BASE_URL: + raise RuntimeError("Set API_KEY and BASE_URL in examples/.env before starting chat_tui.py") + self._refresh_sidebar() + self._set_status("Ready", "Fresh session") + self._append_message( + role="system", + title="Session", + content="Start typing below. FastGPT workflow events will appear in the left rail.", + ) + self.query_one("#composer", TextArea).focus() + + def _new_chat_id(self) -> str: + return f"chat_tui_{uuid.uuid4().hex[:12]}" + + def _refresh_sidebar(self) -> None: + session_panel = self.query_one("#session_panel", Static) + base_url = BASE_URL or "" + session_panel.update( + f"Session\n\nchatId: {self.chat_id}\nbaseUrl: {base_url}" + ) + + def _set_status(self, heading: str, detail: str) -> None: + panel = self.query_one("#status_panel", Static) + panel.update(f"Status\n\n{heading}\n{detail}") + + def _log_event(self, message: str) -> None: + self.query_one("#event_log", RichLog).write(message) + + def _format_workflow_payload(self, content: str) -> str: + try: + return json.dumps(json.loads(content), ensure_ascii=False, indent=2) + except Exception: + return content + + def _append_message(self, role: str, title: str, content: str) -> str: + self._message_counter += 1 + widget_id = f"message-{self._message_counter}" + card = MessageCard(role=role, title=title, content=content, widget_id=widget_id) + messages = self.query_one("#messages", VerticalScroll) + messages.mount(card) + messages.scroll_end(animate=False) + return widget_id + + def _assistant_card(self) -> str: + card_id = self._append_message("assistant", "FastGPT", "Thinking…") + self.query_one(f"#{card_id}", MessageCard).set_text("Thinking…") + return card_id + + def _start_turn(self, content: str, *, title: str = "You", role: str = "user") -> None: + if self._busy: + self._log_event("[local] Busy streaming. Wait for the current turn to finish.") + return + + display_content = self._format_workflow_payload(content) if role == "workflow" else content + self._append_message(role=role, title=title, content=display_content) + assistant_card_id = self._assistant_card() + self._busy = True + self._set_status("Streaming", "Receiving FastGPT output") + self._stream_turn( + messages=[{"role": "user", "content": content}], + assistant_card_id=assistant_card_id, + ) + + def _complete_turn(self, assistant_card_id: str, *, waiting_interactive: bool) -> None: + card = self.query_one(f"#{assistant_card_id}", MessageCard) + if waiting_interactive and not card.content_text.strip(): + card.set_text("Waiting for workflow input…") + self._set_status("Interactive", "Provide the requested workflow input") + elif not card.content_text.strip(): + card.set_text("(no text response)") + self._set_status("Ready", "Idle") + else: + self._set_status("Ready", "Idle") + self._busy = False + + def _append_assistant_chunk(self, assistant_card_id: str, chunk: str) -> None: + card = self.query_one(f"#{assistant_card_id}", MessageCard) + card.append_text(chunk) + self.query_one("#messages", VerticalScroll).scroll_end(animate=False) + + def _mark_turn_failed(self, assistant_card_id: str, message: str) -> None: + card = self.query_one(f"#{assistant_card_id}", MessageCard) + if card.content_text in {"", "Thinking…"}: + card.set_text(f"Error: {message}") + else: + card.append_text(f"\n\nError: {message}") + self._busy = False + self._set_status("Error", message) + self._log_event(f"[error] {message}") + + def _handle_interactive_result(self, result: Optional[str]) -> None: + if result is None: + self._log_event("[interactive] Prompt cancelled locally.") + self._set_status("Ready", "Interactive prompt dismissed") + return + + self._start_turn(result, title="Workflow Input", role="workflow") + + def _present_interactive(self, event: FastGPTInteractiveEvent) -> None: + self._log_event(f"[interactive] {event.interaction_type}") + if event.interaction_type == "userInput": + self.push_screen(InteractiveInputScreen(event), self._handle_interactive_result) + return + self.push_screen(InteractiveSelectScreen(event), self._handle_interactive_result) + + def action_send_message(self) -> None: + composer = self.query_one("#composer", TextArea) + content = composer.text.strip() + if not content: + return + composer.text = "" + composer.focus() + self._start_turn(content) + + def action_new_chat(self) -> None: + if self._busy: + self._log_event("[local] Cannot reset chat while a turn is streaming.") + return + self.chat_id = self._new_chat_id() + self.query_one("#messages", VerticalScroll).remove_children() + self._refresh_sidebar() + self._set_status("Ready", "Started a new random session") + self._append_message( + role="system", + title="Session", + content="New chat created. Start typing below.", + ) + self._log_event(f"[local] Started new chatId {self.chat_id}") + + @on(Button.Pressed, "#send") + def _send_button(self, _: Button.Pressed) -> None: + self.action_send_message() + + @on(Button.Pressed, "#new_chat") + def _new_chat_button(self, _: Button.Pressed) -> None: + self.action_new_chat() + + @work(thread=True, exclusive=True) + def _stream_turn(self, messages: List[Dict[str, Any]], assistant_card_id: str) -> None: + interactive_event: Optional[FastGPTInteractiveEvent] = None + try: + with ChatClient(api_key=API_KEY, base_url=BASE_URL) as client: + response = client.create_chat_completion( + messages=messages, + stream=True, + detail=True, + chatId=self.chat_id, + ) + response.raise_for_status() + try: + for event in iter_stream_events(response): + if event.kind in {"data", "answer", "fastAnswer"}: + content = _extract_text_from_event(event.kind, event.data) + if content: + self.call_from_thread(self._append_assistant_chunk, assistant_card_id, content) + continue + + if event.kind == "flowNodeStatus": + if isinstance(event.data, dict): + status = str(event.data.get("status") or "?") + node_name = str(event.data.get("nodeName") or event.data.get("name") or event.data.get("node_id") or "Unknown node") + self.call_from_thread(self._log_event, f"[flow] {status}: {node_name}") + else: + self.call_from_thread(self._log_event, f"[flow] {event.data}") + continue + + if event.kind == "flowResponses": + if isinstance(event.data, dict): + module_name = str(event.data.get("moduleName") or event.data.get("nodeName") or "Unknown module") + self.call_from_thread(self._log_event, f"[flow] response from: {module_name}") + elif isinstance(event.data, list): + self.call_from_thread(self._log_event, f"[flow] response details: {len(event.data)} module record(s)") + else: + self.call_from_thread(self._log_event, f"[flow] response details: {event.data}") + continue + + if event.kind == "toolCall": + tool_name = _tool_name_from_event(event.data) + self.call_from_thread(self._log_event, f"[tool] calling: {tool_name}") + continue + + if event.kind == "toolParams": + self.call_from_thread(self._log_event, f"[tool] params: {event.data}") + continue + + if event.kind == "toolResponse": + self.call_from_thread(self._log_event, f"[tool] response: {event.data}") + continue + + if event.kind == "updateVariables": + self.call_from_thread(self._log_event, f"[vars] updated: {event.data}") + continue + + if event.kind == "interactive": + interactive_event = event + self.call_from_thread(self._present_interactive, event) + break + + if event.kind == "error": + message = str(event.data.get("message") or event.data.get("error") or "Unknown FastGPT error") + raise RuntimeError(message) + + if event.kind == "done": + break + finally: + response.close() + except Exception as exc: + self.call_from_thread(self._mark_turn_failed, assistant_card_id, str(exc)) + return + + self.call_from_thread( + self._complete_turn, + assistant_card_id, + waiting_interactive=interactive_event is not None, + ) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Textual FastGPT chat workbench") + parser.add_argument( + "--chat-id", + dest="chat_id", + help="Reuse an existing FastGPT chatId. Defaults to a random chat_tui_* value.", + ) + return parser.parse_args() + + +def main() -> None: + args = _parse_args() + FastGPTWorkbench(chat_id=args.chat_id).run() + + +if __name__ == "__main__": + main() diff --git a/fastgpt_client/__init__.py b/fastgpt_client/__init__.py index d677e64..77be1a6 100644 --- a/fastgpt_client/__init__.py +++ b/fastgpt_client/__init__.py @@ -1,4 +1,4 @@ -"""FastGPT Python SDK +"""FastGPT Python SDK A Python client library for interacting with FastGPT's OpenAPI. """ @@ -17,6 +17,8 @@ from fastgpt_client.exceptions import ( StreamParseError, ValidationError, ) +from fastgpt_client.streaming import aiter_stream_events, iter_stream_events +from fastgpt_client.stream_types import FastGPTInteractiveEvent, FastGPTStreamEvent __all__ = [ # Synchronous clients @@ -34,6 +36,11 @@ __all__ = [ "RateLimitError", "ValidationError", "StreamParseError", + # Streaming helpers + "iter_stream_events", + "aiter_stream_events", + "FastGPTStreamEvent", + "FastGPTInteractiveEvent", ] __version__ = "0.1.0" diff --git a/fastgpt_client/async_client.py b/fastgpt_client/async_client.py index 1535e5b..10c5b4b 100644 --- a/fastgpt_client/async_client.py +++ b/fastgpt_client/async_client.py @@ -44,9 +44,10 @@ class AsyncFastGPTClient(BaseClientMixin): # Initialize base client functionality super().__init__(api_key, base_url, timeout, max_retries, retry_delay, enable_logging) + connect_timeout = min(float(timeout), 15.0) if timeout and timeout > 0 else 15.0 self._client = httpx.AsyncClient( - base_url=base_url, - timeout=httpx.Timeout(timeout, connect=5.0), + base_url=self.base_url, + timeout=httpx.Timeout(timeout, connect=connect_timeout), ) async def __aenter__(self): @@ -270,6 +271,9 @@ class AsyncFastGPTClient(BaseClientMixin): try: error_data = response.json() message = error_data.get("message", f"HTTP {response.status_code}") + except httpx.ResponseNotRead: + message = f"HTTP {response.status_code}" + error_data = None except (ValueError, KeyError, AttributeError): # If we can't parse JSON (e.g., streaming response or invalid JSON), use status code message = f"HTTP {response.status_code}" diff --git a/fastgpt_client/base_client.py b/fastgpt_client/base_client.py index a930a73..e70ce86 100644 --- a/fastgpt_client/base_client.py +++ b/fastgpt_client/base_client.py @@ -8,6 +8,15 @@ from typing import Any class BaseClientMixin: """Mixin class providing retry logic, validation, and logging for FastGPT clients.""" + @staticmethod + def _normalize_base_url(base_url: str) -> str: + """Normalize FastGPT base URLs so '/api' is not duplicated by endpoint paths.""" + + normalized = str(base_url or "").strip().rstrip("/") + if normalized.endswith("/api"): + normalized = normalized[:-4] + return normalized + def __init__( self, api_key: str, @@ -28,7 +37,7 @@ class BaseClientMixin: enable_logging: Whether to enable request logging """ self.api_key = api_key - self.base_url = base_url + self.base_url = self._normalize_base_url(base_url) self.timeout = timeout self.max_retries = max_retries self.retry_delay = retry_delay diff --git a/fastgpt_client/client.py b/fastgpt_client/client.py index 14bef4e..1c35ef6 100644 --- a/fastgpt_client/client.py +++ b/fastgpt_client/client.py @@ -43,9 +43,10 @@ class FastGPTClient(BaseClientMixin): # Initialize base client functionality super().__init__(api_key, base_url, timeout, max_retries, retry_delay, enable_logging) + connect_timeout = min(float(timeout), 15.0) if timeout and timeout > 0 else 15.0 self._client = httpx.Client( - base_url=base_url, - timeout=httpx.Timeout(timeout, connect=5.0), + base_url=self.base_url, + timeout=httpx.Timeout(timeout, connect=connect_timeout), ) def __enter__(self): @@ -206,6 +207,19 @@ class FastGPTClient(BaseClientMixin): try: error_data = response.json() message = error_data.get("message", f"HTTP {response.status_code}") + except httpx.ResponseNotRead: + try: + response.read() + error_data = response.json() + message = error_data.get("message", f"HTTP {response.status_code}") + except Exception: + error_text = "" + try: + error_text = response.text.strip() + except Exception: + error_text = "" + message = error_text or f"HTTP {response.status_code}" + error_data = None except (ValueError, KeyError, AttributeError): # If we can't parse JSON (e.g., streaming response or invalid JSON), use status code message = f"HTTP {response.status_code}" diff --git a/fastgpt_client/stream_types.py b/fastgpt_client/stream_types.py new file mode 100644 index 0000000..f9c96fd --- /dev/null +++ b/fastgpt_client/stream_types.py @@ -0,0 +1,41 @@ +"""Typed FastGPT SSE stream events.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal + + +FastGPTEventKind = Literal[ + "data", + "answer", + "fastAnswer", + "flowNodeStatus", + "flowResponses", + "interactive", + "toolCall", + "toolParams", + "toolResponse", + "updateVariables", + "error", + "done", +] + +InteractiveType = Literal["userSelect", "userInput"] + + +@dataclass(frozen=True) +class FastGPTStreamEvent: + """Structured event parsed from a FastGPT SSE response.""" + + kind: FastGPTEventKind + data: Any = field(default_factory=dict) + raw_event: str = "" + raw_data: str = "" + + +@dataclass(frozen=True) +class FastGPTInteractiveEvent(FastGPTStreamEvent): + """Interactive workflow event requiring client input.""" + + interaction_type: InteractiveType = "userSelect" diff --git a/fastgpt_client/streaming.py b/fastgpt_client/streaming.py new file mode 100644 index 0000000..7551273 --- /dev/null +++ b/fastgpt_client/streaming.py @@ -0,0 +1,180 @@ +"""Helpers for parsing FastGPT SSE responses.""" + +from __future__ import annotations + +import json +from typing import Any, AsyncIterator, Dict, Iterator, Optional + +from .exceptions import StreamParseError +from .stream_types import FastGPTInteractiveEvent, FastGPTStreamEvent + + +def _infer_interaction_type(payload: Dict[str, Any]) -> str: + interaction_type = str(payload.get("type") or "").strip() + if interaction_type in {"userSelect", "userInput"}: + return interaction_type + + params = payload.get("params") if isinstance(payload.get("params"), dict) else {} + if isinstance(params.get("inputForm"), list): + return "userInput" + if isinstance(params.get("userSelectOptions"), list): + return "userSelect" + return "userSelect" + + +def _normalize_interactive_payload(payload: Dict[str, Any]) -> Dict[str, Any]: + normalized = payload + wrapped = normalized.get("interactive") + if isinstance(wrapped, dict): + normalized = wrapped + + interaction_type = str(normalized.get("type") or "").strip() + if interaction_type == "toolChildrenInteractive": + params = normalized.get("params") if isinstance(normalized.get("params"), dict) else {} + children_response = params.get("childrenResponse") + if isinstance(children_response, dict): + normalized = children_response + + return normalized + + +def _coerce_line(line: Any) -> str: + if isinstance(line, bytes): + return line.decode("utf-8") + return str(line) + + +def _build_event(event_name: str, raw_data: str) -> FastGPTStreamEvent: + normalized_event = event_name or "data" + if raw_data == "[DONE]": + return FastGPTStreamEvent(kind="done", raw_event=normalized_event, raw_data=raw_data) + + try: + payload = json.loads(raw_data) if raw_data else {} + except json.JSONDecodeError as exc: + raise StreamParseError( + f"Failed to parse SSE payload for event '{normalized_event}': {exc}" + ) from exc + + if normalized_event == "interactive": + if not isinstance(payload, dict): + raise StreamParseError( + f"Expected JSON object payload for event '{normalized_event}', got {type(payload).__name__}" + ) + normalized_payload = _normalize_interactive_payload(payload) + if not isinstance(normalized_payload, dict): + raise StreamParseError( + f"Expected JSON object payload for event '{normalized_event}', got {type(normalized_payload).__name__}" + ) + interaction_type = _infer_interaction_type(normalized_payload) + return FastGPTInteractiveEvent( + kind="interactive", + data=normalized_payload, + raw_event=normalized_event, + raw_data=raw_data, + interaction_type=interaction_type, + ) + + allowed_kinds = { + "data", + "answer", + "fastAnswer", + "flowNodeStatus", + "flowResponses", + "toolCall", + "toolParams", + "toolResponse", + "updateVariables", + "error", + "done", + } + kind = normalized_event if normalized_event in allowed_kinds else "data" + return FastGPTStreamEvent(kind=kind, data=payload, raw_event=normalized_event, raw_data=raw_data) + + +def _parse_sse_lines(lines: Iterator[Any]) -> Iterator[FastGPTStreamEvent]: + current_event: Optional[str] = None + data_lines: list[str] = [] + + def flush() -> Optional[FastGPTStreamEvent]: + nonlocal current_event, data_lines + if not current_event and not data_lines: + return None + if not data_lines: + raise StreamParseError(f"Missing data payload for SSE event '{current_event or 'data'}'") + raw_data = "\n".join(data_lines) + event = _build_event(current_event or "data", raw_data) + current_event = None + data_lines = [] + return event + + for raw_line in lines: + line = _coerce_line(raw_line).rstrip("\r\n") + if not line: + event = flush() + if event is not None: + yield event + continue + if line.startswith(":"): + continue + if line.startswith("event:"): + if current_event is not None or data_lines: + raise StreamParseError("Encountered new SSE event before previous event payload was terminated") + current_event = line[6:].strip() or "data" + continue + if line.startswith("data:"): + data_lines.append(line[5:].strip()) + continue + raise StreamParseError(f"Unsupported SSE line: {line}") + + event = flush() + if event is not None: + yield event + + +def iter_stream_events(response) -> Iterator[FastGPTStreamEvent]: + """Yield typed SSE events from a synchronous response stream.""" + + yield from _parse_sse_lines(iter(response.iter_lines())) + + +async def aiter_stream_events(response) -> AsyncIterator[FastGPTStreamEvent]: + """Yield typed SSE events from an asynchronous response stream.""" + + current_event: Optional[str] = None + data_lines: list[str] = [] + + async def flush() -> Optional[FastGPTStreamEvent]: + nonlocal current_event, data_lines + if not current_event and not data_lines: + return None + if not data_lines: + raise StreamParseError(f"Missing data payload for SSE event '{current_event or 'data'}'") + raw_data = "\n".join(data_lines) + event = _build_event(current_event or "data", raw_data) + current_event = None + data_lines = [] + return event + + async for raw_line in response.aiter_lines(): + line = _coerce_line(raw_line).rstrip("\r\n") + if not line: + event = await flush() + if event is not None: + yield event + continue + if line.startswith(":"): + continue + if line.startswith("event:"): + if current_event is not None or data_lines: + raise StreamParseError("Encountered new SSE event before previous event payload was terminated") + current_event = line[6:].strip() or "data" + continue + if line.startswith("data:"): + data_lines.append(line[5:].strip()) + continue + raise StreamParseError(f"Unsupported SSE line: {line}") + + event = await flush() + if event is not None: + yield event diff --git a/pyproject.toml b/pyproject.toml index 9fd79f5..4573050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,10 @@ dependencies = [ ] [project.optional-dependencies] +examples = [ + "python-dotenv>=1.0", + "textual>=8.0.0", +] dev = [ "pytest>=7.0", "pytest-cov>=4.0", diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_chat_client.py b/tests/test_chat_client.py index 3bc78fa..b8693e7 100644 --- a/tests/test_chat_client.py +++ b/tests/test_chat_client.py @@ -12,6 +12,14 @@ from fastgpt_client.exceptions import ValidationError class TestChatClientCreateChatCompletion: """Test suite for ChatClient.create_chat_completion method.""" + def test_base_url_trailing_api_is_normalized(self, api_key): + """A base URL ending with /api should not produce /api/api/... requests.""" + + client = ChatClient(api_key, base_url="https://cloud.fastgpt.cn/api") + + assert client.base_url == "https://cloud.fastgpt.cn" + assert str(client._client.base_url) == "https://cloud.fastgpt.cn" + def test_create_chat_completion_basic(self, api_key, sample_chat_response): """Test basic chat completion creation.""" client = ChatClient(api_key) diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..27ca938 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,154 @@ +"""Tests for FastGPT SSE stream parsing helpers.""" + +import pytest + +from fastgpt_client.exceptions import StreamParseError +from fastgpt_client.streaming import aiter_stream_events, iter_stream_events +from fastgpt_client.stream_types import FastGPTInteractiveEvent + + +class _AsyncResponse: + def __init__(self, lines): + self._lines = lines + + async def aiter_lines(self): + for line in self._lines: + yield line + + +class _SyncResponse: + def __init__(self, lines): + self._lines = lines + + def iter_lines(self): + return iter(self._lines) + + +def test_iter_stream_events_parses_mixed_stream(): + response = _SyncResponse([ + 'data: {"choices":[{"delta":{"content":"Hel"}}]}', + '', + 'event:flowNodeStatus', + 'data: {"status":"running","nodeName":"Classifier"}', + '', + 'event:flowResponses', + 'data: {"moduleName":"Classifier","tokens":4}', + '', + 'event:interactive', + 'data: {"type":"userSelect","params":{"userSelectOptions":[{"value":"A"}]}}', + '', + 'data: [DONE]', + '', + ]) + + events = list(iter_stream_events(response)) + + assert [event.kind for event in events] == [ + 'data', + 'flowNodeStatus', + 'flowResponses', + 'interactive', + 'done', + ] + assert isinstance(events[3], FastGPTInteractiveEvent) + assert events[3].interaction_type == 'userSelect' + + +def test_iter_stream_events_parses_user_input_interactive(): + response = _SyncResponse([ + 'event:interactive', + 'data: {"type":"userInput","params":{"inputForm":[{"label":"Name"}]}}', + '', + ]) + + events = list(iter_stream_events(response)) + + assert len(events) == 1 + assert isinstance(events[0], FastGPTInteractiveEvent) + assert events[0].interaction_type == 'userInput' + + +def test_iter_stream_events_unwraps_tool_children_interactive(): + response = _SyncResponse([ + 'event:interactive', + 'data: {"interactive":{"type":"toolChildrenInteractive","params":{"childrenResponse":{"type":"userInput","params":{"description":"???????","inputForm":[{"label":"????"}]}}}}}', + '', + ]) + + events = list(iter_stream_events(response)) + + assert len(events) == 1 + assert isinstance(events[0], FastGPTInteractiveEvent) + assert events[0].interaction_type == 'userInput' + assert events[0].data['params']['description'] == '???????' + assert isinstance(events[0].data['params']['inputForm'], list) + assert events[0].data['params']['inputForm'][0]['label'] == '????' + + +@pytest.mark.asyncio +async def test_aiter_stream_events_parses_mixed_stream(): + response = _AsyncResponse([ + 'data: {"choices":[{"delta":{"content":"Hi"}}]}', + '', + 'event:interactive', + 'data: {"type":"userSelect","params":{"userSelectOptions":[{"value":"A"}]}}', + '', + 'data: [DONE]', + '', + ]) + + events = [] + async for event in aiter_stream_events(response): + events.append(event) + + assert [event.kind for event in events] == ['data', 'interactive', 'done'] + assert isinstance(events[1], FastGPTInteractiveEvent) + + +def test_iter_stream_events_rejects_malformed_json(): + response = _SyncResponse([ + 'event:interactive', + 'data: {invalid json}', + '', + ]) + + with pytest.raises(StreamParseError): + list(iter_stream_events(response)) + + +def test_iter_stream_events_rejects_malformed_frame_boundary(): + response = _SyncResponse([ + 'event:interactive', + 'event:error', + 'data: {"message":"bad"}', + '', + ]) + + with pytest.raises(StreamParseError): + list(iter_stream_events(response)) + + +def test_iter_stream_events_treats_done_marker_under_answer_event_as_done(): + response = _SyncResponse([ + 'event:answer', + 'data: [DONE]', + '', + ]) + + events = list(iter_stream_events(response)) + + assert [event.kind for event in events] == ['done'] + + +def test_iter_stream_events_allows_array_payloads_for_non_interactive_events(): + response = _SyncResponse([ + 'event:flowResponses', + 'data: [{"moduleName":"NodeA"},{"moduleName":"NodeB"}]', + '', + ]) + + events = list(iter_stream_events(response)) + + assert [event.kind for event in events] == ['flowResponses'] + assert isinstance(events[0].data, list) + assert events[0].data[0]['moduleName'] == 'NodeA'