Compare commits

...

8 Commits

Author SHA1 Message Date
Paul Kompfner
0dac7f7e48 Update OpenAIRealtimeLLMService to work with LLMContext and LLMContextAggregatorPair 2025-10-01 10:18:07 -04:00
Paul Kompfner
a2c69fbd8b Update OpenAIRealtimeLLMService to work with LLMContext and LLMContextAggregatorPair 2025-10-01 09:57:51 -04:00
Paul Kompfner
8962263329 Update OpenAIRealtimeLLMService to work with LLMContext and LLMContextAggregatorPair 2025-09-30 14:47:06 -04:00
Paul Kompfner
ae22673595 Update OpenAIRealtimeLLMService to work with LLMContext and LLMContextAggregatorPair 2025-09-30 14:33:05 -04:00
Paul Kompfner
236ac93ac6 Remove remaining usage of OpenAILLMContext throughout the codebase in favor of LLMContext, except for:
- Usage in classes that are already deprecated
- Usage related to realtime LLMs, which don't yet support `LLMContext`
- Usage in (soon-to-be-deprecated) code paths related to `OpenAILLMContext` itself and associated machinery
2025-09-30 14:33:05 -04:00
Paul Kompfner
ceba27e696 Address some AWSNovaSonicLLMService context-recording edge cases 2025-09-29 14:51:09 -04:00
Paul Kompfner
07ba02a491 Get rid of LLMContext.get_messages_for_persistent_storage().
The reason for its `system_instruction` argument was to support usage with LLMs where you might pass the system instruction as a parameter to the `LLMService` rather than specifying it in the context.

But as I thought about it more I became unconvinced that the `system_instruction` argument was really beneficial:

- If you specified your system instruction in your context in the first place, it'll still be there when you read messages for persistent storage
- If you didn't specify your system instruction in the context and instead passed it in as an `LLMService` parameter, you most likely *don't* want it to be in the context when you read messages for persistent storage
- ...and if you really really do need to inject it at the start of the context, it's quite easy to do anyway

And if we remove the `system_instruction` argument from `get_messages_for_persistent_storage()`, then it's essentially just `get_messages()`.
2025-09-29 11:00:02 -04:00
Paul Kompfner
4d9873b613 Update AWSNovaSonicLLMService to work with LLMContext and LLMContextAggregatorPair 2025-09-29 10:28:38 -04:00
18 changed files with 595 additions and 670 deletions

View File

