Files
fastgpt-python-sdk/tests/test_streaming.py
Xin Wang ef2614a70a feat: add chat_tui example for FastGPT with Textual interface
- Introduced a new example script `chat_tui.py` that provides a full-screen Textual interface for interacting with FastGPT.
- Implemented streaming chat updates, workflow logging, and modal handling for interactive nodes.
- Enhanced FastGPT client with new streaming capabilities and structured event types for better interaction handling.
- Normalized base URL handling in the client to prevent duplicate `/api` paths.
- Added tests for streaming event parsing and interaction handling.
2026-03-10 15:34:27 +08:00

155 lines
4.3 KiB
Python

"""Tests for FastGPT SSE stream parsing helpers."""
import pytest
from fastgpt_client.exceptions import StreamParseError
from fastgpt_client.streaming import aiter_stream_events, iter_stream_events
from fastgpt_client.stream_types import FastGPTInteractiveEvent
class _AsyncResponse:
    """Minimal async stand-in for a streaming response.

    Wraps a pre-built sequence of lines and replays them through
    ``aiter_lines`` so the async parser can be driven without a network.
    """

    def __init__(self, lines):
        # Keep a reference to the caller's lines; they are replayed as-is.
        self._lines = lines

    async def aiter_lines(self):
        """Asynchronously yield each stored line in its original order."""
        for raw_line in self._lines:
            yield raw_line
class _SyncResponse:
    """Minimal synchronous stand-in for a streaming response.

    Wraps a pre-built sequence of lines and exposes them through
    ``iter_lines`` so the sync parser can be driven without a network.
    """

    def __init__(self, lines):
        # Keep a reference to the caller's lines; they are replayed as-is.
        self._lines = lines

    def iter_lines(self):
        """Return a fresh iterator over the stored lines."""
        line_iterator = iter(self._lines)
        return line_iterator
def test_iter_stream_events_parses_mixed_stream():
    """A stream mixing unnamed data, named events and ``[DONE]`` parses in order."""
    sse_lines = [
        # Unnamed frame carrying a chat delta.
        'data: {"choices":[{"delta":{"content":"Hel"}}]}',
        '',
        # Named workflow frames.
        'event:flowNodeStatus',
        'data: {"status":"running","nodeName":"Classifier"}',
        '',
        'event:flowResponses',
        'data: {"moduleName":"Classifier","tokens":4}',
        '',
        # Interactive frame asking the user to pick an option.
        'event:interactive',
        'data: {"type":"userSelect","params":{"userSelectOptions":[{"value":"A"}]}}',
        '',
        # Terminal marker.
        'data: [DONE]',
        '',
    ]

    parsed = list(iter_stream_events(_SyncResponse(sse_lines)))

    observed_kinds = [item.kind for item in parsed]
    assert observed_kinds == [
        'data',
        'flowNodeStatus',
        'flowResponses',
        'interactive',
        'done',
    ]

    interactive = parsed[3]
    assert isinstance(interactive, FastGPTInteractiveEvent)
    assert interactive.interaction_type == 'userSelect'
def test_iter_stream_events_parses_user_input_interactive():
    """A lone ``userInput`` interactive frame yields one interactive event."""
    frame_lines = [
        'event:interactive',
        'data: {"type":"userInput","params":{"inputForm":[{"label":"Name"}]}}',
        '',
    ]

    parsed = list(iter_stream_events(_SyncResponse(frame_lines)))

    assert len(parsed) == 1
    only_event = parsed[0]
    assert isinstance(only_event, FastGPTInteractiveEvent)
    assert only_event.interaction_type == 'userInput'
