Compare commits

...

53 Commits

Author SHA1 Message Date
Filipi Fuchter
e44f3d3b61 Adding TODO 2025-12-01 09:33:35 -03:00
Filipi Fuchter
c408b00bb6 Removing the dailytransport option 2025-12-01 07:48:10 -03:00
Filipi Fuchter
21ced5ba68 Improving the launch script to already update the server .env file. 2025-12-01 07:43:29 -03:00
Filipi Fuchter
75765a876b Disabling memory by default since it is not needed by this example 2025-12-01 07:32:47 -03:00
Filipi Fuchter
6b0194daf1 Improving the script to configure the bot 2025-12-01 07:23:51 -03:00
Filipi Fuchter
9c8192b505 Using pipecat-ai-small-webrtc-prebuilt 2.0.0 2025-11-30 21:33:44 -03:00
Filipi Fuchter
1cb6afe527 Fixing ruff format. 2025-11-29 07:14:31 -03:00
Filipi Fuchter
19878888e7 Script for configuring the agent. 2025-11-29 07:13:54 -03:00
Filipi Fuchter
8da56fa56f Enabling RTVI 2025-11-29 07:13:40 -03:00
Filipi Fuchter
b69ede2fbe script for destroying the agent. 2025-11-29 06:56:44 -03:00
Filipi Fuchter
119f753848 Script fou lauching the agent. 2025-11-29 06:54:53 -03:00
Filipi Fuchter
7dd5a21b4b Handling different session ids. 2025-11-29 06:53:37 -03:00
Filipi Fuchter
ed17aab7dc Client for smallwebrtc 2025-11-28 15:58:51 -03:00
Filipi Fuchter
a3d3e3136c Configuring the ice servers. 2025-11-28 15:40:59 -03:00
Filipi Fuchter
9fcc7a25fd Returning the answer to web request. 2025-11-28 15:21:11 -03:00
Filipi Fuchter
8d98af4990 Returning the answer to web request. 2025-11-28 15:20:17 -03:00
Filipi Fuchter
73b9bbaad2 Receiving the answer 2025-11-28 15:16:24 -03:00
Filipi Fuchter
e1def85747 Sending the answer in chunks 2025-11-28 14:37:53 -03:00
Filipi Fuchter
43e8641303 Fixing to receive all chunks 2025-11-28 13:20:17 -03:00
Filipi Fuchter
9d5acb12c7 Trying to fix the server 2025-11-28 12:06:43 -03:00
Filipi Fuchter
6e88619e0d Trying to fix the server 2025-11-28 10:57:49 -03:00
Filipi Fuchter
e8584eda03 Trying to fix the server 2025-11-28 10:32:51 -03:00
Filipi Fuchter
a94cc6543f Fixing pipecat agent. 2025-11-28 10:29:22 -03:00
Filipi Fuchter
3de809157f Passing the correct runner arguments. 2025-11-28 10:15:48 -03:00
Filipi Fuchter
16b833c194 Refactoring to use smallwebrtc 2025-11-28 09:33:36 -03:00
Filipi Fuchter
b8a89589d6 Refactoring pipecat agent. 2025-11-28 07:47:39 -03:00
Filipi Fuchter
bffdf43fd4 Refactoring the pipecat agent 2025-11-28 07:34:39 -03:00
Filipi Fuchter
b235ea7aaf Extracting the result from response. 2025-11-28 07:28:45 -03:00
Filipi Fuchter
79b6ac0514 Invoking the agent. 2025-11-28 07:23:16 -03:00
Filipi Fuchter
ddb0cbed77 Trying to invoke agentcore. 2025-11-28 07:00:12 -03:00
Filipi Fuchter
ebbb097e5f Creating server 2025-11-28 06:32:29 -03:00
Filipi Fuchter
a902981b95 Updating dependencies. 2025-11-28 06:18:28 -03:00
Filipi Fuchter
ced707a5c7 Refactoring the examples. 2025-11-28 06:15:38 -03:00
Filipi Fuchter
921d2a0ced Adding missing dependency 2025-11-28 06:05:08 -03:00
Filipi Fuchter
3f3b8547a1 Ignoring local dev variable when deploying the agent. 2025-11-28 06:01:23 -03:00
Filipi Fuchter
1acb7b18b1 Refactoring how we are structuring the code. 2025-11-28 05:58:32 -03:00
Filipi Fuchter
4b230860a5 Simple script for testing if we are correctly gathering the ice candidates in the browser. 2025-11-28 05:51:28 -03:00
Filipi Fuchter
db0583ae88 Refactoring the bots 2025-11-28 05:44:34 -03:00
Filipi Fuchter
9c9e328d67 Fixing launch. 2025-11-27 22:29:25 -03:00
Filipi Fuchter
fe68b27559 passing flag if is ssl or not. 2025-11-27 18:36:29 -03:00
Filipi Fuchter
bbf6077ef6 testing turn 2025-11-27 18:12:24 -03:00
Filipi Fuchter
94a7ffcbcf fixing format 2025-11-27 18:04:45 -03:00
Filipi Fuchter
8ae27020a8 Testing STUN\TURN candidates 2025-11-27 18:00:47 -03:00
Filipi Fuchter
eda2e3c3f1 Testing STUN\TURN candidates 2025-11-27 17:51:05 -03:00
Filipi Fuchter
28c8fbeab6 Testing UDP connection. 2025-11-27 17:19:35 -03:00
Filipi Fuchter
f42fba6fa9 Fixing ruff format. 2025-11-27 16:35:49 -03:00
Filipi Fuchter
e891f05bb9 Creating a script to launch already passing the environemnt variables. 2025-11-27 16:34:43 -03:00
Filipi Fuchter
625d56e8fa Simplest possible example using agentcore 2025-11-27 16:22:32 -03:00
Paul Kompfner
2edfa493b6 Amazon Bedrock AgentCore exploration, cont'd 2025-11-25 10:46:33 -05:00
Paul Kompfner
efa0669155 Amazon Bedrock AgentCore exploration, cont'd 2025-11-20 11:36:51 -05:00
Paul Kompfner
19f344e41a Amazon Bedrock AgentCore exploration, cont'd 2025-11-20 11:04:35 -05:00
Paul Kompfner
d1ce2f52f3 Amazon Bedrock AgentCore exploration, cont'd 2025-11-20 10:59:07 -05:00
Paul Kompfner
0c2723052c Amazon Bedrock AgentCore exploration 2025-11-19 17:39:55 -05:00
18 changed files with 5330 additions and 0 deletions

