Files
pipecat/examples/function-calling/function-calling-openai-async-stream.py
Paul Kompfner 9a8cd5cee5 refactor(async-tool-messages): replace reminder grafting with caller-supplied template
Empirical testing showed the previous design — grafting a verbose
re-invocation reminder into the payload's `description` field for
started and intermediate messages — was actually making Nova Sonic
*worse*, not better: more spurious re-invocations of the same tool,
not fewer. Plausibly the long, instruction-shaped description text
reads as content the model has to respond to, where a terse status
update reads as ambient state.

Replace the reminder grafting with a caller-supplied `template`
keyword argument on `prepare_message_payload_for_realtime`. When
`None` (the default), the payload is serialized to its canonical
JSON form. When provided, `template.format(tool_call_id=…, status=…,
result=…, description=…)` is applied. The template is honored across
all kinds, so callers route per kind based on which wire channel
they're using.

Nova Sonic now defines its own bracketed plain-text template
(`_ASYNC_TOOL_RESULT_TEXT_TEMPLATE`) and applies it on the
cross-modal user-text channel (intermediate / final). The started
path stays on raw JSON (the formal AWS tool-result channel requires
valid JSON). A code comment at the template constant captures the
empirical finding for the next person — short framing yields much
better behavior, surprising as it sounds.

Tests updated for the new template behavior across all kinds. Also
reverts a stream-tool example sleep-duration tweak (20s → 10s) and
adds a commented-out alternative in the function-calling-openai-async-stream
example for parallel testing.
2026-05-06 16:50:56 -04:00

217 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with intermediate updates.
The ``track_current_location`` tool simulates a GPS tracker reporting the
device's position during a road trip from San Francisco to San Diego. It
sends two intermediate updates (via ``params.result_callback`` with
``is_final=False``) as the vehicle passes through cities along the way, then
delivers the final destination (via ``params.result_callback``). Each update
returns the same structure with a different city:
Update 1 {gps, city: "San Francisco"} ← trip start
Update 2 {gps, city: "Los Angeles"} ← passing through
Final {gps, city: "San Diego"} ← destination reached
Because the function is registered with ``cancel_on_interruption=False``, the
LLM can keep talking while the trip is in progress; each position update
arrives as a developer message so the LLM can narrate the journey to the user.
"""
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
FunctionCallResultProperties,
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def track_current_location(params: FunctionCallParams):
"""Simulate a GPS tracker reporting position during a road trip.
Step 1 San Francisco (trip start) (update)
Step 2 Los Angeles (passing through) (update)
Step 3 San Diego (destination) (final result)
"""
# First update: initial city estimate.
gps = {"lat": 37.7310, "lng": -122.4527}
await params.result_callback(
{"gps": gps, "city": "San Francisco"},
properties=FunctionCallResultProperties(is_final=False),
)
# Second update: revised city estimate.
await asyncio.sleep(10)
# await asyncio.sleep(20)
gps = {"lat": 33.96003, "lng": -118.40639}
await params.result_callback(
{"gps": gps, "city": "Los Angeles"},
properties=FunctionCallResultProperties(is_final=False),
)
# Final result: confirmed city.
await asyncio.sleep(10)
# await asyncio.sleep(20)
gps = {"lat": 32.743569, "lng": -117.20466}
await params.result_callback({"gps": gps, "city": "San Diego"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
enable_async_tool_cancellation=True,
settings=OpenAILLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. "
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. "
"You have access to a function that starts tracking the user's location and "
"provides regular updates on it. When you receive the final location, tell the user "
"the destination has been reached."
),
),
)
# cancel_on_interruption=False makes this an async function call: the LLM
# continues the conversation immediately and receives updates/result later.
llm.register_function(
"track_current_location",
track_current_location,
cancel_on_interruption=False,
timeout_secs=30,
)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
location_function = FunctionSchema(
name="track_current_location",
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
properties={},
required=[],
)
tools = ToolsSchema(standard_tools=[location_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()