diff --git a/src/pipecat/pipeline/llm_switcher.py b/src/pipecat/pipeline/llm_switcher.py index 8509cf0f4..1d8631027 100644 --- a/src/pipecat/pipeline/llm_switcher.py +++ b/src/pipecat/pipeline/llm_switcher.py @@ -50,19 +50,24 @@ class LLMSwitcher(ParallelPipeline, Generic[StrategyType]): self.llms = llms self.strategy = strategy - async def generate_summary(self, summary_prompt: str, context: LLMContext) -> Optional[str]: - """Generate a conversation summary from the given LLM context, using the currently active LLM. + async def run_inference( + self, context: LLMContext, system_instruction: Optional[str] = None + ) -> Optional[str]: + """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context, using the currently active LLM. Args: - summary_prompt: The prompt to use to guide generating the summary. context: The LLM context containing conversation history. + system_instruction: Optional system instruction to guide the LLM's + behavior. You could also (again, optionally) provide a system + instruction directly in the context. If both are provided, the + one in the context takes precedence. Returns: - The generated summary, or None if generation failed. + The LLM's response as a string, or None if no response is generated. """ if self.strategy.active_llm: - return await self.strategy.active_llm.generate_summary( - summary_prompt=summary_prompt, context=context + return await self.strategy.active_llm.run_inference( + context=context, system_instruction=system_instruction ) return None diff --git a/src/pipecat/services/anthropic/llm.py b/src/pipecat/services/anthropic/llm.py index 0601d52f5..ee042aa1b 100644 --- a/src/pipecat/services/anthropic/llm.py +++ b/src/pipecat/services/anthropic/llm.py @@ -199,55 +199,45 @@ class AnthropicLLMService(LLMService): response = await api_call(**params) return response - async def generate_summary( - self, summary_prompt: str, context: LLMContext | OpenAILLMContext + async def run_inference( + self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None ) -> Optional[str]: - """Generate a conversation summary from the given LLM context. + """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: - summary_prompt: The prompt to use to guide generating the summary. context: The LLM context containing conversation history. + system_instruction: Optional system instruction to guide the LLM's + behavior. You could also (again, optionally) provide a system + instruction directly in the context. If both are provided, the + one in the context takes precedence. Returns: - The generated summary, or None if generation failed. + The LLM's response as a string, or None if no response is generated. """ - try: - if isinstance(context, LLMContext): - # Not sure if it's strictly necessary to adapt messages here - # since they'll just be a string in the prompt, but erring on - # the side of putting them in the format the LLM would expect - # if consuming them directly (i.e. assuming greater LLM - # familiarity with its own format). - # adapter = self.get_llm_adapter() - # params: AnthropicLLMInvocationParams = adapter.get_llm_invocation_params(context) - # messages = params["messages"] - raise NotImplementedError( - "Universal LLMContext is not yet supported for Anthropic." - ) - else: - messages = context.messages + messages = [] + system = [] + if isinstance(context, LLMContext): + # Future code will be something like this: + # adapter = self.get_llm_adapter() + # params: AnthropicLLMInvocationParams = adapter.get_llm_invocation_params(context) + # messages = params["messages"] + # system = params["system_instruction"] + raise NotImplementedError("Universal LLMContext is not yet supported for Anthropic.") + else: + context = AnthropicLLMContext.upgrade_to_anthropic(context) + messages = context.messages + system = getattr(context, "system", None) or system_instruction - prompt_messages = [ - { - "role": "user", - "content": f"Conversation history: {messages}", - }, - ] + # LLM completion + response = await self._client.messages.create( + model=self.model_name, + messages=messages, + system=system, + max_tokens=8192, + stream=False, + ) - # LLM completion - response = await self._client.messages.create( - model=self.model_name, - messages=prompt_messages, - system=summary_prompt, - max_tokens=8192, - stream=False, - ) - - return response.content[0].text - - except Exception as e: - logger.error(f"Anthropic summary generation failed: {e}", exc_info=True) - return None + return response.content[0].text @property def enable_prompt_caching_beta(self) -> bool: diff --git a/src/pipecat/services/aws/llm.py b/src/pipecat/services/aws/llm.py index 1d9028c39..6e109c4c1 100644 --- a/src/pipecat/services/aws/llm.py +++ b/src/pipecat/services/aws/llm.py @@ -791,33 +791,37 @@ class AWSBedrockLLMService(LLMService): """ return True - async def generate_summary( - self, summary_prompt: str, context: LLMContext | OpenAILLMContext + async def run_inference( + self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None ) -> Optional[str]: - """Generate a conversation summary from the given LLM context. + """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: - summary_prompt: The prompt to use to guide generating the summary. context: The LLM context containing conversation history. + system_instruction: Optional system instruction to guide the LLM's + behavior. You could also (again, optionally) provide a system + instruction directly in the context. If both are provided, the + one in the context takes precedence. Returns: - The generated summary, or None if generation failed. + The LLM's response as a string, or None if no response is generated. """ try: + messages = [] + system = [] if isinstance(context, LLMContext): - # Not sure if it's strictly necessary to adapt messages here - # since they'll just be a string in the prompt, but erring on - # the side of putting them in the format the LLM would expect - # if consuming them directly (i.e. assuming greater LLM - # familiarity with its own format). + # Future code will be something like this: # adapter = self.get_llm_adapter() # params: AWSBedrockLLMInvocationParams = adapter.get_llm_invocation_params(context) # messages = params["messages"] + # system = params["system_instruction"] raise NotImplementedError( "Universal LLMContext is not yet supported for AWS Bedrock." ) else: + context = AWSBedrockLLMContext.upgrade_to_bedrock(context) messages = context.messages + system = getattr(context, "system", None) or system_instruction # Determine if we're using Claude or Nova based on model ID model_id = self.model_name @@ -825,12 +829,7 @@ class AWSBedrockLLMService(LLMService): # Prepare request parameters request_params = { "modelId": model_id, - "messages": [ - { - "role": "user", - "content": [{"text": f"Conversation history: {messages}"}], - }, - ], + "messages": messages, "inferenceConfig": { "maxTokens": 8192, "temperature": 0.7, @@ -838,7 +837,8 @@ class AWSBedrockLLMService(LLMService): }, } - request_params["system"] = [{"text": summary_prompt}] + if system: + request_params["system"] = [{"text": system}] async with self._aws_session.client( service_name="bedrock-runtime", **self._aws_params diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index faf3f2f52..7a140c363 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -733,57 +733,49 @@ class GoogleLLMService(LLMService): def _create_client(self, api_key: str, http_options: Optional[HttpOptions] = None): self._client = genai.Client(api_key=api_key, http_options=http_options) - async def generate_summary( - self, summary_prompt: str, context: LLMContext | OpenAILLMContext + async def run_inference( + self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None ) -> Optional[str]: - """Generate a conversation summary from the given LLM context. + """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: - summary_prompt: The prompt to use to guide generating the summary. context: The LLM context containing conversation history. + system_instruction: Optional system instruction to guide the LLM's + behavior. You could also (again, optionally) provide a system + instruction directly in the context. If both are provided, the + one in the context takes precedence. Returns: - The generated summary, or None if generation failed. + The LLM's response as a string, or None if no response is generated. """ - try: - if isinstance(context, LLMContext): - # Not sure if it's strictly necessary to adapt messages here - # since they'll just be a string in the prompt, but erring on - # the side of putting them in the format the LLM would expect - # if consuming them directly (i.e. assuming greater LLM - # familiarity with its own format). - adapter = self.get_llm_adapter() - params: GeminiLLMInvocationParams = adapter.get_llm_invocation_params(context) - messages = params["messages"] - else: - messages = context.messages + messages = [] + system = [] + if isinstance(context, LLMContext): + adapter = self.get_llm_adapter() + params: GeminiLLMInvocationParams = adapter.get_llm_invocation_params(context) + messages = params["messages"] + system = params["system_instruction"] + else: + context = GoogleLLMContext.upgrade_to_google(context) + messages = context.messages + system = getattr(context, "system_message", None) or system_instruction - # Format conversation history as user message - contents = [ - Content(role="user", parts=[Part(text=f"Conversation history: {messages}")]) - ] + generation_config = GenerateContentConfig(system_instruction=system) - # Use summary_prompt as system instruction - generation_config = GenerateContentConfig(system_instruction=summary_prompt) + # Use the new google-genai client's async method + response = await self._client.aio.models.generate_content( + model=self._model_name, + contents=messages, + config=generation_config, + ) - # Use the new google-genai client's async method - response = await self._client.aio.models.generate_content( - model=self._model_name, - contents=contents, - config=generation_config, - ) + # Extract text from response + if response.candidates and response.candidates[0].content: + for part in response.candidates[0].content.parts: + if part.text: + return part.text - # Extract text from response - if response.candidates and response.candidates[0].content: - for part in response.candidates[0].content.parts: - if part.text: - return part.text - - return None - - except Exception as e: - logger.error(f"Google summary generation failed: {e}", exc_info=True) - return None + return None def needs_mcp_alternate_schema(self) -> bool: """Check if this LLM service requires alternate MCP schema. diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index cbf295e9c..3152a0083 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -191,19 +191,23 @@ class LLMService(AIService): """ return self._adapter - async def generate_summary( - self, summary_prompt: str, context: LLMContext | OpenAILLMContext + async def run_inference( + self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None ) -> Optional[str]: - """Generate a conversation summary from the given LLM context. + """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. + + Must be implemented by subclasses. Args: - summary_prompt: The prompt to use to guide generating the summary. context: The LLM context containing conversation history. + system_instruction: Optional system instruction to guide the LLM's + behavior. You could also (again, optionally) provide a system + instruction directly in the context. Returns: - The generated summary, or None if generation failed. + The LLM's response as a string, or None if no response is generated. """ - raise NotImplementedError(f"generate_summary() not supported by {self.__class__.__name__}") + raise NotImplementedError(f"run_inference() not supported by {self.__class__.__name__}") def create_context_aggregator( self, diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index e591b4b4e..e51755cba 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -245,53 +245,35 @@ class BaseOpenAILLMService(LLMService): params.update(self._settings["extra"]) return params - async def generate_summary( - self, summary_prompt: str, context: LLMContext | OpenAILLMContext + async def run_inference( + self, context: LLMContext | OpenAILLMContext, system_instruction: Optional[str] = None ) -> Optional[str]: - """Generate a conversation summary from the given LLM context. + """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: - summary_prompt: The prompt to use to guide generating the summary. context: The LLM context containing conversation history. + system_instruction: Optional system instruction to guide the LLM's + behavior. You could also (again, optionally) provide a system + instruction directly in the context. Returns: - The generated summary, or None if generation failed. + The LLM's response as a string, or None if no response is generated. """ - try: - if isinstance(context, LLMContext): - # Not sure if it's strictly necessary to adapt messages here - # since they'll just be a string in the prompt, but erring on - # the side of putting them in the format the LLM would expect - # if consuming them directly (i.e. assuming greater LLM - # familiarity with its own format). - adapter = self.get_llm_adapter() - params: OpenAILLMInvocationParams = adapter.get_llm_invocation_params(context) - messages = params["messages"] - else: - messages = context.messages - prompt_messages = [ - { - "role": "system", - "content": summary_prompt, - }, - { - "role": "user", - "content": f"Conversation history: {messages}", - }, - ] + if isinstance(context, LLMContext): + adapter = self.get_llm_adapter() + params: OpenAILLMInvocationParams = adapter.get_llm_invocation_params(context) + messages = params["messages"] + else: + messages = context.messages - # LLM completion - response = await self._client.chat.completions.create( - model=self.model_name, - messages=prompt_messages, - stream=False, - ) + # LLM completion + response = await self._client.chat.completions.create( + model=self.model_name, + messages=messages, + stream=False, + ) - return response.choices[0].message.content - - except Exception as e: - logger.error(f"OpenAI summary generation failed: {e}", exc_info=True) - return None + return response.choices[0].message.content async def _stream_chat_completions_specific_context( self, context: OpenAILLMContext