def test_iter_stream_events_unwraps_tool_children_interactive():
    """A ``toolChildrenInteractive`` wrapper is unwrapped to its child interaction.

    The payload nests a ``userInput`` interaction under
    ``interactive.params.childrenResponse``; the resulting event is expected
    to expose the child's type and params directly.
    """
    # NOTE(review): the runs of '?' look like non-ASCII text lost in an
    # encoding round-trip -- confirm against the intended fixture. The test is
    # still self-consistent because input and expectations use the same text.
    frame_lines = [
        'event:interactive',
        'data: {"interactive":{"type":"toolChildrenInteractive","params":{"childrenResponse":{"type":"userInput","params":{"description":"???????","inputForm":[{"label":"????"}]}}}}}',
        '',
    ]

    parsed = list(iter_stream_events(_SyncResponse(frame_lines)))

    assert len(parsed) == 1
    unwrapped = parsed[0]
    assert isinstance(unwrapped, FastGPTInteractiveEvent)
    assert unwrapped.interaction_type == 'userInput'

    child_params = unwrapped.data['params']
    assert child_params['description'] == '???????'
    assert isinstance(child_params['inputForm'], list)
    assert child_params['inputForm'][0]['label'] == '????'
@pytest.mark.asyncio
async def test_aiter_stream_events_parses_mixed_stream():
    """The async parser yields data, interactive and done events in order.

    Feeds an unnamed data frame, a ``userSelect`` interactive frame and the
    ``[DONE]`` marker through ``aiter_stream_events`` and checks both the
    event kinds and the parsed interaction type.
    """
    response = _AsyncResponse([
        'data: {"choices":[{"delta":{"content":"Hi"}}]}',
        '',
        'event:interactive',
        'data: {"type":"userSelect","params":{"userSelectOptions":[{"value":"A"}]}}',
        '',
        'data: [DONE]',
        '',
    ])

    # Drain the async generator in one expression instead of a manual loop.
    events = [event async for event in aiter_stream_events(response)]

    assert [event.kind for event in events] == ['data', 'interactive', 'done']
    assert isinstance(events[1], FastGPTInteractiveEvent)
    # The synchronous parser test checks this same payload's interaction type;
    # the async path should agree rather than only matching on class.
    assert events[1].interaction_type == 'userSelect'
def test_iter_stream_events_rejects_malformed_json():
    """A data line that is not valid JSON raises ``StreamParseError``."""
    broken_stream = _SyncResponse([
        'event:interactive',
        'data: {invalid json}',
        '',
    ])

    # The parser must be consumed for the error to surface.
    with pytest.raises(StreamParseError):
        list(iter_stream_events(broken_stream))
def test_iter_stream_events_rejects_malformed_frame_boundary():
    """Two ``event:`` lines in a row, with no frame in between, raise ``StreamParseError``."""
    broken_stream = _SyncResponse([
        'event:interactive',
        # A second event name arrives before the first frame was completed.
        'event:error',
        'data: {"message":"bad"}',
        '',
    ])

    # The parser must be consumed for the error to surface.
    with pytest.raises(StreamParseError):
        list(iter_stream_events(broken_stream))
def test_iter_stream_events_treats_done_marker_under_answer_event_as_done():
    """``[DONE]`` sent under an ``answer`` event name is still reported as ``done``."""
    frame_lines = [
        'event:answer',
        'data: [DONE]',
        '',
    ]

    parsed = list(iter_stream_events(_SyncResponse(frame_lines)))

    observed_kinds = [item.kind for item in parsed]
    assert observed_kinds == ['done']
def test_iter_stream_events_allows_array_payloads_for_non_interactive_events():
    """A JSON array payload on a ``flowResponses`` frame is kept as a list."""
    frame_lines = [
        'event:flowResponses',
        'data: [{"moduleName":"NodeA"},{"moduleName":"NodeB"}]',
        '',
    ]

    parsed = list(iter_stream_events(_SyncResponse(frame_lines)))

    observed_kinds = [item.kind for item in parsed]
    assert observed_kinds == ['flowResponses']

    payload = parsed[0].data
    assert isinstance(payload, list)
    assert payload[0]['moduleName'] == 'NodeA'