Files
AI-VideoAssistant/engine/tests/test_backend_adapters.py
Xin Wang 7e0b777923 Refactor project structure and enhance backend integration
- Expanded package inclusion in `pyproject.toml` to support new modules.
- Introduced new `adapters` and `protocol` packages for better organization.
- Added backend adapter implementations for control plane integration.
- Updated main application imports to reflect new package structure.
- Removed deprecated core components and adjusted documentation accordingly.
- Enhanced architecture documentation to clarify the new runtime and integration layers.
2026-03-06 09:51:56 +08:00

325 lines
10 KiB
Python

import aiohttp
import pytest
from adapters.control_plane.backend import (
AssistantConfigSourceAdapter,
LocalYamlAssistantConfigAdapter,
build_backend_adapter,
)
@pytest.mark.asyncio
async def test_without_backend_url_uses_local_yaml_for_assistant_config(tmp_path):
    """Without a backend URL the adapter serves assistant config from local YAML.

    The same adapter is also expected to answer the history, knowledge and
    tool calls with their "nothing recorded / nothing found" values
    (``None``, ``False``, ``[]``).
    """
    # Arrange: one assistant definition on disk.
    assistants_dir = tmp_path / "assistants"
    assistants_dir.mkdir(parents=True, exist_ok=True)
    yaml_lines = [
        "assistant:",
        " assistantId: dev_local",
        " systemPrompt: local prompt",
        " greeting: local greeting",
    ]
    (assistants_dir / "dev_local.yaml").write_text("\n".join(yaml_lines), encoding="utf-8")

    adapter = build_backend_adapter(
        backend_url=None,
        backend_mode="auto",
        history_enabled=True,
        timeout_sec=3,
        assistant_local_config_dir=str(assistants_dir),
    )
    assert isinstance(adapter, AssistantConfigSourceAdapter)

    # Assistant config comes back from the YAML file without an error code.
    config = await adapter.fetch_assistant_config("dev_local")
    assert isinstance(config, dict)
    assert config.get("__error_code") in (None, "")
    assert config["assistant"]["assistantId"] == "dev_local"
    assert config["assistant"]["systemPrompt"] == "local prompt"

    # History calls report that nothing was stored.
    created_call_id = await adapter.create_call_record(
        user_id=1,
        assistant_id="assistant_1",
        source="debug",
    )
    assert created_call_id is None

    transcript_stored = await adapter.add_transcript(
        call_id="call_1",
        turn_index=0,
        speaker="human",
        content="hi",
        start_ms=0,
        end_ms=100,
        confidence=0.9,
        duration_ms=100,
    )
    assert transcript_stored is False

    call_finalized = await adapter.finalize_call_record(
        call_id="call_1",
        status="connected",
        duration_seconds=2,
    )
    assert call_finalized is False

    # Knowledge and tool lookups come back empty.
    knowledge_hits = await adapter.search_knowledge_context(kb_id="kb_1", query="hello", n_results=3)
    assert knowledge_hits == []
    tool_resource = await adapter.fetch_tool_resource("tool_1")
    assert tool_resource is None
@pytest.mark.asyncio
async def test_http_backend_adapter_create_call_record_posts_expected_payload(monkeypatch, tmp_path):
    """``create_call_record`` POSTs the expected JSON body to ``/api/history``.

    ``aiohttp.ClientSession`` is replaced by a fake that records the timeout
    it was constructed with and the URL/body of the POST, so no network
    traffic happens.
    """
    captured = {}

    class _FakeResponse:
        """Async-context-manager stand-in for an aiohttp response."""

        def __init__(self, status=200, payload=None):
            self.status = status
            self._payload = {} if payload is None else payload

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def json(self):
            return self._payload

        def raise_for_status(self):
            if self.status < 400:
                return
            raise RuntimeError("http_error")

    class _FakeClientSession:
        """Session stand-in that records what the adapter sends."""

        def __init__(self, timeout=None):
            captured["timeout"] = timeout

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        def post(self, url, json=None):
            captured.update(url=url, json=json)
            return _FakeResponse(status=200, payload={"id": "call_123"})

        def get(self, url):
            # The URL is irrelevant here; always answer with the same assistant.
            assistant_body = {
                "assistant": {
                    "assistantId": "assistant_9",
                    "systemPrompt": "backend prompt",
                }
            }
            return _FakeResponse(status=200, payload=assistant_body)

    monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FakeClientSession)

    assistants_dir = tmp_path / "assistants"
    assistants_dir.mkdir(parents=True, exist_ok=True)
    adapter = build_backend_adapter(
        backend_url="http://localhost:8100",
        backend_mode="auto",
        history_enabled=True,
        timeout_sec=7,
        assistant_local_config_dir=str(assistants_dir),
    )
    assert isinstance(adapter, AssistantConfigSourceAdapter)

    call_id = await adapter.create_call_record(
        user_id=99,
        assistant_id="assistant_9",
        source="debug",
    )

    # The id comes from the fake response body.
    assert call_id == "call_123"

    # The request went to the history endpoint with the expected body.
    expected_body = {
        "user_id": 99,
        "assistant_id": "assistant_9",
        "source": "debug",
        "status": "connected",
    }
    assert captured["url"] == "http://localhost:8100/api/history"
    assert captured["json"] == expected_body

    # timeout_sec is forwarded as an aiohttp.ClientTimeout total.
    session_timeout = captured["timeout"]
    assert isinstance(session_timeout, aiohttp.ClientTimeout)
    assert session_timeout.total == 7
@pytest.mark.asyncio
async def test_with_backend_url_uses_backend_for_assistant_config(monkeypatch, tmp_path):
    """With a backend URL, assistant config comes from HTTP, not local YAML.

    A local YAML file for the same assistant id exists with a different
    system prompt; the assertion on ``systemPrompt`` shows which source won.
    """

    class _FakeResponse:
        """Async-context-manager stand-in for an aiohttp response."""

        def __init__(self, status=200, payload=None):
            self.status = status
            self._payload = {} if payload is None else payload

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def json(self):
            return self._payload

        def raise_for_status(self):
            if self.status < 400:
                return
            raise RuntimeError("http_error")

    class _FakeClientSession:
        """Session stand-in returning canned responses for GET and POST."""

        def __init__(self, timeout=None):
            self.timeout = timeout

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        def get(self, url):
            # The URL is ignored; the backend always serves this assistant.
            backend_body = {
                "assistant": {
                    "assistantId": "dev_http",
                    "systemPrompt": "backend prompt",
                }
            }
            return _FakeResponse(status=200, payload=backend_body)

        def post(self, url, json=None):
            # Arguments are ignored; any POST succeeds with a fixed id.
            return _FakeResponse(status=200, payload={"id": "call_1"})

    monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FakeClientSession)

    # A competing local definition that must NOT be the one returned.
    assistants_dir = tmp_path / "assistants"
    assistants_dir.mkdir(parents=True, exist_ok=True)
    local_yaml = "\n".join(
        (
            "assistant:",
            " assistantId: dev_http",
            " systemPrompt: local prompt",
        )
    )
    (assistants_dir / "dev_http.yaml").write_text(local_yaml, encoding="utf-8")

    adapter = build_backend_adapter(
        backend_url="http://localhost:8100",
        backend_mode="auto",
        history_enabled=True,
        timeout_sec=3,
        assistant_local_config_dir=str(assistants_dir),
    )
    assert isinstance(adapter, AssistantConfigSourceAdapter)

    config = await adapter.fetch_assistant_config("dev_http")
    assert config["assistant"]["assistantId"] == "dev_http"
    assert config["assistant"]["systemPrompt"] == "backend prompt"
