diff --git a/examples/foundational/39-aws-nova-sonic.py b/examples/foundational/39-aws-nova-sonic.py index 445464957..c44f85a48 100644 --- a/examples/foundational/39-aws-nova-sonic.py +++ b/examples/foundational/39-aws-nova-sonic.py @@ -16,7 +16,8 @@ from pipecat.frames.frames import LLMMessagesAppendFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.services.aws_nova_sonic import AWSNovaSonicService +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.aws_nova_sonic import AWSNovaSonicLLMService from pipecat.transports.base_transport import TransportParams from pipecat.transports.network.small_webrtc import SmallWebRTCTransport from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection @@ -47,13 +48,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): ), ) - # Create the AWS Nova Sonic LLM service - # system_instruction = f""" - # You are a helpful AI assistant. - # Your goal is to demonstrate your capabilities in a helpful and engaging way. - # Your output will be converted to audio so don't include special characters in your answers. - # Respond to what the user said in a creative and helpful way. - # """ + # Specify initial system instruction # TODO: looks like Nova Sonic can't handle new lines? system_instruction = ( "You are a friendly assistant. The user and you will engage in a spoken dialog " @@ -61,20 +56,37 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): "generally two or three sentences for chatty scenarios." ) - llm = AWSNovaSonicService( - instruction=system_instruction, + # Create the AWS Nova Sonic LLM service + llm = AWSNovaSonicLLMService( secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), region=os.getenv("AWS_REGION"), - voice_id="tiffany", # matthew, tiffany, amy + voice_id="tiffany", # matthew, tiffany, amy + # instruction=system_instruction # could pass instruction here rather than context, below ) + # Set up context and context management. + # AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to + # what's expected by Nova Sonic. + context = OpenAILLMContext( + messages=[ + {"role": "system", "content": f"{system_instruction}"}, + { + "role": "user", + "content": "Tell me hello! Don't wait for me to say anything else first!", + }, + ] + ) + context_aggregator = llm.create_context_aggregator(context) + # Build the pipeline pipeline = Pipeline( [ transport.input(), + context_aggregator.user(), llm, transport.output(), + context_aggregator.assistant(), ] ) @@ -93,18 +105,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): async def on_client_connected(transport, client): logger.info(f"Client connected") # Kick off the conversation. - await task.queue_frames( - [ - LLMMessagesAppendFrame( - messages=[ - { - "role": "user", - "content": f"Greet the user and introduce yourself.", - } - ] - ) - ] - ) + await task.queue_frames([context_aggregator.user().get_context_frame()]) # Handle client disconnection events @transport.event_handler("on_client_disconnected") diff --git a/src/pipecat/services/aws_nova_sonic/__init__.py b/src/pipecat/services/aws_nova_sonic/__init__.py index b5559715a..e14c44f8a 100644 --- a/src/pipecat/services/aws_nova_sonic/__init__.py +++ b/src/pipecat/services/aws_nova_sonic/__init__.py @@ -1 +1 @@ -from .aws import AWSNovaSonicService +from .aws import AWSNovaSonicLLMService diff --git a/src/pipecat/services/aws_nova_sonic/aws.py b/src/pipecat/services/aws_nova_sonic/aws.py index 563d35422..cc07e5463 100644 --- a/src/pipecat/services/aws_nova_sonic/aws.py +++ b/src/pipecat/services/aws_nova_sonic/aws.py @@ -3,6 +3,7 @@ import json import uuid from dataclasses import dataclass from enum import Enum +from typing import Any from aws_sdk_bedrock_runtime.client import ( BedrockRuntimeClient, @@ -41,18 +42,26 @@ from pipecat.frames.frames import ( UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantAggregatorParams, + LLMUserAggregatorParams, +) +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.aws_nova_sonic.context import ( + AWSNovaSonicAssistantContextAggregator, + AWSNovaSonicContextAggregatorPair, + AWSNovaSonicLLMContext, + AWSNovaSonicUserContextAggregator, + Role, +) from pipecat.services.llm_service import LLMService from pipecat.utils.time import time_now_iso8601 -class Role(Enum): - SYSTEM = "SYSTEM" - USER = "USER" - ASSISTANT = "ASSISTANT" - TOOL = "TOOL" - - class ContentType(Enum): AUDIO = "AUDIO" TEXT = "TEXT" @@ -81,36 +90,40 @@ class CurrentContent: ) -class AWSNovaSonicService(LLMService): +class AWSNovaSonicLLMService(LLMService): def __init__( self, *, - instruction: str, + # TODO: if we have instruction here as an alternative to using context, we should do the same for tools...right? secret_access_key: str, access_key_id: str, region: str, model: str = "amazon.nova-sonic-v1:0", voice_id: str = "matthew", # matthew, tiffany, amy + instruction: str = None, **kwargs, ): super().__init__(**kwargs) - self._instruction = instruction self._secret_access_key = secret_access_key self._access_key_id = access_key_id self._region = region self._model = model self._client: BedrockRuntimeClient = None self._voice_id = voice_id + self._instruction = instruction + self._context: AWSNovaSonicLLMContext = None self._stream: DuplexEventStream[ InvokeModelWithBidirectionalStreamInput, InvokeModelWithBidirectionalStreamOutput, InvokeModelWithBidirectionalStreamOperationOutput, ] = None self._receive_task = None - self._prompt_name = str(uuid.uuid4()) - self._input_audio_content_name = str(uuid.uuid4()) - self._content_being_received = None # TODO: clean this up on error or when finished + self._prompt_name = None + self._input_audio_content_name = None + self._content_being_received = None self._assistant_is_responding = False + self._context_available = False + self._ready_to_send_context = False # # standard AIService frame handling @@ -118,7 +131,14 @@ class AWSNovaSonicService(LLMService): async def start(self, frame: StartFrame): await super().start(frame) - await self._connect() + # TODO: maybe connect but don't send history until we get all of our settings? + # how do we know how long to wait? + # ah, i think we'll *always* get at least one OpenAILLMContextFrame which kicks things off + # so we need to send the initial history when: + # - we're connected + # - we've gotten the first context + # i *think* this is what's controlled by _api_session_ready/_run_llm_when_api_session_ready + await self._start_connecting() async def stop(self, frame: EndFrame): await super().stop(frame) @@ -135,10 +155,14 @@ class AWSNovaSonicService(LLMService): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, InputAudioRawFrame): + if isinstance(frame, OpenAILLMContextFrame): + await self._handle_context(frame.context) + elif isinstance(frame, InputAudioRawFrame): # TODO: check if _audio_input_paused? what causes that? await self._send_user_audio_event(frame) - # TODO: do we need to do anything for these? + elif isinstance(frame, BotStoppedSpeakingFrame): + await self._handle_bot_stopped_speaking() + # TODO: do we need to do anything for the below four frame types? elif isinstance(frame, StartInterruptionFrame): # print("[pk] StartInterruptionFrame") pass @@ -151,11 +175,19 @@ class AWSNovaSonicService(LLMService): elif isinstance(frame, UserStoppedSpeakingFrame): # print("[pk] UserStoppedSpeakingFrame") pass - elif isinstance(frame, BotStoppedSpeakingFrame): - await self._handle_bot_stopped_speaking() await self.push_frame(frame, direction) + async def _handle_context(self, context: OpenAILLMContext): + # TODO: if context has changed, reconnect + # TODO: remove + print(f"[pk] _handle_context: {context.get_messages_for_initializing_history()}") + if not self._context: + # We got our initial context - try to finish connecting + self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(context) + self._context_available = True + await self._finish_connecting_if_context_available() + async def _handle_bot_stopped_speaking(self): if self._assistant_is_responding: # Consider the assistant finished with their response. @@ -178,12 +210,16 @@ class AWSNovaSonicService(LLMService): # LLM communication: lifecycle # - async def _connect(self): + async def _start_connecting(self): try: if self._client: - # Here we assume that if we have a client we are connected + # Here we assume that if we have a client we are connected or connecting return + # Set IDs for the connection + self._prompt_name = str(uuid.uuid4()) + self._input_audio_content_name = str(uuid.uuid4()) + # Create the client self._client = self._create_client() @@ -195,19 +231,71 @@ class AWSNovaSonicService(LLMService): # Send session start events await self._send_session_start_events() - # Send initial system instruction - await self._send_text_event(text=self._instruction, role=Role.SYSTEM) - - # Start audio input - await self._send_audio_input_start_event() - - self._receive_task = self.create_task(self._receive_task_handler()) + # Finish connecting + self._ready_to_send_context = True + await self._finish_connecting_if_context_available() except Exception as e: logger.error(f"{self} initialization error: {e}") - self._client = None + self._disconnect() + + async def _finish_connecting_if_context_available(self): + # We can only finish connecting once we've gotten our initial context and we're ready to + # send it + if not (self._context_available and self._ready_to_send_context): + return + + # Read context + history = self._context.get_messages_for_initializing_history() + + # Send system instruction + # Instruction from context takes priority + instruction = history.instruction if history.instruction else self._instruction + if instruction: + await self._send_text_event(text=instruction, role=Role.SYSTEM) + + # Send conversation history + for message in history.messages: + await self._send_text_event(text=message.text, role=message.role) + + # Send initial context (system instruction and conversation history) + # TODO: finish implementing + # - pass additional message(s) + # - merge init-passed system instruction + context instruction (latter takes precedence) + # - merge init-passed tools + context tools (latter takes precedence) + await self._send_text_event(text=self._instruction, role=Role.SYSTEM) + + # Start audio input + await self._send_audio_input_start_event() + + # Start receiving events + self._receive_task = self.create_task(self._receive_task_handler()) async def _disconnect(self): - pass + try: + # Clean up receive task + if self._receive_task: + await self.cancel_task(self._receive_task, timeout=1.0) + self._receive_task = None + + # Clean up client + if self._client: + await self._send_session_end_events() + self._client = None + + # Clean up stream + if self._stream: + await self._stream.input_stream.close() + self._stream = None + + # Reset remaining connection-specific state + self._prompt_name = None + self._input_audio_content_name = None + self._content_being_received = None + self._assistant_is_responding = False + self._context_available = False + self._ready_to_send_context = False + except Exception as e: + logger.error(f"{self} error disconnecting: {e}") def _create_client(self) -> BedrockRuntimeClient: config = Config( @@ -340,7 +428,7 @@ class AWSNovaSonicService(LLMService): await self._send_client_event(text_content_end) async def _send_user_audio_event(self, frame: InputAudioRawFrame): - if not self._client: + if not self._stream: return blob = base64.b64encode(frame.audio) @@ -357,6 +445,30 @@ class AWSNovaSonicService(LLMService): ''' await self._send_client_event(audio_event) + async def _send_session_end_events(self): + if not self._stream: + return + + prompt_end = f''' + {{ + "event": {{ + "promptEnd": {{ + "promptName": "{self._prompt_name}" + }} + }} + }} + ''' + await self._send_client_event(prompt_end) + + session_end = """ + { + "event": { + "sessionEnd": {} + } + } + """ + await self._send_client_event(session_end) + async def _send_client_event(self, event_json: str): event = InvokeModelWithBidirectionalStreamInputChunk( value=BidirectionalInputPayloadPart(bytes_=event_json.encode("utf-8")) @@ -547,3 +659,18 @@ class AWSNovaSonicService(LLMService): await self.push_frame( TranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601()) ) + + # + # Context + # + + def create_context_aggregator( + self, + context: OpenAILLMContext, + *, + user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(), + assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(), + ) -> AWSNovaSonicContextAggregatorPair: + user = AWSNovaSonicUserContextAggregator(context=context, params=user_params) + assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params) + return AWSNovaSonicContextAggregatorPair(user, assistant) diff --git a/src/pipecat/services/aws_nova_sonic/context.py b/src/pipecat/services/aws_nova_sonic/context.py new file mode 100644 index 000000000..331ecc13e --- /dev/null +++ b/src/pipecat/services/aws_nova_sonic/context.py @@ -0,0 +1,121 @@ +import copy +from dataclasses import dataclass, field +from enum import Enum + +from loguru import logger + +from pipecat.frames.frames import DataFrame, Frame, LLMMessagesUpdateFrame, LLMSetToolsFrame +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.openai.llm import ( + OpenAIAssistantContextAggregator, + OpenAIUserContextAggregator, +) + + +class Role(Enum): + SYSTEM = "SYSTEM" + USER = "USER" + ASSISTANT = "ASSISTANT" + TOOL = "TOOL" + + +@dataclass +class AWSNovaSonicConversationHistoryMessage: + role: Role # only USER and ASSISTANT + text: str + + +@dataclass +class AWSNovaSonicConversationHistory: + instruction: str = None + messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list) + + +@dataclass +class AWSNovaSonicLLMContext(OpenAILLMContext): + @staticmethod + def upgrade_to_nova_sonic(obj: OpenAILLMContext) -> "AWSNovaSonicLLMContext": + if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext): + obj.__class__ = AWSNovaSonicLLMContext + return obj + + def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory: + history = AWSNovaSonicConversationHistory() + + # Bail if there are no messages + if not self.messages: + return history + + messages = copy.deepcopy(self.messages) + + # If we have a "system" message as our first message, let's pull that out into "instruction" + if messages[0].get("role") == "system": + system = messages.pop(0) + content = system.get("content") + if isinstance(content, str): + history.instruction = content + elif isinstance(content, list): + history.instruction = content[0].get("text") + + # Process remaining messages to fill out conversation history. + # Nova Sonic supports "user" and "assistant" messages in history. + for message in messages: + history_message = self.from_standard_message(message) + if history_message: + history.messages.append(history_message) + + return history + + def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage: + role = message.get("role") + if message.get("role") == "user" or message.get("role") == "assistant": + content = message.get("content") + if isinstance(message.get("content"), list): + content = "" + for c in message.get("content"): + if c.get("type") == "text": + content += " " + c.get("text") + else: + logger.error( + f"Unhandled content type in context message: {c.get('type')} - {message}" + ) + return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content) + logger.error(f"Unhandled message type in from_standard_message: {message}") + + +@dataclass +class AWSNovaSonicMessagesUpdateFrame(DataFrame): + context: AWSNovaSonicLLMContext + + +class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator): + async def process_frame( + self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM + ): + await super().process_frame(frame, direction) + + # Parent does not push LLMMessagesUpdateFrame + if isinstance(frame, LLMMessagesUpdateFrame): + await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context)) + + # Parent also doesn't push the LLMSetToolsFrame + # TODO: this + # if isinstance(frame, LLMSetToolsFrame): + # await self.push_frame(frame, direction) + + +class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator): + pass + + +@dataclass +class AWSNovaSonicContextAggregatorPair: + _user: AWSNovaSonicUserContextAggregator + _assistant: AWSNovaSonicAssistantContextAggregator + + def user(self) -> AWSNovaSonicUserContextAggregator: + return self._user + + def assistant(self) -> AWSNovaSonicAssistantContextAggregator: + return self._assistant