Compare commits
9 Commits
v0.0.80
...
hush/inter
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fdf0652141 | ||
|
|
237c400f2d | ||
|
|
b6afce2a92 | ||
|
|
d7f31e0cbd | ||
|
|
c662a2d820 | ||
|
|
b5465364fa | ||
|
|
8714c9137f | ||
|
|
4c029fcfa7 | ||
|
|
5c86f8e687 |
@@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
Gemini model can be prompted to insert styled speech to control the TTS
|
||||
output.
|
||||
|
||||
- For `OpenAILLMService` and its subclasses, added the ability to retry
|
||||
executing a chat completion after a timeout period. The new args are
|
||||
`retry_timeout_secs` and `retry_on_timeout`. This feature is disabled by
|
||||
default.
|
||||
|
||||
- Added Exotel support to Pipecat's development runner. You can now connect
|
||||
using the runner with `uv run bot.py -t exotel` and an ngrok connection to
|
||||
HTTP port 7860.
|
||||
|
||||
@@ -5,16 +5,27 @@
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
Frame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
TranscriptionFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
@@ -49,6 +60,65 @@ transport_params = {
|
||||
}
|
||||
|
||||
|
||||
class TranscriptionLogger(FrameProcessor):
|
||||
"""Custom processor that logs transcription frames."""
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Only log TranscriptionFrame objects
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
logger.info(f"[TRANSCRIPTION]: {frame.text}")
|
||||
|
||||
# Always pass the frame through to maintain pipeline flow
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class InterventionProcessor(FrameProcessor):
|
||||
"""Custom processor that logs LLM response frames."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._timer_task = None
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Log LLM response start frames
|
||||
if isinstance(frame, LLMFullResponseStartFrame):
|
||||
logger.info(f"[LLM_START]: Starting LLM response")
|
||||
|
||||
# Cancel any existing timer
|
||||
if self._timer_task and not self._timer_task.done():
|
||||
self._timer_task.cancel()
|
||||
|
||||
# Start a new 500ms timer
|
||||
self._timer_task = asyncio.create_task(self._log_after_delay())
|
||||
|
||||
# Cancel timer if bot started speaking before 500ms
|
||||
elif isinstance(frame, BotStartedSpeakingFrame):
|
||||
logger.info(f"[BOT_SPEAKING]: Bot started speaking, canceling intervention timer")
|
||||
if self._timer_task and not self._timer_task.done():
|
||||
self._timer_task.cancel()
|
||||
|
||||
# Log LLM text frames
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
logger.info(f"[LLM_TEXT]: {frame.text}")
|
||||
|
||||
# Always pass the frame through to maintain pipeline flow
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _log_after_delay(self):
|
||||
"""Log a message after 500ms delay."""
|
||||
try:
|
||||
await asyncio.sleep(0.5) # 500ms
|
||||
logger.info(f"500ms passed since LLMFullResponseStartFrame")
|
||||
await self.queue_frame(TTSSpeakFrame("um..."))
|
||||
except asyncio.CancelledError:
|
||||
# Timer was cancelled, which is fine
|
||||
pass
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
@@ -71,13 +141,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Create transcription logger instance
|
||||
transcription_logger = TranscriptionLogger()
|
||||
|
||||
# Create LLM logger instance
|
||||
intervention = InterventionProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
transcription_logger, # Log transcription frames
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
intervention, # Log LLM response frames
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
|
||||
@@ -137,7 +137,7 @@ You have access to the following tools:
|
||||
- get_current_weather: Get the current weather for a given location.
|
||||
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually.""",
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
llm = OpenAIRealtimeBetaLLMService(
|
||||
|
||||
@@ -133,7 +133,7 @@ You have access to the following tools:
|
||||
- get_current_weather: Get the current weather for a given location.
|
||||
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually.""",
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
llm = AzureRealtimeBetaLLMService(
|
||||
|
||||
@@ -139,7 +139,7 @@ You have access to the following tools:
|
||||
- get_current_weather: Get the current weather for a given location.
|
||||
- get_restaurant_recommendation: Get a restaurant recommendation for a given location.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually.""",
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
llm = OpenAIRealtimeBetaLLMService(
|
||||
|
||||
@@ -9,8 +9,7 @@
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from openai import AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
@@ -55,20 +54,13 @@ class CerebrasLLMService(OpenAILLMService):
|
||||
logger.debug(f"Creating Cerebras client with api {base_url}")
|
||||
return super().create_client(api_key, base_url, **kwargs)
|
||||
|
||||
async def get_chat_completions(
|
||||
def build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> AsyncStream[ChatCompletionChunk]:
|
||||
"""Create a streaming chat completion using Cerebras's API.
|
||||
) -> dict:
|
||||
"""Build parameters for Cerebras chat completion request.
|
||||
|
||||
Args:
|
||||
context: The context object containing tools configuration
|
||||
and other settings for the chat completion.
|
||||
messages: The list of messages comprising
|
||||
the conversation history and current request.
|
||||
|
||||
Returns:
|
||||
A streaming response of chat completion
|
||||
chunks that can be processed asynchronously.
|
||||
Cerebras supports a subset of OpenAI parameters, focusing on core
|
||||
completion settings without advanced features like frequency/presence penalties.
|
||||
"""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
@@ -83,6 +75,4 @@ class CerebrasLLMService(OpenAILLMService):
|
||||
}
|
||||
|
||||
params.update(self._settings["extra"])
|
||||
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
return params
|
||||
|
||||
@@ -9,8 +9,7 @@
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from openai import AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
@@ -55,20 +54,12 @@ class DeepSeekLLMService(OpenAILLMService):
|
||||
logger.debug(f"Creating DeepSeek client with api {base_url}")
|
||||
return super().create_client(api_key, base_url, **kwargs)
|
||||
|
||||
async def get_chat_completions(
|
||||
def _build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> AsyncStream[ChatCompletionChunk]:
|
||||
"""Create a streaming chat completion using DeepSeek's API.
|
||||
) -> dict:
|
||||
"""Build parameters for DeepSeek chat completion request.
|
||||
|
||||
Args:
|
||||
context: The context object containing tools configuration
|
||||
and other settings for the chat completion.
|
||||
messages: The list of messages comprising the conversation
|
||||
history and current request.
|
||||
|
||||
Returns:
|
||||
A streaming response of chat completion chunks that can be
|
||||
processed asynchronously.
|
||||
DeepSeek doesn't support some OpenAI parameters like seed and max_completion_tokens.
|
||||
"""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
@@ -85,6 +76,4 @@ class DeepSeekLLMService(OpenAILLMService):
|
||||
}
|
||||
|
||||
params.update(self._settings["extra"])
|
||||
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
return params
|
||||
|
||||
@@ -54,20 +54,13 @@ class FireworksLLMService(OpenAILLMService):
|
||||
logger.debug(f"Creating Fireworks client with api {base_url}")
|
||||
return super().create_client(api_key, base_url, **kwargs)
|
||||
|
||||
async def get_chat_completions(
|
||||
def build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
):
|
||||
"""Get chat completions from Fireworks API.
|
||||
) -> dict:
|
||||
"""Build parameters for Fireworks chat completion request.
|
||||
|
||||
Removes OpenAI-specific parameters not supported by Fireworks and
|
||||
configures the request with Fireworks-compatible settings.
|
||||
|
||||
Args:
|
||||
context: The OpenAI LLM context containing tools and settings.
|
||||
messages: List of chat completion message parameters.
|
||||
|
||||
Returns:
|
||||
Async generator yielding chat completion chunks from Fireworks API.
|
||||
Fireworks doesn't support some OpenAI parameters like seed, max_completion_tokens,
|
||||
and stream_options.
|
||||
"""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
@@ -83,6 +76,4 @@ class FireworksLLMService(OpenAILLMService):
|
||||
}
|
||||
|
||||
params.update(self._settings["extra"])
|
||||
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
return params
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
"""Base OpenAI LLM service implementation."""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
from typing import Any, Dict, List, Mapping, Optional
|
||||
@@ -14,6 +15,7 @@ import httpx
|
||||
from loguru import logger
|
||||
from openai import (
|
||||
NOT_GIVEN,
|
||||
APITimeoutError,
|
||||
AsyncOpenAI,
|
||||
AsyncStream,
|
||||
DefaultAsyncHttpxClient,
|
||||
@@ -91,6 +93,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
project=None,
|
||||
default_headers: Optional[Mapping[str, str]] = None,
|
||||
params: Optional[InputParams] = None,
|
||||
retry_timeout_secs: Optional[float] = 5.0,
|
||||
retry_on_timeout: Optional[bool] = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the BaseOpenAILLMService.
|
||||
@@ -103,6 +107,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
project: OpenAI project ID.
|
||||
default_headers: Additional HTTP headers to include in requests.
|
||||
params: Input parameters for model configuration and behavior.
|
||||
retry_timeout_secs: Request timeout in seconds. Defaults to 5.0 seconds.
|
||||
retry_on_timeout: Whether to retry the request once if it times out.
|
||||
**kwargs: Additional arguments passed to the parent LLMService.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
@@ -119,6 +125,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
"max_completion_tokens": params.max_completion_tokens,
|
||||
"extra": params.extra if isinstance(params.extra, dict) else {},
|
||||
}
|
||||
self._retry_timeout_secs = retry_timeout_secs
|
||||
self._retry_on_timeout = retry_on_timeout
|
||||
self.set_model_name(model)
|
||||
self._client = self.create_client(
|
||||
api_key=api_key,
|
||||
@@ -175,7 +183,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
async def get_chat_completions(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> AsyncStream[ChatCompletionChunk]:
|
||||
"""Get streaming chat completions from OpenAI API.
|
||||
"""Get streaming chat completions from OpenAI API with optional timeout and retry.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing tools and configuration.
|
||||
@@ -184,6 +192,37 @@ class BaseOpenAILLMService(LLMService):
|
||||
Returns:
|
||||
Async stream of chat completion chunks.
|
||||
"""
|
||||
params = self.build_chat_completion_params(context, messages)
|
||||
|
||||
if self._retry_on_timeout:
|
||||
try:
|
||||
chunks = await asyncio.wait_for(
|
||||
self._client.chat.completions.create(**params), timeout=self._retry_timeout_secs
|
||||
)
|
||||
return chunks
|
||||
except (APITimeoutError, asyncio.TimeoutError):
|
||||
# Retry, this time without a timeout so we get a response
|
||||
logger.debug(f"{self}: Retrying chat completion due to timeout")
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
else:
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
|
||||
def build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> dict:
|
||||
"""Build parameters for chat completion request.
|
||||
|
||||
Subclasses can override this to customize parameters for different providers.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing tools and configuration.
|
||||
messages: List of chat completion messages to send.
|
||||
|
||||
Returns:
|
||||
Dictionary of parameters for the chat completion request.
|
||||
"""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
"stream": True,
|
||||
@@ -201,9 +240,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
}
|
||||
|
||||
params.update(self._settings["extra"])
|
||||
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
return params
|
||||
|
||||
async def _stream_chat_completions(
|
||||
self, context: OpenAILLMContext
|
||||
|
||||
@@ -13,14 +13,13 @@ enabling integration with OpenPipe's fine-tuning and monitoring capabilities.
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
|
||||
try:
|
||||
from openpipe import AsyncOpenAI as OpenPipeAI
|
||||
from openpipe import AsyncStream
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use OpenPipe, you need to `pip install pipecat-ai[openpipe]`.")
|
||||
@@ -87,22 +86,27 @@ class OpenPipeLLMService(OpenAILLMService):
|
||||
)
|
||||
return client
|
||||
|
||||
async def get_chat_completions(
|
||||
def build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> AsyncStream[ChatCompletionChunk]:
|
||||
"""Generate streaming chat completions with OpenPipe logging.
|
||||
) -> dict:
|
||||
"""Build parameters for OpenPipe chat completion request.
|
||||
|
||||
Adds OpenPipe-specific logging and tagging parameters.
|
||||
|
||||
Args:
|
||||
context: The OpenAI LLM context containing conversation state.
|
||||
messages: List of chat completion message parameters.
|
||||
context: The LLM context containing tools and configuration.
|
||||
messages: List of chat completion messages to send.
|
||||
|
||||
Returns:
|
||||
Async stream of chat completion chunks.
|
||||
Dictionary of parameters for the chat completion request.
|
||||
"""
|
||||
chunks = await self._client.chat.completions.create(
|
||||
model=self.model_name,
|
||||
stream=True,
|
||||
messages=messages,
|
||||
openpipe={"tags": self._tags, "log_request": True},
|
||||
)
|
||||
return chunks
|
||||
# Start with base parameters
|
||||
params = super().build_chat_completion_params(context, messages)
|
||||
|
||||
# Add OpenPipe-specific parameters
|
||||
params["openpipe"] = {
|
||||
"tags": self._tags,
|
||||
"log_request": True,
|
||||
}
|
||||
|
||||
return params
|
||||
|
||||
@@ -13,8 +13,8 @@ reporting patterns while maintaining compatibility with the Pipecat framework.
|
||||
|
||||
from typing import List
|
||||
|
||||
from openai import NOT_GIVEN, AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
from openai import NOT_GIVEN
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
@@ -53,17 +53,12 @@ class PerplexityLLMService(OpenAILLMService):
|
||||
self._has_reported_prompt_tokens = False
|
||||
self._is_processing = False
|
||||
|
||||
async def get_chat_completions(
|
||||
def build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> AsyncStream[ChatCompletionChunk]:
|
||||
"""Get chat completions from Perplexity API using OpenAI-compatible parameters.
|
||||
) -> dict:
|
||||
"""Build parameters for Perplexity chat completion request.
|
||||
|
||||
Args:
|
||||
context: The context containing conversation history and settings.
|
||||
messages: The messages to send to the API.
|
||||
|
||||
Returns:
|
||||
A stream of chat completion chunks from the Perplexity API.
|
||||
Perplexity uses a subset of OpenAI parameters and doesn't support tools.
|
||||
"""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
@@ -83,8 +78,7 @@ class PerplexityLLMService(OpenAILLMService):
|
||||
if self._settings["max_tokens"] is not NOT_GIVEN:
|
||||
params["max_tokens"] = self._settings["max_tokens"]
|
||||
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
return params
|
||||
|
||||
async def _process_context(self, context: OpenAILLMContext):
|
||||
"""Process a context through the LLM and accumulate token usage metrics.
|
||||
|
||||
@@ -68,17 +68,20 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
|
||||
logger.debug(f"Creating SambaNova client with API {base_url}")
|
||||
return super().create_client(api_key, base_url, **kwargs)
|
||||
|
||||
async def get_chat_completions(
|
||||
def build_chat_completion_params(
|
||||
self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
|
||||
) -> Any:
|
||||
"""Get chat completions from SambaNova API endpoint.
|
||||
) -> dict:
|
||||
"""Build parameters for SambaNova chat completion request.
|
||||
|
||||
SambaNova doesn't support some OpenAI parameters like frequency_penalty,
|
||||
presence_penalty, and seed.
|
||||
|
||||
Args:
|
||||
context: OpenAI LLM context containing tools and configuration.
|
||||
messages: List of chat completion message parameters.
|
||||
context: The LLM context containing tools and configuration.
|
||||
messages: List of chat completion messages to send.
|
||||
|
||||
Returns:
|
||||
Chat completion response stream from SambaNova API.
|
||||
Dictionary of parameters for the chat completion request.
|
||||
"""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
@@ -94,9 +97,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
|
||||
}
|
||||
|
||||
params.update(self._settings["extra"])
|
||||
|
||||
chunks = await self._client.chat.completions.create(**params)
|
||||
return chunks
|
||||
return params
|
||||
|
||||
@traced_llm # type: ignore
|
||||
async def _process_context(self, context: OpenAILLMContext) -> AsyncStream[ChatCompletionChunk]:
|
||||
|
||||
Reference in New Issue
Block a user