Merge pull request #3707 from pipecat-ai/aleix/fix-openai-stream-close-compat
fix(openai): use compatible stream closing for non-OpenAI providers
This commit is contained in:
1
changelog/3707.fixed.md
Normal file
1
changelog/3707.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed stream closing compatibility for OpenAI-compatible providers (e.g. OpenPipe) that return async generators instead of `AsyncStream`.
|
||||
@@ -9,6 +9,7 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Dict, List, Mapping, Optional
|
||||
|
||||
import httpx
|
||||
@@ -374,9 +375,19 @@ class BaseOpenAILLMService(LLMService):
|
||||
else self._stream_chat_completions_universal_context(context)
|
||||
)
|
||||
|
||||
# Use context manager to ensure stream is closed on cancellation/exception.
|
||||
# Without this, CancelledError during iteration leaves the underlying socket open.
|
||||
async with chunk_stream:
|
||||
# Ensure stream is closed on cancellation/exception to prevent socket
|
||||
# leaks. OpenAI's AsyncStream uses close(), async generators use aclose().
|
||||
@asynccontextmanager
async def _closing(stream):
    """Yield ``stream`` and close it on exit, whatever way the block exits.

    Cleanup runs in ``finally``, so it also happens when the body raises or
    the surrounding task is cancelled.

    Args:
        stream: The object to hand to the ``async with`` body. It may expose
            ``aclose()``, ``close()``, or neither.

    Yields:
        The same ``stream`` object, unchanged.
    """
    # Local import: the file's top-level import block is not assumed to
    # provide ``inspect``.
    import inspect

    try:
        yield stream
    finally:
        # Prefer aclose() when present, otherwise fall back to close().
        closer = getattr(stream, "aclose", None) or getattr(stream, "close", None)
        if callable(closer):
            outcome = closer()
            # A synchronous close() returns a non-awaitable (usually None).
            # Awaiting that would raise TypeError here and mask the
            # exception or cancellation that triggered the cleanup.
            if inspect.isawaitable(outcome):
                await outcome
|
||||
|
||||
async with _closing(chunk_stream):
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.usage:
|
||||
cached_tokens = (
|
||||
|
||||
@@ -149,13 +149,9 @@ async def test_openai_llm_stream_closed_on_cancellation():
|
||||
def __init__(self):
|
||||
self.iteration_count = 0
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
async def close(self):
|
||||
nonlocal stream_closed
|
||||
stream_closed = True
|
||||
return False
|
||||
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
Reference in New Issue
Block a user