import os
import uuid
from datetime import datetime, timedelta

from minio import Minio

# MinIO configuration, each value overridable through an environment variable.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "password123")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "ai-audio")

# Lifetime of presigned download URLs: 604800 seconds (7 days).
# minio-py expects a ``timedelta`` for ``expires``; passing a bare int makes
# the client fail, which previously caused every URL lookup to return "".
PRESIGNED_URL_EXPIRY = timedelta(seconds=604800)

# Initialise the shared client (plain HTTP: secure=False).
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,
)


def _object_name(call_id: str, turn_index: int, ext: str) -> str:
    """Build the object key ``<call_id>/<call_id>-<NNN><ext>`` for one turn."""
    return f"{call_id}/{call_id}-{turn_index:03d}{ext}"


def ensure_bucket():
    """Make sure the configured bucket exists, creating it if necessary.

    Best-effort: any failure is reported with a printed warning and is not
    re-raised.
    """
    try:
        if not minio_client.bucket_exists(MINIO_BUCKET):
            minio_client.make_bucket(MINIO_BUCKET)
    except Exception as e:
        print(f"Warning: MinIO bucket check failed: {e}")


def upload_audio(file_path: str, call_id: str, turn_index: int) -> str:
    """Upload an audio clip and return a presigned URL for it.

    Args:
        file_path: Local path of the audio file. Its extension is kept in the
            object key; ``.mp3`` is used when the path has no extension.
        call_id: Call identifier, used as the key prefix and base name.
        turn_index: Turn number, zero-padded to three digits in the key.

    Returns:
        A presigned GET URL valid for ``PRESIGNED_URL_EXPIRY``, or ``""`` if
        the upload or URL generation fails (a warning is printed).
    """
    ensure_bucket()
    ext = os.path.splitext(file_path)[1] or ".mp3"
    object_name = _object_name(call_id, turn_index, ext)
    try:
        minio_client.fput_object(MINIO_BUCKET, object_name, file_path)
        return minio_client.presigned_get_object(
            MINIO_BUCKET, object_name, expires=PRESIGNED_URL_EXPIRY
        )
    except Exception as e:
        print(f"Warning: MinIO upload failed: {e}")
        return ""


def get_audio_url(call_id: str, turn_index: int, ext: str = ".mp3") -> str:
    """Return a presigned URL for a stored audio clip.

    Args:
        call_id: Call identifier, used as the key prefix and base name.
        turn_index: Turn number, zero-padded to three digits in the key.
        ext: File extension of the stored object, including the dot.
            Defaults to ``.mp3``; pass the real extension for clips that
            ``upload_audio`` stored under a different one.

    Returns:
        A presigned GET URL valid for ``PRESIGNED_URL_EXPIRY``, or ``""`` if
        URL generation fails (a warning is printed).
    """
    object_name = _object_name(call_id, turn_index, ext)
    try:
        return minio_client.presigned_get_object(
            MINIO_BUCKET, object_name, expires=PRESIGNED_URL_EXPIRY
        )
    except Exception as e:
        # Stay best-effort, but report the failure instead of hiding it.
        print(f"Warning: MinIO URL generation failed: {e}")
        return ""


def generate_local_url(call_id: str, turn_index: int) -> str:
    """Build the local API URL for a clip (for use when MinIO is not used)."""
    return f"/api/history/{call_id}/audio/{turn_index}"