feat: support new async-tool mechanism in AWS Nova Sonic

Add support to AWSNovaSonicLLMService for the new "async tool call"
mechanism activated by `cancel_on_interruption=False`, which includes:

- delivering results asynchronously
- delivering result streams
- cancelling running async tools

Note that the introduction of the new mechanism caused a regression in
AWS Nova Sonic, which previously supported
`cancel_on_interruption=False` with the old mechanism (simply avoiding
discarding tool calls on interruptions). This commit also fixes that
regression.

Support for the other major realtime services (`GeminiLiveLLMService`,
`OpenAIRealtimeLLMService`) will follow in a separate PR — Gemini Live
in particular needs more work before it can support long-running tool
calls reliably.
This commit is contained in:
Paul Kompfner
2026-05-01 15:08:06 -04:00
parent a745e8d318
commit 7d3726a74b
9 changed files with 964 additions and 52 deletions

1
changelog/4425.added.md Normal file
View File

@@ -0,0 +1 @@
- Added support to `AWSNovaSonicLLMService` for the new "async tool call" mechanism activated by `cancel_on_interruption=False`, which includes delivering results asynchronously, delivering result streams, and cancelling running async tools. Support for the other major realtime services (`GeminiLiveLLMService`, `OpenAIRealtimeLLMService`) will be added in a follow-up PR.

1
changelog/4425.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed a regression in `AWSNovaSonicLLMService` where `cancel_on_interruption=False` (which previously worked under the old async-tool-call mechanism, by simply avoiding discarding tool calls on interruptions) stopped working after the introduction of the new "async tool call" mechanism.

View File

