fix stream issue

This commit is contained in:
Xin Wang
2026-01-06 15:48:05 +08:00
parent 0495dd4676
commit 017bdc4b53

View File

@@ -1,6 +1,7 @@
"""FastGPT Client - Main synchronous client.""" """FastGPT Client - Main synchronous client."""
import logging import logging
import weakref
from typing import Any, Dict, Literal, Union from typing import Any, Dict, Literal, Union
import httpx import httpx
@@ -107,14 +108,68 @@ class FastGPTClient(BaseClientMixin):
self.logger.debug(f"Request params: {params}") self.logger.debug(f"Request params: {params}")
# httpx.Client automatically prepends base_url # httpx.Client automatically prepends base_url
response = self._client.request( # For streaming, use stream() method; for non-streaming, use request()
method, if stream:
endpoint, # httpx.stream() returns a context manager, enter it and return response
json=json, stream_context = self._client.stream(
params=params, method,
headers=headers, endpoint,
**kwargs, json=json,
) params=params,
headers=headers,
**kwargs,
)
response = stream_context.__enter__()
# Store the stream context on the response
response._stream_context = stream_context
response._stream_context_closed = False
# Override close() to also close the stream context
original_close = response.close
def close_with_context():
"""Close both the response and the stream context."""
if getattr(response, '_stream_context_closed', False):
return
try:
# Close the response first
original_close()
finally:
# Always close the stream context, even if response.close() fails
if hasattr(response, '_stream_context') and response._stream_context is not None:
try:
response._stream_context.__exit__(None, None, None)
except Exception:
pass # Ignore errors during cleanup
finally:
response._stream_context = None
response._stream_context_closed = True
response.close = close_with_context
# Safety net: ensure cleanup on garbage collection
def cleanup_stream_context(stream_ctx_ref):
"""Finalizer to close stream context if response is GC'd without being closed."""
stream_ctx = stream_ctx_ref()
if stream_ctx is not None:
try:
stream_ctx.__exit__(None, None, None)
except Exception:
pass # Ignore errors in finalizer
# Use weakref to avoid circular references
weakref.finalize(response, cleanup_stream_context, weakref.ref(stream_context))
else:
response = self._client.request(
method,
endpoint,
json=json,
params=params,
headers=headers,
**kwargs,
)
# Log response if logging is enabled # Log response if logging is enabled
if self.enable_logging: if self.enable_logging:
@@ -143,13 +198,16 @@ class FastGPTClient(BaseClientMixin):
ValidationError: If status code is 422 ValidationError: If status code is 422
APIError: For other 4xx and 5xx errors APIError: For other 4xx and 5xx errors
""" """
# Check status code first (doesn't consume response body)
if response.status_code < 400: if response.status_code < 400:
return # Success response return # Success response
# Try to parse error message (this will consume the body, but that's OK for errors)
try: try:
error_data = response.json() error_data = response.json()
message = error_data.get("message", f"HTTP {response.status_code}") message = error_data.get("message", f"HTTP {response.status_code}")
except (ValueError, KeyError): except (ValueError, KeyError, AttributeError):
# If we can't parse JSON (e.g., streaming response or invalid JSON), use status code
message = f"HTTP {response.status_code}" message = f"HTTP {response.status_code}"
error_data = None error_data = None