@@ -9,6 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Expanded support for universal `LLMContext` to `AWSNovaSonicLLMService`.
As a reminder, the context-setup pattern when using `LLMContext` is:
```python
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
(Note that even though `AWSNovaSonicLLMService` now supports the universal
`LLMContext`, it is not meant to be swapped out for another LLM service at
runtime.)
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
## Fixed

View File

@@ -19,6 +19,8 @@ from pipecat.observers.loggers.transcription_log_observer import TranscriptionLo
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
@@ -162,12 +164,12 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
context = OpenAILLMContext(
context = LLMContext(
[{"role": "user", "content": "Say hello!"}],
tools,
)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[

View File

@@ -72,7 +72,6 @@ async def save_conversation(params: FunctionCallParams):
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = params.context.get_messages()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()

View File

@@ -90,7 +90,6 @@ async def save_conversation(params: FunctionCallParams):
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = params.context.get_messages()
# remove the last message (the instruction to save the context)
messages.pop()

View File

@@ -20,6 +20,8 @@ from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
@@ -75,7 +77,7 @@ async def save_conversation(params: FunctionCallParams):
filename = f"{BASE_FILENAME}{timestamp}.json"
try:
with open(filename, "w") as file:
messages = params.context.get_messages_for_persistent_storage()
messages = params.context.get_messages()
# remove the last few messages. in reverse order, they are:
# - the in progress save tool call
# - the invocation of the save tool call
@@ -223,13 +225,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext(
context = LLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
],
tools=tools,
)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[

View File

@@ -18,7 +18,8 @@ from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws_nova_sonic import AWSNovaSonicLLMService
@@ -119,9 +120,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm.register_function("get_current_weather", fetch_weather_from_api)
# Set up context and context management.
# AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to
# what's expected by Nova Sonic.
context = OpenAILLMContext(
context = LLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
{
@@ -131,7 +130,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
tools=tools,
)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = LLMContextAggregatorPair(context)
# Build the pipeline
pipeline = Pipeline(

View File

@@ -34,7 +34,8 @@ from pipecat.frames.frames import EndTaskFrame, LLMRunFrame, OutputImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
@@ -283,8 +284,8 @@ async def run_eval_pipeline(
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
audio_buffer = AudioBufferProcessor()

View File

@@ -6,13 +6,47 @@
"""AWS Nova Sonic LLM adapter for Pipecat."""
import copy
import json
from typing import Any, Dict, List, TypedDict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
class AWSNovaSonicLLMInvocationParams(TypedDict):
@@ -21,7 +55,9 @@ class AWSNovaSonicLLMInvocationParams(TypedDict):
This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic.
"""
pass
system_instruction: Optional[str]
messages: List[AWSNovaSonicConversationHistoryMessage]
tools: List[Dict[str, Any]]
class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
@@ -34,7 +70,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
@property
def id_for_llm_specific_messages(self) -> str:
"""Get the identifier used in LLMSpecificMessage instances for AWS Nova Sonic."""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
return "aws-nova-sonic"
def get_llm_invocation_params(self, context: LLMContext) -> AWSNovaSonicLLMInvocationParams:
"""Get AWS Nova Sonic-specific LLM invocation parameters from a universal LLM context.
@@ -47,7 +83,13 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
Dictionary of parameters for invoking AWS Nova Sonic's LLM API.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
messages = self._from_universal_context_messages(self.get_messages(context))
return {
"system_instruction": messages.system_instruction,
"messages": messages.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
}
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about AWS Nova Sonic.
@@ -62,7 +104,75 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
List of messages in a format ready for logging about AWS Nova Sonic.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
return self._from_universal_context_messages(self.get_messages(context)).messages
@dataclass
class ConvertedMessages:
"""Container for Google-formatted messages converted from universal context."""
messages: List[AWSNovaSonicConversationHistoryMessage]
system_instruction: Optional[str] = None
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
system_instruction = None
messages = []
# Bail if there are no messages
if not universal_context_messages:
return self.ConvertedMessages()
universal_context_messages = copy.deepcopy(universal_context_messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if universal_context_messages[0].get("role") == "system":
system = universal_context_messages.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
elif isinstance(content, list):
system_instruction = content[0].get("text")
if system_instruction:
self._system_instruction = system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for universal_context_message in universal_context_messages:
message = self._from_universal_context_message(universal_context_message)
if message:
messages.append(message)
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
@staticmethod
def _to_aws_nova_sonic_function_format(function: FunctionSchema) -> Dict[str, Any]:

View File

@@ -6,12 +6,18 @@
"""OpenAI Realtime LLM adapter for Pipecat."""
from typing import Any, Dict, List, TypedDict
import copy
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
from pipecat.services.openai_realtime import events
class OpenAIRealtimeLLMInvocationParams(TypedDict):
@@ -20,7 +26,9 @@ class OpenAIRealtimeLLMInvocationParams(TypedDict):
This is a placeholder until support for universal LLMContext machinery is added for OpenAI Realtime.
"""
pass
system_instruction: Optional[str]
messages: List[events.ConversationItem]
tools: List[Dict[str, Any]]
class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
@@ -33,7 +41,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
@property
def id_for_llm_specific_messages(self) -> str:
"""Get the identifier used in LLMSpecificMessage instances for OpenAI Realtime."""
raise NotImplementedError("Universal LLMContext is not yet supported for OpenAI Realtime.")
return "openai-realtime"
def get_llm_invocation_params(self, context: LLMContext) -> OpenAIRealtimeLLMInvocationParams:
"""Get OpenAI Realtime-specific LLM invocation parameters from a universal LLM context.
@@ -46,7 +54,13 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
Dictionary of parameters for invoking OpenAI Realtime's API.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for OpenAI Realtime.")
messages = self._from_universal_context_messages(self.get_messages(context))
return {
"system_instruction": messages.system_instruction,
"messages": messages.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
}
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about OpenAI Realtime.
@@ -61,7 +75,106 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages in a format ready for logging about OpenAI Realtime.
"""
raise NotImplementedError("Universal LLMContext is not yet supported for OpenAI Realtime.")
return self._from_universal_context_messages(self.get_messages(context)).messages
@dataclass
class ConvertedMessages:
"""Container for OpenAI-formatted messages converted from universal context."""
messages: List[events.ConversationItem]
system_instruction: Optional[str] = None
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
# We can't load a long conversation history into the openai realtime api yet. (The API/model
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
# our general strategy until this is fixed is just to put everything into a first "user"
# message as a single input.
if not universal_context_messages:
return self.ConvertedMessages()
messages = copy.deepcopy(universal_context_messages)
system_instruction = None
# If we have a "system" message as our first message, let's pull that out into session
# "instructions"
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
elif isinstance(content, list):
system_instruction = content[0].get("text")
if not messages:
return self.ConvertedMessages(messages=[], system_instruction=system_instruction)
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
system_instruction=system_instruction,
)
# Otherwise, let's pack everything into a single "user" message with a bit of
# explanation for the LLM
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""
self.ConvertedMessages(
messages=[
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
],
}
],
system_instruction=system_instruction,
)
def _from_universal_context_message(
self, message: LLMContextMessage
) -> events.ConversationItem:
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
@staticmethod
def _to_openai_realtime_function_format(function: FunctionSchema) -> Dict[str, Any]:

View File

@@ -15,9 +15,10 @@ service-specific adapter.
"""
import base64
import copy
import io
from dataclasses import dataclass
from typing import Any, List, Optional, TypeAlias, Union
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
from loguru import logger
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
@@ -31,6 +32,9 @@ from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import AudioRawFrame
if TYPE_CHECKING:
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
# "Re-export" types from OpenAI that we're using as universal context types.
# NOTE: if universal message types need to someday diverge from OpenAI's, we
# should consider managing our own definitions. But we should do so carefully,
@@ -65,6 +69,26 @@ class LLMContext:
and content formatting.
"""
@staticmethod
def from_openai_context(openai_context: "OpenAILLMContext") -> "LLMContext":
"""Create a universal LLM context from an OpenAI-specific context.
NOTE: this should only be used internally, for facilitating migration
from OpenAILLMContext to LLMContext. New user code should use
LLMContext directly.
Args:
openai_context: The OpenAI LLM context to convert.
Returns:
New LLMContext instance with converted messages and settings.
"""
return LLMContext(
messages=openai_context.get_messages(),
tools=openai_context.tools,
tool_choice=openai_context.tool_choice,
)
def __init__(
self,
messages: Optional[List[LLMContextMessage]] = None,

View File

@@ -13,6 +13,7 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
import asyncio
import json
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Set
from loguru import logger
@@ -169,6 +170,11 @@ class LLMContextAggregator(FrameProcessor):
"""Reset the aggregation state."""
self._aggregation = ""
@abstractmethod
async def push_aggregation(self):
"""Push the current aggregation downstream."""
pass
class LLMUserAggregator(LLMContextAggregator):
"""User LLM aggregator that processes speech-to-text transcriptions.
@@ -301,7 +307,7 @@ class LLMUserAggregator(LLMContextAggregator):
frame = LLMContextFrame(self._context)
await self.push_frame(frame)
async def _push_aggregation(self):
async def push_aggregation(self):
"""Push the current aggregation based on interruption strategies and conditions."""
if len(self._aggregation) > 0:
if self.interruption_strategies and self._bot_speaking:
@@ -392,7 +398,7 @@ class LLMUserAggregator(LLMContextAggregator):
# pushing the aggregation as we will probably get a final transcription.
if len(self._aggregation) > 0:
if not self._seen_interim_results:
await self._push_aggregation()
await self.push_aggregation()
# Handles the case where both the user and the bot are not speaking,
# and the bot was previously speaking before the user interruption.
# So in this case we are resetting the aggregation timer
@@ -471,7 +477,7 @@ class LLMUserAggregator(LLMContextAggregator):
await self._maybe_emulate_user_speaking()
except asyncio.TimeoutError:
if not self._user_speaking:
await self._push_aggregation()
await self.push_aggregation()
# If we are emulating VAD we still need to send the user stopped
# speaking frame.
@@ -607,12 +613,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
await self._handle_user_image_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._push_aggregation()
await self.push_aggregation()
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
async def _push_aggregation(self):
async def push_aggregation(self):
"""Push the current assistant aggregation with timestamp."""
if not self._aggregation:
return
@@ -644,7 +650,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_interruptions(self, frame: InterruptionFrame):
await self._push_aggregation()
await self.push_aggregation()
self._started = 0
await self.reset()
@@ -778,7 +784,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
text=frame.request.context,
)
await self._push_aggregation()
await self.push_aggregation()
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
@@ -786,7 +792,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
self._started -= 1
await self._push_aggregation()
await self.push_aggregation()
async def _handle_text(self, frame: TextFrame):
if not self._started:

View File

@@ -12,14 +12,14 @@ in conversational pipelines.
"""
from pipecat.frames.frames import TextFrame
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMUserAggregator
class UserResponseAggregator(LLMUserContextAggregator):
class UserResponseAggregator(LLMUserAggregator):
"""Aggregates user responses into TextFrame objects.
This aggregator extends LLMUserContextAggregator to specifically handle
This aggregator extends LLMUserAggregator to specifically handle
user input by collecting text responses and outputting them as TextFrame
objects when the aggregation is complete.
"""
@@ -28,9 +28,9 @@ class UserResponseAggregator(LLMUserContextAggregator):
"""Initialize the user response aggregator.
Args:
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
**kwargs: Additional arguments passed to parent LLMUserAggregator.
"""
super().__init__(context=OpenAILLMContext(), **kwargs)
super().__init__(context=LLMContext(), **kwargs)
async def push_aggregation(self):
"""Push the aggregated user response as a TextFrame.

View File

@@ -25,7 +25,7 @@ from loguru import logger
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
@@ -33,36 +33,36 @@ from pipecat.frames.frames import (
Frame,
FunctionCallFromLLM,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws_nova_sonic.context import (
AWSNovaSonicAssistantContextAggregator,
AWSNovaSonicContextAggregatorPair,
AWSNovaSonicLLMContext,
AWSNovaSonicUserContextAggregator,
Role,
)
from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.llm_service import LLMService
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIContextAggregatorPair,
OpenAIUserContextAggregator,
)
from pipecat.utils.time import time_now_iso8601
try:
@@ -217,6 +217,11 @@ class AWSNovaSonicLLMService(LLMService):
system_instruction: System-level instruction for the model.
tools: Available tools/functions for the model to use.
send_transcription_frames: Whether to emit transcription frames.
.. deprecated:: 0.0.87
This parameter is deprecated and will be removed in a future version.
Transcription frames are always sent.
**kwargs: Additional arguments passed to the parent LLMService.
"""
super().__init__(**kwargs)
@@ -230,8 +235,20 @@ class AWSNovaSonicLLMService(LLMService):
self._params = params or Params()
self._system_instruction = system_instruction
self._tools = tools
self._send_transcription_frames = send_transcription_frames
self._context: Optional[AWSNovaSonicLLMContext] = None
if not send_transcription_frames:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"`send_transcription_frames` is deprecated and will be removed in a future version. "
"Transcription frames are always sent.",
DeprecationWarning,
stacklevel=2,
)
self._context: Optional[LLMContext] = None
self._stream: Optional[
DuplexEventStream[
InvokeModelWithBidirectionalStreamInput,
@@ -244,12 +261,17 @@ class AWSNovaSonicLLMService(LLMService):
self._input_audio_content_name: Optional[str] = None
self._content_being_received: Optional[CurrentContent] = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time: Optional[float] = None
self._wants_connection = False
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
file_path = files("pipecat.services.aws_nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
@@ -302,12 +324,12 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Resetting conversation")
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
# Carry over previous context through disconnect
# Grab context to carry through disconnect/reconnect
context = self._context
await self._disconnect()
self._context = context
await self._disconnect()
await self._start_connecting()
await self._handle_context(context)
#
# frame processing
@@ -322,28 +344,35 @@ class AWSNovaSonicLLMService(LLMService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
await self._handle_context(frame.context)
elif isinstance(frame, LLMContextFrame):
raise NotImplementedError(
"Universal LLMContext is not yet supported for AWS Nova Sonic."
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
context = (
frame.context
if isinstance(frame, LLMContextFrame)
else LLMContext.from_openai_context(frame.context)
)
await self._handle_context(context)
elif isinstance(frame, InputAudioRawFrame):
await self._handle_input_audio_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
elif isinstance(frame, AWSNovaSonicFunctionCallResultFrame):
await self._handle_function_call_result(frame)
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption_frame()
await self.push_frame(frame, direction)
async def _handle_context(self, context: OpenAILLMContext):
async def _handle_context(self, context: LLMContext):
if self._disconnecting:
return
if not self._context:
# We got our initial context - try to finish connecting
self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(
context, self._system_instruction
)
# We got our initial context
# Try to finish connecting
self._context = context
await self._finish_connecting_if_context_available()
else:
# We got an updated context
# Send results for any newly-completed function calls
await self._process_completed_function_calls(send_new_results=True)
async def _handle_input_audio_frame(self, frame: InputAudioRawFrame):
# Wait until we're done sending the assistant response trigger audio before sending audio
@@ -393,9 +422,9 @@ class AWSNovaSonicLLMService(LLMService):
else:
await finalize_assistant_response()
async def _handle_function_call_result(self, frame: AWSNovaSonicFunctionCallResultFrame):
result = frame.result_frame
await self._send_tool_result(tool_call_id=result.tool_call_id, result=result.result)
async def _handle_interruption_frame(self):
if self._assistant_is_responding:
self._may_need_repush_assistant_text = True
#
# LLM communication: lifecycle
@@ -431,6 +460,17 @@ class AWSNovaSonicLLMService(LLMService):
logger.error(f"{self} initialization error: {e}")
self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
# Check for set of completed function calls in the context
for message in self._context.get_messages():
if message.get("role") and message.get("content") != "IN_PROGRESS":
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
# Found a newly-completed function call - send the result to the service
if send_new_results:
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to
# send it
@@ -439,30 +479,38 @@ class AWSNovaSonicLLMService(LLMService):
logger.info("Finishing connecting (setting up session)...")
# Initialize our bookkeeping of already-completed tool calls in the
# context
await self._process_completed_function_calls(send_new_results=False)
# Read context
history = self._context.get_messages_for_initializing_history()
adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
llm_connection_params = adapter.get_llm_invocation_params(self._context)
# Send prompt start event, specifying tools.
# Tools from context take priority over self._tools.
tools = (
self._context.tools
if self._context.tools
else self.get_llm_adapter().from_standard_tools(self._tools)
llm_connection_params["tools"]
if llm_connection_params["tools"]
else adapter.from_standard_tools(self._tools)
)
logger.debug(f"Using tools: {tools}")
await self._send_prompt_start_event(tools)
# Send system instruction.
# Instruction from context takes priority over self._system_instruction.
# (NOTE: this prioritizing occurred automatically behind the scenes: the context was
# initialized with self._system_instruction and then updated itself from its messages when
# get_messages_for_initializing_history() was called).
logger.debug(f"Using system instruction: {history.system_instruction}")
if history.system_instruction:
await self._send_text_event(text=history.system_instruction, role=Role.SYSTEM)
system_instruction = (
llm_connection_params["system_instruction"]
if llm_connection_params["system_instruction"]
else self._system_instruction
)
logger.debug(f"Using system instruction: {system_instruction}")
if system_instruction:
await self._send_text_event(text=system_instruction, role=Role.SYSTEM)
# Send conversation history
for message in history.messages:
for message in llm_connection_params["messages"]:
# logger.debug(f"Seeding conversation history with message: {message}")
await self._send_text_event(text=message.text, role=message.role)
# Start audio input
@@ -492,9 +540,12 @@ class AWSNovaSonicLLMService(LLMService):
await self._send_session_end_events()
self._client = None
# Clean up context
self._context = None
# Clean up stream
if self._stream:
await self._stream.input_stream.close()
await self._stream.close()
self._stream = None
# NOTE: see explanation of HACK, below
@@ -510,15 +561,23 @@ class AWSNovaSonicLLMService(LLMService):
self._receive_task = None
# Reset remaining connection-specific state
# Should be all private state except:
# - _wants_connection
# - _assistant_response_trigger_audio
self._prompt_name = None
self._input_audio_content_name = None
self._content_being_received = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time = None
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
logger.info("Finished disconnecting")
except Exception as e:
@@ -830,6 +889,10 @@ class AWSNovaSonicLLMService(LLMService):
# Handle the LLM completion ending
await self._handle_completion_end_event(event_json)
except Exception as e:
if self._disconnecting:
# Errors are kind of expected while disconnecting, so just
# ignore them and do nothing
return
logger.error(f"{self} error processing responses: {e}")
if self._wants_connection:
await self.reset_conversation()
@@ -960,7 +1023,7 @@ class AWSNovaSonicLLMService(LLMService):
async def _report_assistant_response_started(self):
logger.debug("Assistant response started")
# Report that the assistant has started their response.
# Report the start of the assistant response.
await self.push_frame(LLMFullResponseStartFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) started
@@ -972,23 +1035,16 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"Assistant response text added: {text}")
# Report some text added to the ongoing assistant response
await self.push_frame(LLMTextFrame(text))
# Report some text added to the *equivalent* of TTS (this is a speech-to-speech model)
# Report the text of the assistant response.
await self.push_frame(TTSTextFrame(text))
# TODO: this is a (hopefully temporary) HACK. Here we directly manipulate the context rather
# than relying on the frames pushed to the assistant context aggregator. The pattern of
# receiving full-sentence text after the assistant has spoken does not easily fit with the
# Pipecat expectation of chunks of text streaming in while the assistant is speaking.
# Interruption handling was especially challenging. Rather than spend days trying to fit a
# square peg in a round hole, I decided on this hack for the time being. We can most cleanly
# abandon this hack if/when AWS Nova Sonic implements streaming smaller text chunks
# interspersed with audio. Note that when we move away from this hack, we need to make sure
# that on an interruption we avoid sending LLMFullResponseEndFrame, which gets the
# LLMAssistantContextAggregator into a bad state.
self._context.buffer_assistant_text(text)
# HACK: here we're also buffering the assistant text ourselves as a
# backup rather than relying solely on the assistant context aggregator
# to do it, because the text arrives from Nova Sonic only after all the
# assistant audio frames have been pushed, meaning that if an
# interruption frame were to arrive we would lose all of it (the text
# frames sitting in the queue would be wiped).
self._assistant_text_buffer += text
async def _report_assistant_response_ended(self):
if not self._context: # should never happen
@@ -996,14 +1052,34 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Assistant response ended")
# Report that the assistant has finished their response.
# If an interruption frame arrived while the assistant was responding
# we may have lost all of the assistant text (see HACK, above), so
# re-push it downstream to the aggregator now.
if self._may_need_repush_assistant_text:
# Just in case, check that assistant text hasn't already made it
# into the context (sometimes it does, despite the interruption).
messages = self._context.get_messages()
last_message = messages[-1] if messages else None
if (
not last_message
or last_message.get("role") != "assistant"
or last_message.get("content") != self._assistant_text_buffer
):
# We also need to re-push the LLMFullResponseStartFrame since the
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSTextFrame(self._assistant_text_buffer))
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
await self.push_frame(LLMFullResponseEndFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())
# For an explanation of this hack, see _report_assistant_response_text_added.
self._context.flush_aggregated_assistant_text()
# Clear out the buffered assistant text
self._assistant_text_buffer = ""
#
# user transcription reporting
@@ -1020,33 +1096,67 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"User transcription text added: {text}")
# Manually add new user transcription text to context.
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
self._context.buffer_user_text(text)
# Report that some new user transcription text is available.
if self._send_transcription_frames:
await self.push_frame(
InterimTranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601())
)
# HACK: here we're buffering the user text ourselves rather than
# relying on the upstream user context aggregator to do it, because the
# text arrives in fairly large chunks spaced fairly far apart in time.
# That means the user text would be split between different messages in
# context. Even if we sent placeholder InterimTranscriptionFrames in
# between each TranscriptionFrame to tell the aggregator to hold off on
# finalizing the user message, the aggregator would likely get the last
# chunk too late.
self._user_text_buffer += f" {text}" if self._user_text_buffer else text
async def _report_user_transcription_ended(self):
if not self._context: # should never happen
return
# Manually add user transcription to context (if any has been buffered).
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
transcription = self._context.flush_aggregated_user_text()
if not transcription:
return
logger.debug(f"User transcription ended")
if self._send_transcription_frames:
await self.push_frame(
TranscriptionFrame(text=transcription, user_id="", timestamp=time_now_iso8601())
# Report to the upstream user context aggregator that some new user
# transcription text is available.
# HACK: Check if this transcription was triggered by our own
# assistant response trigger. If so, we need to wrap it with
# UserStarted/StoppedSpeakingFrames; otherwise the user aggregator
# would fire an EmulatedUserStartedSpeakingFrame, which would
# trigger an interruption, which would prevent us from writing the
# assistant response to context.
#
# Sending an EmulateUserStartedSpeakingFrame ourselves doesn't
# work: it just causes the interruption we're trying to avoid.
#
# Setting enable_emulated_vad_interruptions also doesn't work: at
# the time the user aggregator receives the TranscriptionFrame, it
# doesn't yet know the assistant has started responding, so it
# doesn't know that emulating the user starting to speak would
# cause an interruption.
should_wrap_in_user_started_stopped_speaking_frames = (
self._waiting_for_trigger_transcription
and self._user_text_buffer.strip().lower() == "ready"
)
# Start wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
logger.debug(
"Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
)
await self.push_frame(UserStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
# Send the transcription upstream for the user context aggregator
frame = TranscriptionFrame(
text=self._user_text_buffer, user_id="", timestamp=time_now_iso8601()
)
await self.push_frame(frame, direction=FrameDirection.UPSTREAM)
# Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
await self.push_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
# Clear out the buffered user text
self._user_text_buffer = ""
# We're no longer waiting for a trigger transcription
self._waiting_for_trigger_transcription = False
#
# context
@@ -1058,23 +1168,26 @@ class AWSNovaSonicLLMService(LLMService):
*,
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> AWSNovaSonicContextAggregatorPair:
) -> LLMContextAggregatorPair:
"""Create context aggregator pair for managing conversation context.
NOTE: this method exists only for backward compatibility. New code
should instead do:
context = LLMContext(...)
context_aggregator = LLMContextAggregatorPair(context)
Args:
context: The OpenAI LLM context to upgrade.
context: The OpenAI LLM context.
user_params: Parameters for the user context aggregator.
assistant_params: Parameters for the assistant context aggregator.
Returns:
A pair of user and assistant context aggregators.
"""
context.set_llm_adapter(self.get_llm_adapter())
user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params)
return AWSNovaSonicContextAggregatorPair(user, assistant)
context = LLMContext.from_openai_context(context)
return LLMContextAggregatorPair(
context, user_params=user_params, assistant_params=assistant_params
)
#
# assistant response trigger (HACK)
@@ -1112,6 +1225,8 @@ class AWSNovaSonicLLMService(LLMService):
try:
logger.debug("Sending assistant response trigger...")
self._waiting_for_trigger_transcription = True
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
chunk_size = int(
chunk_duration

View File

@@ -1,367 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Context management for AWS Nova Sonic LLM service.
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
"""
import copy
from dataclasses import dataclass, field
from enum import Enum
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
DataFrame,
Frame,
FunctionCallResultFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
TextFrame,
UserImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
@dataclass
class AWSNovaSonicConversationHistory:
"""Complete conversation history for AWS Nova Sonic initialization.
Parameters:
system_instruction: System-level instruction for the conversation.
messages: List of conversation messages between user and assistant.
"""
system_instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
class AWSNovaSonicLLMContext(OpenAILLMContext):
"""Specialized LLM context for AWS Nova Sonic service.
Extends OpenAI context with Nova Sonic-specific message handling,
conversation history management, and text buffering capabilities.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize AWS Nova Sonic LLM context.
Args:
messages: Initial messages for the context.
tools: Available tools for the context.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self, system_instruction: str = ""):
self._assistant_text = ""
self._user_text = ""
self._system_instruction = system_instruction
@staticmethod
def upgrade_to_nova_sonic(
obj: OpenAILLMContext, system_instruction: str
) -> "AWSNovaSonicLLMContext":
"""Upgrade an OpenAI context to AWS Nova Sonic context.
Args:
obj: The OpenAI context to upgrade.
system_instruction: System instruction for the context.
Returns:
The upgraded AWS Nova Sonic context.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
obj.__class__ = AWSNovaSonicLLMContext
obj.__setup_local(system_instruction)
return obj
# NOTE: this method has the side-effect of updating _system_instruction from messages
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
"""Get conversation history for initializing AWS Nova Sonic session.
Processes stored messages and extracts system instruction and conversation
history in the format expected by AWS Nova Sonic.
Returns:
Formatted conversation history with system instruction and messages.
"""
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
# Bail if there are no messages
if not self.messages:
return history
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
history.system_instruction = content
elif isinstance(content, list):
history.system_instruction = content[0].get("text")
if history.system_instruction:
self._system_instruction = history.system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for message in messages:
history_message = self.from_standard_message(message)
if history_message:
history.messages.append(history_message)
return history
def get_messages_for_persistent_storage(self):
"""Get messages formatted for persistent storage.
Returns:
List of messages including system instruction if present.
"""
messages = super().get_messages_for_persistent_storage()
# If we have a system instruction and messages doesn't already contain it, add it
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
messages.insert(0, {"role": "system", "content": self._system_instruction})
return messages
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
def buffer_user_text(self, text):
"""Buffer user text for later flushing to context.
Args:
text: User text to buffer.
"""
self._user_text += f" {text}" if self._user_text else text
# logger.debug(f"User text buffered: {self._user_text}")
def flush_aggregated_user_text(self) -> str:
"""Flush buffered user text to context as a complete message.
Returns:
The flushed user text, or empty string if no text was buffered.
"""
if not self._user_text:
return ""
user_text = self._user_text
message = {
"role": "user",
"content": [{"type": "text", "text": user_text}],
}
self._user_text = ""
self.add_message(message)
# logger.debug(f"Context updated (user): {self.get_messages_for_logging()}")
return user_text
def buffer_assistant_text(self, text):
"""Buffer assistant text for later flushing to context.
Args:
text: Assistant text to buffer.
"""
self._assistant_text += text
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
def flush_aggregated_assistant_text(self):
"""Flush buffered assistant text to context as a complete message."""
if not self._assistant_text:
return
message = {
"role": "assistant",
"content": [{"type": "text", "text": self._assistant_text}],
}
self._assistant_text = ""
self.add_message(message)
# logger.debug(f"Context updated (assistant): {self.get_messages_for_logging()}")
@dataclass
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
"""Frame containing updated AWS Nova Sonic context.
Parameters:
context: The updated AWS Nova Sonic LLM context.
"""
context: AWSNovaSonicLLMContext
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
"""Context aggregator for user messages in AWS Nova Sonic conversations.
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
context update frames.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process frames and emit Nova Sonic-specific context updates.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
Provides specialized handling for assistant responses and function calls
in AWS Nova Sonic context, with custom frame processing logic.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Nova Sonic-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
# HACK: For now, disable the context aggregator by making it just pass through all frames
# that the parent handles (except the function call stuff, which we still need).
# For an explanation of this hack, see
# AWSNovaSonicLLMService._report_assistant_response_text_added.
if isinstance(
frame,
(
InterruptionFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
TextFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMSetToolChoiceFrame,
UserImageRawFrame,
BotStoppedSpeakingFrame,
),
):
await self.push_frame(frame, direction)
else:
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call results for AWS Nova Sonic.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
# itself, so we didn't have a chance to add the result to the AWS Nova Sonic server-side
# context. Let's push a special frame to do that.
await self.push_frame(
AWSNovaSonicFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
)
@dataclass
class AWSNovaSonicContextAggregatorPair:
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
Parameters:
_user: The user context aggregator.
_assistant: The assistant context aggregator.
"""
_user: AWSNovaSonicUserContextAggregator
_assistant: AWSNovaSonicAssistantContextAggregator
def user(self) -> AWSNovaSonicUserContextAggregator:
"""Get the user context aggregator.
Returns:
The user context aggregator instance.
"""
return self._user
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
"""Get the assistant context aggregator.
Returns:
The assistant context aggregator instance.
"""
return self._assistant

View File

@@ -39,24 +39,6 @@ class OpenAIRealtimeLLMContext(OpenAILLMContext):
realtime conversation items.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize the OpenAIRealtimeLLMContext.
Args:
messages: Initial conversation messages. Defaults to None.
tools: Available function tools. Defaults to None.
**kwargs: Additional arguments passed to parent OpenAILLMContext.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self):
self.llm_needs_settings_update = True
self.llm_needs_initial_messages = True
self._session_instructions = ""
return
@staticmethod
def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
"""Upgrade a standard OpenAI LLM context to a realtime context.
@@ -72,106 +54,6 @@ class OpenAIRealtimeLLMContext(OpenAILLMContext):
obj.__setup_local()
return obj
# todo
# - finish implementing all frames
def from_standard_message(self, message):
"""Convert a standard message format to a realtime conversation item.
Args:
message: The standard message dictionary to convert.
Returns:
A ConversationItem instance for the realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in from_standard_message: {message}")
def get_messages_for_initializing_history(self):
"""Get conversation items for initializing the realtime session history.
Converts the context's messages to a format suitable for the realtime API,
handling system instructions and conversation history packaging.
Returns:
List of conversation items for session initialization.
"""
# We can't load a long conversation history into the openai realtime api yet. (The API/model
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
# our general strategy until this is fixed is just to put everything into a first "user"
# message as a single input.
if not self.messages:
return []
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into session
# "instructions"
if messages[0].get("role") == "system":
self.llm_needs_settings_update = True
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
self._session_instructions = content
elif isinstance(content, list):
self._session_instructions = content[0].get("text")
if not messages:
return []
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return [self.from_standard_message(messages[0])]
# Otherwise, let's pack everything into a single "user" message with a bit of
# explanation for the LLM
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""
return [
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
],
}
]
def add_user_content_item_as_message(self, item):
"""Add a user content item as a standard message to the context.

View File

@@ -9,12 +9,16 @@
import base64
import json
import time
import traceback
from dataclasses import dataclass
from typing import Optional
from typing import Any, Dict, List, Optional
from loguru import logger
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
from pipecat.adapters.services.open_ai_realtime_adapter import (
OpenAIRealtimeLLMAdapter,
OpenAIRealtimeLLMInvocationParams,
)
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
@@ -41,6 +45,7 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
@@ -138,7 +143,17 @@ class OpenAIRealtimeLLMService(LLMService):
self._send_transcription_frames = send_transcription_frames
self._websocket = None
self._receive_task = None
self._context = None
# "Last received context" is only needed while we still support
# OpenAILLMContextFrame. The "last received context" is the context received
# in the most recent OpenAILLMContextFrame or LLMContextFrame, before
# we convert it to an LLMContext if needed. Storing the "last received
# context" lets us determine whether the context has changed. (We can't
# compare contexts after conversion because conversion creates a new
# object.)
self._context: LLMContext = None
self._last_received_context: OpenAILLMContext | LLMContext = None
self._llm_needs_conversation_setup = True
self._disconnecting = False
self._api_session_ready = False
@@ -347,22 +362,22 @@ class OpenAIRealtimeLLMService(LLMService):
if isinstance(frame, TranscriptionFrame):
pass
elif isinstance(frame, OpenAILLMContextFrame):
context: OpenAIRealtimeLLMContext = OpenAIRealtimeLLMContext.upgrade_to_realtime(
elif isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
context = (
frame.context
if isinstance(frame, LLMContextFrame)
else LLMContext.from_openai_context(frame.context)
)
if not self._context:
self._last_received_context = frame.context
self._context = context
elif frame.context is not self._context:
elif frame.context is not self._last_received_context:
# If the context has changed, reset the conversation
self._last_received_context = frame.context
self._context = context
await self.reset_conversation()
# Run the LLM at next opportunity
await self._create_response()
elif isinstance(frame, LLMContextFrame):
raise NotImplementedError(
"Universal LLMContext is not yet supported for OpenAI Realtime."
)
elif isinstance(frame, InputAudioRawFrame):
if not self._audio_input_paused:
await self._send_user_audio(frame)
@@ -377,6 +392,7 @@ class OpenAIRealtimeLLMService(LLMService):
elif isinstance(frame, LLMMessagesAppendFrame):
await self._handle_messages_append(frame)
elif isinstance(frame, RealtimeMessagesUpdateFrame):
# TODO: we don't need RealtimeMessagesUpdateFrame, I think...?
self._context = frame.context
elif isinstance(frame, LLMUpdateSettingsFrame):
self._session_properties = events.SessionProperties(**frame.settings)
@@ -459,13 +475,20 @@ class OpenAIRealtimeLLMService(LLMService):
async def _update_settings(self):
settings = self._session_properties
# tools given in the context override the tools in the session properties
if self._context and self._context.tools:
settings.tools = self._context.tools
# instructions in the context come from an initial "system" message in the
# messages list, and override instructions in the session properties
if self._context and self._context._session_instructions:
settings.instructions = self._context._session_instructions
if self._context:
adapter: OpenAIRealtimeLLMAdapter = self.get_llm_adapter()
llm_invocation_params = adapter.get_llm_invocation_params(self._context)
# tools given in the context override the tools in the session properties
if llm_invocation_params["tools"]:
settings.tools = llm_invocation_params["tools"]
# instructions in the context come from an initial "system" message in the
# messages list, and override instructions in the session properties
if llm_invocation_params["system_instruction"]:
settings.instructions = llm_invocation_params["system_instruction"]
await self.send_client_event(events.SessionUpdateEvent(session=settings))
#
@@ -760,9 +783,7 @@ class OpenAIRealtimeLLMService(LLMService):
"""
logger.debug("Resetting conversation")
await self._disconnect()
if self._context:
self._context.llm_needs_settings_update = True
self._context.llm_needs_initial_messages = True
self._llm_needs_conversation_setup = True
await self._connect()
@traced_openai_realtime(operation="llm_request")
@@ -771,19 +792,25 @@ class OpenAIRealtimeLLMService(LLMService):
self._run_llm_when_api_session_ready = True
return
if self._context.llm_needs_initial_messages:
messages = self._context.get_messages_for_initializing_history()
adapter: OpenAIRealtimeLLMAdapter = self.get_llm_adapter()
# Configure the LLM for this session if needed
if self._llm_needs_conversation_setup:
# Send initial messages
llm_invocation_params = adapter.get_llm_invocation_params(self._context)
messages = llm_invocation_params["messages"]
for item in messages:
evt = events.ConversationItemCreateEvent(item=item)
self._messages_added_manually[evt.item.id] = True
await self.send_client_event(evt)
self._context.llm_needs_initial_messages = False
if self._context.llm_needs_settings_update:
# Send new settings if needed
await self._update_settings()
self._context.llm_needs_settings_update = False
logger.debug(f"Creating response: {self._context.get_messages_for_logging()}")
# We're done configuring the LLM for this session
self._llm_needs_conversation_setup = False
logger.debug(f"Creating response: {adapter.get_messages_for_logging(self._context)}")
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()

View File

@@ -12,14 +12,12 @@ from dotenv import load_dotenv
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import LLMService
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.tests.utils import run_test
@@ -48,8 +46,13 @@ def standard_tools() -> ToolsSchema:
async def _test_llm_function_calling(llm: LLMService):
# Create an AsyncMock for the function
mock_fetch_weather = AsyncMock()
# Create a mock weather function
call_count = 0
async def mock_fetch_weather(params: FunctionCallParams):
nonlocal call_count
call_count += 1
pass
llm.register_function(None, mock_fetch_weather)
@@ -60,21 +63,19 @@ async def _test_llm_function_calling(llm: LLMService):
},
{"role": "user", "content": " How is the weather today in San Francisco, California?"},
]
context = OpenAILLMContext(messages, standard_tools())
# This is done by default inside the create_context_aggregator
context.set_llm_adapter(llm.get_llm_adapter())
context = LLMContext(messages, standard_tools())
pipeline = Pipeline([llm])
frames_to_send = [OpenAILLMContextFrame(context)]
frames_to_send = [LLMContextFrame(context)]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=None,
)
# Assert that the mock function was called
mock_fetch_weather.assert_called_once()
# Assert that the weather function was called once
assert call_count == 1
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set")

View File

@@ -10,24 +10,21 @@ from langchain.prompts import ChatPromptTemplate
from langchain_core.language_models import FakeStreamingListLLM
from pipecat.frames.frames import (
LLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
OpenAILLMContextAssistantTimestampFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.tests.utils import SleepFrame, run_test
@@ -67,13 +64,14 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
proc = LangchainProcessor(chain=chain)
self.mock_proc = self.MockProcessor("token_collector")
context = OpenAILLMContext()
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
context = LLMContext()
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=LLMAssistantAggregatorParams(expect_stripped_words=False)
)
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
pipeline = Pipeline(
[context_aggregator.user(), proc, self.mock_proc, context_aggregator.assistant()]
)
frames_to_send = [
UserStartedSpeakingFrame(),
@@ -84,8 +82,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
expected_down_frames = [
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
OpenAILLMContextFrame,
OpenAILLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMContextAssistantTimestampFrame,
]
await run_test(
pipeline,
@@ -94,4 +92,6 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
)
self.assertEqual("".join(self.mock_proc.token), self.expected_response)
self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)
self.assertEqual(
context_aggregator.assistant().messages[-1]["content"], self.expected_response
)