Files
AI-VideoAssistant/api/app/storage.py
2026-02-06 14:01:34 +08:00

57 lines
1.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import uuid
from datetime import datetime, timedelta

from minio import Minio
# MinIO connection settings. Each value can be overridden through the
# environment; the fallbacks below are the original hard-coded defaults.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "password123")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "ai-audio")
# TLS is opt-in so existing plain-HTTP deployments keep working unchanged.
# Set MINIO_SECURE to 1/true/yes/on to connect over HTTPS.
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}

# Module-level client shared by every helper in this file.
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)
def ensure_bucket():
    """Create the configured bucket if it does not exist yet.

    This is best-effort: any error raised while checking for or creating
    the bucket is printed as a warning and swallowed, so the function
    never raises and always returns ``None``.
    """
    try:
        bucket_present = minio_client.bucket_exists(MINIO_BUCKET)
        if bucket_present:
            return
        minio_client.make_bucket(MINIO_BUCKET)
    except Exception as err:
        print(f"Warning: MinIO bucket check failed: {err}")
def upload_audio(file_path: str, call_id: str, turn_index: int) -> str:
    """Upload one audio segment and return a presigned URL for it.

    The object is stored as ``<call_id>/<call_id>-<NNN><ext>``, where
    ``NNN`` is ``turn_index`` zero-padded to three digits and ``<ext>`` is
    the extension of ``file_path`` (``.mp3`` when the path has none).

    Args:
        file_path: Local path of the audio file to upload.
        call_id: Call identifier; used as both the object-name prefix and
            part of the object's base name.
        turn_index: Turn number within the call.

    Returns:
        A presigned GET URL valid for seven days, or ``""`` when the upload
        or the URL signing fails. Failures are printed as a warning and
        not raised.
    """
    ensure_bucket()
    ext = os.path.splitext(file_path)[1] or ".mp3"
    object_name = f"{call_id}/{call_id}-{turn_index:03d}{ext}"
    try:
        minio_client.fput_object(MINIO_BUCKET, object_name, file_path)
        # The MinIO client documents ``expires`` as a datetime.timedelta.
        # A bare integer makes the client raise, and the handler below
        # would then turn every successful upload into an empty URL.
        return minio_client.presigned_get_object(
            MINIO_BUCKET, object_name, expires=timedelta(days=7)
        )
    except Exception as e:
        print(f"Warning: MinIO upload failed: {e}")
        return ""
def get_audio_url(call_id: str, turn_index: int, ext: str = ".mp3") -> str:
    """Return a presigned URL for a previously stored audio segment.

    The object name is built the same way ``upload_audio`` builds it:
    ``<call_id>/<call_id>-<NNN><ext>`` with ``NNN`` being ``turn_index``
    zero-padded to three digits.

    Args:
        call_id: Call identifier used when the segment was stored.
        turn_index: Turn number within the call.
        ext: File extension of the stored object, including the leading
            dot. Defaults to ``".mp3"``; pass the real extension for
            segments that were uploaded from a file of another type.

    Returns:
        A presigned GET URL valid for seven days, or ``""`` when signing
        fails. Failures are printed as a warning and not raised.
    """
    object_name = f"{call_id}/{call_id}-{turn_index:03d}{ext}"
    try:
        # The MinIO client documents ``expires`` as a datetime.timedelta;
        # a bare integer makes the client raise and yields an empty URL.
        return minio_client.presigned_get_object(
            MINIO_BUCKET, object_name, expires=timedelta(days=7)
        )
    except Exception as e:
        # Report the failure the same way the other helpers here do,
        # instead of hiding it completely.
        print(f"Warning: MinIO presign failed: {e}")
        return ""
def generate_local_url(call_id: str, turn_index: int) -> str:
    """Build the local API path for an audio segment (used when MinIO is not).

    Returns ``/api/history/<call_id>/audio/<turn_index>``.
    """
    history_root = f"/api/history/{call_id}"
    return f"{history_root}/audio/{turn_index}"