@@ -0,0 +1,182 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: streaming async function call with the AWS Nova Sonic LLM service.
The ``track_current_location`` tool simulates a GPS tracker reporting the
device's position during a road trip from San Francisco to San Diego. It
sends two intermediate updates (via ``params.result_callback`` with
``is_final=False``) as the vehicle passes through cities along the way, then
delivers the final destination.
The initial "started" placeholder message is sent as a formal Nova Sonic
``toolResult``; each intermediate result and the final result are then
forwarded as cross-modal user-role text input events so the model can fold
each update into its next turn.
"""
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import FunctionCallResultProperties, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def track_current_location(params: FunctionCallParams):
    """Simulate a GPS tracker reporting position during a road trip.

    Reports San Francisco and then Los Angeles as non-final results, sleeping
    10 seconds after each one, and finally reports San Diego.

    Args:
        params: Function call parameters; every update is delivered through
            ``params.result_callback``.
    """
    # Stops reported while the simulated trip is still under way.
    stops_en_route = (
        ("San Francisco", {"lat": 37.7310, "lng": -122.4527}),
        ("Los Angeles", {"lat": 33.96003, "lng": -118.40639}),
    )
    for city, gps in stops_en_route:
        await params.result_callback(
            {"gps": gps, "city": city},
            properties=FunctionCallResultProperties(is_final=False),
        )
        # Simulated travel time before the next update.
        await asyncio.sleep(10)

    # The last update is sent without a ``properties`` argument, unlike the
    # non-final updates above.
    destination = {"lat": 32.743569, "lng": -117.20466}
    await params.result_callback({"gps": destination, "city": "San Diego"})
# Schema for the tracking tool. It declares no properties and no required
# arguments, so the model calls it without parameters.
location_function = FunctionSchema(
    name="track_current_location",
    description=(
        "Start tracking the user's current GPS location, reporting position "
        "updates until the user reaches their destination. "
        "Once this tracker is started, it doesn't need to be started again for subsequent updates; "
        "just call this function once to kick it off and the updates will come in automatically."
    ),
    properties={},
    required=[],
)
# Tool set handed to the LLM context in ``run_bot``.
tools = ToolsSchema(standard_tools=[location_function])
# Zero-argument factories for transport parameters, keyed by transport name.
# Each one enables audio input and audio output. Passed to
# ``create_transport`` in ``bot``.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Assemble and run the Nova Sonic location-tracking pipeline.

    Args:
        transport: Transport that provides the pipeline's input and output
            processors and emits the client connect/disconnect events.
        runner_args: Runner arguments; supplies the pipeline idle timeout and
            the SIGINT-handling flag.
    """
    logger.info("Starting bot")

    system_instruction = (
        "You are a friendly assistant. The user and you will engage in a spoken "
        "dialog exchanging the transcripts of a natural real-time conversation. "
        "Keep your responses short, generally two or three sentences for chatty "
        "scenarios. You have access to a function that starts tracking the user's "
        "location and provides regular updates on it. Narrate each position "
        "update to the user as it arrives (city only, no coordinates). "
        "When you receive the final location, tell the user the destination has "
        "been reached."
    )

    llm = AWSNovaSonicLLMService(
        secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        region=os.environ["AWS_REGION"],
        session_token=os.getenv("AWS_SESSION_TOKEN"),
        settings=AWSNovaSonicLLMService.Settings(
            voice="tiffany",
            system_instruction=system_instruction,
        ),
    )

    # cancel_on_interruption=False registers the tool as an async tool call.
    llm.register_function(
        "track_current_location",
        track_current_location,
        cancel_on_interruption=False,
    )

    context = LLMContext(tools=tools)
    aggregator_params = LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer())
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=aggregator_params,
    )

    processors = [
        transport.input(),
        user_aggregator,
        llm,
        transport.output(),
        assistant_aggregator,
    ]
    pipeline = Pipeline(processors)

    task_params = PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
    )
    task = PipelineTask(
        pipeline,
        params=task_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Seed the context with a prompt and queue an LLMRunFrame.
        intro_request = {"role": "developer", "content": "Please introduce yourself to the user."}
        context.add_message(intro_request)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    await run_bot(await create_transport(runner_args, transport_params), runner_args)


if __name__ == "__main__":
    # The runner entry point is imported only when executed as a script.
    from pipecat.runner.run import main

    main()

View File

@@ -0,0 +1,183 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the AWS Nova Sonic LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (20s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Nova Sonic so the
model can integrate it naturally into its next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
    """Simulate a slow weather lookup and deliver a made-up result.

    Sleeps for 20 seconds, then reports a random temperature (60-85 when the
    requested format is ``"fahrenheit"``, 15-30 otherwise) along with the
    requested location, the format and a timestamp.

    Args:
        params: Function call parameters. ``params.arguments`` is read for the
            ``"format"`` and ``"location"`` keys; the result is delivered
            through ``params.result_callback``.
    """
    # Simulate a long-running API call so we can demonstrate that the
    # conversation continues while the tool is in flight.
    await asyncio.sleep(20)

    unit = params.arguments["format"]
    if unit == "fahrenheit":
        low, high = 60, 85
    else:
        low, high = 15, 30
    temperature = random.randint(low, high)

    weather = {
        "conditions": "nice",
        "temperature": temperature,
        "location": params.arguments["location"],
        "format": unit,
        "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
    }
    await params.result_callback(weather)
# Schema for the weather tool. Both ``location`` and ``format`` are required;
# ``format`` is restricted to "celsius" or "fahrenheit".
weather_function = FunctionSchema(
    name="get_current_weather",
    description="Get the current weather",
    properties={
        "location": {
            "type": "string",
            "description": "The city and state, e.g. San Francisco, CA",
        },
        "format": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "description": "The temperature unit to use. Infer this from the users location.",
        },
    },
    required=["location", "format"],
)
# Tool set handed to the LLM context in ``run_bot``.
tools = ToolsSchema(standard_tools=[weather_function])
# Zero-argument factories for transport parameters, keyed by transport name.
# Each one enables audio input and audio output. Passed to
# ``create_transport`` in ``bot``.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Assemble and run the Nova Sonic async-weather pipeline.

    Args:
        transport: Transport that provides the pipeline's input and output
            processors and emits the client connect/disconnect events.
        runner_args: Runner arguments; supplies the pipeline idle timeout and
            the SIGINT-handling flag.
    """
    logger.info("Starting bot")

    system_instruction = (
        "You are a friendly assistant. The user and you will engage in a spoken "
        "dialog exchanging the transcripts of a natural real-time conversation. "
        "Keep your responses short, generally two or three sentences for chatty "
        "scenarios. When the user asks for the weather, call get_current_weather. "
        "While you wait for the result, keep chatting with the user. When the "
        "result arrives, share it with the user naturally."
    )

    llm = AWSNovaSonicLLMService(
        secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        region=os.environ["AWS_REGION"],
        session_token=os.getenv("AWS_SESSION_TOKEN"),
        settings=AWSNovaSonicLLMService.Settings(
            voice="tiffany",
            system_instruction=system_instruction,
        ),
    )

    # cancel_on_interruption=False registers the tool as an async tool call.
    llm.register_function(
        "get_current_weather",
        fetch_weather_from_api,
        cancel_on_interruption=False,
    )

    context = LLMContext(tools=tools)
    aggregator_params = LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer())
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=aggregator_params,
    )

    processors = [
        transport.input(),
        user_aggregator,
        llm,
        transport.output(),
        assistant_aggregator,
    ]
    pipeline = Pipeline(processors)

    task_params = PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
    )
    task = PipelineTask(
        pipeline,
        params=task_params,
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Seed the context with a prompt and queue an LLMRunFrame.
        intro_request = {"role": "developer", "content": "Please introduce yourself to the user."}
        context.add_message(intro_request)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    await run_bot(await create_transport(runner_args, transport_params), runner_args)


if __name__ == "__main__":
    # The runner entry point is imported only when executed as a script.
    from pipecat.runner.run import main

    main()

View File

@@ -5,7 +5,6 @@
#
import asyncio
import os
import random
from datetime import datetime
@@ -46,11 +45,6 @@ async def fetch_weather_from_api(params: FunctionCallParams):
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
# Simulate a long network delay.
# You can continue chatting while waiting for this to complete.
# With Nova 2 Sonic (the default model), the assistant will respond
# appropriately once the function call is complete.
await asyncio.sleep(5)
await params.result_callback(
{
"conditions": "nice",
@@ -150,9 +144,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register function for function calls
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function(
"get_current_weather", fetch_weather_from_api, cancel_on_interruption=False
)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Set up context and context management.
context = LLMContext(tools=tools)

View File

@@ -0,0 +1,258 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Helpers for the async-tool message protocol used in LLM contexts.
When a function is registered with ``cancel_on_interruption=False``, the
``LLMAssistantAggregator`` appends async-tool messages to the conversation
context as the underlying task progresses:
- A ``started`` message (``role="tool"``) is appended immediately when the
tool starts running.
- An ``intermediate`` message (``role="developer"``) is appended each time an
intermediate result is reported via
``result_callback(..., FunctionCallResultProperties(is_final=False))``.
- A ``final`` message (``role="developer"``) is appended when the task
finishes.
This module is the single source of truth for the on-the-wire payload shape:
- The aggregator uses the ``build_*_message`` functions when injecting messages.
- Realtime LLM services use ``parse_message`` when scanning the context for
async-tool messages to forward to their providers.
Keeping construction and parsing in one module ensures the two sides can't
drift out of sync. Consumers are expected to import the module rather than
its individual functions, e.g.::
from pipecat.processors.aggregators import async_tool_messages
...
async_tool_messages.build_started_message(tool_call_id)
async_tool_messages.parse_message(msg)
"""
import json
from dataclasses import dataclass
from typing import Literal
from pipecat.processors.aggregators.llm_context import LLMStandardMessage
# The three stages an async-tool message can represent.
AsyncToolMessageKind = Literal["started", "intermediate", "final"]

# --- Payload shape (private; canonical source of truth) ---------------------

# Value of the payload's ``type`` field, which marks it as an async-tool
# message. Builders and parser both use this constant; do not duplicate the
# literal.
_PAYLOAD_TYPE = "async_tool"

# ``status`` while the task is still running (started / intermediate).
_STATUS_RUNNING = "running"

# ``status`` once the task is complete (final).
_STATUS_FINISHED = "finished"

# Description carried by the started message. It is deliberately
# self-explanatory so a model reading the context can tell what is about to
# happen without out-of-band knowledge of the protocol.
_STARTED_DESCRIPTION = (
    "An asynchronous task associated with this tool_call_id has started running. "
    "Expect results to arrive later as developer messages that look roughly like "
    "this one (with 'type=async_tool' and a matching tool_call_id) but with a "
    "'result' field. "
    "Note that there *may* be more than one result (i.e., a stream of results), "
    "but there doesn't have to be (there may be only one). "
    "The last result will come in a message with 'status=finished'."
)

# Description carried by each intermediate-result message.
_INTERMEDIATE_DESCRIPTION = (
    "This is an intermediate result for the asynchronous task associated with "
    "this tool_call_id. "
    "The task is still running. "
    "More intermediate results may follow, or the next result may be the final "
    "one with 'status=finished'."
)

# Description carried by the final-result message.
_FINAL_DESCRIPTION = (
    "This is the final result for the asynchronous task associated with this "
    "tool_call_id. "
    "The task has completed. "
    "No further results will arrive for this tool_call_id."
)
@dataclass(frozen=True)
class AsyncToolMessagePayload:
    """A parsed async-tool message extracted from an LLM context entry.

    Instances are immutable (the dataclass is frozen) and are produced by
    ``parse_message``.

    Parameters:
        kind: Which of the three async-tool message stages this is.
        tool_call_id: The id of the tool invocation this message relates to.
        status: ``"running"`` for started/intermediate, ``"finished"`` for
            the final message.
        description: Human-readable description from the message payload. May
            be empty.
        result: For ``intermediate`` and ``final`` messages, the JSON-encoded
            result string (or the literal ``"COMPLETED"`` if the function
            returned no value). ``None`` for ``started`` messages.
        raw_content: The original JSON-encoded payload string (i.e. the
            ``content`` field of the source LLM context message). Use this
            when forwarding the message to a provider as a formal tool
            result, so the provider receives the complete payload (with
            ``type``, ``status``, ``tool_call_id``, ``description``, and any
            ``result``) rather than just a sub-field.
    """

    # Message stage: "started", "intermediate" or "final".
    kind: AsyncToolMessageKind
    # Id of the tool invocation the message belongs to.
    tool_call_id: str
    # Task status as carried in the payload.
    status: Literal["running", "finished"]
    # Description text from the payload; empty string when absent or invalid.
    description: str
    # Result string, or None when the payload carried no string result.
    result: str | None
    # Unmodified ``content`` string of the source message.
    raw_content: str
# --- Builders ----------------------------------------------------------------
def build_started_message(tool_call_id: str) -> LLMStandardMessage:
    """Create the ``started`` async-tool message for an LLM context.

    The message is meant to be appended to the context as soon as an async
    function call (one registered with ``cancel_on_interruption=False``)
    starts running. It tells the model that a task is in flight and that its
    results will show up later in ``developer``-role messages.

    Args:
        tool_call_id: The id of the tool invocation this message is for.

    Returns:
        A ``role="tool"`` message ready to pass to ``LLMContext.add_message``.
    """
    payload = {
        "type": _PAYLOAD_TYPE,
        "status": _STATUS_RUNNING,
        "tool_call_id": tool_call_id,
        "description": _STARTED_DESCRIPTION,
    }
    return {
        "role": "tool",
        "content": json.dumps(payload),
        "tool_call_id": tool_call_id,
    }
def build_intermediate_result_message(tool_call_id: str, result: str) -> LLMStandardMessage:
    """Create an intermediate-result async-tool message for an LLM context.

    The message is meant to be appended to the context every time the running
    async function reports a non-final result via
    ``result_callback(..., FunctionCallResultProperties(is_final=False))``.

    Args:
        tool_call_id: The id of the tool invocation the result is for.
        result: The JSON-encoded result string (the caller encodes the
            function's actual return value, typically via ``json.dumps``).

    Returns:
        A ``role="developer"`` message ready to pass to
        ``LLMContext.add_message``.
    """
    payload = {
        "type": _PAYLOAD_TYPE,
        "tool_call_id": tool_call_id,
        "status": _STATUS_RUNNING,
        "description": _INTERMEDIATE_DESCRIPTION,
        "result": result,
    }
    return {"role": "developer", "content": json.dumps(payload)}
def build_final_result_message(tool_call_id: str, result: str) -> LLMStandardMessage:
    """Create the final-result async-tool message for an LLM context.

    The message is meant to be appended to the context when the async function
    finishes; no further async-tool messages are expected for this
    ``tool_call_id`` afterwards.

    Args:
        tool_call_id: The id of the tool invocation the result is for.
        result: The JSON-encoded result string, or the literal ``"COMPLETED"``
            sentinel when the function returned ``None`` (the same convention
            used for synchronous tool calls).

    Returns:
        A ``role="developer"`` message ready to pass to
        ``LLMContext.add_message``.
    """
    payload = {
        "type": _PAYLOAD_TYPE,
        "tool_call_id": tool_call_id,
        "status": _STATUS_FINISHED,
        "description": _FINAL_DESCRIPTION,
        "result": result,
    }
    return {"role": "developer", "content": json.dumps(payload)}
# --- Parsing -----------------------------------------------------------------
def parse_message(message: LLMStandardMessage) -> AsyncToolMessagePayload | None:
    """Decode an async-tool message payload, or return None if not async-tool.

    A message is recognized when its role is ``"tool"`` or ``"developer"``,
    its ``content`` is a string holding a JSON object whose ``type`` is the
    async-tool payload type, its ``tool_call_id`` is a string, and its
    ``status`` is ``"running"`` or ``"finished"``.

    The message kind is derived from ``status`` and ``result``:

    - string ``result`` + ``"finished"`` -> ``"final"``
    - string ``result`` + ``"running"`` -> ``"intermediate"``
    - no string ``result`` + ``"running"`` -> ``"started"``
    - no string ``result`` + ``"finished"`` -> rejected (``None``). Such a
      payload claims completion without carrying a result; reporting it as
      ``"started"`` would break the documented kind/status pairing and could
      lead a consumer to forward it as the tool's "started" result.

    Args:
        message: A standard message from the LLM context. Callers iterating
            over ``LLMContext.get_messages()`` should filter out
            ``LLMSpecificMessage`` entries first; only ``LLMStandardMessage``
            values can carry async-tool payloads.

    Returns:
        An ``AsyncToolMessagePayload`` if the message is a recognized async-tool
        payload, otherwise ``None``.
    """
    if message.get("role") not in ("tool", "developer"):
        return None

    content = message.get("content")
    if not isinstance(content, str):
        return None

    try:
        payload = json.loads(content)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so this covers it.
        return None
    if not isinstance(payload, dict) or payload.get("type") != _PAYLOAD_TYPE:
        return None

    tool_call_id = payload.get("tool_call_id")
    status = payload.get("status")
    if not isinstance(tool_call_id, str) or status not in (_STATUS_RUNNING, _STATUS_FINISHED):
        return None

    description = payload.get("description", "")
    if not isinstance(description, str):
        description = ""

    # A missing or non-string result is treated as "no result".
    result = payload.get("result")
    if not isinstance(result, str):
        result = None

    kind: AsyncToolMessageKind
    if result is not None:
        kind = "final" if status == _STATUS_FINISHED else "intermediate"
    elif status == _STATUS_RUNNING:
        kind = "started"
    else:
        # status == "finished" but no usable result: malformed payload.
        return None

    return AsyncToolMessagePayload(
        kind=kind,
        tool_call_id=tool_call_id,
        status=status,
        description=description,
        result=result,
        raw_content=content,
    )

View File

@@ -67,6 +67,7 @@ from pipecat.frames.frames import (
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
@@ -1075,23 +1076,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
is_async = not frame.cancel_on_interruption
if is_async:
self._context.add_message(
{
"role": "tool",
"content": json.dumps(
{
"type": "async_tool",
"status": "running",
"tool_call_id": frame.tool_call_id,
"description": "An asynchronous task associated with this tool_call_id has started running. "
+ "Expect results to arrive later as developer messages that look roughly like this one (with 'type=async_tool' and a matching tool_call_id) but with a 'result' field. "
+ "Note that there *may* be more than one result (i.e., a stream of results), but there doesn't have to be (there may be only one). "
+ "The last result will come in a message with 'status=finished'.",
}
),
"tool_call_id": frame.tool_call_id,
}
)
self._context.add_message(async_tool_messages.build_started_message(frame.tool_call_id))
else:
self._context.add_message(
{
@@ -1204,19 +1189,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
result = json.dumps(frame.result, ensure_ascii=False)
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "running",
"description": "This is an intermediate result for the asynchronous task associated with this tool_call_id. "
+ "The task is still running. More intermediate results may follow, or the next result may be the final one with 'status=finished'.",
"result": result,
}
),
}
async_tool_messages.build_intermediate_result_message(frame.tool_call_id, result)
)
async def _handle_function_call_finished(
@@ -1237,19 +1210,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
# notified of the completed result instead of updating the IN_PROGRESS
# tool message.
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "finished",
"description": "This is the final result for the asynchronous task associated with this tool_call_id. "
+ "The task has completed. No further results will arrive for this tool_call_id.",
"result": result,
}
),
}
async_tool_messages.build_final_result_message(frame.tool_call_id, result)
)
else:
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)