5
examples/aws-agentcore/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.bedrock_agentcore.yaml
.dockerignore
.bedrock_agentcore
.bkp
logs

View File

@@ -0,0 +1,121 @@
# Amazon Bedrock AgentCore Runtime Example
This example demonstrates how to prepare a Pipecat bot for deployment to **Amazon Bedrock AgentCore Runtime** and enable it to invoke AgentCore tools.
## Overview
This example shows the set needed to:
- Deploy your Pipecat bot to Amazon Bedrock AgentCore Runtime (which hosts and runs your bot)
- Enable your bot to invoke AgentCore tools while running in the AgentCore Runtime
The key additions to a standard Pipecat bot are the AgentCore-specific configurations and tool invocation handling that allow your bot to leverage the full AgentCore ecosystem.
## Prerequisites
- Accounts with:
- AWS
- OpenAI
- Deepgram
- Cartesia
- Daily
- Python 3.10 or higher
- `uv` package manager
## IAM Configuration
Configure your IAM user with the necessary policies for AgentCore usage. Start with these:
- `BedrockAgentCoreFullAccess`
- A new policy (maybe named `BedrockAgentCoreCLI`) configured [like this](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-permissions.html#runtime-permissions-starter-toolkit)
You can also choose to specify more granular permissions; see [Amazon Bedrock AgentCore docs](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-permissions.html) for more information.
To simplify the remaining steps in this README, it's a good idea to export some AWS-specific environment variables:
```bash
export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...
export AWS_REGION=...
```
## Agent Configuration
Configure your bot as an AgentCore agent.
```bash
agentcore configure -e bot.py
```
Follow the prompts to complete the configuration.
**IMPORTANT:** when asked if you want to use "Direct Code Deploy" or "Container", choose "Container". Today there is an incompatibility between Pipecat and "Direct Code Deploy".
> For the curious: "Direct Code Deploy" requires that all bot dependencies have an `aarch64_manylinux2014` wheel...which is unfortunately not true for `numba`.
## Deployment to AgentCore Runtime
Deploy your configured bot to Amazon Bedrock AgentCore Runtime for production hosting.
```bash
agentcore launch --env OPENAI_API_KEY=... --env DEEPGRAM_API_KEY=... --env CARTESIA_API_KEY=... # -a <agent_name> (if multiple agents configured)
# or just use this script, which will read and send all environment variables
./scripts/launch.sh
```
You should see commands related to tailing logs printed to the console. Copy and save them for later use.
This is also the command you need to run after you've updated your bot code.
## Running on AgentCore Runtime
Run your bot on AgentCore Runtime.
```bash
agentcore invoke '{"roomUrl": "https://<your-domain>.daily.co/<room-name>"}' # -a <agent_name> (if multiple agents configured)
```
In case we wish to define the session-id, to create a new session each time we invoke:
```bash
aws-agentcore % agentcore invoke \
--session-id user-123456-conversation-12345679 \
'{"roomUrl": "https://<your-domain>.daily.co/<room-name>"}'
```
## Observation
Paste the log tailing command you received when deploying your bot to AgentCore Runtime. It should look something like:
```bash
# Replace with your actual command
aws logs tail /aws/bedrock-agentcore/runtimes/bot1-0uJkkT7QHC-DEFAULT --log-stream-name-prefix "2025/11/19/[runtime-logs]" --follow
```
## Removing your local agent
```bash
agentcore destroy
```
## Running Locally
You can also run your bot locally, using either the SmallWebRTC or Daily transport.
First, copy `env.example` to `.env` and fill in the values.
Then, run the bot:
```bash
# SmallWebRTC
PIPECAT_LOCAL_DEV=1 uv run python bot.py
# Daily
PIPECAT_LOCAL_DEV=1 uv run python bot.py -t daily -d
```
> Ideally you should be able to use `agentcore launch --local`, but it doesn't currently appear to be working (even with [this workaround](https://github.com/aws/bedrock-agentcore-starter-toolkit/issues/156) applied), at least not for this project.
## Additional Resources
For a comprehensive guide to getting started with Amazon Bedrock AgentCore, including detailed setup instructions, see the [Amazon Bedrock AgentCore Developer Guide](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/what-is-bedrock-agentcore.html).

View File

@@ -0,0 +1,3 @@
PIPECAT_LOCAL_DEV=...
TURN_USERNAME=
TURN_CREDENTIAL=

View File

@@ -0,0 +1,196 @@
import asyncio
import os
import socket
import aioice
from bedrock_agentcore import BedrockAgentCoreApp
from dotenv import load_dotenv
load_dotenv(override=True)
app = BedrockAgentCoreApp()
def test_udp():
"""Test UDP connectivity using STUN server"""
stun_server = ("stun.l.google.com", 19302)
msg = b"\x00\x01\x00\x00" + b"\x21\x12\xa4\x42" + b"\x00" * 12
sock = None
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(3)
print("Testing UDP connectivity to STUN server...")
sock.sendto(msg, stun_server)
_, _ = sock.recvfrom(1024)
print("STUN response received")
return True
except Exception as e:
print("STUN test failed:", e)
return False
finally:
if sock:
sock.close()
async def _async_turn_test(turn_server, turn_port, username, password, turn_transport, turn_ssl):
"""Internal async TURN test using aioice."""
print(f"Testing TURN server: {turn_server}:{turn_port}:{turn_transport}")
connection = aioice.Connection(
ice_controlling=True,
turn_server=(turn_server, turn_port),
turn_username=username,
turn_password=password,
turn_ssl=turn_ssl,
turn_transport=turn_transport,
)
try:
print(f"Gathering ICE candidates via TURN {turn_server}:{turn_port} ...")
await connection.gather_candidates()
candidates = connection.local_candidates
relay_candidates = [c for c in candidates if c.type == "relay"]
if relay_candidates:
print("TURN relay candidate acquired:", relay_candidates[0])
return True
print("No TURN relay candidates received — allocation failed.")
return False
except Exception as e:
print(f"TURN test failed: {e}")
return False
finally:
await connection.close()
def test_turn_with_auth(server, port, username, password, transport, turn_ssl=False):
"""Sync wrapper for aioice TURN test."""
return asyncio.run(_async_turn_test(server, port, username, password, transport, turn_ssl))
def comprehensive_network_test():
"""Run comprehensive network connectivity tests."""
results = {}
# Test basic UDP connectivity
results["udp_stun"] = test_udp()
turn_username = os.getenv("TURN_USERNAME")
turn_credential = os.getenv("TURN_CREDENTIAL")
# TURN test list
turn_servers = [
(
"turn.cloudflare.com", # cleaned
3478,
turn_username,
turn_credential,
"udp",
False,
),
(
"turn.cloudflare.com", # cleaned
5349,
turn_username,
turn_credential,
"tcp",
True,
),
(
"turn.cloudflare.com", # cleaned
443,
turn_username,
turn_credential,
"tcp",
True,
),
(
"turn.cloudflare.com", # cleaned
80,
turn_username,
turn_credential,
"tcp",
False,
),
(
"turn.cloudflare.com", # cleaned
3478,
turn_username,
turn_credential,
"tcp",
False,
),
]
results["turn_tests"] = []
for host, port, username, password, transport, tls in turn_servers:
success = test_turn_with_auth(host, port, username, password, transport, tls)
results["turn_tests"].append({"server": f"{host}:{port}", "success": success})
return results
def test_tcp_connectivity(host, port):
"""Test TCP connectivity to a host."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex((host, port))
sock.close()
if result == 0:
print(f"TCP connection to {host}:{port} successful")
return True
print(f"TCP connection to {host}:{port} failed")
return False
except Exception as e:
print(f"TCP test failed: {e}")
return False
@app.entrypoint
def my_agent(payload):
network_results = comprehensive_network_test()
udp_ok = network_results.get("udp_stun", False)
turn_ok = any(t["success"] for t in network_results["turn_tests"])
tcp_ok = network_results.get("tcp_test", False)
connectivity_status = []
if udp_ok:
connectivity_status.append("UDP/STUN")
if turn_ok:
connectivity_status.append("TURN")
if tcp_ok:
connectivity_status.append("TCP")
return {
"result": f"Hello {payload.get('name', 'World')}!",
"network_test_results": network_results,
"connectivity_status": ", ".join(connectivity_status)
if connectivity_status
else "No connectivity",
"webrtc_feasible": udp_ok or turn_ok,
"turn_available": turn_ok,
}
if __name__ == "__main__":
if os.getenv("PIPECAT_LOCAL_DEV") == "1":
# Running locally
results = comprehensive_network_test()
print(results)
else:
# Running on AgentCore Runtime
app.run()

View File

@@ -0,0 +1,4 @@
aioice
aiohttp
bedrock-agentcore
python-dotenv

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,7 @@
OPENAI_API_KEY=...
DEEPGRAM_API_KEY=...
CARTESIA_API_KEY=...
PIPECAT_LOCAL_DEV=...
TURN_USERNAME=
TURN_CREDENTIAL=

View File

@@ -0,0 +1,179 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from bedrock_agentcore import BedrockAgentCoreApp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments, SmallWebRTCRunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
)
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
app = BedrockAgentCoreApp()
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
yield {"status": "initializing bot!"}
# Returning the answer
if isinstance(transport, SmallWebRTCTransport):
yield {"status": "ANSWER:START"}
yield {"answer": transport._client._webrtc_connection.get_answer()}
yield {"status": "ANSWER:END"}
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[RTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
logger.info(f"Client ready")
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
task_id = app.add_async_task("voice_agent")
await runner.run(task)
app.complete_async_task(task_id)
yield {"status": "completed"}
@app.entrypoint
async def agentcore_bot(payload, context):
"""Bot entry point for running on Amazon Bedrock AgentCore Runtime."""
request = SmallWebRTCRequest.from_dict(payload)
ice_servers = [
IceServer(
urls=[
"turn:turn.cloudflare.com:3478?transport=tcp",
"turn:turn.cloudflare.com:80?transport=tcp",
"turns:turn.cloudflare.com:5349?transport=tcp",
"turns:turn.cloudflare.com:443?transport=tcp",
],
username=os.getenv("TURN_USERNAME"),
credential=os.getenv("TURN_CREDENTIAL"),
)
]
pipecat_connection = SmallWebRTCConnection(ice_servers=ice_servers)
await pipecat_connection.initialize(sdp=request.sdp, type=request.type)
# Prepare runner arguments with the callback to run your bot
runner_args = SmallWebRTCRunnerArguments(
webrtc_connection=pipecat_connection, body=request.request_data
)
transport = await create_transport(runner_args, transport_params)
async for result in run_bot(transport, runner_args):
yield result
# Used for local development
async def bot(runner_args: RunnerArguments):
"""Bot entry point for running locally and on Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
async for result in run_bot(transport, runner_args):
pass # Consume the stream
if __name__ == "__main__":
# NOTE: ideally we shouldn't have to branch for local dev vs AgentCore, but
# local AgentCore container-based dev doesn't seem to be working, or at
# least not for this project.
if os.getenv("PIPECAT_LOCAL_DEV") == "1":
# Running locally
from pipecat.runner.run import main
main()
else:
# Running on AgentCore Runtime
app.run()

View File

@@ -0,0 +1,3 @@
aiohttp
bedrock-agentcore
pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,runner]

