Files
pipecat/tests/test_openai_llm_timeout.py
Aleix Conchillo Flaqué e553bb010f tests: migrate LLM tests to Settings-based constructor API
Replace the old `model=` / `params=InputParams(...)` style with the
new `settings=<Service>.Settings(...)` form across LLM service tests.
2026-04-10 17:37:49 -07:00

298 lines
11 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Unit tests for OpenAI LLM error handling."""
from unittest.mock import AsyncMock, patch
import httpx
import pytest
from pipecat.frames.frames import (
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.llm import OpenAILLMService
@pytest.mark.asyncio
async def test_openai_llm_emits_error_frame_on_timeout():
"""Test that OpenAI LLM service emits ErrorFrame when a timeout occurs.
This enables LLMSwitcher to trigger failover to backup LLMs when the
primary LLM times out.
"""
with patch.object(OpenAILLMService, "create_client"):
service = OpenAILLMService(settings=OpenAILLMService.Settings(model="gpt-4"))
service._client = AsyncMock()
# Track pushed frames and errors
pushed_frames = []
pushed_errors = []
timeout_handler_called = False
original_push_frame = service.push_frame
async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
pushed_frames.append(frame)
await original_push_frame(frame, direction)
async def mock_push_error(error_msg, exception=None):
pushed_errors.append({"error_msg": error_msg, "exception": exception})
async def mock_timeout_handler(event_name):
nonlocal timeout_handler_called
if event_name == "on_completion_timeout":
timeout_handler_called = True
service.push_frame = mock_push_frame
service.push_error = mock_push_error
service._call_event_handler = AsyncMock(side_effect=mock_timeout_handler)
# Mock _process_context to raise TimeoutException
service._process_context = AsyncMock(
side_effect=httpx.TimeoutException("Connection timed out")
)
# Mock metrics methods
service.start_processing_metrics = AsyncMock()
service.stop_processing_metrics = AsyncMock()
service.start_ttfb_metrics = AsyncMock()
# Create a context frame to process
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
)
frame = LLMContextFrame(context=context)
# Process the frame
await service.process_frame(frame, FrameDirection.DOWNSTREAM)
# Verify timeout handler was called
service._call_event_handler.assert_called_once_with("on_completion_timeout")
assert timeout_handler_called
# Verify push_error was called with correct message
assert len(pushed_errors) == 1
assert pushed_errors[0]["error_msg"] == "LLM completion timeout"
assert isinstance(pushed_errors[0]["exception"], httpx.TimeoutException)
# Verify LLMFullResponseStartFrame and LLMFullResponseEndFrame were pushed
frame_types = [type(f) for f in pushed_frames]
assert LLMFullResponseStartFrame in frame_types
assert LLMFullResponseEndFrame in frame_types
@pytest.mark.asyncio
async def test_openai_llm_timeout_still_pushes_end_frame():
"""Test that LLMFullResponseEndFrame is pushed even when timeout occurs.
The finally block should ensure proper cleanup regardless of timeout.
"""
with patch.object(OpenAILLMService, "create_client"):
service = OpenAILLMService(settings=OpenAILLMService.Settings(model="gpt-4"))
service._client = AsyncMock()
pushed_frames = []
async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
pushed_frames.append(frame)
service.push_frame = mock_push_frame
service.push_error = AsyncMock()
service._call_event_handler = AsyncMock()
service._process_context = AsyncMock(side_effect=httpx.TimeoutException("Timeout"))
service.start_processing_metrics = AsyncMock()
service.stop_processing_metrics = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
)
frame = LLMContextFrame(context=context)
await service.process_frame(frame, FrameDirection.DOWNSTREAM)
# Verify both start and end frames are pushed
frame_types = [type(f) for f in pushed_frames]
assert LLMFullResponseStartFrame in frame_types
assert LLMFullResponseEndFrame in frame_types
# Verify metrics were stopped
service.stop_processing_metrics.assert_called_once()
@pytest.mark.asyncio
async def test_openai_llm_stream_closed_on_cancellation():
"""Test that the stream is closed when CancelledError occurs during iteration.
This prevents socket leaks when the pipeline is interrupted (e.g., user interruption).
See issue #3589.
"""
import asyncio
with patch.object(OpenAILLMService, "create_client"):
service = OpenAILLMService(settings=OpenAILLMService.Settings(model="gpt-4"))
service._client = AsyncMock()
# Track if close was called
stream_closed = False
class MockAsyncStream:
"""Mock AsyncStream that tracks close() calls and raises CancelledError."""
def __init__(self):
self.iteration_count = 0
async def close(self):
nonlocal stream_closed
stream_closed = True
def __aiter__(self):
return self
async def __anext__(self):
self.iteration_count += 1
if self.iteration_count > 1:
# Simulate cancellation during iteration
raise asyncio.CancelledError()
# Return a minimal chunk for first iteration
mock_chunk = AsyncMock()
mock_chunk.usage = None
mock_chunk.model = None
mock_chunk.choices = []
return mock_chunk
mock_stream = MockAsyncStream()
# Mock the stream creation methods
service.get_chat_completions = AsyncMock(return_value=mock_stream)
service.start_ttfb_metrics = AsyncMock()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
)
# Process context should raise CancelledError but stream should still be closed
with pytest.raises(asyncio.CancelledError):
await service._process_context(context)
# Verify stream was closed despite the cancellation
assert stream_closed, "Stream should be closed even when CancelledError occurs"
@pytest.mark.asyncio
async def test_openai_llm_emits_error_frame_on_exception():
"""Test that OpenAI LLM service emits ErrorFrame when a general exception occurs.
This enables proper error handling for API errors, rate limits, and other failures.
"""
with patch.object(OpenAILLMService, "create_client"):
service = OpenAILLMService(settings=OpenAILLMService.Settings(model="gpt-4"))
service._client = AsyncMock()
pushed_errors = []
async def mock_push_error(error_msg, exception=None):
pushed_errors.append({"error_msg": error_msg, "exception": exception})
service.push_frame = AsyncMock()
service.push_error = mock_push_error
service._call_event_handler = AsyncMock()
service._process_context = AsyncMock(side_effect=RuntimeError("API Error"))
service.start_processing_metrics = AsyncMock()
service.stop_processing_metrics = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
)
frame = LLMContextFrame(context=context)
await service.process_frame(frame, FrameDirection.DOWNSTREAM)
# Verify push_error was called with correct message
assert len(pushed_errors) == 1
assert "Error during completion" in pushed_errors[0]["error_msg"]
assert "API Error" in pushed_errors[0]["error_msg"]
assert isinstance(pushed_errors[0]["exception"], RuntimeError)
@pytest.mark.asyncio
async def test_openai_llm_async_iterator_closed_on_stream_end():
"""Test that the async iterator is explicitly closed after stream consumption.
This prevents uvloop's broken asyncgen finalizer from firing on Python 3.12+
when async generators are garbage-collected without explicit cleanup.
See MagicStack/uvloop#699.
"""
with patch.object(OpenAILLMService, "create_client"):
service = OpenAILLMService(settings=OpenAILLMService.Settings(model="gpt-4"))
service._client = AsyncMock()
# Track if the iterator's aclose was called
iterator_aclosed = False
stream_closed = False
class MockAsyncIterator:
"""Mock async iterator that tracks aclose() calls."""
def __init__(self):
self.iteration_count = 0
def __aiter__(self):
return self
async def __anext__(self):
self.iteration_count += 1
if self.iteration_count > 2:
raise StopAsyncIteration()
# Return a minimal chunk
mock_chunk = AsyncMock()
mock_chunk.usage = None
mock_chunk.model = None
mock_chunk.choices = []
return mock_chunk
async def aclose(self):
nonlocal iterator_aclosed
iterator_aclosed = True
class MockAsyncStream:
"""Mock stream whose __aiter__ returns a separate iterator object."""
def __init__(self, iterator):
self._iterator = iterator
def __aiter__(self):
return self._iterator
async def close(self):
nonlocal stream_closed
stream_closed = True
mock_iterator = MockAsyncIterator()
mock_stream = MockAsyncStream(mock_iterator)
service.get_chat_completions = AsyncMock(return_value=mock_stream)
service.start_ttfb_metrics = AsyncMock()
service.stop_ttfb_metrics = AsyncMock()
service.start_llm_usage_metrics = AsyncMock()
context = LLMContext(
messages=[{"role": "user", "content": "Hello"}],
)
await service._process_context(context)
# Verify the iterator was explicitly closed (prevents uvloop crash)
assert iterator_aclosed, "Async iterator should be explicitly closed"
# Verify the stream was also closed (releases HTTP resources)
assert stream_closed, "Stream should be closed to release HTTP resources"