View File

@@ -49,6 +49,8 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.async_tool_messages import AsyncToolMessagePayload
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.session_continuation import (
@@ -414,6 +416,7 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]):
self._wants_connection = False
self._user_text_buffer = ""
self._completed_tool_calls = set()
self._async_tool_text_dispatched: set[str] = set()
self._audio_input_started = False
# Session continuation helper. The service itself implements the
@@ -620,6 +623,13 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]):
# standard tool-result messages — skip them.
if isinstance(message, LLMSpecificMessage):
continue
async_info = async_tool_messages.parse_message(message)
if async_info is not None:
# Async-tool message — dispatch per the configured support tier.
await self._dispatch_async_tool_message(async_info, send_new_results)
continue
if message.get("role") == "tool" and message.get("content") not in [
"IN_PROGRESS",
"CANCELLED",
@@ -631,6 +641,78 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]):
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
async def _dispatch_async_tool_message(
    self, info: AsyncToolMessagePayload, send_new_results: bool
):
    """Dispatch an async-tool message to Nova Sonic.

    A ``started`` message is sent as a formal ``toolResult`` and tracked in
    ``self._completed_tool_calls``. ``intermediate`` and ``final`` messages
    are injected as cross-modal user-role text input and tracked by signature
    in ``self._async_tool_text_dispatched``. A message that is already tracked
    is skipped.

    Args:
        info: The parsed async-tool message.
        send_new_results: When True, an untracked message is sent to the
            provider; when False it is only recorded as handled.
    """
    logger.trace(
        f"{self}: async_tool dispatch: kind={info.kind} "
        f"tool_call_id={info.tool_call_id} status={info.status} "
        f"send_new_results={send_new_results}"
    )

    if info.kind == "started":
        already_sent = info.tool_call_id in self._completed_tool_calls
        if already_sent:
            logger.trace(
                f"{self}: async_tool started already sent: tool_call_id={info.tool_call_id}"
            )
        else:
            if not send_new_results:
                logger.trace(
                    f"{self}: async_tool started mark-handled (no send): "
                    f"tool_call_id={info.tool_call_id}"
                )
            else:
                logger.debug(
                    f"{self}: async_tool send started as tool result: "
                    f"tool_call_id={info.tool_call_id} payload={info.raw_content!r}"
                )
                await self._send_tool_result(info.tool_call_id, info.raw_content)
            self._completed_tool_calls.add(info.tool_call_id)
    else:
        # info.kind in ("intermediate", "final")
        message_key = self._async_tool_message_signature(info)
        if message_key in self._async_tool_text_dispatched:
            logger.trace(
                f"{self}: async_tool {info.kind} already dispatched: "
                f"tool_call_id={info.tool_call_id}"
            )
        else:
            if not send_new_results:
                logger.trace(
                    f"{self}: async_tool {info.kind} mark-handled (no send): "
                    f"tool_call_id={info.tool_call_id}"
                )
            else:
                logger.debug(
                    f"{self}: async_tool send {info.kind} as text input: "
                    f"tool_call_id={info.tool_call_id} text={info.raw_content!r}"
                )
                await self._send_async_tool_text(info.raw_content)
            self._async_tool_text_dispatched.add(message_key)
@staticmethod
def _async_tool_message_signature(info: AsyncToolMessagePayload) -> str:
    """Return the key used to de-duplicate intermediate/final messages.

    The key joins the tool call id, the status and the result (empty string
    when there is no result) with ``|``.

    NOTE(review): two messages for the same call with the same status and an
    identical result produce the same key — confirm that treating a repeated
    identical update as a duplicate is intended.
    """
    result_part = info.result if info.result else ""
    return f"{info.tool_call_id}|{info.status}|{result_part}"
async def _send_async_tool_text(self, text: str):
    """Inject mid-conversation text via Nova Sonic's cross-modal user text input.

    Forwards intermediate/final async-tool results to the provider as a
    USER-role text event with ``interactive=True``, the documented cross-modal
    pattern for mid-conversation text injection. Nothing is sent when there is
    no stream, no prompt name, or ``text`` is empty.

    Args:
        text: The text to inject.
    """
    can_send = self._stream and self._prompt_name and text
    if can_send:
        await self._send_text_event(text=text, role=Role.USER, interactive=True)
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to
# send it
@@ -758,6 +840,7 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]):
self._connected_time = None
self._user_text_buffer = ""
self._completed_tool_calls = set()
self._async_tool_text_dispatched = set()
self._audio_input_started = False
self._pending_speculative_text = None