View File

@@ -0,0 +1,38 @@
#!/bin/bash
# Script to configure the bot, patch Dockerfile and sync AGENT_RUNTIME_ARN
DOCKERFILE=".bedrock_agentcore/pipecat_agent/Dockerfile"
TARGET_LINE="RUN uv pip install -r requirements.txt"
# Extra dependencies needed by SmallWebRTC
INSERT_LINE="RUN apt update && apt install -y libgl1 libglib2.0-0 && apt clean"
###############################################
# STEP 1 — Configure agentcore
# Already configuring to use Docker as it is required by Pipecat
# Disabling memory by default since it is not needed by this example
###############################################
agentcore configure -e pipecat-agent.py --deployment-type container --container-runtime docker --disable-memory
###############################################
# STEP 2 — Wait until Dockerfile exists
###############################################
while [ ! -s "$DOCKERFILE" ]; do
sleep 0.2
done
###############################################
# STEP 3 — Patch Dockerfile
###############################################
cp "$DOCKERFILE" "$DOCKERFILE.bak"
awk -v target="$TARGET_LINE" -v insert="$INSERT_LINE" '
{
print $0
if ($0 ~ target) {
print insert
}
}
' "$DOCKERFILE.bak" > "$DOCKERFILE"
echo "Dockerfile patched successfully!"

