Refactor assistant configuration management and update documentation

- Removed legacy agent profile settings from the .env.example and README, streamlining the configuration process.
- Introduced a new local YAML configuration adapter for assistant settings, allowing for easier management of assistant profiles.
- Updated backend integration documentation to clarify the behavior of assistant config sourcing based on backend URL settings.
- Adjusted various service implementations to directly utilize API keys from the new configuration structure.
- Enhanced test coverage for the new local YAML adapter and its integration with backend services.
This commit is contained in:
Xin Wang
2026-03-05 21:24:15 +08:00
parent d0a6419990
commit 935f2fbd1f
17 changed files with 585 additions and 739 deletions

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
import aiohttp
@@ -9,6 +11,18 @@ from loguru import logger
from app.config import settings
try:
import yaml
except ImportError: # pragma: no cover - validated when local YAML source is enabled
yaml = None
# Valid assistant ids: 1-128 chars, alphanumeric first char, then alphanumerics
# plus "_", ".", "-". Rejecting anything else (notably path separators) keeps
# file lookups confined to the configured directory.
_ASSISTANT_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]{0,127}$")
def _assistant_error(code: str, assistant_id: str) -> Dict[str, Any]:
return {"__error_code": code, "assistantId": str(assistant_id or "")}
class NullBackendAdapter:
"""No-op adapter for engine-only runtime without backend dependencies."""
@@ -128,6 +142,283 @@ class HistoryDisabledBackendAdapter:
return await self._delegate.fetch_tool_resource(tool_id)
class LocalYamlAssistantConfigAdapter(NullBackendAdapter):
    """Load assistant runtime config from local YAML files.

    Looks up ``<config_dir>/<assistant_id>.yaml`` (or ``.yml``). A file may be
    a backend-like payload (``assistant`` / ``sessionStartMetadata`` keys), a
    bare assistant metadata object, or the legacy ``agent:`` schema, which is
    translated into runtime metadata on read.
    """

    # snake_case YAML keys -> camelCase runtime keys shared by llm/tts/asr sections.
    _COMMON_SERVICE_FIELDS = (
        ("provider", "provider"),
        ("model", "model"),
        ("api_key", "apiKey"),
        ("api_url", "baseUrl"),
    )

    def __init__(self, config_dir: str):
        self._config_dir = self._resolve_base_dir(config_dir)

    @staticmethod
    def _resolve_base_dir(config_dir: str) -> Path:
        """Resolve the configured directory against cwd, then the engine root.

        Relative paths are tried against the process cwd first, then against
        the engine package root; a leading ``engine/`` component is also
        retried with that prefix stripped. Falls back to the cwd-based
        candidate even when it does not exist.
        """
        raw = Path(str(config_dir or "").strip() or "engine/config/agents")
        if raw.is_absolute():
            return raw.resolve()
        cwd_candidate = (Path.cwd() / raw).resolve()
        if cwd_candidate.exists():
            return cwd_candidate
        engine_dir = Path(__file__).resolve().parent.parent
        engine_candidate = (engine_dir / raw).resolve()
        if engine_candidate.exists():
            return engine_candidate
        parts = raw.parts
        if parts and parts[0] == "engine" and len(parts) > 1:
            trimmed_candidate = (engine_dir / Path(*parts[1:])).resolve()
            if trimmed_candidate.exists():
                return trimmed_candidate
        return cwd_candidate

    def _resolve_config_file(self, assistant_id: str) -> Optional[Path]:
        """Return the YAML file for *assistant_id*, or None when invalid or absent."""
        normalized = str(assistant_id or "").strip()
        # Ids failing the pattern (e.g. containing path separators) are rejected
        # outright so lookups cannot escape the config directory.
        if not _ASSISTANT_ID_PATTERN.match(normalized):
            return None
        for suffix in (".yaml", ".yml"):
            candidate = self._config_dir / f"{normalized}{suffix}"
            if candidate.exists():
                return candidate
        return None

    @staticmethod
    def _as_str(value: Any) -> Optional[str]:
        """Coerce *value* to a stripped string; None for None or blank results."""
        if value is None:
            return None
        text = str(value).strip()
        return text or None

    @classmethod
    def _string_fields(cls, section: Dict[str, Any], field_map) -> Dict[str, Any]:
        """Copy non-blank string fields from a YAML section, renaming keys.

        *field_map* is an iterable of (source_key, target_key) pairs; values
        that normalize to None/blank are skipped.
        """
        out: Dict[str, Any] = {}
        for src_key, dst_key in field_map:
            text = cls._as_str(section.get(src_key))
            if text:
                out[dst_key] = text
        return out

    @classmethod
    def _translate_agent_schema(cls, assistant_id: str, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Translate legacy `agent:` YAML schema into runtime assistant metadata.

        Returns None when *payload* has no ``agent`` mapping so the caller can
        fall through to the non-legacy payload shapes.
        """
        agent = payload.get("agent")
        if not isinstance(agent, dict):
            return None
        runtime: Dict[str, Any] = {
            "assistantId": str(assistant_id),
            "services": {},
        }
        llm = agent.get("llm")
        if isinstance(llm, dict):
            llm_runtime = cls._string_fields(llm, cls._COMMON_SERVICE_FIELDS)
            if llm_runtime:
                runtime["services"]["llm"] = llm_runtime
        tts = agent.get("tts")
        if isinstance(tts, dict):
            tts_runtime = cls._string_fields(tts, cls._COMMON_SERVICE_FIELDS + (("voice", "voice"),))
            if tts.get("speed") is not None:
                tts_runtime["speed"] = tts.get("speed")
            # The legacy spelling `dashscope_mode` wins over the generic `mode`.
            dashscope_mode = cls._as_str(tts.get("dashscope_mode")) or cls._as_str(tts.get("mode"))
            if dashscope_mode:
                tts_runtime["mode"] = dashscope_mode
            if tts_runtime:
                runtime["services"]["tts"] = tts_runtime
        asr = agent.get("asr")
        if isinstance(asr, dict):
            asr_runtime = cls._string_fields(asr, cls._COMMON_SERVICE_FIELDS)
            if asr.get("interim_interval_ms") is not None:
                asr_runtime["interimIntervalMs"] = asr.get("interim_interval_ms")
            if asr.get("min_audio_ms") is not None:
                asr_runtime["minAudioMs"] = asr.get("min_audio_ms")
            if asr_runtime:
                runtime["services"]["asr"] = asr_runtime
        duplex = agent.get("duplex")
        if isinstance(duplex, dict):
            system_prompt = cls._as_str(duplex.get("system_prompt"))
            if system_prompt:
                runtime["systemPrompt"] = system_prompt
            if duplex.get("greeting") is not None:
                runtime["greeting"] = duplex.get("greeting")
        barge_in = agent.get("barge_in")
        if isinstance(barge_in, dict):
            barge_in_runtime: Dict[str, Any] = {}
            if barge_in.get("min_duration_ms") is not None:
                barge_in_runtime["minDurationMs"] = barge_in.get("min_duration_ms")
            if barge_in.get("silence_tolerance_ms") is not None:
                barge_in_runtime["silenceToleranceMs"] = barge_in.get("silence_tolerance_ms")
            if barge_in_runtime:
                runtime["bargeIn"] = barge_in_runtime
        if isinstance(agent.get("tools"), list):
            runtime["tools"] = agent.get("tools")
        if not runtime.get("services"):
            runtime.pop("services", None)
        return runtime

    async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
        """Read, translate, and normalize the assistant's local YAML config.

        Never raises: missing file, unavailable PyYAML, read failures, and
        non-mapping payloads all yield an `_assistant_error` dict instead.
        """
        config_file = self._resolve_config_file(assistant_id)
        if config_file is None:
            return _assistant_error("assistant.not_found", assistant_id)
        if yaml is None:
            logger.warning(
                "Local assistant config requested but PyYAML is unavailable (assistant_id={})",
                assistant_id,
            )
            return _assistant_error("assistant.config_unavailable", assistant_id)
        try:
            with config_file.open("r", encoding="utf-8") as handle:
                payload = yaml.safe_load(handle) or {}
        except Exception as exc:
            logger.warning(
                "Failed to read local assistant config {} (assistant_id={}): {}",
                config_file,
                assistant_id,
                exc,
            )
            return _assistant_error("assistant.config_unavailable", assistant_id)
        if not isinstance(payload, dict):
            logger.warning(
                "Local assistant config is not an object (assistant_id={}, file={})",
                assistant_id,
                config_file,
            )
            return _assistant_error("assistant.config_unavailable", assistant_id)
        translated = self._translate_agent_schema(assistant_id, payload)
        if translated is not None:
            payload = translated
        # Accept either backend-like payload shape or a direct assistant metadata object.
        if isinstance(payload.get("assistant"), dict) or isinstance(payload.get("sessionStartMetadata"), dict):
            normalized_payload = dict(payload)
        else:
            normalized_payload = {"assistant": dict(payload)}
        assistant_obj = normalized_payload.get("assistant")
        if isinstance(assistant_obj, dict):
            resolved_assistant_id = assistant_obj.get("assistantId") or assistant_obj.get("id") or assistant_id
            assistant_obj["assistantId"] = str(resolved_assistant_id)
        else:
            normalized_payload["assistant"] = {"assistantId": str(assistant_id)}
        normalized_payload.setdefault("assistantId", str(assistant_id))
        # Synthetic version id identifying the file-based source.
        normalized_payload.setdefault("configVersionId", f"local:{config_file.name}")
        return normalized_payload
class AssistantConfigSourceAdapter:
    """Route assistant config reads by backend availability without changing other APIs.

    Only `fetch_assistant_config` is routed: it reads from the local YAML
    adapter unless the backend source was selected at construction. All other
    backend ports are forwarded unchanged to the wrapped runtime adapter.
    """

    def __init__(
        self,
        *,
        delegate: HttpBackendAdapter | NullBackendAdapter | HistoryDisabledBackendAdapter,
        local_delegate: LocalYamlAssistantConfigAdapter,
        use_backend_assistant_config: bool,
    ):
        self._backend = delegate
        self._local = local_delegate
        self._prefer_backend = bool(use_backend_assistant_config)

    async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
        # The only routed port: backend when selected, local YAML otherwise.
        source = self._backend if self._prefer_backend else self._local
        return await source.fetch_assistant_config(assistant_id)

    async def create_call_record(
        self,
        *,
        user_id: int,
        assistant_id: Optional[str],
        source: str = "debug",
    ) -> Optional[str]:
        # Pass-through to the runtime adapter.
        return await self._backend.create_call_record(
            user_id=user_id,
            assistant_id=assistant_id,
            source=source,
        )

    async def add_transcript(
        self,
        *,
        call_id: str,
        turn_index: int,
        speaker: str,
        content: str,
        start_ms: int,
        end_ms: int,
        confidence: Optional[float] = None,
        duration_ms: Optional[int] = None,
    ) -> bool:
        # Pass-through to the runtime adapter.
        return await self._backend.add_transcript(
            call_id=call_id,
            turn_index=turn_index,
            speaker=speaker,
            content=content,
            start_ms=start_ms,
            end_ms=end_ms,
            confidence=confidence,
            duration_ms=duration_ms,
        )

    async def finalize_call_record(
        self,
        *,
        call_id: str,
        status: str,
        duration_seconds: int,
    ) -> bool:
        # Pass-through to the runtime adapter.
        return await self._backend.finalize_call_record(
            call_id=call_id,
            status=status,
            duration_seconds=duration_seconds,
        )

    async def search_knowledge_context(
        self,
        *,
        kb_id: str,
        query: str,
        n_results: int = 5,
    ) -> List[Dict[str, Any]]:
        # Pass-through to the runtime adapter.
        return await self._backend.search_knowledge_context(
            kb_id=kb_id,
            query=query,
            n_results=n_results,
        )

    async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
        # Pass-through to the runtime adapter.
        return await self._backend.fetch_tool_resource(tool_id)
class HttpBackendAdapter:
"""HTTP implementation of backend integration ports."""
def build_backend_adapter(
    backend_url: Optional[str],
    backend_mode: str = "auto",
    history_enabled: bool = True,
    timeout_sec: int = 10,
    assistant_local_config_dir: str = "engine/config/agents",
) -> AssistantConfigSourceAdapter:
    """Create backend adapter implementation based on runtime settings.

    Mode handling:
      - "disabled"/"off"/"none"/"null"/"engine_only"/"engine-only": no backend.
      - "http": HTTP backend when *backend_url* is set; otherwise warn and
        fall back to the null adapter.
      - anything else ("auto"): HTTP when *backend_url* is set, null otherwise.

    The result is always wrapped in AssistantConfigSourceAdapter so assistant
    config reads route to local YAML whenever no HTTP backend is in use, and in
    HistoryDisabledBackendAdapter when *history_enabled* is False.
    """
    mode = str(backend_mode or "auto").strip().lower()
    has_url = bool(str(backend_url or "").strip())
    base_adapter: HttpBackendAdapter | NullBackendAdapter
    using_http_backend = False
    if mode in {"disabled", "off", "none", "null", "engine_only", "engine-only"}:
        base_adapter = NullBackendAdapter()
    elif mode == "http":
        if has_url:
            base_adapter = HttpBackendAdapter(backend_url=str(backend_url), timeout_sec=timeout_sec)
            using_http_backend = True
        else:
            logger.warning("BACKEND_MODE=http but BACKEND_URL is empty; falling back to NullBackendAdapter")
            base_adapter = NullBackendAdapter()
    else:
        if has_url:
            base_adapter = HttpBackendAdapter(backend_url=str(backend_url), timeout_sec=timeout_sec)
            using_http_backend = True
        else:
            base_adapter = NullBackendAdapter()
    runtime_adapter: HttpBackendAdapter | NullBackendAdapter | HistoryDisabledBackendAdapter
    if not history_enabled:
        runtime_adapter = HistoryDisabledBackendAdapter(base_adapter)
    else:
        runtime_adapter = base_adapter
    return AssistantConfigSourceAdapter(
        delegate=runtime_adapter,
        local_delegate=LocalYamlAssistantConfigAdapter(assistant_local_config_dir),
        use_backend_assistant_config=using_http_backend,
    )
def build_backend_adapter_from_settings() -> AssistantConfigSourceAdapter:
    """Create backend adapter using current app settings.

    Thin convenience wrapper over build_backend_adapter that pulls every
    argument from the module-level `settings` instance.
    """
    return build_backend_adapter(
        backend_url=settings.backend_url,
        backend_mode=settings.backend_mode,
        history_enabled=settings.history_enabled,
        timeout_sec=settings.backend_timeout_sec,
        assistant_local_config_dir=settings.assistant_local_config_dir,
    )

View File

@@ -1,371 +1,31 @@
"""Configuration management using Pydantic settings and agent YAML profiles."""
"""Configuration management using Pydantic settings."""
import json
import os
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Optional
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
try:
import yaml
except ImportError: # pragma: no cover - validated when agent YAML is used
yaml = None
from dotenv import load_dotenv
except ImportError: # pragma: no cover - optional dependency in some runtimes
load_dotenv = None
def _prime_process_env_from_dotenv() -> None:
    """Populate the process environment from .env files before Settings loads.

    The cwd .env is loaded first, then the engine-root .env (when distinct);
    neither overrides variables already present. Does nothing when
    python-dotenv is not installed.
    """
    if load_dotenv is None:
        return
    candidates = [Path.cwd() / ".env"]
    engine_env = Path(__file__).resolve().parent.parent / ".env"
    if engine_env not in candidates:
        candidates.append(engine_env)
    for env_file in candidates:
        load_dotenv(dotenv_path=env_file, override=False)
# Matches ${ENV_VAR} and ${ENV_VAR:default} placeholders inside agent YAML values.
_ENV_REF_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")

# Fallback profile location used when no explicit config path/profile is given.
_DEFAULT_AGENT_CONFIG_DIR = "config/agents"
_DEFAULT_AGENT_CONFIG_FILE = "default.yaml"

# Maps nested YAML section keys (agent.<section>.<key>) to flat Settings field names.
_AGENT_SECTION_KEY_MAP: Dict[str, Dict[str, str]] = {
    "vad": {
        "type": "vad_type",
        "model_path": "vad_model_path",
        "threshold": "vad_threshold",
        "min_speech_duration_ms": "vad_min_speech_duration_ms",
        "eou_threshold_ms": "vad_eou_threshold_ms",
    },
    "llm": {
        "provider": "llm_provider",
        "model": "llm_model",
        "temperature": "llm_temperature",
        "api_key": "llm_api_key",
        "api_url": "llm_api_url",
    },
    "tts": {
        "provider": "tts_provider",
        "api_key": "tts_api_key",
        "api_url": "tts_api_url",
        "model": "tts_model",
        "voice": "tts_voice",
        # Both spellings normalize to the same flat key.
        "dashscope_mode": "tts_mode",
        "mode": "tts_mode",
        "speed": "tts_speed",
    },
    "asr": {
        "provider": "asr_provider",
        "api_key": "asr_api_key",
        "api_url": "asr_api_url",
        "model": "asr_model",
        "interim_interval_ms": "asr_interim_interval_ms",
        "min_audio_ms": "asr_min_audio_ms",
        "start_min_speech_ms": "asr_start_min_speech_ms",
        "pre_speech_ms": "asr_pre_speech_ms",
        "final_tail_ms": "asr_final_tail_ms",
    },
    "duplex": {
        "enabled": "duplex_enabled",
        "greeting": "duplex_greeting",
        "system_prompt": "duplex_system_prompt",
        "opener_audio_file": "duplex_opener_audio_file",
    },
    "barge_in": {
        "min_duration_ms": "barge_in_min_duration_ms",
        "silence_tolerance_ms": "barge_in_silence_tolerance_ms",
    },
}

# Complete set of flat keys a normalized agent YAML may contain; anything else
# is rejected by _normalize_agent_overrides.
_AGENT_SETTING_KEYS = {
    "vad_type",
    "vad_model_path",
    "vad_threshold",
    "vad_min_speech_duration_ms",
    "vad_eou_threshold_ms",
    "llm_provider",
    "llm_api_key",
    "llm_api_url",
    "llm_model",
    "llm_temperature",
    "tts_provider",
    "tts_api_key",
    "tts_api_url",
    "tts_model",
    "tts_voice",
    "tts_mode",
    "tts_speed",
    "asr_provider",
    "asr_api_key",
    "asr_api_url",
    "asr_model",
    "asr_interim_interval_ms",
    "asr_min_audio_ms",
    "asr_start_min_speech_ms",
    "asr_pre_speech_ms",
    "asr_final_tail_ms",
    "duplex_enabled",
    "duplex_greeting",
    "duplex_system_prompt",
    "duplex_opener_audio_file",
    "barge_in_min_duration_ms",
    "barge_in_silence_tolerance_ms",
    "tools",
}

# Keys required in every agent YAML regardless of provider choice; additional
# provider-conditional requirements are computed in _missing_required_keys.
_BASE_REQUIRED_AGENT_SETTING_KEYS = {
    "vad_type",
    "vad_model_path",
    "vad_threshold",
    "vad_min_speech_duration_ms",
    "vad_eou_threshold_ms",
    "llm_provider",
    "llm_model",
    "llm_temperature",
    "tts_provider",
    "tts_voice",
    "tts_speed",
    "asr_provider",
    "asr_interim_interval_ms",
    "asr_min_audio_ms",
    "asr_start_min_speech_ms",
    "asr_pre_speech_ms",
    "asr_final_tail_ms",
    "duplex_enabled",
    "duplex_system_prompt",
    "barge_in_min_duration_ms",
    "barge_in_silence_tolerance_ms",
}

# Provider aliases that trigger per-service api_key/api_url/model validation.
_OPENAI_COMPATIBLE_LLM_PROVIDERS = {"openai_compatible", "openai-compatible", "siliconflow"}
_OPENAI_COMPATIBLE_TTS_PROVIDERS = {"openai_compatible", "openai-compatible", "siliconflow"}
_DASHSCOPE_TTS_PROVIDERS = {"dashscope"}
_OPENAI_COMPATIBLE_ASR_PROVIDERS = {"openai_compatible", "openai-compatible", "siliconflow"}
def _normalized_provider(overrides: Dict[str, Any], key: str, default: str) -> str:
return str(overrides.get(key) or default).strip().lower()
def _is_blank(value: Any) -> bool:
return value is None or (isinstance(value, str) and not value.strip())
@dataclass(frozen=True)
class AgentConfigSelection:
    """Resolved agent config location and how it was selected."""

    # Absolute path to the chosen YAML file.
    path: Optional[Path]
    # Selection channel, one of: cli_path, env_path, cli_profile, env_profile,
    # default, none (see _resolve_agent_selection).
    source: str
def _parse_cli_agent_args(argv: List[str]) -> Tuple[Optional[str], Optional[str]]:
"""Parse only agent-related CLI flags from argv."""
config_path: Optional[str] = None
profile: Optional[str] = None
i = 0
while i < len(argv):
arg = argv[i]
if arg.startswith("--agent-config="):
config_path = arg.split("=", 1)[1].strip() or None
elif arg == "--agent-config" and i + 1 < len(argv):
config_path = argv[i + 1].strip() or None
i += 1
elif arg.startswith("--agent-profile="):
profile = arg.split("=", 1)[1].strip() or None
elif arg == "--agent-profile" and i + 1 < len(argv):
profile = argv[i + 1].strip() or None
i += 1
i += 1
return config_path, profile
def _agent_config_dir() -> Path:
    """Resolve the directory that holds agent profile YAML files.

    Honors the AGENT_CONFIG_DIR environment variable; relative values are
    anchored at the current working directory.
    """
    configured = os.getenv("AGENT_CONFIG_DIR", _DEFAULT_AGENT_CONFIG_DIR)
    base_dir = Path(configured)
    if base_dir.is_absolute():
        return base_dir.resolve()
    return (Path.cwd() / base_dir).resolve()
def _resolve_agent_selection(
    agent_config_path: Optional[str] = None,
    agent_profile: Optional[str] = None,
    argv: Optional[List[str]] = None,
) -> AgentConfigSelection:
    """Locate the agent YAML file to load and record how it was chosen.

    Precedence per value: explicit function argument > CLI flag > environment
    variable; an explicit path beats a profile name; when neither is given,
    falls back to config/agents/default.yaml if it exists.

    Args:
        agent_config_path: Direct path to a YAML file (highest precedence).
        agent_profile: Profile name resolved inside the agent config dir.
        argv: CLI args to scan; defaults to sys.argv[1:].

    Raises:
        ValueError: when no candidate can be determined, or the chosen path
            does not exist or is not a regular file.
    """
    cli_path, cli_profile = _parse_cli_agent_args(list(argv if argv is not None else sys.argv[1:]))
    path_value = agent_config_path or cli_path or os.getenv("AGENT_CONFIG_PATH")
    profile_value = agent_profile or cli_profile or os.getenv("AGENT_PROFILE")
    source = "none"
    candidate: Optional[Path] = None
    if path_value:
        # NOTE: a path passed as a function argument is also labeled "cli_path".
        source = "cli_path" if (agent_config_path or cli_path) else "env_path"
        candidate = Path(path_value)
    elif profile_value:
        source = "cli_profile" if (agent_profile or cli_profile) else "env_profile"
        candidate = _agent_config_dir() / f"{profile_value}.yaml"
    else:
        fallback = _agent_config_dir() / _DEFAULT_AGENT_CONFIG_FILE
        if fallback.exists():
            source = "default"
            candidate = fallback
    if candidate is None:
        raise ValueError(
            "Agent YAML config is required. Provide --agent-config/--agent-profile "
            "or create config/agents/default.yaml."
        )
    # Normalize to an absolute, resolved path before validation.
    if not candidate.is_absolute():
        candidate = (Path.cwd() / candidate).resolve()
    else:
        candidate = candidate.resolve()
    if not candidate.exists():
        raise ValueError(f"Agent config file not found ({source}): {candidate}")
    if not candidate.is_file():
        raise ValueError(f"Agent config path is not a file: {candidate}")
    return AgentConfigSelection(path=candidate, source=source)
def _resolve_env_refs(value: Any) -> Any:
    """Resolve ${ENV_VAR} / ${ENV_VAR:default} placeholders recursively.

    Dicts and lists are walked depth-first; non-string scalars (and strings
    without "${") pass through untouched.

    Raises:
        ValueError: when a referenced variable is unset and no default is given.
    """
    if isinstance(value, dict):
        return {key: _resolve_env_refs(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_resolve_env_refs(entry) for entry in value]
    if not isinstance(value, str) or "${" not in value:
        return value

    def _substitute(match: re.Match[str]) -> str:
        name = match.group(1)
        fallback = match.group(2)
        resolved = os.getenv(name)
        if resolved is not None:
            return resolved
        if fallback is None:
            raise ValueError(f"Missing environment variable referenced in agent YAML: {name}")
        return fallback

    return _ENV_REF_PATTERN.sub(_substitute, value)
def _normalize_agent_overrides(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize YAML into flat Settings fields.

    Flattens known sections (vad/llm/tts/asr/duplex/barge_in) through
    _AGENT_SECTION_KEY_MAP, passes `tools` through as a list, and rejects the
    retired `siliconflow` section plus any key outside _AGENT_SETTING_KEYS.

    Raises:
        ValueError: on the `siliconflow` section, a non-list `tools`, a
            non-mapping section, an unknown nested key, or unknown flat keys.
    """
    normalized: Dict[str, Any] = {}
    for key, value in raw.items():
        if key == "siliconflow":
            raise ValueError(
                "Section 'siliconflow' is no longer supported. "
                "Move provider-specific fields into agent.llm / agent.asr / agent.tts."
            )
        if key == "tools":
            if not isinstance(value, list):
                raise ValueError("Agent config key 'tools' must be a list")
            normalized["tools"] = value
            continue
        section_map = _AGENT_SECTION_KEY_MAP.get(key)
        if section_map is None:
            # Unknown top-level keys are carried along and reported together below.
            normalized[key] = value
            continue
        if not isinstance(value, dict):
            raise ValueError(f"Agent config section '{key}' must be a mapping")
        for nested_key, nested_value in value.items():
            mapped_key = section_map.get(nested_key)
            if mapped_key is None:
                raise ValueError(f"Unknown key in '{key}' section: '{nested_key}'")
            normalized[mapped_key] = nested_value
    unknown_keys = sorted(set(normalized) - _AGENT_SETTING_KEYS)
    if unknown_keys:
        raise ValueError(
            "Unknown agent config keys in YAML: "
            + ", ".join(unknown_keys)
        )
    return normalized
def _missing_required_keys(overrides: Dict[str, Any]) -> List[str]:
    """Return the sorted list of required settings missing or blank in *overrides*.

    Beyond the static _BASE_REQUIRED_AGENT_SETTING_KEYS, provider-dependent
    credentials (api_key/api_url/model) are required according to the
    configured llm/tts/asr provider.
    """
    missing = set(_BASE_REQUIRED_AGENT_SETTING_KEYS - set(overrides))
    # Required keys whose values must also be non-blank strings.
    string_required = {
        "vad_type",
        "vad_model_path",
        "llm_provider",
        "llm_model",
        "tts_provider",
        "tts_voice",
        "asr_provider",
        "duplex_system_prompt",
    }
    for key in string_required:
        if key in overrides and _is_blank(overrides.get(key)):
            missing.add(key)
    # LLM: openai and openai-compatible providers need an API key.
    llm_provider = _normalized_provider(overrides, "llm_provider", "openai")
    if llm_provider in _OPENAI_COMPATIBLE_LLM_PROVIDERS or llm_provider == "openai":
        if "llm_api_key" not in overrides or _is_blank(overrides.get("llm_api_key")):
            missing.add("llm_api_key")
    # TTS: openai-compatible providers need key/url/model; dashscope only a key.
    tts_provider = _normalized_provider(overrides, "tts_provider", "openai_compatible")
    if tts_provider in _OPENAI_COMPATIBLE_TTS_PROVIDERS:
        if "tts_api_key" not in overrides or _is_blank(overrides.get("tts_api_key")):
            missing.add("tts_api_key")
        if "tts_api_url" not in overrides or _is_blank(overrides.get("tts_api_url")):
            missing.add("tts_api_url")
        if "tts_model" not in overrides or _is_blank(overrides.get("tts_model")):
            missing.add("tts_model")
    elif tts_provider in _DASHSCOPE_TTS_PROVIDERS:
        if "tts_api_key" not in overrides or _is_blank(overrides.get("tts_api_key")):
            missing.add("tts_api_key")
    # ASR: openai-compatible providers need key/url/model.
    asr_provider = _normalized_provider(overrides, "asr_provider", "openai_compatible")
    if asr_provider in _OPENAI_COMPATIBLE_ASR_PROVIDERS:
        if "asr_api_key" not in overrides or _is_blank(overrides.get("asr_api_key")):
            missing.add("asr_api_key")
        if "asr_api_url" not in overrides or _is_blank(overrides.get("asr_api_url")):
            missing.add("asr_api_url")
        if "asr_model" not in overrides or _is_blank(overrides.get("asr_model")):
            missing.add("asr_model")
    return sorted(missing)
def _load_agent_overrides(selection: AgentConfigSelection) -> Dict[str, Any]:
    """Read, env-substitute, normalize, and validate the selected agent YAML.

    Returns the flat Settings override dict, annotated with the resolved
    config path and selection source for diagnostics.

    Raises:
        RuntimeError: when PyYAML is not installed.
        ValueError: when the YAML is not a mapping, the `agent` wrapper is not
            a mapping, or required settings are missing.
    """
    if yaml is None:
        raise RuntimeError(
            "PyYAML is required for agent YAML configuration. Install with: pip install pyyaml"
        )
    with selection.path.open("r", encoding="utf-8") as file:
        raw = yaml.safe_load(file) or {}
    if not isinstance(raw, dict):
        raise ValueError(f"Agent config must be a YAML mapping: {selection.path}")
    if "agent" in raw:
        # Unwrap the optional top-level `agent:` key.
        agent_value = raw["agent"]
        if not isinstance(agent_value, dict):
            raise ValueError("The 'agent' key in YAML must be a mapping")
        raw = agent_value
    resolved = _resolve_env_refs(raw)
    overrides = _normalize_agent_overrides(resolved)
    missing_required = _missing_required_keys(overrides)
    if missing_required:
        raise ValueError(
            f"Missing required agent settings in YAML ({selection.path}): "
            + ", ".join(missing_required)
        )
    overrides["agent_config_path"] = str(selection.path)
    overrides["agent_config_source"] = selection.source
    return overrides
def load_settings(
    agent_config_path: Optional[str] = None,
    agent_profile: Optional[str] = None,
    argv: Optional[List[str]] = None,
) -> "Settings":
    """Load settings from .env and optional agent YAML.

    Resolves which agent YAML to use (argument > CLI flag > env var >
    default profile), loads and validates it, then constructs Settings with
    the resulting overrides layered on top of environment-derived values.
    """
    chosen = _resolve_agent_selection(
        agent_config_path=agent_config_path,
        agent_profile=agent_profile,
        argv=argv,
    )
    overrides = _load_agent_overrides(chosen)
    return Settings(**overrides)
# Import-time side effect: prime os.environ from .env files before the
# Settings class below reads environment variables.
_prime_process_env_from_dotenv()
class Settings(BaseSettings):
@@ -404,7 +64,6 @@ class Settings(BaseSettings):
default="openai",
description="LLM provider (openai, openai_compatible, siliconflow)"
)
llm_api_key: Optional[str] = Field(default=None, description="LLM provider API key")
llm_api_url: Optional[str] = Field(default=None, description="LLM provider API base URL")
llm_model: str = Field(default="gpt-4o-mini", description="LLM model name")
llm_temperature: float = Field(default=0.7, description="LLM temperature for response generation")
@@ -414,7 +73,6 @@ class Settings(BaseSettings):
default="openai_compatible",
description="TTS provider (edge, openai_compatible, siliconflow, dashscope)"
)
tts_api_key: Optional[str] = Field(default=None, description="TTS provider API key")
tts_api_url: Optional[str] = Field(default=None, description="TTS provider API URL")
tts_model: Optional[str] = Field(default=None, description="TTS model name")
tts_voice: str = Field(default="anna", description="TTS voice name")
@@ -429,7 +87,6 @@ class Settings(BaseSettings):
default="openai_compatible",
description="ASR provider (openai_compatible, buffered, siliconflow)"
)
asr_api_key: Optional[str] = Field(default=None, description="ASR provider API key")
asr_api_url: Optional[str] = Field(default=None, description="ASR provider API URL")
asr_model: Optional[str] = Field(default=None, description="ASR model name")
asr_interim_interval_ms: int = Field(default=500, description="Interval for interim ASR results in ms")
@@ -505,6 +162,10 @@ class Settings(BaseSettings):
)
backend_url: Optional[str] = Field(default=None, description="Backend API base URL (e.g. http://localhost:8787)")
backend_timeout_sec: int = Field(default=10, description="Backend API request timeout in seconds")
assistant_local_config_dir: str = Field(
default="engine/config/agents",
description="Directory containing local assistant runtime YAML files"
)
history_enabled: bool = Field(default=True, description="Enable history write bridge")
history_default_user_id: int = Field(default=1, description="Fallback user_id for history records")
history_queue_max_size: int = Field(default=256, description="Max buffered transcript writes per session")
@@ -515,10 +176,6 @@ class Settings(BaseSettings):
description="Max wait before finalizing history when queue is still draining"
)
# Agent YAML metadata
agent_config_path: Optional[str] = Field(default=None, description="Resolved agent YAML path")
agent_config_source: str = Field(default="none", description="How the agent YAML was selected")
@property
def chunk_size_bytes(self) -> int:
"""Calculate chunk size in bytes based on sample rate and duration."""
@@ -543,7 +200,7 @@ class Settings(BaseSettings):
# Global settings instance
settings = load_settings()
settings = Settings()
def get_settings() -> Settings:

View File

@@ -371,12 +371,10 @@ async def startup_event():
logger.info(f"Server: {settings.host}:{settings.port}")
logger.info(f"Sample rate: {settings.sample_rate} Hz")
logger.info(f"VAD model: {settings.vad_model_path}")
if settings.agent_config_path:
logger.info(
f"Agent config loaded ({settings.agent_config_source}): {settings.agent_config_path}"
)
else:
logger.info("Agent config: none (using .env/default agent values)")
logger.info(
"Assistant runtime config source: backend when BACKEND_URL is set, "
"otherwise local YAML by assistant_id from ASSISTANT_LOCAL_CONFIG_DIR"
)
@app.on_event("shutdown")