Files
pipecat/tests/test_async_tool_messages.py
Paul Kompfner 72d0fb418a fix: restore cancel_on_interruption=False support in AWS Nova Sonic and OpenAI Realtime
Before the new async-tool mechanism landed, AWSNovaSonicLLMService and
OpenAIRealtimeLLMService honored cancel_on_interruption=False by simply
not cancelling in-flight function calls on interruption — the eventual
result then flowed through the same channel as any synchronous tool
result. The new mechanism (which appends started/intermediate/final
messages to the LLM context as the underlying task progresses) broke
that path: the realtime services didn't know how to interpret those
messages, and the eventual result was never delivered to the provider.

Restore the flag's behavior by teaching both services to detect
async-tool messages in the context and route them appropriately:

- started → skipped silently. The provider already issued the tool call
  and natively awaits a result; nothing to send for the started marker.
- final → delivered via the formal tool-result channel. Same path as a
  synchronous tool result, just delayed.

Streamed intermediate results (FunctionCallResultProperties(is_final=
False)) are not supported on these realtime services. An intermediate
result is logged as an error and surfaced via push_error, then dropped.
Use a non-realtime LLM service if a tool needs to stream intermediate
results. (Docstrings on register_function, register_direct_function, and
FunctionCallResultProperties.is_final updated to call this out.)

A new shared module pipecat.processors.aggregators.async_tool_messages
is the single source of truth for the on-the-wire payload shape: the
aggregator uses its build_*_message functions when injecting messages,
and the realtime services use parse_message when scanning the context.

Adds two example files exercising a network-delayed weather tool with
each service. The plain realtime-aws-nova-sonic.py example is also
reverted to a synchronous tool call now that the async variant lives in
its own file.

Similar fixes for other realtime services are forthcoming.
2026-05-08 09:33:06 -04:00

