diff --git a/changelog/3707.fixed.md b/changelog/3707.fixed.md new file mode 100644 index 000000000..257342c62 --- /dev/null +++ b/changelog/3707.fixed.md @@ -0,0 +1 @@ +- Fixed stream closing compatibility for OpenAI-compatible providers (e.g. OpenPipe) that return async generators instead of `AsyncStream`. diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index d8669f622..2cdde51ea 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -9,6 +9,7 @@ import asyncio import base64 import json +from contextlib import asynccontextmanager from typing import Any, Dict, List, Mapping, Optional import httpx @@ -374,9 +375,19 @@ class BaseOpenAILLMService(LLMService): else self._stream_chat_completions_universal_context(context) ) - # Use context manager to ensure stream is closed on cancellation/exception. - # Without this, CancelledError during iteration leaves the underlying socket open. - async with chunk_stream: + # Ensure stream is closed on cancellation/exception to prevent socket + # leaks. OpenAI's AsyncStream uses close(), async generators use aclose(). + @asynccontextmanager + async def _closing(stream): + try: + yield stream + finally: + if hasattr(stream, "aclose"): + await stream.aclose() + elif hasattr(stream, "close"): + await stream.close() + + async with _closing(chunk_stream): async for chunk in chunk_stream: if chunk.usage: cached_tokens = ( diff --git a/tests/test_openai_llm_timeout.py b/tests/test_openai_llm_timeout.py index 4ba459a29..f7876f02f 100644 --- a/tests/test_openai_llm_timeout.py +++ b/tests/test_openai_llm_timeout.py @@ -149,13 +149,9 @@ async def test_openai_llm_stream_closed_on_cancellation(): def __init__(self): self.iteration_count = 0 - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): + async def close(self): nonlocal stream_closed stream_closed = True - return False def __aiter__(self): return self