View File

@@ -0,0 +1,251 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import json
import unittest
from pipecat.processors.aggregators import async_tool_messages
# The parser tests intentionally exercise the parser via the canonical
# builders, so a drift between the two sides will surface as a parse failure
# in CI rather than as a silent contract break in production.
def _started_message(tool_call_id: str = "call_123") -> dict:
    """Build a ``started`` message through the canonical builder."""
    return async_tool_messages.build_started_message(tool_call_id=tool_call_id)
def _intermediate_message(
    tool_call_id: str = "call_123",
    result: str = '"intermediate-1"',
) -> dict:
    """Build an intermediate-result message through the canonical builder."""
    return async_tool_messages.build_intermediate_result_message(
        tool_call_id=tool_call_id, result=result
    )
def _final_message(
    tool_call_id: str = "call_123",
    result: str = '"final-result"',
) -> dict:
    """Build a final-result message through the canonical builder."""
    return async_tool_messages.build_final_result_message(
        tool_call_id=tool_call_id, result=result
    )
class TestParseMessage(unittest.TestCase):
def test_parses_started(self):
info = async_tool_messages.parse_message(_started_message("abc"))
assert info is not None
assert info.kind == "started"
assert info.tool_call_id == "abc"
assert info.status == "running"
assert info.result is None
assert "asynchronous task" in info.description
def test_parses_intermediate(self):
info = async_tool_messages.parse_message(_intermediate_message("abc", '"hello"'))
assert info is not None
assert info.kind == "intermediate"
assert info.tool_call_id == "abc"
assert info.status == "running"
assert info.result == '"hello"'
def test_parses_final(self):
info = async_tool_messages.parse_message(_final_message("abc", '"done"'))
assert info is not None
assert info.kind == "final"
assert info.tool_call_id == "abc"
assert info.status == "finished"
assert info.result == '"done"'
def test_raw_content_preserves_original_payload(self):
# raw_content should round-trip the source message's `content` field so
# services can forward the full payload to providers.
msg = _final_message("abc", '"done"')
info = async_tool_messages.parse_message(msg)
assert info is not None
assert info.raw_content == msg["content"]
# Sanity: it should parse back to the original payload dict.
payload = json.loads(info.raw_content)
assert payload["type"] == "async_tool"
assert payload["tool_call_id"] == "abc"
assert payload["status"] == "finished"
assert payload["result"] == '"done"'
def test_parses_completed_sentinel_result(self):
# When a function returns no value, the aggregator sets the result to
# the literal "COMPLETED" — same convention used for synchronous tool
# calls. The parser doesn't treat it specially; it's just a string.
info = async_tool_messages.parse_message(_final_message("abc", "COMPLETED"))
assert info is not None
assert info.kind == "final"
assert info.result == "COMPLETED"
def test_returns_none_for_regular_user_message(self):
assert async_tool_messages.parse_message({"role": "user", "content": "hello"}) is None
def test_returns_none_for_regular_assistant_message(self):
assert async_tool_messages.parse_message({"role": "assistant", "content": "hi"}) is None
def test_returns_none_for_regular_tool_message(self):
    # Plain string tool content (the IN_PROGRESS placeholder, then a regular
    # tool result) must not parse as an async-tool message.
    for content in ("IN_PROGRESS", "weather: sunny"):
        message = {"role": "tool", "tool_call_id": "x", "content": content}
        assert async_tool_messages.parse_message(message) is None
