Compare commits
19 Commits
main
...
filipi/asy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16133a2323 | ||
|
|
9d815cb5d2 | ||
|
|
2d87edac18 | ||
|
|
bce07e0c76 | ||
|
|
59092fe4fe | ||
|
|
d515a81073 | ||
|
|
e23cb46885 | ||
|
|
72bbad51b7 | ||
|
|
c066a913fe | ||
|
|
63bbfc3b27 | ||
|
|
2458b9d42b | ||
|
|
4543aef3d9 | ||
|
|
260368b6f4 | ||
|
|
3ad2675b24 | ||
|
|
970d713d7a | ||
|
|
f7012c570c | ||
|
|
4bfa084f77 | ||
|
|
780d6c476d | ||
|
|
dfdb92958b |
1
changelog/4217.added.2.md
Normal file
1
changelog/4217.added.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `group_parallel_tools` parameter to `LLMService` (default `True`). When `True`, all function calls from the same LLM response batch share a group ID and the LLM is triggered exactly once after the last call completes. Set to `False` to trigger inference independently for each function call result as it arrives.
|
||||
1
changelog/4217.added.md
Normal file
1
changelog/4217.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `is_async=True` support to `register_function()` and `register_direct_function()`. When enabled, the LLM continues the conversation immediately without waiting for the function result. The result is injected back into the context as a `developer` message once available, triggering a new LLM inference at that point.
|
||||
1
changelog/4217.changed.md
Normal file
1
changelog/4217.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- When multiple function calls are returned in a single LLM response, the LLM is now triggered exactly once after the last call in the batch completes, rather than waiting for all function calls.
|
||||
1
changelog/4217.fixed.2.md
Normal file
1
changelog/4217.fixed.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed `BaseOutputTransport` discarding pending `UninterruptibleFrame` items (e.g. function-call context updates) when an interruption arrived. The audio task is now kept alive and only interruptible frames are drained when uninterruptible frames are present in the queue.
|
||||
1
changelog/4217.fixed.3.md
Normal file
1
changelog/4217.fixed.3.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed spurious LLM inference being triggered when a function call result arrived while the user was actively speaking. The context frame is now suppressed until the user stops speaking.
|
||||
1
changelog/4217.fixed.md
Normal file
1
changelog/4217.fixed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Fixed an issue where `UninterruptibleFrame` items queued in `FrameProcessor` could be incorrectly dropped on interruption. Previously only the frame currently being processed was checked; now the entire process queue is scanned so pending uninterruptible frames are always delivered.
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
@@ -35,9 +35,10 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def get_weather(params: FunctionCallParams):
|
||||
location = params.arguments["location"]
|
||||
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
# Simulate a long-running API call, so we can test async function calls.
|
||||
await asyncio.sleep(20)
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
@@ -80,11 +81,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
llm.register_function("get_weather", get_weather)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(
|
||||
"get_current_weather",
|
||||
fetch_weather_from_api,
|
||||
cancel_on_interruption=False,
|
||||
is_async=True,
|
||||
timeout_secs=30,
|
||||
)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_weather",
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
@@ -12,7 +13,10 @@ from loguru import logger
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
|
||||
from pipecat.frames.frames import (
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -35,6 +39,8 @@ load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
# Simulate a long-running API call, so we can test async function calls.
|
||||
await asyncio.sleep(20)
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
@@ -88,7 +94,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function(
|
||||
"get_current_weather",
|
||||
fetch_weather_from_api,
|
||||
cancel_on_interruption=False,
|
||||
is_async=True,
|
||||
timeout_secs=30,
|
||||
)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
|
||||
@@ -1850,12 +1850,20 @@ class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame):
|
||||
tool_call_id: Unique identifier for this function call.
|
||||
arguments: Arguments passed to the function.
|
||||
cancel_on_interruption: Whether to cancel this call if interrupted.
|
||||
is_async: Whether this function call runs asynchronously. When True,
|
||||
the LLM continues the conversation immediately without waiting for
|
||||
the result. The result is injected later via a developer message.
|
||||
group_id: Identifier shared by all function calls originating from the
|
||||
same LLM response batch. Used to determine when the last call in a
|
||||
group completes so the LLM can be triggered exactly once.
|
||||
"""
|
||||
|
||||
function_name: str
|
||||
tool_call_id: str
|
||||
arguments: Any
|
||||
cancel_on_interruption: bool = False
|
||||
is_async: bool = False
|
||||
group_id: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -866,6 +866,8 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
self._function_calls_image_results: Dict[str, UserImageRawFrame] = {}
|
||||
self._context_updated_tasks: Set[asyncio.Task] = set()
|
||||
|
||||
self._user_speaking: bool = False
|
||||
|
||||
self._assistant_turn_start_timestamp = ""
|
||||
|
||||
self._thought_append_to_context = False
|
||||
@@ -968,6 +970,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
await self._handle_user_image_frame(frame)
|
||||
elif isinstance(frame, AssistantImageRawFrame):
|
||||
await self._handle_assistant_image_frame(frame)
|
||||
elif isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._user_speaking = True
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._user_speaking = False
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -1047,13 +1055,22 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
],
|
||||
}
|
||||
)
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "tool",
|
||||
"content": "IN_PROGRESS",
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
}
|
||||
)
|
||||
if frame.is_async:
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "tool",
|
||||
"content": json.dumps({"type": "async_tool", "status": "started"}),
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
}
|
||||
)
|
||||
else:
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "tool",
|
||||
"content": "IN_PROGRESS",
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
}
|
||||
)
|
||||
|
||||
self._function_calls_in_progress[frame.tool_call_id] = frame
|
||||
|
||||
@@ -1067,16 +1084,34 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
)
|
||||
return
|
||||
|
||||
in_progress_frame = self._function_calls_in_progress[frame.tool_call_id]
|
||||
is_async = in_progress_frame.is_async if in_progress_frame else False
|
||||
group_id = in_progress_frame.group_id if in_progress_frame else None
|
||||
|
||||
del self._function_calls_in_progress[frame.tool_call_id]
|
||||
|
||||
properties = frame.properties
|
||||
|
||||
# Update context with the function call result
|
||||
if frame.result:
|
||||
result = json.dumps(frame.result, ensure_ascii=False)
|
||||
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
|
||||
result = json.dumps(frame.result, ensure_ascii=False) if frame.result else "COMPLETED"
|
||||
|
||||
if is_async:
|
||||
# For async function calls instead of updating the existing IN_PROGRESS tool message we inject
|
||||
# a new developer message so the LLM is notified of the completed result.
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "developer",
|
||||
"content": json.dumps(
|
||||
{
|
||||
"type": "async_tool",
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
"status": "finished",
|
||||
"result": result,
|
||||
}
|
||||
),
|
||||
}
|
||||
)
|
||||
else:
|
||||
self._update_function_call_result(frame.function_name, frame.tool_call_id, "COMPLETED")
|
||||
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
|
||||
|
||||
run_llm = False
|
||||
|
||||
@@ -1098,10 +1133,18 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
# If the frame is indicating we should run the LLM, do it.
|
||||
run_llm = frame.run_llm
|
||||
else:
|
||||
# If this is the last function call in progress, run the LLM.
|
||||
run_llm = not bool(self._function_calls_in_progress)
|
||||
# Run the LLM when this is the last function call in the group
|
||||
# to complete. If group_id is set, only consider sibling calls;
|
||||
# otherwise always execute as soon as we receive the result.
|
||||
if group_id:
|
||||
run_llm = not any(
|
||||
f is not None and f.group_id == group_id
|
||||
for f in self._function_calls_in_progress.values()
|
||||
)
|
||||
else:
|
||||
run_llm = True
|
||||
|
||||
if run_llm:
|
||||
if run_llm and not self._user_speaking:
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
# Call the `on_context_updated` callback once the function call result
|
||||
|
||||
@@ -133,6 +133,65 @@ class FrameProcessorQueue(asyncio.PriorityQueue):
|
||||
return item
|
||||
|
||||
|
||||
class UninterruptibleProcessQueue(asyncio.Queue):
|
||||
"""An asyncio.Queue that tracks whether any UninterruptibleFrame is enqueued.
|
||||
|
||||
Extends ``asyncio.Queue`` and maintains an O(1) ``has_uninterruptible``
|
||||
flag so interrupt-handling code can decide whether to cancel a task or
|
||||
merely drain non-uninterruptible items without scanning the queue.
|
||||
|
||||
Items may be raw ``Frame`` objects or tuples whose first element is a
|
||||
``Frame`` (e.g. ``(frame, direction, callback)``). Pass a ``frame_getter``
|
||||
callable to extract the frame from each item; the default treats the item
|
||||
itself as the frame.
|
||||
|
||||
Also exposes a ``reset()`` helper that drains all non-``UninterruptibleFrame``
|
||||
items while keeping uninterruptible ones in place.
|
||||
"""
|
||||
|
||||
def __init__(self, frame_getter: Callable[[Any], Frame] = lambda item: item):
|
||||
"""Initialize the UninterruptibleProcessQueue.
|
||||
|
||||
Args:
|
||||
frame_getter: Callable that extracts a ``Frame`` from a queue item.
|
||||
Defaults to the identity function (item is a raw ``Frame``).
|
||||
Pass ``lambda item: item[0]`` when items are
|
||||
``(frame, direction, callback)`` tuples.
|
||||
"""
|
||||
super().__init__()
|
||||
self._frame_getter = frame_getter
|
||||
self._uninterruptible_count: int = 0
|
||||
|
||||
@property
|
||||
def has_uninterruptible(self) -> bool:
|
||||
"""Return True if any UninterruptibleFrame is currently in the queue."""
|
||||
return self._uninterruptible_count > 0
|
||||
|
||||
def _put(self, item: Any) -> None:
|
||||
if isinstance(self._frame_getter(item), UninterruptibleFrame):
|
||||
self._uninterruptible_count += 1
|
||||
super()._put(item)
|
||||
|
||||
def _get(self) -> Any:
|
||||
item = super()._get()
|
||||
if isinstance(self._frame_getter(item), UninterruptibleFrame):
|
||||
self._uninterruptible_count -= 1
|
||||
return item
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Remove all non-UninterruptibleFrame items, keeping uninterruptible ones."""
|
||||
kept: asyncio.Queue = asyncio.Queue()
|
||||
while not self.empty():
|
||||
item = self.get_nowait()
|
||||
if isinstance(self._frame_getter(item), UninterruptibleFrame):
|
||||
kept.put_nowait(item)
|
||||
self.task_done()
|
||||
while not kept.empty():
|
||||
item = kept.get_nowait()
|
||||
self.put_nowait(item)
|
||||
kept.task_done()
|
||||
|
||||
|
||||
# Timeout in seconds for cancelling the input frame processing task.
|
||||
# This prevents hanging if a library swallows asyncio.CancelledError.
|
||||
INPUT_TASK_CANCEL_TIMEOUT_SECS = 3
|
||||
@@ -233,7 +292,7 @@ class FrameProcessor(BaseObject):
|
||||
# called. To resume processing frames we need to call
|
||||
# `resume_processing_frames()` which will wake up the event.
|
||||
self.__should_block_frames = False
|
||||
self.__process_queue = asyncio.Queue()
|
||||
self.__process_queue = UninterruptibleProcessQueue(frame_getter=lambda item: item[0])
|
||||
self.__process_event: Optional[asyncio.Event] = None
|
||||
self.__process_frame_task: Optional[asyncio.Task] = None
|
||||
self.__process_current_frame: Optional[Frame] = None
|
||||
@@ -861,9 +920,14 @@ class FrameProcessor(BaseObject):
|
||||
async def _start_interruption(self):
|
||||
"""Start handling an interruption by cancelling current tasks."""
|
||||
try:
|
||||
if isinstance(self.__process_current_frame, UninterruptibleFrame):
|
||||
# We don't want to cancel UninterruptibleFrame, so we simply
|
||||
# cleanup the queue.
|
||||
current_is_uninterruptible = isinstance(
|
||||
self.__process_current_frame, UninterruptibleFrame
|
||||
)
|
||||
if current_is_uninterruptible or self.__process_queue.has_uninterruptible:
|
||||
# We don't want to cancel an UninterruptibleFrame (either the
|
||||
# one currently being processed or one waiting in the queue),
|
||||
# so we simply cleanup the queue keeping only
|
||||
# UninterruptibleFrames.
|
||||
self.__reset_process_queue()
|
||||
else:
|
||||
# Cancel and re-create the process task.
|
||||
@@ -963,22 +1027,7 @@ class FrameProcessor(BaseObject):
|
||||
|
||||
def __reset_process_queue(self):
|
||||
"""Reset non-system frame processing queue."""
|
||||
# Create a new queue to insert UninterruptibleFrame frames.
|
||||
new_queue = asyncio.Queue()
|
||||
|
||||
# Process current queue and keep UninterruptibleFrame frames.
|
||||
while not self.__process_queue.empty():
|
||||
item = self.__process_queue.get_nowait()
|
||||
frame = item[0]
|
||||
if isinstance(frame, UninterruptibleFrame):
|
||||
new_queue.put_nowait(item)
|
||||
self.__process_queue.task_done()
|
||||
|
||||
# Put back UninterruptibleFrame frames into our process queue.
|
||||
while not new_queue.empty():
|
||||
item = new_queue.get_nowait()
|
||||
self.__process_queue.put_nowait(item)
|
||||
new_queue.task_done()
|
||||
self.__process_queue.reset()
|
||||
|
||||
async def __cancel_process_task(self):
|
||||
"""Cancel the non-system frame processing task."""
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
"""Base classes for Large Language Model services with function calling support."""
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import json
|
||||
import uuid
|
||||
import warnings
|
||||
from dataclasses import dataclass
|
||||
from typing import (
|
||||
@@ -119,6 +119,9 @@ class FunctionCallRegistryItem:
|
||||
function_name: The name of the function (None for catch-all handler).
|
||||
handler: The handler for processing function call parameters.
|
||||
cancel_on_interruption: Whether to cancel the call on interruption.
|
||||
is_async: Whether this function call runs asynchronously. When True,
|
||||
the LLM continues the conversation immediately without waiting for
|
||||
the result. The result is injected later via a developer message.
|
||||
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
|
||||
``function_call_timeout_secs`` for this specific function.
|
||||
"""
|
||||
@@ -126,6 +129,7 @@ class FunctionCallRegistryItem:
|
||||
function_name: Optional[str]
|
||||
handler: FunctionCallHandler | "DirectFunctionWrapper"
|
||||
cancel_on_interruption: bool
|
||||
is_async: bool = False
|
||||
timeout_secs: Optional[float] = None
|
||||
|
||||
|
||||
@@ -142,6 +146,9 @@ class FunctionCallRunnerItem:
|
||||
arguments: The arguments for the function.
|
||||
context: The LLM context.
|
||||
run_llm: Optional flag to control LLM execution after function call.
|
||||
group_id: Shared identifier for all function calls from the same LLM
|
||||
response batch. Used to trigger the LLM exactly once when the last
|
||||
call in the group completes.
|
||||
"""
|
||||
|
||||
registry_item: FunctionCallRegistryItem
|
||||
@@ -150,6 +157,7 @@ class FunctionCallRunnerItem:
|
||||
arguments: Mapping[str, Any]
|
||||
context: LLMContext
|
||||
run_llm: Optional[bool] = None
|
||||
group_id: Optional[str] = None
|
||||
|
||||
|
||||
class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
@@ -185,6 +193,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
def __init__(
|
||||
self,
|
||||
run_in_parallel: bool = True,
|
||||
group_parallel_tools: bool = True,
|
||||
function_call_timeout_secs: float = 10.0,
|
||||
settings: Optional[LLMSettings] = None,
|
||||
**kwargs,
|
||||
@@ -194,6 +203,10 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
Args:
|
||||
run_in_parallel: Whether to run function calls in parallel or sequentially.
|
||||
Defaults to True.
|
||||
group_parallel_tools: Whether to group parallel function calls so the LLM
|
||||
is triggered exactly once after all calls in the batch complete. When
|
||||
False, each function call result triggers the LLM independently as it
|
||||
arrives. Defaults to True.
|
||||
function_call_timeout_secs: Timeout in seconds for deferred function calls.
|
||||
Defaults to 10.0 seconds.
|
||||
settings: The runtime-updatable settings for the LLM service.
|
||||
@@ -208,6 +221,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
**kwargs,
|
||||
)
|
||||
self._run_in_parallel = run_in_parallel
|
||||
self._group_parallel_tools = group_parallel_tools
|
||||
self._function_call_timeout_secs = function_call_timeout_secs
|
||||
self._filter_incomplete_user_turns: bool = False
|
||||
self._base_system_instruction: Optional[str] = None
|
||||
@@ -538,6 +552,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
handler: Any,
|
||||
*,
|
||||
cancel_on_interruption: bool = True,
|
||||
is_async: bool = False,
|
||||
timeout_secs: Optional[float] = None,
|
||||
):
|
||||
"""Register a function handler for LLM function calls.
|
||||
@@ -549,6 +564,10 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
parameter.
|
||||
cancel_on_interruption: Whether to cancel this function call when an
|
||||
interruption occurs. Defaults to True.
|
||||
is_async: Whether this function call runs asynchronously. When True,
|
||||
the LLM continues the conversation immediately without waiting for
|
||||
the result. The result is injected later via a developer message.
|
||||
Defaults to False.
|
||||
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
|
||||
``function_call_timeout_secs`` for this specific function. Defaults to
|
||||
None, which uses the global timeout.
|
||||
@@ -559,6 +578,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
function_name=function_name,
|
||||
handler=handler,
|
||||
cancel_on_interruption=cancel_on_interruption,
|
||||
is_async=is_async,
|
||||
timeout_secs=timeout_secs,
|
||||
)
|
||||
|
||||
@@ -567,6 +587,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
handler: DirectFunction,
|
||||
*,
|
||||
cancel_on_interruption: bool = True,
|
||||
is_async: bool = False,
|
||||
timeout_secs: Optional[float] = None,
|
||||
):
|
||||
"""Register a direct function handler for LLM function calls.
|
||||
@@ -579,6 +600,10 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
handler: The direct function to register. Must follow DirectFunction protocol.
|
||||
cancel_on_interruption: Whether to cancel this function call when an
|
||||
interruption occurs. Defaults to True.
|
||||
is_async: Whether this function call runs asynchronously. When True,
|
||||
the LLM continues the conversation immediately without waiting for
|
||||
the result. The result is injected later via a developer message.
|
||||
Defaults to False.
|
||||
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
|
||||
``function_call_timeout_secs`` for this specific function. Defaults to
|
||||
None, which uses the global timeout.
|
||||
@@ -588,6 +613,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
function_name=wrapper.name,
|
||||
handler=wrapper,
|
||||
cancel_on_interruption=cancel_on_interruption,
|
||||
is_async=is_async,
|
||||
timeout_secs=timeout_secs,
|
||||
)
|
||||
|
||||
@@ -639,6 +665,11 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
|
||||
await self.broadcast_frame(FunctionCallsStartedFrame, function_calls=function_calls)
|
||||
|
||||
# When group_parallel_tools is True all calls share a group_id so the
|
||||
# aggregator triggers the LLM exactly once after the last one completes.
|
||||
# When False, group_id is None and each result triggers inference independently.
|
||||
group_id = str(uuid.uuid4()) if self._group_parallel_tools else None
|
||||
|
||||
runner_items = []
|
||||
for function_call in function_calls:
|
||||
if function_call.function_name in self._functions.keys():
|
||||
@@ -658,6 +689,7 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
tool_call_id=function_call.tool_call_id,
|
||||
arguments=function_call.arguments,
|
||||
context=function_call.context,
|
||||
group_id=group_id,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -726,6 +758,8 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
|
||||
tool_call_id=runner_item.tool_call_id,
|
||||
arguments=runner_item.arguments,
|
||||
cancel_on_interruption=item.cancel_on_interruption,
|
||||
is_async=item.is_async,
|
||||
group_id=runner_item.group_id,
|
||||
)
|
||||
|
||||
timeout_task: Optional[asyncio.Task] = None
|
||||
|
||||
@@ -46,7 +46,11 @@ from pipecat.frames.frames import (
|
||||
TTSAudioRawFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frame_processor import (
|
||||
FrameDirection,
|
||||
FrameProcessor,
|
||||
UninterruptibleProcessQueue,
|
||||
)
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.utils.time import nanoseconds_to_seconds
|
||||
|
||||
@@ -520,15 +524,21 @@ class BaseOutputTransport(FrameProcessor):
|
||||
if not self._transport._allow_interruptions:
|
||||
return
|
||||
|
||||
# Cancel tasks.
|
||||
await self._cancel_audio_task()
|
||||
# Cancel all tasks.
|
||||
await self._cancel_clock_task()
|
||||
await self._cancel_video_task()
|
||||
|
||||
if self._audio_queue.has_uninterruptible:
|
||||
# Keep the audio task running but drain all interruptible frames
|
||||
# so the pending UninterruptibleFrames are still delivered.
|
||||
self._audio_queue.reset()
|
||||
else:
|
||||
await self._cancel_audio_task()
|
||||
self._create_audio_task()
|
||||
|
||||
# Create tasks.
|
||||
self._create_video_task()
|
||||
self._create_clock_task()
|
||||
self._create_audio_task()
|
||||
|
||||
# Let's send a bot stopped speaking if we have to.
|
||||
await self._bot_stopped_speaking()
|
||||
@@ -612,7 +622,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
def _create_audio_task(self):
|
||||
"""Create the audio processing task."""
|
||||
if not self._audio_task:
|
||||
self._audio_queue = asyncio.Queue()
|
||||
self._audio_queue = UninterruptibleProcessQueue()
|
||||
self._audio_task = self._transport.create_task(self._audio_task_handler())
|
||||
|
||||
async def _cancel_audio_task(self):
|
||||
|
||||
@@ -389,9 +389,13 @@ class LLMContextSummarizationUtil:
|
||||
|
||||
Scans messages from ``start_idx`` up to (but not including)
|
||||
``summary_end`` to identify tool calls whose responses either don't
|
||||
exist yet or fall in the kept portion of the context (>= summary_end).
|
||||
exist yet, fall in the kept portion of the context (>= summary_end),
|
||||
or are still marked as ``IN_PROGRESS`` (async calls whose results have
|
||||
not yet arrived).
|
||||
|
||||
This prevents summarizing tool call requests when their responses would
|
||||
remain in the kept context as orphans, which the OpenAI API rejects.
|
||||
remain in the kept context as orphans, which the OpenAI API rejects,
|
||||
and avoids summarizing async function calls before their results arrive.
|
||||
|
||||
Args:
|
||||
messages: List of messages to check.
|
||||
@@ -428,11 +432,13 @@ class LLMContextSummarizationUtil:
|
||||
if tool_call_id:
|
||||
pending_tool_calls[tool_call_id] = i
|
||||
|
||||
# Check for tool results
|
||||
# Check for tool results — treat IN_PROGRESS as still pending
|
||||
# (async function calls whose results have not yet arrived).
|
||||
if role == "tool":
|
||||
tool_call_id = msg.get("tool_call_id")
|
||||
if tool_call_id and tool_call_id in pending_tool_calls:
|
||||
pending_tool_calls.pop(tool_call_id)
|
||||
if msg.get("content") != "IN_PROGRESS":
|
||||
pending_tool_calls.pop(tool_call_id)
|
||||
|
||||
# If we have pending tool calls, return the earliest index
|
||||
if pending_tool_calls:
|
||||
|
||||
Reference in New Issue
Block a user