Merge pull request #3763 from lukepayyapilli/fix/asyncgen-cleanup-uvloop-crash
Fix async generator cleanup to prevent uvloop crash on Python 3.12+
This commit is contained in:
1
changelog/3698.fixed.md
Normal file
1
changelog/3698.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed async generator cleanup in OpenAI LLM streaming to prevent `AttributeError` with uvloop on Python 3.12+ (MagicStack/uvloop#699).
|
||||
@@ -375,20 +375,29 @@ class BaseOpenAILLMService(LLMService):
|
||||
else self._stream_chat_completions_universal_context(context)
|
||||
)
|
||||
|
||||
# Ensure stream is closed on cancellation/exception to prevent socket
|
||||
# leaks. OpenAI's AsyncStream uses close(), async generators use aclose().
|
||||
# Ensure stream and its async iterator are closed on cancellation/exception
|
||||
# to prevent socket leaks and uvloop crashes. Closing the iterator first
|
||||
# cascades cleanup through nested async generators (httpx/httpcore internals),
|
||||
# preventing uvloop's broken asyncgen finalizer from firing on Python 3.12+
|
||||
# (MagicStack/uvloop#699).
|
||||
@asynccontextmanager
|
||||
async def _closing(stream):
|
||||
chunk_iter = stream.__aiter__()
|
||||
try:
|
||||
yield stream
|
||||
yield chunk_iter
|
||||
finally:
|
||||
if hasattr(stream, "aclose"):
|
||||
await stream.aclose()
|
||||
elif hasattr(stream, "close"):
|
||||
# Close the iterator first to cascade cleanup through
|
||||
# nested async generators (httpx/httpcore internals).
|
||||
if hasattr(chunk_iter, "aclose"):
|
||||
await chunk_iter.aclose()
|
||||
# Then close the stream to release HTTP resources.
|
||||
if hasattr(stream, "close"):
|
||||
await stream.close()
|
||||
elif hasattr(stream, "aclose"):
|
||||
await stream.aclose()
|
||||
|
||||
async with _closing(chunk_stream):
|
||||
async for chunk in chunk_stream:
|
||||
async with _closing(chunk_stream) as chunk_iter:
|
||||
async for chunk in chunk_iter:
|
||||
if chunk.usage:
|
||||
cached_tokens = (
|
||||
chunk.usage.prompt_tokens_details.cached_tokens
|
||||
|
||||
@@ -223,3 +223,77 @@ async def test_openai_llm_emits_error_frame_on_exception():
|
||||
assert "Error during completion" in pushed_errors[0]["error_msg"]
|
||||
assert "API Error" in pushed_errors[0]["error_msg"]
|
||||
assert isinstance(pushed_errors[0]["exception"], RuntimeError)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_openai_llm_async_iterator_closed_on_stream_end():
|
||||
"""Test that the async iterator is explicitly closed after stream consumption.
|
||||
|
||||
This prevents uvloop's broken asyncgen finalizer from firing on Python 3.12+
|
||||
when async generators are garbage-collected without explicit cleanup.
|
||||
See MagicStack/uvloop#699.
|
||||
"""
|
||||
with patch.object(OpenAILLMService, "create_client"):
|
||||
service = OpenAILLMService(model="gpt-4")
|
||||
service._client = AsyncMock()
|
||||
|
||||
# Track if the iterator's aclose was called
|
||||
iterator_aclosed = False
|
||||
stream_closed = False
|
||||
|
||||
class MockAsyncIterator:
|
||||
"""Mock async iterator that tracks aclose() calls."""
|
||||
|
||||
def __init__(self):
|
||||
self.iteration_count = 0
|
||||
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
self.iteration_count += 1
|
||||
if self.iteration_count > 2:
|
||||
raise StopAsyncIteration()
|
||||
# Return a minimal chunk
|
||||
mock_chunk = AsyncMock()
|
||||
mock_chunk.usage = None
|
||||
mock_chunk.model = None
|
||||
mock_chunk.choices = []
|
||||
return mock_chunk
|
||||
|
||||
async def aclose(self):
|
||||
nonlocal iterator_aclosed
|
||||
iterator_aclosed = True
|
||||
|
||||
class MockAsyncStream:
|
||||
"""Mock stream whose __aiter__ returns a separate iterator object."""
|
||||
|
||||
def __init__(self, iterator):
|
||||
self._iterator = iterator
|
||||
|
||||
def __aiter__(self):
|
||||
return self._iterator
|
||||
|
||||
async def close(self):
|
||||
nonlocal stream_closed
|
||||
stream_closed = True
|
||||
|
||||
mock_iterator = MockAsyncIterator()
|
||||
mock_stream = MockAsyncStream(mock_iterator)
|
||||
|
||||
service._stream_chat_completions_specific_context = AsyncMock(return_value=mock_stream)
|
||||
service._stream_chat_completions_universal_context = AsyncMock(return_value=mock_stream)
|
||||
service.start_ttfb_metrics = AsyncMock()
|
||||
service.stop_ttfb_metrics = AsyncMock()
|
||||
service.start_llm_usage_metrics = AsyncMock()
|
||||
|
||||
context = LLMContext(
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
)
|
||||
|
||||
await service._process_context(context)
|
||||
|
||||
# Verify the iterator was explicitly closed (prevents uvloop crash)
|
||||
assert iterator_aclosed, "Async iterator should be explicitly closed"
|
||||
# Verify the stream was also closed (releases HTTP resources)
|
||||
assert stream_closed, "Stream should be closed to release HTTP resources"
|
||||
|
||||
Reference in New Issue
Block a user