def test_returns_none_for_developer_message_without_payload(self):
    # role=developer is (potentially) also used for things unrelated to
    # async tools, so plain developer text must not parse.
    note = {"role": "developer", "content": "some other developer note"}
    assert async_tool_messages.parse_message(note) is None
def test_returns_none_for_invalid_json_content(self):
    # Content that is not valid JSON must be rejected with None.
    malformed = {"role": "tool", "content": "{not json"}
    assert async_tool_messages.parse_message(malformed) is None
def test_returns_none_for_non_dict_json(self):
    # Valid JSON that decodes to a list rather than an object must be
    # rejected with None.
    listy = {"role": "tool", "content": "[1, 2, 3]"}
    assert async_tool_messages.parse_message(listy) is None
def test_returns_none_for_wrong_payload_type(self):
    # A JSON object whose "type" is not "async_tool" must be rejected.
    payload = {"type": "something_else", "tool_call_id": "x"}
    message = {"role": "tool", "content": json.dumps(payload)}
    assert async_tool_messages.parse_message(message) is None
def test_returns_none_when_tool_call_id_missing(self):
    # An async_tool payload without a "tool_call_id" must be rejected.
    payload = {"type": "async_tool", "status": "running"}
    message = {"role": "tool", "content": json.dumps(payload)}
    assert async_tool_messages.parse_message(message) is None
