Update AWSNovaSonicLLMService docstrings

This commit is contained in:
Mark Backman
2025-06-26 11:21:18 -04:00
parent 2856372ad6
commit 9e518cf2ba
3 changed files with 239 additions and 0 deletions

View File

@@ -4,6 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""AWS Nova Sonic LLM service implementation for Pipecat AI framework.
This module provides a speech-to-speech LLM service using AWS Nova Sonic, which supports
bidirectional audio streaming, text generation, and function calling capabilities.
"""
import asyncio
import base64
import json
@@ -83,22 +89,37 @@ except ModuleNotFoundError as e:
class AWSNovaSonicUnhandledFunctionException(Exception):
"""Exception raised when the LLM attempts to call an unregistered function."""
pass
class ContentType(Enum):
"""Content types supported by AWS Nova Sonic."""
AUDIO = "AUDIO"
TEXT = "TEXT"
TOOL = "TOOL"
class TextStage(Enum):
"""Text generation stages in AWS Nova Sonic responses."""
FINAL = "FINAL" # what has been said
SPECULATIVE = "SPECULATIVE" # what's planned to be said
@dataclass
class CurrentContent:
"""Represents content currently being received from AWS Nova Sonic.
Parameters:
type: The type of content (audio, text, or tool).
role: The role generating the content (user, assistant, etc.).
text_stage: The stage of text generation (final or speculative).
text_content: The actual text content if applicable.
"""
type: ContentType
role: Role
text_stage: TextStage # None if not text
@@ -115,6 +136,20 @@ class CurrentContent:
class Params(BaseModel):
"""Configuration parameters for AWS Nova Sonic.
Attributes:
input_sample_rate: Audio input sample rate in Hz.
input_sample_size: Audio input sample size in bits.
input_channel_count: Number of input audio channels.
output_sample_rate: Audio output sample rate in Hz.
output_sample_size: Audio output sample size in bits.
output_channel_count: Number of output audio channels.
max_tokens: Maximum number of tokens to generate.
top_p: Nucleus sampling parameter.
temperature: Sampling temperature for text generation.
"""
# Audio input
input_sample_rate: Optional[int] = Field(default=16000)
input_sample_size: Optional[int] = Field(default=16)
@@ -132,6 +167,24 @@ class Params(BaseModel):
class AWSNovaSonicLLMService(LLMService):
"""AWS Nova Sonic speech-to-speech LLM service.
Provides bidirectional audio streaming, real-time transcription, text generation,
and function calling capabilities using AWS Nova Sonic model.
Args:
secret_access_key: AWS secret access key for authentication.
access_key_id: AWS access key ID for authentication.
region: AWS region where the service is hosted.
model: Model identifier. Defaults to "amazon.nova-sonic-v1:0".
voice_id: Voice ID for speech synthesis. Options: matthew, tiffany, amy.
params: Model parameters for audio configuration and inference.
system_instruction: System-level instruction for the model.
tools: Available tools/functions for the model to use.
send_transcription_frames: Whether to emit transcription frames.
**kwargs: Additional arguments passed to the parent LLMService.
"""
# Override the default adapter to use the AWSNovaSonicLLMAdapter one
adapter_class = AWSNovaSonicLLMAdapter
@@ -188,16 +241,31 @@ class AWSNovaSonicLLMService(LLMService):
#
async def start(self, frame: StartFrame):
"""Start the service and initiate connection to AWS Nova Sonic.
Args:
frame: The start frame triggering service initialization.
"""
await super().start(frame)
self._wants_connection = True
await self._start_connecting()
async def stop(self, frame: EndFrame):
"""Stop the service and close connections.
Args:
frame: The end frame triggering service shutdown.
"""
await super().stop(frame)
self._wants_connection = False
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the service and close connections.
Args:
frame: The cancel frame triggering service cancellation.
"""
await super().cancel(frame)
self._wants_connection = False
await self._disconnect()
@@ -207,6 +275,11 @@ class AWSNovaSonicLLMService(LLMService):
#
async def reset_conversation(self):
"""Reset the conversation state while preserving context.
Handles bot stopped speaking event, disconnects from the service,
and reconnects with the preserved context.
"""
logger.debug("Resetting conversation")
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
@@ -222,6 +295,12 @@ class AWSNovaSonicLLMService(LLMService):
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle service-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
@@ -960,6 +1039,16 @@ class AWSNovaSonicLLMService(LLMService):
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> AWSNovaSonicContextAggregatorPair:
"""Create context aggregator pair for managing conversation context.
Args:
context: The OpenAI LLM context to upgrade.
user_params: Parameters for the user context aggregator.
assistant_params: Parameters for the assistant context aggregator.
Returns:
A pair of user and assistant context aggregators.
"""
context.set_llm_adapter(self.get_llm_adapter())
user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
@@ -978,6 +1067,14 @@ class AWSNovaSonicLLMService(LLMService):
)
async def trigger_assistant_response(self):
"""Trigger an assistant response by sending audio cue.
Sends a pre-recorded "ready" audio trigger to prompt the assistant
to start speaking. This is useful for controlling conversation flow.
Returns:
False if already triggering a response, True otherwise.
"""
if self._triggering_assistant_response:
return False