@pytest.mark.asyncio
async def test_backend_mode_disabled_uses_local_assistant_config_even_with_url(monkeypatch, tmp_path):
    """``backend_mode="disabled"`` keeps the adapter local even if a URL is set.

    ``aiohttp.ClientSession`` is swapped for a class whose constructor raises,
    so any attempt to open an HTTP session fails the test.
    """

    class _FailIfCalledClientSession:
        """Tripwire: constructing a session is itself the failure."""

        def __init__(self, timeout=None):
            raise AssertionError("HTTP client should not be created when backend_mode=disabled")

    monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FailIfCalledClientSession)

    assistants_dir = tmp_path / "assistants"
    assistants_dir.mkdir(parents=True, exist_ok=True)
    local_yaml = "\n".join(
        (
            "assistant:",
            " assistantId: dev_disabled",
            " systemPrompt: local disabled prompt",
        )
    )
    (assistants_dir / "dev_disabled.yaml").write_text(local_yaml, encoding="utf-8")

    adapter = build_backend_adapter(
        backend_url="http://localhost:8100",
        backend_mode="disabled",
        history_enabled=True,
        timeout_sec=3,
        assistant_local_config_dir=str(assistants_dir),
    )
    assert isinstance(adapter, AssistantConfigSourceAdapter)

    # Config is served from the YAML file written above.
    config = await adapter.fetch_assistant_config("dev_disabled")
    assert config["assistant"]["assistantId"] == "dev_disabled"
    assert config["assistant"]["systemPrompt"] == "local disabled prompt"

    # History calls report that nothing was stored.
    created_call_id = await adapter.create_call_record(user_id=1, assistant_id="a1", source="debug")
    assert created_call_id is None

    transcript_stored = await adapter.add_transcript(
        call_id="c1",
        turn_index=0,
        speaker="human",
        content="hi",
        start_ms=0,
        end_ms=10,
        duration_ms=10,
    )
    assert transcript_stored is False
@pytest.mark.asyncio
async def test_local_yaml_adapter_rejects_path_traversal_like_assistant_id(tmp_path):
    """An assistant id containing ``../`` yields the not-found error payload."""
    traversal_id = "../etc/passwd"
    local_adapter = LocalYamlAssistantConfigAdapter(str(tmp_path))

    result = await local_adapter.fetch_assistant_config(traversal_id)

    # The id is echoed back verbatim alongside the error code.
    expected = {"__error_code": "assistant.not_found", "assistantId": traversal_id}
    assert result == expected
@pytest.mark.asyncio
async def test_local_yaml_translates_agent_schema_to_runtime_services(tmp_path):
    """An ``agent:``-style YAML file is exposed as ``assistant.services``.

    The fixture gives the LLM, TTS and ASR sections different API keys, and
    the test checks that each key surfaces as ``apiKey`` under the matching
    service and that the duplex system prompt becomes ``systemPrompt``.
    """
    config_dir = tmp_path / "assistants"
    config_dir.mkdir(parents=True, exist_ok=True)

    # Each service section must be its own nested mapping. If every key sits
    # at the same indent, `llm`/`tts`/`asr`/`duplex` are empty and
    # `provider`/`model`/`api_key`/`api_url` become duplicate siblings under
    # `agent`, so the three distinct apiKey assertions below cannot hold.
    # NOTE(review): nesting reconstructed from those assertions; confirm the
    # exact layout (in particular `duplex` living under `agent`) against the
    # adapter's agent schema.
    agent_yaml_lines = [
        "agent:",
        "  llm:",
        "    provider: openai",
        "    model: gpt-4o-mini",
        "    api_key: sk-llm",
        "    api_url: https://api.example.com/v1",
        "  tts:",
        "    provider: openai_compatible",
        "    model: tts-model",
        "    api_key: sk-tts",
        "    api_url: https://tts.example.com/v1/audio/speech",
        "    voice: anna",
        "  asr:",
        "    provider: openai_compatible",
        "    model: asr-model",
        "    api_key: sk-asr",
        "    api_url: https://asr.example.com/v1/audio/transcriptions",
        "  duplex:",
        "    system_prompt: You are test assistant",
    ]
    (config_dir / "default.yaml").write_text("\n".join(agent_yaml_lines), encoding="utf-8")

    adapter = LocalYamlAssistantConfigAdapter(str(config_dir))
    payload = await adapter.fetch_assistant_config("default")
    assert isinstance(payload, dict)

    # .get() with defaults keeps a missing section an assertion failure
    # rather than a KeyError.
    assistant = payload.get("assistant", {})
    services = assistant.get("services", {})
    assert services.get("llm", {}).get("apiKey") == "sk-llm"
    assert services.get("tts", {}).get("apiKey") == "sk-tts"
    assert services.get("asr", {}).get("apiKey") == "sk-asr"
    assert assistant.get("systemPrompt") == "You are test assistant"