def test_returns_none_when_status_invalid(self):
    # An async_tool payload with an unrecognized "status" must be rejected.
    payload = {"type": "async_tool", "tool_call_id": "x", "status": "weird"}
    message = {"role": "tool", "content": json.dumps(payload)}
    assert async_tool_messages.parse_message(message) is None
def test_returns_none_for_non_string_content(self):
    # A multimodal message, whose content is a list of parts rather than a
    # string, would not be an async-tool message.
    parts = [{"type": "text", "text": "hi"}]
    message = {"role": "tool", "content": parts}
    assert async_tool_messages.parse_message(message) is None
def test_returns_none_for_missing_role(self):
    # A message with no "role" key must be rejected with None.
    roleless = {"content": "{}"}
    assert async_tool_messages.parse_message(roleless) is None
class TestBuilders(unittest.TestCase):
    """Check that the builders emit the canonical payload shape and parse back cleanly."""

    @staticmethod
    def _payload(message):
        # Decode the JSON payload carried in a built message's "content".
        return json.loads(message["content"])

    @staticmethod
    def _assert_items(mapping, *pairs):
        # Assert mapping[key] == want for each (key, want), in the given order.
        for key, want in pairs:
            assert mapping[key] == want

    @staticmethod
    def _assert_attrs(obj, **expected):
        # Assert obj.<name> == want for each keyword, in the given order.
        for name, want in expected.items():
            assert getattr(obj, name) == want

    @staticmethod
    def _assert_has_description(payload):
        # The payload must carry a non-empty string under "description".
        description = payload["description"]
        assert isinstance(description, str)
        assert description

    def test_started_message_shape(self):
        built = async_tool_messages.build_started_message("call_42")
        # At the top level the started message is role=tool and repeats the
        # tool_call_id, so it can sit alongside other regular tool messages
        # in the context.
        self._assert_items(built, ("role", "tool"), ("tool_call_id", "call_42"))
        payload = self._payload(built)
        self._assert_items(
            payload,
            ("type", "async_tool"),
            ("status", "running"),
            ("tool_call_id", "call_42"),
        )
        assert "result" not in payload
        self._assert_has_description(payload)

    def test_intermediate_message_shape(self):
        built = async_tool_messages.build_intermediate_result_message("call_99", '"step-1"')
        # Intermediate/final messages use role=developer and keep the
        # tool_call_id only inside the payload, not at the top level.
        assert built["role"] == "developer"
        assert "tool_call_id" not in built
        payload = self._payload(built)
        self._assert_items(
            payload,
            ("type", "async_tool"),
            ("status", "running"),
            ("tool_call_id", "call_99"),
            ("result", '"step-1"'),
        )
        self._assert_has_description(payload)

    def test_final_message_shape(self):
        built = async_tool_messages.build_final_result_message("call_7", '"all-done"')
        assert built["role"] == "developer"
        assert "tool_call_id" not in built
        payload = self._payload(built)
        self._assert_items(
            payload,
            ("type", "async_tool"),
            ("status", "finished"),
            ("tool_call_id", "call_7"),
            ("result", '"all-done"'),
        )
        self._assert_has_description(payload)

    def test_final_message_with_completed_sentinel(self):
        # When the function returned no value, the aggregator passes the
        # literal "COMPLETED" string (same convention as for synchronous
        # tool calls). The builder gives it no special treatment; it simply
        # round-trips as the result.
        sentinel = "COMPLETED"
        built = async_tool_messages.build_final_result_message("call_1", sentinel)
        assert self._payload(built)["result"] == sentinel
        parsed = async_tool_messages.parse_message(built)
        assert parsed is not None
        self._assert_attrs(parsed, kind="final", result=sentinel)

    def test_started_round_trip(self):
        built = async_tool_messages.build_started_message("call_x")
        parsed = async_tool_messages.parse_message(built)
        assert parsed is not None
        self._assert_attrs(parsed, kind="started", tool_call_id="call_x", status="running")
        assert parsed.result is None
        assert parsed.raw_content == built["content"]

    def test_intermediate_round_trip(self):
        step = '{"step": 1}'
        built = async_tool_messages.build_intermediate_result_message("call_x", step)
        parsed = async_tool_messages.parse_message(built)
        assert parsed is not None
        self._assert_attrs(
            parsed,
            kind="intermediate",
            tool_call_id="call_x",
            status="running",
            result=step,
        )

    def test_final_round_trip(self):
        answer = '{"answer": 42}'
        built = async_tool_messages.build_final_result_message("call_x", answer)
        parsed = async_tool_messages.parse_message(built)
        assert parsed is not None
        self._assert_attrs(
            parsed,
            kind="final",
            tool_call_id="call_x",
            status="finished",
            result=answer,
        )
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()