Merge pull request #2563 from pipecat-ai/pk/expand-universal-llm-context-support-to-more-llms

Expand universal `LLMContext` support to more LLMs
This commit is contained in:
kompfner
2025-09-03 11:20:26 -04:00
committed by GitHub
22 changed files with 62 additions and 185 deletions

View File

@@ -9,6 +9,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Expanded support for universal `LLMContext` to more LLM services. Using the
universal `LLMContext` and associated `LLMContextAggregatorPair` is a
prerequisite for using `LLMSwitcher` to switch between LLMs at runtime.
Here are the newly-supported services:
- Azure
- Cerebras
- Deepseek
- Fireworks AI
- Google Vertex AI
- Grok
- Groq
- Mistral
- NVIDIA NIM
- Ollama
- OpenPipe
- OpenRouter
- Perplexity
- Qwen
- SambaNova
- Together.ai
- Added support for WhatsApp User-initiated Calls.
- Added new audio filter `AICFilter`, speech enhancement for improving VAD/STT

View File

@@ -146,3 +146,12 @@ SENTRY_DSN=...
# Heygen
HEYGEN_API_KEY=...
# Mistral
MISTRAL_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# Qwen
QWEN_API_KEY=...

View File

@@ -75,9 +75,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
# Disabling for now, as it ends up tripping up the model in this example
# ("let me check on that" ends up at the end of the context, which the
# model erroneously treats as a nudge to call the tool again; the
# ensuing inference then yields wonky results).
# @llm.event_handler("on_function_calls_started")
# async def on_function_calls_started(service, function_calls):
# await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
@@ -99,7 +103,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. Start by saying hello.",
},
]

View File

@@ -72,9 +72,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
# Disabling for now, as we end up in an infinite inference loop with the
# model in this example ("let me check on that" ends up at the end of the
# context, which the model erroneously treats as a nudge to call the tool
# again).
# @llm.event_handler("on_function_calls_started")
# async def on_function_calls_started(service, function_calls):
# await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",

View File

@@ -60,12 +60,3 @@ class AzureLLMService(OpenAILLMService):
azure_endpoint=self._endpoint,
api_version=self._api_version,
)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as Azure service does not yet support universal LLMContext.
"""
return False

View File

@@ -81,12 +81,3 @@ class CerebrasLLMService(OpenAILLMService):
params.update(self._settings["extra"])
return params
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as Cerebras service does not yet support universal LLMContext.
"""
return False

View File

@@ -82,12 +82,3 @@ class DeepSeekLLMService(OpenAILLMService):
params.update(self._settings["extra"])
return params
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as DeepSeekLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -82,12 +82,3 @@ class FireworksLLMService(OpenAILLMService):
params.update(self._settings["extra"])
return params
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as FireworksLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -76,15 +76,6 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as GoogleLLMOpenAIBetaService does not yet support universal LLMContext.
"""
return False
async def _process_context(self, context: OpenAILLMContext):
functions_list = []
arguments_list = []

View File

@@ -139,12 +139,3 @@ class GoogleVertexLLMService(OpenAILLMService):
creds.refresh(Request()) # Ensure token is up-to-date, lifetime is 1 hour.
return creds.token
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as GoogleVertexLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -16,6 +16,7 @@ from dataclasses import dataclass
from loguru import logger
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
@@ -107,7 +108,7 @@ class GrokLLMService(OpenAILLMService):
logger.debug(f"Creating Grok client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
async def _process_context(self, context: OpenAILLMContext):
async def _process_context(self, context: OpenAILLMContext | LLMContext):
"""Process a context through the LLM and accumulate token usage metrics.
This method overrides the parent class implementation to handle Grok's
@@ -190,12 +191,3 @@ class GrokLLMService(OpenAILLMService):
user = OpenAIUserContextAggregator(context, params=user_params)
assistant = OpenAIAssistantContextAggregator(context, params=assistant_params)
return GrokContextAggregatorPair(_user=user, _assistant=assistant)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as GrokLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -49,12 +49,3 @@ class GroqLLMService(OpenAILLMService):
"""
logger.debug(f"Creating Groq client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as GroqLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -11,6 +11,7 @@ Microservice) API while maintaining compatibility with the OpenAI-style interfac
"""
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
@@ -47,16 +48,7 @@ class NimLLMService(OpenAILLMService):
self._has_reported_prompt_tokens = False
self._is_processing = False
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as NimLLMService does not yet support universal LLMContext.
"""
return False
async def _process_context(self, context: OpenAILLMContext):
async def _process_context(self, context: OpenAILLMContext | LLMContext):
"""Process a context through the LLM and accumulate token usage metrics.
This method overrides the parent class implementation to handle NVIDIA's

