Extend cancel_on_interruption=False to InworldRealtimeLLMService (best-effort)

Same async-tool routing approach as #4441: detect async-tool messages in
the LLM context, deliver the final result via the formal tool-result
channel.

Caveat: as of this writing, Inworld Realtime doesn't appear to handle
the resulting delayed tool result reliably, so the routing is
best-effort and the service emits a one-time warning when async-tool
messages are seen. Streamed intermediate results remain unsupported.

Also adds function calling to the realtime-inworld.py example, and
softens the Inworld mention in the #4447 changelog now that the
exclusion is being closed.
This commit is contained in:
Paul Kompfner
2026-05-12 16:03:34 -04:00
parent 67f8d34e9f
commit 58333b2705
3 changed files with 127 additions and 4 deletions

View File

@@ -1 +1 @@
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld tool calling needs to be sorted out first.

View File

@@ -28,10 +28,14 @@ Usage:
"""
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import (
TranscriptionLogObserver,
@@ -48,6 +52,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -55,6 +60,43 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
# --- Transport Configuration ---
# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
@@ -85,7 +127,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# See: https://docs.inworld.ai/router/introduction
llm = InworldRealtimeLLMService(
api_key=os.environ["INWORLD_API_KEY"],
llm_model="xai/grok-4-1-fast-non-reasoning",
llm_model="openai/gpt-4.1-mini",
voice="Sarah",
settings=InworldRealtimeLLMService.Settings(
system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
@@ -97,9 +139,14 @@ Always be helpful and proactive in offering assistance.""",
),
)
# Create context with initial message
# Note: function calling requires a paid Inworld account and a
# function-calling-capable model
llm.register_function("get_current_weather", fetch_weather_from_api)
# Create context with initial message + tools
context = LLMContext(
[{"role": "developer", "content": "Say hello and introduce yourself!"}],
tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

View File

@@ -48,7 +48,8 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import (
@@ -361,6 +362,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
self._messages_added_manually = {}
self._pending_function_calls = {}
self._completed_tool_calls = set()
self._async_tool_warning_logged: bool = False
self._register_event_handler("on_conversation_item_created")
self._register_event_handler("on_conversation_item_updated")
@@ -629,6 +631,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
self._receive_task = None
self._completed_tool_calls = set()
self._async_tool_warning_logged = False
self._audio_buffer = b""
self._interim_transcription_text = ""
self._disconnecting = False
@@ -1000,9 +1003,80 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
async def _process_completed_function_calls(self, send_new_results: bool):
"""Process completed function calls and send results to the service."""
# If the user registered a function with cancel_on_interruption=False,
# the aggregator emits async-tool-style messages into the context. As
# of this writing, Inworld Realtime doesn't appear to handle the
# resulting delayed tool result reliably — the routing code below
# is best-effort. Surface a one-time warning so users see they're not
# getting what they expect.
if not self._async_tool_warning_logged:
for message in self._context.get_messages():
if isinstance(message, LLMSpecificMessage):
continue
if async_tool_messages.parse_message(message) is not None:
logger.error(
f"{self}: cancel_on_interruption=False is not reliably "
f"supported by Inworld Realtime as of this writing. "
f"Use cancel_on_interruption=True (the default), or "
f"consider another LLM service if your tool needs the "
f"async semantics."
)
await self.push_error(
error_msg=(
"cancel_on_interruption=False is not reliably supported "
"by Inworld Realtime as of this writing."
),
)
self._async_tool_warning_logged = True
break
sent_new_result = False
for message in self._context.get_messages():
# LLMSpecificMessages are opaque provider-specific payloads, not
# standard tool-result messages — skip them.
if isinstance(message, LLMSpecificMessage):
continue
# Async-tool messages live alongside regular tool messages in the
# context; detect and route them before the regular logic so we
# don't try to send the async-tool envelope JSON as a tool result.
async_payload = async_tool_messages.parse_message(message)
if async_payload is not None:
if async_payload.tool_call_id in self._completed_tool_calls:
continue
if async_payload.kind == "started":
# The provider already issued the tool call and natively
# awaits a result; nothing to send for the started marker.
continue
if async_payload.kind == "intermediate":
logger.error(
f"{self}: Inworld Realtime does not support streamed async "
f"tool results; dropping intermediate result for "
f"tool_call_id={async_payload.tool_call_id}. Consider "
f"another LLM service if your tool needs to stream "
f"intermediate results."
)
await self.push_error(
error_msg="Inworld Realtime does not support streamed async tool results.",
)
continue
if async_payload.kind == "final":
# Deliver via the formal tool-result channel — same path
# as a synchronous tool result, just delayed.
if send_new_results:
sent_new_result = True
await self._send_tool_result(
async_payload.tool_call_id, async_payload.result
)
self._completed_tool_calls.add(async_payload.tool_call_id)
continue
# Defensive: any async-tool message must not fall through
# to the regular tool-result block below, even if it
# carries a kind we don't recognize.
continue
# Look for newly-completed "regular" (as opposed to async-tool) results
if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS":
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
@@ -1011,6 +1085,8 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
# If we reported any new tool call results to the service, trigger
# another response
if sent_new_result:
await self._create_response()