diff --git a/pyproject.toml b/pyproject.toml index cf83e53ee..f5ade679b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ aic = [ "aic-sdk~=1.1.0" ] anthropic = [ "anthropic~=0.49.0" ] assemblyai = [ "pipecat-ai[websockets-base]" ] asyncai = [ "pipecat-ai[websockets-base]" ] -aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ] +aws = [ "aioboto3~=15.5.0", "pipecat-ai[websockets-base]" ] aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.2.0; python_version>='3.12'" ] azure = [ "azure-cognitiveservices-speech~=1.42.0"] cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ] diff --git a/src/pipecat/services/aws/__init__.py b/src/pipecat/services/aws/__init__.py index 6f6903f75..88725f965 100644 --- a/src/pipecat/services/aws/__init__.py +++ b/src/pipecat/services/aws/__init__.py @@ -8,6 +8,7 @@ import sys from pipecat.services import DeprecatedModuleProxy +from .agent_core import * from .llm import * from .nova_sonic import * from .sagemaker import * diff --git a/src/pipecat/services/aws/agent_core.py b/src/pipecat/services/aws/agent_core.py new file mode 100644 index 000000000..be4806221 --- /dev/null +++ b/src/pipecat/services/aws/agent_core.py @@ -0,0 +1,258 @@ +# +# Copyright (c) 2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""AWS AgentCore Processor Module. + +This module defines the AWSAgentCoreProcessor, which invokes agents hosted on +Amazon Bedrock AgentCore Runtime and streams their responses as LLMTextFrames. +""" + +import asyncio +import json +import os +from typing import Callable, Optional + +import aioboto3 +from loguru import logger + +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMTextFrame, +) +from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +def default_context_to_payload_transformer( + context: LLMContext | OpenAILLMContext, +) -> Optional[str]: + """Default transformer to create AgentCore payload from LLM context. + + Extracts the latest user or system message text and wraps it in {"prompt": ""}. + + Args: + context: The LLM context containing conversation messages. + + Returns: + A JSON string payload for AgentCore, or None if no valid message found. + """ + messages = context.messages + + if not messages: + return None + + last_message = messages[-1] + if isinstance(last_message, LLMSpecificMessage) or last_message.get("role") not in ( + "user", + "system", + ): + return None + + content = last_message.get("content") + if not content: + return None + + if isinstance(content, str): + prompt = content + elif isinstance(content, list): + prompt = " ".join([part.get("text", "") for part in content]) + else: + return None + + return json.dumps({"prompt": prompt}) + + +def default_response_to_output_transformer(response_line: str) -> Optional[str]: + """Default transformer to extract output text from AgentCore response. + + Expects responses with {"response": ""} format. + + Args: + response_line: The raw response line from AgentCore (without "data: " prefix). + + Returns: + The extracted output text, or None if no text found. + """ + response_json = json.loads(response_line) + return response_json.get("response") + + +class AWSAgentCoreProcessor(FrameProcessor): + """Processor that runs an Amazon Bedrock AgentCore agent. + + Input: + - LLMContextFrame: Supplies a context used to invoke the agent. + + Output: + - LLMTextFrame: The agent's text response(s). + A single agent invocation may result in multiple text frames. + + This processor transforms the input context to a payload for the AgentCore + agent, and transforms the agent's response(s) into output text frame(s). Both + mappings are configurable via transformers. Below is the default behavior. + + Input transformer (context_to_payload_transformer): + - Grabs the latest user or system message (if it's the latest message) + - Extracts its text content + - Constructs a payload that looks like {"prompt": ""} + + Output transformer (response_to_output_transformer): + - Expects responses that look like {"response": ""} + - Extracts the text for use in the LLMTextFrame(s) + """ + + def __init__( + self, + agentArn: str, + aws_access_key: Optional[str] = None, + aws_secret_key: Optional[str] = None, + aws_session_token: Optional[str] = None, + aws_region: Optional[str] = None, + context_to_payload_transformer: Optional[ + Callable[[LLMContext | OpenAILLMContext], Optional[str]] + ] = None, + response_to_output_transformer: Optional[Callable[[str], Optional[str]]] = None, + **kwargs, + ): + """Initialize the AWS AgentCore processor. + + Args: + agentArn: The Amazon Web Services Resource Name (ARN) of the agent. + aws_access_key: AWS access key ID. If None, uses default credentials. + aws_secret_key: AWS secret access key. If None, uses default credentials. + aws_session_token: AWS session token for temporary credentials. + aws_region: AWS region. + context_to_payload_transformer: Optional callable to transform + LLMContext into AgentCore payload string. If None, uses + default_context_to_payload_transformer. + response_to_output_transformer: Optional callable to extract output text + from AgentCore response. If None, uses + default_response_to_output_transformer. + **kwargs: Additional arguments passed to parent FrameProcessor. + """ + super().__init__(**kwargs) + + self._agentArn = agentArn + self._aws_session = aioboto3.Session() + + # Store AWS session parameters for creating client in async context + self._aws_params = { + "aws_access_key_id": aws_access_key or os.getenv("AWS_ACCESS_KEY_ID"), + "aws_secret_access_key": aws_secret_key or os.getenv("AWS_SECRET_ACCESS_KEY"), + "aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"), + "region_name": aws_region or os.getenv("AWS_REGION", "us-east-1"), + } + + # Set transformers with defaults + self._context_to_payload_transformer = ( + context_to_payload_transformer or default_context_to_payload_transformer + ) + self._response_to_output_transformer = ( + response_to_output_transformer or default_response_to_output_transformer + ) + + # State for managing output response bookends + self._output_response_open = False + self._last_text_frame_time: Optional[float] = None + self._close_task: Optional[asyncio.Task] = None + self._output_response_timeout = 1.0 # seconds + + async def _close_output_response_after_timeout(self): + """Close the output response after timeout if no new text frames arrive.""" + await asyncio.sleep(self._output_response_timeout) + if self._output_response_open: + self._output_response_open = False + await self.push_frame(LLMFullResponseEndFrame()) + + async def _push_text_frame(self, text: str): + """Push a text frame, managing output response bookends.""" + # Cancel any pending close task + if self._close_task and not self._close_task.done(): + await self.cancel_task(self._close_task) + + # Open output response if needed + if not self._output_response_open: + await self.push_frame(LLMFullResponseStartFrame()) + self._output_response_open = True + + # Push the text frame + await self.push_frame(LLMTextFrame(text)) + self._last_text_frame_time = asyncio.get_event_loop().time() + + # Schedule closing the output response after timeout + self._close_task = self.create_task(self._close_output_response_after_timeout()) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames and handle LLM message frames. + + Args: + frame: The incoming frame to process. + direction: The direction of frame flow in the pipeline. + """ + await super().process_frame(frame, direction) + if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)): + # Create payload to invoke AgentCore agent + payload = self._context_to_payload_transformer(frame.context) + + if not payload: + return + + async with self._aws_session.client("bedrock-agentcore", **self._aws_params) as client: + # Invoke the AgentCore agent + response = await client.invoke_agent_runtime( + agentRuntimeArn=self._agentArn, payload=payload.encode() + ) + + # Determine if this is a streamed multi-part response, which + # will affect our parsing + is_multi_part_response = "text/event-stream" in response.get("contentType", "") + + # Handle each response part (there may be one, for single + # responses, or multiple, for streamed multi-part responses) + async for part in response.get("response", []): + part_string = part.decode("utf-8") + + # In streamed multi-part responses, each part might have + # one or more lines, each of which starts with "data: ". + # Treat each line as a response. + if is_multi_part_response: + for line in part_string.split("\n"): + # Get response text from this line + if not line: + continue + if not line.startswith("data: "): + logger.warning(f"Expected line to start with 'data: ', got: {line}") + continue + line = line[6:] # omit "data: " + + # Transform response line to output text + text = self._response_to_output_transformer(line) + if text: + await self._push_text_frame(text) + + # In single-part responses, the whole part is one response + # and there's no "data: " prefix + else: + # Transform response part string to output text + text = self._response_to_output_transformer(part_string) + if text: + await self._push_text_frame(text) + + # Final close if output response is still open after all parts processed + if self._output_response_open: + if self._close_task and not self._close_task.done(): + await self.cancel_task(self._close_task) + self._output_response_open = False + await self.push_frame(LLMFullResponseEndFrame()) + else: + await self.push_frame(frame, direction) diff --git a/uv.lock b/uv.lock index eb2fca39c..7a937e54d 100644 --- a/uv.lock +++ b/uv.lock @@ -45,20 +45,20 @@ sdist = { url = "https://files.pythonhosted.org/packages/99/83/bf38b95d98c67b8eb [[package]] name = "aioboto3" -version = "15.0.0" +version = "15.5.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiobotocore", extra = ["boto3"] }, { name = "aiofiles" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/80/d0/ed107e16551ba1b93ddcca9a6bf79580450945268a8bc396530687b3189f/aioboto3-15.0.0.tar.gz", hash = "sha256:dce40b701d1f8e0886dc874d27cd9799b8bf6b32d63743f57e7bef7e4a562756", size = 225278, upload-time = "2025-06-26T16:30:48.967Z" } +sdist = { url = "https://files.pythonhosted.org/packages/a2/01/92e9ab00f36e2899315f49eefcd5b4685fbb19016c7f19a9edf06da80bb0/aioboto3-15.5.0.tar.gz", hash = "sha256:ea8d8787d315594842fbfcf2c4dce3bac2ad61be275bc8584b2ce9a3402a6979", size = 255069, upload-time = "2025-10-30T13:37:16.122Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/bf/95/d69c744f408e5e4592fe53ed98fc244dd13b83d84cf1f83b2499d98bfcc9/aioboto3-15.0.0-py3-none-any.whl", hash = "sha256:9cf54b3627c8b34bb82eaf43ab327e7027e37f92b1e10dd5cfe343cd512568d0", size = 35785, upload-time = "2025-06-26T16:30:47.444Z" }, + { url = "https://files.pythonhosted.org/packages/e5/3e/e8f5b665bca646d43b916763c901e00a07e40f7746c9128bdc912a089424/aioboto3-15.5.0-py3-none-any.whl", hash = "sha256:cc880c4d6a8481dd7e05da89f41c384dbd841454fc1998ae25ca9c39201437a6", size = 35913, upload-time = "2025-10-30T13:37:14.549Z" }, ] [[package]] name = "aiobotocore" -version = "2.23.0" +version = "2.25.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiohttp" }, @@ -69,9 +69,9 @@ dependencies = [ { name = "python-dateutil" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9d/25/4b06ea1214ddf020a28df27dc7136ac9dfaf87929d51e6f6044dd350ed67/aiobotocore-2.23.0.tar.gz", hash = "sha256:0333931365a6c7053aee292fe6ef50c74690c4ae06bb019afdf706cb6f2f5e32", size = 115825, upload-time = "2025-06-12T23:46:38.055Z" } +sdist = { url = "https://files.pythonhosted.org/packages/62/94/2e4ec48cf1abb89971cb2612d86f979a6240520f0a659b53a43116d344dc/aiobotocore-2.25.1.tar.gz", hash = "sha256:ea9be739bfd7ece8864f072ec99bb9ed5c7e78ebb2b0b15f29781fbe02daedbc", size = 120560, upload-time = "2025-10-28T22:33:21.787Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ea/43/ccf9b29669cdb09fd4bfc0a8effeb2973b22a0f3c3be4142d0b485975d11/aiobotocore-2.23.0-py3-none-any.whl", hash = "sha256:8202cebbf147804a083a02bc282fbfda873bfdd0065fd34b64784acb7757b66e", size = 84161, upload-time = "2025-06-12T23:46:36.305Z" }, + { url = "https://files.pythonhosted.org/packages/95/2a/d275ec4ce5cd0096665043995a7d76f5d0524853c76a3d04656de49f8808/aiobotocore-2.25.1-py3-none-any.whl", hash = "sha256:eb6daebe3cbef5b39a0bb2a97cffbe9c7cb46b2fcc399ad141f369f3c2134b1f", size = 86039, upload-time = "2025-10-28T22:33:19.949Z" }, ] [package.optional-dependencies] @@ -620,30 +620,30 @@ wheels = [ [[package]] name = "boto3" -version = "1.38.27" +version = "1.40.61" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "botocore" }, { name = "jmespath" }, { name = "s3transfer" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/e7/96/fc74d8521d2369dd8c412438401ff12e1350a1cd3eab5c758ed3dd5e5f82/boto3-1.38.27.tar.gz", hash = "sha256:94bd7fdd92d5701b362d4df100d21e28f8307a67ff56b6a8b0398119cf22f859", size = 111875, upload-time = "2025-05-30T19:32:41.352Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ed/f9/6ef8feb52c3cce5ec3967a535a6114b57ac7949fd166b0f3090c2b06e4e5/boto3-1.40.61.tar.gz", hash = "sha256:d6c56277251adf6c2bdd25249feae625abe4966831676689ff23b4694dea5b12", size = 111535, upload-time = "2025-10-28T19:26:57.247Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/43/8b/b2361188bd1e293eede1bc165e2461d390394f71ec0c8c21211c8dabf62c/boto3-1.38.27-py3-none-any.whl", hash = "sha256:95f5fe688795303a8a15e8b7e7f255cadab35eae459d00cc281a4fd77252ea80", size = 139938, upload-time = "2025-05-30T19:32:38.006Z" }, + { url = "https://files.pythonhosted.org/packages/61/24/3bf865b07d15fea85b63504856e137029b6acbc73762496064219cdb265d/boto3-1.40.61-py3-none-any.whl", hash = "sha256:6b9c57b2a922b5d8c17766e29ed792586a818098efe84def27c8f582b33f898c", size = 139321, upload-time = "2025-10-28T19:26:55.007Z" }, ] [[package]] name = "botocore" -version = "1.38.27" +version = "1.40.61" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "jmespath" }, { name = "python-dateutil" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/36/5e/67899214ad57f7f26af5bd776ac5eb583dc4ecf5c1e52e2cbfdc200e487a/botocore-1.38.27.tar.gz", hash = "sha256:9788f7efe974328a38cbade64cc0b1e67d27944b899f88cb786ae362973133b6", size = 13919963, upload-time = "2025-05-30T19:32:29.657Z" } +sdist = { url = "https://files.pythonhosted.org/packages/28/a3/81d3a47c2dbfd76f185d3b894f2ad01a75096c006a2dd91f237dca182188/botocore-1.40.61.tar.gz", hash = "sha256:a2487ad69b090f9cccd64cf07c7021cd80ee9c0655ad974f87045b02f3ef52cd", size = 14393956, upload-time = "2025-10-28T19:26:46.108Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7e/83/a753562020b69fa90cebc39e8af2c753b24dcdc74bee8355ee3f6cefdf34/botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8", size = 13580545, upload-time = "2025-05-30T19:32:26.712Z" }, + { url = "https://files.pythonhosted.org/packages/38/c5/f6ce561004db45f0b847c2cd9b19c67c6bf348a82018a48cb718be6b58b0/botocore-1.40.61-py3-none-any.whl", hash = "sha256:17ebae412692fd4824f99cde0f08d50126dc97954008e5ba2b522eb049238aa7", size = 14055973, upload-time = "2025-10-28T19:26:42.15Z" }, ] [[package]] @@ -4665,7 +4665,7 @@ docs = [ requires-dist = [ { name = "accelerate", marker = "extra == 'moondream'", specifier = "~=1.10.0" }, { name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.1.0" }, - { name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" }, + { name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.5.0" }, { name = "aiofiles", specifier = ">=24.1.0,<25" }, { name = "aiohttp", specifier = ">=3.11.12,<4" }, { name = "aiortc", marker = "extra == 'webrtc'", specifier = ">=1.13.0,<2" }, @@ -6220,14 +6220,14 @@ wheels = [ [[package]] name = "s3transfer" -version = "0.13.1" +version = "0.14.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "botocore" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/6d/05/d52bf1e65044b4e5e27d4e63e8d1579dbdec54fce685908ae09bc3720030/s3transfer-0.13.1.tar.gz", hash = "sha256:c3fdba22ba1bd367922f27ec8032d6a1cf5f10c934fb5d68cf60fd5a23d936cf", size = 150589, upload-time = "2025-07-18T19:22:42.31Z" } +sdist = { url = "https://files.pythonhosted.org/packages/62/74/8d69dcb7a9efe8baa2046891735e5dfe433ad558ae23d9e3c14c633d1d58/s3transfer-0.14.0.tar.gz", hash = "sha256:eff12264e7c8b4985074ccce27a3b38a485bb7f7422cc8046fee9be4983e4125", size = 151547, upload-time = "2025-09-09T19:23:31.089Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/6d/4f/d073e09df851cfa251ef7840007d04db3293a0482ce607d2b993926089be/s3transfer-0.13.1-py3-none-any.whl", hash = "sha256:a981aa7429be23fe6dfc13e80e4020057cbab622b08c0315288758d67cabc724", size = 85308, upload-time = "2025-07-18T19:22:40.947Z" }, + { url = "https://files.pythonhosted.org/packages/48/f0/ae7ca09223a81a1d890b2557186ea015f6e0502e9b8cb8e1813f1d8cfa4e/s3transfer-0.14.0-py3-none-any.whl", hash = "sha256:ea3b790c7077558ed1f02a3072fb3cb992bbbd253392f4b6e9e8976941c7d456", size = 85712, upload-time = "2025-09-09T19:23:30.041Z" }, ] [[package]]