View File

@@ -43,12 +43,3 @@ class OLLamaLLMService(OpenAILLMService):
"""
logger.debug(f"Creating Ollama client with api {base_url}")
return super().create_client(base_url=base_url, **kwargs)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as OLLamaLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -419,18 +419,6 @@ class BaseOpenAILLMService(LLMService):
await self.run_function_calls(function_calls)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
Whether service supports universal LLMContext.
"""
# Return True in subclasses that support universal LLMContext
# This property lets us gradually roll out support for universal
# LLMContext to OpenAI-like services in a controlled manner.
return False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for LLM completion requests.
@@ -450,12 +438,7 @@ class BaseOpenAILLMService(LLMService):
context = frame.context
elif isinstance(frame, LLMContextFrame):
# Handle universal (LLM-agnostic) LLM context frames
if self.supports_universal_context:
context = frame.context
else:
raise NotImplementedError(
f"Universal LLMContext is not yet supported for {self.__class__.__name__}."
)
context = frame.context
elif isinstance(frame, LLMMessagesFrame):
# NOTE: LLMMessagesFrame is deprecated, so we don't support the newer universal
# LLMContext with it

View File

@@ -107,15 +107,6 @@ class OpenAILLMService(BaseOpenAILLMService):
assistant = OpenAIAssistantContextAggregator(context, params=assistant_params)
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
True, as OpenAI service supports universal LLMContext.
"""
return True
class OpenAIUserContextAggregator(LLMUserContextAggregator):
"""OpenAI-specific user context aggregator.

View File

@@ -108,12 +108,3 @@ class OpenPipeLLMService(OpenAILLMService):
}
return params
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as OpenPipeLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -61,12 +61,3 @@ class OpenRouterLLMService(OpenAILLMService):
"""
logger.debug(f"Creating OpenRouter client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as OpenRouterLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -15,6 +15,7 @@ from openai import NOT_GIVEN
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
@@ -84,16 +85,7 @@ class PerplexityLLMService(OpenAILLMService):
return params
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as PerplexityLLMService does not yet support universal LLMContext.
"""
return False
async def _process_context(self, context: OpenAILLMContext):
async def _process_context(self, context: OpenAILLMContext | LLMContext):
"""Process a context through the LLM and accumulate token usage metrics.
This method overrides the parent class implementation to handle

View File

@@ -50,12 +50,3 @@ class QwenLLMService(OpenAILLMService):
"""
logger.debug(f"Creating Qwen client with base URL: {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as QwenLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -18,6 +18,7 @@ from pipecat.frames.frames import (
LLMTextFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.services.openai.llm import OpenAILLMService
@@ -99,7 +100,9 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
return params
@traced_llm # type: ignore
async def _process_context(self, context: OpenAILLMContext) -> AsyncStream[ChatCompletionChunk]:
async def _process_context(
self, context: OpenAILLMContext | LLMContext
) -> AsyncStream[ChatCompletionChunk]:
"""Process OpenAI LLM context and stream chat completion chunks.
This method handles the streaming response from SambaNova API, including
@@ -122,9 +125,11 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
await self.start_ttfb_metrics()
chunk_stream: AsyncStream[
ChatCompletionChunk
] = await self._stream_chat_completions_specific_context(context)
chunk_stream = await (
self._stream_chat_completions_specific_context(context)
if isinstance(context, OpenAILLMContext)
else self._stream_chat_completions_universal_context(context)
)
async for chunk in chunk_stream:
if chunk.usage:
@@ -210,12 +215,3 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
)
await self.run_function_calls(function_calls)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as SambaNovaLLMService does not yet support universal LLMContext.
"""
return False

View File

@@ -49,12 +49,3 @@ class TogetherLLMService(OpenAILLMService):
"""
logger.debug(f"Creating Together.ai client with api {base_url}")
return super().create_client(api_key, base_url, **kwargs)
@property
def supports_universal_context(self) -> bool:
"""Check if this service supports universal LLMContext.
Returns:
False, as TogetherLLMService does not yet support universal LLMContext.
"""
return False