[WIP] AWS Nova Sonic service - add ability to pass in OpenAILLMContext

This commit is contained in:
Paul Kompfner
2025-04-29 11:39:32 -04:00
parent 9f7f42e885
commit f182eafb40
4 changed files with 303 additions and 54 deletions

View File

@@ -16,7 +16,8 @@ from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.aws_nova_sonic import AWSNovaSonicService
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic import AWSNovaSonicLLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -47,13 +48,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
),
)
# Create the AWS Nova Sonic LLM service
# system_instruction = f"""
# You are a helpful AI assistant.
# Your goal is to demonstrate your capabilities in a helpful and engaging way.
# Your output will be converted to audio so don't include special characters in your answers.
# Respond to what the user said in a creative and helpful way.
# """
# Specify initial system instruction
# TODO: looks like Nova Sonic can't handle new lines?
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken dialog "
@@ -61,20 +56,37 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
"generally two or three sentences for chatty scenarios."
)
llm = AWSNovaSonicService(
instruction=system_instruction,
# Create the AWS Nova Sonic LLM service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region=os.getenv("AWS_REGION"),
voice_id="tiffany", # matthew, tiffany, amy
voice_id="tiffany", # matthew, tiffany, amy
# instruction=system_instruction # could pass instruction here rather than context, below
)
# Set up context and context management.
# AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to
# what's expected by Nova Sonic.
context = OpenAILLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
{
"role": "user",
"content": "Tell me hello! Don't wait for me to say anything else first!",
},
]
)
context_aggregator = llm.create_context_aggregator(context)
# Build the pipeline
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
transport.output(),
context_aggregator.assistant(),
]
)
@@ -93,18 +105,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames(
[
LLMMessagesAppendFrame(
messages=[
{
"role": "user",
"content": f"Greet the user and introduce yourself.",
}
]
)
]
)
await task.queue_frames([context_aggregator.user().get_context_frame()])
# Handle client disconnection events
@transport.event_handler("on_client_disconnected")

View File

@@ -1 +1 @@
from .aws import AWSNovaSonicService
from .aws import AWSNovaSonicLLMService

View File

