diff --git a/changelog/4447.fixed.md b/changelog/4447.fixed.md index 63ae73c86..ba3f17994 100644 --- a/changelog/4447.fixed.md +++ b/changelog/4447.fixed.md @@ -1 +1 @@ -- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API. +- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld tool calling needs to be sorted out first. diff --git a/examples/realtime/realtime-inworld.py b/examples/realtime/realtime-inworld.py index 76defbc24..fdc1712ce 100644 --- a/examples/realtime/realtime-inworld.py +++ b/examples/realtime/realtime-inworld.py @@ -28,10 +28,14 @@ Usage: """ import os +import random +from datetime import datetime from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.frames.frames import LLMRunFrame from pipecat.observers.loggers.transcription_log_observer import ( TranscriptionLogObserver, @@ -48,6 +52,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService +from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -55,6 +60,43 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams load_dotenv(override=True) +async def fetch_weather_from_api(params: FunctionCallParams): + temperature = ( + random.randint(60, 85) + if params.arguments["format"] == "fahrenheit" + else random.randint(15, 30) + ) + await params.result_callback( + { + "conditions": "nice", + "temperature": temperature, + "location": params.arguments["location"], + "format": params.arguments["format"], + "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), + } + ) + + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + required=["location", "format"], +) + +tools = ToolsSchema(standard_tools=[weather_function]) + + # --- Transport Configuration --- # No local VAD needed — Inworld's server-side semantic VAD handles turn detection. @@ -85,7 +127,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # See: https://docs.inworld.ai/router/introduction llm = InworldRealtimeLLMService( api_key=os.environ["INWORLD_API_KEY"], - llm_model="xai/grok-4-1-fast-non-reasoning", + llm_model="openai/gpt-4.1-mini", voice="Sarah", settings=InworldRealtimeLLMService.Settings( system_instruction="""You are a helpful and friendly AI assistant powered by Inworld. @@ -97,9 +139,14 @@ Always be helpful and proactive in offering assistance.""", ), ) - # Create context with initial message + # Note: function calling requires a paid Inworld account and a + # function-calling-capable model + llm.register_function("get_current_weather", fetch_weather_from_api) + + # Create context with initial message + tools context = LLMContext( [{"role": "developer", "content": "Say hello and introduce yourself!"}], + tools, ) user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) diff --git a/src/pipecat/services/inworld/realtime/llm.py b/src/pipecat/services/inworld/realtime/llm.py index ea5117467..023e09fbc 100644 --- a/src/pipecat/services/inworld/realtime/llm.py +++ b/src/pipecat/services/inworld/realtime/llm.py @@ -48,7 +48,8 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.metrics.metrics import LLMTokenUsage -from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators import async_tool_messages +from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService from pipecat.services.settings import ( @@ -361,6 +362,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]): self._messages_added_manually = {} self._pending_function_calls = {} self._completed_tool_calls = set() + self._async_tool_warning_logged: bool = False self._register_event_handler("on_conversation_item_created") self._register_event_handler("on_conversation_item_updated") @@ -629,6 +631,7 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]): self._receive_task = None self._completed_tool_calls = set() + self._async_tool_warning_logged = False self._audio_buffer = b"" self._interim_transcription_text = "" self._disconnecting = False @@ -1000,9 +1003,80 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]): async def _process_completed_function_calls(self, send_new_results: bool): """Process completed function calls and send results to the service.""" + # If the user registered a function with cancel_on_interruption=False, + # the aggregator emits async-tool-style messages into the context. As + # of this writing, Inworld Realtime doesn't appear to handle the + # resulting delayed tool result reliably — the routing code below + # is best-effort. Surface a one-time warning so users see they're not + # getting what they expect. + if not self._async_tool_warning_logged: + for message in self._context.get_messages(): + if isinstance(message, LLMSpecificMessage): + continue + if async_tool_messages.parse_message(message) is not None: + logger.error( + f"{self}: cancel_on_interruption=False is not reliably " + f"supported by Inworld Realtime as of this writing. " + f"Use cancel_on_interruption=True (the default), or " + f"consider another LLM service if your tool needs the " + f"async semantics." + ) + await self.push_error( + error_msg=( + "cancel_on_interruption=False is not reliably supported " + "by Inworld Realtime as of this writing." + ), + ) + self._async_tool_warning_logged = True + break + sent_new_result = False for message in self._context.get_messages(): + # LLMSpecificMessages are opaque provider-specific payloads, not + # standard tool-result messages — skip them. + if isinstance(message, LLMSpecificMessage): + continue + + # Async-tool messages live alongside regular tool messages in the + # context; detect and route them before the regular logic so we + # don't try to send the async-tool envelope JSON as a tool result. + async_payload = async_tool_messages.parse_message(message) + if async_payload is not None: + if async_payload.tool_call_id in self._completed_tool_calls: + continue + if async_payload.kind == "started": + # The provider already issued the tool call and natively + # awaits a result; nothing to send for the started marker. + continue + if async_payload.kind == "intermediate": + logger.error( + f"{self}: Inworld Realtime does not support streamed async " + f"tool results; dropping intermediate result for " + f"tool_call_id={async_payload.tool_call_id}. Consider " + f"another LLM service if your tool needs to stream " + f"intermediate results." + ) + await self.push_error( + error_msg="Inworld Realtime does not support streamed async tool results.", + ) + continue + if async_payload.kind == "final": + # Deliver via the formal tool-result channel — same path + # as a synchronous tool result, just delayed. + if send_new_results: + sent_new_result = True + await self._send_tool_result( + async_payload.tool_call_id, async_payload.result + ) + self._completed_tool_calls.add(async_payload.tool_call_id) + continue + # Defensive: any async-tool message must not fall through + # to the regular tool-result block below, even if it + # carries a kind we don't recognize. + continue + + # Look for newly-completed "regular" (as opposed to async-tool) results if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS": tool_call_id = message.get("tool_call_id") if tool_call_id and tool_call_id not in self._completed_tool_calls: @@ -1011,6 +1085,8 @@ class InworldRealtimeLLMService(LLMService[InworldRealtimeLLMAdapter]): await self._send_tool_result(tool_call_id, message.get("content")) self._completed_tool_calls.add(tool_call_id) + # If we reported any new tool call results to the service, trigger + # another response if sent_new_result: await self._create_response()