Compare commits
62 Commits
hush/backo
...
cb/test-se
| Author | SHA1 | Date | |
|---|---|---|---|
| | 33d813ed8f | | |
| | 20f4b0e8ff | | |
| | 6feaf91789 | | |
| | 91d3ae07b3 | | |
| | 71841f71ef | | |
| | 949b807023 | | |
| | 4ad15f9a01 | | |
| | 99d94fc625 | | |
| | a3d630c0d1 | | |
| | 04b482c445 | | |
| | b2bce4916f | | |
| | 60e9817f16 | | |
| | c655d0d313 | | |
| | ea6e146f2d | | |
| | ec890a834f | | |
| | 5b921fc054 | | |
| | f1040100f4 | | |
| | 54691ee781 | | |
| | 49239a23c6 | | |
| | e0c43de13f | | |
| | cc4c96d099 | | |
| | 788465cb04 | | |
| | db934eade0 | | |
| | 0b8c966a11 | | |
| | 5849485bc6 | | |
| | 459af58540 | | |
| | 576bd67e85 | | |
| | 1e8629bf96 | | |
| | 776a3526f9 | | |
| | 2ced044418 | | |
| | 151f187837 | | |
| | 67afa718d0 | | |
| | 52ab0eccc0 | | |
| | d1f1b68b71 | | |
| | a479c32665 | | |
| | 9f66b0ba41 | | |
| | 23385ca3d2 | | |
| | 8b24bae9c5 | | |
| | 0502ec6c44 | | |
| | 81645910e0 | | |
| | d6ab4c41b0 | | |
| | 2f92cb8781 | | |
| | fbf274374c | | |
| | 427efecf5b | | |
| | b3e54546ac | | |
| | de46631bac | | |
| | abf0150261 | | |
| | a0c93ab6de | | |
| | 4bec566bbf | | |
| | ec3cd24182 | | |
| | e36e64c2e8 | | |
| | 02a88022dd | | |
| | 6cae61f2cc | | |
| | 3b40079120 | | |
| | ff0b38859b | | |
| | 4d499324d1 | | |
| | f13e006db2 | | |
| | 87d9e8c9cd | | |
| | 4820f1c059 | | |
| | 860c39d1b1 | | |
| | ae5c5ed7f6 | | |
| | d1d74c571c | | |
98
CHANGELOG.md
@@ -7,14 +7,82 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Changed

- `FunctionFilter` now has a `filter_system_frames` arg, which controls whether
  or not SystemFrames are filtered.

- Upgraded `aws_sdk_bedrock_runtime` to v0.1.1 to resolve potential CPU issues
  when running `AWSNovaSonicLLMService`.

### Fixed

- Fixed an issue in the runner where starting a DailyTransport room via
  `/start` didn't support using the `DAILY_SAMPLE_ROOM_URL` env var.

- Fixed an issue in `ServiceSwitcher` where the `STTService`s would result in
  all STT services producing `TranscriptionFrame`s.

## [0.0.91] - 2025-10-21

### Added

- Added support for `bulbul:v3` model in `SarvamTTSService` and `SarvamHttpTTSService`.
- It is now possible to start a bot from the `/start` endpoint when using the
  runner's Daily transport. This follows the Pipecat Cloud format with
  `createDailyRoom` and `body` fields in the POST request body.

- Added an ellipsis character (`…`) to the end of sentence detection in the
  string utils.

- Expanded support for universal `LLMContext` to `AWSNovaSonicLLMService`.
  As a reminder, the context-setup pattern when using `LLMContext` is:

  ```python
  context = LLMContext(messages, tools)
  context_aggregator = LLMContextAggregatorPair(context)
  ```

  (Note that even though `AWSNovaSonicLLMService` now supports the universal
  `LLMContext`, it is not meant to be swapped out for another LLM service at
  runtime.)

  Worth noting: whether or not you use the new context-setup pattern with
  `AWSNovaSonicLLMService`, some types have changed under the hood:

  ```python
  ## BEFORE:

  # Context aggregator type
  context_aggregator: AWSNovaSonicContextAggregatorPair

  # Context frame type
  frame: OpenAILLMContextFrame

  # Context type
  context: AWSNovaSonicLLMContext
  # or
  context: OpenAILLMContext

  ## AFTER:

  # Context aggregator type
  context_aggregator: LLMContextAggregatorPair

  # Context frame type
  frame: LLMContextFrame

  # Context type
  context: LLMContext
  ```

- Added support for `bulbul:v3` model in `SarvamTTSService` and
  `SarvamHttpTTSService`.

- Added `keyterms_prompt` parameter to `AssemblyAIConnectionParams`.

- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the multilingual model.
- Added `speech_model` parameter to `AssemblyAIConnectionParams` to access the
  multilingual model.

- Added support for trickle ICE to the `SmallWebRTCTransport`.

- Added support for updating `OpenAITTSService` settings (`instructions` and
@@ -36,19 +104,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- `RunnerArguments` now include the `body` field, so there's no need to add it
  to subclasses. Also, all `RunnerArguments` fields are now keyword-only.

- `CartesiaSTTService` now inherits from `WebsocketSTTService`.

- Package upgrades:

  - `daily-python` upgraded to 0.20.0.
  - `openai` upgraded to support up to 2.x.x.
  - `openpipe` upgraded to support up to 5.x.x.

- `SpeechmaticsSTTService` updated dependencies for `speechmatics-rt>=0.5.0`.

### Deprecated

- The `send_transcription_frames` argument to `AWSNovaSonicLLMService` is
  deprecated. Transcription frames are now always sent. They go upstream, to be
  handled by the user context aggregator. See "Added" section for details.

- Types in `pipecat.services.aws.nova_sonic.context` have been deprecated due
  to changes to support `LLMContext`. See "Changed" section for details.

### Fixed

- Fixed an issue where the `RTVIProcessor` was sending duplicate
  `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` messages.

- Fixed an issue in `AWSBedrockLLMService` where both `temperature` and `top_p`
  were always sent together, causing conflicts with models like Claude Sonnet 4.5
  that don't allow both parameters simultaneously. The service now only includes
  inference parameters that are explicitly set, and `InputParams` defaults have
  been changed to `None` to rely on AWS Bedrock's built-in model defaults.

- Fixed an issue in `RivaSegmentedSTTService` where a runtime error occurred due
  to a mismatch in the _handle_transcription method's signature.
  to a mismatch in the `_handle_transcription` method's signature.

- Fixed multiple pipeline task cancellation issues. `asyncio.CancelledError` is
  now handled properly in `PipelineTask` making it possible to cancel an asyncio
|
||||
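The `AWSBedrockLLMService` entry above describes sending only the inference parameters that were explicitly set. A minimal sketch of that idea follows. `InferenceParams` and `build_inference_config` are hypothetical names used for illustration and are not the service's actual implementation; the output keys assume the `inferenceConfig` field names of the Bedrock Converse API (`maxTokens`, `temperature`, `topP`).

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class InferenceParams:
    """Hypothetical stand-in for a service's input params; every default is None."""

    max_tokens: Optional[int] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None


def build_inference_config(params: InferenceParams) -> Dict[str, Any]:
    """Return a config dict containing only the parameters the caller set."""
    candidates = {
        "maxTokens": params.max_tokens,
        "temperature": params.temperature,
        "topP": params.top_p,
    }
    # Dropping None values leaves unset parameters to the provider's own
    # defaults, so temperature and top_p are only sent together on request.
    return {key: value for key, value in candidates.items() if value is not None}


if __name__ == "__main__":
    print(build_inference_config(InferenceParams(temperature=0.8)))
```

With defaults of `None`, a caller who sets only `temperature` never sends `topP`, which is the conflict the changelog entry describes.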
@@ -44,6 +44,10 @@ Looking to build structured conversations? Check out [Pipecat Flows](https://git
Want to build beautiful and engaging experiences? Check out the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.

### 🛠️ Create and deploy projects

Create a new project in under a minute with the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli). Then use the CLI to monitor and deploy your agent to production.

### 🔍 Debugging

Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.

@@ -1,250 +0,0 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for creating Daily.co rooms with retry logic.
|
||||
|
||||
This module provides functions to create Daily rooms via REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def periodic_progress_logger(
|
||||
progress_dict: Dict[str, int],
|
||||
total: int,
|
||||
interval_seconds: float = 5.0,
|
||||
stop_event: Optional[asyncio.Event] = None,
|
||||
):
|
||||
"""Log progress periodically in the background.
|
||||
|
||||
Args:
|
||||
progress_dict: Shared dict with 'completed' and 'failed' counts
|
||||
total: Total number of items being processed
|
||||
interval_seconds: How often to log progress (default 5 seconds)
|
||||
stop_event: Event to signal when to stop logging
|
||||
"""
|
||||
if stop_event is None:
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
while not stop_event.is_set():
|
||||
await asyncio.sleep(interval_seconds)
|
||||
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
total_processed = progress_dict["completed"] + progress_dict["failed"]
|
||||
if total_processed > 0:
|
||||
percentage = (total_processed / total) * 100
|
||||
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
|
||||
|
||||
logger.info(
|
||||
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
|
||||
f"✅ {progress_dict['completed']} succeeded, "
|
||||
f"❌ {progress_dict['failed']} failed"
|
||||
)
|
||||
|
||||
|
||||
async def create_daily_room(
|
||||
name: Optional[str] = None,
|
||||
privacy: str = "public",
|
||||
exp_minutes: int = 10,
|
||||
max_retries: int = 5,
|
||||
) -> Optional[Dict]:
|
||||
"""Create a Daily room with automatic retry on rate limit errors.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) with
|
||||
exponential backoff and automatic retries.
|
||||
|
||||
Args:
|
||||
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
|
||||
privacy: Room privacy setting ("public" or "private")
|
||||
exp_minutes: Minutes until room expires (default 10)
|
||||
max_retries: Maximum number of retry attempts on rate limit (default 5)
|
||||
|
||||
Returns:
|
||||
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
|
||||
"""
|
||||
# Calculate expiration timestamp (unix timestamp in seconds)
|
||||
exp_timestamp = int(time.time()) + (exp_minutes * 60)
|
||||
|
||||
# Build request body
|
||||
body = {
|
||||
"privacy": privacy,
|
||||
"properties": {
|
||||
"exp": exp_timestamp,
|
||||
},
|
||||
}
|
||||
|
||||
if name:
|
||||
body["name"] = name
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type(HTTPStatusError),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
async with AsyncClient(timeout=30) as client:
|
||||
response = await client.post(
|
||||
url="https://api.daily.co/v1/rooms",
|
||||
headers=headers,
|
||||
json=body,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error creating room: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def create_room_with_progress(
|
||||
index: int, total: int, progress_dict: Dict[str, int], **kwargs
|
||||
) -> Optional[Dict]:
|
||||
"""Wrapper for create_daily_room that tracks progress.
|
||||
|
||||
Args:
|
||||
index: Index of this room creation (0-based)
|
||||
total: Total number of rooms being created
|
||||
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
|
||||
**kwargs: Arguments passed to create_daily_room
|
||||
|
||||
Returns:
|
||||
Room object dict or None
|
||||
"""
|
||||
result = await create_daily_room(**kwargs)
|
||||
|
||||
# Update progress
|
||||
if result is not None:
|
||||
progress_dict["completed"] += 1
|
||||
else:
|
||||
progress_dict["failed"] += 1
|
||||
|
||||
return result
|
||||
|
||||
|
||||
async def test_create_rooms(
|
||||
num_rooms: int = 1000,
|
||||
progress_interval: float = 5.0,
|
||||
) -> Dict[str, int | float]:
|
||||
"""Attempt to create multiple Daily rooms concurrently.
|
||||
|
||||
This function demonstrates concurrent room creation and tracks
|
||||
success/failure statistics. Rate limiting will likely occur when
|
||||
creating many rooms quickly.
|
||||
|
||||
Args:
|
||||
num_rooms: Number of rooms to attempt to create (default 1000)
|
||||
progress_interval: How often to log progress in seconds (default 5.0)
|
||||
|
||||
Returns:
|
||||
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
|
||||
"""
|
||||
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
|
||||
start_time = time.time()
|
||||
|
||||
# Shared progress tracking dictionary
|
||||
progress_dict = {"completed": 0, "failed": 0}
|
||||
|
||||
# Start background progress logger
|
||||
stop_event = asyncio.Event()
|
||||
progress_task = asyncio.create_task(
|
||||
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
|
||||
)
|
||||
|
||||
# Create and execute all tasks concurrently
|
||||
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
|
||||
tasks = [
|
||||
create_room_with_progress(
|
||||
index=i,
|
||||
total=num_rooms,
|
||||
progress_dict=progress_dict,
|
||||
name=None, # Auto-generate names
|
||||
privacy="public",
|
||||
exp_minutes=10,
|
||||
max_retries=5,
|
||||
)
|
||||
for i in range(num_rooms)
|
||||
]
|
||||
|
||||
try:
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
finally:
|
||||
# Stop the progress logger
|
||||
stop_event.set()
|
||||
await progress_task
|
||||
|
||||
# Count successes and failures
|
||||
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
|
||||
failed_count = num_rooms - success_count
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
# Log statistics
|
||||
logger.info("=" * 60)
|
||||
logger.info("BULK ROOM CREATION SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total rooms attempted: {num_rooms}")
|
||||
logger.info(f"Successfully created: {success_count}")
|
||||
logger.info(f"Failed to create: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
|
||||
logger.info(f"Total time: {elapsed_time:.2f} seconds")
|
||||
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
|
||||
logger.info("=" * 60)
|
||||
|
||||
return {
|
||||
"success": success_count,
|
||||
"failed": failed_count,
|
||||
"total": num_rooms,
|
||||
"elapsed_seconds": elapsed_time,
|
||||
}
|
||||
|
||||
|
||||
# Example usage
|
||||
async def main():
|
||||
"""Example usage of the room creation functions."""
|
||||
# Test creating a single room
|
||||
logger.info("Testing single room creation...")
|
||||
room = await create_daily_room(exp_minutes=10)
|
||||
if room:
|
||||
logger.info(f"Created room: {room['name']} at {room['url']}")
|
||||
else:
|
||||
logger.error("Failed to create room")
|
||||
|
||||
# Bulk creation test (warning: may hit rate limits!)
|
||||
logger.info("\nTesting bulk room creation...")
|
||||
stats = await test_create_rooms(num_rooms=1000)
|
||||
logger.info(f"Final stats: {stats}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
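The removed script's docstring describes retrying on rate-limit (429) errors, while its retry condition matches any `HTTPStatusError`. The sketch below shows, under stated assumptions, what retrying only on a rate-limit signal with capped exponential backoff could look like without third-party dependencies. `RateLimitError`, `backoff_delay`, and `call_with_backoff` are hypothetical names, and the delay schedule is a plain doubling scheme that is not guaranteed to match tenacity's `wait_exponential` exactly.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class RateLimitError(Exception):
    """Hypothetical error standing in for an HTTP 429 response."""


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay after failed attempt `attempt` (1-based): base, 2*base, 4*base, ... capped."""
    return min(cap, base * 2 ** (attempt - 1))


async def call_with_backoff(
    operation: Callable[[], Awaitable[dict]],
    max_attempts: int = 5,
    base: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Optional[dict]:
    """Retry `operation` only on RateLimitError; return None once attempts run out.

    Any other exception propagates immediately instead of being retried.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except RateLimitError:
            if attempt == max_attempts:
                return None
            await sleep(backoff_delay(attempt, base))
    return None


if __name__ == "__main__":
    attempts = {"count": 0}

    async def flaky_create_room() -> dict:
        # Fails twice with a simulated rate limit, then succeeds.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise RateLimitError()
        return {"name": "demo-room"}

    print(asyncio.run(call_with_backoff(flaky_create_room, base=0.01)))
```

Passing `sleep` as a parameter keeps the helper testable, since a test can record the requested delays instead of waiting for them.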
200
env.example
@@ -4,6 +4,9 @@ AICOUSTICS_LICENSE_KEY=...
|
||||
# Anthropic
|
||||
ANTHROPIC_API_KEY=...
|
||||
|
||||
# Assembly AI
|
||||
ASSEMBLYAI_API_KEY=...
|
||||
|
||||
# Async
|
||||
ASYNCAI_API_KEY=...
|
||||
ASYNCAI_VOICE_ID=...
|
||||
@@ -21,12 +24,19 @@ AZURE_CHATGPT_API_KEY=...
|
||||
AZURE_CHATGPT_ENDPOINT=https://...
|
||||
AZURE_CHATGPT_MODEL=...
|
||||
|
||||
AZURE_REALTIME_API_KEY=...
|
||||
AZURE_REALTIME_BASE_URL=...
|
||||
|
||||
AZURE_DALLE_API_KEY=...
|
||||
AZURE_DALLE_ENDPOINT=https://...
|
||||
AZURE_DALLE_MODEL=...
|
||||
|
||||
# Cartesia
|
||||
CARTESIA_API_KEY=...
|
||||
CARTESIA_VOICE_ID=...
|
||||
|
||||
# Cerebras
|
||||
CEREBRAS_API_KEY=...
|
||||
|
||||
# Daily
|
||||
DAILY_API_KEY=...
|
||||
@@ -35,57 +45,48 @@ DAILY_SAMPLE_ROOM_URL=https://...
|
||||
# Deepgram
|
||||
DEEPGRAM_API_KEY=...
|
||||
|
||||
# DeepSeek
|
||||
DEEPSEEK_API_KEY=...
|
||||
|
||||
# ElevenLabs
|
||||
ELEVENLABS_API_KEY=...
|
||||
ELEVENLABS_VOICE_ID=...
|
||||
|
||||
# Neuphonic
|
||||
NEUPHONIC_API_KEY=...
|
||||
|
||||
# Fal
|
||||
FAL_KEY=...
|
||||
|
||||
# Fireworks
|
||||
FIREWORKS_API_KEY=...
|
||||
|
||||
# Fish Audio
|
||||
FISH_API_KEY=...
|
||||
|
||||
# Gladia
|
||||
GLADIA_API_KEY=...
|
||||
GLADIA_REGION=...
|
||||
|
||||
# Google
|
||||
GOOGLE_API_KEY=...
|
||||
GOOGLE_CLOUD_PROJECT_ID=...
|
||||
GOOGLE_TEST_CREDENTIALS=...
|
||||
GOOGLE_VERTEX_TEST_CREDENTIALS=...
|
||||
GOOGLE_CLOUD_PROJECT_ID=...
|
||||
GOOGLE_CLOUD_LOCATION=...
|
||||
GOOGLE_TEST_CREDENTIALS=...
|
||||
|
||||
# Grok
|
||||
GROK_API_KEY=...
|
||||
|
||||
# Groq
|
||||
GROQ_API_KEY=...
|
||||
|
||||
# Heygen
|
||||
HEYGEN_API_KEY=...
|
||||
|
||||
# Hume
|
||||
HUME_API_KEY=...
|
||||
HUME_VOICE_ID=...
|
||||
|
||||
# LMNT
|
||||
LMNT_API_KEY=...
|
||||
LMNT_VOICE_ID=...
|
||||
|
||||
# Perplexity
|
||||
PERPLEXITY_API_KEY=...
|
||||
|
||||
# PlayHT
|
||||
PLAYHT_USER_ID=...
|
||||
PLAYHT_API_KEY=...
|
||||
|
||||
# OpenAI
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
# OpenPipe
|
||||
OPENPIPE_API_KEY=...
|
||||
|
||||
# Tavus
|
||||
TAVUS_API_KEY=...
|
||||
TAVUS_REPLICA_ID=...
|
||||
TAVUS_PERSONA_ID=...
|
||||
|
||||
# Simli
|
||||
SIMLI_API_KEY=...
|
||||
SIMLI_FACE_ID=...
|
||||
# Inworld
|
||||
INWORLD_API_KEY=...
|
||||
|
||||
# Krisp
|
||||
KRISP_MODEL_PATH=...
|
||||
@@ -93,77 +94,100 @@ KRISP_MODEL_PATH=...
|
||||
# Krisp Viva
|
||||
KRISP_VIVA_MODEL_PATH=...
|
||||
|
||||
# DeepSeek
|
||||
DEEPSEEK_API_KEY=...
|
||||
# LiveKit
|
||||
LIVEKIT_API_KEY=...
|
||||
LIVEKIT_API_SECRET=...
|
||||
|
||||
# Groq
|
||||
GROQ_API_KEY=...
|
||||
|
||||
# Grok
|
||||
GROK_API_KEY=...
|
||||
|
||||
# Inworld
|
||||
INWORLD_API_KEY=...
|
||||
|
||||
# Together.ai
|
||||
TOGETHER_API_KEY=...
|
||||
|
||||
# Cerebras
|
||||
CEREBRAS_API_KEY=...
|
||||
|
||||
# Fish Audio
|
||||
FISH_API_KEY=...
|
||||
|
||||
# Assembly AI
|
||||
ASSEMBLYAI_API_KEY=...
|
||||
|
||||
# OpenRouter
|
||||
OPENROUTER_API_KEY=...
|
||||
|
||||
# Piper
|
||||
PIPER_BASE_URL=...
|
||||
|
||||
# Smart turn
|
||||
LOCAL_SMART_TURN_MODEL_PATH=...
|
||||
FAL_SMART_TURN_API_KEY=...
|
||||
|
||||
# Twilio
|
||||
TWILIO_ACCOUNT_SID=...
|
||||
TWILIO_AUTH_TOKEN=...
|
||||
# LMNT
|
||||
LMNT_API_KEY=...
|
||||
LMNT_VOICE_ID=...
|
||||
|
||||
# MiniMax
|
||||
MINIMAX_API_KEY=...
|
||||
MINIMAX_GROUP_ID=...
|
||||
|
||||
# Sarvam AI
|
||||
SARVAM_API_KEY=...
|
||||
|
||||
# Soniox
|
||||
SONIOX_API_KEY=
|
||||
|
||||
# Speechmatics
|
||||
SPEECHMATICS_API_KEY=...
|
||||
|
||||
# SambaNova
|
||||
SAMBANOVA_API_KEY=...
|
||||
|
||||
# Sentry
|
||||
SENTRY_DSN=...
|
||||
|
||||
# Heygen
|
||||
HEYGEN_API_KEY=...
|
||||
|
||||
# Mistral
|
||||
MISTRAL_API_KEY=...
|
||||
|
||||
# Neuphonic
|
||||
NEUPHONIC_API_KEY=...
|
||||
|
||||
# NVIDIA
|
||||
NVIDIA_API_KEY=...
|
||||
|
||||
# OpenAI
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
# OpenPipe
|
||||
OPENPIPE_API_KEY=...
|
||||
|
||||
# OpenRouter
|
||||
OPENROUTER_API_KEY=...
|
||||
|
||||
# Perplexity
|
||||
PERPLEXITY_API_KEY=...
|
||||
|
||||
# Picovoice Koala
|
||||
KOALA_ACCESS_KEY=...
|
||||
|
||||
# Piper
|
||||
PIPER_BASE_URL=...
|
||||
|
||||
# PlayHT
|
||||
PLAYHT_USER_ID=...
|
||||
PLAYHT_API_KEY=...
|
||||
|
||||
# Plivo
|
||||
PLIVO_AUTH_ID=...
|
||||
PLIVO_AUTH_TOKEN=...
|
||||
|
||||
# Qwen
|
||||
QWEN_API_KEY=...
|
||||
|
||||
# Rime
|
||||
RIME_API_KEY=...
|
||||
RIME_VOICE_ID=...
|
||||
|
||||
# SambaNova
|
||||
SAMBANOVA_API_KEY=...
|
||||
|
||||
# Sarvam AI
|
||||
SARVAM_API_KEY=...
|
||||
|
||||
# Sentry
|
||||
SENTRY_DSN=...
|
||||
|
||||
# Simli
|
||||
SIMLI_API_KEY=...
|
||||
SIMLI_FACE_ID=...
|
||||
|
||||
# Smart turn
|
||||
LOCAL_SMART_TURN_MODEL_PATH=...
|
||||
FAL_SMART_TURN_API_KEY=...
|
||||
|
||||
# Soniox
|
||||
SONIOX_API_KEY=...
|
||||
|
||||
# Speechmatics
|
||||
SPEECHMATICS_API_KEY=...
|
||||
|
||||
# Tavus
|
||||
TAVUS_API_KEY=...
|
||||
TAVUS_REPLICA_ID=...
|
||||
|
||||
# Telnyx
|
||||
TELNYX_API_KEY=...
|
||||
TELNYX_ACCOUNT_SID=...
|
||||
|
||||
# Together.ai
|
||||
TOGETHER_API_KEY=...
|
||||
|
||||
# Twilio
|
||||
TWILIO_ACCOUNT_SID=...
|
||||
TWILIO_AUTH_TOKEN=...
|
||||
|
||||
# WhatsApp
|
||||
WHATSAPP_TOKEN=
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=
|
||||
WHATSAPP_PHONE_NUMBER_ID=
|
||||
WHATSAPP_APP_SECRET=
|
||||
WHATSAPP_TOKEN=...
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...
|
||||
WHATSAPP_PHONE_NUMBER_ID=...
|
||||
WHATSAPP_APP_SECRET=...
|
||||
@@ -67,8 +67,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):

    llm = AWSBedrockLLMService(
        aws_region="us-west-2",
        model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
        params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"),
        model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
        params=AWSBedrockLLMService.InputParams(temperature=0.8),
    )

    messages = [
@@ -1,147 +0,0 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from typing import Tuple
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMContextFrame, TextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators import SentenceAggregator
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.services.azure import AzureLLMService, AzureTTSService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.fal import FalImageGenService
|
||||
from pipecat.transports.daily.transport import DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("pipecat")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
duration_minutes=10,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=1024,
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
tts1 = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"),
|
||||
)
|
||||
tts2 = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl",
|
||||
)
|
||||
dalle = FalImageGenService(
|
||||
params=FalImageGenService.InputParams(image_size="1024x1024"),
|
||||
aiohttp_session=session,
|
||||
key=os.getenv("FAL_KEY"),
|
||||
)
|
||||
|
||||
bot1_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a stern librarian. You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever compromise on the fundamental truth that a hot dog is a sandwich. Your responses should only be a few sentences long.",
|
||||
},
|
||||
]
|
||||
bot2_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a silly cat, and you strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences. Don't ever accept that a hot dog is a sandwich.",
|
||||
},
|
||||
]
|
||||
|
||||
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
|
||||
"""This function streams text from the LLM and uses the TTS service to convert
|
||||
that text to speech as it's received.
|
||||
"""
|
||||
source_queue = asyncio.Queue()
|
||||
sink_queue = asyncio.Queue()
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
|
||||
|
||||
await source_queue.put(LLMContextFrame(LLMContext(messages)))
|
||||
await source_queue.put(EndFrame())
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
message = ""
|
||||
all_audio = bytearray()
|
||||
while sink_queue.qsize():
|
||||
frame = sink_queue.get_nowait()
|
||||
if isinstance(frame, TextFrame):
|
||||
message += frame.text
|
||||
elif isinstance(frame, AudioFrame):
|
||||
all_audio.extend(frame.audio)
|
||||
|
||||
return (message, all_audio)
|
||||
|
||||
async def get_bot1_statement():
|
||||
message, audio = await get_text_and_audio(bot1_messages)
|
||||
|
||||
bot1_messages.append({"role": "assistant", "content": message})
|
||||
bot2_messages.append({"role": "user", "content": message})
|
||||
|
||||
return audio
|
||||
|
||||
async def get_bot2_statement():
|
||||
message, audio = await get_text_and_audio(bot2_messages)
|
||||
|
||||
bot2_messages.append({"role": "assistant", "content": message})
|
||||
bot1_messages.append({"role": "user", "content": message})
|
||||
|
||||
return audio
|
||||
|
||||
async def argue():
|
||||
for i in range(100):
|
||||
print(f"In iteration {i}")
|
||||
|
||||
bot1_description = "A woman conservatively dressed as a librarian in a library surrounded by books, cartoon, serious, highly detailed"
|
||||
|
||||
(audio1, image_data1) = await asyncio.gather(
|
||||
get_bot1_statement(), dalle.run_image_gen(bot1_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(image_data1[1], image_data1[2]),
|
||||
AudioFrame(audio1),
|
||||
]
|
||||
)
|
||||
|
||||
bot2_description = "A cat dressed in a hot dog costume, cartoon, bright colors, funny, highly detailed"
|
||||
|
||||
(audio2, image_data2) = await asyncio.gather(
|
||||
get_bot2_statement(), dalle.run_image_gen(bot2_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(image_data2[1], image_data2[2]),
|
||||
AudioFrame(audio2),
|
||||
]
|
||||
)
|
||||
|
||||
await asyncio.gather(transport.run(), argue())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
170
examples/foundational/08-custom-frame-processor.py
Normal file
@@ -0,0 +1,170 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import io
import os
import re

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
    Frame,
    LLMRunFrame,
    MetricsFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

load_dotenv(override=True)


def format_metrics(metrics, indent=0):
    lines = []
    tab = "\t" * indent

    for metric in metrics:
        lines.append(tab + type(metric).__name__)
        for field, value in vars(metric).items():
            if hasattr(value, "__dict__") and not isinstance(
                value, (str, int, float, bool, type(None))
            ):
                lines.append(f"{tab}\t{field}={type(value).__name__}")
                for k, v in vars(value).items():
                    lines.append(f"{tab}\t\t{k}={repr(v)}")
            else:
                lines.append(f"{tab}\t{field}={repr(value)}")

    return "\n".join(lines)


class MetricsFrameLogger(FrameProcessor):
    """MetricsFrameLogger formats and logs all MetricsFrames"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            logger.info(f"{frame.name}\n {format_metrics(frame.data)}")
            await self.push_frame(frame, direction)

        # ALWAYS push all frames
        else:
            # SUPER IMPORTANT: always push every frame!
            await self.push_frame(frame, direction)


# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info(f"Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    metrics_frame_processor = MetricsFrameLogger()

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
            metrics_frame_processor,  # pretty print metrics frames
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected: {client}")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
|
||||
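Note on the `transport_params` comment in the file above: the dictionary stores zero-argument functions so that nothing is built until a transport is actually chosen. A minimal stdlib-only sketch of that idea follows. `FakeParams`, `factories`, `select_params`, and `created` are hypothetical stand-ins for illustration, not Pipecat names.

```python
from typing import Callable, Dict, List

created: List[str] = []  # records which (hypothetical) params objects were built


class FakeParams:
    """Stand-in for a params object that is costly to construct."""

    def __init__(self, name: str) -> None:
        self.name = name
        created.append(name)


# Map each transport name to a factory, not to an instance.
factories: Dict[str, Callable[[], FakeParams]] = {
    "daily": lambda: FakeParams("daily"),
    "webrtc": lambda: FakeParams("webrtc"),
}


def select_params(transport: str) -> FakeParams:
    """Build params only for the transport that was selected."""
    return factories[transport]()


params = select_params("webrtc")
```

Only the selected entry is called, so the unselected transport's analyzers would never be instantiated under this pattern.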
@@ -79,8 +79,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
 
     llm = AWSBedrockLLMService(
         aws_region="us-west-2",
-        model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
-        params=AWSBedrockLLMService.InputParams(temperature=0.8, latency="optimized"),
+        model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
+        params=AWSBedrockLLMService.InputParams(temperature=0.8),
     )
 
     # You can also register a function_name of None to get all functions
@@ -72,7 +72,6 @@ async def save_conversation(params: FunctionCallParams):
     )
     try:
         with open(filename, "w") as file:
-            # todo: extract 'system' into the first message in the list
             messages = params.context.get_messages()
             # remove the last message, which is the instruction we just gave to save the conversation
             messages.pop()
@@ -90,7 +90,6 @@ async def save_conversation(params: FunctionCallParams):
     )
     try:
         with open(filename, "w") as file:
-            # todo: extract 'system' into the first message in the list
             messages = params.context.get_messages()
             # remove the last message (the instruction to save the context)
             messages.pop()
@@ -20,6 +20,8 @@ from pipecat.frames.frames import LLMRunFrame
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
 from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
@@ -75,7 +77,7 @@ async def save_conversation(params: FunctionCallParams):
     filename = f"{BASE_FILENAME}{timestamp}.json"
     try:
         with open(filename, "w") as file:
-            messages = params.context.get_messages_for_persistent_storage()
+            messages = params.context.get_messages()
             # remove the last few messages. in reverse order, they are:
             # - the in progress save tool call
             # - the invocation of the save tool call
@@ -223,13 +225,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
     llm.register_function("load_conversation", load_conversation)
 
-    context = OpenAILLMContext(
+    context = LLMContext(
         messages=[
             {"role": "system", "content": f"{system_instruction}"},
         ],
         tools=tools,
     )
-    context_aggregator = llm.create_context_aggregator(context)
+    context_aggregator = LLMContextAggregatorPair(context)
 
     pipeline = Pipeline(
         [
@@ -18,7 +18,8 @@ from pipecat.frames.frames import LLMRunFrame
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
-from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
@@ -119,9 +120,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     llm.register_function("get_current_weather", fetch_weather_from_api)
 
     # Set up context and context management.
-    # AWSNovaSonicService will adapt OpenAI LLM context objects with standard message format to
-    # what's expected by Nova Sonic.
-    context = OpenAILLMContext(
+    context = LLMContext(
         messages=[
             {"role": "system", "content": f"{system_instruction}"},
             {
@@ -131,7 +130,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
         ],
         tools=tools,
     )
-    context_aggregator = llm.create_context_aggregator(context)
+    context_aggregator = LLMContextAggregatorPair(context)
 
     # Build the pipeline
     pipeline = Pipeline(
153
examples/foundational/48-service-switcher.py
Normal file
@@ -0,0 +1,153 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame, ManuallySwitchServiceFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategyManual
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.stt import CartesiaSTTService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.deepgram.tts import DeepgramTTSService
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt_cartesia = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
|
||||
stt_deepgram = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt_switcher = ServiceSwitcher(
|
||||
services=[stt_cartesia, stt_deepgram], strategy_type=ServiceSwitcherStrategyManual
|
||||
)
|
||||
|
||||
tts_cartesia = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",
|
||||
)
|
||||
tts_deepgram = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
tts_switcher = ServiceSwitcher(
|
||||
services=[tts_cartesia, tts_deepgram], strategy_type=ServiceSwitcherStrategyManual
|
||||
)
|
||||
|
||||
llm_openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
llm_google = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
llm_switcher = ServiceSwitcher(
|
||||
services=[llm_openai, llm_google], strategy_type=ServiceSwitcherStrategyManual
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt_switcher,
|
||||
context_aggregator.user(), # User responses
|
||||
llm_switcher, # LLM
|
||||
tts_switcher, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
await asyncio.sleep(15)
|
||||
print(f"Switching to {stt_deepgram}")
|
||||
await task.queue_frames([ManuallySwitchServiceFrame(service=stt_deepgram)])
|
||||
await asyncio.sleep(15)
|
||||
print(f"Switching to {llm_google}")
|
||||
await task.queue_frames([ManuallySwitchServiceFrame(service=llm_google)])
|
||||
await asyncio.sleep(15)
|
||||
print(f"Switching to {tts_deepgram}")
|
||||
await task.queue_frames([ManuallySwitchServiceFrame(service=tts_deepgram)])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
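Note on the example above: it wraps pairs of services in a switcher and changes the active one by queueing a frame that names the target service. The routing idea can be sketched without Pipecat as below. This is an illustration under assumptions, not the library's implementation: `ManualSwitcher`, `SwitchTo`, and `handle` are hypothetical names, and ignoring a switch request for a service the switcher does not own is an assumed behaviour.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Union

Service = Callable[[str], str]


@dataclass
class SwitchTo:
    """Hypothetical control message asking a switcher to change service."""

    service: Service


class ManualSwitcher:
    """Routes data to one active service; changes it only when told to."""

    def __init__(self, services: List[Service]) -> None:
        if not services:
            raise ValueError("at least one service is required")
        self._services = services
        self._active = services[0]  # the first service starts out active

    def handle(self, item: Union[str, SwitchTo]) -> Optional[str]:
        if isinstance(item, SwitchTo):
            # Assumption: requests for services this switcher does not own
            # are ignored, so one message can pass several switchers.
            if item.service in self._services:
                self._active = item.service
            return None
        return self._active(item)


def upper(text: str) -> str:
    return text.upper()


def reverse(text: str) -> str:
    return text[::-1]


switcher = ManualSwitcher([upper, reverse])
first = switcher.handle("abc")      # handled by `upper`
switcher.handle(SwitchTo(reverse))  # manual switch
second = switcher.handle("abc")     # handled by `reverse`
```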
@@ -73,13 +73,13 @@ Transform your local bot into a production-ready service. Pipecat Cloud handles
 
 1. [Sign up for Pipecat Cloud](https://pipecat.daily.co/sign-up).
 
-2. Install the Pipecat Cloud CLI:
+2. Install the Pipecat CLI:
 
 ```bash
-uv add pipecatcloud
+uv tool install pipecat-ai-cli
 ```
 
-> 💡 Tip: You can run the `pipecatcloud` CLI using the `pcc` alias.
+> 💡 Tip: You can run the `pipecat` CLI using the `pc` alias.
 
 3. Set up Docker for building your bot image:
 
@@ -113,12 +113,22 @@ secret_set = "quickstart-secrets"
 
 > 💡 Tip: [Set up `image_credentials`](https://docs.pipecat.ai/deployment/pipecat-cloud/fundamentals/secrets#image-pull-secrets) in your TOML file for authenticated image pulls
 
+### Log in to Pipecat Cloud
+
+To start using the CLI, authenticate to Pipecat Cloud:
+
+```bash
+pipecat cloud auth login
+```
+
+You'll be presented with a link that you can click to authenticate your client.
+
 ### Configure secrets
 
 Upload your API keys to Pipecat Cloud's secure storage:
 
 ```bash
-uv run pcc secrets set quickstart-secrets --file .env
+pipecat cloud secrets set quickstart-secrets --file .env
 ```
 
 This creates a secret set called `quickstart-secrets` (matching your TOML file) and uploads all your API keys from `.env`.
@@ -128,13 +138,13 @@ This creates a secret set called `quickstart-secrets` (matching your TOML file)
 Build your Docker image and push to Docker Hub:
 
 ```bash
-uv run pcc docker build-push
+pipecat cloud docker build-push
 ```
 
 Deploy to Pipecat Cloud:
 
 ```bash
-uv run pcc deploy
+pipecat cloud deploy
 ```
 
 ### Connect to your agent
@@ -1,6 +1,11 @@
 agent_name = "quickstart"
 image = "your_username/quickstart:0.1"
 secret_set = "quickstart-secrets"
+agent_profile = "agent-1x"
 
+# RECOMMENDED: Set an image pull secret:
+# https://docs.pipecat.ai/deployment/pipecat-cloud/fundamentals/secrets#image-pull-secrets
+# image_credentials = "your_image_pull_secret"
+
 [scaling]
 min_agents = 1
@@ -4,13 +4,14 @@ version = "0.1.0"
 description = "Quickstart example for building voice AI bots with Pipecat"
 requires-python = ">=3.10"
 dependencies = [
-    "pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.86",
-    "pipecatcloud>=0.2.4"
+    "pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]",
+    "pipecat-ai-cli"
 ]
 
 [dependency-groups]
 dev = [
-    "ruff~=0.12.1",
+    "pyright>=1.1.404,<2",
+    "ruff>=0.12.11,<1",
 ]
 
 [tool.ruff]
@@ -1,267 +0,0 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
|
||||
room_id: str,
|
||||
max_retries: int = 5,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Retrieve recording URL with exponential backoff and retry logic.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) and other
|
||||
transient errors with automatic exponential backoff.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
max_retries: Maximum number of retry attempts (default 5)
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url)
|
||||
Returns (None, None) if no recording exists for the room.
|
||||
"""
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type((HTTPStatusError, Exception)),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
recording_url, recording_signed_url, status = await get_recording_s3_url(
|
||||
room_id=room_id
|
||||
)
|
||||
|
||||
# If no recording exists (status is None), return immediately - no retry
|
||||
if status is None:
|
||||
logger.debug(f"No recording found for room {room_id}")
|
||||
return None, None
|
||||
|
||||
# If recording exists but is not finished yet, retry
|
||||
if status != "finished":
|
||||
logger.warning(
|
||||
f"Recording not finished for room {room_id}, status: {status} "
|
||||
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
|
||||
)
|
||||
raise Exception(f"Recording not ready, status: {status}")
|
||||
|
||||
# Recording is finished, return the URLs
|
||||
return recording_url, recording_signed_url
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None, None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.error(
|
||||
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
|
||||
f"{last_exception}"
|
||||
)
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
|
||||
room_id: str,
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Get recording URL using Daily's REST API.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url, status)
|
||||
- recording_url: The download link for the recording
|
||||
- recording_signed_url: Same as recording_url (kept for backward compatibility)
|
||||
- status: Recording status from Daily API
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: When HTTP errors occur (including rate limits)
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# List recordings for the room
|
||||
list_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
|
||||
headers=headers,
|
||||
)
|
||||
list_response.raise_for_status()
|
||||
list_data = list_response.json()
|
||||
|
||||
# Check if recording exists and is finished
|
||||
if not list_data.get("data") or len(list_data["data"]) == 0:
|
||||
return (None, None, None)
|
||||
|
||||
recording_id = list_data["data"][0].get("id")
|
||||
status = list_data["data"][0].get("status")
|
||||
|
||||
if not recording_id or status != "finished":
|
||||
return (None, None, status)
|
||||
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None, status)
|
||||
|
||||
# Return the same URL for both fields for backward compatibility
|
||||
return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_recordings(limit: int = 100) -> list[str]:
|
||||
"""Get list of recent recording IDs from Daily API.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of recordings to retrieve (default 100)
|
||||
|
||||
Returns:
|
||||
List of recording IDs (strings)
|
||||
"""
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url="https://api.daily.co/v1/recordings",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
recordings = data.get("data", [])
|
||||
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
|
||||
|
||||
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
|
||||
return recording_ids
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get recordings from Daily API: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test get_recording_s3_url_with_retry with recent recordings."""
|
||||
logger.info("Starting recording fetch test...")
|
||||
|
||||
# Step 1: Get the most recent 100 recordings
|
||||
logger.info("Fetching recent recordings...")
|
||||
recording_ids = await get_recent_recordings(limit=100)
|
||||
|
||||
if not recording_ids:
|
||||
logger.error("No recordings found. Cannot proceed with test.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(recording_ids)} recordings to fetch")
|
||||
|
||||
# Fetch access links for each recording concurrently
|
||||
logger.info(
|
||||
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
|
||||
)
|
||||
|
||||
# Create tasks for all recordings
|
||||
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Get download link for a specific recording ID."""
|
||||
try:
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None)
|
||||
|
||||
return (recording_url, recording_url)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
|
||||
return (None, None)
|
||||
|
||||
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
|
||||
|
||||
# Execute all tasks concurrently
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
success_count = 0
|
||||
not_found_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
recording_url, recording_signed_url = result
|
||||
if recording_url:
|
||||
success_count += 1
|
||||
logger.info(
|
||||
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
|
||||
)
|
||||
logger.debug(f" URL: {recording_url}")
|
||||
else:
|
||||
not_found_count += 1
|
||||
logger.debug(f"ℹ️ [{i}/{len(recording_ids)}] No link for {recording_id}")
|
||||
else:
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("RECORDING FETCH TEST SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total recordings checked: {len(recording_ids)}")
|
||||
logger.info(f"✅ Recordings found: {success_count}")
|
||||
logger.info(f"ℹ️ No recordings: {not_found_count}")
|
||||
logger.info(f"❌ Failed: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
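Note on the deleted module above: its docstrings describe the retry loop as exponential backoff built on tenacity. As a rough stdlib-only sketch of the same idea, assuming a plain doubling schedule clamped to a cap (not necessarily tenacity's exact timing), the loop might look like this. `retry_with_backoff`, `backoff_delays`, and the injectable `sleep` parameter are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Delays slept between attempts: base * 2**n, clamped to cap."""
    return [min(cap, base * 2**n) for n in range(attempts - 1)]


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base: float = 1.0,
    cap: float = 60.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call operation until it succeeds or attempts run out."""
    delays = backoff_delays(attempts, base, cap)
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await sleep(delays[attempt])
    raise RuntimeError("attempts must be at least 1")
```

Passing `sleep` as a parameter lets a test record the delays instead of waiting for them.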
@@ -1,238 +0,0 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
|
||||
room_id: str,
|
||||
max_retries: int = 5,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Retrieve recording URL with exponential backoff and retry logic.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) and other
|
||||
transient errors with automatic exponential backoff.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
max_retries: Maximum number of retry attempts (default 5)
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url)
|
||||
Returns (None, None) if no recording exists for the room.
|
||||
"""
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type((HTTPStatusError, Exception)),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
recording_url, recording_signed_url, status = await get_recording_s3_url(
|
||||
room_id=room_id
|
||||
)
|
||||
|
||||
# If no recording exists (status is None), return immediately - no retry
|
||||
if status is None:
|
||||
logger.debug(f"No recording found for room {room_id}")
|
||||
return None, None
|
||||
|
||||
# If recording exists but is not finished yet, retry
|
||||
if status != "finished":
|
||||
logger.warning(
|
||||
f"Recording not finished for room {room_id}, status: {status} "
|
||||
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
|
||||
)
|
||||
raise Exception(f"Recording not ready, status: {status}")
|
||||
|
||||
# Recording is finished, return the URLs
|
||||
return recording_url, recording_signed_url
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None, None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.error(
|
||||
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
|
||||
f"{last_exception}"
|
||||
)
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
|
||||
room_id: str,
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Get recording URL using Daily's REST API.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url, status)
|
||||
- recording_url: The download link for the recording
|
||||
- recording_signed_url: Same as recording_url (kept for backward compatibility)
|
||||
- status: Recording status from Daily API
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: When HTTP errors occur (including rate limits)
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# List recordings for the room
|
||||
list_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
|
||||
headers=headers,
|
||||
)
|
||||
list_response.raise_for_status()
|
||||
list_data = list_response.json()
|
||||
|
||||
# Check if recording exists and is finished
|
||||
if not list_data.get("data") or len(list_data["data"]) == 0:
|
||||
return (None, None, None)
|
||||
|
||||
recording_id = list_data["data"][0].get("id")
|
||||
status = list_data["data"][0].get("status")
|
||||
|
||||
if not recording_id or status != "finished":
|
||||
return (None, None, status)
|
||||
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None, status)
|
||||
|
||||
# Return the same URL for both fields for backward compatibility
|
||||
return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_rooms(limit: int = 100) -> list[str]:
|
||||
"""Get list of recent room names from Daily API.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of rooms to retrieve (default 100, max 100)
|
||||
|
||||
Returns:
|
||||
List of room names (strings)
|
||||
"""
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
rooms = data.get("data", [])
|
||||
room_names = [room.get("name") for room in rooms if room.get("name")]
|
||||
|
||||
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
|
||||
return room_names
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get rooms from Daily API: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test get_recording_s3_url_with_retry with recent rooms."""
|
||||
logger.info("Starting recording fetch test...")
|
||||
|
||||
# Step 1: Get the most recent 100 rooms
|
||||
logger.info("Fetching recent rooms...")
|
||||
room_names = await get_recent_rooms(limit=100)
|
||||
|
||||
if not room_names:
|
||||
logger.error("No rooms found. Cannot proceed with test.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(room_names)} rooms to check for recordings")
|
||||
|
||||
# Call get_recording_s3_url_with_retry on each room concurrently
|
||||
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
|
||||
|
||||
# Create tasks for all rooms
|
||||
tasks = [
|
||||
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
|
||||
for room_name in room_names
|
||||
]
|
||||
|
||||
# Execute all tasks concurrently
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
success_count = 0
|
||||
not_found_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
recording_url, recording_signed_url = result
|
||||
if recording_url:
|
||||
success_count += 1
|
||||
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
|
||||
logger.debug(f" URL: {recording_url[:80]}...")
|
||||
else:
|
||||
not_found_count += 1
|
||||
logger.debug(f"ℹ️ [{i}/{len(room_names)}] No recording for {room_name}")
|
||||
else:
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("RECORDING FETCH TEST SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total rooms checked: {len(room_names)}")
|
||||
logger.info(f"✅ Recordings found: {success_count}")
|
||||
logger.info(f"ℹ️ No recordings: {not_found_count}")
|
||||
logger.info(f"❌ Failed: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
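The test script above fans out one lookup per room with `asyncio.gather(..., return_exceptions=True)` and then sorts each result into found, not found, or failed. A minimal, self-contained sketch of that pattern follows. `fetch_recording`, `classify`, and `tally` are hypothetical stand-ins introduced for illustration only; they are not the real `get_recording_s3_url_with_retry`, and the two-element tuple shape is assumed from the result handling in `main()`.

```python
import asyncio
from collections import Counter
from typing import Any, Iterable, Optional, Tuple


async def fetch_recording(room_name: str) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical stand-in for the real lookup (assumption, not from the diff)."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    if room_name.startswith("bad"):
        raise RuntimeError(f"lookup failed for {room_name}")
    # Rooms whose name starts with "rec" pretend to have a recording.
    url = f"https://example.invalid/{room_name}.mp4" if room_name.startswith("rec") else None
    return (url, url)


def classify(result: Any) -> str:
    """Map one gather() result to a bucket name."""
    if isinstance(result, Exception):
        # With return_exceptions=True, raised exceptions come back as values.
        return "failed"
    if isinstance(result, tuple) and len(result) == 2:
        return "found" if result[0] else "not_found"
    return "failed"  # unexpected result shape


async def tally(room_names: Iterable[str]) -> Counter:
    """Run all lookups concurrently and count outcomes per bucket."""
    results = await asyncio.gather(
        *(fetch_recording(name) for name in room_names),
        return_exceptions=True,
    )
    return Counter(classify(r) for r in results)


if __name__ == "__main__":
    print(asyncio.run(tally(["rec-1", "empty-1", "bad-1", "rec-2"])))
```

Because `return_exceptions=True` turns failures into ordinary list entries, a single failing room does not cancel the summary for the others.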
@@ -50,12 +50,12 @@ anthropic = [ "anthropic~=0.49.0" ]
|
||||
assemblyai = [ "pipecat-ai[websockets-base]" ]
|
||||
asyncai = [ "pipecat-ai[websockets-base]" ]
|
||||
aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
|
||||
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.0; python_version>='3.12'" ]
|
||||
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.1; python_version>='3.12'" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
|
||||
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
|
||||
cerebras = []
|
||||
deepseek = []
|
||||
daily = [ "daily-python~=0.19.9" ]
|
||||
daily = [ "daily-python~=0.20.0" ]
|
||||
deepgram = [ "deepgram-sdk~=4.7.0" ]
|
||||
elevenlabs = [ "pipecat-ai[websockets-base]" ]
|
||||
fal = [ "fal-client~=0.5.9" ]
|
||||
|
||||
@@ -110,7 +110,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
|
||||
system = NOT_GIVEN
|
||||
messages = []
|
||||
|
||||
# first, map messages using self._from_universal_context_message(m)
|
||||
# First, map messages using self._from_universal_context_message(m)
|
||||
try:
|
||||
messages = [self._from_universal_context_message(m) for m in universal_context_messages]
|
||||
except Exception as e:
|
||||
|
||||
@@ -6,13 +6,47 @@
|
||||
|
||||
"""AWS Nova Sonic LLM adapter for Pipecat."""
|
||||
|
||||
import copy
|
||||
import json
|
||||
from typing import Any, Dict, List, TypedDict
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
|
||||
|
||||
|
||||
class Role(Enum):
|
||||
"""Roles supported in AWS Nova Sonic conversations.
|
||||
|
||||
Parameters:
|
||||
SYSTEM: System-level messages (not used in conversation history).
|
||||
USER: Messages sent by the user.
|
||||
ASSISTANT: Messages sent by the assistant.
|
||||
TOOL: Messages sent by tools (not used in conversation history).
|
||||
"""
|
||||
|
||||
SYSTEM = "SYSTEM"
|
||||
USER = "USER"
|
||||
ASSISTANT = "ASSISTANT"
|
||||
TOOL = "TOOL"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AWSNovaSonicConversationHistoryMessage:
|
||||
"""A single message in AWS Nova Sonic conversation history.
|
||||
|
||||
Parameters:
|
||||
role: The role of the message sender (USER or ASSISTANT only).
|
||||
text: The text content of the message.
|
||||
"""
|
||||
|
||||
role: Role # only USER and ASSISTANT
|
||||
text: str
|
||||
|
||||
|
||||
class AWSNovaSonicLLMInvocationParams(TypedDict):
|
||||
@@ -21,7 +55,9 @@ class AWSNovaSonicLLMInvocationParams(TypedDict):
|
||||
This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic.
|
||||
"""
|
||||
|
||||
pass
|
||||
system_instruction: Optional[str]
|
||||
messages: List[AWSNovaSonicConversationHistoryMessage]
|
||||
tools: List[Dict[str, Any]]
|
||||
|
||||
|
||||
class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
@@ -34,7 +70,7 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
@property
|
||||
def id_for_llm_specific_messages(self) -> str:
|
||||
"""Get the identifier used in LLMSpecificMessage instances for AWS Nova Sonic."""
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
|
||||
return "aws-nova-sonic"
|
||||
|
||||
def get_llm_invocation_params(self, context: LLMContext) -> AWSNovaSonicLLMInvocationParams:
|
||||
"""Get AWS Nova Sonic-specific LLM invocation parameters from a universal LLM context.
|
||||
@@ -47,7 +83,13 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
Returns:
|
||||
Dictionary of parameters for invoking AWS Nova Sonic's LLM API.
|
||||
"""
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
|
||||
messages = self._from_universal_context_messages(self.get_messages(context))
|
||||
return {
|
||||
"system_instruction": messages.system_instruction,
|
||||
"messages": messages.messages,
|
||||
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
|
||||
"tools": self.from_standard_tools(context.tools) or [],
|
||||
}
|
||||
|
||||
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
|
||||
"""Get messages from a universal LLM context in a format ready for logging about AWS Nova Sonic.
|
||||
@@ -62,7 +104,75 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
|
||||
Returns:
|
||||
List of messages in a format ready for logging about AWS Nova Sonic.
|
||||
"""
|
||||
raise NotImplementedError("Universal LLMContext is not yet supported for AWS Nova Sonic.")
|
||||
return self._from_universal_context_messages(self.get_messages(context)).messages
|
||||
|
||||
@dataclass
|
||||
class ConvertedMessages:
"""Container for AWS Nova Sonic-formatted messages converted from universal context."""
|
||||
|
||||
messages: List[AWSNovaSonicConversationHistoryMessage]
|
||||
system_instruction: Optional[str] = None
|
||||
|
||||
def _from_universal_context_messages(
|
||||
self, universal_context_messages: List[LLMContextMessage]
|
||||
) -> ConvertedMessages:
|
||||
system_instruction = None
|
||||
messages = []
|
||||
|
||||
# Bail if there are no messages
|
||||
if not universal_context_messages:
|
||||
return self.ConvertedMessages(messages=[])
|
||||
|
||||
universal_context_messages = copy.deepcopy(universal_context_messages)
|
||||
|
||||
# If we have a "system" message as our first message, let's pull that out into "instruction"
|
||||
if universal_context_messages[0].get("role") == "system":
|
||||
system = universal_context_messages.pop(0)
|
||||
content = system.get("content")
|
||||
if isinstance(content, str):
|
||||
system_instruction = content
|
||||
elif isinstance(content, list):
|
||||
system_instruction = content[0].get("text")
|
||||
if system_instruction:
|
||||
self._system_instruction = system_instruction
|
||||
|
||||
# Process remaining messages to fill out conversation history.
|
||||
# Nova Sonic supports "user" and "assistant" messages in history.
|
||||
for universal_context_message in universal_context_messages:
|
||||
message = self._from_universal_context_message(universal_context_message)
|
||||
if message:
|
||||
messages.append(message)
|
||||
|
||||
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
|
||||
|
||||
def _from_universal_context_message(self, message) -> Optional[AWSNovaSonicConversationHistoryMessage]:
|
||||
"""Convert standard message format to Nova Sonic format.
|
||||
|
||||
Args:
|
||||
message: Standard message dictionary to convert.
|
||||
|
||||
Returns:
|
||||
Nova Sonic conversation history message, or None if not convertible.
|
||||
"""
|
||||
role = message.get("role")
|
||||
if role in ("user", "assistant"):
|
||||
content = message.get("content")
|
||||
if isinstance(message.get("content"), list):
|
||||
content = ""
|
||||
for c in message.get("content"):
|
||||
if c.get("type") == "text":
|
||||
content += " " + c.get("text")
|
||||
else:
|
||||
logger.error(
|
||||
f"Unhandled content type in context message: {c.get('type')} - {message}"
|
||||
)
|
||||
# There won't be content if this is an assistant tool call entry.
|
||||
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
|
||||
# history
|
||||
if content:
|
||||
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
|
||||
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
|
||||
# Sonic conversation history
|
||||
|
||||
@staticmethod
|
||||
def _to_aws_nova_sonic_function_format(function: FunctionSchema) -> Dict[str, Any]:
|
||||
|
||||
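The Nova Sonic adapter changes above pull a leading system message out as the system instruction and keep only user and assistant text in the conversation history, dropping tool messages and assistant tool-call entries that carry no content. A simplified sketch of that flow on plain dictionaries follows. `HistoryMessage`, `Converted`, `flatten_text`, and `convert` are hypothetical names, and, unlike the diff, this sketch joins all text parts of a list-valued system message rather than reading only the first one.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class HistoryMessage:
    role: str  # "USER" or "ASSISTANT"
    text: str


@dataclass
class Converted:
    # A default factory lets Converted() be built with no arguments.
    messages: List[HistoryMessage] = field(default_factory=list)
    system_instruction: Optional[str] = None


def flatten_text(content: Any) -> Optional[str]:
    """Return plain text for str content or a list of typed parts."""
    if isinstance(content, str):
        return content or None
    if isinstance(content, list):
        texts = [part.get("text", "") for part in content if part.get("type") == "text"]
        return " ".join(t for t in texts if t) or None
    return None


def convert(messages: List[Dict[str, Any]]) -> Converted:
    if not messages:
        return Converted()
    remaining = list(messages)  # shallow copy so the caller's list is untouched
    system_instruction = None
    if remaining[0].get("role") == "system":
        system_instruction = flatten_text(remaining.pop(0).get("content"))
    history: List[HistoryMessage] = []
    for message in remaining:
        if message.get("role") not in ("user", "assistant"):
            continue  # e.g. "tool" messages are not loadable into history
        text = flatten_text(message.get("content"))
        if text:  # assistant tool-call entries have no content and are skipped
            history.append(HistoryMessage(role=message["role"].upper(), text=text))
    return Converted(messages=history, system_instruction=system_instruction)
```

Giving `messages` a default factory is one way to make the empty-input early return safe; the adapter's own dataclass declares `messages` without a default, so it has to be passed explicitly there.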
@@ -107,7 +107,7 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
|
||||
system = None
|
||||
messages = []
|
||||
|
||||
# first, map messages using self._from_universal_context_message(m)
|
||||
# First, map messages using self._from_universal_context_message(m)
|
||||
try:
|
||||
messages = [self._from_universal_context_message(m) for m in universal_context_messages]
|
||||
except Exception as e:
|
||||
|
||||
@@ -8,8 +8,8 @@
|
||||
|
||||
import base64
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple, TypedDict
|
||||
|
||||
from loguru import logger
|
||||
from openai import NotGiven
|
||||
@@ -133,6 +133,28 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
messages: List[Content]
|
||||
system_instruction: Optional[str] = None
|
||||
|
||||
@dataclass
|
||||
class MessageConversionResult:
|
||||
"""Result of converting a single universal context message to Google format.
|
||||
|
||||
Either content (a Google Content object) or a system instruction string
|
||||
is guaranteed to be set.
|
||||
|
||||
Also returns a tool call ID to name mapping for any tool calls
|
||||
discovered in the message.
|
||||
"""
|
||||
|
||||
content: Optional[Content] = None
|
||||
system_instruction: Optional[str] = None
|
||||
tool_call_id_to_name_mapping: Dict[str, str] = field(default_factory=dict)
|
||||
|
||||
@dataclass
|
||||
class MessageConversionParams:
|
||||
"""Parameters for converting a single universal context message to Google format."""
|
||||
|
||||
already_have_system_instruction: bool
|
||||
tool_call_id_to_name_mapping: Dict[str, str]
|
||||
|
||||
def _from_universal_context_messages(
|
||||
self, universal_context_messages: List[LLMContextMessage]
|
||||
) -> ConvertedMessages:
|
||||
@@ -156,24 +178,26 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
"""
|
||||
system_instruction = None
|
||||
messages = []
|
||||
tool_call_id_to_name_mapping = {}
|
||||
|
||||
# Process each message, preserving Google-formatted messages and converting others
|
||||
for message in universal_context_messages:
|
||||
if isinstance(message, LLMSpecificMessage):
|
||||
# Assume that LLMSpecificMessage wraps a message in Google format
|
||||
messages.append(message.message)
|
||||
continue
|
||||
|
||||
# Convert standard format to Google format
|
||||
converted = self._from_standard_message(
|
||||
message, already_have_system_instruction=bool(system_instruction)
|
||||
result = self._from_universal_context_message(
|
||||
message,
|
||||
params=self.MessageConversionParams(
|
||||
already_have_system_instruction=bool(system_instruction),
|
||||
tool_call_id_to_name_mapping=tool_call_id_to_name_mapping,
|
||||
),
|
||||
)
|
||||
if isinstance(converted, Content):
|
||||
# Regular (non-system) message
|
||||
messages.append(converted)
|
||||
else:
|
||||
# System instruction
|
||||
system_instruction = converted
|
||||
# Each result is either a Content or a system instruction
|
||||
if result.content:
|
||||
messages.append(result.content)
|
||||
elif result.system_instruction:
|
||||
system_instruction = result.system_instruction
|
||||
|
||||
# Merge tool call ID to name mapping
|
||||
if result.tool_call_id_to_name_mapping:
|
||||
tool_call_id_to_name_mapping.update(result.tool_call_id_to_name_mapping)
|
||||
|
||||
# Check if we only have function-related messages (no regular text)
|
||||
has_regular_messages = any(
|
||||
@@ -193,9 +217,16 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
|
||||
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
|
||||
|
||||
def _from_universal_context_message(
|
||||
self, message: LLMContextMessage, *, params: MessageConversionParams
|
||||
) -> MessageConversionResult:
|
||||
if isinstance(message, LLMSpecificMessage):
|
||||
return self.MessageConversionResult(content=message.message)
|
||||
return self._from_standard_message(message, params=params)
|
||||
|
||||
def _from_standard_message(
|
||||
self, message: LLMStandardMessage, already_have_system_instruction: bool
|
||||
) -> Content | str:
|
||||
self, message: LLMStandardMessage, *, params: MessageConversionParams
|
||||
) -> MessageConversionResult:
|
||||
"""Convert standard universal context message to Google Content object.
|
||||
|
||||
Handles conversion of text, images, and function calls to Google's
|
||||
@@ -205,10 +236,11 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
Args:
|
||||
message: Message in standard universal context format.
|
||||
already_have_system_instruction: Whether we already have a system instruction
|
||||
params: Parameters for conversion.
|
||||
|
||||
Returns:
|
||||
Content object with role and parts, or a plain string for system
|
||||
messages.
|
||||
MessageConversionResult containing either a Content object or a
|
||||
system instruction string.
|
||||
|
||||
Examples:
|
||||
Standard text message::
|
||||
@@ -242,38 +274,48 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
Converts to Google Content with::
|
||||
|
||||
Content(
|
||||
role="model",
|
||||
role="user",
|
||||
parts=[Part(function_call=FunctionCall(name="search", args={"query": "test"}))]
|
||||
)
|
||||
"""
|
||||
role = message["role"]
|
||||
content = message.get("content", [])
|
||||
|
||||
if role == "system":
|
||||
if already_have_system_instruction:
|
||||
if params.already_have_system_instruction:
|
||||
role = "user" # Convert system message to user role if we already have a system instruction
|
||||
else:
|
||||
# System instructions are returned as plain text
|
||||
system_instruction: Optional[str] = None
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
system_instruction = content
|
||||
elif isinstance(content, list):
|
||||
# If content is a list, we assume it's a list of text parts, per the standard
|
||||
return " ".join(part["text"] for part in content if part.get("type") == "text")
|
||||
system_instruction = " ".join(
|
||||
part["text"] for part in content if part.get("type") == "text"
|
||||
)
|
||||
if system_instruction:
|
||||
return self.MessageConversionResult(system_instruction=system_instruction)
|
||||
elif role == "assistant":
|
||||
role = "model"
|
||||
|
||||
parts = []
|
||||
tool_call_id_to_name_mapping = {}
|
||||
|
||||
if message.get("tool_calls"):
|
||||
for tc in message["tool_calls"]:
|
||||
id = tc["id"]
|
||||
name = tc["function"]["name"]
|
||||
tool_call_id_to_name_mapping[id] = name
|
||||
parts.append(
|
||||
Part(
|
||||
function_call=FunctionCall(
|
||||
name=tc["function"]["name"],
|
||||
name=name,
|
||||
args=json.loads(tc["function"]["arguments"]),
|
||||
)
|
||||
)
|
||||
)
|
||||
elif role == "tool":
|
||||
role = "model"
|
||||
role = "user"
|
||||
try:
|
||||
response = json.loads(message["content"])
|
||||
if isinstance(response, dict):
|
||||
@@ -284,12 +326,17 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
# Response might not be JSON-deserializable.
|
||||
# This occurs with a UserImageFrame, for example, where we get a plain "COMPLETED" string.
|
||||
response_dict = {"value": message["content"]}
|
||||
|
||||
# Get function name from mapping using tool_call_id, or fallback
|
||||
tool_call_id = message.get("tool_call_id")
|
||||
function_name = "tool_call_result" # Default fallback
|
||||
if tool_call_id and tool_call_id in params.tool_call_id_to_name_mapping:
|
||||
function_name = params.tool_call_id_to_name_mapping[tool_call_id]
|
||||
|
||||
parts.append(
|
||||
Part(
|
||||
function_response=FunctionResponse(
|
||||
name="tool_call_result", # seems to work to hard-code the same name every time
|
||||
response=response_dict,
|
||||
)
|
||||
Part.from_function_response(
|
||||
name=function_name,
|
||||
response=response_dict,
|
||||
)
|
||||
)
|
||||
elif isinstance(content, str):
|
||||
@@ -312,4 +359,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
|
||||
audio_bytes = base64.b64decode(input_audio["data"])
|
||||
parts.append(Part(inline_data=Blob(mime_type="audio/wav", data=audio_bytes)))
|
||||
|
||||
return Content(role=role, parts=parts)
|
||||
return self.MessageConversionResult(
|
||||
content=Content(role=role, parts=parts),
|
||||
tool_call_id_to_name_mapping=tool_call_id_to_name_mapping,
|
||||
)
|
||||
|
||||
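The Gemini adapter hunk above replaces the hard-coded `"tool_call_result"` function name with a lookup: tool call IDs seen in assistant messages are recorded against their function names, and each later tool message resolves its name through `tool_call_id`, falling back to the old constant when the ID is unknown. A minimal sketch of just that bookkeeping, on plain dictionaries and without any Google SDK types, might look like this. `resolve_function_names` is a hypothetical helper, not part of the adapter.

```python
from typing import Any, Dict, List

FALLBACK_NAME = "tool_call_result"  # same fallback string the diff uses


def resolve_function_names(messages: List[Dict[str, Any]]) -> List[str]:
    """Return the function name chosen for each tool message, in order."""
    id_to_name: Dict[str, str] = {}
    resolved: List[str] = []
    for message in messages:
        role = message.get("role")
        if role == "assistant":
            # Record every tool call so later tool results can find their name.
            for tool_call in message.get("tool_calls") or []:
                id_to_name[tool_call["id"]] = tool_call["function"]["name"]
        elif role == "tool":
            tool_call_id = message.get("tool_call_id")
            resolved.append(id_to_name.get(tool_call_id, FALLBACK_NAME))
    return resolved
```

The mapping only works if messages are processed in order, since a tool result can only be matched against calls that were already seen.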
@@ -14,20 +14,41 @@ from pipecat.services.llm_service import LLMService
|
||||
|
||||
|
||||
class LLMSwitcher(ServiceSwitcher[StrategyType]):
|
||||
"""A pipeline that switches between different LLMs at runtime."""
|
||||
"""A pipeline that switches between different LLMs at runtime.
|
||||
|
||||
Example::
|
||||
|
||||
llm_switcher = LLMSwitcher(
|
||||
llms=[openai_llm, anthropic_llm],
|
||||
strategy_type=ServiceSwitcherStrategyManual
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self, llms: List[LLMService], strategy_type: Type[StrategyType]):
|
||||
"""Initialize the service switcher with a list of LLMs and a switching strategy."""
|
||||
"""Initialize the service switcher with a list of LLMs and a switching strategy.
|
||||
|
||||
Args:
|
||||
llms: List of LLM services to switch between.
|
||||
strategy_type: The strategy class to use for switching between LLMs.
|
||||
"""
|
||||
super().__init__(llms, strategy_type)
|
||||
|
||||
@property
|
||||
def llms(self) -> List[LLMService]:
|
||||
"""Get the list of LLMs managed by this switcher."""
|
||||
"""Get the list of LLMs managed by this switcher.
|
||||
|
||||
Returns:
|
||||
List of LLM services managed by this switcher.
|
||||
"""
|
||||
return self.services
|
||||
|
||||
@property
|
||||
def active_llm(self) -> Optional[LLMService]:
|
||||
"""Get the currently active LLM, if any."""
|
||||
"""Get the currently active LLM.
|
||||
|
||||
Returns:
|
||||
The currently active LLM service, or None if no LLM is active.
|
||||
"""
|
||||
return self.strategy.active_service
|
||||
|
||||
async def run_inference(self, context: LLMContext) -> Optional[str]:
|
||||
|
||||
@@ -21,10 +21,22 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class ServiceSwitcherStrategy:
|
||||
"""Base class for service switching strategies."""
|
||||
"""Base class for service switching strategies.
|
||||
|
||||
Note:
|
||||
Strategy classes are instantiated internally by ServiceSwitcher.
|
||||
Developers should pass the strategy class (not an instance) to ServiceSwitcher.
|
||||
"""
|
||||
|
||||
def __init__(self, services: List[FrameProcessor]):
|
||||
"""Initialize the service switcher strategy with a list of services."""
|
||||
"""Initialize the service switcher strategy with a list of services.
|
||||
|
||||
Note:
|
||||
This is called internally by ServiceSwitcher. Do not instantiate directly.
|
||||
|
||||
Args:
|
||||
services: List of frame processors to switch between.
|
||||
"""
|
||||
self.services = services
|
||||
self.active_service: Optional[FrameProcessor] = None
|
||||
|
||||
@@ -46,10 +58,24 @@ class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy):
|
||||
|
||||
This strategy allows the user to manually select which service is active.
|
||||
The initial active service is the first one in the list.
|
||||
|
||||
Example::
|
||||
|
||||
stt_switcher = ServiceSwitcher(
|
||||
services=[stt_1, stt_2],
|
||||
strategy_type=ServiceSwitcherStrategyManual
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self, services: List[FrameProcessor]):
|
||||
"""Initialize the manual service switcher strategy with a list of services."""
|
||||
"""Initialize the manual service switcher strategy with a list of services.
|
||||
|
||||
Note:
|
||||
This is called internally by ServiceSwitcher. Do not instantiate directly.
|
||||
|
||||
Args:
|
||||
services: List of frame processors to switch between.
|
||||
"""
|
||||
super().__init__(services)
|
||||
self.active_service = services[0] if services else None
|
||||
|
||||
@@ -85,7 +111,12 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
|
||||
"""A pipeline that switches between different services at runtime."""
|
||||
|
||||
def __init__(self, services: List[FrameProcessor], strategy_type: Type[StrategyType]):
|
||||
"""Initialize the service switcher with a list of services and a switching strategy."""
|
||||
"""Initialize the service switcher with a list of services and a switching strategy.
|
||||
|
||||
Args:
|
||||
services: List of frame processors to switch between.
|
||||
strategy_type: The strategy class to use for switching between services.
|
||||
"""
|
||||
strategy = strategy_type(services)
|
||||
super().__init__(*self._make_pipeline_definitions(services, strategy))
|
||||
self.services = services
|
||||
@@ -100,14 +131,20 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
|
||||
active_service: FrameProcessor,
|
||||
direction: FrameDirection,
|
||||
):
|
||||
"""Initialize the service switcher filter with a strategy and direction."""
|
||||
"""Initialize the service switcher filter with a strategy and direction.
|
||||
|
||||
Args:
|
||||
wrapped_service: The service that this filter wraps.
|
||||
active_service: The currently active service.
|
||||
direction: The direction of frame flow to filter.
|
||||
"""
|
||||
self._wrapped_service = wrapped_service
|
||||
self._active_service = active_service
|
||||
|
||||
async def filter(_: Frame) -> bool:
|
||||
return self._wrapped_service == self._active_service
|
||||
|
||||
super().__init__(filter, direction)
|
||||
self._wrapped_service = wrapped_service
|
||||
self._active_service = active_service
|
||||
super().__init__(filter, direction, filter_system_frames=True)
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
"""Process a frame through the filter, handling special internal filter-updating frames."""
|
||||
|
||||
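The docstrings added above stress that callers pass the strategy class, not an instance, and that the switcher instantiates it internally with the service list. A toy sketch of that pattern follows. `Strategy`, `ManualStrategy`, and `Switcher` are hypothetical stand-ins that use strings in place of frame processors; they are not the pipecat classes.

```python
from typing import Generic, List, Optional, Type, TypeVar


class Strategy:
    """Toy base strategy: holds the services and the active one."""

    def __init__(self, services: List[str]):
        self.services = services
        self.active_service: Optional[str] = None


class ManualStrategy(Strategy):
    """Toy manual strategy: the first service starts out active."""

    def __init__(self, services: List[str]):
        super().__init__(services)
        self.active_service = services[0] if services else None


StrategyT = TypeVar("StrategyT", bound=Strategy)


class Switcher(Generic[StrategyT]):
    """Toy switcher: receives the strategy class and builds the instance itself."""

    def __init__(self, services: List[str], strategy_type: Type[StrategyT]):
        self.services = services
        # The switcher, not the caller, constructs the strategy.
        self.strategy: StrategyT = strategy_type(services)


switcher = Switcher(services=["stt_1", "stt_2"], strategy_type=ManualStrategy)
```

Passing the class keeps the strategy and the switcher working from the same service list, which is harder to guarantee when the caller builds the strategy separately.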
@@ -189,7 +189,7 @@ class TaskObserver(BaseObserver):
|
||||
if isinstance(data, FramePushed):
|
||||
if on_push_frame_deprecated:
|
||||
await observer.on_push_frame(
|
||||
data.src, data.dst, data.frame, data.direction, data.timestamp
|
||||
data.source, data.destination, data.frame, data.direction, data.timestamp
|
||||
)
|
||||
else:
|
||||
await observer.on_push_frame(data)
|
||||
|
||||
@@ -15,9 +15,10 @@ service-specific adapter.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import copy
|
||||
import io
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, List, Optional, TypeAlias, Union
|
||||
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
|
||||
|
||||
from loguru import logger
|
||||
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
|
||||
@@ -31,6 +32,9 @@ from PIL import Image
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.frames.frames import AudioRawFrame
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
# "Re-export" types from OpenAI that we're using as universal context types.
|
||||
# NOTE: if universal message types need to someday diverge from OpenAI's, we
|
||||
# should consider managing our own definitions. But we should do so carefully,
|
||||
@@ -65,6 +69,26 @@ class LLMContext:
|
||||
and content formatting.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def from_openai_context(openai_context: "OpenAILLMContext") -> "LLMContext":
|
||||
"""Create a universal LLM context from an OpenAI-specific context.
|
||||
|
||||
NOTE: this should only be used internally, for facilitating migration
|
||||
from OpenAILLMContext to LLMContext. New user code should use
|
||||
LLMContext directly.
|
||||
|
||||
Args:
|
||||
openai_context: The OpenAI LLM context to convert.
|
||||
|
||||
Returns:
|
||||
New LLMContext instance with converted messages and settings.
|
||||
"""
|
||||
return LLMContext(
|
||||
messages=openai_context.get_messages(),
|
||||
tools=openai_context.tools,
|
||||
tool_choice=openai_context.tool_choice,
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
messages: Optional[List[LLMContextMessage]] = None,
|
||||
@@ -82,6 +106,19 @@ class LLMContext:
|
||||
self._tools: ToolsSchema | NotGiven = LLMContext._normalize_and_validate_tools(tools)
|
||||
self._tool_choice: LLMContextToolChoice | NotGiven = tool_choice
|
||||
|
||||
@property
|
||||
def messages(self) -> List[LLMContextMessage]:
|
||||
"""Get the current messages list.
|
||||
|
||||
NOTE: This is equivalent to calling `get_messages()` with no filter. If
|
||||
you want to filter out LLM-specific messages that don't pertain to your
|
||||
LLM, use `get_messages()` directly.
|
||||
|
||||
Returns:
|
||||
List of conversation messages.
|
||||
"""
|
||||
return self.get_messages()
|
||||
|
||||
def get_messages(self, llm_specific_filter: Optional[str] = None) -> List[LLMContextMessage]:
|
||||
"""Get the current messages list.
|
||||
|
||||
@@ -89,7 +126,8 @@ class LLMContext:
|
||||
llm_specific_filter: Optional filter to return LLM-specific
|
||||
messages for the given LLM, in addition to the standard
|
||||
messages. If messages end up being filtered, an error will be
|
||||
logged.
|
||||
logged; this is intended to catch accidental use of
|
||||
incompatible LLM-specific messages.
|
||||
|
||||
Returns:
|
||||
List of conversation messages.
|
||||
|
||||
@@ -12,7 +12,7 @@ allowing for flexible frame filtering logic in processing pipelines.
|
||||
|
||||
from typing import Awaitable, Callable
|
||||
|
||||
from pipecat.frames.frames import EndFrame, Frame, SystemFrame
|
||||
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame, SystemFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ class FunctionFilter(FrameProcessor):
|
||||
self,
|
||||
filter: Callable[[Frame], Awaitable[bool]],
|
||||
direction: FrameDirection = FrameDirection.DOWNSTREAM,
|
||||
filter_system_frames: bool = False,
|
||||
):
|
||||
"""Initialize the function filter.
|
||||
|
||||
@@ -36,22 +37,32 @@ class FunctionFilter(FrameProcessor):
|
||||
frame should pass through, False otherwise.
|
||||
direction: The direction to apply filtering. Only frames moving in
|
||||
this direction will be filtered. Defaults to DOWNSTREAM.
|
||||
filter_system_frames: Whether to filter system frames. Defaults to False.
|
||||
"""
|
||||
super().__init__()
|
||||
self._filter = filter
|
||||
self._direction = direction
|
||||
self._filter_system_frames = filter_system_frames
|
||||
|
||||
#
|
||||
# Frame processor
|
||||
#
|
||||
|
||||
# Ignore system frames, end frames and frames that are not following the
|
||||
# direction of this gate
|
||||
def _should_passthrough_frame(self, frame, direction):
|
||||
"""Check if a frame should pass through without filtering."""
|
||||
# Ignore system frames, end frames and frames that are not following the
|
||||
# direction of this gate
|
||||
return isinstance(frame, (SystemFrame, EndFrame)) or direction != self._direction
|
||||
# Always passthrough frames in the wrong direction
|
||||
if direction != self._direction:
|
||||
return True
|
||||
|
||||
# Always passthrough lifecycle frames
|
||||
if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
|
||||
return True
|
||||
|
||||
# If not filtering system frames, passthrough all other system frames
|
||||
if not self._filter_system_frames and isinstance(frame, SystemFrame):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process a frame through the filter.
|
||||
|
||||
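The rewritten `_should_passthrough_frame` above applies three checks in order: frames travelling against the filter's direction always pass, lifecycle frames always pass, and other system frames pass unless `filter_system_frames` is set. The sketch below mirrors that ordering with stand-in classes. The class hierarchy here (for example `StartFrame` deriving from `SystemFrame`) and the names `Direction`, `OtherSystemFrame`, and `DataFrame` are assumptions for illustration, not the pipecat definitions.

```python
from enum import Enum


class Direction(Enum):
    DOWNSTREAM = 1
    UPSTREAM = 2


# Stand-in frame types (hypothetical hierarchy).
class Frame: ...
class SystemFrame(Frame): ...
class StartFrame(SystemFrame): ...
class CancelFrame(SystemFrame): ...
class EndFrame(Frame): ...
class OtherSystemFrame(SystemFrame): ...
class DataFrame(Frame): ...


def should_passthrough(
    frame: Frame,
    direction: Direction,
    *,
    filter_direction: Direction = Direction.DOWNSTREAM,
    filter_system_frames: bool = False,
) -> bool:
    """Return True when the frame bypasses the user-supplied filter."""
    # 1. Frames moving in the other direction are never filtered.
    if direction != filter_direction:
        return True
    # 2. Lifecycle frames are never filtered.
    if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
        return True
    # 3. Remaining system frames bypass the filter unless explicitly opted in.
    if not filter_system_frames and isinstance(frame, SystemFrame):
        return True
    return False
```

With the default `filter_system_frames=False` the behavior matches the old single-expression check; only callers that opt in, such as the service switcher filter in this diff, see system frames reach the filter function.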
@@ -1018,6 +1018,7 @@ class RTVIObserver(BaseObserver):
|
||||
|
||||
if (
|
||||
isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame))
|
||||
and (direction == FrameDirection.DOWNSTREAM)
|
||||
and self._params.user_speaking_enabled
|
||||
):
|
||||
await self._handle_interruptions(frame)
|
||||
|
||||
@@ -76,6 +76,7 @@ class DailyRoomConfig(BaseModel):
|
||||
async def configure(
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
*,
|
||||
api_key: Optional[str] = None,
|
||||
room_exp_duration: Optional[float] = 2.0,
|
||||
token_exp_duration: Optional[float] = 2.0,
|
||||
sip_caller_phone: Optional[str] = None,
|
||||
@@ -92,6 +93,7 @@ async def configure(
|
||||
|
||||
Args:
|
||||
aiohttp_session: HTTP session for making API requests.
|
||||
api_key: Daily API key.
|
||||
room_exp_duration: Room expiration time in hours.
|
||||
token_exp_duration: Token expiration time in hours.
|
||||
sip_caller_phone: Phone number or identifier for SIP display name.
|
||||
@@ -129,7 +131,7 @@ async def configure(
|
||||
config = await configure(session, room_properties=custom_props)
|
||||
"""
|
||||
# Check for required API key
|
||||
api_key = os.getenv("DAILY_API_KEY")
|
||||
api_key = api_key or os.getenv("DAILY_API_KEY")
|
||||
if not api_key:
|
||||
raise Exception(
|
||||
"DAILY_API_KEY environment variable is required. "
|
||||
|
||||
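The change to `configure()` above lets an explicit `api_key` argument take precedence, with the `DAILY_API_KEY` environment variable as the fallback. A small sketch of that resolution order follows. `resolve_api_key` is a hypothetical helper, and it raises `ValueError` where the diff raises a plain `Exception`.

```python
import os
from typing import Optional


def resolve_api_key(api_key: Optional[str] = None, env_var: str = "DAILY_API_KEY") -> str:
    """Prefer the explicit argument, then the environment variable."""
    resolved = api_key or os.getenv(env_var)
    if not resolved:
        raise ValueError(f"Pass api_key or set the {env_var} environment variable.")
    return resolved
```

Note that `or` treats an empty string as missing, so `api_key=""` also falls back to the environment variable.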
@@ -82,6 +82,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
DailyRunnerArguments,
|
||||
RunnerArguments,
|
||||
SmallWebRTCRunnerArguments,
|
||||
WebSocketRunnerArguments,
|
||||
)
|
||||
@@ -309,7 +310,7 @@ def _setup_webrtc_routes(
|
||||
):
|
||||
"""Mimic Pipecat Cloud's proxy."""
|
||||
active_session = active_sessions.get(session_id)
|
||||
if not active_session:
|
||||
if active_session is None:
|
||||
return Response(content="Invalid or not-yet-ready session_id", status_code=404)
|
||||
|
||||
if path.endswith("api/offer"):
|
||||
@@ -529,9 +530,9 @@ def _setup_daily_routes(app: FastAPI):
|
||||
"""Set up Daily-specific routes."""
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent():
|
||||
async def create_room_and_start_agent():
|
||||
"""Launch a Daily bot and redirect to room."""
|
||||
print("Starting bot with Daily transport")
|
||||
print("Starting bot with Daily transport and redirecting to Daily room")
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -546,11 +547,11 @@ def _setup_daily_routes(app: FastAPI):
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
return RedirectResponse(room_url)
|
||||
|
||||
async def _handle_rtvi_request(request: Request):
|
||||
"""Common handler for both /start and /connect endpoints.
|
||||
@app.post("/start")
|
||||
async def start_agent(request: Request):
|
||||
"""Handler for /start endpoints.
|
||||
|
||||
Expects POST body like::
|
||||
|
||||
{
|
||||
"createDailyRoom": true,
|
||||
"dailyRoomProperties": { "start_video_off": true },
|
||||
@@ -567,45 +568,38 @@ def _setup_daily_routes(app: FastAPI):
|
||||
logger.error(f"Failed to parse request body: {e}")
|
||||
request_data = {}
|
||||
|
||||
# Extract the body data that should be passed to the bot
|
||||
# This mimics Pipecat Cloud's behavior
|
||||
bot_body = request_data.get("body", {})
|
||||
create_daily_room = request_data.get("createDailyRoom", False)
|
||||
body = request_data.get("body", {})
|
||||
|
||||
# Log the extracted body data for debugging
|
||||
if bot_body:
|
||||
logger.info(f"Extracted body data for bot: {bot_body}")
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
existing_room_url = os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||
|
||||
result = None
|
||||
|
||||
# Configure room if:
|
||||
# 1. Explicitly requested via createDailyRoom in payload
|
||||
# 2. Using pre-configured room from DAILY_SAMPLE_ROOM_URL env var
|
||||
if create_daily_room or existing_room_url:
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=body)
|
||||
result = {
|
||||
"dailyRoom": room_url,
|
||||
"dailyToken": token,
|
||||
"sessionId": str(uuid.uuid4()),
|
||||
}
|
||||
else:
|
||||
logger.debug("No body data provided in request")
|
||||
runner_args = RunnerArguments(body=body)
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
# Start the bot in the background
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
|
||||
# Start the bot in the background with extracted body data
|
||||
bot_module = _get_bot_module()
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=bot_body)
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
# Match PCC /start endpoint response format:
|
||||
return {"dailyRoom": room_url, "dailyToken": token}
|
||||
|
||||
@app.post("/start")
|
||||
async def rtvi_start(request: Request):
|
||||
"""Launch a Daily bot and return connection info for RTVI clients."""
|
||||
return await _handle_rtvi_request(request)
|
||||
|
||||
@app.post("/connect")
|
||||
async def rtvi_connect(request: Request):
|
||||
"""Launch a Daily bot and return connection info for RTVI clients.
|
||||
|
||||
.. deprecated:: 0.0.78
|
||||
Use /start instead. This endpoint will be removed in a future version.
|
||||
"""
|
||||
logger.warning(
|
||||
"DEPRECATED: /connect endpoint is deprecated. Please use /start instead. "
|
||||
"This endpoint will be removed in a future version."
|
||||
)
|
||||
return await _handle_rtvi_request(request)
|
||||
return result
|
||||
|
||||
|
||||
def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
|
||||
|
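The reworked `/start` handler above configures a Daily room only when the payload sets `createDailyRoom` or when `DAILY_SAMPLE_ROOM_URL` is present; otherwise it starts the bot with plain `RunnerArguments`. The branching can be sketched in isolation as follows. The function name `plan_start_request` and its return shape are assumptions made for this example and do not appear in the diff.

```python
from typing import Any, Dict, Optional


def plan_start_request(
    request_data: Dict[str, Any], existing_room_url: Optional[str]
) -> Dict[str, Any]:
    """Decide whether a /start request needs a Daily room (hypothetical helper)."""
    create_daily_room = request_data.get("createDailyRoom", False)
    # The "body" value is forwarded to the bot unchanged in both branches.
    body = request_data.get("body", {})
    needs_room = bool(create_daily_room or existing_room_url)
    return {"needs_room": needs_room, "body": body}
```

With an empty payload and no environment variable set, `needs_room` is false, which corresponds to the `else` branch that builds `RunnerArguments(body=body)`.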
||||
@@ -20,9 +20,11 @@ from fastapi import WebSocket
|
||||
class RunnerArguments:
|
||||
"""Base class for runner session arguments."""
|
||||
|
||||
handle_sigint: bool = field(init=False)
|
||||
handle_sigterm: bool = field(init=False)
|
||||
pipeline_idle_timeout_secs: int = field(init=False)
|
||||
# Use kw_only so subclasses don't need to worry about ordering.
|
||||
handle_sigint: bool = field(init=False, kw_only=True)
|
||||
handle_sigterm: bool = field(init=False, kw_only=True)
|
||||
pipeline_idle_timeout_secs: int = field(init=False, kw_only=True)
|
||||
body: Optional[Any] = field(default_factory=dict, kw_only=True)
|
||||
|
||||
def __post_init__(self):
|
||||
self.handle_sigint = False
|
||||
@@ -42,7 +44,6 @@ class DailyRunnerArguments(RunnerArguments):
|
||||
|
||||
room_url: str
|
||||
token: Optional[str] = None
|
||||
body: Optional[Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -55,7 +56,6 @@ class WebSocketRunnerArguments(RunnerArguments):
|
||||
"""
|
||||
|
||||
websocket: WebSocket
|
||||
body: Optional[Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -720,11 +720,11 @@ class AWSBedrockLLMService(LLMService):
|
||||
additional_model_request_fields: Additional model-specific parameters.
|
||||
"""
|
||||
|
||||
max_tokens: Optional[int] = Field(default_factory=lambda: 4096, ge=1)
|
||||
temperature: Optional[float] = Field(default_factory=lambda: 0.7, ge=0.0, le=1.0)
|
||||
top_p: Optional[float] = Field(default_factory=lambda: 0.999, ge=0.0, le=1.0)
|
||||
max_tokens: Optional[int] = Field(default=None, ge=1)
|
||||
temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
stop_sequences: Optional[List[str]] = Field(default_factory=lambda: [])
|
||||
latency: Optional[str] = Field(default_factory=lambda: "standard")
|
||||
latency: Optional[str] = Field(default=None)
|
||||
additional_model_request_fields: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
def __init__(
|
||||
@@ -801,6 +801,24 @@ class AWSBedrockLLMService(LLMService):
|
||||
"""
|
||||
return True
|
||||
|
||||
def _build_inference_config(self) -> Dict[str, Any]:
|
||||
"""Build inference config with only the parameters that are set.
|
||||
|
||||
This prevents conflicts with models (e.g., Claude Sonnet 4.5) that don't
|
||||
allow certain parameter combinations like temperature and top_p together.
|
||||
|
||||
Returns:
|
||||
Dictionary containing only the inference parameters that are not None.
|
||||
"""
|
||||
inference_config = {}
|
||||
if self._settings["max_tokens"] is not None:
|
||||
inference_config["maxTokens"] = self._settings["max_tokens"]
|
||||
if self._settings["temperature"] is not None:
|
||||
inference_config["temperature"] = self._settings["temperature"]
|
||||
if self._settings["top_p"] is not None:
|
||||
inference_config["topP"] = self._settings["top_p"]
|
||||
return inference_config
|
||||
|
||||
async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]:
|
||||
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
|
||||
|
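`_build_inference_config` above includes only the settings that are not `None`, so a model that rejects certain parameter combinations never receives defaults the caller did not ask for. The same idea as a standalone function, where `build_inference_config` and the plain `settings` dict are stand-ins for the method and `self._settings`:

```python
from typing import Any, Dict, Optional


def build_inference_config(settings: Dict[str, Optional[Any]]) -> Dict[str, Any]:
    """Map set (non-None) settings to their request key names."""
    key_map = {"max_tokens": "maxTokens", "temperature": "temperature", "top_p": "topP"}
    return {
        request_key: settings[name]
        for name, request_key in key_map.items()
        # Compare against None so that a legitimate 0.0 is still sent.
        if settings.get(name) is not None
    }
```

An empty result is falsy, which is what lets the caller skip adding `inferenceConfig` to the request entirely.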
||||
@@ -826,16 +844,16 @@ class AWSBedrockLLMService(LLMService):
|
||||
model_id = self.model_name
|
||||
|
||||
# Prepare request parameters
|
||||
inference_config = self._build_inference_config()
|
||||
|
||||
request_params = {
|
||||
"modelId": model_id,
|
||||
"messages": messages,
|
||||
"inferenceConfig": {
|
||||
"maxTokens": 8192,
|
||||
"temperature": 0.7,
|
||||
"topP": 0.9,
|
||||
},
|
||||
}
|
||||
|
||||
if inference_config:
|
||||
request_params["inferenceConfig"] = inference_config
|
||||
|
||||
if system:
|
||||
request_params["system"] = system
|
||||
|
||||
@@ -974,21 +992,20 @@ class AWSBedrockLLMService(LLMService):
|
||||
tools = params_from_context["tools"]
|
||||
tool_choice = params_from_context["tool_choice"]
|
||||
|
||||
# Set up inference config
|
||||
inference_config = {
|
||||
"maxTokens": self._settings["max_tokens"],
|
||||
"temperature": self._settings["temperature"],
|
||||
"topP": self._settings["top_p"],
|
||||
}
|
||||
# Set up inference config - only include parameters that are set
|
||||
inference_config = self._build_inference_config()
|
||||
|
||||
# Prepare request parameters
|
||||
request_params = {
|
||||
"modelId": self.model_name,
|
||||
"messages": messages,
|
||||
"inferenceConfig": inference_config,
|
||||
"additionalModelRequestFields": self._settings["additional_model_request_fields"],
|
||||
}
|
||||
|
||||
# Only add inference config if it has parameters
|
||||
if inference_config:
|
||||
request_params["inferenceConfig"] = inference_config
|
||||
|
||||
# Add system message
|
||||
if system:
|
||||
request_params["system"] = system
|
||||
|
||||
@@ -8,8 +8,77 @@
|
||||
|
||||
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
|
||||
including conversation history management and role-specific message processing.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
AWS Nova Sonic no longer uses types from this module under the hood.
|
||||
It now uses `LLMContext` and `LLMContextAggregatorPair`.
|
||||
Using the new patterns should allow you to not need types from this module.
|
||||
|
||||
BEFORE:
|
||||
```
|
||||
# Setup
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Context frame type
|
||||
frame: OpenAILLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: AWSNovaSonicLLMContext
|
||||
# or
|
||||
context: OpenAILLMContext
|
||||
```
|
||||
|
||||
AFTER:
|
||||
```
|
||||
# Setup
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# Context frame type
|
||||
frame: LLMContextFrame
|
||||
|
||||
# Context type
|
||||
context: LLMContext
|
||||
```
|
||||
"""
|
||||
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Types in pipecat.services.aws.nova_sonic.context (or "
|
||||
"pipecat.services.aws_nova_sonic.context) are deprecated. \n"
|
||||
"AWS Nova Sonic no longer uses types from this module under the hood. \n"
|
||||
"It now uses `LLMContext` and `LLMContextAggregatorPair`. \n"
|
||||
"Using the new patterns should allow you to not need types from this module.\n\n"
|
||||
"BEFORE:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = OpenAILLMContext(messages, tools)\n"
|
||||
"context_aggregator = llm.create_context_aggregator(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: OpenAILLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: AWSNovaSonicLLMContext\n"
|
||||
"# or\n"
|
||||
"context: OpenAILLMContext\n\n"
|
||||
"```\n\n"
|
||||
"AFTER:\n"
|
||||
"```\n"
|
||||
"# Setup\n"
|
||||
"context = LLMContext(messages, tools)\n"
|
||||
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
|
||||
"# Context frame type\n"
|
||||
"frame: LLMContextFrame\n\n"
|
||||
"# Context type\n"
|
||||
"context: LLMContext\n\n"
|
||||
"```",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
import copy
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
|
||||
@@ -25,7 +25,7 @@ from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter
|
||||
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
@@ -33,35 +33,30 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallFromLLM,
|
||||
InputAudioRawFrame,
|
||||
InterimTranscriptionFrame,
|
||||
InterruptionFrame,
|
||||
LLMContextFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSTextFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.aws.nova_sonic.context import (
|
||||
AWSNovaSonicAssistantContextAggregator,
|
||||
AWSNovaSonicContextAggregatorPair,
|
||||
AWSNovaSonicLLMContext,
|
||||
AWSNovaSonicUserContextAggregator,
|
||||
Role,
|
||||
)
|
||||
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
|
||||
from pipecat.services.llm_service import LLMService
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
@@ -217,6 +212,11 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
system_instruction: System-level instruction for the model.
|
||||
tools: Available tools/functions for the model to use.
|
||||
send_transcription_frames: Whether to emit transcription frames.
|
||||
|
||||
.. deprecated:: 0.0.91
|
||||
This parameter is deprecated and will be removed in a future version.
|
||||
Transcription frames are always sent.
|
||||
|
||||
**kwargs: Additional arguments passed to the parent LLMService.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
@@ -230,8 +230,20 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._params = params or Params()
|
||||
self._system_instruction = system_instruction
|
||||
self._tools = tools
|
||||
self._send_transcription_frames = send_transcription_frames
|
||||
self._context: Optional[AWSNovaSonicLLMContext] = None
|
||||
|
||||
if not send_transcription_frames:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"`send_transcription_frames` is deprecated and will be removed in a future version. "
|
||||
"Transcription frames are always sent.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
self._context: Optional[LLMContext] = None
|
||||
self._stream: Optional[
|
||||
DuplexEventStream[
|
||||
InvokeModelWithBidirectionalStreamInput,
|
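The constructor hunk above warns when the deprecated `send_transcription_frames` flag is set to `False`, wrapping the call in `warnings.catch_warnings()` with `simplefilter("always")` so the message is not hidden by the default filters. A reduced sketch of that pattern, with `make_service` as a hypothetical stand-in for the real constructor:

```python
import warnings


def make_service(send_transcription_frames: bool = True) -> dict:
    """Hypothetical constructor stand-in that warns about a deprecated flag."""
    if not send_transcription_frames:
        with warnings.catch_warnings():
            # Temporarily force the warning through regardless of filters;
            # the previous filter state is restored on exit.
            warnings.simplefilter("always")
            warnings.warn(
                "`send_transcription_frames` is deprecated; "
                "transcription frames are always sent.",
                DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller
            )
    # The flag no longer changes behavior.
    return {"send_transcription_frames": True}
```

Passing the default value emits nothing, so existing callers that never touched the flag see no new output.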
||||
@@ -244,12 +256,17 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._input_audio_content_name: Optional[str] = None
|
||||
self._content_being_received: Optional[CurrentContent] = None
|
||||
self._assistant_is_responding = False
|
||||
self._may_need_repush_assistant_text = False
|
||||
self._ready_to_send_context = False
|
||||
self._handling_bot_stopped_speaking = False
|
||||
self._triggering_assistant_response = False
|
||||
self._waiting_for_trigger_transcription = False
|
||||
self._disconnecting = False
|
||||
self._connected_time: Optional[float] = None
|
||||
self._wants_connection = False
|
||||
self._user_text_buffer = ""
|
||||
self._assistant_text_buffer = ""
|
||||
self._completed_tool_calls = set()
|
||||
|
||||
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
|
||||
with wave.open(file_path.open("rb"), "rb") as wav_file:
|
||||
@@ -302,12 +319,12 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
logger.debug("Resetting conversation")
|
||||
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=False)
|
||||
|
||||
# Carry over previous context through disconnect
|
||||
# Grab context to carry through disconnect/reconnect
|
||||
context = self._context
|
||||
await self._disconnect()
|
||||
self._context = context
|
||||
|
||||
await self._disconnect()
|
||||
await self._start_connecting()
|
||||
await self._handle_context(context)
|
||||
|
||||
#
|
||||
# frame processing
|
||||
@@ -322,28 +339,35 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
await self._handle_context(frame.context)
|
||||
elif isinstance(frame, LLMContextFrame):
|
||||
raise NotImplementedError(
|
||||
"Universal LLMContext is not yet supported for AWS Nova Sonic."
|
||||
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
|
||||
context = (
|
||||
frame.context
|
||||
if isinstance(frame, LLMContextFrame)
|
||||
else LLMContext.from_openai_context(frame.context)
|
||||
)
|
||||
await self._handle_context(context)
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
await self._handle_input_audio_frame(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self._handle_bot_stopped_speaking(delay_to_catch_trailing_assistant_text=True)
|
||||
elif isinstance(frame, AWSNovaSonicFunctionCallResultFrame):
|
||||
await self._handle_function_call_result(frame)
|
||||
elif isinstance(frame, InterruptionFrame):
|
||||
await self._handle_interruption_frame()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _handle_context(self, context: OpenAILLMContext):
|
||||
async def _handle_context(self, context: LLMContext):
|
||||
if self._disconnecting:
|
||||
return
|
||||
|
||||
if not self._context:
|
||||
# We got our initial context - try to finish connecting
|
||||
self._context = AWSNovaSonicLLMContext.upgrade_to_nova_sonic(
|
||||
context, self._system_instruction
|
||||
)
|
||||
# We got our initial context
|
||||
# Try to finish connecting
|
||||
self._context = context
|
||||
await self._finish_connecting_if_context_available()
|
||||
else:
|
||||
# We got an updated context
|
||||
# Send results for any newly-completed function calls
|
||||
await self._process_completed_function_calls(send_new_results=True)
|
||||
|
||||
async def _handle_input_audio_frame(self, frame: InputAudioRawFrame):
|
||||
# Wait until we're done sending the assistant response trigger audio before sending audio
|
||||
@@ -393,9 +417,9 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
else:
|
||||
await finalize_assistant_response()
|
||||
|
||||
async def _handle_function_call_result(self, frame: AWSNovaSonicFunctionCallResultFrame):
|
||||
result = frame.result_frame
|
||||
await self._send_tool_result(tool_call_id=result.tool_call_id, result=result.result)
|
||||
async def _handle_interruption_frame(self):
|
||||
if self._assistant_is_responding:
|
||||
self._may_need_repush_assistant_text = True
|
||||
|
||||
#
|
||||
# LLM communication: lifecycle
|
||||
@@ -431,6 +455,17 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
await self._disconnect()
|
||||
|
||||
async def _process_completed_function_calls(self, send_new_results: bool):
|
||||
# Check for set of completed function calls in the context
|
||||
for message in self._context.get_messages():
|
||||
if message.get("role") and message.get("content") != "IN_PROGRESS":
|
||||
tool_call_id = message.get("tool_call_id")
|
||||
if tool_call_id and tool_call_id not in self._completed_tool_calls:
|
||||
# Found a newly-completed function call - send the result to the service
|
||||
if send_new_results:
|
||||
await self._send_tool_result(tool_call_id, message.get("content"))
|
||||
self._completed_tool_calls.add(tool_call_id)
|
||||
|
||||
async def _finish_connecting_if_context_available(self):
|
||||
# We can only finish connecting once we've gotten our initial context and we're ready to
|
||||
# send it
|
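`_process_completed_function_calls` above scans the context for messages carrying a `tool_call_id` whose content is no longer `"IN_PROGRESS"` and remembers the ids it has already handled in a set. The bookkeeping can be sketched without the service; `find_new_tool_results` is an illustrative name, and it returns the new results instead of sending them:

```python
from typing import Any, Dict, List, Set, Tuple


def find_new_tool_results(
    messages: List[Dict[str, Any]], completed: Set[str]
) -> List[Tuple[str, Any]]:
    """Return (tool_call_id, content) pairs not seen before; update `completed`."""
    new_results = []
    for message in messages:
        if message.get("role") and message.get("content") != "IN_PROGRESS":
            tool_call_id = message.get("tool_call_id")
            if tool_call_id and tool_call_id not in completed:
                new_results.append((tool_call_id, message.get("content")))
                completed.add(tool_call_id)
    return new_results
```

Calling it once at connection time and discarding the return value corresponds to `send_new_results=False` in the diff: results already in the history are recorded but not re-sent.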
||||
@@ -439,30 +474,38 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.info("Finishing connecting (setting up session)...")
|
||||
|
||||
# Initialize our bookkeeping of already-completed tool calls in the
|
||||
# context
|
||||
await self._process_completed_function_calls(send_new_results=False)
|
||||
|
||||
# Read context
|
||||
history = self._context.get_messages_for_initializing_history()
|
||||
adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
|
||||
llm_connection_params = adapter.get_llm_invocation_params(self._context)
|
||||
|
||||
# Send prompt start event, specifying tools.
|
||||
# Tools from context take priority over self._tools.
|
||||
tools = (
|
||||
self._context.tools
|
||||
if self._context.tools
|
||||
else self.get_llm_adapter().from_standard_tools(self._tools)
|
||||
llm_connection_params["tools"]
|
||||
if llm_connection_params["tools"]
|
||||
else adapter.from_standard_tools(self._tools)
|
||||
)
|
||||
logger.debug(f"Using tools: {tools}")
|
||||
await self._send_prompt_start_event(tools)
|
||||
|
||||
# Send system instruction.
|
||||
# Instruction from context takes priority over self._system_instruction.
|
||||
# (NOTE: this prioritizing occurred automatically behind the scenes: the context was
|
||||
# initialized with self._system_instruction and then updated itself from its messages when
|
||||
# get_messages_for_initializing_history() was called).
|
||||
logger.debug(f"Using system instruction: {history.system_instruction}")
|
||||
if history.system_instruction:
|
||||
await self._send_text_event(text=history.system_instruction, role=Role.SYSTEM)
|
||||
system_instruction = (
|
||||
llm_connection_params["system_instruction"]
|
||||
if llm_connection_params["system_instruction"]
|
||||
else self._system_instruction
|
||||
)
|
||||
logger.debug(f"Using system instruction: {system_instruction}")
|
||||
if system_instruction:
|
||||
await self._send_text_event(text=system_instruction, role=Role.SYSTEM)
|
||||
|
||||
# Send conversation history
|
||||
for message in history.messages:
|
||||
for message in llm_connection_params["messages"]:
|
||||
# logger.debug(f"Seeding conversation history with message: {message}")
|
||||
await self._send_text_event(text=message.text, role=message.role)
|
||||
|
||||
# Start audio input
|
||||
@@ -492,9 +535,12 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
await self._send_session_end_events()
|
||||
self._client = None
|
||||
|
||||
# Clean up context
|
||||
self._context = None
|
||||
|
||||
# Clean up stream
|
||||
if self._stream:
|
||||
await self._stream.input_stream.close()
|
||||
await self._stream.close()
|
||||
self._stream = None
|
||||
|
||||
# NOTE: see explanation of HACK, below
|
||||
@@ -510,15 +556,23 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
self._receive_task = None
|
||||
|
||||
# Reset remaining connection-specific state
|
||||
# Should be all private state except:
|
||||
# - _wants_connection
|
||||
# - _assistant_response_trigger_audio
|
||||
self._prompt_name = None
|
||||
self._input_audio_content_name = None
|
||||
self._content_being_received = None
|
||||
self._assistant_is_responding = False
|
||||
self._may_need_repush_assistant_text = False
|
||||
self._ready_to_send_context = False
|
||||
self._handling_bot_stopped_speaking = False
|
||||
self._triggering_assistant_response = False
|
||||
self._waiting_for_trigger_transcription = False
|
||||
self._disconnecting = False
|
||||
self._connected_time = None
|
||||
self._user_text_buffer = ""
|
||||
self._assistant_text_buffer = ""
|
||||
self._completed_tool_calls = set()
|
||||
|
||||
logger.info("Finished disconnecting")
|
||||
except Exception as e:
|
||||
@@ -826,6 +880,10 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
# Handle the LLM completion ending
|
||||
await self._handle_completion_end_event(event_json)
|
||||
except Exception as e:
|
||||
if self._disconnecting:
|
||||
# Errors are kind of expected while disconnecting, so just
|
||||
# ignore them and do nothing
|
||||
return
|
||||
logger.error(f"{self} error processing responses: {e}")
|
||||
if self._wants_connection:
|
||||
await self.reset_conversation()
|
||||
@@ -956,7 +1014,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
async def _report_assistant_response_started(self):
|
||||
logger.debug("Assistant response started")
|
||||
|
||||
# Report that the assistant has started their response.
|
||||
# Report the start of the assistant response.
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
# Report that the equivalent of TTS (this is a speech-to-speech model) started
|
||||
@@ -968,23 +1026,16 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug(f"Assistant response text added: {text}")
|
||||
|
||||
# Report some text added to the ongoing assistant response
|
||||
await self.push_frame(LLMTextFrame(text))
|
||||
|
||||
# Report some text added to the *equivalent* of TTS (this is a speech-to-speech model)
|
||||
# Report the text of the assistant response.
|
||||
await self.push_frame(TTSTextFrame(text))
|
||||
|
||||
# TODO: this is a (hopefully temporary) HACK. Here we directly manipulate the context rather
|
||||
# than relying on the frames pushed to the assistant context aggregator. The pattern of
|
||||
# receiving full-sentence text after the assistant has spoken does not easily fit with the
|
||||
# Pipecat expectation of chunks of text streaming in while the assistant is speaking.
|
||||
# Interruption handling was especially challenging. Rather than spend days trying to fit a
|
||||
# square peg in a round hole, I decided on this hack for the time being. We can most cleanly
|
||||
# abandon this hack if/when AWS Nova Sonic implements streaming smaller text chunks
|
||||
# interspersed with audio. Note that when we move away from this hack, we need to make sure
|
||||
# that on an interruption we avoid sending LLMFullResponseEndFrame, which gets the
|
||||
# LLMAssistantContextAggregator into a bad state.
|
||||
self._context.buffer_assistant_text(text)
|
||||
# HACK: here we're also buffering the assistant text ourselves as a
|
||||
# backup rather than relying solely on the assistant context aggregator
|
||||
# to do it, because the text arrives from Nova Sonic only after all the
|
||||
# assistant audio frames have been pushed, meaning that if an
|
||||
# interruption frame were to arrive we would lose all of it (the text
|
||||
# frames sitting in the queue would be wiped).
|
||||
self._assistant_text_buffer += text
|
||||
|
||||
async def _report_assistant_response_ended(self):
|
||||
if not self._context: # should never happen
|
||||
@@ -992,14 +1043,34 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug("Assistant response ended")
|
||||
|
||||
# Report that the assistant has finished their response.
|
||||
# If an interruption frame arrived while the assistant was responding
|
||||
# we may have lost all of the assistant text (see HACK, above), so
|
||||
# re-push it downstream to the aggregator now.
|
||||
if self._may_need_repush_assistant_text:
|
||||
# Just in case, check that assistant text hasn't already made it
|
||||
# into the context (sometimes it does, despite the interruption).
|
||||
messages = self._context.get_messages()
|
||||
last_message = messages[-1] if messages else None
|
||||
if (
|
||||
not last_message
|
||||
or last_message.get("role") != "assistant"
|
||||
or last_message.get("content") != self._assistant_text_buffer
|
||||
):
|
||||
# We also need to re-push the LLMFullResponseStartFrame since the
|
||||
# TTSTextFrame would be ignored otherwise (the interruption frame
|
||||
# would have cleared the assistant aggregator state).
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.push_frame(TTSTextFrame(self._assistant_text_buffer))
|
||||
self._may_need_repush_assistant_text = False
|
||||
|
||||
# Report the end of the assistant response.
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
# Report that the equivalent of TTS (this is a speech-to-speech model) stopped.
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
# For an explanation of this hack, see _report_assistant_response_text_added.
|
||||
self._context.flush_aggregated_assistant_text()
|
||||
# Clear out the buffered assistant text
|
||||
self._assistant_text_buffer = ""
|
||||
|
||||
#
|
||||
# user transcription reporting
|
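When an interruption may have wiped the queued text frames, `_report_assistant_response_ended` above re-pushes the buffered assistant text unless the last context message already holds it. The check itself is small enough to isolate. The helper name `should_repush` is an assumption for this sketch.

```python
from typing import Any, Dict, List


def should_repush(messages: List[Dict[str, Any]], buffered_text: str) -> bool:
    """True if the buffered assistant text is not yet the last context message."""
    last_message = messages[-1] if messages else None
    return bool(
        not last_message
        or last_message.get("role") != "assistant"
        or last_message.get("content") != buffered_text
    )
```

The comparison is exact string equality, mirroring the diff, so a last assistant message that differs from the buffer in any way still triggers a re-push.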
||||
@@ -1016,33 +1087,67 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
|
||||
logger.debug(f"User transcription text added: {text}")
|
||||
|
||||
# Manually add new user transcription text to context.
|
||||
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
|
||||
self._context.buffer_user_text(text)
|
||||
|
||||
# Report that some new user transcription text is available.
|
||||
if self._send_transcription_frames:
|
||||
await self.push_frame(
|
||||
InterimTranscriptionFrame(text=text, user_id="", timestamp=time_now_iso8601())
|
||||
)
|
||||
# HACK: here we're buffering the user text ourselves rather than
|
||||
# relying on the upstream user context aggregator to do it, because the
|
||||
# text arrives in fairly large chunks spaced fairly far apart in time.
|
||||
# That means the user text would be split between different messages in
|
||||
# context. Even if we sent placeholder InterimTranscriptionFrames in
|
||||
# between each TranscriptionFrame to tell the aggregator to hold off on
|
||||
# finalizing the user message, the aggregator would likely get the last
|
||||
# chunk too late.
|
||||
self._user_text_buffer += f" {text}" if self._user_text_buffer else text
|
||||
|
||||
async def _report_user_transcription_ended(self):
|
||||
if not self._context: # should never happen
|
||||
return
|
||||
|
||||
# Manually add user transcription to context (if any has been buffered).
|
||||
# We can't rely on the user context aggregator to do this since it's upstream from the LLM.
|
||||
transcription = self._context.flush_aggregated_user_text()
|
||||
|
||||
if not transcription:
|
||||
return
|
||||
|
||||
logger.debug(f"User transcription ended")
|
||||
|
||||
if self._send_transcription_frames:
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(text=transcription, user_id="", timestamp=time_now_iso8601())
|
||||
# Report to the upstream user context aggregator that some new user
|
||||
# transcription text is available.
|
||||
|
||||
# HACK: Check if this transcription was triggered by our own
|
||||
# assistant response trigger. If so, we need to wrap it with
|
||||
# UserStarted/StoppedSpeakingFrames; otherwise the user aggregator
|
||||
# would fire an EmulateUserStartedSpeakingFrame, which would
|
||||
# trigger an interruption, which would prevent us from writing the
|
||||
# assistant response to context.
|
||||
#
|
||||
# Sending an EmulateUserStartedSpeakingFrame ourselves doesn't
|
||||
# work: it just causes the interruption we're trying to avoid.
|
||||
#
|
||||
# Setting enable_emulated_vad_interruptions also doesn't work: at
|
||||
# the time the user aggregator receives the TranscriptionFrame, it
|
||||
# doesn't yet know the assistant has started responding, so it
|
||||
# doesn't know that emulating the user starting to speak would
|
||||
# cause an interruption.
|
||||
        should_wrap_in_user_started_stopped_speaking_frames = (
            self._waiting_for_trigger_transcription
            and self._user_text_buffer.strip().lower() == "ready"
        )

        # Start wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
        if should_wrap_in_user_started_stopped_speaking_frames:
            logger.debug(
                "Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
            )
            await self.push_frame(UserStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM)

        # Send the transcription upstream for the user context aggregator
        frame = TranscriptionFrame(
            text=self._user_text_buffer, user_id="", timestamp=time_now_iso8601()
        )
        await self.push_frame(frame, direction=FrameDirection.UPSTREAM)

        # Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
        if should_wrap_in_user_started_stopped_speaking_frames:
            await self.push_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)

        # Clear out the buffered user text
        self._user_text_buffer = ""

        # We're no longer waiting for a trigger transcription
        self._waiting_for_trigger_transcription = False

    #
    # context
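The wrapping decision in the hunk above can be illustrated without Pipecat. This is a minimal sketch rather than the service's implementation: `plan_upstream_frames` is a hypothetical helper, and plain strings stand in for the real frame classes.

```python
from typing import List


def plan_upstream_frames(waiting_for_trigger: bool, user_text: str) -> List[str]:
    """Return the names of the frames to push upstream, in order.

    Hypothetical helper: strings stand in for Pipecat frame classes.
    """
    # Only the transcription of our own trigger audio ("ready") is wrapped.
    wrap = waiting_for_trigger and user_text.strip().lower() == "ready"

    planned: List[str] = []
    if wrap:
        planned.append("UserStartedSpeakingFrame")
    planned.append("TranscriptionFrame")
    if wrap:
        planned.append("UserStoppedSpeakingFrame")
    return planned
```

An ordinary user transcription, or a "ready" that arrives when no trigger is pending, goes upstream unwrapped.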
@@ -1054,23 +1159,26 @@ class AWSNovaSonicLLMService(LLMService):
        *,
        user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
        assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
    ) -> AWSNovaSonicContextAggregatorPair:
    ) -> LLMContextAggregatorPair:
        """Create context aggregator pair for managing conversation context.

        NOTE: this method exists only for backward compatibility. New code
        should instead do:
            context = LLMContext(...)
            context_aggregator = LLMContextAggregatorPair(context)

        Args:
            context: The OpenAI LLM context to upgrade.
            context: The OpenAI LLM context.
            user_params: Parameters for the user context aggregator.
            assistant_params: Parameters for the assistant context aggregator.

        Returns:
            A pair of user and assistant context aggregators.
        """
        context.set_llm_adapter(self.get_llm_adapter())

        user = AWSNovaSonicUserContextAggregator(context=context, params=user_params)
        assistant = AWSNovaSonicAssistantContextAggregator(context=context, params=assistant_params)

        return AWSNovaSonicContextAggregatorPair(user, assistant)
        context = LLMContext.from_openai_context(context)
        return LLMContextAggregatorPair(
            context, user_params=user_params, assistant_params=assistant_params
        )

    #
    # assistant response trigger (HACK)
@@ -1108,6 +1216,8 @@ class AWSNovaSonicLLMService(LLMService):
        try:
            logger.debug("Sending assistant response trigger...")

            self._waiting_for_trigger_transcription = True

            chunk_duration = 0.02  # what we might get from InputAudioRawFrame
            chunk_size = int(
                chunk_duration
@@ -8,18 +8,14 @@

This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.

.. deprecated:: 0.0.91
    AWS Nova Sonic no longer uses types from this module under the hood.
    It now uses `LLMContext` and `LLMContextAggregatorPair`.
    Using the new patterns should allow you to not need types from this module.

See deprecation warning in pipecat.services.aws.nova_sonic.context for more
details.
"""

import warnings

from pipecat.services.aws.nova_sonic.context import *

with warnings.catch_warnings():
    warnings.simplefilter("always")
    warnings.warn(
        "Types in pipecat.services.aws_nova_sonic.context are deprecated. "
        "Please use the equivalent types from "
        "pipecat.services.aws.nova_sonic.context instead.",
        DeprecationWarning,
        stacklevel=2,
    )
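The `catch_warnings()` plus `simplefilter("always")` pattern in the hunk above makes the `DeprecationWarning` visible even when the default filters would hide it. Below is a minimal standalone sketch of that pattern; `warn_deprecated_module` and the package names passed to it are hypothetical and only serve the illustration.

```python
import warnings


def warn_deprecated_module(old_name: str, new_name: str) -> None:
    """Emit a DeprecationWarning even if the active filters would ignore it."""
    with warnings.catch_warnings():
        # "always" takes precedence over other filters inside this block only;
        # the previous filter state is restored when the block exits.
        warnings.simplefilter("always")
        warnings.warn(
            f"Types in {old_name} are deprecated. "
            f"Please use the equivalent types from {new_name} instead.",
            DeprecationWarning,
            stacklevel=2,
        )


# Record the warning instead of printing it, so it can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warn_deprecated_module("old_pkg.context", "new_pkg.context")
```

`stacklevel=2` attributes the warning to the caller (for a module-level shim, the importing code) rather than to the line that calls `warnings.warn`.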
@@ -1034,6 +1034,23 @@ class GoogleLLMService(LLMService):
        if context:
            await self._process_context(context)

    async def stop(self, frame):
        """Override stop to gracefully close the client."""
        await super().stop(frame)
        await self._close_client()

    async def cancel(self, frame):
        """Override cancel to gracefully close the client."""
        await super().cancel(frame)
        await self._close_client()

    async def _close_client(self):
        try:
            await self._client.aio.aclose()
        except Exception:
            # Do nothing - we're shutting down anyway
            pass

    def create_context_aggregator(
        self,
        context: OpenAILLMContext,
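The best-effort close shared by `stop` and `cancel` can be tried in isolation. The sketch below assumes nothing about the Google SDK: `FlakyClient` and `Service` are hypothetical stand-ins, and the client is made to fail on purpose to show that shutdown still completes.

```python
import asyncio


class FlakyClient:
    """Hypothetical stand-in for an SDK client whose close call may fail."""

    def __init__(self) -> None:
        self.close_attempts = 0

    async def aclose(self) -> None:
        self.close_attempts += 1
        raise RuntimeError("connection already closed")


class Service:
    """Hypothetical service that closes its client on stop and on cancel."""

    def __init__(self, client: FlakyClient) -> None:
        self._client = client

    async def stop(self) -> None:
        await self._close_client()

    async def cancel(self) -> None:
        await self._close_client()

    async def _close_client(self) -> None:
        try:
            await self._client.aclose()
        except Exception:
            # Best effort: a failed close must not break shutdown.
            pass


client = FlakyClient()
service = Service(client)
asyncio.run(service.stop())
asyncio.run(service.cancel())
```

Both shutdown paths reach the same helper, so the client is closed whether the pipeline ends normally or is cancelled.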
@@ -0,0 +1,12 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Public testing API for Pipecat frame processors."""

from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
from .test_runner import run_test_from_file

__all__ = ["dict_to_frame", "frame_to_dict", "load_frames_from_json", "run_test_from_file"]
150
src/pipecat/tests/serialization.py
Normal file
@@ -0,0 +1,150 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Frame serialization and deserialization for testing."""

import base64
import inspect
import json
from pathlib import Path
from typing import Any, Dict, List

from pipecat.frames import frames


def _get_frame_class(frame_type: str):
    """Get a frame class by name from the frames module.

    Args:
        frame_type: The name of the frame class (e.g., "TextFrame")

    Returns:
        The frame class object

    Raises:
        ValueError: If the frame type is not found
    """
    if not hasattr(frames, frame_type):
        raise ValueError(f"Unknown frame type: {frame_type}")

    cls = getattr(frames, frame_type)
    if not inspect.isclass(cls) or not issubclass(cls, frames.Frame):
        raise ValueError(f"{frame_type} is not a valid Frame class")

    return cls


def dict_to_frame(data: Dict[str, Any]) -> frames.Frame:
    """Convert a dictionary to a Frame object.

    Args:
        data: Dictionary containing frame data with a "type" key

    Returns:
        A Frame instance

    Raises:
        ValueError: If frame type is missing or invalid

    Example:
        >>> dict_to_frame({"type": "TextFrame", "text": "hello"})
        TextFrame(text="hello")
    """
    if "type" not in data:
        raise ValueError("Frame dictionary must contain a 'type' field")

    frame_type = data["type"]
    frame_cls = _get_frame_class(frame_type)

    # Build kwargs from data, excluding 'type'
    kwargs = {k: v for k, v in data.items() if k != "type"}

    # Special handling for audio frames with base64 encoded audio
    if "audio" in kwargs and isinstance(kwargs["audio"], str):
        kwargs["audio"] = base64.b64decode(kwargs["audio"])

    # Special handling for image frames with base64 encoded images
    if "image" in kwargs and isinstance(kwargs["image"], str):
        kwargs["image"] = base64.b64decode(kwargs["image"])

    try:
        return frame_cls(**kwargs)
    except TypeError as e:
        raise ValueError(f"Failed to create {frame_type}: {e}")


def load_frames_from_json(filepath: str) -> List[frames.Frame]:
    """Load frames from a JSON file.

    Args:
        filepath: Path to JSON file containing frame data

    Returns:
        List of Frame objects

    Raises:
        FileNotFoundError: If the file doesn't exist
        ValueError: If JSON is invalid or frames cannot be deserialized

    Example JSON format:
        {
            "input_frames": [
                {"type": "TextFrame", "text": "hello"},
                {"type": "EndFrame"}
            ]
        }
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Frame file not found: {filepath}")

    with open(path, "r") as f:
        data = json.load(f)

    if not isinstance(data, dict):
        raise ValueError("JSON must contain a dictionary")

    if "input_frames" not in data:
        raise ValueError("JSON must contain an 'input_frames' key")

    frame_dicts = data["input_frames"]
    if not isinstance(frame_dicts, list):
        raise ValueError("'input_frames' must be a list")

    return [dict_to_frame(frame_dict) for frame_dict in frame_dicts]


def frame_to_dict(frame: frames.Frame) -> Dict[str, Any]:
    """Convert a Frame object to a dictionary.

    Args:
        frame: Frame object to serialize

    Returns:
        Dictionary representation of the frame

    Example:
        >>> frame_to_dict(TextFrame(text="hello"))
        {"type": "TextFrame", "text": "hello"}
    """
    result = {"type": frame.__class__.__name__}

    # Get all fields from the dataclass
    if hasattr(frame, "__dataclass_fields__"):
        for field_name in frame.__dataclass_fields__:
            # Skip internal fields from base Frame class
            if field_name in ("id", "name", "pts", "metadata", "transport_source", "transport_destination"):
                continue

            value = getattr(frame, field_name, None)
            if value is not None:
                # Special handling for bytes (audio/image data)
                if isinstance(value, bytes):
                    result[field_name] = base64.b64encode(value).decode("utf-8")
                else:
                    result[field_name] = value

    return result
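The base64 handling in `frame_to_dict` and `dict_to_frame` exists because raw `bytes` cannot be written to JSON. The round trip can be sketched without Pipecat as follows; `FakeAudioFrame`, `to_dict`, and `from_dict` are hypothetical stand-ins, not the module's API.

```python
import base64
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass
class FakeAudioFrame:
    """Hypothetical stand-in for a Pipecat audio frame."""

    audio: bytes
    sample_rate: int = 16000


def to_dict(frame: FakeAudioFrame) -> Dict[str, Any]:
    """Serialize a frame, encoding bytes fields as base64 text."""
    result: Dict[str, Any] = {"type": type(frame).__name__}
    for field in fields(frame):
        value = getattr(frame, field.name)
        if isinstance(value, bytes):
            # bytes are not JSON-serializable, so store them as ASCII text
            value = base64.b64encode(value).decode("utf-8")
        result[field.name] = value
    return result


def from_dict(data: Dict[str, Any]) -> FakeAudioFrame:
    """Rebuild a frame, decoding the base64 audio payload back to bytes."""
    kwargs = {k: v for k, v in data.items() if k != "type"}
    if isinstance(kwargs.get("audio"), str):
        kwargs["audio"] = base64.b64decode(kwargs["audio"])
    return FakeAudioFrame(**kwargs)


original = FakeAudioFrame(audio=b"\x00\x01\x02", sample_rate=8000)
encoded = to_dict(original)
restored = from_dict(encoded)
```

Decoding is keyed on the field name (`audio`), which mirrors the name-based special cases in `dict_to_frame`; a string field that happened to be called `audio` would be decoded as well.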
169
src/pipecat/tests/test_runner.py
Normal file
@@ -0,0 +1,169 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Test runner for frame processors from JSON test files."""

import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor

from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json


async def run_test_from_file(
    processor: FrameProcessor,
    test_file: str,
) -> Tuple[List[Frame], Optional[List[Dict[str, Any]]], Optional[bool]]:
    """Run a processor test from a JSON test file.

    Args:
        processor: The frame processor to test
        test_file: Path to JSON test file

    Returns:
        Tuple of (output_frames, expected_output, passed)
        - output_frames: List of Frame objects that were output
        - expected_output: List of expected frame dicts (None if not specified)
        - passed: True if test passed, False if failed, None if no validation

    Raises:
        FileNotFoundError: If test file doesn't exist
        ValueError: If test file is invalid

    Example test file format:
        {
            "input_frames": [
                {"type": "TextFrame", "text": "hello"}
            ],
            "expected_output": [
                {"type": "TextFrame"},
                {"type": "EndFrame"}
            ]
        }
    """
    path = Path(test_file)
    if not path.exists():
        raise FileNotFoundError(f"Test file not found: {test_file}")

    with open(path, "r") as f:
        test_data = json.load(f)

    # Load input frames
    if "input_frames" not in test_data:
        raise ValueError("Test file must contain 'input_frames'")

    input_frames = [dict_to_frame(frame_dict) for frame_dict in test_data["input_frames"]]

    # Load expected output (optional)
    expected_output = test_data.get("expected_output", None)

    # Run the test
    # Note: run_test() only collects frames if expected_down_frames is provided,
    # so we need to manually collect from the pipeline ourselves
    import asyncio
    from pipecat.frames.frames import EndFrame
    from pipecat.processors.frame_processor import FrameDirection
    from pipecat.tests.utils import QueuedFrameProcessor
    from pipecat.pipeline.pipeline import Pipeline
    from pipecat.pipeline.task import PipelineTask, PipelineParams
    from pipecat.pipeline.runner import PipelineRunner

    # Set up the test pipeline manually
    received_down = asyncio.Queue()
    received_up = asyncio.Queue()
    source = QueuedFrameProcessor(
        queue=received_up,
        queue_direction=FrameDirection.UPSTREAM,
        ignore_start=True,
    )
    sink = QueuedFrameProcessor(
        queue=received_down,
        queue_direction=FrameDirection.DOWNSTREAM,
        ignore_start=True,
    )

    pipeline = Pipeline([source, processor, sink])
    task = PipelineTask(
        pipeline,
        params=PipelineParams(),
        observers=[],
        cancel_on_idle_timeout=False,
    )

    async def push_frames():
        await asyncio.sleep(0.01)
        for frame in input_frames:
            await task.queue_frame(frame)
        await task.queue_frame(EndFrame())

    runner = PipelineRunner()
    await asyncio.gather(runner.run(task), push_frames())

    # Collect all frames from the downstream queue
    downstream_frames = []
    while not received_down.empty():
        frame = await received_down.get()
        if not isinstance(frame, EndFrame):
            downstream_frames.append(frame)

    # Validate if expected_output is provided
    passed = None
    if expected_output is not None:
        passed = _validate_output(downstream_frames, expected_output)

    return downstream_frames, expected_output, passed


def _validate_output(actual_frames: List[Frame], expected_output: List[Dict[str, Any]]) -> bool:
    """Validate actual output frames against expected output.

    Args:
        actual_frames: List of frames that were actually output
        expected_output: List of expected frame specifications

    Returns:
        True if validation passed, False otherwise
    """
    if len(actual_frames) != len(expected_output):
        return False

    for actual, expected in zip(actual_frames, expected_output):
        # Check frame type
        if "type" not in expected:
            return False

        expected_type = expected["type"]
        if actual.__class__.__name__ != expected_type:
            return False

        # Check specific fields if provided
        for field_name, expected_value in expected.items():
            if field_name == "type":
                continue

            if not hasattr(actual, field_name):
                return False

            actual_value = getattr(actual, field_name)

            # Special handling for different types
            if isinstance(expected_value, str) and isinstance(actual_value, str):
                # For string fields, support partial matching with "contains"
                if field_name.endswith("_contains"):
                    base_field = field_name.replace("_contains", "")
                    if hasattr(actual, base_field):
                        actual_text = getattr(actual, base_field)
                        if expected_value not in actual_text:
                            return False
                elif actual_value != expected_value:
                    return False
            elif actual_value != expected_value:
                return False

    return True
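One detail of `_validate_output` is worth noting: a key such as `text_contains` is first checked with `hasattr(actual, field_name)`, so unless the frame has an attribute with that literal name the function returns False before the substring branch is reached. The sketch below shows one possible ordering that resolves the suffix first. It is an illustration under stated assumptions, not the project's code: `matches` and `FakeTextFrame` are hypothetical, and the suffix is sliced off rather than removed with `str.replace`, which would strip every occurrence.

```python
from dataclasses import dataclass
from typing import Any, Dict

_SUFFIX = "_contains"


@dataclass
class FakeTextFrame:
    """Hypothetical stand-in for a Pipecat TextFrame."""

    text: str


def matches(actual: Any, expected: Dict[str, Any]) -> bool:
    """Check one frame against one expected-frame specification."""
    if expected.get("type") != type(actual).__name__:
        return False

    for key, expected_value in expected.items():
        if key == "type":
            continue

        # Resolve "<field>_contains" to "<field>" before the attribute lookup.
        partial = key.endswith(_SUFFIX)
        field_name = key[: -len(_SUFFIX)] if partial else key
        if not hasattr(actual, field_name):
            return False

        actual_value = getattr(actual, field_name)
        if partial:
            # Substring matching only makes sense for two strings.
            if not (isinstance(actual_value, str) and isinstance(expected_value, str)):
                return False
            if expected_value not in actual_value:
                return False
        elif actual_value != expected_value:
            return False

    return True


frame = FakeTextFrame(text="hello world")
```

With this ordering, `{"type": "FakeTextFrame", "text_contains": "world"}` matches the frame, while an exact `"text": "hello"` does not.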
@@ -47,6 +47,7 @@ SENTENCE_ENDING_PUNCTUATION: FrozenSet[str] = frozenset(
        "!",
        "?",
        ";",
        "…",
        # East Asian punctuation (Chinese (Traditional & Simplified), Japanese, Korean)
        "。",  # Ideographic full stop
        "?",  # Full-width question mark
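The effect of adding "…" to the set can be sketched with a small membership check. This is only an illustration: `SENTENCE_ENDINGS` contains just the entries visible in the hunk above (the real set is larger), and `ends_sentence` is a hypothetical helper; how Pipecat consumes `SENTENCE_ENDING_PUNCTUATION` is not shown in this diff.

```python
from typing import FrozenSet

# Only the entries visible in the hunk above; the real set has more.
SENTENCE_ENDINGS: FrozenSet[str] = frozenset({"!", "?", ";", "…", "。", "?"})


def ends_sentence(text: str) -> bool:
    """Return True if the last non-whitespace character ends a sentence."""
    stripped = text.rstrip()
    return bool(stripped) and stripped[-1] in SENTENCE_ENDINGS
```

Because "…" is a single code point (U+2026), a one-character lookup is enough to recognize it; three ASCII dots would be a different case.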
@@ -7,10 +7,12 @@
"""Unit tests for ServiceSwitcher and related components."""

import unittest
from dataclasses import dataclass

from pipecat.frames.frames import (
    Frame,
    ManuallySwitchServiceFrame,
    SystemFrame,
    TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
@@ -52,6 +54,13 @@ class MockFrameProcessor(FrameProcessor):
        self.frame_count = 0


@dataclass
class DummySystemFrame(SystemFrame):
    """A dummy system frame for testing purposes."""

    text: str = ""


class TestServiceSwitcherStrategyManual(unittest.IsolatedAsyncioTestCase):
    """Test cases for ServiceSwitcherStrategyManual."""

@@ -140,14 +149,22 @@ class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
        # Send some test frames
        frames_to_send = [
            TextFrame(text="Hello 1"),
            DummySystemFrame(text="System Message 1"),
            TextFrame(text="Hello 2"),
            DummySystemFrame(text="System Message 2"),
            TextFrame(text="Hello 3"),
        ]

        await run_test(
            switcher,
            frames_to_send=frames_to_send,
            expected_down_frames=[TextFrame, TextFrame, TextFrame],
            expected_down_frames=[
                DummySystemFrame,
                DummySystemFrame,
                TextFrame,
                TextFrame,
                TextFrame,
            ],
            expected_up_frames=[],  # Expect no error frames
        )

@@ -156,7 +173,13 @@ class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
        text_frames = [f for f in self.service1.processed_frames if isinstance(f, TextFrame)]
        self.assertEqual(len(text_frames), 3)

        # Check that other services don't receive text frames (they might get StartFrame/EndFrame)
        # Only service1 should have processed the system frames
        system_frames = [
            f for f in self.service1.processed_frames if isinstance(f, DummySystemFrame)
        ]
        self.assertEqual(len(system_frames), 2)

        # Check that other services don't receive text frames (they still get StartFrame/EndFrame)
        service2_text_frames = [
            f for f in self.service2.processed_frames if isinstance(f, TextFrame)
        ]
@@ -166,10 +189,24 @@ class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
        self.assertEqual(len(service2_text_frames), 0)
        self.assertEqual(len(service3_text_frames), 0)

        # Check that other services don't receive dummy system frames (they still get StartFrame/EndFrame)
        service2_system_frames = [
            f for f in self.service2.processed_frames if isinstance(f, DummySystemFrame)
        ]
        service3_system_frames = [
            f for f in self.service3.processed_frames if isinstance(f, DummySystemFrame)
        ]
        self.assertEqual(len(service2_system_frames), 0)
        self.assertEqual(len(service3_system_frames), 0)

        # Verify the actual text frames processed
        for i, frame in enumerate(text_frames):
            self.assertEqual(frame.text, f"Hello {i + 1}")

        # Verify the actual system frames processed
        for i, frame in enumerate(system_frames):
            self.assertEqual(frame.text, f"System Message {i + 1}")

    async def test_service_switching(self):
        """Test that after service switching using ManuallySwitchServiceFrame, the new active service receives frames while others don't."""
        switcher = ServiceSwitcher(self.services, ServiceSwitcherStrategyManual)