@@ -3,6 +3,7 @@ import json
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any
from aws_sdk_bedrock_runtime.client import (
BedrockRuntimeClient,
@@ -41,18 +42,26 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws_nova_sonic.context import (
AWSNovaSonicAssistantContextAggregator,
AWSNovaSonicContextAggregatorPair,
AWSNovaSonicLLMContext,
AWSNovaSonicUserContextAggregator,
Role,
)
from pipecat.services.llm_service import LLMService
from pipecat.utils.time import time_now_iso8601
class Role(Enum):
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
class ContentType(Enum):
AUDIO = "AUDIO"
TEXT = "TEXT"
@@ -81,36 +90,40 @@ class CurrentContent:
)
class AWSNovaSonicService(LLMService):
class AWSNovaSonicLLMService(LLMService):
def __init__(
self,
*,
instruction: str,
# TODO: if we have instruction here as an alternative to using context, we should do the same for tools...right?
secret_access_key: str,
access_key_id: str,
region: str,
model: str = "amazon.nova-sonic-v1:0",
voice_id: str = "matthew", # matthew, tiffany, amy
instruction: str = None,
**kwargs,
):
super().__init__(**kwargs)
self._instruction = instruction
self._secret_access_key = secret_access_key
self._access_key_id = access_key_id
self._region = region
self._model = model
self._client: BedrockRuntimeClient = None
self._voice_id = voice_id
self._instruction = instruction
self._context: AWSNovaSonicLLMContext = None
self._stream: DuplexEventStream[
InvokeModelWithBidirectionalStreamInput,
InvokeModelWithBidirectionalStreamOutput,
InvokeModelWithBidirectionalStreamOperationOutput,
] = None
self._receive_task = None
self._prompt_name = str(uuid.uuid4())
self._input_audio_content_name = str(uuid.uuid4())
self._content_being_received = None # TODO: clean this up on error or when finished
self._prompt_name = None
self._input_audio_content_name = None
self._content_being_received = None
self._assistant_is_responding = False
self._context_available = False
self._ready_to_send_context = False
#
# standard AIService frame handling
@@ -118,7 +131,14 @@ class AWSNovaSonicService(LLMService):
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
# TODO: maybe connect but don't send history until we get all of our settings?
# how do we know how long to wait?
# ah, i think we'll *always* get at least one OpenAILLMContextFrame which kicks things off
# so we need to send the initial history when:
# - we're connected
# - we've gotten the first context
# i *think* this is what's controlled by _api_session_ready/_run_llm_when_api_session_ready
await self._start_connecting()
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -135,10 +155,14 @@ class AWSNovaSonicService(LLMService):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
if isinstance(frame, OpenAILLMContextFrame):
await self._handle_context(frame.context)
elif isinstance(frame, InputAudioRawFrame):
# TODO: check if _audio_input_paused? what causes that?
await self._send_user_audio_event(frame)
# TODO: do we need to do anything for these?
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking()
# TODO: do we need to do anything for the below four frame types?
elif isinstance(frame, StartInterruptionFrame):
# print("[pk] StartInterruptionFrame")
pass
@@ -151,11 +175,19 @@ class AWSNovaSonicService(LLMService):
elif isinstance(frame, UserStoppedSpeakingFrame):
# print("[pk] UserStoppedSpeakingFrame")
pass
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking()
await self.push_frame(frame, direction)
async def _handle_context(self, context: OpenAILLMContext):
# TODO: if context has changed, reconnect
# TODO: remove
print(f"[pk] _handle_context: {context.get_messages_for_initializing_history()}")
if not self._context:
# We got our initial context - try to finish connecting
self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(context)
self._context_available = True
await self._finish_connecting_if_context_available()
async def _handle_bot_stopped_speaking(self):
if self._assistant_is_responding:
# Consider the assistant finished with their response.
@@ -178,12 +210,16 @@ class AWSNovaSonicService(LLMService):
# LLM communication: lifecycle
#
async def _connect(self):
async def _start_connecting(self):
try:
if self._client:
# Here we assume that if we have a client we are connected
# Here we assume that if we have a client we are connected or connecting
return
# Set IDs for the connection
self._prompt_name = str(uuid.uuid4())
self._input_audio_content_name = str(uuid.uuid4())
# Create the client
self._client = self._create_client()
@@ -195,19 +231,71 @@ class AWSNovaSonicService(LLMService):
# Send session start events
await self._send_session_start_events()
# Send initial system instruction
await self._send_text_event(text=self._instruction, role=Role.SYSTEM)
# Start audio input
await self._send_audio_input_start_event()
self._receive_task = self.create_task(self._receive_task_handler())
# Finish connecting
self._ready_to_send_context = True
await self._finish_connecting_if_context_available()
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._client = None
self._disconnect()
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to
# send it
if not (self._context_available and self._ready_to_send_context):
return
# Read context
history = self._context.get_messages_for_initializing_history()
# Send system instruction
# Instruction from context takes priority
instruction = history.instruction if history.instruction else self._instruction
if instruction:
await self._send_text_event(text=instruction, role=Role.SYSTEM)
# Send conversation history
for message in history.messages:
await self._send_text_event(text=message.text, role=message.role)
# Send initial context (system instruction and conversation history)
# TODO: finish implementing
# - pass additional message(s)
# - merge init-passed system instruction + context instruction (latter takes precedence)
# - merge init-passed tools + context tools (latter takes precedence)
await self._send_text_event(text=self._instruction, role=Role.SYSTEM)
# Start audio input
await self._send_audio_input_start_event()
# Start receiving events
self._receive_task = self.create_task(self._receive_task_handler())
async def _disconnect(self):
pass
try:
# Clean up receive task
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=1.0)
self._receive_task = None
# Clean up client
if self._client:
await self._send_session_end_events()
self._client = None
# Clean up stream
if self._stream:
await self._stream.input_stream.close()
self._stream = None
# Reset remaining connection-specific state
self._prompt_name = None
self._input_audio_content_name = None
self._content_being_received = None
self._assistant_is_responding = False
self._context_available = False
self._ready_to_send_context = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
def _create_client(self) -> BedrockRuntimeClient:
config = Config(
@@ -340,7 +428,7 @@ class AWSNovaSonicService(LLMService):
await self._send_client_event(text_content_end)
async def _send_user_audio_event(self, frame: InputAudioRawFrame):
if not self._client:
if not self._stream:
return
blob = base64.b64encode(frame.audio)
@@ -357,6 +445,30 @@ class AWSNovaSonicService(LLMService):
'''
await self._send_client_event(audio_event)
async def _send_session_end_events(self):
if not self._stream:
return
prompt_end = f'''
{{
"event": {{
"promptEnd": {{
"promptName": "{self._prompt_name}"
}}
}}
}}
'''
await self._send_client_event(prompt_end)
session_end = """
{
"event": {
"sessionEnd": {}
}
}
"""
await self._send_client_event(session_end)
async def _send_client_event(self, event_json: str):
event = InvokeModelWithBidirectionalStreamInputChunk(
value=BidirectionalInputPayloadPart(bytes_=event_json.encode("utf-8"))
@@ -547,3 +659,18 @@ class AWSNovaSonicService(LLMService):
await self.push_frame(
TranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601())
)
#
# Context
#
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> AWSNovaSonicContextAggregatorPair:
user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params)
return AWSNovaSonicContextAggregatorPair(user, assistant)

View File

@@ -0,0 +1,121 @@
import copy
from dataclasses import dataclass, field
from enum import Enum
from loguru import logger
from pipecat.frames.frames import DataFrame, Frame, LLMMessagesUpdateFrame, LLMSetToolsFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
class Role(Enum):
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
role: Role # only USER and ASSISTANT
text: str
@dataclass
class AWSNovaSonicConversationHistory:
instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
@dataclass
class AWSNovaSonicLLMContext(OpenAILLMContext):
@staticmethod
def upgrade_to_nova_sonic(obj: OpenAILLMContext) -> "AWSNovaSonicLLMContext":
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
obj.__class__ = AWSNovaSonicLLMContext
return obj
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
history = AWSNovaSonicConversationHistory()
# Bail if there are no messages
if not self.messages:
return history
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
history.instruction = content
elif isinstance(content, list):
history.instruction = content[0].get("text")
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for message in messages:
history_message = self.from_standard_message(message)
if history_message:
history.messages.append(history_message)
return history
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
logger.error(f"Unhandled message type in from_standard_message: {message}")
@dataclass
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
context: AWSNovaSonicLLMContext
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
# Parent also doesn't push the LLMSetToolsFrame
# TODO: this
# if isinstance(frame, LLMSetToolsFrame):
# await self.push_frame(frame, direction)
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
pass
@dataclass
class AWSNovaSonicContextAggregatorPair:
_user: AWSNovaSonicUserContextAggregator
_assistant: AWSNovaSonicAssistantContextAggregator
def user(self) -> AWSNovaSonicUserContextAggregator:
return self._user
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
return self._assistant