237 lines
9.0 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import json
import unittest
from pipecat.processors.aggregators import async_tool_messages
# The parser tests intentionally exercise the parser via the canonical
# builders, so a drift between the two sides will surface as a parse failure
# in CI rather than as a silent contract break in production.
def _started_message(tool_call_id: str = "call_123") -> dict:
return async_tool_messages.build_started_message(tool_call_id)
def _intermediate_message(
tool_call_id: str = "call_123",
result: str = '"intermediate-1"',
) -> dict:
return async_tool_messages.build_intermediate_result_message(tool_call_id, result)
def _final_message(
tool_call_id: str = "call_123",
result: str = '"final-result"',
) -> dict:
return async_tool_messages.build_final_result_message(tool_call_id, result)
class TestParseMessage(unittest.TestCase):
def test_parses_started(self):
info = async_tool_messages.parse_message(_started_message("abc"))
assert info is not None
assert info.kind == "started"
assert info.tool_call_id == "abc"
assert info.status == "running"
assert info.result is None
assert "asynchronous task" in info.description
def test_parses_intermediate(self):
info = async_tool_messages.parse_message(_intermediate_message("abc", '"hello"'))
assert info is not None
assert info.kind == "intermediate"
assert info.tool_call_id == "abc"
assert info.status == "running"
assert info.result == '"hello"'
def test_parses_final(self):
info = async_tool_messages.parse_message(_final_message("abc", '"done"'))
assert info is not None
assert info.kind == "final"
assert info.tool_call_id == "abc"
assert info.status == "finished"
assert info.result == '"done"'
def test_parses_completed_sentinel_result(self):
# When a function returns no value, the aggregator sets the result to
# the literal "COMPLETED" — same convention used for synchronous tool
# calls. The parser doesn't treat it specially; it's just a string.
info = async_tool_messages.parse_message(_final_message("abc", "COMPLETED"))
assert info is not None
assert info.kind == "final"
assert info.result == "COMPLETED"
def test_returns_none_for_regular_user_message(self):
assert async_tool_messages.parse_message({"role": "user", "content": "hello"}) is None
def test_returns_none_for_regular_assistant_message(self):
assert async_tool_messages.parse_message({"role": "assistant", "content": "hi"}) is None
def test_returns_none_for_regular_tool_message(self):
# IN_PROGRESS / regular tool result string content.
assert (
async_tool_messages.parse_message(
{"role": "tool", "tool_call_id": "x", "content": "IN_PROGRESS"}
)
is None
)
assert (
async_tool_messages.parse_message(
{"role": "tool", "tool_call_id": "x", "content": "weather: sunny"}
)
is None
)
def test_returns_none_for_developer_message_without_payload(self):
# role=developer is also used for non-async-tool things (potentially).
assert (
async_tool_messages.parse_message(
{"role": "developer", "content": "some other developer note"}
)
is None
)
def test_returns_none_for_invalid_json_content(self):
assert async_tool_messages.parse_message({"role": "tool", "content": "{not json"}) is None
def test_returns_none_for_non_dict_json(self):
assert async_tool_messages.parse_message({"role": "tool", "content": "[1, 2, 3]"}) is None
def test_returns_none_for_wrong_payload_type(self):
assert (
async_tool_messages.parse_message(
{
"role": "tool",
"content": json.dumps({"type": "something_else", "tool_call_id": "x"}),
}
)
is None
)
def test_returns_none_when_tool_call_id_missing(self):
assert (
async_tool_messages.parse_message(
{
"role": "tool",
"content": json.dumps({"type": "async_tool", "status": "running"}),
}
)
is None
)
def test_returns_none_when_status_invalid(self):
assert (
async_tool_messages.parse_message(
{
"role": "tool",
"content": json.dumps(
{"type": "async_tool", "tool_call_id": "x", "status": "weird"}
),
}
)
is None
)
def test_returns_none_for_non_string_content(self):
# A multimodal message with content as a list would not be an async-tool message.
assert (
async_tool_messages.parse_message(
{"role": "tool", "content": [{"type": "text", "text": "hi"}]}
)
is None
)
def test_returns_none_for_missing_role(self):
assert async_tool_messages.parse_message({"content": "{}"}) is None
class TestBuilders(unittest.TestCase):
"""Verify the builders produce the canonical payload shape and round-trip cleanly."""
def test_started_message_shape(self):
msg = async_tool_messages.build_started_message("call_42")
# Top-level: role=tool plus the tool_call_id (so the message can sit
# alongside other regular tool messages in the context).
assert msg["role"] == "tool"
assert msg["tool_call_id"] == "call_42"
payload = json.loads(msg["content"])
assert payload["type"] == "async_tool"
assert payload["status"] == "running"
assert payload["tool_call_id"] == "call_42"
assert "result" not in payload
assert isinstance(payload["description"], str) and payload["description"]
def test_intermediate_message_shape(self):
msg = async_tool_messages.build_intermediate_result_message("call_99", '"step-1"')
# Intermediate/final use role=developer and don't carry tool_call_id at
# the top level (that's only inside the payload).
assert msg["role"] == "developer"
assert "tool_call_id" not in msg
payload = json.loads(msg["content"])
assert payload["type"] == "async_tool"
assert payload["status"] == "running"
assert payload["tool_call_id"] == "call_99"
assert payload["result"] == '"step-1"'
assert isinstance(payload["description"], str) and payload["description"]
def test_final_message_shape(self):
msg = async_tool_messages.build_final_result_message("call_7", '"all-done"')
assert msg["role"] == "developer"
assert "tool_call_id" not in msg
payload = json.loads(msg["content"])
assert payload["type"] == "async_tool"
assert payload["status"] == "finished"
assert payload["tool_call_id"] == "call_7"
assert payload["result"] == '"all-done"'
assert isinstance(payload["description"], str) and payload["description"]
def test_final_message_with_completed_sentinel(self):
# The aggregator passes the literal "COMPLETED" string when the
# function returned no value (same convention as for synchronous
# tool calls). The builder doesn't treat it specially; it just
# round-trips as the result.
msg = async_tool_messages.build_final_result_message("call_1", "COMPLETED")
payload = json.loads(msg["content"])
assert payload["result"] == "COMPLETED"
info = async_tool_messages.parse_message(msg)
assert info is not None
assert info.kind == "final"
assert info.result == "COMPLETED"
def test_started_round_trip(self):
msg = async_tool_messages.build_started_message("call_x")
info = async_tool_messages.parse_message(msg)
assert info is not None
assert info.kind == "started"
assert info.tool_call_id == "call_x"
assert info.status == "running"
assert info.result is None
def test_intermediate_round_trip(self):
msg = async_tool_messages.build_intermediate_result_message("call_x", '{"step": 1}')
info = async_tool_messages.parse_message(msg)
assert info is not None
assert info.kind == "intermediate"
assert info.tool_call_id == "call_x"
assert info.status == "running"
assert info.result == '{"step": 1}'
def test_final_round_trip(self):
msg = async_tool_messages.build_final_result_message("call_x", '{"answer": 42}')
info = async_tool_messages.parse_message(msg)
assert info is not None
assert info.kind == "final"
assert info.tool_call_id == "call_x"
assert info.status == "finished"
assert info.result == '{"answer": 42}'
if __name__ == "__main__":
unittest.main()