From fe8573322f7d832a77dddc0411c78c620341a732 Mon Sep 17 00:00:00 2001
From: Paul Kompfner
Date: Mon, 7 Jul 2025 11:29:49 -0400
Subject: [PATCH] AWS Strands demos

---
 examples/aws-strands/README.md           |  60 +++++
 examples/aws-strands/black-box.py        | 206 +++++++++++++++++
 examples/aws-strands/env.example         |   8 +
 examples/aws-strands/explain-thinking.py | 268 +++++++++++++++++++++++
 examples/aws-strands/requirements.txt    |   6 +
 5 files changed, 548 insertions(+)
 create mode 100644 examples/aws-strands/README.md
 create mode 100644 examples/aws-strands/black-box.py
 create mode 100644 examples/aws-strands/env.example
 create mode 100644 examples/aws-strands/explain-thinking.py
 create mode 100644 examples/aws-strands/requirements.txt

diff --git a/examples/aws-strands/README.md b/examples/aws-strands/README.md
new file mode 100644
index 000000000..8f2f289fd
--- /dev/null
+++ b/examples/aws-strands/README.md
@@ -0,0 +1,60 @@
+# AWS Strands Examples
+
+This folder contains two Python examples demonstrating how to use Pipecat with the AWS Strands agent.
+
+## Overview
+
+These examples show how to delegate complex, multi-step tasks to a Strands agent, which can reason step-by-step and call tools to accomplish user requests.
+
+These examples are intentionally simplified for demonstration, using mock API calls. They work best if you ask it:
+
+> What's the weather where the Golden Gate Bridge is?
+
+## Example Scripts
+
+### `black-box.py`
+
+A minimal example that demonstrates how to use the Strands agent with Pipecat. The agent can handle multi-step queries by calling tools, but does not explain its reasoning out loud.
+
+### `explain-thinking.py`
+
+An enhanced example where the Strands agent explains each step of its reasoning in clear, simple language as it works through a multi-step task.
+
+## Quick Start
+
+1. **Clone the repository and navigate to this example:**
+
+   ```bash
+   git clone https://github.com/pipecat-ai/pipecat.git
+   cd pipecat/examples/aws-strands
+   ```
+
+2. **Set up a virtual environment:**
+
+   ```bash
+   python -m venv venv
+   source venv/bin/activate  # On Windows: venv\Scripts\activate
+   ```
+
+3. **Install dependencies:**
+
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+4. **Configure environment variables:**
+
+   Copy the provided `env.example` file to `.env` and fill in the necessary credentials:
+
+   ```bash
+   cp env.example .env
+   # Then edit .env with your preferred editor
+   ```
+
+5. **Run an example:**
+
+   ```bash
+   python black-box.py
+   # or
+   python explain-thinking.py
+   ```
diff --git a/examples/aws-strands/black-box.py b/examples/aws-strands/black-box.py
new file mode 100644
index 000000000..12ce441a0
--- /dev/null
+++ b/examples/aws-strands/black-box.py
@@ -0,0 +1,206 @@
+#
+# Copyright (c) 2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import asyncio
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+from strands import Agent, tool
+from strands.models import BedrockModel
+
+from pipecat.adapters.schemas.tools_schema import ToolsSchema
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import TTSSpeakFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.cartesia.tts import CartesiaTTSService
+from pipecat.services.deepgram.stt import DeepgramSTTService
+from pipecat.services.llm_service import FunctionCallParams
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
+from pipecat.transports.services.daily import DailyParams
+
+load_dotenv(override=True)
+
+"""This example demonstrates how to use the Strands agent with Pipecat.
+
+You can delegate complex, multi-step tasks to the Strands agent, which can cycle through LLM-based reasoning and tool calls to accomplish the task.
+
+Try asking: "What's the weather where the Golden Gate Bridge is?"
+"""
+
+# Strands agent tools
+
+
+@tool
+def get_location_name_from_landmark(landmark: str) -> str:
+    """
+    Get the location name from a landmark.
+
+    Args:
+        landmark (str): The name of the landmark, e.g. "Golden Gate Bridge".
+    """
+    # Simulate fetching location
+    return "San Francisco, CA"
+
+
+@tool
+def get_lat_long_from_location_name(location: str) -> dict:
+    """
+    Get the latitude and longitude for a location name.
+
+    Args:
+        location (str): The city and state, e.g. "San Francisco, CA".
+    """
+    # Simulate fetching lat/long from a geocoding service
+    return {"lat": 37.7749, "long": -122.4194}
+
+
+@tool
+def get_current_weather_from_lat_long(lat: float, long: float) -> dict:
+    """
+    Get the current weather for a specific latitude and longitude.
+
+    Args:
+        lat (float): The latitude of the location.
+        long (float): The longitude of the location.
+    """
+    # Simulate fetching weather data from a weather service
+    return {"conditions": "nice", "temperature": "75"}
+
+
+# We store functions so objects (e.g. SileroVADAnalyzer) don't get
+# instantiated. The function will be called when the desired transport gets
+# selected.
+transport_params = {
+    "daily": lambda: DailyParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+    "twilio": lambda: FastAPIWebsocketParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+    "webrtc": lambda: TransportParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+}
+
+
+async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
+    logger.info(f"Starting bot")
+
+    strands_agent = Agent(
+        model=BedrockModel(
+            model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0", max_tokens=64000
+        ),
+        tools=[
+            get_location_name_from_landmark,
+            get_lat_long_from_location_name,
+            get_current_weather_from_lat_long,
+        ],
+        system_prompt="""
+        You are a helpful personal assistant who can look up information about places and weather.
+
+        Your key capabilities:
+        1. Look up where landmarks are located.
+        2. Find latitude and longitude for a location.
+        3. Look up the current weather for a specific latitude and longitude.
+
+        Explain each step of your reasoning in clear, simple, and concise language. Your responses will be converted to audio, so avoid special characters and numbered lists.
+        """,
+    )
+
+    async def handle_location_or_weather_related_queries(params: FunctionCallParams, query: str):
+        """
+        Handle location or weather related queries.
+
+        Args:
+            query (str): The user's query, e.g. "What's the weather where the Golden Gate Bridge is?".
+        """
+        # Run in a background thread
+        # (Otherwise the agent blocks the event loop; one effect of that is that we don't hear
+        # "let me check on that" until the agent finishes)
+        loop = asyncio.get_running_loop()
+        result = await loop.run_in_executor(None, strands_agent, query)
+        await params.result_callback(result.message)
+
+    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+
+    tts = CartesiaTTSService(
+        api_key=os.getenv("CARTESIA_API_KEY"),
+        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
+    )
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+    llm.register_direct_function(handle_location_or_weather_related_queries)
+
+    @llm.event_handler("on_function_calls_started")
+    async def on_function_calls_started(service, function_calls):
+        await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
+
+    tools = ToolsSchema(standard_tools=[handle_location_or_weather_related_queries])
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. Start by suggesting that the user ask about the weather where the Golden Gate Bridge is.",
+        },
+    ]
+
+    context = OpenAILLMContext(messages, tools)
+    context_aggregator = llm.create_context_aggregator(context)
+
+    pipeline = Pipeline(
+        [
+            transport.input(),
+            stt,
+            context_aggregator.user(),
+            llm,
+            tts,
+            transport.output(),
+            context_aggregator.assistant(),
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        params=PipelineParams(
+            enable_metrics=True,
+            enable_usage_metrics=True,
+        ),
+    )
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        logger.info(f"Client connected")
+        # Kick off the conversation.
+        await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info(f"Client disconnected")
+        await task.cancel()
+
+    runner = PipelineRunner(handle_sigint=handle_sigint)
+
+    await runner.run(task)
+
+
+if __name__ == "__main__":
+    from pipecat.examples.run import main
+
+    main(run_example, transport_params=transport_params)
diff --git a/examples/aws-strands/env.example b/examples/aws-strands/env.example
new file mode 100644
index 000000000..cf1803b4d
--- /dev/null
+++ b/examples/aws-strands/env.example
@@ -0,0 +1,8 @@
+OPENAI_API_KEY=
+CARTESIA_API_KEY=
+DEEPGRAM_API_KEY=
+DAILY_API_KEY=
+DAILY_SAMPLE_ROOM_URL=
+AWS_SECRET_ACCESS_KEY=
+AWS_ACCESS_KEY_ID=
+AWS_REGION=
\ No newline at end of file
diff --git a/examples/aws-strands/explain-thinking.py b/examples/aws-strands/explain-thinking.py
new file mode 100644
index 000000000..1bc04d8e5
--- /dev/null
+++ b/examples/aws-strands/explain-thinking.py
@@ -0,0 +1,268 @@
+#
+# Copyright (c) 2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import asyncio
+import os
+import threading
+import time
+
+from dotenv import load_dotenv
+from loguru import logger
+from strands import Agent, tool
+from strands.models import BedrockModel
+
+from pipecat.adapters.schemas.tools_schema import ToolsSchema
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import TTSSpeakFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.cartesia.tts import CartesiaTTSService
+from pipecat.services.deepgram.stt import DeepgramSTTService
+from pipecat.services.llm_service import FunctionCallParams
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
+from pipecat.transports.services.daily import DailyParams
+
+load_dotenv(override=True)
+
+"""This example demonstrates how to use the Strands agent with Pipecat in a way where the agent explains its reasoning step-by-step.
+
+You can delegate complex, multi-step tasks to the Strands agent, which can cycle through LLM-based reasoning and tool calls to accomplish the task.
+
+Try asking: "What's the weather where the Golden Gate Bridge is?"
+"""
+
+
+# Strands agent tools
+
+
+@tool
+def get_location_name_from_landmark(landmark: str) -> str:
+    """
+    Get the location name from a landmark.
+
+    Args:
+        landmark (str): The name of the landmark, e.g. "Golden Gate Bridge".
+    """
+    # Simulate fetching location (slowly)
+    time.sleep(3)
+    return "San Francisco, CA"
+
+
+@tool
+def get_lat_long_from_location_name(location: str) -> dict:
+    """
+    Get the latitude and longitude for a location name.
+
+    Args:
+        location (str): The city and state, e.g. "San Francisco, CA".
+    """
+    # Simulate fetching lat/long from a geocoding service (slowly)
+    time.sleep(3)
+    return {"lat": 37.7749, "long": -122.4194}
+
+
+@tool
+def get_current_weather_from_lat_long(lat: float, long: float) -> dict:
+    """
+    Get the current weather for a specific latitude and longitude.
+
+    Args:
+        lat (float): The latitude of the location.
+        long (float): The longitude of the location.
+    """
+    # Simulate fetching weather data from a weather service (slowly)
+    time.sleep(3)
+    return {"conditions": "nice", "temperature": "75"}
+
+
+# We store functions so objects (e.g. SileroVADAnalyzer) don't get
+# instantiated. The function will be called when the desired transport gets
+# selected.
+transport_params = {
+    "daily": lambda: DailyParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+    "twilio": lambda: FastAPIWebsocketParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+    "webrtc": lambda: TransportParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(),
+    ),
+}
+
+
+async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
+    logger.info(f"Starting bot")
+
+    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+
+    tts = CartesiaTTSService(
+        api_key=os.getenv("CARTESIA_API_KEY"),
+        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
+    )
+
+    # The agent runs in a worker thread (see run_in_executor below), so its callback handler
+    # needs a reference to the event loop in order to hand messages over to it safely.
+    loop = asyncio.get_running_loop()
+
+    next_strands_message_is_last = False
+    strands_messages_queue = asyncio.Queue()
+
+    def strands_callback_handler(**kwargs):
+        """
+        Handle events from the Strands agent.
+
+        Queues the agent's intermediate ("thinking") messages to be spoken. Messages that follow
+        an "end_turn" stop are skipped; the agent's result is returned to the LLM instead.
+        """
+        nonlocal next_strands_message_is_last
+        if "event" in kwargs:
+            event_obj = kwargs["event"]
+            if event_obj and "messageStop" in event_obj:
+                message_stop = event_obj["messageStop"]
+                if message_stop and "stopReason" in message_stop:
+                    stop_reason = message_stop["stopReason"]
+                    if stop_reason == "end_turn":
+                        next_strands_message_is_last = True
+        elif "message" in kwargs:
+            message_obj = kwargs["message"]
+            if message_obj and "content" in message_obj and "role" in message_obj:
+                role = message_obj["role"]
+                content = message_obj["content"]
+                if role == "assistant" and isinstance(content, list):
+                    for content_obj in content:
+                        if isinstance(content_obj, dict) and "text" in content_obj:
+                            message = content_obj["text"]
+                            if not next_strands_message_is_last:
+                                # asyncio.Queue isn't thread-safe; schedule the put on the loop
+                                loop.call_soon_threadsafe(
+                                    strands_messages_queue.put_nowait, message
+                                )
+
+    async def process_strands_messages():
+        while True:
+            message = await strands_messages_queue.get()
+            await tts.queue_frame(TTSSpeakFrame(message))
+            strands_messages_queue.task_done()
+
+    # Keep a reference to the task: the event loop only holds a weak reference to it, so it
+    # could otherwise get garbage collected mid-execution.
+    strands_messages_task = asyncio.create_task(process_strands_messages())
+
+    strands_agent = Agent(
+        model=BedrockModel(
+            model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0", max_tokens=64000
+        ),
+        tools=[
+            get_location_name_from_landmark,
+            get_lat_long_from_location_name,
+            get_current_weather_from_lat_long,
+        ],
+        system_prompt="""
+        You are a helpful personal assistant who can look up information about places and weather.
+
+        Your key capabilities:
+        1. Look up where landmarks are located.
+        2. Find latitude and longitude for a location.
+        3. Look up the current weather for a specific latitude and longitude.
+
+        Explain each step of your reasoning in clear, simple, and concise language. Your responses will be converted to audio, so avoid special characters and numbered lists.
+        """,
+        callback_handler=strands_callback_handler,
+    )
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+    async def handle_location_or_weather_related_queries(params: FunctionCallParams, query: str):
+        """
+        Handle location or weather related queries.
+
+        Args:
+            query (str): The user's query, e.g. "What's the weather where the Golden Gate Bridge is?".
+        """
+        nonlocal next_strands_message_is_last
+        # Reset the flag left over from the previous run; otherwise only the first query's
+        # "thinking" messages would be spoken
+        next_strands_message_is_last = False
+        # Run in a background thread
+        # (Otherwise the agent blocks the event loop; one effect of that is that we don't hear
+        # the agent's "thinking" messages until the agent finishes)
+        result = await loop.run_in_executor(None, strands_agent, query)
+        await params.result_callback(result.message)
+
+    llm.register_direct_function(handle_location_or_weather_related_queries)
+
+    @llm.event_handler("on_function_calls_started")
+    async def on_function_calls_started(service, function_calls):
+        await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
+
+    tools = ToolsSchema(standard_tools=[handle_location_or_weather_related_queries])
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. Start by suggesting that the user ask about the weather where the Golden Gate Bridge is.",
+        },
+    ]
+
+    context = OpenAILLMContext(messages, tools)
+    context_aggregator = llm.create_context_aggregator(context)
+
+    pipeline = Pipeline(
+        [
+            transport.input(),
+            stt,
+            context_aggregator.user(),
+            llm,
+            tts,
+            transport.output(),
+            context_aggregator.assistant(),
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        params=PipelineParams(
+            enable_metrics=True,
+            enable_usage_metrics=True,
+        ),
+    )
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        logger.info(f"Client connected")
+        # Kick off the conversation.
+        await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info(f"Client disconnected")
+        await task.cancel()
+
+    runner = PipelineRunner(handle_sigint=handle_sigint)
+
+    try:
+        await runner.run(task)
+    finally:
+        # Stop the background task that speaks the agent's "thinking" messages
+        strands_messages_task.cancel()
+
+
+if __name__ == "__main__":
+    from pipecat.examples.run import main
+
+    main(run_example, transport_params=transport_params)
diff --git a/examples/aws-strands/requirements.txt b/examples/aws-strands/requirements.txt
new file mode 100644
index 000000000..ce3ab667c
--- /dev/null
+++ b/examples/aws-strands/requirements.txt
@@ -0,0 +1,6 @@
+fastapi
+uvicorn
+python-dotenv
+pipecat-ai[webrtc,daily,deepgram,cartesia]
+pipecat-ai-small-webrtc-prebuilt
+strands-agents
\ No newline at end of file