Files
pipecat/examples/persistent-context/persistent-context-aws-nova-sonic.py
Aleix Conchillo Flaqué b03247f360 Rename BaseTask → BaseWorker and reserve "task" for asyncio
Replaces every "task" identifier that referred to the BaseTask
abstraction with "worker". Asyncio task plumbing (asyncio.Task,
BaseTaskManager, TaskManager, create_task, cancel_task, etc.) stays
untouched. Highlights:

- Classes: BaseTask → BaseWorker, PipelineTask → PipelineWorker,
  LLMTask → LLMWorker, LLMContextTask → LLMContextWorker, TaskBus →
  WorkerBus, TaskRegistry → WorkerRegistry, TaskActivationArgs →
  WorkerActivationArgs, TaskReadyData → WorkerReadyData,
  TaskRegistryEntry → WorkerRegistryEntry, TaskObserver →
  WorkerObserver, all Bus*TaskMessage → Bus*WorkerMessage,
  BusAddTaskMessage.task field → worker, BusWorkerRegistryMessage.tasks
  field → workers.
- Methods/decorators: activate_task → activate_worker, deactivate_task
  → deactivate_worker, add_task → add_worker, watch_task →
  watch_worker, @task_ready → @worker_ready, setup_pipeline_task hook
  → setup_pipeline_worker.
- Params/fields: FrameProcessorSetup.pipeline_task and
  FunctionCallParams.pipeline_task → pipeline_worker. Parameter names
  like task_name → worker_name; spawn/run accept worker:.
- Files: pipeline/base_task.py → base_worker.py, pipeline/task.py →
  worker.py (plus a re-export shim at pipeline/task.py),
  task_observer.py → worker_observer.py, task_ready_decorator.py →
  worker_ready_decorator.py, pipecat.tasks → pipecat.workers,
  llm_task.py → llm_worker.py, llm_context_task.py →
  llm_context_worker.py, examples/multi-task → examples/multi-worker.

Back-compat:
- PipelineTask kept as a deprecated subclass of PipelineWorker that
  warns on construction.
- pipecat.pipeline.task re-exports PipelineWorker/PipelineTask/etc. so
  existing user imports keep working.
- FrameProcessor.pipeline_task kept as a deprecated property that
  forwards to pipeline_worker.

Local variables in examples that hold a worker (task = PipelineTask(...))
are renamed to worker = PipelineWorker(...). Asyncio-task locals
(runner_task, etc.) are preserved.
2026-05-21 19:07:13 -07:00

299 lines
11 KiB
Python

#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
BASE_FILENAME = "/tmp/pipecat_conversation_"
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(params: FunctionCallParams):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await params.result_callback({"filenames": matching_files})
# async def get_saved_conversation_filenames(
# function_name, tool_call_id, args, llm, context, result_callback
# ):
# pattern = re.compile(re.escape(BASE_FILENAME) + "\\d{8}_\\d{6}\\.json$")
# matching_files = []
# for filename in os.listdir("."):
# if pattern.match(filename):
# matching_files.append(filename)
# await result_callback({"filenames": matching_files})
async def save_conversation(params: FunctionCallParams):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
try:
with open(filename, "w") as file:
messages = params.context.get_messages()
# remove the last few messages. in reverse order, they are:
# - the in progress save tool call
# - the invocation of the save tool call
# - the user ask to save (which may encompass one or more messages)
# the simplest thing to do is to pop messages until the last one is an assistant
# response
while messages and not (
isinstance(messages[-1], dict)
and messages[-1].get("role") == "assistant"
and "content" in messages[-1]
):
messages.pop()
if messages: # we never expect this to be empty
logger.debug(
f"writing conversation to {filename}\n{json.dumps(messages, indent=4)}"
)
json.dump(messages, file, indent=2)
await params.result_callback({"success": True})
except Exception as e:
await params.result_callback({"success": False, "error": str(e)})
async def load_conversation(params: FunctionCallParams):
async def _reset():
filename = params.arguments["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename) as file:
messages = json.load(file)
# HACK: if using the older Nova Sonic (pre-2) model, you need a special way of
# triggering the first assistant response. The call to trigger_assistant_response(),
# commented out below, is part of this.
# messages.append(
# {
# "role": "developer",
# "content": f"{AWSNovaSonicLLMService.AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION}",
# }
# )
# If the last message isn't from the user, add a message asking for a recap
if messages and messages[-1].get("role") != "user":
messages.append(
{
"role": "user",
"content": "Can you catch me up on what we were talking about?",
}
)
params.context.set_messages(messages)
assert isinstance(params.llm, AWSNovaSonicLLMService)
await params.llm.reset_conversation()
# await params.llm.trigger_assistant_response()
except Exception as e:
await params.result_callback({"success": False, "error": str(e)})
asyncio.create_task(_reset())
get_current_weather_tool = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
save_conversation_tool = FunctionSchema(
name="save_conversation",
description="Save the current conversation. Use this function to persist the current conversation to external storage.",
properties={},
required=[],
)
get_saved_conversation_filenames_tool = FunctionSchema(
name="get_saved_conversation_filenames",
description="Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
properties={},
required=[],
)
load_conversation_tool = FunctionSchema(
name="load_conversation",
description="Load a conversation history. Use this function to load a conversation history into the current session.",
properties={
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
required=["filename"],
)
tools = ToolsSchema(
standard_tools=[
get_current_weather_tool,
save_conversation_tool,
get_saved_conversation_filenames_tool,
load_conversation_tool,
]
)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Specify initial system instruction.
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken dialog exchanging "
"the transcripts of a natural real-time conversation. Keep your responses short, generally "
"two or three sentences for chatty scenarios. "
# HACK: if using the older Nova Sonic (pre-2) model, note that you need to inject a special
# bit of text into this instruction to allow the first assistant response to be
# programmatically triggered (which happens in the on_client_connected handler)
# f"{AWSNovaSonicLLMService.AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION}"
)
llm = AWSNovaSonicLLMService(
secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
region=os.environ["AWS_REGION"], # as of 2025-05-06, us-east-1 is the only supported region
settings=AWSNovaSonicLLMService.Settings(
voice="tiffany", # matthew, tiffany, amy
system_instruction=system_instruction,
),
# you could choose to pass tools here rather than via context
# tools=tools
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_aggregator,
llm, # LLM
transport.output(), # Transport bot output
assistant_aggregator,
]
)
worker = PipelineWorker(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await worker.queue_frames([LLMRunFrame()])
# HACK: if using the older Nova Sonic (pre-2) model, you need this special way of
# triggering the first assistant response. Note that this trigger requires a special
# corresponding bit of text in the system instruction.
# await llm.trigger_assistant_response()
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await worker.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(worker)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()