diff --git a/examples/function-calling/function-calling-openai-async-stream.py b/examples/function-calling/function-calling-openai-async-stream.py index b1d9d724b..e8b10ad7f 100644 --- a/examples/function-calling/function-calling-openai-async-stream.py +++ b/examples/function-calling/function-calling-openai-async-stream.py @@ -74,6 +74,7 @@ async def track_current_location(params: FunctionCallParams): # Second update: revised city estimate. await asyncio.sleep(10) + # await asyncio.sleep(20) gps = {"lat": 33.96003, "lng": -118.40639} await params.result_callback( {"gps": gps, "city": "Los Angeles"}, @@ -82,6 +83,7 @@ async def track_current_location(params: FunctionCallParams): # Final result: confirmed city. await asyncio.sleep(10) + # await asyncio.sleep(20) gps = {"lat": 32.743569, "lng": -117.20466} await params.result_callback({"gps": gps, "city": "San Diego"}) diff --git a/examples/realtime/realtime-aws-nova-sonic-async-stream-tool.py b/examples/realtime/realtime-aws-nova-sonic-async-stream-tool.py index 2a5a2dea8..a3a042668 100644 --- a/examples/realtime/realtime-aws-nova-sonic-async-stream-tool.py +++ b/examples/realtime/realtime-aws-nova-sonic-async-stream-tool.py @@ -54,14 +54,14 @@ async def track_current_location(params: FunctionCallParams): properties=FunctionCallResultProperties(is_final=False), ) - await asyncio.sleep(20) + await asyncio.sleep(10) gps = {"lat": 33.96003, "lng": -118.40639} await params.result_callback( {"gps": gps, "city": "Los Angeles"}, properties=FunctionCallResultProperties(is_final=False), ) - await asyncio.sleep(20) + await asyncio.sleep(10) gps = {"lat": 32.743569, "lng": -117.20466} await params.result_callback({"gps": gps, "city": "San Diego"}) diff --git a/src/pipecat/processors/aggregators/async_tool_messages.py b/src/pipecat/processors/aggregators/async_tool_messages.py index 221054637..2c2ab824a 100644 --- a/src/pipecat/processors/aggregators/async_tool_messages.py +++ b/src/pipecat/processors/aggregators/async_tool_messages.py @@ -289,95 +289,82 @@ def parse_message(message: LLMStandardMessage) -> AsyncToolMessagePayload | None # --- Realtime preparation ---------------------------------------------------- -# Natural-language reminder grafted onto the ``description`` field of in-flight -# payloads (started / intermediate) when they're sent to a realtime LLM -# service. Realtime services receive these mid-stream while the model is -# still talking with the user, which is the moment the model is most likely -# to mistakenly re-issue the same tool call. Keeping this reminder out of the -# canonical payload descriptions (and confined to the realtime path) avoids -# influencing non-realtime consumers of the same context. We don't graft it -# onto ``final`` payloads, because at that point the task is done and -# re-invocation by the model is no longer a mistake. -# -# The reminder is appended *after* the canonical description so the model -# first reads the protocol-level explanation of what async-tool messages are -# and how they work, and only then encounters the behavioral directive, -# which now flows naturally from that context. -_REALTIME_REINVOCATION_REMINDER = ( - "While this task is in flight, do not call the same tool with the same " - "arguments again; you would just kick off a duplicate task." -) - - -def prepare_message_payload_for_realtime(payload: AsyncToolMessagePayload) -> str: +def prepare_message_payload_for_realtime( + payload: AsyncToolMessagePayload, + *, + template: str | None = None, +) -> str: """Prepare an async-tool message payload for sending to a realtime LLM service. - Returns a wire-ready JSON string. Realtime services that fully honor the - async-tool mechanism send the ``started`` payload via the formal - tool-result channel and the subsequent ``intermediate`` / ``final`` - payloads as text injected mid-conversation; this function returns the - string to send in either case, and callers route it to the appropriate - channel. - - The exact transformation depends on the payload kind. Each kind is - handled by its own private helper, so per-kind tweaks can be added later - without entangling the others. Today: - - - ``started`` / ``intermediate``: a natural-language reminder - discouraging the model from re-invoking the in-flight tool is grafted - onto the ``description`` field, then the payload is re-serialized. - Grafting into ``description`` (rather than wrapping the JSON with extra - text) keeps the output well-formed JSON, which the formal tool-result - channel requires. - - ``final``: pass-through; the payload is serialized as-is. The task is - done at this point, so re-invocation by the model (if the user asks - again later) is no longer a mistake. + Realtime services that fully honor the async-tool mechanism send the + ``started`` payload via the formal tool-result channel and the subsequent + ``intermediate`` / ``final`` payloads as text injected mid-conversation; + this function returns the string to send in either case, and callers + route it to the appropriate channel. Args: payload: The parsed async-tool message payload. + template: Optional format string. If provided, the rendered output is + ``template.format(tool_call_id=…, status=…, result=…, description=…)``. + If ``None``, the payload is serialized to its canonical JSON + form. Per-kind helpers ultimately decide what to do with the + template, so future per-kind tweaks (e.g. raising for a kind + that shouldn't accept templates) can be added without changing + this signature. Returns: - The prepared JSON string, ready to be sent to the realtime service. + The prepared string, ready to be sent to the realtime service. """ if payload.kind == "started": - return _prepare_started_message_payload_for_realtime(payload) + return _prepare_started_message_payload_for_realtime(payload, template=template) if payload.kind == "intermediate": - return _prepare_intermediate_result_message_payload_for_realtime(payload) + return _prepare_intermediate_result_message_payload_for_realtime(payload, template=template) if payload.kind == "final": - return _prepare_final_result_message_payload_for_realtime(payload) + return _prepare_final_result_message_payload_for_realtime(payload, template=template) raise ValueError(f"Unknown async-tool message payload kind: {payload.kind!r}") -def _prepare_started_message_payload_for_realtime(payload: AsyncToolMessagePayload) -> str: - return _payload_to_json(_with_reinvocation_reminder_grafted_in(payload)) +def _prepare_started_message_payload_for_realtime( + payload: AsyncToolMessagePayload, + *, + template: str | None = None, +) -> str: + if template is None: + return _payload_to_json(payload) + return _format_with_template(payload, template) def _prepare_intermediate_result_message_payload_for_realtime( payload: AsyncToolMessagePayload, + *, + template: str | None = None, ) -> str: - return _payload_to_json(_with_reinvocation_reminder_grafted_in(payload)) + if template is None: + return _payload_to_json(payload) + return _format_with_template(payload, template) -def _prepare_final_result_message_payload_for_realtime(payload: AsyncToolMessagePayload) -> str: - # Pass-through, for now - return _payload_to_json(payload) - - -def _with_reinvocation_reminder_grafted_in( +def _prepare_final_result_message_payload_for_realtime( payload: AsyncToolMessagePayload, -) -> AsyncToolMessagePayload: - """Return a copy of ``payload`` with the re-invocation reminder appended to ``description``. + *, + template: str | None = None, +) -> str: + if template is None: + return _payload_to_json(payload) + return _format_with_template(payload, template) - The reminder lives inside ``description`` so the surrounding JSON - envelope stays well-formed (which the formal tool-result channel - requires). It's appended (rather than prefixed) so the model first - reads the protocol-level explanation of what async-tool messages are - and only then encounters the behavioral directive. + +def _format_with_template(payload: AsyncToolMessagePayload, template: str) -> str: + """Render a payload via a caller-supplied template. + + Available substitution keys: ``tool_call_id``, ``status``, ``result``, + ``description``. Note that ``result`` is empty for ``started`` payloads + (no result has been produced yet); callers building templates intended + for ``started`` should not rely on it. """ - return AsyncToolMessagePayload( - kind=payload.kind, + return template.format( tool_call_id=payload.tool_call_id, status=payload.status, - description=f"{payload.description} {_REALTIME_REINVOCATION_REMINDER}", - result=payload.result, + result=payload.result or "", + description=payload.description, ) diff --git a/src/pipecat/services/aws/nova_sonic/llm.py b/src/pipecat/services/aws/nova_sonic/llm.py index dbf32eda5..f6436436a 100644 --- a/src/pipecat/services/aws/nova_sonic/llm.py +++ b/src/pipecat/services/aws/nova_sonic/llm.py @@ -237,6 +237,26 @@ class AWSNovaSonicLLMSettings(LLMSettings): endpointing_sensitivity: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) +# Bracketed plain-text template Nova Sonic uses when injecting async-tool +# result updates onto the cross-modal user-text channel. +# +# Note that this template intentionally drops the payload's ``description`` +# field (the protocol-level explanation of what async-tool messages are and +# how they work) and only carries ``tool_call_id``, ``status``, and +# ``result``. Counterintuitively, this short framing — minus the verbose +# protocol description, minus a JSON envelope altogether — empirically +# yields much better Nova Sonic behavior: noticeably fewer spurious +# re-invocations of the same tool than when the full JSON envelope (with +# its description) was injected as text. We don't fully understand why; one +# plausible explanation is that the model treats long, instruction-shaped +# description text as content demanding a response, where a terse +# bracketed status update reads more like ambient state. Worth revisiting +# if Nova Sonic's text-channel handling changes. +_ASYNC_TOOL_RESULT_TEXT_TEMPLATE = ( + "[Async tool update for tool_call_id={tool_call_id}, status={status}] {result}" +) + + class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]): """AWS Nova Sonic speech-to-speech LLM service. @@ -686,12 +706,14 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]): ) return if send_new_results: - payload = async_tool_messages.prepare_message_payload_for_realtime(info) + text = async_tool_messages.prepare_message_payload_for_realtime( + info, template=_ASYNC_TOOL_RESULT_TEXT_TEMPLATE + ) logger.debug( f"{self}: async_tool send {info.kind} as text input: " - f"tool_call_id={info.tool_call_id} text={payload!r}" + f"tool_call_id={info.tool_call_id} text={text!r}" ) - await self._send_async_tool_text(payload) + await self._send_async_tool_text(text) else: logger.trace( f"{self}: async_tool {info.kind} mark-handled (no send): " diff --git a/tests/test_async_tool_messages.py b/tests/test_async_tool_messages.py index d6bcc55ed..5e56d8ed7 100644 --- a/tests/test_async_tool_messages.py +++ b/tests/test_async_tool_messages.py @@ -233,54 +233,89 @@ class TestBuilders(unittest.TestCase): class TestPrepareMessagePayloadForRealtime(unittest.TestCase): - def test_started_grafts_reminder_into_description(self): + """Verify the realtime preparation behavior across kinds and template usage.""" + + # --- Default (no template) → raw JSON pass-through ----------------------- + + def test_started_default_is_raw_json(self): msg = async_tool_messages.build_started_message("call_42") info = async_tool_messages.parse_message(msg) assert info is not None text = async_tool_messages.prepare_message_payload_for_realtime(info) - # The output is well-formed JSON (the formal tool-result channel - # requires it). decoded = json.loads(text) - # The reminder lives inside the description field, not outside the - # JSON envelope. - assert "do not call the same tool" in decoded["description"] - assert "duplicate task" in decoded["description"] - # And the original description text is still present after the reminder. - assert "asynchronous task" in decoded["description"] - # Other payload fields are preserved. assert decoded["type"] == "async_tool" assert decoded["tool_call_id"] == "call_42" assert decoded["status"] == "running" # Started payloads have no result field. assert "result" not in decoded - def test_intermediate_grafts_reminder_into_description(self): + def test_intermediate_default_is_raw_json(self): msg = async_tool_messages.build_intermediate_result_message("call_42", '"step-1"') info = async_tool_messages.parse_message(msg) assert info is not None text = async_tool_messages.prepare_message_payload_for_realtime(info) decoded = json.loads(text) - assert "do not call the same tool" in decoded["description"] assert decoded["type"] == "async_tool" assert decoded["tool_call_id"] == "call_42" assert decoded["status"] == "running" assert decoded["result"] == '"step-1"' - def test_final_is_pass_through(self): - # The task is done at this point; the re-invocation reminder no - # longer applies, so the final payload is forwarded as-is (no - # reminder grafted onto the description). + def test_final_default_is_raw_json(self): msg = async_tool_messages.build_final_result_message("call_42", '"the answer"') info = async_tool_messages.parse_message(msg) assert info is not None text = async_tool_messages.prepare_message_payload_for_realtime(info) decoded = json.loads(text) - assert "do not call the same tool" not in decoded["description"] assert decoded["type"] == "async_tool" assert decoded["tool_call_id"] == "call_42" assert decoded["status"] == "finished" assert decoded["result"] == '"the answer"' + # --- Caller-supplied template applied across kinds ----------------------- + + def test_template_applied_to_started(self): + msg = async_tool_messages.build_started_message("call_42") + info = async_tool_messages.parse_message(msg) + assert info is not None + text = async_tool_messages.prepare_message_payload_for_realtime( + info, + template="[{tool_call_id} {status}] {result}", + ) + # Started has no result; substitution yields empty string after the bracket. + assert text == "[call_42 running] " + + def test_template_applied_to_intermediate(self): + msg = async_tool_messages.build_intermediate_result_message("call_42", '"step-1"') + info = async_tool_messages.parse_message(msg) + assert info is not None + text = async_tool_messages.prepare_message_payload_for_realtime( + info, + template="[{tool_call_id} {status}] {result}", + ) + assert text == '[call_42 running] "step-1"' + + def test_template_applied_to_final(self): + msg = async_tool_messages.build_final_result_message("call_42", '"the answer"') + info = async_tool_messages.parse_message(msg) + assert info is not None + text = async_tool_messages.prepare_message_payload_for_realtime( + info, + template="[{tool_call_id} {status}] {result}", + ) + assert text == '[call_42 finished] "the answer"' + + def test_template_can_use_description_field(self): + msg = async_tool_messages.build_intermediate_result_message("call_42", '"step-1"') + info = async_tool_messages.parse_message(msg) + assert info is not None + text = async_tool_messages.prepare_message_payload_for_realtime( + info, + template="{description} >> {result}", + ) + # The intermediate description text is preserved verbatim. + assert "intermediate result" in text + assert text.endswith('>> "step-1"') + if __name__ == "__main__": unittest.main()