diff --git a/CHANGELOG.md b/CHANGELOG.md index a71ae5f02..582258df2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added retry logic to `get_chat_completions()` for all OpenAI-based LLM + services. + - Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that can be useful for debugging your pipelines. diff --git a/pyproject.toml b/pyproject.toml index 3c5d6262b..768c6d150 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ dependencies = [ "pydantic~=2.10.5", "pyloudnorm~=0.1.1", "resampy~=0.4.3", - "soxr~=0.5.0" + "soxr~=0.5.0", + "tenacity~=9.0.0" ] [project.urls] diff --git a/src/pipecat/services/cerebras.py b/src/pipecat/services/cerebras.py index fce3c22bf..562d8480d 100644 --- a/src/pipecat/services/cerebras.py +++ b/src/pipecat/services/cerebras.py @@ -7,6 +7,7 @@ from typing import List from loguru import logger +from tenacity import retry, stop_after_attempt, wait_fixed from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.openai import OpenAILLMService @@ -52,6 +53,7 @@ class CerebrasLLMService(OpenAILLMService): logger.debug(f"Creating Cerebras client with api {base_url}") return super().create_client(api_key, base_url, **kwargs) + @retry(stop=stop_after_attempt(2), wait=wait_fixed(2)) async def get_chat_completions( self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam] ) -> AsyncStream[ChatCompletionChunk]: diff --git a/src/pipecat/services/deepseek.py b/src/pipecat/services/deepseek.py index 67433dfaa..25088c981 100644 --- a/src/pipecat/services/deepseek.py +++ b/src/pipecat/services/deepseek.py @@ -8,6 +8,7 @@ from typing import List from loguru import logger +from tenacity import retry, stop_after_attempt, wait_fixed from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.openai import 
OpenAILLMService @@ -53,6 +54,7 @@ class DeepSeekLLMService(OpenAILLMService): logger.debug(f"Creating DeepSeek client with api {base_url}") return super().create_client(api_key, base_url, **kwargs) + @retry(stop=stop_after_attempt(2), wait=wait_fixed(2)) async def get_chat_completions( self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam] ) -> AsyncStream[ChatCompletionChunk]: diff --git a/src/pipecat/services/fireworks.py b/src/pipecat/services/fireworks.py index 0e50fc980..696bef9c5 100644 --- a/src/pipecat/services/fireworks.py +++ b/src/pipecat/services/fireworks.py @@ -8,6 +8,7 @@ from typing import List from loguru import logger +from tenacity import retry, stop_after_attempt, wait_fixed from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.openai import OpenAILLMService @@ -50,6 +51,7 @@ class FireworksLLMService(OpenAILLMService): logger.debug(f"Creating Fireworks client with api {base_url}") return super().create_client(api_key, base_url, **kwargs) + @retry(stop=stop_after_attempt(2), wait=wait_fixed(2)) async def get_chat_completions( self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam] ): diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 4077f4354..2ed508578 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -15,6 +15,7 @@ import httpx from loguru import logger from PIL import Image from pydantic import BaseModel, Field +from tenacity import retry, stop_after_attempt, wait_fixed from pipecat.frames.frames import ( ErrorFrame, @@ -160,6 +161,7 @@ class BaseOpenAILLMService(LLMService): def can_generate_metrics(self) -> bool: return True + @retry(stop=stop_after_attempt(2), wait=wait_fixed(2)) async def get_chat_completions( self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam] ) -> AsyncStream[ChatCompletionChunk]: diff --git a/src/pipecat/services/openpipe.py 
b/src/pipecat/services/openpipe.py index 98241eb50..e8ef1b5d8 100644 --- a/src/pipecat/services/openpipe.py +++ b/src/pipecat/services/openpipe.py @@ -7,6 +7,7 @@ from typing import Dict, List, Optional from loguru import logger +from tenacity import retry, stop_after_attempt, wait_fixed from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.openai import OpenAILLMService @@ -55,6 +56,7 @@ class OpenPipeLLMService(OpenAILLMService): ) return client + @retry(stop=stop_after_attempt(2), wait=wait_fixed(2)) async def get_chat_completions( self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam] ) -> AsyncStream[ChatCompletionChunk]: diff --git a/src/pipecat/services/perplexity.py b/src/pipecat/services/perplexity.py index 8152461c3..702623b51 100644 --- a/src/pipecat/services/perplexity.py +++ b/src/pipecat/services/perplexity.py @@ -7,6 +7,7 @@ from typing import List from loguru import logger +from tenacity import retry, stop_after_attempt, wait_fixed from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext @@ -56,6 +57,7 @@ class PerplexityLLMService(OpenAILLMService): self._has_reported_prompt_tokens = False self._is_processing = False + @retry(stop=stop_after_attempt(2), wait=wait_fixed(2)) async def get_chat_completions( self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam] ) -> AsyncStream[ChatCompletionChunk]: