Merge pull request #3212 from pipecat-ai/pk/nova-2-sonic

Nova 2 Sonic support
This commit is contained in:
kompfner
2025-12-11 09:36:50 -05:00
committed by GitHub
5 changed files with 110 additions and 24 deletions

6
changelog/3212.added.md Normal file
View File

@@ -0,0 +1,6 @@
- Added functionality to `AWSNovaSonicLLMService` for the new (and now
default) Nova 2 Sonic model (`"amazon.nova-2-sonic-v1:0"`):
- Added the `endpointing_sensitivity` parameter to control how quickly the
model decides the user has stopped speaking.
- Made the assistant-response-trigger hack a no-op when using Nova 2 Sonic.
It's only needed for the older Nova Sonic model.

View File

@@ -0,0 +1 @@
- Made `"amazon.nova-2-sonic-v1:0"` the new default model for `AWSNovaSonicLLMService`.

2
changelog/3212.fixed.md Normal file
View File

@@ -0,0 +1,2 @@
- Fixed a bug in `AWSNovaSonicLLMService` where we would mishandle cancelled
tool calls in the context, resulting in errors.

View File

@@ -5,7 +5,9 @@
#
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
@@ -33,11 +35,21 @@ load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
# Simulate a long network delay.
# You can continue chatting while waiting for this to complete.
# With Nova 2 Sonic (the default model), the assistant will respond
# appropriately once the function call is complete.
await asyncio.sleep(5)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
@@ -91,23 +103,31 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Specify initial system instruction.
# HACK: note that, for now, we need to inject a special bit of text into this instruction to
# allow the first assistant response to be programmatically triggered (which happens in the
# on_client_connected handler, below)
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken dialog exchanging "
"the transcripts of a natural real-time conversation. Keep your responses short, generally "
"two or three sentences for chatty scenarios. "
f"{AWSNovaSonicLLMService.AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION}"
"two or three sentences for chatty scenarios."
# HACK: if using the older Nova Sonic (pre-2) model, note that you need to inject a special
# bit of text into this instruction to allow the first assistant response to be
# programmatically triggered (which happens in the on_client_connected handler)
# f"{AWSNovaSonicLLMService.AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION}"
)
# Create the AWS Nova Sonic LLM service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region=os.getenv("AWS_REGION"), # as of 2025-05-06, us-east-1 is the only supported region
# as of 2025-12-09, these are the supported regions:
# - Nova 2 Sonic (the default model):
# - us-east-1
# - us-west-2
# - ap-northeast-1
# - Nova Sonic (the older model):
# - us-east-1
# - ap-northeast-1
region=os.getenv("AWS_REGION"),
session_token=os.getenv("AWS_SESSION_TOKEN"),
voice_id="tiffany", # matthew, tiffany, amy
voice_id="tiffany",
# you could choose to pass instruction here rather than via context
# system_instruction=system_instruction
# you could choose to pass tools here rather than via context
@@ -117,7 +137,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register function for function calls
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function(
"get_current_weather", fetch_weather_from_api, cancel_on_interruption=False
)
# Set up context and context management.
context = LLMContext(
@@ -159,10 +181,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
# HACK: for now, we need this special way of triggering the first assistant response in AWS
# Nova Sonic. Note that this trigger requires a special corresponding bit of text in the
# system instruction. In the future, simply queueing the context frame should be sufficient.
await llm.trigger_assistant_response()
# HACK: if using the older Nova Sonic (pre-2) model, you need this special way of
# triggering the first assistant response. Note that this trigger requires a special
# corresponding bit of text in the system instruction.
# await llm.trigger_assistant_response()
# Handle client disconnection events
@transport.event_handler("on_client_disconnected")

View File

@@ -157,6 +157,12 @@ class Params(BaseModel):
max_tokens: Maximum number of tokens to generate.
top_p: Nucleus sampling parameter.
temperature: Sampling temperature for text generation.
endpointing_sensitivity: Controls how quickly Nova Sonic decides the
user has stopped speaking. Can be "LOW", "MEDIUM", or "HIGH", with
"HIGH" being the most sensitive (i.e., causing the model to respond
most quickly).
If not set, uses the model's default behavior.
Only supported with Nova 2 Sonic (the default model).
"""
# Audio input
@@ -174,6 +180,9 @@ class Params(BaseModel):
top_p: Optional[float] = Field(default=0.9)
temperature: Optional[float] = Field(default=0.7)
# Turn-taking
endpointing_sensitivity: Optional[str] = Field(default=None)
class AWSNovaSonicLLMService(LLMService):
"""AWS Nova Sonic speech-to-speech LLM service.
@@ -192,8 +201,8 @@ class AWSNovaSonicLLMService(LLMService):
access_key_id: str,
session_token: Optional[str] = None,
region: str,
model: str = "amazon.nova-sonic-v1:0",
voice_id: str = "matthew", # matthew, tiffany, amy
model: str = "amazon.nova-2-sonic-v1:0",
voice_id: str = "matthew",
params: Optional[Params] = None,
system_instruction: Optional[str] = None,
tools: Optional[ToolsSchema] = None,
@@ -207,8 +216,15 @@ class AWSNovaSonicLLMService(LLMService):
access_key_id: AWS access key ID for authentication.
session_token: AWS session token for authentication.
region: AWS region where the service is hosted.
model: Model identifier. Defaults to "amazon.nova-sonic-v1:0".
voice_id: Voice ID for speech synthesis. Options: matthew, tiffany, amy.
Supported regions:
- Nova 2 Sonic (the default model): "us-east-1", "us-west-2", "ap-northeast-1"
- Nova Sonic (the older model): "us-east-1", "ap-northeast-1"
model: Model identifier. Defaults to "amazon.nova-2-sonic-v1:0".
voice_id: Voice ID for speech synthesis.
Note that some voices are designed for use with a specific language.
Options:
- Nova 2 Sonic (the default model): see https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-language-support.html
- Nova Sonic (the older model): see https://docs.aws.amazon.com/nova/latest/userguide/available-voices.html.
params: Model parameters for audio configuration and inference.
system_instruction: System-level instruction for the model.
tools: Available tools/functions for the model to use.
@@ -232,6 +248,17 @@ class AWSNovaSonicLLMService(LLMService):
self._system_instruction = system_instruction
self._tools = tools
# Validate endpointing_sensitivity parameter
if (
self._params.endpointing_sensitivity
and not self._is_endpointing_sensitivity_supported()
):
logger.warning(
f"endpointing_sensitivity is not supported for model '{model}' and will be ignored. "
"This parameter is only supported starting with Nova 2 Sonic (amazon.nova-2-sonic-v1:0)."
)
self._params.endpointing_sensitivity = None
if not send_transcription_frames:
import warnings
@@ -459,7 +486,7 @@ class AWSNovaSonicLLMService(LLMService):
async def _process_completed_function_calls(self, send_new_results: bool):
# Check for set of completed function calls in the context
for message in self._context.get_messages():
if message.get("role") and message.get("content") != "IN_PROGRESS":
if message.get("role") and message.get("content") not in ["IN_PROGRESS", "CANCELLED"]:
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
# Found a newly-completed function call - send the result to the service
@@ -591,11 +618,33 @@ class AWSNovaSonicLLMService(LLMService):
)
return BedrockRuntimeClient(config=config)
def _is_first_generation_sonic_model(self) -> bool:
# Nova Sonic (the older model) is identified by "amazon.nova-sonic-v1:0"
return self._model == "amazon.nova-sonic-v1:0"
def _is_endpointing_sensitivity_supported(self) -> bool:
# endpointing_sensitivity is only supported with Nova 2 Sonic (and,
# presumably, future models)
return not self._is_first_generation_sonic_model()
def _is_assistant_response_trigger_needed(self) -> bool:
# Assistant response trigger audio is only needed with the older model
return self._is_first_generation_sonic_model()
#
# LLM communication: input events (pipecat -> LLM)
#
async def _send_session_start_event(self):
turn_detection_config = (
f""",
"turnDetectionConfiguration": {{
"endpointingSensitivity": "{self._params.endpointing_sensitivity}"
}}"""
if self._params.endpointing_sensitivity
else ""
)
session_start = f"""
{{
"event": {{
@@ -604,7 +653,7 @@ class AWSNovaSonicLLMService(LLMService):
"maxTokens": {self._params.max_tokens},
"topP": {self._params.top_p},
"temperature": {self._params.temperature}
}}
}}{turn_detection_config}
}}
}}
}}
@@ -1189,7 +1238,8 @@ class AWSNovaSonicLLMService(LLMService):
)
#
# assistant response trigger (HACK)
# assistant response trigger
# HACK: only needed for the older Nova Sonic (as opposed to Nova 2 Sonic) model
#
# Class variable
@@ -1203,12 +1253,17 @@ class AWSNovaSonicLLMService(LLMService):
Sends a pre-recorded "ready" audio trigger to prompt the assistant
to start speaking. This is useful for controlling conversation flow.
Returns:
False if already triggering a response, True otherwise.
"""
if not self._is_assistant_response_trigger_needed():
logger.warning(
f"Assistant response trigger not needed for model '{self._model}'; skipping. "
"An LLMRunFrame() should be sufficient to prompt the assistant to respond, "
"assuming the context ends in a user message."
)
return
if self._triggering_assistant_response:
return False
return
self._triggering_assistant_response = True