Compare commits
6 Commits
mb/cli
...
hush/backo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d41ee5319e | ||
|
|
45256903b5 | ||
|
|
d37e63d972 | ||
|
|
013f869259 | ||
|
|
11594003e2 | ||
|
|
2dd2a17b19 |
13
.github/workflows/build.yaml
vendored
13
.github/workflows/build.yaml
vendored
@@ -21,21 +21,20 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0 # Fetch all history for setuptools_scm
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v3
|
||||
with:
|
||||
version: "latest"
|
||||
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.10
|
||||
|
||||
|
||||
- name: Install development dependencies
|
||||
run: uv sync --group dev
|
||||
|
||||
|
||||
- name: Build project
|
||||
run: uv build
|
||||
|
||||
|
||||
- name: Install project in editable mode
|
||||
run: uv pip install --editable .
|
||||
run: uv pip install --editable .
|
||||
75
CHANGELOG.md
75
CHANGELOG.md
@@ -9,69 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli) to the
|
||||
required dependencies, enabling you to scaffold a new project directly from
|
||||
`pipecat-ai`. Get started with:
|
||||
|
||||
```bash
|
||||
uv run pipecat init
|
||||
```
|
||||
|
||||
- Expanded support for universal `LLMContext` to `AWSNovaSonicLLMService`.
|
||||
As a reminder, the context-setup pattern when using `LLMContext` is:
|
||||
|
||||
```python
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
```
|
||||
|
||||
(Note that even though `AWSNovaSonicLLMService` now supports the universal
|
||||
`LLMContext`, it is not meant to be swapped out for another LLM service at
|
||||
runtime.)
|
||||
|
||||
Worth noting: whether or not you use the new context-setup pattern with
|
||||
`AWSNovaSonicLLMService`, some types have changed under the hood:
|
||||
|
||||
```python
|
||||
## BEFORE:
|
||||
|
||||
# Context aggregator type
|
||||
context_aggregator: AWSNovaSonicContextAggregatorPair
|
||||
|
||||
# Context frame type
|
||||
frame: OpenAILLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: AWSNovaSonicLLMContext
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.messages
|
||||
|
||||
## AFTER:
|
||||
|
||||
# Context aggregator type
|
||||
context_aggregator: LLMContextAggregatorPair
|
||||
|
||||
# Context frame type
|
||||
frame: LLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.get_messages()
|
||||
```
|
||||
|
||||
- Added support for `bulbul:v3` model in `SarvamTTSService` and
|
||||
`SarvamHttpTTSService`.
|
||||
- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
|
||||
|
||||
- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.
|
||||
|
||||
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the
|
||||
multilingual model.
|
||||
|
||||
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
|
||||
-
|
||||
- Added support for trickle ICE to the `SmallWebRTCTransport`.
|
||||
|
||||
- Added support for updating `OpenAITTSService` settings (`instructions` and
|
||||
@@ -97,25 +40,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- Package upgrades:
|
||||
|
||||
- `daily-python` upgraded to 0.20.0.
|
||||
- `openai` upgraded to support up to 2.x.x.
|
||||
- `openpipe` upgraded to support up to 5.x.x.
|
||||
|
||||
- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- The `send_transcription_frames` argument to `AWSNovaSonicLLMService` is
|
||||
deprecated. Transcription frames are now always sent. They go upstream, to be
|
||||
handled by the user context aggregator. See "Added" section for details.
|
||||
|
||||
- Types in `pipecat.services.aws.nova_sonic.context` have been deprecated due
|
||||
to changes to support `LLMContext`. See "Changed" section for details.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
|
||||
to a mismatch in the \_handle_transcription method's signature.
|
||||
to a mismatch in the _handle_transcription method's signature.
|
||||
|
||||
- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
|
||||
now handled properly in `PipelineTask` making it possible to cancel an asyncio
|
||||
|
||||
@@ -44,10 +44,6 @@ Looking to build structured conversations? Check out [Pipecat Flows](https://git
|
||||
|
||||
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
|
||||
|
||||
### 🛠️ CLI
|
||||
|
||||
Create a new project in under a minute with the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli). Then use the CLI to monitor and deploy your agent to production.
|
||||
|
||||
### 🔍 Debugging
|
||||
|
||||
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
|
||||
|
||||
250
create_daily_room.py
Executable file
250
create_daily_room.py
Executable file
@@ -0,0 +1,250 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for creating Daily.co rooms with retry logic.
|
||||
|
||||
This module provides functions to create Daily rooms via REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def periodic_progress_logger(
|
||||
progress_dict: Dict[str, int],
|
||||
total: int,
|
||||
interval_seconds: float = 5.0,
|
||||
stop_event: Optional[asyncio.Event] = None,
|
||||
):
|
||||
"""Log progress periodically in the background.
|
||||
|
||||
Args:
|
||||
progress_dict: Shared dict with 'completed' and 'failed' counts
|
||||
total: Total number of items being processed
|
||||
interval_seconds: How often to log progress (default 5 seconds)
|
||||
stop_event: Event to signal when to stop logging
|
||||
"""
|
||||
if stop_event is None:
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
while not stop_event.is_set():
|
||||
await asyncio.sleep(interval_seconds)
|
||||
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
total_processed = progress_dict["completed"] + progress_dict["failed"]
|
||||
if total_processed > 0:
|
||||
percentage = (total_processed / total) * 100
|
||||
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
|
||||
|
||||
logger.info(
|
||||
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
|
||||
f"✅ {progress_dict['completed']} succeeded, "
|
||||
f"❌ {progress_dict['failed']} failed"
|
||||
)
|
||||
|
||||
|
||||
async def create_daily_room(
|
||||
name: Optional[str] = None,
|
||||
privacy: str = "public",
|
||||
exp_minutes: int = 10,
|
||||
max_retries: int = 5,
|
||||
) -> Optional[Dict]:
|
||||
"""Create a Daily room with automatic retry on rate limit errors.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) with
|
||||
exponential backoff and automatic retries.
|
||||
|
||||
Args:
|
||||
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
|
||||
privacy: Room privacy setting ("public" or "private")
|
||||
exp_minutes: Minutes until room expires (default 10)
|
||||
max_retries: Maximum number of retry attempts on rate limit (default 5)
|
||||
|
||||
Returns:
|
||||
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
|
||||
"""
|
||||
# Calculate expiration timestamp (unix timestamp in seconds)
|
||||
exp_timestamp = int(time.time()) + (exp_minutes * 60)
|
||||
|
||||
# Build request body
|
||||
body = {
|
||||
"privacy": privacy,
|
||||
"properties": {
|
||||
"exp": exp_timestamp,
|
||||
},
|
||||
}
|
||||
|
||||
if name:
|
||||
body["name"] = name
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type(HTTPStatusError),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
async with AsyncClient(timeout=30) as client:
|
||||
response = await client.post(
|
||||
url="https://api.daily.co/v1/rooms",
|
||||
headers=headers,
|
||||
json=body,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error creating room: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def create_room_with_progress(
|
||||
index: int, total: int, progress_dict: Dict[str, int], **kwargs
|
||||
) -> Optional[Dict]:
|
||||
"""Wrapper for create_daily_room that tracks progress.
|
||||
|
||||
Args:
|
||||
index: Index of this room creation (0-based)
|
||||
total: Total number of rooms being created
|
||||
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
|
||||
**kwargs: Arguments passed to create_daily_room
|
||||
|
||||
Returns:
|
||||
Room object dict or None
|
||||
"""
|
||||
result = await create_daily_room(**kwargs)
|
||||
|
||||
# Update progress
|
||||
if result is not None:
|
||||
progress_dict["completed"] += 1
|
||||
else:
|
||||
progress_dict["failed"] += 1
|
||||
|
||||
return result
|
||||
|
||||
|
||||
async def test_create_rooms(
|
||||
num_rooms: int = 1000,
|
||||
progress_interval: float = 5.0,
|
||||
) -> Dict[str, int | float]:
|
||||
"""Attempt to create multiple Daily rooms concurrently.
|
||||
|
||||
This function demonstrates concurrent room creation and tracks
|
||||
success/failure statistics. Rate limiting will likely occur when
|
||||
creating many rooms quickly.
|
||||
|
||||
Args:
|
||||
num_rooms: Number of rooms to attempt to create (default 1000)
|
||||
progress_interval: How often to log progress in seconds (default 5.0)
|
||||
|
||||
Returns:
|
||||
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
|
||||
"""
|
||||
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
|
||||
start_time = time.time()
|
||||
|
||||
# Shared progress tracking dictionary
|
||||
progress_dict = {"completed": 0, "failed": 0}
|
||||
|
||||
# Start background progress logger
|
||||
stop_event = asyncio.Event()
|
||||
progress_task = asyncio.create_task(
|
||||
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
|
||||
)
|
||||
|
||||
# Create and execute all tasks concurrently
|
||||
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
|
||||
tasks = [
|
||||
create_room_with_progress(
|
||||
index=i,
|
||||
total=num_rooms,
|
||||
progress_dict=progress_dict,
|
||||
name=None, # Auto-generate names
|
||||
privacy="public",
|
||||
exp_minutes=10,
|
||||
max_retries=5,
|
||||
)
|
||||
for i in range(num_rooms)
|
||||
]
|
||||
|
||||
try:
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
finally:
|
||||
# Stop the progress logger
|
||||
stop_event.set()
|
||||
await progress_task
|
||||
|
||||
# Count successes and failures
|
||||
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
|
||||
failed_count = num_rooms - success_count
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
# Log statistics
|
||||
logger.info("=" * 60)
|
||||
logger.info("BULK ROOM CREATION SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total rooms attempted: {num_rooms}")
|
||||
logger.info(f"Successfully created: {success_count}")
|
||||
logger.info(f"Failed to create: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
|
||||
logger.info(f"Total time: {elapsed_time:.2f} seconds")
|
||||
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
|
||||
logger.info("=" * 60)
|
||||
|
||||
return {
|
||||
"success": success_count,
|
||||
"failed": failed_count,
|
||||
"total": num_rooms,
|
||||
"elapsed_seconds": elapsed_time,
|
||||
}
|
||||
|
||||
|
||||
# Example usage
|
||||
async def main():
|
||||
"""Example usage of the room creation functions."""
|
||||
# Test creating a single room
|
||||
logger.info("Testing single room creation...")
|
||||
room = await create_daily_room(exp_minutes=10)
|
||||
if room:
|
||||
logger.info(f"Created room: {room['name']} at {room['url']}")
|
||||
else:
|
||||
logger.error("Failed to create room")
|
||||
|
||||
# Uncomment to test bulk creation (warning: may hit rate limits!)
|
||||
logger.info("\nTesting bulk room creation...")
|
||||
stats = await test_create_rooms(num_rooms=1000)
|
||||
logger.info(f"Final stats: {stats}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -72,6 +72,7 @@ async def save_conversation(params: FunctionCallParams):
|
||||
)
|
||||
try:
|
||||
with open(filename, "w") as file:
|
||||
# todo: extract 'system' into the first message in the list
|
||||
messages = params.context.get_messages()
|
||||
# remove the last message, which is the instruction we just gave to save the conversation
|
||||
messages.pop()
|
||||
|
||||
@@ -90,6 +90,7 @@ async def save_conversation(params: FunctionCallParams):
|
||||
)
|
||||
try:
|
||||
with open(filename, "w") as file:
|
||||
# todo: extract 'system' into the first message in the list
|
||||
messages = params.context.get_messages()
|
||||
# remove the last message (the instruction to save the context)
|
||||
messages.pop()
|
||||
|
||||
@@ -20,8 +20,6 @@ from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
@@ -77,7 +75,7 @@ async def save_conversation(params: FunctionCallParams):
|
||||
filename = f"{BASE_FILENAME}{timestamp}.json"
|
||||
try:
|
||||
with open(filename, "w") as file:
|
||||
messages = params.context.get_messages()
|
||||
messages = params.context.get_messages_for_persistent_storage()
|
||||
# remove the last few messages. in reverse order, they are:
|
||||
# - the in progress save tool call
|
||||
# - the invocation of the save tool call
|
||||
@@ -225,13 +223,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
|
||||
llm.register_function("load_conversation", load_conversation)
|
||||
|
||||
context = LLMContext(
|
||||
context = OpenAILLMContext(
|
||||
messages=[
|
||||
{"role": "system", "content": f"{system_instruction}"},
|
||||
],
|
||||
tools=tools,
|
||||
)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
|
||||
@@ -18,8 +18,7 @@ from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
|
||||
@@ -120,7 +119,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
|
||||
# Set up context and context management.
|
||||
context = LLMContext(
|
||||
# AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to
|
||||
# what's expected by Nova Sonic.
|
||||
context = OpenAILLMContext(
|
||||
messages=[
|
||||
{"role": "system", "content": f"{system_instruction}"},
|
||||
{
|
||||
@@ -130,7 +131,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
],
|
||||
tools=tools,
|
||||
)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Build the pipeline
|
||||
pipeline = Pipeline(
|
||||
|
||||
267
fetch_s3_recording.py
Executable file
267
fetch_s3_recording.py
Executable file
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
|
||||
room_id: str,
|
||||
max_retries: int = 5,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Retrieve recording URL with exponential backoff and retry logic.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) and other
|
||||
transient errors with automatic exponential backoff.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
max_retries: Maximum number of retry attempts (default 5)
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url)
|
||||
Returns (None, None) if no recording exists for the room.
|
||||
"""
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type((HTTPStatusError, Exception)),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
recording_url, recording_signed_url, status = await get_recording_s3_url(
|
||||
room_id=room_id
|
||||
)
|
||||
|
||||
# If no recording exists (status is None), return immediately - no retry
|
||||
if status is None:
|
||||
logger.debug(f"No recording found for room {room_id}")
|
||||
return None, None
|
||||
|
||||
# If recording exists but is not finished yet, retry
|
||||
if status != "finished":
|
||||
logger.warning(
|
||||
f"Recording not finished for room {room_id}, status: {status} "
|
||||
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
|
||||
)
|
||||
raise Exception(f"Recording not ready, status: {status}")
|
||||
|
||||
# Recording is finished, return the URLs
|
||||
return recording_url, recording_signed_url
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None, None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.error(
|
||||
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
|
||||
f"{last_exception}"
|
||||
)
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
|
||||
room_id: str,
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Get recording URL using Daily's REST API.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url, status)
|
||||
- recording_url: The download link for the recording
|
||||
- recording_signed_url: Same as recording_url (kept for backward compatibility)
|
||||
- status: Recording status from Daily API
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: When HTTP errors occur (including rate limits)
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# List recordings for the room
|
||||
list_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
|
||||
headers=headers,
|
||||
)
|
||||
list_response.raise_for_status()
|
||||
list_data = list_response.json()
|
||||
|
||||
# Check if recording exists and is finished
|
||||
if not list_data.get("data") or len(list_data["data"]) == 0:
|
||||
return (None, None, None)
|
||||
|
||||
recording_id = list_data["data"][0].get("id")
|
||||
status = list_data["data"][0].get("status")
|
||||
|
||||
if not recording_id or status != "finished":
|
||||
return (None, None, status)
|
||||
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None, status)
|
||||
|
||||
# Return the same URL for both fields for backward compatibility
|
||||
return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_recordings(limit: int = 100) -> list[str]:
|
||||
"""Get list of recent recording IDs from Daily API.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of recordings to retrieve (default 100)
|
||||
|
||||
Returns:
|
||||
List of recording IDs (strings)
|
||||
"""
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url="https://api.daily.co/v1/recordings",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
recordings = data.get("data", [])
|
||||
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
|
||||
|
||||
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
|
||||
return recording_ids
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get recordings from Daily API: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test get_recording_s3_url_with_retry with recent recordings."""
|
||||
logger.info("Starting recording fetch test...")
|
||||
|
||||
# Step 1: Get the most recent 100 recordings
|
||||
logger.info("Fetching recent recordings...")
|
||||
recording_ids = await get_recent_recordings(limit=100)
|
||||
|
||||
if not recording_ids:
|
||||
logger.error("No recordings found. Cannot proceed with test.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(recording_ids)} recordings to fetch")
|
||||
|
||||
# Fetch access links for each recording concurrently
|
||||
logger.info(
|
||||
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
|
||||
)
|
||||
|
||||
# Create tasks for all recordings
|
||||
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Get download link for a specific recording ID."""
|
||||
try:
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None)
|
||||
|
||||
return (recording_url, recording_url)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
|
||||
return (None, None)
|
||||
|
||||
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
|
||||
|
||||
# Execute all tasks concurrently
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
success_count = 0
|
||||
not_found_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
recording_url, recording_signed_url = result
|
||||
if recording_url:
|
||||
success_count += 1
|
||||
logger.info(
|
||||
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
|
||||
)
|
||||
logger.debug(f" URL: {recording_url}")
|
||||
else:
|
||||
not_found_count += 1
|
||||
logger.debug(f"ℹ️ [{i}/{len(recording_ids)}] No link for {recording_id}")
|
||||
else:
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("RECORDING FETCH TEST SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total recordings checked: {len(recording_ids)}")
|
||||
logger.info(f"✅ Recordings found: {success_count}")
|
||||
logger.info(f"ℹ️ No recordings: {not_found_count}")
|
||||
logger.info(f"❌ Failed: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
238
fetch_s3_recording_by_room.py
Executable file
238
fetch_s3_recording_by_room.py
Executable file
@@ -0,0 +1,238 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
|
||||
room_id: str,
|
||||
max_retries: int = 5,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Retrieve recording URL with exponential backoff and retry logic.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) and other
|
||||
transient errors with automatic exponential backoff.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
max_retries: Maximum number of retry attempts (default 5)
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url)
|
||||
Returns (None, None) if no recording exists for the room.
|
||||
"""
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type((HTTPStatusError, Exception)),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
recording_url, recording_signed_url, status = await get_recording_s3_url(
|
||||
room_id=room_id
|
||||
)
|
||||
|
||||
# If no recording exists (status is None), return immediately - no retry
|
||||
if status is None:
|
||||
logger.debug(f"No recording found for room {room_id}")
|
||||
return None, None
|
||||
|
||||
# If recording exists but is not finished yet, retry
|
||||
if status != "finished":
|
||||
logger.warning(
|
||||
f"Recording not finished for room {room_id}, status: {status} "
|
||||
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
|
||||
)
|
||||
raise Exception(f"Recording not ready, status: {status}")
|
||||
|
||||
# Recording is finished, return the URLs
|
||||
return recording_url, recording_signed_url
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None, None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.error(
|
||||
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
|
||||
f"{last_exception}"
|
||||
)
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
|
||||
room_id: str,
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Get recording URL using Daily's REST API.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url, status)
|
||||
- recording_url: The download link for the recording
|
||||
- recording_signed_url: Same as recording_url (kept for backward compatibility)
|
||||
- status: Recording status from Daily API
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: When HTTP errors occur (including rate limits)
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# List recordings for the room
|
||||
list_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
|
||||
headers=headers,
|
||||
)
|
||||
list_response.raise_for_status()
|
||||
list_data = list_response.json()
|
||||
|
||||
# Check if recording exists and is finished
|
||||
if not list_data.get("data") or len(list_data["data"]) == 0:
|
||||
return (None, None, None)
|
||||
|
||||
recording_id = list_data["data"][0].get("id")
|
||||
status = list_data["data"][0].get("status")
|
||||
|
||||
if not recording_id or status != "finished":
|
||||
return (None, None, status)
|
||||
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None, status)
|
||||
|
||||
# Return the same URL for both fields for backward compatibility
|
||||
return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_rooms(limit: int = 100) -> list[str]:
|
||||
"""Get list of recent room names from Daily API.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of rooms to retrieve (default 100, max 100)
|
||||
|
||||
Returns:
|
||||
List of room names (strings)
|
||||
"""
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
rooms = data.get("data", [])
|
||||
room_names = [room.get("name") for room in rooms if room.get("name")]
|
||||
|
||||
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
|
||||
return room_names
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get rooms from Daily API: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test get_recording_s3_url_with_retry with recent rooms."""
|
||||
logger.info("Starting recording fetch test...")
|
||||
|
||||
# Step 1: Get the most recent 100 rooms
|
||||
logger.info("Fetching recent rooms...")
|
||||
room_names = await get_recent_rooms(limit=100)
|
||||
|
||||
if not room_names:
|
||||
logger.error("No rooms found. Cannot proceed with test.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(room_names)} rooms to check for recordings")
|
||||
|
||||
# Call get_recording_s3_url_with_retry on each room concurrently
|
||||
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
|
||||
|
||||
# Create tasks for all rooms
|
||||
tasks = [
|
||||
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
|
||||
for room_name in room_names
|
||||
]
|
||||
|
||||
# Execute all tasks concurrently
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
success_count = 0
|
||||
not_found_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
recording_url, recording_signed_url = result
|
||||
if recording_url:
|
||||
success_count += 1
|
||||
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
|
||||
logger.debug(f" URL: {recording_url[:80]}...")
|
||||
else:
|
||||
not_found_count += 1
|
||||
logger.debug(f"ℹ️ [{i}/{len(room_names)}] No recording for {room_name}")
|
||||
else:
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("RECORDING FETCH TEST SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total rooms checked: {len(room_names)}")
|
||||
logger.info(f"✅ Recordings found: {success_count}")
|
||||
logger.info(f"ℹ️ No recordings: {not_found_count}")
|
||||
logger.info(f"❌ Failed: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -38,7 +38,6 @@ dependencies = [
|
||||
# Pinning numba to resolve package dependencies
|
||||
"numba==0.61.2",
|
||||
"wait_for2>=0.4.1; python_version<'3.12'",
|
||||
"pipecat-ai-cli"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -56,7 +55,7 @@ azure = [ "azure-cognitiveservices-speech~=1.42.0"]
|
||||
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
|
||||
cerebras = []
|
||||
deepseek = []
|
||||
daily = [ "daily-python~=0.20.0" ]
|
||||
daily = [ "daily-python~=0.19.9" ]
|
||||
deepgram = [ "deepgram-sdk~=4.7.0" ]
|
||||
elevenlabs = [ "pipecat-ai[websockets-base]" ]
|
||||
fal = [ "fal-client~=0.5.9" ]
|
||||
|
||||
@@ -6,47 +6,13 @@
|
||||
|
||||
"""AWS Nova Sonic LLM adapter for Pipecat."""
|
||||
|
||||
import copy
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
from loguru import logger
|
||||
from typing import Any, Dict, List, TypedDict
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
|
||||
|
||||
|
||||
class Role(Enum):
|
||||
"""Roles supported in AWS Nova Sonic conversations.
|
||||
|
||||
Parameters:
|
||||
SYSTEM: System-level messages (not used in conversation history).
|
||||
USER: Messages sent by the user.
|
||||
ASSISTANT: Messages sent by the assistant.
|
||||
TOOL: Messages sent by tools (not used in conversation history).
|
||||
"""
|
||||
|
||||
SYSTEM = "SYSTEM"
|
||||
USER = "USER"
|
||||
ASSISTANT = "ASSISTANT"
|
||||
TOOL = "TOOL"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicConversationHistoryMessage:
|
||||
"""A single message in AWS Nova Sonic conversation history.
|
||||
|
||||
Parameters:
|
||||
role: The role of the message sender (USER or ASSISTANT only).
|
||||
text: The text content of the message.
|
||||
"""
|
||||
|
||||
role: Role # only USER and ASSISTANT
|
||||
text: str
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
|
||||
|
||||
class AWSNovaSonicLLMInvocationParams(TypedDict):
|
||||
@@ -55,9 +21,7 @@ class AWSNovaSonicLLMInvocationParams(TypedDict):
|
||||
This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic.
|
||||
"""
|
||||
|
||||
system_instruction: Optional[str]
|
||||
messages: List[AWSNovaSonicConversationHistoryMessage]
|
||||
tools: List[Dict[str, Any]]
|
||||
pass
|
||||
|
||||
|
||||
class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
@@ -70,7 +34,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
@property
|
||||
def id_for_llm_specific_messages(self) -> str:
|
||||
"""Get the identifier used in LLMSpecificMessage instances for AWS Nova Sonic."""
|
||||
return "aws-nova-sonic"
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
|
||||
|
||||
def get_llm_invocation_params(self, context: LLMContext) -> AWSNovaSonicLLMInvocationParams:
|
||||
"""Get AWS Nova Sonic-specific LLM invocation parameters from a universal LLM context.
|
||||
@@ -83,13 +47,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
Returns:
|
||||
Dictionary of parameters for invoking AWS Nova Sonic's LLM API.
|
||||
"""
|
||||
messages = self._from_universal_context_messages(self.get_messages(context))
|
||||
return {
|
||||
"system_instruction": messages.system_instruction,
|
||||
"messages": messages.messages,
|
||||
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
|
||||
"tools": self.from_standard_tools(context.tools) or [],
|
||||
}
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
|
||||
|
||||
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
|
||||
"""Get messages from a universal LLM context in a format ready for logging about AWS Nova Sonic.
|
||||
@@ -104,75 +62,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
Returns:
|
||||
List of messages in a format ready for logging about AWS Nova Sonic.
|
||||
"""
|
||||
return self._from_universal_context_messages(self.get_messages(context)).messages
|
||||
|
||||
@dataclass
|
||||
class ConvertedMessages:
|
||||
"""Container for Google-formatted messages converted from universal context."""
|
||||
|
||||
messages: List[AWSNovaSonicConversationHistoryMessage]
|
||||
system_instruction: Optional[str] = None
|
||||
|
||||
def _from_universal_context_messages(
|
||||
self, universal_context_messages: List[LLMContextMessage]
|
||||
) -> ConvertedMessages:
|
||||
system_instruction = None
|
||||
messages = []
|
||||
|
||||
# Bail if there are no messages
|
||||
if not universal_context_messages:
|
||||
return self.ConvertedMessages()
|
||||
|
||||
universal_context_messages = copy.deepcopy(universal_context_messages)
|
||||
|
||||
# If we have a "system" message as our first message, let's pull that out into "instruction"
|
||||
if universal_context_messages[0].get("role") == "system":
|
||||
system = universal_context_messages.pop(0)
|
||||
content = system.get("content")
|
||||
if isinstance(content, str):
|
||||
system_instruction = content
|
||||
elif isinstance(content, list):
|
||||
system_instruction = content[0].get("text")
|
||||
if system_instruction:
|
||||
self._system_instruction = system_instruction
|
||||
|
||||
# Process remaining messages to fill out conversation history.
|
||||
# Nova Sonic supports "user" and "assistant" messages in history.
|
||||
for universal_context_message in universal_context_messages:
|
||||
message = self._from_universal_context_message(universal_context_message)
|
||||
if message:
|
||||
messages.append(message)
|
||||
|
||||
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
|
||||
|
||||
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
|
||||
"""Convert standard message format to Nova Sonic format.
|
||||
|
||||
Args:
|
||||
message: Standard message dictionary to convert.
|
||||
|
||||
Returns:
|
||||
Nova Sonic conversation history message, or None if not convertible.
|
||||
"""
|
||||
role = message.get("role")
|
||||
if message.get("role") == "user" or message.get("role") == "assistant":
|
||||
content = message.get("content")
|
||||
if isinstance(message.get("content"), list):
|
||||
content = ""
|
||||
for c in message.get("content"):
|
||||
if c.get("type") == "text":
|
||||
content += " " + c.get("text")
|
||||
else:
|
||||
logger.error(
|
||||
f"Unhandled content type in context message: {c.get('type')} - {message}"
|
||||
)
|
||||
# There won't be content if this is an assistant tool call entry.
|
||||
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
|
||||
# history
|
||||
if content:
|
||||
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
|
||||
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
|
||||
# Sonic conversation history
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
|
||||
|
||||
@staticmethod
|
||||
def _to_aws_nova_sonic_function_format(function: FunctionSchema) -> Dict[str, Any]:
|
||||
|
||||
@@ -15,10 +15,9 @@ service-specific adapter.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import copy
|
||||
import io
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
|
||||
from typing import Any, List, Optional, TypeAlias, Union
|
||||
|
||||
from loguru import logger
|
||||
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
|
||||
@@ -32,9 +31,6 @@ from PIL import Image
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.frames.frames import AudioRawFrame
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
# "Re-export" types from OpenAI that we're using as universal context types.
|
||||
# NOTE: if universal message types need to someday diverge from OpenAI's, we
|
||||
# should consider managing our own definitions. But we should do so carefully,
|
||||
@@ -69,26 +65,6 @@ class LLMContext:
|
||||
and content formatting.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def from_openai_context(openai_context: "OpenAILLMContext") -> "LLMContext":
|
||||
"""Create a universal LLM context from an OpenAI-specific context.
|
||||
|
||||
NOTE: this should only be used internally, for facilitating migration
|
||||
from OpenAILLMContext to LLMContext. New user code should use
|
||||
LLMContext directly.
|
||||
|
||||
Args:
|
||||
openai_context: The OpenAI LLM context to convert.
|
||||
|
||||
Returns:
|
||||
New LLMContext instance with converted messages and settings.
|
||||
"""
|
||||
return LLMContext(
|
||||
messages=openai_context.get_messages(),
|
||||
tools=openai_context.tools,
|
||||
tool_choice=openai_context.tool_choice,
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
messages: Optional[List[LLMContextMessage]] = None,
|
||||
|
||||
@@ -8,80 +8,360 @@
|
||||
|
||||
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
|
||||
including conversation history management and role-specific message processing.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`.
|
||||
Using the new patterns should allow you to not need types from this module.
|
||||
|
||||
BEFORE:
|
||||
```
|
||||
# Setup
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Context frame type
|
||||
frame: OpenAILLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: AWSNovaSonicLLMContext
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.messages
|
||||
```
|
||||
|
||||
AFTER:
|
||||
```
|
||||
# Setup
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# Context frame type
|
||||
frame: LLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.get_messages()
|
||||
```
|
||||
"""
|
||||
|
||||
import warnings
|
||||
import copy
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Types in pipecat.services.aws.nova_sonic.context are deprecated. \n"
|
||||
"AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`. \n"
|
||||
"Using the new patterns should allow you to not need types from this module.\n\n"
|
||||
"BEFORE:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = OpenAILLMContext(messages, tools)\n"
|
||||
"context_aggregator = llm.create_context_aggregator(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: OpenAILLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: AWSNovaSonicLLMContext\n"
|
||||
"# or\n"
|
||||
"context: OpenAILLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```\n\n"
|
||||
"AFTER:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = LLMContext(messages, tools)\n"
|
||||
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: LLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: LLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
DataFrame,
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
InterruptionFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolChoiceFrame,
|
||||
LLMSetToolsFrame,
|
||||
TextFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
|
||||
from pipecat.services.openai.llm import (
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
|
||||
|
||||
class Role(Enum):
|
||||
"""Roles supported in AWS Nova Sonic conversations.
|
||||
|
||||
Parameters:
|
||||
SYSTEM: System-level messages (not used in conversation history).
|
||||
USER: Messages sent by the user.
|
||||
ASSISTANT: Messages sent by the assistant.
|
||||
TOOL: Messages sent by tools (not used in conversation history).
|
||||
"""
|
||||
|
||||
SYSTEM = "SYSTEM"
|
||||
USER = "USER"
|
||||
ASSISTANT = "ASSISTANT"
|
||||
TOOL = "TOOL"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicConversationHistoryMessage:
|
||||
"""A single message in AWS Nova Sonic conversation history.
|
||||
|
||||
Parameters:
|
||||
role: The role of the message sender (USER or ASSISTANT only).
|
||||
text: The text content of the message.
|
||||
"""
|
||||
|
||||
role: Role # only USER and ASSISTANT
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicConversationHistory:
|
||||
"""Complete conversation history for AWS Nova Sonic initialization.
|
||||
|
||||
Parameters:
|
||||
system_instruction: System-level instruction for the conversation.
|
||||
messages: List of conversation messages between user and assistant.
|
||||
"""
|
||||
|
||||
system_instruction: str = None
|
||||
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
|
||||
|
||||
|
||||
class AWSNovaSonicLLMContext(OpenAILLMContext):
|
||||
"""Specialized LLM context for AWS Nova Sonic service.
|
||||
|
||||
Extends OpenAI context with Nova Sonic-specific message handling,
|
||||
conversation history management, and text buffering capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self, messages=None, tools=None, **kwargs):
|
||||
"""Initialize AWS Nova Sonic LLM context.
|
||||
|
||||
Args:
|
||||
messages: Initial messages for the context.
|
||||
tools: Available tools for the context.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
super().__init__(messages=messages, tools=tools, **kwargs)
|
||||
self.__setup_local()
|
||||
|
||||
def __setup_local(self, system_instruction: str = ""):
|
||||
self._assistant_text = ""
|
||||
self._user_text = ""
|
||||
self._system_instruction = system_instruction
|
||||
|
||||
@staticmethod
|
||||
def upgrade_to_nova_sonic(
|
||||
obj: OpenAILLMContext, system_instruction: str
|
||||
) -> "AWSNovaSonicLLMContext":
|
||||
"""Upgrade an OpenAI context to AWS Nova Sonic context.
|
||||
|
||||
Args:
|
||||
obj: The OpenAI context to upgrade.
|
||||
system_instruction: System instruction for the context.
|
||||
|
||||
Returns:
|
||||
The upgraded AWS Nova Sonic context.
|
||||
"""
|
||||
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
|
||||
obj.__class__ = AWSNovaSonicLLMContext
|
||||
obj.__setup_local(system_instruction)
|
||||
return obj
|
||||
|
||||
# NOTE: this method has the side-effect of updating _system_instruction from messages
|
||||
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
|
||||
"""Get conversation history for initializing AWS Nova Sonic session.
|
||||
|
||||
Processes stored messages and extracts system instruction and conversation
|
||||
history in the format expected by AWS Nova Sonic.
|
||||
|
||||
Returns:
|
||||
Formatted conversation history with system instruction and messages.
|
||||
"""
|
||||
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
|
||||
|
||||
# Bail if there are no messages
|
||||
if not self.messages:
|
||||
return history
|
||||
|
||||
messages = copy.deepcopy(self.messages)
|
||||
|
||||
# If we have a "system" message as our first message, let's pull that out into "instruction"
|
||||
if messages[0].get("role") == "system":
|
||||
system = messages.pop(0)
|
||||
content = system.get("content")
|
||||
if isinstance(content, str):
|
||||
history.system_instruction = content
|
||||
elif isinstance(content, list):
|
||||
history.system_instruction = content[0].get("text")
|
||||
if history.system_instruction:
|
||||
self._system_instruction = history.system_instruction
|
||||
|
||||
# Process remaining messages to fill out conversation history.
|
||||
# Nova Sonic supports "user" and "assistant" messages in history.
|
||||
for message in messages:
|
||||
history_message = self.from_standard_message(message)
|
||||
if history_message:
|
||||
history.messages.append(history_message)
|
||||
|
||||
return history
|
||||
|
||||
def get_messages_for_persistent_storage(self):
|
||||
"""Get messages formatted for persistent storage.
|
||||
|
||||
Returns:
|
||||
List of messages including system instruction if present.
|
||||
"""
|
||||
messages = super().get_messages_for_persistent_storage()
|
||||
# If we have a system instruction and messages doesn't already contain it, add it
|
||||
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
|
||||
messages.insert(0, {"role": "system", "content": self._system_instruction})
|
||||
return messages
|
||||
|
||||
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
|
||||
"""Convert standard message format to Nova Sonic format.
|
||||
|
||||
Args:
|
||||
message: Standard message dictionary to convert.
|
||||
|
||||
Returns:
|
||||
Nova Sonic conversation history message, or None if not convertible.
|
||||
"""
|
||||
role = message.get("role")
|
||||
if message.get("role") == "user" or message.get("role") == "assistant":
|
||||
content = message.get("content")
|
||||
if isinstance(message.get("content"), list):
|
||||
content = ""
|
||||
for c in message.get("content"):
|
||||
if c.get("type") == "text":
|
||||
content += " " + c.get("text")
|
||||
else:
|
||||
logger.error(
|
||||
f"Unhandled content type in context message: {c.get('type')} - {message}"
|
||||
)
|
||||
# There won't be content if this is an assistant tool call entry.
|
||||
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
|
||||
# history
|
||||
if content:
|
||||
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
|
||||
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
|
||||
# Sonic conversation history
|
||||
|
||||
def buffer_user_text(self, text):
|
||||
"""Buffer user text for later flushing to context.
|
||||
|
||||
Args:
|
||||
text: User text to buffer.
|
||||
"""
|
||||
self._user_text += f" {text}" if self._user_text else text
|
||||
# logger.debug(f"User text buffered: {self._user_text}")
|
||||
|
||||
def flush_aggregated_user_text(self) -> str:
|
||||
"""Flush buffered user text to context as a complete message.
|
||||
|
||||
Returns:
|
||||
The flushed user text, or empty string if no text was buffered.
|
||||
"""
|
||||
if not self._user_text:
|
||||
return ""
|
||||
user_text = self._user_text
|
||||
message = {
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": user_text}],
|
||||
}
|
||||
self._user_text = ""
|
||||
self.add_message(message)
|
||||
# logger.debug(f"Context updated (user): {self.get_messages_for_logging()}")
|
||||
return user_text
|
||||
|
||||
def buffer_assistant_text(self, text):
|
||||
"""Buffer assistant text for later flushing to context.
|
||||
|
||||
Args:
|
||||
text: Assistant text to buffer.
|
||||
"""
|
||||
self._assistant_text += text
|
||||
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
|
||||
|
||||
def flush_aggregated_assistant_text(self):
|
||||
"""Flush buffered assistant text to context as a complete message."""
|
||||
if not self._assistant_text:
|
||||
return
|
||||
message = {
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": self._assistant_text}],
|
||||
}
|
||||
self._assistant_text = ""
|
||||
self.add_message(message)
|
||||
# logger.debug(f"Context updated (assistant): {self.get_messages_for_logging()}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
|
||||
"""Frame containing updated AWS Nova Sonic context.
|
||||
|
||||
Parameters:
|
||||
context: The updated AWS Nova Sonic LLM context.
|
||||
"""
|
||||
|
||||
context: AWSNovaSonicLLMContext
|
||||
|
||||
|
||||
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
|
||||
"""Context aggregator for user messages in AWS Nova Sonic conversations.
|
||||
|
||||
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
|
||||
context update frames.
|
||||
"""
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
|
||||
):
|
||||
"""Process frames and emit Nova Sonic-specific context updates.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction the frame is traveling.
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Parent does not push LLMMessagesUpdateFrame
|
||||
if isinstance(frame, LLMMessagesUpdateFrame):
|
||||
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
|
||||
|
||||
|
||||
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
|
||||
|
||||
Provides specialized handling for assistant responses and function calls
|
||||
in AWS Nova Sonic context, with custom frame processing logic.
|
||||
"""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process frames with Nova Sonic-specific logic.
|
||||
|
||||
Args:
|
||||
frame: The frame to process.
|
||||
direction: The direction the frame is traveling.
|
||||
"""
|
||||
# HACK: For now, disable the context aggregator by making it just pass through all frames
|
||||
# that the parent handles (except the function call stuff, which we still need).
|
||||
# For an explanation of this hack, see
|
||||
# AWSNovaSonicLLMService._report_assistant_response_text_added.
|
||||
if isinstance(
|
||||
frame,
|
||||
(
|
||||
InterruptionFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
TextFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMSetToolChoiceFrame,
|
||||
UserImageRawFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
),
|
||||
):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
|
||||
"""Handle function call results for AWS Nova Sonic.
|
||||
|
||||
Args:
|
||||
frame: The function call result frame to handle.
|
||||
"""
|
||||
await super().handle_function_call_result(frame)
|
||||
|
||||
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
|
||||
# itself, so we didn't have a chance to add the result to the AWS Nova Sonic server-side
|
||||
# context. Let's push a special frame to do that.
|
||||
await self.push_frame(
|
||||
AWSNovaSonicFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicContextAggregatorPair:
|
||||
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
|
||||
|
||||
Parameters:
|
||||
_user: The user context aggregator.
|
||||
_assistant: The assistant context aggregator.
|
||||
"""
|
||||
|
||||
_user: AWSNovaSonicUserContextAggregator
|
||||
_assistant: AWSNovaSonicAssistantContextAggregator
|
||||
|
||||
def user(self) -> AWSNovaSonicUserContextAggregator:
|
||||
"""Get the user context aggregator.
|
||||
|
||||
Returns:
|
||||
The user context aggregator instance.
|
||||
"""
|
||||
return self._user
|
||||
|
||||
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
|
||||
"""Get the assistant context aggregator.
|
||||
|
||||
Returns:
|
||||
The assistant context aggregator instance.
|
||||
"""
|
||||
return self._assistant
|
||||
|
||||
@@ -25,7 +25,7 @@ from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
|
||||
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
@@ -33,30 +33,35 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallFromLLM,
|
||||
InputAudioRawFrame,
|
||||
InterruptionFrame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMContextFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSTextFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.aws.nova_sonic.context import (
|
||||
AWSNovaSonicAssistantContextAggregator,
|
||||
AWSNovaSonicContextAggregatorPair,
|
||||
AWSNovaSonicLLMContext,
|
||||
AWSNovaSonicUserContextAggregator,
|
||||
Role,
|
||||
)
|
||||
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
|
||||
from pipecat.services.llm_service import LLMService
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
@@ -212,11 +217,6 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
system_instruction: System-level instruction for the model.
|
||||
tools: Available tools/functions for the model to use.
|
||||
send_transcription_frames: Whether to emit transcription frames.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
This parameter is deprecated and will be removed in a future version.
|
||||
Transcription frames are always sent.
|
||||
|
||||
**kwargs: Additional arguments passed to the parent LLMService.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
@@ -230,20 +230,8 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._params = params or Params()
|
||||
self._system_instruction = system_instruction
|
||||
self._tools = tools
|
||||
|
||||
if not send_transcription_frames:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"`send_transcription_frames` is deprecated and will be removed in a future version. "
|
||||
"Transcription frames are always sent.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
self._context: Optional[LLMContext] = None
|
||||
self._send_transcription_frames = send_transcription_frames
|
||||
self._context: Optional[AWSNovaSonicLLMContext] = None
|
||||
self._stream: Optional[
|
||||
DuplexEventStream[
|
||||
InvokeModelWithBidirectionalStreamInput,
|
||||
@@ -256,17 +244,12 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._input_audio_content_name: Optional[str] = None
|
||||
self._content_being_received: Optional[CurrentContent] = None
|
||||
self._assistant_is_responding = False
|
||||
self._may_need_repush_assistant_text = False
|
||||
self._ready_to_send_context = False
|
||||
self._handling_bot_stopped_speaking = False
|
||||
self._triggering_assistant_response = False
|
||||
self._waiting_for_trigger_transcription = False
|
||||
self._disconnecting = False
|
||||
self._connected_time: Optional[float] = None
|
||||
self._wants_connection = False
|
||||
self._user_text_buffer = ""
|
||||
self._assistant_text_buffer = ""
|
||||
self._completed_tool_calls = set()
|
||||
|
||||
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
|
||||
with wave.open(file_path.open("rb"), "rb") as wav_file:
|
||||
@@ -319,12 +302,12 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
logger.debug("Resetting conversation")
|
||||
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
|
||||
|
||||
# Grab context to carry through disconnect/reconnect
|
||||
# Carry over previous context through disconnect
|
||||
context = self._context
|
||||
|
||||
await self._disconnect()
|
||||
self._context = context
|
||||
|
||||
await self._start_connecting()
|
||||
await self._handle_context(context)
|
||||
|
||||
#
|
||||
# frame processing
|
||||
@@ -339,35 +322,28 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
|
||||
context = (
|
||||
frame.context
|
||||
if isinstance(frame, LLMContextFrame)
|
||||
else LLMContext.from_openai_context(frame.context)
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
await self._handle_context(frame.context)
|
||||
elif isinstance(frame, LLMContextFrame):
|
||||
raise NotImplementedError(
|
||||
"Universal LLMContext is not yet supported for AWS Nova Sonic."
|
||||
)
|
||||
await self._handle_context(context)
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
await self._handle_input_audio_frame(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
|
||||
elif isinstance(frame, InterruptionFrame):
|
||||
await self._handle_interruption_frame()
|
||||
elif isinstance(frame, AWSNovaSonicFunctionCallResultFrame):
|
||||
await self._handle_function_call_result(frame)
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _handle_context(self, context: LLMContext):
|
||||
if self._disconnecting:
|
||||
return
|
||||
|
||||
async def _handle_context(self, context: OpenAILLMContext):
|
||||
if not self._context:
|
||||
# We got our initial context
|
||||
# Try to finish connecting
|
||||
self._context = context
|
||||
# We got our initial context - try to finish connecting
|
||||
self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(
|
||||
context, self._system_instruction
|
||||
)
|
||||
await self._finish_connecting_if_context_available()
|
||||
else:
|
||||
# We got an updated context
|
||||
# Send results for any newly-completed function calls
|
||||
await self._process_completed_function_calls(send_new_results=True)
|
||||
|
||||
async def _handle_input_audio_frame(self, frame: InputAudioRawFrame):
|
||||
# Wait until we're done sending the assistant response trigger audio before sending audio
|
||||
@@ -417,9 +393,9 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
else:
|
||||
await finalize_assistant_response()
|
||||
|
||||
async def _handle_interruption_frame(self):
|
||||
if self._assistant_is_responding:
|
||||
self._may_need_repush_assistant_text = True
|
||||
async def _handle_function_call_result(self, frame: AWSNovaSonicFunctionCallResultFrame):
|
||||
result = frame.result_frame
|
||||
await self._send_tool_result(tool_call_id=result.tool_call_id, result=result.result)
|
||||
|
||||
#
|
||||
# LLM communication: lifecycle
|
||||
@@ -455,17 +431,6 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self._disconnect()
|
||||
|
||||
async def _process_completed_function_calls(self, send_new_results: bool):
|
||||
# Check for set of completed function calls in the context
|
||||
for message in self._context.get_messages():
|
||||
if message.get("role") and message.get("content") != "IN_PROGRESS":
|
||||
tool_call_id = message.get("tool_call_id")
|
||||
if tool_call_id and tool_call_id not in self._completed_tool_calls:
|
||||
# Found a newly-completed function call - send the result to the service
|
||||
if send_new_results:
|
||||
await self._send_tool_result(tool_call_id, message.get("content"))
|
||||
self._completed_tool_calls.add(tool_call_id)
|
||||
|
||||
async def _finish_connecting_if_context_available(self):
|
||||
# We can only finish connecting once we've gotten our initial context and we're ready to
|
||||
# send it
|
||||
@@ -474,38 +439,30 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.info("Finishing connecting (setting up session)...")
|
||||
|
||||
# Initialize our bookkeeping of already-completed tool calls in the
|
||||
# context
|
||||
await self._process_completed_function_calls(send_new_results=False)
|
||||
|
||||
# Read context
|
||||
adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
|
||||
llm_connection_params = adapter.get_llm_invocation_params(self._context)
|
||||
history = self._context.get_messages_for_initializing_history()
|
||||
|
||||
# Send prompt start event, specifying tools.
|
||||
# Tools from context take priority over self._tools.
|
||||
tools = (
|
||||
llm_connection_params["tools"]
|
||||
if llm_connection_params["tools"]
|
||||
else adapter.from_standard_tools(self._tools)
|
||||
self._context.tools
|
||||
if self._context.tools
|
||||
else self.get_llm_adapter().from_standard_tools(self._tools)
|
||||
)
|
||||
logger.debug(f"Using tools: {tools}")
|
||||
await self._send_prompt_start_event(tools)
|
||||
|
||||
# Send system instruction.
|
||||
# Instruction from context takes priority over self._system_instruction.
|
||||
system_instruction = (
|
||||
llm_connection_params["system_instruction"]
|
||||
if llm_connection_params["system_instruction"]
|
||||
else self._system_instruction
|
||||
)
|
||||
logger.debug(f"Using system instruction: {system_instruction}")
|
||||
if system_instruction:
|
||||
await self._send_text_event(text=system_instruction, role=Role.SYSTEM)
|
||||
# (NOTE: this prioritizing occurred automatically behind the scenes: the context was
|
||||
# initialized with self._system_instruction and then updated itself from its messages when
|
||||
# get_messages_for_initializing_history() was called).
|
||||
logger.debug(f"Using system instruction: {history.system_instruction}")
|
||||
if history.system_instruction:
|
||||
await self._send_text_event(text=history.system_instruction, role=Role.SYSTEM)
|
||||
|
||||
# Send conversation history
|
||||
for message in llm_connection_params["messages"]:
|
||||
# logger.debug(f"Seeding conversation history with message: {message}")
|
||||
for message in history.messages:
|
||||
await self._send_text_event(text=message.text, role=message.role)
|
||||
|
||||
# Start audio input
|
||||
@@ -535,12 +492,9 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
await self._send_session_end_events()
|
||||
self._client = None
|
||||
|
||||
# Clean up context
|
||||
self._context = None
|
||||
|
||||
# Clean up stream
|
||||
if self._stream:
|
||||
await self._stream.close()
|
||||
await self._stream.input_stream.close()
|
||||
self._stream = None
|
||||
|
||||
# NOTE: see explanation of HACK, below
|
||||
@@ -556,23 +510,15 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._receive_task = None
|
||||
|
||||
# Reset remaining connection-specific state
|
||||
# Should be all private state except:
|
||||
# - _wants_connection
|
||||
# - _assistant_response_trigger_audio
|
||||
self._prompt_name = None
|
||||
self._input_audio_content_name = None
|
||||
self._content_being_received = None
|
||||
self._assistant_is_responding = False
|
||||
self._may_need_repush_assistant_text = False
|
||||
self._ready_to_send_context = False
|
||||
self._handling_bot_stopped_speaking = False
|
||||
self._triggering_assistant_response = False
|
||||
self._waiting_for_trigger_transcription = False
|
||||
self._disconnecting = False
|
||||
self._connected_time = None
|
||||
self._user_text_buffer = ""
|
||||
self._assistant_text_buffer = ""
|
||||
self._completed_tool_calls = set()
|
||||
|
||||
logger.info("Finished disconnecting")
|
||||
except Exception as e:
|
||||
@@ -880,10 +826,6 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
# Handle the LLM completion ending
|
||||
await self._handle_completion_end_event(event_json)
|
||||
except Exception as e:
|
||||
if self._disconnecting:
|
||||
# Errors are kind of expected while disconnecting, so just
|
||||
# ignore them and do nothing
|
||||
return
|
||||
logger.error(f"{self} error processing responses: {e}")
|
||||
if self._wants_connection:
|
||||
await self.reset_conversation()
|
||||
@@ -1014,7 +956,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
async def _report_assistant_response_started(self):
|
||||
logger.debug("Assistant response started")
|
||||
|
||||
# Report the start of the assistant response.
|
||||
# Report that the assistant has started their response.
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
# Report that equivalent of TTS (this is a speech-to-speech model) started
|
||||
@@ -1026,16 +968,23 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug(f"Assistant response text added: {text}")
|
||||
|
||||
# Report the text of the assistant response.
|
||||
# Report some text added to the ongoing assistant response
|
||||
await self.push_frame(LLMTextFrame(text))
|
||||
|
||||
# Report some text added to the *equivalent* of TTS (this is a speech-to-speech model)
|
||||
await self.push_frame(TTSTextFrame(text))
|
||||
|
||||
# HACK: here we're also buffering the assistant text ourselves as a
|
||||
# backup rather than relying solely on the assistant context aggregator
|
||||
# to do it, because the text arrives from Nova Sonic only after all the
|
||||
# assistant audio frames have been pushed, meaning that if an
|
||||
# interruption frame were to arrive we would lose all of it (the text
|
||||
# frames sitting in the queue would be wiped).
|
||||
self._assistant_text_buffer += text
|
||||
# TODO: this is a (hopefully temporary) HACK. Here we directly manipulate the context rather
|
||||
# than relying on the frames pushed to the assistant context aggregator. The pattern of
|
||||
# receiving full-sentence text after the assistant has spoken does not easily fit with the
|
||||
# Pipecat expectation of chunks of text streaming in while the assistant is speaking.
|
||||
# Interruption handling was especially challenging. Rather than spend days trying to fit a
|
||||
# square peg in a round hole, I decided on this hack for the time being. We can most cleanly
|
||||
# abandon this hack if/when AWS Nova Sonic implements streaming smaller text chunks
|
||||
# interspersed with audio. Note that when we move away from this hack, we need to make sure
|
||||
# that on an interruption we avoid sending LLMFullResponseEndFrame, which gets the
|
||||
# LLMAssistantContextAggregator into a bad state.
|
||||
self._context.buffer_assistant_text(text)
|
||||
|
||||
async def _report_assistant_response_ended(self):
|
||||
if not self._context: # should never happen
|
||||
@@ -1043,34 +992,14 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug("Assistant response ended")
|
||||
|
||||
# If an interruption frame arrived while the assistant was responding
|
||||
# we may have lost all of the assistant text (see HACK, above), so
|
||||
# re-push it downstream to the aggregator now.
|
||||
if self._may_need_repush_assistant_text:
|
||||
# Just in case, check that assistant text hasn't already made it
|
||||
# into the context (sometimes it does, despite the interruption).
|
||||
messages = self._context.get_messages()
|
||||
last_message = messages[-1] if messages else None
|
||||
if (
|
||||
not last_message
|
||||
or last_message.get("role") != "assistant"
|
||||
or last_message.get("content") != self._assistant_text_buffer
|
||||
):
|
||||
# We also need to re-push the LLMFullResponseStartFrame since the
|
||||
# TTSTextFrame would be ignored otherwise (the interruption frame
|
||||
# would have cleared the assistant aggregator state).
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.push_frame(TTSTextFrame(self._assistant_text_buffer))
|
||||
self._may_need_repush_assistant_text = False
|
||||
|
||||
# Report the end of the assistant response.
|
||||
# Report that the assistant has finished their response.
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
# Clear out the buffered assistant text
|
||||
self._assistant_text_buffer = ""
|
||||
# For an explanation of this hack, see _report_assistant_response_text_added.
|
||||
self._context.flush_aggregated_assistant_text()
|
||||
|
||||
#
|
||||
# user transcription reporting
|
||||
@@ -1087,67 +1016,33 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug(f"User transcription text added: {text}")
|
||||
|
||||
# HACK: here we're buffering the user text ourselves rather than
|
||||
# relying on the upstream user context aggregator to do it, because the
|
||||
# text arrives in fairly large chunks spaced fairly far apart in time.
|
||||
# That means the user text would be split between different messages in
|
||||
# context. Even if we sent placeholder InterimTranscriptionFrames in
|
||||
# between each TranscriptionFrame to tell the aggregator to hold off on
|
||||
# finalizing the user message, the aggregator would likely get the last
|
||||
# chunk too late.
|
||||
self._user_text_buffer += f" {text}" if self._user_text_buffer else text
|
||||
# Manually add new user transcription text to context.
|
||||
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
|
||||
self._context.buffer_user_text(text)
|
||||
|
||||
# Report that some new user transcription text is available.
|
||||
if self._send_transcription_frames:
|
||||
await self.push_frame(
|
||||
InterimTranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601())
|
||||
)
|
||||
|
||||
async def _report_user_transcription_ended(self):
|
||||
if not self._context: # should never happen
|
||||
return
|
||||
|
||||
# Manually add user transcription to context (if any has been buffered).
|
||||
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
|
||||
transcription = self._context.flush_aggregated_user_text()
|
||||
|
||||
if not transcription:
|
||||
return
|
||||
|
||||
logger.debug(f"User transcription ended")
|
||||
|
||||
# Report to the upstream user context aggregator that some new user
|
||||
# transcription text is available.
|
||||
|
||||
# HACK: Check if this transcription was triggered by our own
|
||||
# assistant response trigger. If so, we need to wrap it with
|
||||
# UserStarted/StoppedSpeakingFrames; otherwise the user aggregator
|
||||
# would fire an EmulatedUserStartedSpeakingFrame, which would
|
||||
# trigger an interruption, which would prevent us from writing the
|
||||
# assistant response to context.
|
||||
#
|
||||
# Sending an EmulateUserStartedSpeakingFrame ourselves doesn't
|
||||
# work: it just causes the interruption we're trying to avoid.
|
||||
#
|
||||
# Setting enable_emulated_vad_interruptions also doesn't work: at
|
||||
# the time the user aggregator receives the TranscriptionFrame, it
|
||||
# doesn't yet know the assistant has started responding, so it
|
||||
# doesn't know that emulating the user starting to speak would
|
||||
# cause an interruption.
|
||||
should_wrap_in_user_started_stopped_speaking_frames = (
|
||||
self._waiting_for_trigger_transcription
|
||||
and self._user_text_buffer.strip().lower() == "ready"
|
||||
)
|
||||
|
||||
# Start wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
|
||||
if should_wrap_in_user_started_stopped_speaking_frames:
|
||||
logger.debug(
|
||||
"Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
|
||||
if self._send_transcription_frames:
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(text=transcription, user_id="", timestamp=time_now_iso8601())
|
||||
)
|
||||
await self.push_frame(UserStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
|
||||
|
||||
# Send the transcription upstream for the user context aggregator
|
||||
frame = TranscriptionFrame(
|
||||
text=self._user_text_buffer, user_id="", timestamp=time_now_iso8601()
|
||||
)
|
||||
await self.push_frame(frame, direction=FrameDirection.UPSTREAM)
|
||||
|
||||
# Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
|
||||
if should_wrap_in_user_started_stopped_speaking_frames:
|
||||
await self.push_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
|
||||
|
||||
# Clear out the buffered user text
|
||||
self._user_text_buffer = ""
|
||||
|
||||
# We're no longer waiting for a trigger transcription
|
||||
self._waiting_for_trigger_transcription = False
|
||||
|
||||
#
|
||||
# context
|
||||
@@ -1159,26 +1054,23 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
*,
|
||||
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
|
||||
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
|
||||
) -> LLMContextAggregatorPair:
|
||||
) -> AWSNovaSonicContextAggregatorPair:
|
||||
"""Create context aggregator pair for managing conversation context.
|
||||
|
||||
NOTE: this method exists only for backward compatibility. New code
|
||||
should instead do:
|
||||
context = LLMContext(...)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
Args:
|
||||
context: The OpenAI LLM context.
|
||||
context: The OpenAI LLM context to upgrade.
|
||||
user_params: Parameters for the user context aggregator.
|
||||
assistant_params: Parameters for the assistant context aggregator.
|
||||
|
||||
Returns:
|
||||
A pair of user and assistant context aggregators.
|
||||
"""
|
||||
context = LLMContext.from_openai_context(context)
|
||||
return LLMContextAggregatorPair(
|
||||
context, user_params=user_params, assistant_params=assistant_params
|
||||
)
|
||||
context.set_llm_adapter(self.get_llm_adapter())
|
||||
|
||||
user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
|
||||
assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params)
|
||||
|
||||
return AWSNovaSonicContextAggregatorPair(user, assistant)
|
||||
|
||||
#
|
||||
# assistant response trigger (HACK)
|
||||
@@ -1216,8 +1108,6 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
try:
|
||||
logger.debug("Sending assistant response trigger...")
|
||||
|
||||
self._waiting_for_trigger_transcription = True
|
||||
|
||||
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
|
||||
chunk_size = int(
|
||||
chunk_duration
|
||||
|
||||
@@ -8,80 +8,18 @@
|
||||
|
||||
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
|
||||
including conversation history management and role-specific message processing.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`.
|
||||
Using the new patterns should allow you to not need types from this module.
|
||||
|
||||
BEFORE:
|
||||
```
|
||||
# Setup
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Context frame type
|
||||
frame: OpenAILLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: AWSNovaSonicLLMContext
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.messages
|
||||
```
|
||||
|
||||
AFTER:
|
||||
```
|
||||
# Setup
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# Context frame type
|
||||
frame: LLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
|
||||
# Reading messages from context
|
||||
messages = context.get_messages()
|
||||
```
|
||||
"""
|
||||
|
||||
import warnings
|
||||
|
||||
from pipecat.services.aws.nova_sonic.context import *
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Types in pipecat.services.aws_nova_sonic.context are deprecated. \n"
|
||||
"AWS Nova Sonic now supports `LLMContext` and `LLMContextAggregatorPair`. \n"
|
||||
"Using the new patterns should allow you to not need types from this module.\n\n"
|
||||
"BEFORE:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = OpenAILLMContext(messages, tools)\n"
|
||||
"context_aggregator = llm.create_context_aggregator(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: OpenAILLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: AWSNovaSonicLLMContext\n"
|
||||
"# or\n"
|
||||
"context: OpenAILLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```\n\n"
|
||||
"AFTER:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = LLMContext(messages, tools)\n"
|
||||
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: LLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: LLMContext\n\n"
|
||||
"# Reading messages from context\n"
|
||||
"messages = context.messages\n"
|
||||
"```",
|
||||
"Types in pipecat.services.aws_nova_sonic.context are deprecated. "
|
||||
"Please use the equivalent types from "
|
||||
"pipecat.services.aws.nova_sonic.context instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user