From 247f0bbcd33c995ad3b7da70780b0617c09f1e18 Mon Sep 17 00:00:00 2001
From: Luke Payyapilli
Date: Tue, 17 Feb 2026 13:04:10 -0500
Subject: [PATCH] Fix async generator cleanup to prevent uvloop crash on Python 3.12+

---
Reviewer note: the cleanup in _closing() is nested in try/finally so the
stream is still closed when closing the iterator raises or is cancelled.
The commit id and blob ids in this patch predate that change.

 changelog/3698.fixed.md                 |  1 +
 src/pipecat/services/openai/base_llm.py | 30 +++++++---
 tests/test_openai_llm_timeout.py        | 74 +++++++++++++++++++++++++
 3 files changed, 96 insertions(+), 9 deletions(-)
 create mode 100644 changelog/3698.fixed.md

diff --git a/changelog/3698.fixed.md b/changelog/3698.fixed.md
new file mode 100644
index 000000000..c040e9efb
--- /dev/null
+++ b/changelog/3698.fixed.md
@@ -0,0 +1 @@
+- Fixed async generator cleanup in OpenAI LLM streaming to prevent `AttributeError` with uvloop on Python 3.12+ (MagicStack/uvloop#699).
diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py
index 2cdde51ea..ebe9eda91 100644
--- a/src/pipecat/services/openai/base_llm.py
+++ b/src/pipecat/services/openai/base_llm.py
@@ -375,20 +375,32 @@ class BaseOpenAILLMService(LLMService):
             else self._stream_chat_completions_universal_context(context)
         )
 
-        # Ensure stream is closed on cancellation/exception to prevent socket
-        # leaks. OpenAI's AsyncStream uses close(), async generators use aclose().
+        # Ensure stream and its async iterator are closed on cancellation/exception
+        # to prevent socket leaks and uvloop crashes. Closing the iterator first
+        # cascades cleanup through nested async generators (httpx/httpcore internals),
+        # preventing uvloop's broken asyncgen finalizer from firing on Python 3.12+
+        # (MagicStack/uvloop#699).
         @asynccontextmanager
         async def _closing(stream):
+            chunk_iter = stream.__aiter__()
             try:
-                yield stream
+                yield chunk_iter
             finally:
-                if hasattr(stream, "aclose"):
-                    await stream.aclose()
-                elif hasattr(stream, "close"):
-                    await stream.close()
+                try:
+                    # Close the iterator first to cascade cleanup through
+                    # nested async generators (httpx/httpcore internals).
+                    if hasattr(chunk_iter, "aclose"):
+                        await chunk_iter.aclose()
+                finally:
+                    # Then close the stream to release HTTP resources, even
+                    # if closing the iterator raised or was cancelled.
+                    if hasattr(stream, "close"):
+                        await stream.close()
+                    elif hasattr(stream, "aclose"):
+                        await stream.aclose()
 
-        async with _closing(chunk_stream):
-            async for chunk in chunk_stream:
+        async with _closing(chunk_stream) as chunk_iter:
+            async for chunk in chunk_iter:
                 if chunk.usage:
                     cached_tokens = (
                         chunk.usage.prompt_tokens_details.cached_tokens
diff --git a/tests/test_openai_llm_timeout.py b/tests/test_openai_llm_timeout.py
index f7876f02f..8ee776a9b 100644
--- a/tests/test_openai_llm_timeout.py
+++ b/tests/test_openai_llm_timeout.py
@@ -223,3 +223,77 @@ async def test_openai_llm_emits_error_frame_on_exception():
         assert "Error during completion" in pushed_errors[0]["error_msg"]
         assert "API Error" in pushed_errors[0]["error_msg"]
         assert isinstance(pushed_errors[0]["exception"], RuntimeError)
+
+
+@pytest.mark.asyncio
+async def test_openai_llm_async_iterator_closed_on_stream_end():
+    """Test that the async iterator is explicitly closed after stream consumption.
+
+    This prevents uvloop's broken asyncgen finalizer from firing on Python 3.12+
+    when async generators are garbage-collected without explicit cleanup.
+    See MagicStack/uvloop#699.
+    """
+    with patch.object(OpenAILLMService, "create_client"):
+        service = OpenAILLMService(model="gpt-4")
+        service._client = AsyncMock()
+
+        # Track if the iterator's aclose was called
+        iterator_aclosed = False
+        stream_closed = False
+
+        class MockAsyncIterator:
+            """Mock async iterator that tracks aclose() calls."""
+
+            def __init__(self):
+                self.iteration_count = 0
+
+            def __aiter__(self):
+                return self
+
+            async def __anext__(self):
+                self.iteration_count += 1
+                if self.iteration_count > 2:
+                    raise StopAsyncIteration()
+                # Return a minimal chunk
+                mock_chunk = AsyncMock()
+                mock_chunk.usage = None
+                mock_chunk.model = None
+                mock_chunk.choices = []
+                return mock_chunk
+
+            async def aclose(self):
+                nonlocal iterator_aclosed
+                iterator_aclosed = True
+
+        class MockAsyncStream:
+            """Mock stream whose __aiter__ returns a separate iterator object."""
+
+            def __init__(self, iterator):
+                self._iterator = iterator
+
+            def __aiter__(self):
+                return self._iterator
+
+            async def close(self):
+                nonlocal stream_closed
+                stream_closed = True
+
+        mock_iterator = MockAsyncIterator()
+        mock_stream = MockAsyncStream(mock_iterator)
+
+        service._stream_chat_completions_specific_context = AsyncMock(return_value=mock_stream)
+        service._stream_chat_completions_universal_context = AsyncMock(return_value=mock_stream)
+        service.start_ttfb_metrics = AsyncMock()
+        service.stop_ttfb_metrics = AsyncMock()
+        service.start_llm_usage_metrics = AsyncMock()
+
+        context = LLMContext(
+            messages=[{"role": "user", "content": "Hello"}],
+        )
+
+        await service._process_context(context)
+
+        # Verify the iterator was explicitly closed (prevents uvloop crash)
+        assert iterator_aclosed, "Async iterator should be explicitly closed"
+        # Verify the stream was also closed (releases HTTP resources)
+        assert stream_closed, "Stream should be closed to release HTTP resources"