Compare commits

..

6 Commits

Author SHA1 Message Date
James Hush
d41ee5319e Add example for getting just recording links 2025-10-23 14:37:39 +08:00
James Hush
45256903b5 Remove log 2025-10-20 17:03:31 +08:00
James Hush
d37e63d972 Simplify create_daily_room.py 2025-10-20 17:02:05 +08:00
James Hush
013f869259 Simplfy fetch_s3_recording 2025-10-20 16:54:24 +08:00
James Hush
11594003e2 This is working 2025-10-20 16:49:37 +08:00
James Hush
2dd2a17b19 Backoff examples 2025-10-20 16:46:16 +08:00
17 changed files with 5272 additions and 4808 deletions

View File

@@ -21,21 +21,20 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch all history for setuptools_scm
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
- name: Set up Python
run: uv python install 3.10
- name: Install development dependencies
run: uv sync --group dev
- name: Build project
run: uv build
- name: Install project in editable mode
run: uv pip install --editable .
run: uv pip install --editable .

View File

@@ -9,69 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli) to the
required dependencies, enabling you to scaffold a new project directly from
`pipecat-ai`. Get started with:
```bash
uv run pipecat init
```
- Expanded support for universal `LLMContext` to `AWSNovaSonicLLMService`.
As a reminder, the context-setup pattern when using `LLMContext` is:
```python
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
(Note that even though `AWSNovaSonicLLMService` now supports the universal
`LLMContext`, it is not meant to be swapped out for another LLM service at
runtime.)
Worth noting: whether or not you use the new context-setup pattern with
`AWSNovaSonicLLMService`, some types have changed under the hood:
```python
## BEFORE:
# Context aggregator type
context_aggregator: AWSNovaSonicContextAggregatorPair
# Context frame type
frame: OpenAILLMContextFrame
# Context type
context: AWSNovaSonicLLMContext
# or
context: OpenAILLMContext
# Reading messages from context
messages = context.messages
## AFTER:
# Context aggregator type
context_aggregator: LLMContextAggregatorPair
# Context frame type
frame: LLMContextFrame
# Context type
context: LLMContext
# Reading messages from context
messages = context.get_messages()
```
- Added support for `bulbul:v3` model in `SarvamTTSService` and
`SarvamHttpTTSService`.
- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the
multilingual model.
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
-
- Added support for trickle ICE to the `SmallWebRTCTransport`.
- Added support for updating `OpenAITTSService` settings (`instructions` and
@@ -97,25 +40,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Package upgrades:
- `daily-python` upgraded to 0.20.0.
- `openai` upgraded to support up to 2.x.x.
- `openpipe` upgraded to support up to 5.x.x.
- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.
### Deprecated
- The `send_transcription_frames` argument to `AWSNovaSonicLLMService` is
deprecated. Transcription frames are now always sent. They go upstream, to be
handled by the user context aggregator. See "Added" section for details.
- Types in `pipecat.services.aws.nova_sonic.context` have been deprecated due
to changes to support `LLMContext`. See "Changed" section for details.
### Fixed
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
to a mismatch in the \_handle_transcription method's signature.
to a mismatch in the _handle_transcription method's signature.
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
now handled properly in `PipelineTask` making it possible to cancel an asyncio

View File

@@ -44,10 +44,6 @@ Looking to build structured conversations? Check out [Pipecat Flows](https://git
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
### 🛠️ CLI
Create a new project in under a minute with the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli). Then use the CLI to monitor and deploy your agent to production.
### 🔍 Debugging
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.

250
create_daily_room.py Executable file
View File

@@ -0,0 +1,250 @@
#!/usr/bin/env -S uv run
"""Utilities for creating Daily.co rooms with retry logic.
This module provides functions to create Daily rooms via REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
import time
from typing import Dict, Optional
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def periodic_progress_logger(
progress_dict: Dict[str, int],
total: int,
interval_seconds: float = 5.0,
stop_event: Optional[asyncio.Event] = None,
):
"""Log progress periodically in the background.
Args:
progress_dict: Shared dict with 'completed' and 'failed' counts
total: Total number of items being processed
interval_seconds: How often to log progress (default 5 seconds)
stop_event: Event to signal when to stop logging
"""
if stop_event is None:
stop_event = asyncio.Event()
while not stop_event.is_set():
await asyncio.sleep(interval_seconds)
if stop_event.is_set():
break
total_processed = progress_dict["completed"] + progress_dict["failed"]
if total_processed > 0:
percentage = (total_processed / total) * 100
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
logger.info(
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
f"{progress_dict['completed']} succeeded, "
f"{progress_dict['failed']} failed"
)
async def create_daily_room(
name: Optional[str] = None,
privacy: str = "public",
exp_minutes: int = 10,
max_retries: int = 5,
) -> Optional[Dict]:
"""Create a Daily room with automatic retry on rate limit errors.
Uses tenacity library to handle rate limiting (429 errors) with
exponential backoff and automatic retries.
Args:
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
privacy: Room privacy setting ("public" or "private")
exp_minutes: Minutes until room expires (default 10)
max_retries: Maximum number of retry attempts on rate limit (default 5)
Returns:
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
"""
# Calculate expiration timestamp (unix timestamp in seconds)
exp_timestamp = int(time.time()) + (exp_minutes * 60)
# Build request body
body = {
"privacy": privacy,
"properties": {
"exp": exp_timestamp,
},
}
if name:
body["name"] = name
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type(HTTPStatusError),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
async with AsyncClient(timeout=30) as client:
response = await client.post(
url="https://api.daily.co/v1/rooms",
headers=headers,
json=body,
)
response.raise_for_status()
return response.json()
# This line should never be reached due to reraise=True, but satisfies type checker
return None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
return None
except Exception as e:
logger.exception(f"Unexpected error creating room: {e}")
return None
async def create_room_with_progress(
index: int, total: int, progress_dict: Dict[str, int], **kwargs
) -> Optional[Dict]:
"""Wrapper for create_daily_room that tracks progress.
Args:
index: Index of this room creation (0-based)
total: Total number of rooms being created
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
**kwargs: Arguments passed to create_daily_room
Returns:
Room object dict or None
"""
result = await create_daily_room(**kwargs)
# Update progress
if result is not None:
progress_dict["completed"] += 1
else:
progress_dict["failed"] += 1
return result
async def test_create_rooms(
num_rooms: int = 1000,
progress_interval: float = 5.0,
) -> Dict[str, int | float]:
"""Attempt to create multiple Daily rooms concurrently.
This function demonstrates concurrent room creation and tracks
success/failure statistics. Rate limiting will likely occur when
creating many rooms quickly.
Args:
num_rooms: Number of rooms to attempt to create (default 1000)
progress_interval: How often to log progress in seconds (default 5.0)
Returns:
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
"""
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
start_time = time.time()
# Shared progress tracking dictionary
progress_dict = {"completed": 0, "failed": 0}
# Start background progress logger
stop_event = asyncio.Event()
progress_task = asyncio.create_task(
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
)
# Create and execute all tasks concurrently
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
tasks = [
create_room_with_progress(
index=i,
total=num_rooms,
progress_dict=progress_dict,
name=None, # Auto-generate names
privacy="public",
exp_minutes=10,
max_retries=5,
)
for i in range(num_rooms)
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
finally:
# Stop the progress logger
stop_event.set()
await progress_task
# Count successes and failures
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
failed_count = num_rooms - success_count
elapsed_time = time.time() - start_time
# Log statistics
logger.info("=" * 60)
logger.info("BULK ROOM CREATION SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms attempted: {num_rooms}")
logger.info(f"Successfully created: {success_count}")
logger.info(f"Failed to create: {failed_count}")
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
logger.info(f"Total time: {elapsed_time:.2f} seconds")
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
logger.info("=" * 60)
return {
"success": success_count,
"failed": failed_count,
"total": num_rooms,
"elapsed_seconds": elapsed_time,
}
# Example usage
async def main():
"""Example usage of the room creation functions."""
# Test creating a single room
logger.info("Testing single room creation...")
room = await create_daily_room(exp_minutes=10)
if room:
logger.info(f"Created room: {room['name']} at {room['url']}")
else:
logger.error("Failed to create room")
# Uncomment to test bulk creation (warning: may hit rate limits!)
logger.info("\nTesting bulk room creation...")
stats = await test_create_rooms(num_rooms=1000)
logger.info(f"Final stats: {stats}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -72,6 +72,7 @@ async def save_conversation(params: FunctionCallParams):
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = params.context.get_messages()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()

View File

@@ -90,6 +90,7 @@ async def save_conversation(params: FunctionCallParams):
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = params.context.get_messages()
# remove the last message (the instruction to save the context)
messages.pop()

View File

@@ -20,8 +20,6 @@ from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
@@ -77,7 +75,7 @@ async def save_conversation(params: FunctionCallParams):
filename = f"{BASE_FILENAME}{timestamp}.json"
try:
with open(filename, "w") as file:
messages = params.context.get_messages()
messages = params.context.get_messages_for_persistent_storage()
# remove the last few messages. in reverse order, they are:
# - the in progress save tool call
# - the invocation of the save tool call
@@ -225,13 +223,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = LLMContext(
context = OpenAILLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
],
tools=tools,
)
context_aggregator = LLMContextAggregatorPair(context)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[

View File

@@ -18,8 +18,7 @@ from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
@@ -120,7 +119,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm.register_function("get_current_weather", fetch_weather_from_api)
# Set up context and context management.
context = LLMContext(
# AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to
# what's expected by Nova Sonic.
context = OpenAILLMContext(
messages=[
{"role": "system", "content": f"{system_instruction}"},
{
@@ -130,7 +131,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
tools=tools,
)
context_aggregator = LLMContextAggregatorPair(context)
context_aggregator = llm.create_context_aggregator(context)
# Build the pipeline
pipeline = Pipeline(

267
fetch_s3_recording.py Executable file
View File

@@ -0,0 +1,267 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_recordings(limit: int = 100) -> list[str]:
"""Get list of recent recording IDs from Daily API.
Args:
limit: Maximum number of recordings to retrieve (default 100)
Returns:
List of recording IDs (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url="https://api.daily.co/v1/recordings",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
recordings = data.get("data", [])
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
return recording_ids
except Exception as e:
logger.exception(f"Failed to get recordings from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent recordings."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 recordings
logger.info("Fetching recent recordings...")
recording_ids = await get_recent_recordings(limit=100)
if not recording_ids:
logger.error("No recordings found. Cannot proceed with test.")
return
logger.info(f"Found {len(recording_ids)} recordings to fetch")
# Fetch access links for each recording concurrently
logger.info(
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
)
# Create tasks for all recordings
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Get download link for a specific recording ID."""
try:
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None)
return (recording_url, recording_url)
except Exception as e:
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
return (None, None)
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
)
logger.debug(f" URL: {recording_url}")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(recording_ids)}] No link for {recording_id}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total recordings checked: {len(recording_ids)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

238
fetch_s3_recording_by_room.py Executable file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_rooms(limit: int = 100) -> list[str]:
"""Get list of recent room names from Daily API.
Args:
limit: Maximum number of rooms to retrieve (default 100, max 100)
Returns:
List of room names (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
rooms = data.get("data", [])
room_names = [room.get("name") for room in rooms if room.get("name")]
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
return room_names
except Exception as e:
logger.exception(f"Failed to get rooms from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent rooms."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 rooms
logger.info("Fetching recent rooms...")
room_names = await get_recent_rooms(limit=100)
if not room_names:
logger.error("No rooms found. Cannot proceed with test.")
return
logger.info(f"Found {len(room_names)} rooms to check for recordings")
# Call get_recording_s3_url_with_retry on each room concurrently
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
# Create tasks for all rooms
tasks = [
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
for room_name in room_names
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
logger.debug(f" URL: {recording_url[:80]}...")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(room_names)}] No recording for {room_name}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms checked: {len(room_names)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -38,7 +38,6 @@ dependencies = [
# Pinning numba to resolve package dependencies
"numba==0.61.2",
"wait_for2>=0.4.1; python_version<'3.12'",
"pipecat-ai-cli"
]
[project.urls]
@@ -56,7 +55,7 @@ azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.20.0" ]
daily = [ "daily-python~=0.19.9" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
elevenlabs = [ "pipecat-ai[websockets-base]" ]
fal = [ "fal-client~=0.5.9" ]

View File

@@ -6,47 +6,13 @@
"""AWS Nova Sonic LLM adapter for Pipecat."""
import copy
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from typing import Any, Dict, List, TypedDict
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
from pipecat.processors.aggregators.llm_context import LLMContext
class AWSNovaSonicLLMInvocationParams(TypedDict):
@@ -55,9 +21,7 @@ class AWSNovaSonicLLMInvocationParams(TypedDict):
This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic.
"""
system_instruction: Optional[str]
messages: List[AWSNovaSonicConversationHistoryMessage]
tools: List[Dict[str, Any]]
pass
class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
@@ -70,7 +34,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
@property
def id_for_llm_specific_messages(self) -> str:
"""Get the identifier used in LLMSpecificMessage instances for AWS Nova Sonic."""
return "aws-nova-sonic"
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
def get_llm_invocation_params(self, context: LLMContext) -> AWSNovaSonicLLMInvocationParams:
"""Get AWS Nova Sonic-specific LLM invocation parameters from a universal LLM context.
@@ -83,13 +47,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
Dictionary of parameters for invoking AWS Nova Sonic's LLM API.
"""
messages = self._from_universal_context_messages(self.get_messages(context))
return {
"system_instruction": messages.system_instruction,
"messages": messages.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
}
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about AWS Nova Sonic.
@@ -104,75 +62,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
List of messages in a format ready for logging about AWS Nova Sonic.
"""
return self._from_universal_context_messages(self.get_messages(context)).messages
@dataclass
class ConvertedMessages:
"""Container for Google-formatted messages converted from universal context."""
messages: List[AWSNovaSonicConversationHistoryMessage]
system_instruction: Optional[str] = None
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
system_instruction = None
messages = []
# Bail if there are no messages
if not universal_context_messages:
return self.ConvertedMessages()
universal_context_messages = copy.deepcopy(universal_context_messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if universal_context_messages[0].get("role") == "system":
system = universal_context_messages.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
elif isinstance(content, list):
system_instruction = content[0].get("text")
if system_instruction:
self._system_instruction = system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for universal_context_message in universal_context_messages:
message = self._from_universal_context_message(universal_context_message)
if message:
messages.append(message)
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
@staticmethod
def _to_aws_nova_sonic_function_format(function: FunctionSchema) -> Dict[str, Any]:

View File

@@ -15,10 +15,9 @@ service-specific adapter.
"""
import base64
import copy
import io
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
from typing import Any, List, Optional, TypeAlias, Union
from loguru import logger
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
@@ -32,9 +31,6 @@ from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import AudioRawFrame
if TYPE_CHECKING:
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
# "Re-export" types from OpenAI that we're using as universal context types.
# NOTE: if universal message types need to someday diverge from OpenAI's, we
# should consider managing our own definitions. But we should do so carefully,
@@ -69,26 +65,6 @@ class LLMContext:
and content formatting.
"""
@staticmethod
def from_openai_context(openai_context: "OpenAILLMContext") -> "LLMContext":
"""Create a universal LLM context from an OpenAI-specific context.
NOTE: this should only be used internally, for facilitating migration
from OpenAILLMContext to LLMContext. New user code should use
LLMContext directly.
Args:
openai_context: The OpenAI LLM context to convert.
Returns:
New LLMContext instance with converted messages and settings.
"""
return LLMContext(
messages=openai_context.get_messages(),
tools=openai_context.tools,
tool_choice=openai_context.tool_choice,
)
def __init__(
self,
messages: Optional[List[LLMContextMessage]] = None,

View File

@@ -8,80 +8,360 @@
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
.. deprecated:: 0.0.91
AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`.
Using the new patterns should allow you to not need types from this module.
BEFORE:
```
# Setup
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# Context frame type
frame: OpenAILLMContextFrame
# Context type
context: AWSNovaSonicLLMContext
# or
context: OpenAILLMContext
# Reading messages from context
messages = context.messages
```
AFTER:
```
# Setup
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# Context frame type
frame: LLMContextFrame
# Context type
context: LLMContext
# Reading messages from context
messages = context.get_messages()
```
"""
import warnings
import copy
from dataclasses import dataclass, field
from enum import Enum
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws.nova_sonic.context are deprecated. \n"
"AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`. \n"
"Using the new patterns should allow you to not need types from this module.\n\n"
"BEFORE:\n"
"```\n"
"# Setup\n"
"context = OpenAILLMContext(messages, tools)\n"
"context_aggregator = llm.create_context_aggregator(context)\n\n"
"# Context frame type\n"
"frame: OpenAILLMContextFrame\n\n"
"# Context type\n"
"context: AWSNovaSonicLLMContext\n"
"# or\n"
"context: OpenAILLMContext\n\n"
"# Reading messages from context\n"
"messages = context.messages\n"
"```\n\n"
"AFTER:\n"
"```\n"
"# Setup\n"
"context = LLMContext(messages, tools)\n"
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
"# Context frame type\n"
"frame: LLMContextFrame\n\n"
"# Context type\n"
"context: LLMContext\n\n"
"# Reading messages from context\n"
"messages = context.messages\n"
"```",
DeprecationWarning,
stacklevel=2,
)
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
DataFrame,
Frame,
FunctionCallResultFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
TextFrame,
UserImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
@dataclass
class AWSNovaSonicConversationHistory:
"""Complete conversation history for AWS Nova Sonic initialization.
Parameters:
system_instruction: System-level instruction for the conversation.
messages: List of conversation messages between user and assistant.
"""
system_instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
class AWSNovaSonicLLMContext(OpenAILLMContext):
"""Specialized LLM context for AWS Nova Sonic service.
Extends OpenAI context with Nova Sonic-specific message handling,
conversation history management, and text buffering capabilities.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize AWS Nova Sonic LLM context.
Args:
messages: Initial messages for the context.
tools: Available tools for the context.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self, system_instruction: str = ""):
self._assistant_text = ""
self._user_text = ""
self._system_instruction = system_instruction
@staticmethod
def upgrade_to_nova_sonic(
obj: OpenAILLMContext, system_instruction: str
) -> "AWSNovaSonicLLMContext":
"""Upgrade an OpenAI context to AWS Nova Sonic context.
Args:
obj: The OpenAI context to upgrade.
system_instruction: System instruction for the context.
Returns:
The upgraded AWS Nova Sonic context.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
obj.__class__ = AWSNovaSonicLLMContext
obj.__setup_local(system_instruction)
return obj
# NOTE: this method has the side-effect of updating _system_instruction from messages
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
"""Get conversation history for initializing AWS Nova Sonic session.
Processes stored messages and extracts system instruction and conversation
history in the format expected by AWS Nova Sonic.
Returns:
Formatted conversation history with system instruction and messages.
"""
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
# Bail if there are no messages
if not self.messages:
return history
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
history.system_instruction = content
elif isinstance(content, list):
history.system_instruction = content[0].get("text")
if history.system_instruction:
self._system_instruction = history.system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for message in messages:
history_message = self.from_standard_message(message)
if history_message:
history.messages.append(history_message)
return history
def get_messages_for_persistent_storage(self):
"""Get messages formatted for persistent storage.
Returns:
List of messages including system instruction if present.
"""
messages = super().get_messages_for_persistent_storage()
# If we have a system instruction and messages doesn't already contain it, add it
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
messages.insert(0, {"role": "system", "content": self._system_instruction})
return messages
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
def buffer_user_text(self, text):
"""Buffer user text for later flushing to context.
Args:
text: User text to buffer.
"""
self._user_text += f" {text}" if self._user_text else text
# logger.debug(f"User text buffered: {self._user_text}")
def flush_aggregated_user_text(self) -> str:
"""Flush buffered user text to context as a complete message.
Returns:
The flushed user text, or empty string if no text was buffered.
"""
if not self._user_text:
return ""
user_text = self._user_text
message = {
"role": "user",
"content": [{"type": "text", "text": user_text}],
}
self._user_text = ""
self.add_message(message)
# logger.debug(f"Context updated (user): {self.get_messages_for_logging()}")
return user_text
def buffer_assistant_text(self, text):
"""Buffer assistant text for later flushing to context.
Args:
text: Assistant text to buffer.
"""
self._assistant_text += text
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
def flush_aggregated_assistant_text(self):
"""Flush buffered assistant text to context as a complete message."""
if not self._assistant_text:
return
message = {
"role": "assistant",
"content": [{"type": "text", "text": self._assistant_text}],
}
self._assistant_text = ""
self.add_message(message)
# logger.debug(f"Context updated (assistant): {self.get_messages_for_logging()}")
@dataclass
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
"""Frame containing updated AWS Nova Sonic context.
Parameters:
context: The updated AWS Nova Sonic LLM context.
"""
context: AWSNovaSonicLLMContext
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
"""Context aggregator for user messages in AWS Nova Sonic conversations.
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
context update frames.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process frames and emit Nova Sonic-specific context updates.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
Provides specialized handling for assistant responses and function calls
in AWS Nova Sonic context, with custom frame processing logic.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Nova Sonic-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
# HACK: For now, disable the context aggregator by making it just pass through all frames
# that the parent handles (except the function call stuff, which we still need).
# For an explanation of this hack, see
# AWSNovaSonicLLMService._report_assistant_response_text_added.
if isinstance(
frame,
(
InterruptionFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
TextFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMSetToolChoiceFrame,
UserImageRawFrame,
BotStoppedSpeakingFrame,
),
):
await self.push_frame(frame, direction)
else:
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call results for AWS Nova Sonic.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
# itself, so we didn't have a chance to add the result to the AWS Nova Sonic server-side
# context. Let's push a special frame to do that.
await self.push_frame(
AWSNovaSonicFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
)
@dataclass
class AWSNovaSonicContextAggregatorPair:
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
Parameters:
_user: The user context aggregator.
_assistant: The assistant context aggregator.
"""
_user: AWSNovaSonicUserContextAggregator
_assistant: AWSNovaSonicAssistantContextAggregator
def user(self) -> AWSNovaSonicUserContextAggregator:
"""Get the user context aggregator.
Returns:
The user context aggregator instance.
"""
return self._user
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
"""Get the assistant context aggregator.
Returns:
The assistant context aggregator instance.
"""
return self._assistant

View File

@@ -25,7 +25,7 @@ from loguru import logger
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
@@ -33,30 +33,35 @@ from pipecat.frames.frames import (
Frame,
FunctionCallFromLLM,
InputAudioRawFrame,
InterruptionFrame,
InterimTranscriptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.context import (
AWSNovaSonicAssistantContextAggregator,
AWSNovaSonicContextAggregatorPair,
AWSNovaSonicLLMContext,
AWSNovaSonicUserContextAggregator,
Role,
)
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.llm_service import LLMService
from pipecat.utils.time import time_now_iso8601
@@ -212,11 +217,6 @@ class AWSNovaSonicLLMService(LLMService):
system_instruction: System-level instruction for the model.
tools: Available tools/functions for the model to use.
send_transcription_frames: Whether to emit transcription frames.
.. deprecated:: 0.0.91
This parameter is deprecated and will be removed in a future version.
Transcription frames are always sent.
**kwargs: Additional arguments passed to the parent LLMService.
"""
super().__init__(**kwargs)
@@ -230,20 +230,8 @@ class AWSNovaSonicLLMService(LLMService):
self._params = params or Params()
self._system_instruction = system_instruction
self._tools = tools
if not send_transcription_frames:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"`send_transcription_frames` is deprecated and will be removed in a future version. "
"Transcription frames are always sent.",
DeprecationWarning,
stacklevel=2,
)
self._context: Optional[LLMContext] = None
self._send_transcription_frames = send_transcription_frames
self._context: Optional[AWSNovaSonicLLMContext] = None
self._stream: Optional[
DuplexEventStream[
InvokeModelWithBidirectionalStreamInput,
@@ -256,17 +244,12 @@ class AWSNovaSonicLLMService(LLMService):
self._input_audio_content_name: Optional[str] = None
self._content_being_received: Optional[CurrentContent] = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time: Optional[float] = None
self._wants_connection = False
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
@@ -319,12 +302,12 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Resetting conversation")
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
# Grab context to carry through disconnect/reconnect
# Carry over previous context through disconnect
context = self._context
await self._disconnect()
self._context = context
await self._start_connecting()
await self._handle_context(context)
#
# frame processing
@@ -339,35 +322,28 @@ class AWSNovaSonicLLMService(LLMService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
context = (
frame.context
if isinstance(frame, LLMContextFrame)
else LLMContext.from_openai_context(frame.context)
if isinstance(frame, OpenAILLMContextFrame):
await self._handle_context(frame.context)
elif isinstance(frame, LLMContextFrame):
raise NotImplementedError(
"Universal LLMContext is not yet supported for AWS Nova Sonic."
)
await self._handle_context(context)
elif isinstance(frame, InputAudioRawFrame):
await self._handle_input_audio_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption_frame()
elif isinstance(frame, AWSNovaSonicFunctionCallResultFrame):
await self._handle_function_call_result(frame)
await self.push_frame(frame, direction)
async def _handle_context(self, context: LLMContext):
if self._disconnecting:
return
async def _handle_context(self, context: OpenAILLMContext):
if not self._context:
# We got our initial context
# Try to finish connecting
self._context = context
# We got our initial context - try to finish connecting
self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(
context, self._system_instruction
)
await self._finish_connecting_if_context_available()
else:
# We got an updated context
# Send results for any newly-completed function calls
await self._process_completed_function_calls(send_new_results=True)
async def _handle_input_audio_frame(self, frame: InputAudioRawFrame):
# Wait until we're done sending the assistant response trigger audio before sending audio
@@ -417,9 +393,9 @@ class AWSNovaSonicLLMService(LLMService):
else:
await finalize_assistant_response()
async def _handle_interruption_frame(self):
if self._assistant_is_responding:
self._may_need_repush_assistant_text = True
async def _handle_function_call_result(self, frame: AWSNovaSonicFunctionCallResultFrame):
result = frame.result_frame
await self._send_tool_result(tool_call_id=result.tool_call_id, result=result.result)
#
# LLM communication: lifecycle
@@ -455,17 +431,6 @@ class AWSNovaSonicLLMService(LLMService):
logger.error(f"{self} initialization error: {e}")
await self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
# Check for set of completed function calls in the context
for message in self._context.get_messages():
if message.get("role") and message.get("content") != "IN_PROGRESS":
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
# Found a newly-completed function call - send the result to the service
if send_new_results:
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
async def _finish_connecting_if_context_available(self):
# We can only finish connecting once we've gotten our initial context and we're ready to
# send it
@@ -474,38 +439,30 @@ class AWSNovaSonicLLMService(LLMService):
logger.info("Finishing connecting (setting up session)...")
# Initialize our bookkeeping of already-completed tool calls in the
# context
await self._process_completed_function_calls(send_new_results=False)
# Read context
adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
llm_connection_params = adapter.get_llm_invocation_params(self._context)
history = self._context.get_messages_for_initializing_history()
# Send prompt start event, specifying tools.
# Tools from context take priority over self._tools.
tools = (
llm_connection_params["tools"]
if llm_connection_params["tools"]
else adapter.from_standard_tools(self._tools)
self._context.tools
if self._context.tools
else self.get_llm_adapter().from_standard_tools(self._tools)
)
logger.debug(f"Using tools: {tools}")
await self._send_prompt_start_event(tools)
# Send system instruction.
# Instruction from context takes priority over self._system_instruction.
system_instruction = (
llm_connection_params["system_instruction"]
if llm_connection_params["system_instruction"]
else self._system_instruction
)
logger.debug(f"Using system instruction: {system_instruction}")
if system_instruction:
await self._send_text_event(text=system_instruction, role=Role.SYSTEM)
# (NOTE: this prioritizing occurred automatically behind the scenes: the context was
# initialized with self._system_instruction and then updated itself from its messages when
# get_messages_for_initializing_history() was called).
logger.debug(f"Using system instruction: {history.system_instruction}")
if history.system_instruction:
await self._send_text_event(text=history.system_instruction, role=Role.SYSTEM)
# Send conversation history
for message in llm_connection_params["messages"]:
# logger.debug(f"Seeding conversation history with message: {message}")
for message in history.messages:
await self._send_text_event(text=message.text, role=message.role)
# Start audio input
@@ -535,12 +492,9 @@ class AWSNovaSonicLLMService(LLMService):
await self._send_session_end_events()
self._client = None
# Clean up context
self._context = None
# Clean up stream
if self._stream:
await self._stream.close()
await self._stream.input_stream.close()
self._stream = None
# NOTE: see explanation of HACK, below
@@ -556,23 +510,15 @@ class AWSNovaSonicLLMService(LLMService):
self._receive_task = None
# Reset remaining connection-specific state
# Should be all private state except:
# - _wants_connection
# - _assistant_response_trigger_audio
self._prompt_name = None
self._input_audio_content_name = None
self._content_being_received = None
self._assistant_is_responding = False
self._may_need_repush_assistant_text = False
self._ready_to_send_context = False
self._handling_bot_stopped_speaking = False
self._triggering_assistant_response = False
self._waiting_for_trigger_transcription = False
self._disconnecting = False
self._connected_time = None
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
logger.info("Finished disconnecting")
except Exception as e:
@@ -880,10 +826,6 @@ class AWSNovaSonicLLMService(LLMService):
# Handle the LLM completion ending
await self._handle_completion_end_event(event_json)
except Exception as e:
if self._disconnecting:
# Errors are kind of expected while disconnecting, so just
# ignore them and do nothing
return
logger.error(f"{self} error processing responses: {e}")
if self._wants_connection:
await self.reset_conversation()
@@ -1014,7 +956,7 @@ class AWSNovaSonicLLMService(LLMService):
async def _report_assistant_response_started(self):
logger.debug("Assistant response started")
# Report the start of the assistant response.
# Report that the assistant has started their response.
await self.push_frame(LLMFullResponseStartFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) started
@@ -1026,16 +968,23 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"Assistant response text added: {text}")
# Report the text of the assistant response.
# Report some text added to the ongoing assistant response
await self.push_frame(LLMTextFrame(text))
# Report some text added to the *equivalent* of TTS (this is a speech-to-speech model)
await self.push_frame(TTSTextFrame(text))
# HACK: here we're also buffering the assistant text ourselves as a
# backup rather than relying solely on the assistant context aggregator
# to do it, because the text arrives from Nova Sonic only after all the
# assistant audio frames have been pushed, meaning that if an
# interruption frame were to arrive we would lose all of it (the text
# frames sitting in the queue would be wiped).
self._assistant_text_buffer += text
# TODO: this is a (hopefully temporary) HACK. Here we directly manipulate the context rather
# than relying on the frames pushed to the assistant context aggregator. The pattern of
# receiving full-sentence text after the assistant has spoken does not easily fit with the
# Pipecat expectation of chunks of text streaming in while the assistant is speaking.
# Interruption handling was especially challenging. Rather than spend days trying to fit a
# square peg in a round hole, I decided on this hack for the time being. We can most cleanly
# abandon this hack if/when AWS Nova Sonic implements streaming smaller text chunks
# interspersed with audio. Note that when we move away from this hack, we need to make sure
# that on an interruption we avoid sending LLMFullResponseEndFrame, which gets the
# LLMAssistantContextAggregator into a bad state.
self._context.buffer_assistant_text(text)
async def _report_assistant_response_ended(self):
if not self._context: # should never happen
@@ -1043,34 +992,14 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Assistant response ended")
# If an interruption frame arrived while the assistant was responding
# we may have lost all of the assistant text (see HACK, above), so
# re-push it downstream to the aggregator now.
if self._may_need_repush_assistant_text:
# Just in case, check that assistant text hasn't already made it
# into the context (sometimes it does, despite the interruption).
messages = self._context.get_messages()
last_message = messages[-1] if messages else None
if (
not last_message
or last_message.get("role") != "assistant"
or last_message.get("content") != self._assistant_text_buffer
):
# We also need to re-push the LLMFullResponseStartFrame since the
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSTextFrame(self._assistant_text_buffer))
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
# Report that the assistant has finished their response.
await self.push_frame(LLMFullResponseEndFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())
# Clear out the buffered assistant text
self._assistant_text_buffer = ""
# For an explanation of this hack, see _report_assistant_response_text_added.
self._context.flush_aggregated_assistant_text()
#
# user transcription reporting
@@ -1087,67 +1016,33 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"User transcription text added: {text}")
# HACK: here we're buffering the user text ourselves rather than
# relying on the upstream user context aggregator to do it, because the
# text arrives in fairly large chunks spaced fairly far apart in time.
# That means the user text would be split between different messages in
# context. Even if we sent placeholder InterimTranscriptionFrames in
# between each TranscriptionFrame to tell the aggregator to hold off on
# finalizing the user message, the aggregator would likely get the last
# chunk too late.
self._user_text_buffer += f" {text}" if self._user_text_buffer else text
# Manually add new user transcription text to context.
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
self._context.buffer_user_text(text)
# Report that some new user transcription text is available.
if self._send_transcription_frames:
await self.push_frame(
InterimTranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601())
)
async def _report_user_transcription_ended(self):
if not self._context: # should never happen
return
# Manually add user transcription to context (if any has been buffered).
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
transcription = self._context.flush_aggregated_user_text()
if not transcription:
return
logger.debug(f"User transcription ended")
# Report to the upstream user context aggregator that some new user
# transcription text is available.
# HACK: Check if this transcription was triggered by our own
# assistant response trigger. If so, we need to wrap it with
# UserStarted/StoppedSpeakingFrames; otherwise the user aggregator
# would fire an EmulatedUserStartedSpeakingFrame, which would
# trigger an interruption, which would prevent us from writing the
# assistant response to context.
#
# Sending an EmulateUserStartedSpeakingFrame ourselves doesn't
# work: it just causes the interruption we're trying to avoid.
#
# Setting enable_emulated_vad_interruptions also doesn't work: at
# the time the user aggregator receives the TranscriptionFrame, it
# doesn't yet know the assistant has started responding, so it
# doesn't know that emulating the user starting to speak would
# cause an interruption.
should_wrap_in_user_started_stopped_speaking_frames = (
self._waiting_for_trigger_transcription
and self._user_text_buffer.strip().lower() == "ready"
)
# Start wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
logger.debug(
"Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
if self._send_transcription_frames:
await self.push_frame(
TranscriptionFrame(text=transcription, user_id="", timestamp=time_now_iso8601())
)
await self.push_frame(UserStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
# Send the transcription upstream for the user context aggregator
frame = TranscriptionFrame(
text=self._user_text_buffer, user_id="", timestamp=time_now_iso8601()
)
await self.push_frame(frame, direction=FrameDirection.UPSTREAM)
# Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
await self.push_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
# Clear out the buffered user text
self._user_text_buffer = ""
# We're no longer waiting for a trigger transcription
self._waiting_for_trigger_transcription = False
#
# context
@@ -1159,26 +1054,23 @@ class AWSNovaSonicLLMService(LLMService):
*,
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> LLMContextAggregatorPair:
) -> AWSNovaSonicContextAggregatorPair:
"""Create context aggregator pair for managing conversation context.
NOTE: this method exists only for backward compatibility. New code
should instead do:
context = LLMContext(...)
context_aggregator = LLMContextAggregatorPair(context)
Args:
context: The OpenAI LLM context.
context: The OpenAI LLM context to upgrade.
user_params: Parameters for the user context aggregator.
assistant_params: Parameters for the assistant context aggregator.
Returns:
A pair of user and assistant context aggregators.
"""
context = LLMContext.from_openai_context(context)
return LLMContextAggregatorPair(
context, user_params=user_params, assistant_params=assistant_params
)
context.set_llm_adapter(self.get_llm_adapter())
user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params)
return AWSNovaSonicContextAggregatorPair(user, assistant)
#
# assistant response trigger (HACK)
@@ -1216,8 +1108,6 @@ class AWSNovaSonicLLMService(LLMService):
try:
logger.debug("Sending assistant response trigger...")
self._waiting_for_trigger_transcription = True
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
chunk_size = int(
chunk_duration

View File

@@ -8,80 +8,18 @@
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
.. deprecated:: 0.0.91
AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`.
Using the new patterns should allow you to not need types from this module.
BEFORE:
```
# Setup
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# Context frame type
frame: OpenAILLMContextFrame
# Context type
context: AWSNovaSonicLLMContext
# or
context: OpenAILLMContext
# Reading messages from context
messages = context.messages
```
AFTER:
```
# Setup
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# Context frame type
frame: LLMContextFrame
# Context type
context: LLMContext
# Reading messages from context
messages = context.get_messages()
```
"""
import warnings
from pipecat.services.aws.nova_sonic.context import *
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws_nova_sonic.context are deprecated. \n"
"AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`. \n"
"Using the new patterns should allow you to not need types from this module.\n\n"
"BEFORE:\n"
"```\n"
"# Setup\n"
"context = OpenAILLMContext(messages, tools)\n"
"context_aggregator = llm.create_context_aggregator(context)\n\n"
"# Context frame type\n"
"frame: OpenAILLMContextFrame\n\n"
"# Context type\n"
"context: AWSNovaSonicLLMContext\n"
"# or\n"
"context: OpenAILLMContext\n\n"
"# Reading messages from context\n"
"messages = context.messages\n"
"```\n\n"
"AFTER:\n"
"```\n"
"# Setup\n"
"context = LLMContext(messages, tools)\n"
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
"# Context frame type\n"
"frame: LLMContextFrame\n\n"
"# Context type\n"
"context: LLMContext\n\n"
"# Reading messages from context\n"
"messages = context.messages\n"
"```",
"Types in pipecat.services.aws_nova_sonic.context are deprecated. "
"Please use the equivalent types from "
"pipecat.services.aws.nova_sonic.context instead.",
DeprecationWarning,
stacklevel=2,
)

8275
uv.lock generated

File diff suppressed because it is too large Load Diff