View File

@@ -4,6 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Context management for AWS Nova Sonic LLM service.
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
"""
import copy
from dataclasses import dataclass, field
from enum import Enum
@@ -35,6 +41,8 @@ from pipecat.services.openai.llm import (
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations."""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
@@ -43,17 +51,42 @@ class Role(Enum):
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
@dataclass
class AWSNovaSonicConversationHistory:
"""Complete conversation history for AWS Nova Sonic initialization.
Parameters:
system_instruction: System-level instruction for the conversation.
messages: List of conversation messages between user and assistant.
"""
system_instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
class AWSNovaSonicLLMContext(OpenAILLMContext):
"""Specialized LLM context for AWS Nova Sonic service.
Extends OpenAI context with Nova Sonic-specific message handling,
conversation history management, and text buffering capabilities.
Args:
messages: Initial messages for the context.
tools: Available tools for the context.
**kwargs: Additional arguments passed to parent class.
"""
def __init__(self, messages=None, tools=None, **kwargs):
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
@@ -67,6 +100,15 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
def upgrade_to_nova_sonic(
obj: OpenAILLMContext, system_instruction: str
) -> "AWSNovaSonicLLMContext":
"""Upgrade an OpenAI context to AWS Nova Sonic context.
Args:
obj: The OpenAI context to upgrade.
system_instruction: System instruction for the context.
Returns:
The upgraded AWS Nova Sonic context.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
obj.__class__ = AWSNovaSonicLLMContext
obj.__setup_local(system_instruction)
@@ -74,6 +116,14 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
# NOTE: this method has the side-effect of updating _system_instruction from messages
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
"""Get conversation history for initializing AWS Nova Sonic session.
Processes stored messages and extracts system instruction and conversation
history in the format expected by AWS Nova Sonic.
Returns:
Formatted conversation history with system instruction and messages.
"""
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
# Bail if there are no messages
@@ -103,6 +153,11 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
return history
def get_messages_for_persistent_storage(self):
"""Get messages formatted for persistent storage.
Returns:
List of messages including system instruction if present.
"""
messages = super().get_messages_for_persistent_storage()
# If we have a system instruction and messages doesn't already contain it, add it
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
@@ -110,6 +165,14 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
return messages
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
@@ -131,10 +194,20 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
# Sonic conversation history
def buffer_user_text(self, text):
"""Buffer user text for later flushing to context.
Args:
text: User text to buffer.
"""
self._user_text += f" {text}" if self._user_text else text
# logger.debug(f"User text buffered: {self._user_text}")
def flush_aggregated_user_text(self) -> str:
"""Flush buffered user text to context as a complete message.
Returns:
The flushed user text, or empty string if no text was buffered.
"""
if not self._user_text:
return ""
user_text = self._user_text
@@ -148,10 +221,16 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
return user_text
def buffer_assistant_text(self, text):
"""Buffer assistant text for later flushing to context.
Args:
text: Assistant text to buffer.
"""
self._assistant_text += text
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
def flush_aggregated_assistant_text(self):
"""Flush buffered assistant text to context as a complete message."""
if not self._assistant_text:
return
message = {
@@ -165,13 +244,31 @@ class AWSNovaSonicLLMContext(OpenAILLMContext):
@dataclass
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
"""Frame containing updated AWS Nova Sonic context.
Parameters:
context: The updated AWS Nova Sonic LLM context.
"""
context: AWSNovaSonicLLMContext
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
"""Context aggregator for user messages in AWS Nova Sonic conversations.
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
context update frames.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process frames and emit Nova Sonic-specific context updates.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame
@@ -180,7 +277,19 @@ class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
Provides specialized handling for assistant responses and function calls
in AWS Nova Sonic context, with custom frame processing logic.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Nova Sonic-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
# HACK: For now, disable the context aggregator by making it just pass through all frames
# that the parent handles (except the function call stuff, which we still need).
# For an explanation of this hack, see
@@ -205,6 +314,11 @@ class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call results for AWS Nova Sonic.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
@@ -217,11 +331,28 @@ class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
@dataclass
class AWSNovaSonicContextAggregatorPair:
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
Parameters:
_user: The user context aggregator.
_assistant: The assistant context aggregator.
"""
_user: AWSNovaSonicUserContextAggregator
_assistant: AWSNovaSonicAssistantContextAggregator
def user(self) -> AWSNovaSonicUserContextAggregator:
"""Get the user context aggregator.
Returns:
The user context aggregator instance.
"""
return self._user
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
"""Get the assistant context aggregator.
Returns:
The assistant context aggregator instance.
"""
return self._assistant

View File

@@ -4,6 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Custom frames for AWS Nova Sonic LLM service."""
from dataclasses import dataclass
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
@@ -11,4 +13,13 @@ from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
@dataclass
class AWSNovaSonicFunctionCallResultFrame(DataFrame):
"""Frame containing function call result for AWS Nova Sonic processing.
This frame wraps a standard function call result frame to enable
AWS Nova Sonic-specific handling and context updates.
Parameters:
result_frame: The underlying function call result frame.
"""
result_frame: FunctionCallResultFrame