Convert LLM generate_summary() methods to the more generic run_inference()

This commit is contained in:
Paul Kompfner
2025-08-25 12:20:16 -04:00
parent a0a2bb3aa4
commit 43f1b59b86
6 changed files with 120 additions and 147 deletions

View File

@@ -50,19 +50,24 @@ class LLMSwitcher(ParallelPipeline, Generic[StrategyType]):
self.llms = llms
self.strategy = strategy
async def generate_summary(self, summary_prompt: str, context: LLMContext) -> Optional[str]:
"""Generate a conversation summary from the given LLM context, using the currently active LLM.
async def run_inference(
self, context: LLMContext, system_instruction: Optional[str] = None
) -> Optional[str]:
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context, using the currently active LLM.
Args:
summary_prompt: The prompt to use to guide generating the summary.
context: The LLM context containing conversation history.
system_instruction: Optional system instruction to guide the LLM's
behavior. A system instruction may instead be supplied directly in
the context; if both are provided, the one in the context takes
precedence.
Returns:
The generated summary, or None if generation failed.
The LLM's response as a string, or None if no response is generated.
"""
if self.strategy.active_llm:
return await self.strategy.active_llm.generate_summary(
summary_prompt=summary_prompt, context=context
return await self.strategy.active_llm.run_inference(
context=context, system_instruction=system_instruction
)
return None

View File

@@ -199,55 +199,45 @@ class AnthropicLLMService(LLMService):
response = await api_call(**params)
return response
async def generate_summary(
self, summary_prompt: str, context: LLMContext | OpenAILLMContext
async def run_inference(
self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None
) -> Optional[str]:
"""Generate a conversation summary from the given LLM context.
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Args:
summary_prompt: The prompt to use to guide generating the summary.
context: The LLM context containing conversation history.
system_instruction: Optional system instruction to guide the LLM's
behavior. A system instruction may instead be supplied directly in
the context; if both are provided, the one in the context takes
precedence.
Returns:
The generated summary, or None if generation failed.
The LLM's response as a string, or None if no response is generated.
"""
try:
if isinstance(context, LLMContext):
# Not sure if it's strictly necessary to adapt messages here
# since they'll just be a string in the prompt, but erring on
# the side of putting them in the format the LLM would expect
# if consuming them directly (i.e. assuming greater LLM
# familiarity with its own format).
# adapter = self.get_llm_adapter()
# params: AnthropicLLMInvocationParams = adapter.get_llm_invocation_params(context)
# messages = params["messages"]
raise NotImplementedError(
"Universal LLMContext is not yet supported for Anthropic."
)
else:
messages = context.messages
messages = []
system = []
if isinstance(context, LLMContext):
# Future code will be something like this:
# adapter = self.get_llm_adapter()
# params: AnthropicLLMInvocationParams = adapter.get_llm_invocation_params(context)
# messages = params["messages"]
# system = params["system_instruction"]
raise NotImplementedError("Universal LLMContext is not yet supported for Anthropic.")
else:
context = AnthropicLLMContext.upgrade_to_anthropic(context)
messages = context.messages
system = getattr(context, "system", None) or system_instruction
prompt_messages = [
{
"role": "user",
"content": f"Conversation history: {messages}",
},
]
# LLM completion
response = await self._client.messages.create(
model=self.model_name,
messages=messages,
system=system,
max_tokens=8192,
stream=False,
)
# LLM completion
response = await self._client.messages.create(
model=self.model_name,
messages=prompt_messages,
system=summary_prompt,
max_tokens=8192,
stream=False,
)
return response.content[0].text
except Exception as e:
logger.error(f"Anthropic summary generation failed: {e}", exc_info=True)
return None
return response.content[0].text
@property
def enable_prompt_caching_beta(self) -> bool:

View File

@@ -791,33 +791,37 @@ class AWSBedrockLLMService(LLMService):
"""
return True
async def generate_summary(
self, summary_prompt: str, context: LLMContext | OpenAILLMContext
async def run_inference(
self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None
) -> Optional[str]:
"""Generate a conversation summary from the given LLM context.
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Args:
summary_prompt: The prompt to use to guide generating the summary.
context: The LLM context containing conversation history.
system_instruction: Optional system instruction to guide the LLM's
behavior. A system instruction may instead be supplied directly in
the context; if both are provided, the one in the context takes
precedence.
Returns:
The generated summary, or None if generation failed.
The LLM's response as a string, or None if no response is generated.
"""
try:
messages = []
system = []
if isinstance(context, LLMContext):
# Not sure if it's strictly necessary to adapt messages here
# since they'll just be a string in the prompt, but erring on
# the side of putting them in the format the LLM would expect
# if consuming them directly (i.e. assuming greater LLM
# familiarity with its own format).
# Future code will be something like this:
# adapter = self.get_llm_adapter()
# params: AWSBedrockLLMInvocationParams = adapter.get_llm_invocation_params(context)
# messages = params["messages"]
# system = params["system_instruction"]
raise NotImplementedError(
"Universal LLMContext is not yet supported for AWS Bedrock."
)
else:
context = AWSBedrockLLMContext.upgrade_to_bedrock(context)
messages = context.messages
system = getattr(context, "system", None) or system_instruction
# Determine if we're using Claude or Nova based on model ID
model_id = self.model_name
@@ -825,12 +829,7 @@ class AWSBedrockLLMService(LLMService):
# Prepare request parameters
request_params = {
"modelId": model_id,
"messages": [
{
"role": "user",
"content": [{"text": f"Conversation history: {messages}"}],
},
],
"messages": messages,
"inferenceConfig": {
"maxTokens": 8192,
"temperature": 0.7,
@@ -838,7 +837,8 @@ class AWSBedrockLLMService(LLMService):
},
}
request_params["system"] = [{"text": summary_prompt}]
if system:
request_params["system"] = [{"text": system}]
async with self._aws_session.client(
service_name="bedrock-runtime", **self._aws_params

View File

@@ -733,57 +733,49 @@ class GoogleLLMService(LLMService):
def _create_client(self, api_key: str, http_options: Optional[HttpOptions] = None):
self._client = genai.Client(api_key=api_key, http_options=http_options)
async def generate_summary(
self, summary_prompt: str, context: LLMContext | OpenAILLMContext
async def run_inference(
self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None
) -> Optional[str]:
"""Generate a conversation summary from the given LLM context.
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Args:
summary_prompt: The prompt to use to guide generating the summary.
context: The LLM context containing conversation history.
system_instruction: Optional system instruction to guide the LLM's
behavior. A system instruction may instead be supplied directly in
the context; if both are provided, the one in the context takes
precedence.
Returns:
The generated summary, or None if generation failed.
The LLM's response as a string, or None if no response is generated.
"""
try:
if isinstance(context, LLMContext):
# Not sure if it's strictly necessary to adapt messages here
# since they'll just be a string in the prompt, but erring on
# the side of putting them in the format the LLM would expect
# if consuming them directly (i.e. assuming greater LLM
# familiarity with its own format).
adapter = self.get_llm_adapter()
params: GeminiLLMInvocationParams = adapter.get_llm_invocation_params(context)
messages = params["messages"]
else:
messages = context.messages
messages = []
system = []
if isinstance(context, LLMContext):
adapter = self.get_llm_adapter()
params: GeminiLLMInvocationParams = adapter.get_llm_invocation_params(context)
messages = params["messages"]
system = params["system_instruction"]
else:
context = GoogleLLMContext.upgrade_to_google(context)
messages = context.messages
system = getattr(context, "system_message", None) or system_instruction
# Format conversation history as user message
contents = [
Content(role="user", parts=[Part(text=f"Conversation history: {messages}")])
]
generation_config = GenerateContentConfig(system_instruction=system)
# Use summary_prompt as system instruction
generation_config = GenerateContentConfig(system_instruction=summary_prompt)
# Use the new google-genai client's async method
response = await self._client.aio.models.generate_content(
model=self._model_name,
contents=messages,
config=generation_config,
)
# Use the new google-genai client's async method
response = await self._client.aio.models.generate_content(
model=self._model_name,
contents=contents,
config=generation_config,
)
# Extract text from response
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
if part.text:
return part.text
# Extract text from response
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
if part.text:
return part.text
return None
except Exception as e:
logger.error(f"Google summary generation failed: {e}", exc_info=True)
return None
return None
def needs_mcp_alternate_schema(self) -> bool:
"""Check if this LLM service requires alternate MCP schema.

View File

@@ -191,19 +191,23 @@ class LLMService(AIService):
"""
return self._adapter
async def generate_summary(
self, summary_prompt: str, context: LLMContext | OpenAILLMContext
async def run_inference(
self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None
) -> Optional[str]:
"""Generate a conversation summary from the given LLM context.
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Must be implemented by subclasses.
Args:
summary_prompt: The prompt to use to guide generating the summary.
context: The LLM context containing conversation history.
system_instruction: Optional system instruction to guide the LLM's
behavior. A system instruction may instead be supplied directly in
the context.
Returns:
The generated summary, or None if generation failed.
The LLM's response as a string, or None if no response is generated.
"""
raise NotImplementedError(f"generate_summary() not supported by {self.__class__.__name__}")
raise NotImplementedError(f"run_inference() not supported by {self.__class__.__name__}")
def create_context_aggregator(
self,

View File

@@ -245,53 +245,35 @@ class BaseOpenAILLMService(LLMService):
params.update(self._settings["extra"])
return params
async def generate_summary(
self, summary_prompt: str, context: LLMContext | OpenAILLMContext
async def run_inference(
self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None
) -> Optional[str]:
"""Generate a conversation summary from the given LLM context.
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Args:
summary_prompt: The prompt to use to guide generating the summary.
context: The LLM context containing conversation history.
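system_instruction: Optional system instruction to guide the LLM's
behavior. A system instruction may instead be supplied directly in
the context. NOTE(review): the body shown in this hunk does not pass
this argument to the completion call; only the context's messages
are sent. Confirm whether that is intended.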
Returns:
The generated summary, or None if generation failed.
The LLM's response as a string, or None if no response is generated.
"""
try:
if isinstance(context, LLMContext):
# Not sure if it's strictly necessary to adapt messages here
# since they'll just be a string in the prompt, but erring on
# the side of putting them in the format the LLM would expect
# if consuming them directly (i.e. assuming greater LLM
# familiarity with its own format).
adapter = self.get_llm_adapter()
params: OpenAILLMInvocationParams = adapter.get_llm_invocation_params(context)
messages = params["messages"]
else:
messages = context.messages
prompt_messages = [
{
"role": "system",
"content": summary_prompt,
},
{
"role": "user",
"content": f"Conversation history: {messages}",
},
]
if isinstance(context, LLMContext):
adapter = self.get_llm_adapter()
params: OpenAILLMInvocationParams = adapter.get_llm_invocation_params(context)
messages = params["messages"]
else:
messages = context.messages
# LLM completion
response = await self._client.chat.completions.create(
model=self.model_name,
messages=prompt_messages,
stream=False,
)
# LLM completion
response = await self._client.chat.completions.create(
model=self.model_name,
messages=messages,
stream=False,
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"OpenAI summary generation failed: {e}", exc_info=True)
return None
return response.choices[0].message.content
async def _stream_chat_completions_specific_context(
self, context: OpenAILLMContext