Compare commits

...

9 Commits

Author SHA1 Message Date
James Hush
fdf0652141 Remove vad 2025-08-14 11:17:22 +08:00
James Hush
237c400f2d Remove vad 2025-08-14 11:16:54 +08:00
James Hush
b6afce2a92 intervention processor 2025-08-14 11:15:55 +08:00
Mark Backman
d7f31e0cbd Merge pull request #2387 from pipecat-ai/mb/retry-chat-completion
Retry chat completions for OpenAILLMService and its subclasses
2025-08-13 14:39:40 -07:00
Mark Backman
c662a2d820 Merge pull request #2437 from pipecat-ai/mb/19-english
Foundational 19: Respond in English
2025-08-13 11:57:24 -07:00
Mark Backman
b5465364fa Foundational 19: Respond in English 2025-08-13 12:37:13 -04:00
Mark Backman
8714c9137f Code review fixes 2025-08-12 17:49:13 -04:00
Mark Backman
4c029fcfa7 Update OpenAILLMService subclasses to use the new build_chat_completion_params function 2025-08-12 17:48:51 -04:00
Mark Backman
5c86f8e687 Add timeout/retry logic and refactor parameter building in BaseOpenAILLMService
- Add timeout (default 5.0s) and retry_on_timeout parameters to constructor
- Implement timeout/retry logic in get_chat_completions using asyncio.wait_for
- Extract build_chat_completion_params() as public method for subclass customization
2025-08-12 17:48:51 -04:00
12 changed files with 182 additions and 93 deletions

View File

@@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Gemini model can be prompted to insert styled speech to control the TTS
output.
- For `OpenAILLMService` and its subclasses, added the ability to retry
executing a chat completion after a timeout period. The new args are
`retry_timeout_secs` and `retry_on_timeout`. This feature is disabled by
default.
- Added Exotel support to Pipecat's development runner. You can now connect
using the runner with `uv run bot.py -t exotel` and an ngrok connection to
HTTP port 7860.

View File

@@ -5,16 +5,27 @@
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
Frame,
LLMFullResponseStartFrame,
LLMTextFrame,
TranscriptionFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -49,6 +60,65 @@ transport_params = {
}
class TranscriptionLogger(FrameProcessor):
    """Pass-through processor that writes the text of transcription frames to the log."""

    @staticmethod
    def _log_if_transcription(frame):
        """Emit an info-level log line when ``frame`` is a ``TranscriptionFrame``."""
        if not isinstance(frame, TranscriptionFrame):
            return
        logger.info(f"[TRANSCRIPTION]: {frame.text}")

    async def process_frame(self, frame, direction):
        """Log transcription text, then forward the frame unchanged.

        Args:
            frame: The frame arriving at this processor.
            direction: The direction the frame is travelling; reused as-is
                when the frame is pushed onward.
        """
        await super().process_frame(frame, direction)

        self._log_if_transcription(frame)

        # Every frame is forwarded, logged or not, so the pipeline keeps flowing.
        await self.push_frame(frame, direction)
class InterventionProcessor(FrameProcessor):
    """Pass-through processor that covers slow LLM responses with a filler phrase.

    When an ``LLMFullResponseStartFrame`` arrives, a timer is started. If no
    ``BotStartedSpeakingFrame`` is seen before the timer expires, a
    ``TTSSpeakFrame`` carrying the filler text is queued on this processor.
    LLM start, bot-speaking and LLM text frames are also logged. Every frame
    is forwarded unchanged.

    NOTE(review): the filler frame is queued on this processor itself, so it
    presumably is only spoken if a TTS service sits downstream of it - confirm
    the processor's position in the pipeline.
    """

    def __init__(self, *, delay_secs: float = 0.5, filler_text: str = "um..."):
        """Initialize the processor.

        Args:
            delay_secs: Seconds to wait after the LLM response starts before
                queueing the filler phrase. Defaults to 0.5 (500ms).
            filler_text: Text placed in the queued ``TTSSpeakFrame``.
                Defaults to "um...".
        """
        super().__init__()
        self._delay_secs = delay_secs
        self._filler_text = filler_text
        self._timer_task = None

    async def process_frame(self, frame, direction):
        """Update the intervention timer for the frame, log it, and forward it.

        Args:
            frame: The frame arriving at this processor.
            direction: The direction the frame is travelling; reused as-is
                when the frame is pushed onward.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMFullResponseStartFrame):
            logger.info("[LLM_START]: Starting LLM response")
            # Restart the countdown so only the latest response is timed.
            self._cancel_timer()
            self._timer_task = asyncio.create_task(self._log_after_delay())
        elif isinstance(frame, BotStartedSpeakingFrame):
            # Audio is already playing, so no filler is needed.
            logger.info("[BOT_SPEAKING]: Bot started speaking, canceling intervention timer")
            self._cancel_timer()
        elif isinstance(frame, LLMTextFrame):
            logger.info(f"[LLM_TEXT]: {frame.text}")

        # Always pass the frame through to maintain pipeline flow.
        await self.push_frame(frame, direction)

    async def cleanup(self):
        """Cancel any pending timer so it cannot fire after teardown.

        NOTE(review): assumes the base ``FrameProcessor`` exposes an async
        ``cleanup()`` hook that the pipeline calls on shutdown - confirm.
        """
        await super().cleanup()
        self._cancel_timer()

    def _cancel_timer(self):
        """Cancel the pending intervention timer, if one is still running."""
        if self._timer_task and not self._timer_task.done():
            self._timer_task.cancel()
        self._timer_task = None

    async def _log_after_delay(self):
        """Wait for the configured delay, then log and queue the filler phrase."""
        try:
            await asyncio.sleep(self._delay_secs)
            # With the default delay this renders the original "500ms ..." message.
            elapsed_ms = round(self._delay_secs * 1000)
            logger.info(f"{elapsed_ms}ms passed since LLMFullResponseStartFrame")
            await self.queue_frame(TTSSpeakFrame(self._filler_text))
        except asyncio.CancelledError:
            # Cancellation is the expected outcome when a newer response
            # starts or the bot begins speaking in time.
            pass
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
@@ -71,13 +141,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# Create transcription logger instance
transcription_logger = TranscriptionLogger()
# Create the intervention processor (logs LLM frames and queues a filler phrase when the response is slow)
intervention = InterventionProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
transcription_logger, # Log transcription frames
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
intervention, # Log LLM response frames
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]

View File

@@ -137,7 +137,7 @@ You have access to the following tools:
- get_current_weather: Get the current weather for a given location.
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
Remember, your responses should be short. Just one or two sentences, usually.""",
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
)
llm = OpenAIRealtimeBetaLLMService(

View File

@@ -133,7 +133,7 @@ You have access to the following tools:
- get_current_weather: Get the current weather for a given location.
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
Remember, your responses should be short. Just one or two sentences, usually.""",
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
)
llm = AzureRealtimeBetaLLMService(

View File

@@ -139,7 +139,7 @@ You have access to the following tools:
- get_current_weather: Get the current weather for a given location.
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
Remember, your responses should be short. Just one or two sentences, usually.""",
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
)
llm = OpenAIRealtimeBetaLLMService(

View File

@@ -9,8 +9,7 @@
from typing import List
from loguru import logger
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from openai.types.chat import ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
@@ -55,20 +54,13 @@ class CerebrasLLMService(OpenAILLMService):
logger.debug(f"Creating Cerebras client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
async def get_chat_completions(
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:
"""Create a streaming chat completion using Cerebras's API.
) -> dict:
"""Build parameters for Cerebras chat completion request.
Args:
context: The context object containing tools configuration
and other settings for the chat completion.
messages: The list of messages comprising
the conversation history and current request.
Returns:
A streaming response of chat completion
chunks that can be processed asynchronously.
Cerebras supports a subset of OpenAI parameters, focusing on core
completion settings without advanced features like frequency/presence penalties.
"""
params = {
"model": self.model_name,
@@ -83,6 +75,4 @@ class CerebrasLLMService(OpenAILLMService):
}
params.update(self._settings["extra"])
chunks = await self._client.chat.completions.create(**params)
return chunks
return params

View File

@@ -9,8 +9,7 @@
from typing import List
from loguru import logger
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from openai.types.chat import ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
@@ -55,20 +54,12 @@ class DeepSeekLLMService(OpenAILLMService):
logger.debug(f"Creating DeepSeek client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
async def get_chat_completions(
def _build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:
"""Create a streaming chat completion using DeepSeek's API.
) -> dict:
"""Build parameters for DeepSeek chat completion request.
Args:
context: The context object containing tools configuration
and other settings for the chat completion.
messages: The list of messages comprising the conversation
history and current request.
Returns:
A streaming response of chat completion chunks that can be
processed asynchronously.
DeepSeek doesn't support some OpenAI parameters like seed and max_completion_tokens.
"""
params = {
"model": self.model_name,
@@ -85,6 +76,4 @@ class DeepSeekLLMService(OpenAILLMService):
}
params.update(self._settings["extra"])
chunks = await self._client.chat.completions.create(**params)
return chunks
return params

View File

@@ -54,20 +54,13 @@ class FireworksLLMService(OpenAILLMService):
logger.debug(f"Creating Fireworks client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
async def get_chat_completions(
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
):
"""Get chat completions from Fireworks API.
) -> dict:
"""Build parameters for Fireworks chat completion request.
Removes OpenAI-specific parameters not supported by Fireworks and
configures the request with Fireworks-compatible settings.
Args:
context: The OpenAI LLM context containing tools and settings.
messages: List of chat completion message parameters.
Returns:
Async generator yielding chat completion chunks from Fireworks API.
Fireworks doesn't support some OpenAI parameters like seed, max_completion_tokens,
and stream_options.
"""
params = {
"model": self.model_name,
@@ -83,6 +76,4 @@ class FireworksLLMService(OpenAILLMService):
}
params.update(self._settings["extra"])
chunks = await self._client.chat.completions.create(**params)
return chunks
return params

View File

@@ -6,6 +6,7 @@
"""Base OpenAI LLM service implementation."""
import asyncio
import base64
import json
from typing import Any, Dict, List, Mapping, Optional
@@ -14,6 +15,7 @@ import httpx
from loguru import logger
from openai import (
NOT_GIVEN,
APITimeoutError,
AsyncOpenAI,
AsyncStream,
DefaultAsyncHttpxClient,
@@ -91,6 +93,8 @@ class BaseOpenAILLMService(LLMService):
project=None,
default_headers: Optional[Mapping[str, str]] = None,
params: Optional[InputParams] = None,
retry_timeout_secs: Optional[float] = 5.0,
retry_on_timeout: Optional[bool] = False,
**kwargs,
):
"""Initialize the BaseOpenAILLMService.
@@ -103,6 +107,8 @@ class BaseOpenAILLMService(LLMService):
project: OpenAI project ID.
default_headers: Additional HTTP headers to include in requests.
params: Input parameters for model configuration and behavior.
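retry_timeout_secs: Seconds to wait for the first attempt before retrying. Only used when retry_on_timeout is True. Defaults to 5.0 seconds.
retry_on_timeout: Whether to retry the request once if it times out. The retry is sent without a timeout.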
**kwargs: Additional arguments passed to the parent LLMService.
"""
super().__init__(**kwargs)
@@ -119,6 +125,8 @@ class BaseOpenAILLMService(LLMService):
"max_completion_tokens": params.max_completion_tokens,
"extra": params.extra if isinstance(params.extra, dict) else {},
}
self._retry_timeout_secs = retry_timeout_secs
self._retry_on_timeout = retry_on_timeout
self.set_model_name(model)
self._client = self.create_client(
api_key=api_key,
@@ -175,7 +183,7 @@ class BaseOpenAILLMService(LLMService):
async def get_chat_completions(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:
"""Get streaming chat completions from OpenAI API.
"""Get streaming chat completions from OpenAI API with optional timeout and retry.
Args:
context: The LLM context containing tools and configuration.
@@ -184,6 +192,37 @@ class BaseOpenAILLMService(LLMService):
Returns:
Async stream of chat completion chunks.
"""
params = self.build_chat_completion_params(context, messages)
if self._retry_on_timeout:
try:
chunks = await asyncio.wait_for(
self._client.chat.completions.create(**params), timeout=self._retry_timeout_secs
)
return chunks
except (APITimeoutError, asyncio.TimeoutError):
# Retry, this time without a timeout so we get a response
logger.debug(f"{self}: Retrying chat completion due to timeout")
chunks = await self._client.chat.completions.create(**params)
return chunks
else:
chunks = await self._client.chat.completions.create(**params)
return chunks
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> dict:
"""Build parameters for chat completion request.
Subclasses can override this to customize parameters for different providers.
Args:
context: The LLM context containing tools and configuration.
messages: List of chat completion messages to send.
Returns:
Dictionary of parameters for the chat completion request.
"""
params = {
"model": self.model_name,
"stream": True,
@@ -201,9 +240,7 @@ class BaseOpenAILLMService(LLMService):
}
params.update(self._settings["extra"])
chunks = await self._client.chat.completions.create(**params)
return chunks
return params
async def _stream_chat_completions(
self, context: OpenAILLMContext

View File

@@ -13,14 +13,13 @@ enabling integration with OpenPipe's fine-tuning and monitoring capabilities.
from typing import Dict, List, Optional
from loguru import logger
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from openai.types.chat import ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
try:
from openpipe import AsyncOpenAI as OpenPipeAI
from openpipe import AsyncStream
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use OpenPipe, you need to `pip install pipecat-ai[openpipe]`.")
@@ -87,22 +86,27 @@ class OpenPipeLLMService(OpenAILLMService):
)
return client
async def get_chat_completions(
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:
"""Generate streaming chat completions with OpenPipe logging.
) -> dict:
"""Build parameters for OpenPipe chat completion request.
Adds OpenPipe-specific logging and tagging parameters.
Args:
context: The OpenAI LLM context containing conversation state.
messages: List of chat completion message parameters.
context: The LLM context containing tools and configuration.
messages: List of chat completion messages to send.
Returns:
Async stream of chat completion chunks.
Dictionary of parameters for the chat completion request.
"""
chunks = await self._client.chat.completions.create(
model=self.model_name,
stream=True,
messages=messages,
openpipe={"tags": self._tags, "log_request": True},
)
return chunks
# Start with base parameters
params = super().build_chat_completion_params(context, messages)
# Add OpenPipe-specific parameters
params["openpipe"] = {
"tags": self._tags,
"log_request": True,
}
return params

View File

@@ -13,8 +13,8 @@ reporting patterns while maintaining compatibility with the Pipecat framework.
from typing import List
from openai import NOT_GIVEN, AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from openai import NOT_GIVEN
from openai.types.chat import ChatCompletionMessageParam
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
@@ -53,17 +53,12 @@ class PerplexityLLMService(OpenAILLMService):
self._has_reported_prompt_tokens = False
self._is_processing = False
async def get_chat_completions(
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> AsyncStream[ChatCompletionChunk]:
"""Get chat completions from Perplexity API using OpenAI-compatible parameters.
) -> dict:
"""Build parameters for Perplexity chat completion request.
Args:
context: The context containing conversation history and settings.
messages: The messages to send to the API.
Returns:
A stream of chat completion chunks from the Perplexity API.
Perplexity uses a subset of OpenAI parameters and doesn't support tools.
"""
params = {
"model": self.model_name,
@@ -83,8 +78,7 @@ class PerplexityLLMService(OpenAILLMService):
if self._settings["max_tokens"] is not NOT_GIVEN:
params["max_tokens"] = self._settings["max_tokens"]
chunks = await self._client.chat.completions.create(**params)
return chunks
return params
async def _process_context(self, context: OpenAILLMContext):
"""Process a context through the LLM and accumulate token usage metrics.

View File

@@ -68,17 +68,20 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
logger.debug(f"Creating SambaNova client with API {base_url}")
return super().create_client(api_key, base_url, **kwargs)
async def get_chat_completions(
def build_chat_completion_params(
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
) -> Any:
"""Get chat completions from SambaNova API endpoint.
) -> dict:
"""Build parameters for SambaNova chat completion request.
SambaNova doesn't support some OpenAI parameters like frequency_penalty,
presence_penalty, and seed.
Args:
context: OpenAI LLM context containing tools and configuration.
messages: List of chat completion message parameters.
context: The LLM context containing tools and configuration.
messages: List of chat completion messages to send.
Returns:
Chat completion response stream from SambaNova API.
Dictionary of parameters for the chat completion request.
"""
params = {
"model": self.model_name,
@@ -94,9 +97,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
}
params.update(self._settings["extra"])
chunks = await self._client.chat.completions.create(**params)
return chunks
return params
@traced_llm # type: ignore
async def _process_context(self, context: OpenAILLMContext) -> AsyncStream[ChatCompletionChunk]: