Merge pull request #4474 from pipecat-ai/pk/inworld-realtime-tools
Extend cancel_on_interruption=False to Inworld Realtime (best-effort + warning)
This commit is contained in:
@@ -1 +1 @@
|
||||
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.
|
||||
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld tool calling needs to be sorted out first.
|
||||
|
||||
1
changelog/4474.fixed.md
Normal file
1
changelog/4474.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Extended the `cancel_on_interruption=False` regression fix to `InworldRealtimeLLMService`. Uses the same approach as in #4441 (the service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel). Note: as of this writing, Inworld Realtime doesn't appear to handle the resulting delayed tool result reliably — the routing is best-effort and the service surfaces a one-time warning when async-tool messages are seen. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on this realtime service. (Inworld was excluded from #4447 pending resolution of an unrelated tool-calling issue, which turned out to be an account-level matter.)
|
||||
@@ -28,10 +28,14 @@ Usage:
|
||||
"""
|
||||
|
||||
import os
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.observers.loggers.transcription_log_observer import (
|
||||
TranscriptionLogObserver,
|
||||
@@ -48,6 +52,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
@@ -55,6 +60,43 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = (
|
||||
random.randint(60, 85)
|
||||
if params.arguments["format"] == "fahrenheit"
|
||||
else random.randint(15, 30)
|
||||
)
|
||||
await params.result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"location": params.arguments["location"],
|
||||
"format": params.arguments["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
|
||||
tools = ToolsSchema(standard_tools=[weather_function])
|
||||
|
||||
|
||||
# --- Transport Configuration ---
|
||||
|
||||
# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
|
||||
@@ -85,7 +127,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# See: https://docs.inworld.ai/router/introduction
|
||||
llm = InworldRealtimeLLMService(
|
||||
api_key=os.environ["INWORLD_API_KEY"],
|
||||
llm_model="xai/grok-4-1-fast-non-reasoning",
|
||||
llm_model="openai/gpt-4.1-mini",
|
||||
voice="Sarah",
|
||||
settings=InworldRealtimeLLMService.Settings(
|
||||
system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
|
||||
@@ -97,9 +139,14 @@ Always be helpful and proactive in offering assistance.""",
|
||||
),
|
||||
)
|
||||
|
||||
# Create context with initial message
|
||||
# Note: function calling requires a paid Inworld account and a
|
||||
# function-calling-capable model
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
|
||||
# Create context with initial message + tools
|
||||
context = LLMContext(
|
||||
[{"role": "developer", "content": "Say hello and introduce yourself!"}],
|
||||
tools,
|
||||
)
|
||||
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
@@ -48,7 +48,8 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators import async_tool_messages
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
|
||||
from pipecat.services.settings import (
|
||||
@@ -361,6 +362,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
self._messages_added_manually = {}
|
||||
self._pending_function_calls = {}
|
||||
self._completed_tool_calls = set()
|
||||
self._async_tool_warning_logged: bool = False
|
||||
|
||||
self._register_event_handler("on_conversation_item_created")
|
||||
self._register_event_handler("on_conversation_item_updated")
|
||||
@@ -629,6 +631,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
self._receive_task = None
|
||||
|
||||
self._completed_tool_calls = set()
|
||||
self._async_tool_warning_logged = False
|
||||
self._audio_buffer = b""
|
||||
self._interim_transcription_text = ""
|
||||
self._disconnecting = False
|
||||
@@ -1000,9 +1003,80 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
|
||||
async def _process_completed_function_calls(self, send_new_results: bool):
|
||||
"""Process completed function calls and send results to the service."""
|
||||
# If the user registered a function with cancel_on_interruption=False,
|
||||
# the aggregator emits async-tool-style messages into the context. As
|
||||
# of this writing, Inworld Realtime doesn't appear to handle the
|
||||
# resulting delayed tool result reliably — the routing code below
|
||||
# is best-effort. Surface a one-time warning so users see they're not
|
||||
# getting what they expect.
|
||||
if not self._async_tool_warning_logged:
|
||||
for message in self._context.get_messages():
|
||||
if isinstance(message, LLMSpecificMessage):
|
||||
continue
|
||||
if async_tool_messages.parse_message(message) is not None:
|
||||
logger.error(
|
||||
f"{self}: cancel_on_interruption=False is not reliably "
|
||||
f"supported by Inworld Realtime as of this writing. "
|
||||
f"Use cancel_on_interruption=True (the default), or "
|
||||
f"consider another LLM service if your tool needs the "
|
||||
f"async semantics."
|
||||
)
|
||||
await self.push_error(
|
||||
error_msg=(
|
||||
"cancel_on_interruption=False is not reliably supported "
|
||||
"by Inworld Realtime as of this writing."
|
||||
),
|
||||
)
|
||||
self._async_tool_warning_logged = True
|
||||
break
|
||||
|
||||
sent_new_result = False
|
||||
|
||||
for message in self._context.get_messages():
|
||||
# LLMSpecificMessages are opaque provider-specific payloads, not
|
||||
# standard tool-result messages — skip them.
|
||||
if isinstance(message, LLMSpecificMessage):
|
||||
continue
|
||||
|
||||
# Async-tool messages live alongside regular tool messages in the
|
||||
# context; detect and route them before the regular logic so we
|
||||
# don't try to send the async-tool envelope JSON as a tool result.
|
||||
async_payload = async_tool_messages.parse_message(message)
|
||||
if async_payload is not None:
|
||||
if async_payload.tool_call_id in self._completed_tool_calls:
|
||||
continue
|
||||
if async_payload.kind == "started":
|
||||
# The provider already issued the tool call and natively
|
||||
# awaits a result; nothing to send for the started marker.
|
||||
continue
|
||||
if async_payload.kind == "intermediate":
|
||||
logger.error(
|
||||
f"{self}: Inworld Realtime does not support streamed async "
|
||||
f"tool results; dropping intermediate result for "
|
||||
f"tool_call_id={async_payload.tool_call_id}. Consider "
|
||||
f"another LLM service if your tool needs to stream "
|
||||
f"intermediate results."
|
||||
)
|
||||
await self.push_error(
|
||||
error_msg="Inworld Realtime does not support streamed async tool results.",
|
||||
)
|
||||
continue
|
||||
if async_payload.kind == "final":
|
||||
# Deliver via the formal tool-result channel — same path
|
||||
# as a synchronous tool result, just delayed.
|
||||
if send_new_results:
|
||||
sent_new_result = True
|
||||
await self._send_tool_result(
|
||||
async_payload.tool_call_id, async_payload.result
|
||||
)
|
||||
self._completed_tool_calls.add(async_payload.tool_call_id)
|
||||
continue
|
||||
# Defensive: any async-tool message must not fall through
|
||||
# to the regular tool-result block below, even if it
|
||||
# carries a kind we don't recognize.
|
||||
continue
|
||||
|
||||
# Look for newly-completed "regular" (as opposed to async-tool) results
|
||||
if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS":
|
||||
tool_call_id = message.get("tool_call_id")
|
||||
if tool_call_id and tool_call_id not in self._completed_tool_calls:
|
||||
@@ -1011,6 +1085,8 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]):
|
||||
await self._send_tool_result(tool_call_id, message.get("content"))
|
||||
self._completed_tool_calls.add(tool_call_id)
|
||||
|
||||
# If we reported any new tool call results to the service, trigger
|
||||
# another response
|
||||
if sent_new_result:
|
||||
await self._create_response()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user