Compare commits

...

6 Commits

Author SHA1 Message Date
James Hush
d41ee5319e Add example for getting just recording links 2025-10-23 14:37:39 +08:00
James Hush
45256903b5 Remove log 2025-10-20 17:03:31 +08:00
James Hush
d37e63d972 Simplify create_daily_room.py 2025-10-20 17:02:05 +08:00
James Hush
013f869259 Simplfy fetch_s3_recording 2025-10-20 16:54:24 +08:00
James Hush
11594003e2 This is working 2025-10-20 16:49:37 +08:00
James Hush
2dd2a17b19 Backoff examples 2025-10-20 16:46:16 +08:00
4 changed files with 4794 additions and 4040 deletions

250
create_daily_room.py Executable file
View File

@@ -0,0 +1,250 @@
#!/usr/bin/env -S uv run
"""Utilities for creating Daily.co rooms with retry logic.
This module provides functions to create Daily rooms via REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
import time
from typing import Dict, Optional
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def periodic_progress_logger(
progress_dict: Dict[str, int],
total: int,
interval_seconds: float = 5.0,
stop_event: Optional[asyncio.Event] = None,
):
"""Log progress periodically in the background.
Args:
progress_dict: Shared dict with 'completed' and 'failed' counts
total: Total number of items being processed
interval_seconds: How often to log progress (default 5 seconds)
stop_event: Event to signal when to stop logging
"""
if stop_event is None:
stop_event = asyncio.Event()
while not stop_event.is_set():
await asyncio.sleep(interval_seconds)
if stop_event.is_set():
break
total_processed = progress_dict["completed"] + progress_dict["failed"]
if total_processed > 0:
percentage = (total_processed / total) * 100
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
logger.info(
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
f"{progress_dict['completed']} succeeded, "
f"{progress_dict['failed']} failed"
)
async def create_daily_room(
name: Optional[str] = None,
privacy: str = "public",
exp_minutes: int = 10,
max_retries: int = 5,
) -> Optional[Dict]:
"""Create a Daily room with automatic retry on rate limit errors.
Uses tenacity library to handle rate limiting (429 errors) with
exponential backoff and automatic retries.
Args:
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
privacy: Room privacy setting ("public" or "private")
exp_minutes: Minutes until room expires (default 10)
max_retries: Maximum number of retry attempts on rate limit (default 5)
Returns:
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
"""
# Calculate expiration timestamp (unix timestamp in seconds)
exp_timestamp = int(time.time()) + (exp_minutes * 60)
# Build request body
body = {
"privacy": privacy,
"properties": {
"exp": exp_timestamp,
},
}
if name:
body["name"] = name
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type(HTTPStatusError),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
async with AsyncClient(timeout=30) as client:
response = await client.post(
url="https://api.daily.co/v1/rooms",
headers=headers,
json=body,
)
response.raise_for_status()
return response.json()
# This line should never be reached due to reraise=True, but satisfies type checker
return None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
return None
except Exception as e:
logger.exception(f"Unexpected error creating room: {e}")
return None
async def create_room_with_progress(
index: int, total: int, progress_dict: Dict[str, int], **kwargs
) -> Optional[Dict]:
"""Wrapper for create_daily_room that tracks progress.
Args:
index: Index of this room creation (0-based)
total: Total number of rooms being created
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
**kwargs: Arguments passed to create_daily_room
Returns:
Room object dict or None
"""
result = await create_daily_room(**kwargs)
# Update progress
if result is not None:
progress_dict["completed"] += 1
else:
progress_dict["failed"] += 1
return result
async def test_create_rooms(
num_rooms: int = 1000,
progress_interval: float = 5.0,
) -> Dict[str, int | float]:
"""Attempt to create multiple Daily rooms concurrently.
This function demonstrates concurrent room creation and tracks
success/failure statistics. Rate limiting will likely occur when
creating many rooms quickly.
Args:
num_rooms: Number of rooms to attempt to create (default 1000)
progress_interval: How often to log progress in seconds (default 5.0)
Returns:
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
"""
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
start_time = time.time()
# Shared progress tracking dictionary
progress_dict = {"completed": 0, "failed": 0}
# Start background progress logger
stop_event = asyncio.Event()
progress_task = asyncio.create_task(
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
)
# Create and execute all tasks concurrently
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
tasks = [
create_room_with_progress(
index=i,
total=num_rooms,
progress_dict=progress_dict,
name=None, # Auto-generate names
privacy="public",
exp_minutes=10,
max_retries=5,
)
for i in range(num_rooms)
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
finally:
# Stop the progress logger
stop_event.set()
await progress_task
# Count successes and failures
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
failed_count = num_rooms - success_count
elapsed_time = time.time() - start_time
# Log statistics
logger.info("=" * 60)
logger.info("BULK ROOM CREATION SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms attempted: {num_rooms}")
logger.info(f"Successfully created: {success_count}")
logger.info(f"Failed to create: {failed_count}")
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
logger.info(f"Total time: {elapsed_time:.2f} seconds")
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
logger.info("=" * 60)
return {
"success": success_count,
"failed": failed_count,
"total": num_rooms,
"elapsed_seconds": elapsed_time,
}
# Example usage
async def main():
"""Example usage of the room creation functions."""
# Test creating a single room
logger.info("Testing single room creation...")
room = await create_daily_room(exp_minutes=10)
if room:
logger.info(f"Created room: {room['name']} at {room['url']}")
else:
logger.error("Failed to create room")
# Uncomment to test bulk creation (warning: may hit rate limits!)
logger.info("\nTesting bulk room creation...")
stats = await test_create_rooms(num_rooms=1000)
logger.info(f"Final stats: {stats}")
if __name__ == "__main__":
asyncio.run(main())

267
fetch_s3_recording.py Executable file
View File

@@ -0,0 +1,267 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_recordings(limit: int = 100) -> list[str]:
"""Get list of recent recording IDs from Daily API.
Args:
limit: Maximum number of recordings to retrieve (default 100)
Returns:
List of recording IDs (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url="https://api.daily.co/v1/recordings",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
recordings = data.get("data", [])
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
return recording_ids
except Exception as e:
logger.exception(f"Failed to get recordings from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent recordings."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 recordings
logger.info("Fetching recent recordings...")
recording_ids = await get_recent_recordings(limit=100)
if not recording_ids:
logger.error("No recordings found. Cannot proceed with test.")
return
logger.info(f"Found {len(recording_ids)} recordings to fetch")
# Fetch access links for each recording concurrently
logger.info(
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
)
# Create tasks for all recordings
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Get download link for a specific recording ID."""
try:
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None)
return (recording_url, recording_url)
except Exception as e:
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
return (None, None)
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
)
logger.debug(f" URL: {recording_url}")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(recording_ids)}] No link for {recording_id}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total recordings checked: {len(recording_ids)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

238
fetch_s3_recording_by_room.py Executable file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env -S uv run
"""Utilities for fetching Daily.co recording URLs with retry logic.
This module provides functions to retrieve recording download links from Daily's REST API
with robust error handling, rate limiting, and exponential backoff retry logic.
"""
import asyncio
import os
from typing import Optional, Tuple
from httpx import AsyncClient, HTTPStatusError
from loguru import logger
from tenacity import (
AsyncRetrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
async def get_recording_s3_url_with_retry(
room_id: str,
max_retries: int = 5,
) -> Tuple[Optional[str], Optional[str]]:
"""Retrieve recording URL with exponential backoff and retry logic.
Uses tenacity library to handle rate limiting (429 errors) and other
transient errors with automatic exponential backoff.
Args:
room_id: Daily.co room identifier
max_retries: Maximum number of retry attempts (default 5)
Returns:
Tuple of (recording_url, recording_signed_url)
Returns (None, None) if no recording exists for the room.
"""
try:
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((HTTPStatusError, Exception)),
stop=stop_after_attempt(max_retries),
wait=wait_exponential(multiplier=1, min=1, max=60),
reraise=True,
):
with attempt:
recording_url, recording_signed_url, status = await get_recording_s3_url(
room_id=room_id
)
# If no recording exists (status is None), return immediately - no retry
if status is None:
logger.debug(f"No recording found for room {room_id}")
return None, None
# If recording exists but is not finished yet, retry
if status != "finished":
logger.warning(
f"Recording not finished for room {room_id}, status: {status} "
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
)
raise Exception(f"Recording not ready, status: {status}")
# Recording is finished, return the URLs
return recording_url, recording_signed_url
# This line should never be reached due to reraise=True, but satisfies type checker
return None, None
except RetryError as e:
# All retries exhausted
last_exception = e.last_attempt.exception()
logger.error(
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
f"{last_exception}"
)
return None, None
except Exception as e:
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
return None, None
async def get_recording_s3_url(
room_id: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Get recording URL using Daily's REST API.
Args:
room_id: Daily.co room identifier
Returns:
Tuple of (recording_url, recording_signed_url, status)
- recording_url: The download link for the recording
- recording_signed_url: Same as recording_url (kept for backward compatibility)
- status: Recording status from Daily API
Raises:
HTTPStatusError: When HTTP errors occur (including rate limits)
"""
headers = {
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
}
async with AsyncClient(timeout=180) as client:
# List recordings for the room
list_response = await client.get(
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
headers=headers,
)
list_response.raise_for_status()
list_data = list_response.json()
# Check if recording exists and is finished
if not list_data.get("data") or len(list_data["data"]) == 0:
return (None, None, None)
recording_id = list_data["data"][0].get("id")
status = list_data["data"][0].get("status")
if not recording_id or status != "finished":
return (None, None, status)
# Get the recording access link
link_response = await client.get(
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
headers=headers,
)
link_response.raise_for_status()
link_data = link_response.json()
recording_url = link_data.get("download_link")
if not recording_url:
logger.warning(f"No download link found for recording {recording_id}")
return (None, None, status)
# Return the same URL for both fields for backward compatibility
return (recording_url, recording_url, status)
async def get_recent_rooms(limit: int = 100) -> list[str]:
"""Get list of recent room names from Daily API.
Args:
limit: Maximum number of rooms to retrieve (default 100, max 100)
Returns:
List of room names (strings)
"""
try:
async with AsyncClient() as client:
response = await client.get(
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
headers={
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
"Content-Type": "application/json",
},
timeout=30,
)
response.raise_for_status()
data = response.json()
rooms = data.get("data", [])
room_names = [room.get("name") for room in rooms if room.get("name")]
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
return room_names
except Exception as e:
logger.exception(f"Failed to get rooms from Daily API: {e}")
return []
async def main():
"""Test get_recording_s3_url_with_retry with recent rooms."""
logger.info("Starting recording fetch test...")
# Step 1: Get the most recent 100 rooms
logger.info("Fetching recent rooms...")
room_names = await get_recent_rooms(limit=100)
if not room_names:
logger.error("No rooms found. Cannot proceed with test.")
return
logger.info(f"Found {len(room_names)} rooms to check for recordings")
# Call get_recording_s3_url_with_retry on each room concurrently
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
# Create tasks for all rooms
tasks = [
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
for room_name in room_names
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
success_count = 0
not_found_count = 0
failed_count = 0
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
if isinstance(result, Exception):
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
elif isinstance(result, tuple) and len(result) == 2:
recording_url, recording_signed_url = result
if recording_url:
success_count += 1
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
logger.debug(f" URL: {recording_url[:80]}...")
else:
not_found_count += 1
logger.debug(f" [{i}/{len(room_names)}] No recording for {room_name}")
else:
failed_count += 1
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
# Summary
logger.info("\n" + "=" * 60)
logger.info("RECORDING FETCH TEST SUMMARY")
logger.info("=" * 60)
logger.info(f"Total rooms checked: {len(room_names)}")
logger.info(f"✅ Recordings found: {success_count}")
logger.info(f" No recordings: {not_found_count}")
logger.info(f"❌ Failed: {failed_count}")
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

8079
uv.lock generated

File diff suppressed because it is too large Load Diff