From 18aad05a7c8230c08a709c9139a176d31fbdb002 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Tue, 10 Feb 2026 17:59:21 -0800
Subject: [PATCH 1/3] fix(openai): use compatible stream closing for non-OpenAI providers

OpenAI's AsyncStream uses close() while async generators (e.g. from
OpenPipe) use aclose(). Replace direct async-with on the stream with a
helper that handles both protocols.
---
 src/pipecat/services/openai/base_llm.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py
index d8669f622..2cdde51ea 100644
--- a/src/pipecat/services/openai/base_llm.py
+++ b/src/pipecat/services/openai/base_llm.py
@@ -9,6 +9,7 @@
 import asyncio
 import base64
 import json
+from contextlib import asynccontextmanager
 from typing import Any, Dict, List, Mapping, Optional
 
 import httpx
@@ -374,9 +375,19 @@ class BaseOpenAILLMService(LLMService):
             else self._stream_chat_completions_universal_context(context)
         )
 
-        # Use context manager to ensure stream is closed on cancellation/exception.
-        # Without this, CancelledError during iteration leaves the underlying socket open.
-        async with chunk_stream:
+        # Ensure stream is closed on cancellation/exception to prevent socket
+        # leaks. OpenAI's AsyncStream uses close(), async generators use aclose().
+        @asynccontextmanager
+        async def _closing(stream):
+            try:
+                yield stream
+            finally:
+                if hasattr(stream, "aclose"):
+                    await stream.aclose()
+                elif hasattr(stream, "close"):
+                    await stream.close()
+
+        async with _closing(chunk_stream):
             async for chunk in chunk_stream:
                 if chunk.usage:
                     cached_tokens = (

From f3eb5b30a08a0208d9bb068294f83eccd8feb4ff Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Tue, 10 Feb 2026 18:01:29 -0800
Subject: [PATCH 2/3] Add changelog for #3707

---
 changelog/3707.fixed.md | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog/3707.fixed.md

diff --git a/changelog/3707.fixed.md b/changelog/3707.fixed.md
new file mode 100644
index 000000000..257342c62
--- /dev/null
+++ b/changelog/3707.fixed.md
@@ -0,0 +1 @@
+- Fixed stream closing compatibility for OpenAI-compatible providers (e.g. OpenPipe) that return async generators instead of `AsyncStream`.

From 93f4402198b770e652b05027c04746db93165552 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Tue, 10 Feb 2026 18:19:57 -0800
Subject: [PATCH 3/3] Update stream close test to match new _closing helper

---
 tests/test_openai_llm_timeout.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/tests/test_openai_llm_timeout.py b/tests/test_openai_llm_timeout.py
index 4ba459a29..f7876f02f 100644
--- a/tests/test_openai_llm_timeout.py
+++ b/tests/test_openai_llm_timeout.py
@@ -149,13 +149,9 @@ async def test_openai_llm_stream_closed_on_cancellation():
         def __init__(self):
             self.iteration_count = 0
 
-        async def __aenter__(self):
-            return self
-
-        async def __aexit__(self, exc_type, exc_val, exc_tb):
+        async def close(self):
             nonlocal stream_closed
             stream_closed = True
-            return False
 
         def __aiter__(self):
             return self