import json import os import requests import boto3 import time from typing import Dict, Any def get_sqs_client(): if __name__ == "__main__": # for local testing, load temporary credentials. see notes.md try: with open('assume_role_output.json', 'r') as f: credentials = json.load(f)['Credentials'] session = boto3.Session( aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) return session.client('sqs', region_name='us-west-2') except FileNotFoundError: print("Warning: assume_role_output.json not found. Using default credentials.") return boto3.client('sqs') else: # When running in Lambda, use the role attached to the function return boto3.client('sqs') sqs = get_sqs_client() DAILY_API_KEY = os.environ.get('DAILY_API_KEY') SQS_QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/955740203061/khk-sqs-launch-day-demos' def create_daily_room() -> Dict[str, Any]: if not DAILY_API_KEY: raise ValueError("DAILY_API_KEY environment variable is not set") url = "https://api.daily.co/v1/rooms" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {DAILY_API_KEY}" } exp = int(time.time()) + 305 payload = { "properties": { "exp": exp, "eject_at_room_exp": True } } response = requests.post(url, headers=headers, json=payload) response = requests.post(url, headers=headers, json=payload) response.raise_for_status() return response.json() def send_to_sqs(room_info: Dict[str, Any]) -> None: if not SQS_QUEUE_URL: raise ValueError("SQS_QUEUE_URL environment variable is not set") if sqs: sqs.send_message( QueueUrl=SQS_QUEUE_URL, MessageBody=json.dumps(room_info) ) print(f"Message sent to SQS queue: {SQS_QUEUE_URL}") else: print(f"Simulated sending message to SQS queue: {SQS_QUEUE_URL}") print(f"Message body: {json.dumps(room_info, indent=2)}") def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: try: print("Creating Daily room...") daily_room = create_daily_room() print(f"Daily room created: {json.dumps(daily_room, indent=2)}") print("Sending room info to SQS...") send_to_sqs(daily_room) return { 'statusCode': 200, 'body': json.dumps({"room_url": daily_room['url']}) } except Exception as e: print(f"Error occurred: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } if __name__ == "__main__": # Simulate the Lambda event and context event = {} context = None result = lambda_handler(event, context) print(f"Lambda handler result: {json.dumps(result, indent=2)}")