from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import Optional import uuid from ..db import get_db from ..models import Voice from ..schemas import VoiceCreate, VoiceUpdate, VoiceOut router = APIRouter(prefix="/voices", tags=["Voices"]) @router.get("") def list_voices( vendor: Optional[str] = None, language: Optional[str] = None, gender: Optional[str] = None, page: int = 1, limit: int = 50, db: Session = Depends(get_db) ): """获取声音库列表""" query = db.query(Voice) if vendor: query = query.filter(Voice.vendor == vendor) if language: query = query.filter(Voice.language == language) if gender: query = query.filter(Voice.gender == gender) total = query.count() voices = query.order_by(Voice.created_at.desc()) \ .offset((page - 1) * limit).limit(limit).all() return {"total": total, "page": page, "limit": limit, "list": voices} @router.post("", response_model=VoiceOut) def create_voice(data: VoiceCreate, db: Session = Depends(get_db)): """创建声音""" voice = Voice( id=data.id or str(uuid.uuid4())[:8], user_id=1, name=data.name, vendor=data.vendor, gender=data.gender, language=data.language, description=data.description, model=data.model, voice_key=data.voice_key, speed=data.speed, gain=data.gain, pitch=data.pitch, enabled=data.enabled, ) db.add(voice) db.commit() db.refresh(voice) return voice @router.get("/{id}", response_model=VoiceOut) def get_voice(id: str, db: Session = Depends(get_db)): """获取单个声音详情""" voice = db.query(Voice).filter(Voice.id == id).first() if not voice: raise HTTPException(status_code=404, detail="Voice not found") return voice @router.put("/{id}", response_model=VoiceOut) def update_voice(id: str, data: VoiceUpdate, db: Session = Depends(get_db)): """更新声音""" voice = db.query(Voice).filter(Voice.id == id).first() if not voice: raise HTTPException(status_code=404, detail="Voice not found") update_data = data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(voice, field, value) db.commit() db.refresh(voice) return voice @router.delete("/{id}") def delete_voice(id: str, db: Session = Depends(get_db)): """删除声音""" voice = db.query(Voice).filter(Voice.id == id).first() if not voice: raise HTTPException(status_code=404, detail="Voice not found") db.delete(voice) db.commit() return {"message": "Deleted successfully"}