From 9e518cf2baced2ef5f24d14a22cc0419146fb7a5 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Jun 2025 11:21:18 -0400 Subject: [PATCH] Update AWSNovaSonicLLMService docstrings --- src/pipecat/services/aws_nova_sonic/aws.py | 97 +++++++++++++ .../services/aws_nova_sonic/context.py | 131 ++++++++++++++++++ src/pipecat/services/aws_nova_sonic/frames.py | 11 ++ 3 files changed, 239 insertions(+) diff --git a/src/pipecat/services/aws_nova_sonic/aws.py b/src/pipecat/services/aws_nova_sonic/aws.py index 93eb77e90..c6ee1d6c7 100644 --- a/src/pipecat/services/aws_nova_sonic/aws.py +++ b/src/pipecat/services/aws_nova_sonic/aws.py @@ -4,6 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # +"""AWS Nova Sonic LLM service implementation for Pipecat AI framework. + +This module provides a speech-to-speech LLM service using AWS Nova Sonic, which supports +bidirectional audio streaming, text generation, and function calling capabilities. +""" + import asyncio import base64 import json @@ -83,22 +89,37 @@ except ModuleNotFoundError as e: class AWSNovaSonicUnhandledFunctionException(Exception): + """Exception raised when the LLM attempts to call an unregistered function.""" + pass class ContentType(Enum): + """Content types supported by AWS Nova Sonic.""" + AUDIO = "AUDIO" TEXT = "TEXT" TOOL = "TOOL" class TextStage(Enum): + """Text generation stages in AWS Nova Sonic responses.""" + FINAL = "FINAL" # what has been said SPECULATIVE = "SPECULATIVE" # what's planned to be said @dataclass class CurrentContent: + """Represents content currently being received from AWS Nova Sonic. + + Parameters: + type: The type of content (audio, text, or tool). + role: The role generating the content (user, assistant, etc.). + text_stage: The stage of text generation (final or speculative). + text_content: The actual text content if applicable. 
+ """ + type: ContentType role: Role text_stage: TextStage # None if not text @@ -115,6 +136,20 @@ class CurrentContent: class Params(BaseModel): + """Configuration parameters for AWS Nova Sonic. + + Attributes: + input_sample_rate: Audio input sample rate in Hz. + input_sample_size: Audio input sample size in bits. + input_channel_count: Number of input audio channels. + output_sample_rate: Audio output sample rate in Hz. + output_sample_size: Audio output sample size in bits. + output_channel_count: Number of output audio channels. + max_tokens: Maximum number of tokens to generate. + top_p: Nucleus sampling parameter. + temperature: Sampling temperature for text generation. + """ + # Audio input input_sample_rate: Optional[int] = Field(default=16000) input_sample_size: Optional[int] = Field(default=16) @@ -132,6 +167,24 @@ class Params(BaseModel): class AWSNovaSonicLLMService(LLMService): + """AWS Nova Sonic speech-to-speech LLM service. + + Provides bidirectional audio streaming, real-time transcription, text generation, + and function calling capabilities using the AWS Nova Sonic model. + + Args: + secret_access_key: AWS secret access key for authentication. + access_key_id: AWS access key ID for authentication. + region: AWS region where the service is hosted. + model: Model identifier. Defaults to "amazon.nova-sonic-v1:0". + voice_id: Voice ID for speech synthesis. Options: matthew, tiffany, amy. + params: Model parameters for audio configuration and inference. + system_instruction: System-level instruction for the model. + tools: Available tools/functions for the model to use. + send_transcription_frames: Whether to emit transcription frames. + **kwargs: Additional arguments passed to the parent LLMService. 
+ """ + # Override the default adapter to use the AWSNovaSonicLLMAdapter one adapter_class = AWSNovaSonicLLMAdapter @@ -188,16 +241,31 @@ class AWSNovaSonicLLMService(LLMService): # async def start(self, frame: StartFrame): + """Start the service and initiate connection to AWS Nova Sonic. + + Args: + frame: The start frame triggering service initialization. + """ await super().start(frame) self._wants_connection = True await self._start_connecting() async def stop(self, frame: EndFrame): + """Stop the service and close connections. + + Args: + frame: The end frame triggering service shutdown. + """ await super().stop(frame) self._wants_connection = False await self._disconnect() async def cancel(self, frame: CancelFrame): + """Cancel the service and close connections. + + Args: + frame: The cancel frame triggering service cancellation. + """ await super().cancel(frame) self._wants_connection = False await self._disconnect() @@ -207,6 +275,11 @@ class AWSNovaSonicLLMService(LLMService): # async def reset_conversation(self): + """Reset the conversation state while preserving context. + + Handles the bot-stopped-speaking event, disconnects from the service, + and reconnects with the preserved context. + """ logger.debug("Resetting conversation") await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False) @@ -222,6 +295,12 @@ class AWSNovaSonicLLMService(LLMService): # async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames and handle service-specific logic. + + Args: + frame: The frame to process. + direction: The direction the frame is traveling. 
+ """ await super().process_frame(frame, direction) if isinstance(frame, OpenAILLMContextFrame): @@ -960,6 +1039,16 @@ class AWSNovaSonicLLMService(LLMService): user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(), assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(), ) -> AWSNovaSonicContextAggregatorPair: + """Create a context aggregator pair for managing conversation context. + + Args: + context: The OpenAI LLM context to upgrade. + user_params: Parameters for the user context aggregator. + assistant_params: Parameters for the assistant context aggregator. + + Returns: + A pair of user and assistant context aggregators. + """ context.set_llm_adapter(self.get_llm_adapter()) user = AWSNovaSonicUserContextAggregator(context=context, params=user_params) @@ -978,6 +1067,14 @@ class AWSNovaSonicLLMService(LLMService): ) async def trigger_assistant_response(self): + """Trigger an assistant response by sending an audio cue. + + Sends a pre-recorded "ready" audio trigger to prompt the assistant + to start speaking. This is useful for controlling conversation flow. + + Returns: + False if already triggering a response, True otherwise. + """ if self._triggering_assistant_response: return False diff --git a/src/pipecat/services/aws_nova_sonic/context.py b/src/pipecat/services/aws_nova_sonic/context.py index 95f330f61..327da4e40 100644 --- a/src/pipecat/services/aws_nova_sonic/context.py +++ b/src/pipecat/services/aws_nova_sonic/context.py @@ -4,6 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # +"""Context management for AWS Nova Sonic LLM service. + +This module provides specialized context aggregators and message handling for AWS Nova Sonic, +including conversation history management and role-specific message processing. 
+""" + import copy from dataclasses import dataclass, field from enum import Enum @@ -35,6 +41,8 @@ from pipecat.services.openai.llm import ( class Role(Enum): + """Roles supported in AWS Nova Sonic conversations.""" + SYSTEM = "SYSTEM" USER = "USER" ASSISTANT = "ASSISTANT" @@ -43,17 +51,42 @@ class Role(Enum): @dataclass class AWSNovaSonicConversationHistoryMessage: + """A single message in AWS Nova Sonic conversation history. + + Parameters: + role: The role of the message sender (USER or ASSISTANT only). + text: The text content of the message. + """ + role: Role # only USER and ASSISTANT text: str @dataclass class AWSNovaSonicConversationHistory: + """Complete conversation history for AWS Nova Sonic initialization. + + Parameters: + system_instruction: System-level instruction for the conversation. + messages: List of conversation messages between user and assistant. + """ + system_instruction: str = None messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list) class AWSNovaSonicLLMContext(OpenAILLMContext): + """Specialized LLM context for AWS Nova Sonic service. + + Extends OpenAI context with Nova Sonic-specific message handling, + conversation history management, and text buffering capabilities. + + Args: + messages: Initial messages for the context. + tools: Available tools for the context. + **kwargs: Additional arguments passed to parent class. + """ + def __init__(self, messages=None, tools=None, **kwargs): super().__init__(messages=messages, tools=tools, **kwargs) self.__setup_local() @@ -67,6 +100,15 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): def upgrade_to_nova_sonic( obj: OpenAILLMContext, system_instruction: str ) -> "AWSNovaSonicLLMContext": + """Upgrade an OpenAI context to AWS Nova Sonic context. + + Args: + obj: The OpenAI context to upgrade. + system_instruction: System instruction for the context. + + Returns: + The upgraded AWS Nova Sonic context. 
+ """ if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext): obj.__class__ = AWSNovaSonicLLMContext obj.__setup_local(system_instruction) @@ -74,6 +116,14 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): # NOTE: this method has the side-effect of updating _system_instruction from messages def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory: + """Get conversation history for initializing AWS Nova Sonic session. + + Processes stored messages and extracts system instruction and conversation + history in the format expected by AWS Nova Sonic. + + Returns: + Formatted conversation history with system instruction and messages. + """ history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction) # Bail if there are no messages @@ -103,6 +153,11 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): return history def get_messages_for_persistent_storage(self): + """Get messages formatted for persistent storage. + + Returns: + List of messages including system instruction if present. + """ messages = super().get_messages_for_persistent_storage() # If we have a system instruction and messages doesn't already contain it, add it if self._system_instruction and not (messages and messages[0].get("role") == "system"): @@ -110,6 +165,14 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): return messages def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage: + """Convert standard message format to Nova Sonic format. + + Args: + message: Standard message dictionary to convert. + + Returns: + Nova Sonic conversation history message, or None if not convertible. 
+ """ role = message.get("role") if message.get("role") == "user" or message.get("role") == "assistant": content = message.get("content") @@ -131,10 +194,20 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): # Sonic conversation history def buffer_user_text(self, text): + """Buffer user text for later flushing to context. + + Args: + text: User text to buffer. + """ self._user_text += f" {text}" if self._user_text else text # logger.debug(f"User text buffered: {self._user_text}") def flush_aggregated_user_text(self) -> str: + """Flush buffered user text to context as a complete message. + + Returns: + The flushed user text, or empty string if no text was buffered. + """ if not self._user_text: return "" user_text = self._user_text @@ -148,10 +221,16 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): return user_text def buffer_assistant_text(self, text): + """Buffer assistant text for later flushing to context. + + Args: + text: Assistant text to buffer. + """ self._assistant_text += text # logger.debug(f"Assistant text buffered: {self._assistant_text}") def flush_aggregated_assistant_text(self): + """Flush buffered assistant text to context as a complete message.""" if not self._assistant_text: return message = { @@ -165,13 +244,31 @@ class AWSNovaSonicLLMContext(OpenAILLMContext): @dataclass class AWSNovaSonicMessagesUpdateFrame(DataFrame): + """Frame containing updated AWS Nova Sonic context. + + Parameters: + context: The updated AWS Nova Sonic LLM context. + """ + context: AWSNovaSonicLLMContext class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator): + """Context aggregator for user messages in AWS Nova Sonic conversations. + + Extends the OpenAI user context aggregator to emit Nova Sonic-specific + context update frames. + """ + async def process_frame( self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM ): + """Process frames and emit Nova Sonic-specific context updates. + + Args: + frame: The frame to process. 
+ direction: The direction the frame is traveling. + """ await super().process_frame(frame, direction) # Parent does not push LLMMessagesUpdateFrame @@ -180,7 +277,19 @@ class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator): class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator): + """Context aggregator for assistant messages in AWS Nova Sonic conversations. + + Provides specialized handling for assistant responses and function calls + in AWS Nova Sonic context, with custom frame processing logic. + """ + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames with Nova Sonic-specific logic. + + Args: + frame: The frame to process. + direction: The direction the frame is traveling. + """ # HACK: For now, disable the context aggregator by making it just pass through all frames # that the parent handles (except the function call stuff, which we still need). # For an explanation of this hack, see @@ -205,6 +314,11 @@ class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator): await super().process_frame(frame, direction) async def handle_function_call_result(self, frame: FunctionCallResultFrame): + """Handle function call results for AWS Nova Sonic. + + Args: + frame: The function call result frame to handle. + """ await super().handle_function_call_result(frame) # The standard function callback code path pushes the FunctionCallResultFrame from the LLM @@ -217,11 +331,28 @@ class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator): @dataclass class AWSNovaSonicContextAggregatorPair: + """Pair of user and assistant context aggregators for AWS Nova Sonic. + + Parameters: + _user: The user context aggregator. + _assistant: The assistant context aggregator. + """ + _user: AWSNovaSonicUserContextAggregator _assistant: AWSNovaSonicAssistantContextAggregator def user(self) -> AWSNovaSonicUserContextAggregator: + """Get the user context aggregator. 
+ + Returns: + The user context aggregator instance. + """ return self._user def assistant(self) -> AWSNovaSonicAssistantContextAggregator: + """Get the assistant context aggregator. + + Returns: + The assistant context aggregator instance. + """ return self._assistant diff --git a/src/pipecat/services/aws_nova_sonic/frames.py b/src/pipecat/services/aws_nova_sonic/frames.py index 94d410f22..7d4feb2ae 100644 --- a/src/pipecat/services/aws_nova_sonic/frames.py +++ b/src/pipecat/services/aws_nova_sonic/frames.py @@ -4,6 +4,8 @@ # SPDX-License-Identifier: BSD 2-Clause License # +"""Custom frames for AWS Nova Sonic LLM service.""" + from dataclasses import dataclass from pipecat.frames.frames import DataFrame, FunctionCallResultFrame @@ -11,4 +13,13 @@ from pipecat.frames.frames import DataFrame, FunctionCallResultFrame @dataclass class AWSNovaSonicFunctionCallResultFrame(DataFrame): + """Frame containing function call result for AWS Nova Sonic processing. + + This frame wraps a standard function call result frame to enable + AWS Nova Sonic-specific handling and context updates. + + Parameters: + result_frame: The underlying function call result frame. + """ + result_frame: FunctionCallResultFrame