Compare commits
6 Commits
hush/TurnT
...
hush/backo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d41ee5319e | ||
|
|
45256903b5 | ||
|
|
d37e63d972 | ||
|
|
013f869259 | ||
|
|
11594003e2 | ||
|
|
2dd2a17b19 |
250
create_daily_room.py
Executable file
250
create_daily_room.py
Executable file
@@ -0,0 +1,250 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for creating Daily.co rooms with retry logic.
|
||||
|
||||
This module provides functions to create Daily rooms via REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def periodic_progress_logger(
|
||||
progress_dict: Dict[str, int],
|
||||
total: int,
|
||||
interval_seconds: float = 5.0,
|
||||
stop_event: Optional[asyncio.Event] = None,
|
||||
):
|
||||
"""Log progress periodically in the background.
|
||||
|
||||
Args:
|
||||
progress_dict: Shared dict with 'completed' and 'failed' counts
|
||||
total: Total number of items being processed
|
||||
interval_seconds: How often to log progress (default 5 seconds)
|
||||
stop_event: Event to signal when to stop logging
|
||||
"""
|
||||
if stop_event is None:
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
while not stop_event.is_set():
|
||||
await asyncio.sleep(interval_seconds)
|
||||
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
total_processed = progress_dict["completed"] + progress_dict["failed"]
|
||||
if total_processed > 0:
|
||||
percentage = (total_processed / total) * 100
|
||||
rate = total_processed / interval_seconds if interval_seconds > 0 else 0
|
||||
|
||||
logger.info(
|
||||
f"⏳ Progress: {total_processed}/{total} ({percentage:.1f}%) - "
|
||||
f"✅ {progress_dict['completed']} succeeded, "
|
||||
f"❌ {progress_dict['failed']} failed"
|
||||
)
|
||||
|
||||
|
||||
async def create_daily_room(
|
||||
name: Optional[str] = None,
|
||||
privacy: str = "public",
|
||||
exp_minutes: int = 10,
|
||||
max_retries: int = 5,
|
||||
) -> Optional[Dict]:
|
||||
"""Create a Daily room with automatic retry on rate limit errors.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) with
|
||||
exponential backoff and automatic retries.
|
||||
|
||||
Args:
|
||||
name: Room name (auto-generated if None). Must match /[A-Za-z0-9_-]+/ and be <= 128 chars
|
||||
privacy: Room privacy setting ("public" or "private")
|
||||
exp_minutes: Minutes until room expires (default 10)
|
||||
max_retries: Maximum number of retry attempts on rate limit (default 5)
|
||||
|
||||
Returns:
|
||||
Room object dict with 'name', 'url', 'id', 'config', etc., or None on failure
|
||||
"""
|
||||
# Calculate expiration timestamp (unix timestamp in seconds)
|
||||
exp_timestamp = int(time.time()) + (exp_minutes * 60)
|
||||
|
||||
# Build request body
|
||||
body = {
|
||||
"privacy": privacy,
|
||||
"properties": {
|
||||
"exp": exp_timestamp,
|
||||
},
|
||||
}
|
||||
|
||||
if name:
|
||||
body["name"] = name
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type(HTTPStatusError),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
async with AsyncClient(timeout=30) as client:
|
||||
response = await client.post(
|
||||
url="https://api.daily.co/v1/rooms",
|
||||
headers=headers,
|
||||
json=body,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.exception(f"Failed to create room after {max_retries} retries: {last_exception}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error creating room: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def create_room_with_progress(
|
||||
index: int, total: int, progress_dict: Dict[str, int], **kwargs
|
||||
) -> Optional[Dict]:
|
||||
"""Wrapper for create_daily_room that tracks progress.
|
||||
|
||||
Args:
|
||||
index: Index of this room creation (0-based)
|
||||
total: Total number of rooms being created
|
||||
progress_dict: Shared dict for tracking progress {"completed": 0, "failed": 0}
|
||||
**kwargs: Arguments passed to create_daily_room
|
||||
|
||||
Returns:
|
||||
Room object dict or None
|
||||
"""
|
||||
result = await create_daily_room(**kwargs)
|
||||
|
||||
# Update progress
|
||||
if result is not None:
|
||||
progress_dict["completed"] += 1
|
||||
else:
|
||||
progress_dict["failed"] += 1
|
||||
|
||||
return result
|
||||
|
||||
|
||||
async def test_create_rooms(
|
||||
num_rooms: int = 1000,
|
||||
progress_interval: float = 5.0,
|
||||
) -> Dict[str, int | float]:
|
||||
"""Attempt to create multiple Daily rooms concurrently.
|
||||
|
||||
This function demonstrates concurrent room creation and tracks
|
||||
success/failure statistics. Rate limiting will likely occur when
|
||||
creating many rooms quickly.
|
||||
|
||||
Args:
|
||||
num_rooms: Number of rooms to attempt to create (default 1000)
|
||||
progress_interval: How often to log progress in seconds (default 5.0)
|
||||
|
||||
Returns:
|
||||
Dict with statistics: {'success': int, 'failed': int, 'total': int, 'elapsed_seconds': float}
|
||||
"""
|
||||
logger.info(f"Starting bulk room creation: attempting to create {num_rooms} rooms")
|
||||
start_time = time.time()
|
||||
|
||||
# Shared progress tracking dictionary
|
||||
progress_dict = {"completed": 0, "failed": 0}
|
||||
|
||||
# Start background progress logger
|
||||
stop_event = asyncio.Event()
|
||||
progress_task = asyncio.create_task(
|
||||
periodic_progress_logger(progress_dict, num_rooms, progress_interval, stop_event)
|
||||
)
|
||||
|
||||
# Create and execute all tasks concurrently
|
||||
logger.info(f"Executing {num_rooms} concurrent room creation requests...")
|
||||
tasks = [
|
||||
create_room_with_progress(
|
||||
index=i,
|
||||
total=num_rooms,
|
||||
progress_dict=progress_dict,
|
||||
name=None, # Auto-generate names
|
||||
privacy="public",
|
||||
exp_minutes=10,
|
||||
max_retries=5,
|
||||
)
|
||||
for i in range(num_rooms)
|
||||
]
|
||||
|
||||
try:
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
finally:
|
||||
# Stop the progress logger
|
||||
stop_event.set()
|
||||
await progress_task
|
||||
|
||||
# Count successes and failures
|
||||
success_count = sum(1 for r in results if r is not None and not isinstance(r, Exception))
|
||||
failed_count = num_rooms - success_count
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
# Log statistics
|
||||
logger.info("=" * 60)
|
||||
logger.info("BULK ROOM CREATION SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total rooms attempted: {num_rooms}")
|
||||
logger.info(f"Successfully created: {success_count}")
|
||||
logger.info(f"Failed to create: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / num_rooms * 100):.2f}%")
|
||||
logger.info(f"Total time: {elapsed_time:.2f} seconds")
|
||||
logger.info(f"Average time per room: {(elapsed_time / num_rooms):.3f} seconds")
|
||||
logger.info("=" * 60)
|
||||
|
||||
return {
|
||||
"success": success_count,
|
||||
"failed": failed_count,
|
||||
"total": num_rooms,
|
||||
"elapsed_seconds": elapsed_time,
|
||||
}
|
||||
|
||||
|
||||
# Example usage
|
||||
async def main():
|
||||
"""Example usage of the room creation functions."""
|
||||
# Test creating a single room
|
||||
logger.info("Testing single room creation...")
|
||||
room = await create_daily_room(exp_minutes=10)
|
||||
if room:
|
||||
logger.info(f"Created room: {room['name']} at {room['url']}")
|
||||
else:
|
||||
logger.error("Failed to create room")
|
||||
|
||||
# Uncomment to test bulk creation (warning: may hit rate limits!)
|
||||
logger.info("\nTesting bulk room creation...")
|
||||
stats = await test_create_rooms(num_rooms=1000)
|
||||
logger.info(f"Final stats: {stats}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
267
fetch_s3_recording.py
Executable file
267
fetch_s3_recording.py
Executable file
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
|
||||
room_id: str,
|
||||
max_retries: int = 5,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Retrieve recording URL with exponential backoff and retry logic.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) and other
|
||||
transient errors with automatic exponential backoff.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
max_retries: Maximum number of retry attempts (default 5)
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url)
|
||||
Returns (None, None) if no recording exists for the room.
|
||||
"""
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type((HTTPStatusError, Exception)),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
recording_url, recording_signed_url, status = await get_recording_s3_url(
|
||||
room_id=room_id
|
||||
)
|
||||
|
||||
# If no recording exists (status is None), return immediately - no retry
|
||||
if status is None:
|
||||
logger.debug(f"No recording found for room {room_id}")
|
||||
return None, None
|
||||
|
||||
# If recording exists but is not finished yet, retry
|
||||
if status != "finished":
|
||||
logger.warning(
|
||||
f"Recording not finished for room {room_id}, status: {status} "
|
||||
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
|
||||
)
|
||||
raise Exception(f"Recording not ready, status: {status}")
|
||||
|
||||
# Recording is finished, return the URLs
|
||||
return recording_url, recording_signed_url
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None, None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.error(
|
||||
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
|
||||
f"{last_exception}"
|
||||
)
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
|
||||
room_id: str,
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Get recording URL using Daily's REST API.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url, status)
|
||||
- recording_url: The download link for the recording
|
||||
- recording_signed_url: Same as recording_url (kept for backward compatibility)
|
||||
- status: Recording status from Daily API
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: When HTTP errors occur (including rate limits)
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# List recordings for the room
|
||||
list_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
|
||||
headers=headers,
|
||||
)
|
||||
list_response.raise_for_status()
|
||||
list_data = list_response.json()
|
||||
|
||||
# Check if recording exists and is finished
|
||||
if not list_data.get("data") or len(list_data["data"]) == 0:
|
||||
return (None, None, None)
|
||||
|
||||
recording_id = list_data["data"][0].get("id")
|
||||
status = list_data["data"][0].get("status")
|
||||
|
||||
if not recording_id or status != "finished":
|
||||
return (None, None, status)
|
||||
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None, status)
|
||||
|
||||
# Return the same URL for both fields for backward compatibility
|
||||
return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_recordings(limit: int = 100) -> list[str]:
|
||||
"""Get list of recent recording IDs from Daily API.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of recordings to retrieve (default 100)
|
||||
|
||||
Returns:
|
||||
List of recording IDs (strings)
|
||||
"""
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url="https://api.daily.co/v1/recordings",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
recordings = data.get("data", [])
|
||||
recording_ids = [rec.get("id") for rec in recordings[:limit] if rec.get("id")]
|
||||
|
||||
logger.info(f"Retrieved {len(recording_ids)} recording IDs from Daily API")
|
||||
return recording_ids
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get recordings from Daily API: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test get_recording_s3_url_with_retry with recent recordings."""
|
||||
logger.info("Starting recording fetch test...")
|
||||
|
||||
# Step 1: Get the most recent 100 recordings
|
||||
logger.info("Fetching recent recordings...")
|
||||
recording_ids = await get_recent_recordings(limit=100)
|
||||
|
||||
if not recording_ids:
|
||||
logger.error("No recordings found. Cannot proceed with test.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(recording_ids)} recordings to fetch")
|
||||
|
||||
# Fetch access links for each recording concurrently
|
||||
logger.info(
|
||||
f"Attempting to fetch access links for {len(recording_ids)} recordings concurrently..."
|
||||
)
|
||||
|
||||
# Create tasks for all recordings
|
||||
async def get_recording_link(recording_id: str) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Get download link for a specific recording ID."""
|
||||
try:
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None)
|
||||
|
||||
return (recording_url, recording_url)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get access link for recording {recording_id}: {e}")
|
||||
return (None, None)
|
||||
|
||||
tasks = [get_recording_link(recording_id) for recording_id in recording_ids]
|
||||
|
||||
# Execute all tasks concurrently
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
success_count = 0
|
||||
not_found_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for i, (recording_id, result) in enumerate(zip(recording_ids, results), 1):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(recording_ids)}] Failed for {recording_id}: {result}")
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
recording_url, recording_signed_url = result
|
||||
if recording_url:
|
||||
success_count += 1
|
||||
logger.info(
|
||||
f"✅ [{i}/{len(recording_ids)}] Found recording link for {recording_id}"
|
||||
)
|
||||
logger.debug(f" URL: {recording_url}")
|
||||
else:
|
||||
not_found_count += 1
|
||||
logger.debug(f"ℹ️ [{i}/{len(recording_ids)}] No link for {recording_id}")
|
||||
else:
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(recording_ids)}] Unexpected result type for {recording_id}")
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("RECORDING FETCH TEST SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total recordings checked: {len(recording_ids)}")
|
||||
logger.info(f"✅ Recordings found: {success_count}")
|
||||
logger.info(f"ℹ️ No recordings: {not_found_count}")
|
||||
logger.info(f"❌ Failed: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / len(recording_ids) * 100):.2f}%")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
238
fetch_s3_recording_by_room.py
Executable file
238
fetch_s3_recording_by_room.py
Executable file
@@ -0,0 +1,238 @@
|
||||
#!/usr/bin/env -S uv run
|
||||
"""Utilities for fetching Daily.co recording URLs with retry logic.
|
||||
|
||||
This module provides functions to retrieve recording download links from Daily's REST API
|
||||
with robust error handling, rate limiting, and exponential backoff retry logic.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from httpx import AsyncClient, HTTPStatusError
|
||||
from loguru import logger
|
||||
from tenacity import (
|
||||
AsyncRetrying,
|
||||
RetryError,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
|
||||
async def get_recording_s3_url_with_retry(
|
||||
room_id: str,
|
||||
max_retries: int = 5,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Retrieve recording URL with exponential backoff and retry logic.
|
||||
|
||||
Uses tenacity library to handle rate limiting (429 errors) and other
|
||||
transient errors with automatic exponential backoff.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
max_retries: Maximum number of retry attempts (default 5)
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url)
|
||||
Returns (None, None) if no recording exists for the room.
|
||||
"""
|
||||
try:
|
||||
# Use tenacity's AsyncRetrying for automatic retry with exponential backoff
|
||||
async for attempt in AsyncRetrying(
|
||||
retry=retry_if_exception_type((HTTPStatusError, Exception)),
|
||||
stop=stop_after_attempt(max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=60),
|
||||
reraise=True,
|
||||
):
|
||||
with attempt:
|
||||
recording_url, recording_signed_url, status = await get_recording_s3_url(
|
||||
room_id=room_id
|
||||
)
|
||||
|
||||
# If no recording exists (status is None), return immediately - no retry
|
||||
if status is None:
|
||||
logger.debug(f"No recording found for room {room_id}")
|
||||
return None, None
|
||||
|
||||
# If recording exists but is not finished yet, retry
|
||||
if status != "finished":
|
||||
logger.warning(
|
||||
f"Recording not finished for room {room_id}, status: {status} "
|
||||
f"(attempt {attempt.retry_state.attempt_number}/{max_retries})"
|
||||
)
|
||||
raise Exception(f"Recording not ready, status: {status}")
|
||||
|
||||
# Recording is finished, return the URLs
|
||||
return recording_url, recording_signed_url
|
||||
|
||||
# This line should never be reached due to reraise=True, but satisfies type checker
|
||||
return None, None
|
||||
|
||||
except RetryError as e:
|
||||
# All retries exhausted
|
||||
last_exception = e.last_attempt.exception()
|
||||
logger.error(
|
||||
f"Failed to retrieve recording URL for room {room_id} after {max_retries} attempts: "
|
||||
f"{last_exception}"
|
||||
)
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unexpected error retrieving recording for room {room_id}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
async def get_recording_s3_url(
|
||||
room_id: str,
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Get recording URL using Daily's REST API.
|
||||
|
||||
Args:
|
||||
room_id: Daily.co room identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (recording_url, recording_signed_url, status)
|
||||
- recording_url: The download link for the recording
|
||||
- recording_signed_url: Same as recording_url (kept for backward compatibility)
|
||||
- status: Recording status from Daily API
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: When HTTP errors occur (including rate limits)
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=180) as client:
|
||||
# List recordings for the room
|
||||
list_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings?room_name={room_id}",
|
||||
headers=headers,
|
||||
)
|
||||
list_response.raise_for_status()
|
||||
list_data = list_response.json()
|
||||
|
||||
# Check if recording exists and is finished
|
||||
if not list_data.get("data") or len(list_data["data"]) == 0:
|
||||
return (None, None, None)
|
||||
|
||||
recording_id = list_data["data"][0].get("id")
|
||||
status = list_data["data"][0].get("status")
|
||||
|
||||
if not recording_id or status != "finished":
|
||||
return (None, None, status)
|
||||
|
||||
# Get the recording access link
|
||||
link_response = await client.get(
|
||||
url=f"https://api.daily.co/v1/recordings/{recording_id}/access-link",
|
||||
headers=headers,
|
||||
)
|
||||
link_response.raise_for_status()
|
||||
link_data = link_response.json()
|
||||
|
||||
recording_url = link_data.get("download_link")
|
||||
if not recording_url:
|
||||
logger.warning(f"No download link found for recording {recording_id}")
|
||||
return (None, None, status)
|
||||
|
||||
# Return the same URL for both fields for backward compatibility
|
||||
return (recording_url, recording_url, status)
|
||||
|
||||
|
||||
async def get_recent_rooms(limit: int = 100) -> list[str]:
|
||||
"""Get list of recent room names from Daily API.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of rooms to retrieve (default 100, max 100)
|
||||
|
||||
Returns:
|
||||
List of room names (strings)
|
||||
"""
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
response = await client.get(
|
||||
url=f"https://api.daily.co/v1/rooms?limit={min(limit, 100)}",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('DAILY_API_KEY')}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
rooms = data.get("data", [])
|
||||
room_names = [room.get("name") for room in rooms if room.get("name")]
|
||||
|
||||
logger.info(f"Retrieved {len(room_names)} room names from Daily API")
|
||||
return room_names
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get rooms from Daily API: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test get_recording_s3_url_with_retry with recent rooms."""
|
||||
logger.info("Starting recording fetch test...")
|
||||
|
||||
# Step 1: Get the most recent 100 rooms
|
||||
logger.info("Fetching recent rooms...")
|
||||
room_names = await get_recent_rooms(limit=100)
|
||||
|
||||
if not room_names:
|
||||
logger.error("No rooms found. Cannot proceed with test.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(room_names)} rooms to check for recordings")
|
||||
|
||||
# Call get_recording_s3_url_with_retry on each room concurrently
|
||||
logger.info(f"Attempting to fetch recordings for {len(room_names)} rooms concurrently...")
|
||||
|
||||
# Create tasks for all rooms
|
||||
tasks = [
|
||||
get_recording_s3_url_with_retry(room_id=room_name, max_retries=3)
|
||||
for room_name in room_names
|
||||
]
|
||||
|
||||
# Execute all tasks concurrently
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
success_count = 0
|
||||
not_found_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for i, (room_name, result) in enumerate(zip(room_names, results), 1):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(room_names)}] Failed for {room_name}: {result}")
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
recording_url, recording_signed_url = result
|
||||
if recording_url:
|
||||
success_count += 1
|
||||
logger.info(f"✅ [{i}/{len(room_names)}] Found recording for {room_name}")
|
||||
logger.debug(f" URL: {recording_url[:80]}...")
|
||||
else:
|
||||
not_found_count += 1
|
||||
logger.debug(f"ℹ️ [{i}/{len(room_names)}] No recording for {room_name}")
|
||||
else:
|
||||
failed_count += 1
|
||||
logger.error(f"❌ [{i}/{len(room_names)}] Unexpected result type for {room_name}")
|
||||
|
||||
# Summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("RECORDING FETCH TEST SUMMARY")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"Total rooms checked: {len(room_names)}")
|
||||
logger.info(f"✅ Recordings found: {success_count}")
|
||||
logger.info(f"ℹ️ No recordings: {not_found_count}")
|
||||
logger.info(f"❌ Failed: {failed_count}")
|
||||
logger.info(f"Success rate: {(success_count / len(room_names) * 100):.2f}%")
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user