View File

@@ -0,0 +1,5 @@
#!/bin/bash
# Script to destroy the agent
agentcore destroy

View File

@@ -0,0 +1,101 @@
#!/bin/bash
# Script to dynamically read all variables from .env file and launch agentcore
YAML_FILE=".bedrock_agentcore.yaml"
SERVER_ENV_FILE="../../server/.env"
###############################################
# STEP 1 — Launch the new agent
###############################################
# Check if the local .env file exists
if [ ! -f ".env" ]; then
echo "Error: .env file not found in current directory"
echo "Please create a .env file with your environment variables"
exit 1
fi
# Start building the agentcore launch command
LAUNCH_CMD="agentcore launch --auto-update-on-conflict"
echo "Loading environment variables from .env file..."
# Read each line from .env file and process it
while IFS= read -r line || [ -n "$line" ]; do
# Skip empty lines and comments
if [[ -z "$line" || "$line" =~ ^[[:space:]]*# ]]; then
continue
fi
# Check if line contains an equals sign (valid env var format)
if [[ "$line" =~ ^[^=]+=[^=]*$ ]]; then
# Extract variable name and value
VAR_NAME=$(echo "$line" | cut -d'=' -f1 | xargs)
VAR_VALUE=$(echo "$line" | cut -d'=' -f2- | xargs)
# Skip PIPECAT_LOCAL_DEV variable
if [[ "$VAR_NAME" == "PIPECAT_LOCAL_DEV" ]]; then
echo " Skipping: $VAR_NAME (ignored for deployment)"
continue
fi
# Skip if variable name or value is empty
if [[ -n "$VAR_NAME" && -n "$VAR_VALUE" ]]; then
# Add to launch command
LAUNCH_CMD="$LAUNCH_CMD --env $VAR_NAME=$VAR_VALUE"
echo " Added: $VAR_NAME"
fi
fi
done < ".env"
# Check if any environment variables were added
if [[ "$LAUNCH_CMD" == "agentcore launch --auto-update-on-conflict" ]]; then
echo "Warning: No valid environment variables found in .env file"
echo "Make sure your .env file contains variables in the format: KEY=value"
exit 1
fi
# Execute the command
echo ""
echo "Executing: $LAUNCH_CMD"
eval "$LAUNCH_CMD"
###############################################
# STEP 2 — Extract AGENT ARN from YAML
###############################################
if [ ! -f "$YAML_FILE" ]; then
echo "ERROR: $YAML_FILE not found!"
exit 1
fi
# Extracts: agent_arn: <value>
AGENT_ARN=$(grep -E "^\s*agent_arn:" "$YAML_FILE" | awk '{print $2}')
# Wait until it exists
while [ -z "$AGENT_ARN" ]; do
sleep 0.2
AGENT_ARN=$(grep -E "^\s*agent_arn:" "$YAML_FILE" | awk '{print $2}')
done
echo "Extracted Agent ARN: $AGENT_ARN"
###############################################
# STEP 3 — Update server .env
###############################################
if [ ! -f "$SERVER_ENV_FILE" ]; then
echo "ERROR: $SERVER_ENV_FILE not found!"
exit 1
fi
# If AGENT_RUNTIME_ARN already exists → replace
# If not → append
if grep -q "^AGENT_RUNTIME_ARN=" "$SERVER_ENV_FILE"; then
sed -i.bak "s|^AGENT_RUNTIME_ARN=.*|AGENT_RUNTIME_ARN=$AGENT_ARN|" "$SERVER_ENV_FILE"
else
echo "AGENT_RUNTIME_ARN=$AGENT_ARN" >> "$SERVER_ENV_FILE"
fi
echo ".env updated successfully!"
echo "AGENT_RUNTIME_ARN is now set to:"
echo "$AGENT_ARN"

View File

@@ -0,0 +1,25 @@
[project]
name = "agentcore-pipecat"
version = "0.1.0"
description = "Example for building Pipecat bots deployable to Amazon Bedrock AgentCore"
requires-python = ">=3.10"
dependencies = [
"aioice",
"aiohttp",
"bedrock-agentcore",
"python-dotenv",
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,runner]",
"pipecat-ai-small-webrtc-prebuilt>=2.0.0"
]
[dependency-groups]
dev = [
"bedrock-agentcore-starter-toolkit",
"pyright>=1.1.404,<2",
"ruff>=0.12.11,<1",
]
[tool.ruff]
line-length = 100
[tool.ruff.lint]
select = ["I"]

View File

@@ -0,0 +1,56 @@
#!/bin/bash
# Script to dynamically read all variables from .env file and launch agentcore
# Check if .env file exists
if [ ! -f ".env" ]; then
echo "Error: .env file not found in current directory"
echo "Please create a .env file with your environment variables"
exit 1
fi
# Start building the agentcore launch command
LAUNCH_CMD="agentcore launch --auto-update-on-conflict"
echo "Loading environment variables from .env file..."
# Read each line from .env file and process it
while IFS= read -r line || [ -n "$line" ]; do
# Skip empty lines and comments
if [[ -z "$line" || "$line" =~ ^[[:space:]]*# ]]; then
continue
fi
# Check if line contains an equals sign (valid env var format)
if [[ "$line" =~ ^[^=]+=[^=]*$ ]]; then
# Extract variable name and value
VAR_NAME=$(echo "$line" | cut -d'=' -f1 | xargs)
VAR_VALUE=$(echo "$line" | cut -d'=' -f2- | xargs)
# Skip PIPECAT_LOCAL_DEV variable
if [[ "$VAR_NAME" == "PIPECAT_LOCAL_DEV" ]]; then
echo " Skipping: $VAR_NAME (ignored for deployment)"
continue
fi
# Skip if variable name or value is empty
if [[ -n "$VAR_NAME" && -n "$VAR_VALUE" ]]; then
# Add to launch command
LAUNCH_CMD="$LAUNCH_CMD --env $VAR_NAME=$VAR_VALUE"
echo " Added: $VAR_NAME"
fi
fi
done < ".env"
# Check if any environment variables were added
if [[ "$LAUNCH_CMD" == "agentcore launch --auto-update-on-conflict" ]]; then
echo "Warning: No valid environment variables found in .env file"
echo "Make sure your .env file contains variables in the format: KEY=value"
exit 1
fi
# Execute the command
echo ""
echo "Executing: $LAUNCH_CMD"
eval "$LAUNCH_CMD"

View File

@@ -0,0 +1,47 @@
// Simple script for testing if we are correctly gathering the ice candidates from a specific
// turn server in the browser
(async () => {
console.clear();
console.log("Starting ICE candidate test…");
const turnServer = {
urls: "turn:turn.cloudflare.com:80?transport=tcp",
username: "username",
credential: "password"
};
const pc = new RTCPeerConnection({
iceServers: [ turnServer ],
iceTransportPolicy: "all" // or "relay" if you want only TURN
});
let gotRelay = false;
pc.onicecandidate = event => {
if (!event.candidate) {
console.log("ICE gathering finished.");
if (gotRelay) {
console.log("%cTURN relay candidate FOUND ✔️", "color: green; font-weight: bold;");
} else {
console.log("%cNo TURN relay candidates detected ❌", "color: red; font-weight: bold;");
}
return;
}
const cand = event.candidate.candidate;
console.log("ICE Candidate:", cand);
if (cand.includes("typ relay")) {
console.log("%cTURN relay candidate detected!", "color: green;");
gotRelay = true;
}
};
// Create empty data channel (required to trigger ICE)
pc.createDataChannel("test");
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
console.log("Gathering ICE candidates…");
})();

View File

@@ -0,0 +1,5 @@
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=
AWS_REGION=us-east-1=
# You can find this inside .bedrock_agentcore.yaml
AGENT_RUNTIME_ARN=

View File

@@ -0,0 +1,188 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import json
import os
import sys
import uuid
from contextlib import asynccontextmanager
from http import HTTPMethod
from typing import Any, Dict, List, Optional, TypedDict, Union
import boto3
import uvicorn
from botocore.response import StreamingBody
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
load_dotenv(override=True)
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Add your frontend URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# In-memory store of active sessions: session_id -> session info
active_sessions: Dict[str, Dict[str, Any]] = {}
# Initialize Bedrock client
bedrock = boto3.client("bedrock-agentcore")
# You can find this inside .bedrock_agentcore.yaml
AGENT_RUNTIME_ARN = os.getenv("AGENT_RUNTIME_ARN")
# Mount the frontend at /
app.mount("/client", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
return RedirectResponse(url="/client/")
async def post_offer(request: Request, session_id: str):
"""Handle WebRTC offer requests via SmallWebRTCRequestHandler."""
data = await request.json()
response = bedrock.invoke_agent_runtime(
agentRuntimeArn=AGENT_RUNTIME_ARN,
contentType="application/json",
payload=json.dumps(data),
runtimeSessionId=session_id,
)
answer_sdp = None
if "text/event-stream" in response.get("contentType", ""):
# Handle streaming response
streaming_body: StreamingBody = response["response"]
for line in streaming_body.iter_lines(chunk_size=1):
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
line = line[6:]
print(f"Received line: {line}")
try:
event = json.loads(line)
print("Received event:", event)
# 4. Check for the 'answer' key
if "answer" in event:
payload = event["answer"]
if payload.get("type") == "answer":
answer_sdp = payload
print("WebRTC answer found. Stopping stream processing.")
# Break the line loop immediately
break
except json.JSONDecodeError:
print(f"Failed to parse extracted SSE payload as JSON: {line}")
pass
if answer_sdp is None:
raise HTTPException(500, "Did not find WebRTC answer in agent output")
return answer_sdp
@app.post("/start")
async def rtvi_start(request: Request):
"""Mimic Pipecat Cloud's /start endpoint."""
class IceServer(TypedDict, total=False):
urls: Union[str, List[str]]
class IceConfig(TypedDict):
iceServers: List[IceServer]
class StartBotResult(TypedDict, total=False):
sessionId: str
iceConfig: Optional[IceConfig]
# Parse the request body
try:
request_data = await request.json()
logger.debug(f"Received request: {request_data}")
except Exception as e:
logger.error(f"Failed to parse request body: {e}")
request_data = {}
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
session_id = str(uuid.uuid4())
active_sessions[session_id] = request_data
result: StartBotResult = {"sessionId": session_id}
if request_data.get("enableDefaultIceServers"):
result["iceConfig"] = IceConfig(
iceServers=[IceServer(urls=["stun:stun.l.google.com:19302"])]
)
return result
@app.api_route(
"/sessions/{session_id}/{path:path}",
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_request(
session_id: str, path: str, request: Request, background_tasks: BackgroundTasks
):
"""Mimic Pipecat Cloud's proxy."""
active_session = active_sessions.get(session_id)
if active_session is None:
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
if path.endswith("api/offer"):
try:
# TODO add support for the PATCH method
# Add a variable to the payload as well to say if it is a new session of not
# and the OPERATION: if it is POST or PATCH, so we can validate it inside the agent.
if request.method == HTTPMethod.POST.value:
return await post_offer(request, session_id)
except Exception as e:
logger.error(f"Failed to parse WebRTC request: {e}")
return Response(content="Invalid WebRTC request", status_code=400)
logger.info(f"Received request for path: {path}")
return Response(status_code=200)
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC demo")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
logger.remove(0)
if args.verbose:
logger.add(sys.stderr, level="TRACE")
else:
logger.add(sys.stderr, level="DEBUG")
uvicorn.run(app, host=args.host, port=args.port)

4331
examples/aws-agentcore/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff