Compare commits

...

19 Commits

Author SHA1 Message Date
filipi87
16133a2323 Removing the custom prompt. 2026-04-01 16:05:09 -03:00
filipi87
9d815cb5d2 Merge branch 'filipi/async_tools' into filipi/async_tools_structured_data 2026-04-01 15:50:35 -03:00
filipi87
2d87edac18 Merge branch 'main' into filipi/async_tools 2026-04-01 15:49:43 -03:00
filipi87
bce07e0c76 Merge branch 'filipi/async_tools' into filipi/async_tools_structured_data 2026-04-01 15:48:22 -03:00
filipi87
59092fe4fe Renaming the examples to match main. 2026-04-01 15:42:50 -03:00
filipi87
d515a81073 Updating the Anthropic example to use async function calls. 2026-04-01 15:31:32 -03:00
filipi87
e23cb46885 Trying to structure async tool responses and improve the LLM prompt to teach it how to handle them. 2026-04-01 14:48:09 -03:00
filipi87
72bbad51b7 Added group_parallel_tools parameter to LLMService. 2026-04-01 13:51:30 -03:00
filipi87
c066a913fe Adding changelogs for all the fixes. 2026-04-01 12:20:58 -03:00
filipi87
63bbfc3b27 Creating the concept of a group_id for the function calls. 2026-04-01 12:05:09 -03:00
filipi87
2458b9d42b Delaying the response for the get_current_weather in the openai example. 2026-04-01 10:47:29 -03:00
filipi87
4543aef3d9 Only pushing a context frame when we receive the function call result if the user is not speaking. 2026-04-01 10:45:00 -03:00
filipi87
260368b6f4 Fixing an issue where the BotOutputTransport was discarding the UninterruptibleFrames. 2026-04-01 10:32:11 -03:00
filipi87
3ad2675b24 Creating UninterruptibleProcessQueue. 2026-04-01 10:28:52 -03:00
filipi87
970d713d7a Using a JSON to send the result. 2026-04-01 10:28:03 -03:00
filipi87
f7012c570c Fixed an issue in the FrameProcessor where only the current frame was checked for being an UninterruptibleFrame, not other frames in the queue. 2026-03-31 18:38:11 -03:00
filipi87
4bfa084f77 Updating the openai example to be async. 2026-03-31 17:37:39 -03:00
filipi87
780d6c476d Merge branch 'main' into filipi/async_tools 2026-03-31 17:36:40 -03:00
filipi87
dfdb92958b Fix async tool handling for compatibility with all LLMs. 2026-03-31 17:26:06 -03:00
14 changed files with 231 additions and 53 deletions

View File

@@ -0,0 +1 @@
- Added `group_parallel_tools` parameter to `LLMService` (default `True`). When `True`, all function calls from the same LLM response batch share a group ID and the LLM is triggered exactly once after the last call completes. Set to `False` to trigger inference independently for each function call result as it arrives.

1
changelog/4217.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `is_async=True` support to `register_function()` and `register_direct_function()`. When enabled, the LLM continues the conversation immediately without waiting for the function result. The result is injected back into the context as a `developer` message once available, triggering a new LLM inference at that point.

View File

@@ -0,0 +1 @@
- When multiple function calls are returned in a single LLM response, the LLM is now triggered exactly once after the last call in the batch completes, rather than waiting for all function calls.

View File

@@ -0,0 +1 @@
- Fixed `BaseOutputTransport` discarding pending `UninterruptibleFrame` items (e.g. function-call context updates) when an interruption arrived. The audio task is now kept alive and only interruptible frames are drained when uninterruptible frames are present in the queue.

View File

@@ -0,0 +1 @@
- Fixed spurious LLM inference being triggered when a function call result arrived while the user was actively speaking. The context frame is now suppressed until the user stops speaking.

1
changelog/4217.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed an issue where `UninterruptibleFrame` items queued in `FrameProcessor` could be incorrectly dropped on interruption. Previously only the frame currently being processed was checked; now the entire process queue is scanned so pending uninterruptible frames are always delivered.

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
@@ -35,9 +35,10 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def get_weather(params: FunctionCallParams):
location = params.arguments["location"]
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call, so we can test async function calls.
await asyncio.sleep(20)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
@@ -80,11 +81,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
llm.register_function("get_weather", get_weather)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
is_async=True,
timeout_secs=30,
)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
weather_function = FunctionSchema(
name="get_weather",
name="get_current_weather",
description="Get the current weather",
properties={
"location": {

View File

@@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
@@ -12,7 +13,10 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.frames.frames import (
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -35,6 +39,8 @@ load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call, so we can test async function calls.
await asyncio.sleep(20)
await params.result_callback({"conditions": "nice", "temperature": "75"})
@@ -88,7 +94,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
is_async=True,
timeout_secs=30,
)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")

View File

@@ -1850,12 +1850,20 @@ class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame):
tool_call_id: Unique identifier for this function call.
arguments: Arguments passed to the function.
cancel_on_interruption: Whether to cancel this call if interrupted.
is_async: Whether this function call runs asynchronously. When True,
the LLM continues the conversation immediately without waiting for
the result. The result is injected later via a developer message.
group_id: Identifier shared by all function calls originating from the
same LLM response batch. Used to determine when the last call in a
group completes so the LLM can be triggered exactly once.
"""
function_name: str
tool_call_id: str
arguments: Any
cancel_on_interruption: bool = False
is_async: bool = False
group_id: Optional[str] = None
@dataclass

View File

@@ -866,6 +866,8 @@ class LLMAssistantAggregator(LLMContextAggregator):
self._function_calls_image_results: Dict[str, UserImageRawFrame] = {}
self._context_updated_tasks: Set[asyncio.Task] = set()
self._user_speaking: bool = False
self._assistant_turn_start_timestamp = ""
self._thought_append_to_context = False
@@ -968,6 +970,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self._handle_user_image_frame(frame)
elif isinstance(frame, AssistantImageRawFrame):
await self._handle_assistant_image_frame(frame)
elif isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
await self.push_frame(frame, direction)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
@@ -1047,13 +1055,22 @@ class LLMAssistantAggregator(LLMContextAggregator):
],
}
)
self._context.add_message(
{
"role": "tool",
"content": "IN_PROGRESS",
"tool_call_id": frame.tool_call_id,
}
)
if frame.is_async:
self._context.add_message(
{
"role": "tool",
"content": json.dumps({"type": "async_tool", "status": "started"}),
"tool_call_id": frame.tool_call_id,
}
)
else:
self._context.add_message(
{
"role": "tool",
"content": "IN_PROGRESS",
"tool_call_id": frame.tool_call_id,
}
)
self._function_calls_in_progress[frame.tool_call_id] = frame
@@ -1067,16 +1084,34 @@ class LLMAssistantAggregator(LLMContextAggregator):
)
return
in_progress_frame = self._function_calls_in_progress[frame.tool_call_id]
is_async = in_progress_frame.is_async if in_progress_frame else False
group_id = in_progress_frame.group_id if in_progress_frame else None
del self._function_calls_in_progress[frame.tool_call_id]
properties = frame.properties
# Update context with the function call result
if frame.result:
result = json.dumps(frame.result, ensure_ascii=False)
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
result = json.dumps(frame.result, ensure_ascii=False) if frame.result else "COMPLETED"
if is_async:
# For async function calls instead of updating the existing IN_PROGRESS tool message we inject
# a new developer message so the LLM is notified of the completed result.
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "finished",
"result": result,
}
),
}
)
else:
self._update_function_call_result(frame.function_name, frame.tool_call_id, "COMPLETED")
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
run_llm = False
@@ -1098,10 +1133,18 @@ class LLMAssistantAggregator(LLMContextAggregator):
# If the frame is indicating we should run the LLM, do it.
run_llm = frame.run_llm
else:
# If this is the last function call in progress, run the LLM.
run_llm = not bool(self._function_calls_in_progress)
# Run the LLM when this is the last function call in the group
# to complete. If group_id is set, only consider sibling calls;
# otherwise always execute as soon as we receive the result.
if group_id:
run_llm = not any(
f is not None and f.group_id == group_id
for f in self._function_calls_in_progress.values()
)
else:
run_llm = True
if run_llm:
if run_llm and not self._user_speaking:
await self.push_context_frame(FrameDirection.UPSTREAM)
# Call the `on_context_updated` callback once the function call result

View File

@@ -133,6 +133,65 @@ class FrameProcessorQueue(asyncio.PriorityQueue):
return item
class UninterruptibleProcessQueue(asyncio.Queue):
"""An asyncio.Queue that tracks whether any UninterruptibleFrame is enqueued.
Extends ``asyncio.Queue`` and maintains an O(1) ``has_uninterruptible``
flag so interrupt-handling code can decide whether to cancel a task or
merely drain non-uninterruptible items without scanning the queue.
Items may be raw ``Frame`` objects or tuples whose first element is a
``Frame`` (e.g. ``(frame, direction, callback)``). Pass a ``frame_getter``
callable to extract the frame from each item; the default treats the item
itself as the frame.
Also exposes a ``reset()`` helper that drains all non-``UninterruptibleFrame``
items while keeping uninterruptible ones in place.
"""
def __init__(self, frame_getter: Callable[[Any], Frame] = lambda item: item):
"""Initialize the UninterruptibleProcessQueue.
Args:
frame_getter: Callable that extracts a ``Frame`` from a queue item.
Defaults to the identity function (item is a raw ``Frame``).
Pass ``lambda item: item[0]`` when items are
``(frame, direction, callback)`` tuples.
"""
super().__init__()
self._frame_getter = frame_getter
self._uninterruptible_count: int = 0
@property
def has_uninterruptible(self) -> bool:
"""Return True if any UninterruptibleFrame is currently in the queue."""
return self._uninterruptible_count > 0
def _put(self, item: Any) -> None:
if isinstance(self._frame_getter(item), UninterruptibleFrame):
self._uninterruptible_count += 1
super()._put(item)
def _get(self) -> Any:
item = super()._get()
if isinstance(self._frame_getter(item), UninterruptibleFrame):
self._uninterruptible_count -= 1
return item
def reset(self) -> None:
"""Remove all non-UninterruptibleFrame items, keeping uninterruptible ones."""
kept: asyncio.Queue = asyncio.Queue()
while not self.empty():
item = self.get_nowait()
if isinstance(self._frame_getter(item), UninterruptibleFrame):
kept.put_nowait(item)
self.task_done()
while not kept.empty():
item = kept.get_nowait()
self.put_nowait(item)
kept.task_done()
# Timeout in seconds for cancelling the input frame processing task.
# This prevents hanging if a library swallows asyncio.CancelledError.
INPUT_TASK_CANCEL_TIMEOUT_SECS = 3
@@ -233,7 +292,7 @@ class FrameProcessor(BaseObject):
# called. To resume processing frames we need to call
# `resume_processing_frames()` which will wake up the event.
self.__should_block_frames = False
self.__process_queue = asyncio.Queue()
self.__process_queue = UninterruptibleProcessQueue(frame_getter=lambda item: item[0])
self.__process_event: Optional[asyncio.Event] = None
self.__process_frame_task: Optional[asyncio.Task] = None
self.__process_current_frame: Optional[Frame] = None
@@ -861,9 +920,14 @@ class FrameProcessor(BaseObject):
async def _start_interruption(self):
"""Start handling an interruption by cancelling current tasks."""
try:
if isinstance(self.__process_current_frame, UninterruptibleFrame):
# We don't want to cancel UninterruptibleFrame, so we simply
# cleanup the queue.
current_is_uninterruptible = isinstance(
self.__process_current_frame, UninterruptibleFrame
)
if current_is_uninterruptible or self.__process_queue.has_uninterruptible:
# We don't want to cancel an UninterruptibleFrame (either the
# one currently being processed or one waiting in the queue),
# so we simply cleanup the queue keeping only
# UninterruptibleFrames.
self.__reset_process_queue()
else:
# Cancel and re-create the process task.
@@ -963,22 +1027,7 @@ class FrameProcessor(BaseObject):
def __reset_process_queue(self):
"""Reset non-system frame processing queue."""
# Create a new queue to insert UninterruptibleFrame frames.
new_queue = asyncio.Queue()
# Process current queue and keep UninterruptibleFrame frames.
while not self.__process_queue.empty():
item = self.__process_queue.get_nowait()
frame = item[0]
if isinstance(frame, UninterruptibleFrame):
new_queue.put_nowait(item)
self.__process_queue.task_done()
# Put back UninterruptibleFrame frames into our process queue.
while not new_queue.empty():
item = new_queue.get_nowait()
self.__process_queue.put_nowait(item)
new_queue.task_done()
self.__process_queue.reset()
async def __cancel_process_task(self):
"""Cancel the non-system frame processing task."""

View File

@@ -7,8 +7,8 @@
"""Base classes for Large Language Model services with function calling support."""
import asyncio
import inspect
import json
import uuid
import warnings
from dataclasses import dataclass
from typing import (
@@ -119,6 +119,9 @@ class FunctionCallRegistryItem:
function_name: The name of the function (None for catch-all handler).
handler: The handler for processing function call parameters.
cancel_on_interruption: Whether to cancel the call on interruption.
is_async: Whether this function call runs asynchronously. When True,
the LLM continues the conversation immediately without waiting for
the result. The result is injected later via a developer message.
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
``function_call_timeout_secs`` for this specific function.
"""
@@ -126,6 +129,7 @@ class FunctionCallRegistryItem:
function_name: Optional[str]
handler: FunctionCallHandler | "DirectFunctionWrapper"
cancel_on_interruption: bool
is_async: bool = False
timeout_secs: Optional[float] = None
@@ -142,6 +146,9 @@ class FunctionCallRunnerItem:
arguments: The arguments for the function.
context: The LLM context.
run_llm: Optional flag to control LLM execution after function call.
group_id: Shared identifier for all function calls from the same LLM
response batch. Used to trigger the LLM exactly once when the last
call in the group completes.
"""
registry_item: FunctionCallRegistryItem
@@ -150,6 +157,7 @@ class FunctionCallRunnerItem:
arguments: Mapping[str, Any]
context: LLMContext
run_llm: Optional[bool] = None
group_id: Optional[str] = None
class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
@@ -185,6 +193,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
def __init__(
self,
run_in_parallel: bool = True,
group_parallel_tools: bool = True,
function_call_timeout_secs: float = 10.0,
settings: Optional[LLMSettings] = None,
**kwargs,
@@ -194,6 +203,10 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
Args:
run_in_parallel: Whether to run function calls in parallel or sequentially.
Defaults to True.
group_parallel_tools: Whether to group parallel function calls so the LLM
is triggered exactly once after all calls in the batch complete. When
False, each function call result triggers the LLM independently as it
arrives. Defaults to True.
function_call_timeout_secs: Timeout in seconds for deferred function calls.
Defaults to 10.0 seconds.
settings: The runtime-updatable settings for the LLM service.
@@ -208,6 +221,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
**kwargs,
)
self._run_in_parallel = run_in_parallel
self._group_parallel_tools = group_parallel_tools
self._function_call_timeout_secs = function_call_timeout_secs
self._filter_incomplete_user_turns: bool = False
self._base_system_instruction: Optional[str] = None
@@ -538,6 +552,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
handler: Any,
*,
cancel_on_interruption: bool = True,
is_async: bool = False,
timeout_secs: Optional[float] = None,
):
"""Register a function handler for LLM function calls.
@@ -549,6 +564,10 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
parameter.
cancel_on_interruption: Whether to cancel this function call when an
interruption occurs. Defaults to True.
is_async: Whether this function call runs asynchronously. When True,
the LLM continues the conversation immediately without waiting for
the result. The result is injected later via a developer message.
Defaults to False.
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
``function_call_timeout_secs`` for this specific function. Defaults to
None, which uses the global timeout.
@@ -559,6 +578,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
function_name=function_name,
handler=handler,
cancel_on_interruption=cancel_on_interruption,
is_async=is_async,
timeout_secs=timeout_secs,
)
@@ -567,6 +587,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
handler: DirectFunction,
*,
cancel_on_interruption: bool = True,
is_async: bool = False,
timeout_secs: Optional[float] = None,
):
"""Register a direct function handler for LLM function calls.
@@ -579,6 +600,10 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
handler: The direct function to register. Must follow DirectFunction protocol.
cancel_on_interruption: Whether to cancel this function call when an
interruption occurs. Defaults to True.
is_async: Whether this function call runs asynchronously. When True,
the LLM continues the conversation immediately without waiting for
the result. The result is injected later via a developer message.
Defaults to False.
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
``function_call_timeout_secs`` for this specific function. Defaults to
None, which uses the global timeout.
@@ -588,6 +613,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
function_name=wrapper.name,
handler=wrapper,
cancel_on_interruption=cancel_on_interruption,
is_async=is_async,
timeout_secs=timeout_secs,
)
@@ -639,6 +665,11 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
await self.broadcast_frame(FunctionCallsStartedFrame, function_calls=function_calls)
# When group_parallel_tools is True all calls share a group_id so the
# aggregator triggers the LLM exactly once after the last one completes.
# When False, group_id is None and each result triggers inference independently.
group_id = str(uuid.uuid4()) if self._group_parallel_tools else None
runner_items = []
for function_call in function_calls:
if function_call.function_name in self._functions.keys():
@@ -658,6 +689,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
tool_call_id=function_call.tool_call_id,
arguments=function_call.arguments,
context=function_call.context,
group_id=group_id,
)
)
@@ -726,6 +758,8 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
tool_call_id=runner_item.tool_call_id,
arguments=runner_item.arguments,
cancel_on_interruption=item.cancel_on_interruption,
is_async=item.is_async,
group_id=runner_item.group_id,
)
timeout_task: Optional[asyncio.Task] = None

View File

@@ -46,7 +46,11 @@ from pipecat.frames.frames import (
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import (
FrameDirection,
FrameProcessor,
UninterruptibleProcessQueue,
)
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.time import nanoseconds_to_seconds
@@ -520,15 +524,21 @@ class BaseOutputTransport(FrameProcessor):
if not self._transport._allow_interruptions:
return
# Cancel tasks.
await self._cancel_audio_task()
# Cancel all tasks.
await self._cancel_clock_task()
await self._cancel_video_task()
if self._audio_queue.has_uninterruptible:
# Keep the audio task running but drain all interruptible frames
# so the pending UninterruptibleFrames are still delivered.
self._audio_queue.reset()
else:
await self._cancel_audio_task()
self._create_audio_task()
# Create tasks.
self._create_video_task()
self._create_clock_task()
self._create_audio_task()
# Let's send a bot stopped speaking if we have to.
await self._bot_stopped_speaking()
@@ -612,7 +622,7 @@ class BaseOutputTransport(FrameProcessor):
def _create_audio_task(self):
"""Create the audio processing task."""
if not self._audio_task:
self._audio_queue = asyncio.Queue()
self._audio_queue = UninterruptibleProcessQueue()
self._audio_task = self._transport.create_task(self._audio_task_handler())
async def _cancel_audio_task(self):

View File

@@ -389,9 +389,13 @@ class LLMContextSummarizationUtil:
Scans messages from ``start_idx`` up to (but not including)
``summary_end`` to identify tool calls whose responses either don't
exist yet or fall in the kept portion of the context (>= summary_end).
exist yet, fall in the kept portion of the context (>= summary_end),
or are still marked as ``IN_PROGRESS`` (async calls whose results have
not yet arrived).
This prevents summarizing tool call requests when their responses would
remain in the kept context as orphans, which the OpenAI API rejects.
remain in the kept context as orphans, which the OpenAI API rejects,
and avoids summarizing async function calls before their results arrive.
Args:
messages: List of messages to check.
@@ -428,11 +432,13 @@ class LLMContextSummarizationUtil:
if tool_call_id:
pending_tool_calls[tool_call_id] = i
# Check for tool results
# Check for tool results — treat IN_PROGRESS as still pending
# (async function calls whose results have not yet arrived).
if role == "tool":
tool_call_id = msg.get("tool_call_id")
if tool_call_id and tool_call_id in pending_tool_calls:
pending_tool_calls.pop(tool_call_id)
if msg.get("content") != "IN_PROGRESS":
pending_tool_calls.pop(tool_call_id)
# If we have pending tool calls, return the earliest index
if pending_tool_calls: