Files
ai-video-fullstack/backend/services/model_resource_tester.py
Xin Wang 90e3e8a0c0 Refactor backend to support interface-definition driven model resources
- Introduce a new model structure for managing interface definitions and model resources, enhancing the backend's capability to handle various service integrations.
- Update the Makefile to reflect changes in database seeding and resource management commands.
- Remove the deprecated credentials management routes and replace them with a unified model registry API.
- Modify existing routes and schemas to align with the new model structure, ensuring seamless integration with the frontend.
- Enhance database seeding scripts to populate new model resources and their configurations.
- Update README documentation to reflect the new architecture and usage instructions for model resources and interface definitions.
2026-06-14 19:36:12 +08:00

153 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Connectivity checks for interface-definition driven model resources."""
from __future__ import annotations
import io
import time
import wave
import httpx
import config
from schemas import ModelResourceTestResult
# Timeout, in seconds, handed to the httpx client used for connectivity probes;
# the same value is quoted in the timeout message shown to the user.
TEST_TIMEOUT_SECONDS = 10.0
def _endpoint(base_url: str, path: str) -> str:
return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
def _silent_wav() -> bytes:
buffer = io.BytesIO()
with wave.open(buffer, "wb") as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(16_000)
wav.writeframes(b"\x00\x00" * 1_600)
return buffer.getvalue()
def _error_detail(response: httpx.Response, secrets: dict) -> str:
try:
body = response.json()
detail = (
body.get("error", {}).get("message")
if isinstance(body, dict) and isinstance(body.get("error"), dict)
else body.get("detail") if isinstance(body, dict) else None
)
except ValueError:
detail = None
text = str(detail or response.text or response.reason_phrase).strip()
for secret in secrets.values():
if secret:
text = text.replace(str(secret), "***")
return text[:300]
async def test_model_resource(
    interface_type: str,
    capability: str,
    values: dict,
    secrets: dict,
) -> ModelResourceTestResult:
    """Run a one-shot connectivity probe against a model resource.

    ``xfyun-*`` interface types and the ``Realtime`` capability are answered
    without any network traffic. For ``LLM``, ``Embedding``, ``ASR`` and
    ``TTS`` a single minimal POST is sent to the matching path under
    ``values["apiUrl"]`` with ``secrets["apiKey"]`` as a Bearer token. Any
    other capability yields a failed result.

    Args:
        interface_type: Interface identifier; only the ``xfyun-`` prefix is
            inspected here.
        capability: Capability name selecting which request to send.
        values: Non-secret settings. Reads ``apiUrl`` and ``modelId``, plus
            ``language`` (ASR) and ``voice`` / ``speed`` (TTS).
        secrets: Secret settings. Reads ``apiKey``; the whole mapping is
            passed to ``_error_detail`` so secrets are masked in HTTP error
            details.

    Returns:
        A ``ModelResourceTestResult``. Timeouts, transport failures,
        malformed URLs and a non-numeric TTS ``speed`` are reported as
        ``ok=False`` results rather than raised.
    """
    if interface_type.startswith("xfyun-"):
        # No request is sent for xfyun interfaces; the detail text directs
        # the user to the voice test page for signature/audio verification.
        return ModelResourceTestResult(
            ok=True,
            message="讯飞连接参数有效",
            detail="鉴权字段和连接参数完整,请在语音测试页验证签名及音频链路",
        )
    if capability == "Realtime":
        return ModelResourceTestResult(
            ok=False,
            message="暂不支持 Realtime 连接测试",
            detail="请在助手语音测试页验证实时连接",
        )

    api_url = str(values.get("apiUrl") or "")
    model_id = str(values.get("modelId") or "")
    api_key = str(secrets.get("apiKey") or "")
    headers = {"Authorization": f"Bearer {api_key}"}
    started = time.perf_counter()

    def elapsed_ms() -> int:
        """Milliseconds elapsed since the probe started."""
        return round((time.perf_counter() - started) * 1000)

    try:
        async with httpx.AsyncClient(timeout=TEST_TIMEOUT_SECONDS) as client:
            if capability == "LLM":
                response = await client.post(
                    _endpoint(api_url, "chat/completions"),
                    headers=headers,
                    json={
                        "model": model_id,
                        "messages": [{"role": "user", "content": "Reply with OK."}],
                        # Keep the probe as cheap as possible.
                        "max_tokens": 1,
                        "stream": False,
                    },
                )
            elif capability == "Embedding":
                response = await client.post(
                    _endpoint(api_url, "embeddings"),
                    headers=headers,
                    json={"model": model_id, "input": "ping"},
                )
            elif capability == "ASR":
                form = {"model": model_id}
                # Only send a language hint when one is configured.
                if values.get("language"):
                    form["language"] = str(values["language"])
                response = await client.post(
                    _endpoint(api_url, "audio/transcriptions"),
                    headers=headers,
                    data=form,
                    files={"file": ("test.wav", _silent_wav(), "audio/wav")},
                )
            elif capability == "TTS":
                raw_speed = values.get("speed")
                try:
                    speed = float(raw_speed or 1)
                except (TypeError, ValueError):
                    # A non-numeric speed is a configuration error; report it
                    # instead of letting the conversion crash the probe.
                    return ModelResourceTestResult(
                        ok=False,
                        message="语速参数无效",
                        detail=f"speed 需为数字,收到 {raw_speed!r}"[:300],
                    )
                response = await client.post(
                    _endpoint(api_url, "audio/speech"),
                    headers=headers,
                    json={
                        "model": model_id,
                        "input": "测试",
                        "voice": str(values.get("voice") or config.TTS_VOICE),
                        "response_format": "pcm",
                        "speed": speed,
                    },
                )
            else:
                return ModelResourceTestResult(
                    ok=False,
                    message="暂不支持该能力的连接测试",
                    detail=f"收到能力类型 {capability}",
                )

        latency_ms = elapsed_ms()
        if response.is_success:
            return ModelResourceTestResult(
                ok=True,
                latency_ms=latency_ms,
                message="连接成功",
                detail=f"接口响应正常HTTP {response.status_code}",
            )
        return ModelResourceTestResult(
            ok=False,
            latency_ms=latency_ms,
            message=f"连接失败HTTP {response.status_code}",
            detail=_error_detail(response, secrets),
        )
    except httpx.TimeoutException:
        # Must precede RequestError: TimeoutException is one of its subclasses.
        return ModelResourceTestResult(
            ok=False,
            latency_ms=elapsed_ms(),
            message="连接超时",
            detail=f"服务未在 {TEST_TIMEOUT_SECONDS:g} 秒内响应",
        )
    except httpx.RequestError as exc:
        return ModelResourceTestResult(
            ok=False,
            latency_ms=elapsed_ms(),
            message="无法连接到模型服务",
            detail=str(exc)[:300],
        )
    except httpx.InvalidURL as exc:
        # InvalidURL does not derive from RequestError, so a malformed
        # user-supplied apiUrl would otherwise propagate as an unhandled error.
        return ModelResourceTestResult(
            ok=False,
            latency_ms=elapsed_ms(),
            message="接口地址无效",
            detail=str(exc)[:300],
        )