Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
ca84e665aa openai: add retry logic to get_chat_completions() 2025-02-21 20:56:12 -08:00
8 changed files with 17 additions and 1 deletions

View File

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added retry logic to `get_chat_completions()` fo all OpenAI-based LLM
services.
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
can be useful for debugging your pipelines.

View File

@@ -33,7 +33,8 @@ dependencies = [
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0"
"soxr~=0.5.0",
"tenacity~=9.0.0"
]
[project.urls]

View File

@@ -7,6 +7,7 @@
from typing import List
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
@@ -52,6 +53,7 @@ class CerebrasLLMService(OpenAILLMService):
logger.debug(f"Creating Cerebras client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@retry(stop=stop_after_attempt(2), wait=wait_fixed(2))
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:

View File

@@ -8,6 +8,7 @@
from typing import List
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
@@ -53,6 +54,7 @@ class DeepSeekLLMService(OpenAILLMService):
logger.debug(f"Creating DeepSeek client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@retry(stop=stop_after_attempt(2), wait=wait_fixed(2))
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:

View File

@@ -8,6 +8,7 @@
from typing import List
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
@@ -50,6 +51,7 @@ class FireworksLLMService(OpenAILLMService):
logger.debug(f"Creating Fireworks client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@retry(stop=stop_after_attempt(2), wait=wait_fixed(2))
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
):

View File

@@ -15,6 +15,7 @@ import httpx
from loguru import logger
from PIL import Image
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_fixed
from pipecat.frames.frames import (
ErrorFrame,
@@ -160,6 +161,7 @@ class BaseOpenAILLMService(LLMService):
def can_generate_metrics(self) -> bool:
return True
@retry(stop=stop_after_attempt(2), wait=wait_fixed(2))
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:

View File

@@ -7,6 +7,7 @@
from typing import Dict, List, Optional
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
@@ -55,6 +56,7 @@ class OpenPipeLLMService(OpenAILLMService):
)
return client
@retry(stop=stop_after_attempt(2), wait=wait_fixed(2))
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:

View File

@@ -7,6 +7,7 @@
from typing import List
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
@@ -56,6 +57,7 @@ class PerplexityLLMService(OpenAILLMService):
self._has_reported_prompt_tokens = False
self._is_processing = False
@retry(stop=stop_after_attempt(2), wait=wait_fixed(2))
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]: