Compare commits
3 Commits
834c2dff35
...
8d203f0939
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d203f0939 | ||
|
|
da4a77eac7 | ||
|
|
08319a4cc7 |
@@ -11,9 +11,16 @@ PORT=8000
|
|||||||
# EXTERNAL_IP=1.2.3.4
|
# EXTERNAL_IP=1.2.3.4
|
||||||
|
|
||||||
# Backend bridge (optional)
|
# Backend bridge (optional)
|
||||||
|
# BACKEND_MODE=auto|http|disabled
|
||||||
|
BACKEND_MODE=auto
|
||||||
BACKEND_URL=http://127.0.0.1:8100
|
BACKEND_URL=http://127.0.0.1:8100
|
||||||
BACKEND_TIMEOUT_SEC=10
|
BACKEND_TIMEOUT_SEC=10
|
||||||
|
HISTORY_ENABLED=true
|
||||||
HISTORY_DEFAULT_USER_ID=1
|
HISTORY_DEFAULT_USER_ID=1
|
||||||
|
HISTORY_QUEUE_MAX_SIZE=256
|
||||||
|
HISTORY_RETRY_MAX_ATTEMPTS=2
|
||||||
|
HISTORY_RETRY_BACKOFF_SEC=0.2
|
||||||
|
HISTORY_FINALIZE_DRAIN_TIMEOUT_SEC=1.5
|
||||||
|
|
||||||
# Audio
|
# Audio
|
||||||
SAMPLE_RATE=16000
|
SAMPLE_RATE=16000
|
||||||
|
|||||||
27
README.md
27
README.md
@@ -37,6 +37,33 @@ Agent 配置路径优先级
|
|||||||
- Agent 相关配置是严格模式:YAML 缺少必须项会直接报错,不会回退到 `.env` 或代码默认值。
|
- Agent 相关配置是严格模式:YAML 缺少必须项会直接报错,不会回退到 `.env` 或代码默认值。
|
||||||
- 如果要引用环境变量,请在 YAML 显式写 `${ENV_VAR}`。
|
- 如果要引用环境变量,请在 YAML 显式写 `${ENV_VAR}`。
|
||||||
- `siliconflow` 独立 section 已移除;请在 `agent.llm / agent.tts / agent.asr` 内通过 `provider`、`api_key`、`api_url`、`model` 配置。
|
- `siliconflow` 独立 section 已移除;请在 `agent.llm / agent.tts / agent.asr` 内通过 `provider`、`api_key`、`api_url`、`model` 配置。
|
||||||
|
- 现在支持在 Agent YAML 中配置 `agent.tools`(列表),用于声明运行时可调用工具。
|
||||||
|
- 工具配置示例见 `config/agents/tools.yaml`。
|
||||||
|
|
||||||
|
## Backend Integration
|
||||||
|
|
||||||
|
Engine runtime now supports adapter-based backend integration:
|
||||||
|
|
||||||
|
- `BACKEND_MODE=auto|http|disabled`
|
||||||
|
- `BACKEND_URL` + `BACKEND_TIMEOUT_SEC`
|
||||||
|
- `HISTORY_ENABLED=true|false`
|
||||||
|
|
||||||
|
Behavior:
|
||||||
|
|
||||||
|
- `auto`: use HTTP backend only when `BACKEND_URL` is set, otherwise engine-only mode.
|
||||||
|
- `http`: force HTTP backend; falls back to engine-only mode when URL is missing.
|
||||||
|
- `disabled`: force engine-only mode (no backend calls).
|
||||||
|
|
||||||
|
History write path is now asynchronous and buffered per session:
|
||||||
|
|
||||||
|
- `HISTORY_QUEUE_MAX_SIZE`
|
||||||
|
- `HISTORY_RETRY_MAX_ATTEMPTS`
|
||||||
|
- `HISTORY_RETRY_BACKOFF_SEC`
|
||||||
|
- `HISTORY_FINALIZE_DRAIN_TIMEOUT_SEC`
|
||||||
|
|
||||||
|
This keeps turn processing responsive even when backend history APIs are slow/failing.
|
||||||
|
|
||||||
|
Detailed notes: `docs/backend_integration.md`.
|
||||||
|
|
||||||
测试
|
测试
|
||||||
|
|
||||||
|
|||||||
357
app/backend_adapters.py
Normal file
357
app/backend_adapters.py
Normal file
@@ -0,0 +1,357 @@
|
|||||||
|
"""Backend adapter implementations for engine integration ports."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from app.config import settings
|
||||||
|
|
||||||
|
|
||||||
|
class NullBackendAdapter:
|
||||||
|
"""No-op adapter for engine-only runtime without backend dependencies."""
|
||||||
|
|
||||||
|
async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
_ = assistant_id
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def create_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
user_id: int,
|
||||||
|
assistant_id: Optional[str],
|
||||||
|
source: str = "debug",
|
||||||
|
) -> Optional[str]:
|
||||||
|
_ = (user_id, assistant_id, source)
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def add_transcript(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
turn_index: int,
|
||||||
|
speaker: str,
|
||||||
|
content: str,
|
||||||
|
start_ms: int,
|
||||||
|
end_ms: int,
|
||||||
|
confidence: Optional[float] = None,
|
||||||
|
duration_ms: Optional[int] = None,
|
||||||
|
) -> bool:
|
||||||
|
_ = (call_id, turn_index, speaker, content, start_ms, end_ms, confidence, duration_ms)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def finalize_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
status: str,
|
||||||
|
duration_seconds: int,
|
||||||
|
) -> bool:
|
||||||
|
_ = (call_id, status, duration_seconds)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def search_knowledge_context(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
kb_id: str,
|
||||||
|
query: str,
|
||||||
|
n_results: int = 5,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
_ = (kb_id, query, n_results)
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
_ = tool_id
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class HistoryDisabledBackendAdapter:
|
||||||
|
"""Adapter wrapper that disables history writes while keeping reads available."""
|
||||||
|
|
||||||
|
def __init__(self, delegate: HttpBackendAdapter | NullBackendAdapter):
|
||||||
|
self._delegate = delegate
|
||||||
|
|
||||||
|
async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
return await self._delegate.fetch_assistant_config(assistant_id)
|
||||||
|
|
||||||
|
async def create_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
user_id: int,
|
||||||
|
assistant_id: Optional[str],
|
||||||
|
source: str = "debug",
|
||||||
|
) -> Optional[str]:
|
||||||
|
_ = (user_id, assistant_id, source)
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def add_transcript(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
turn_index: int,
|
||||||
|
speaker: str,
|
||||||
|
content: str,
|
||||||
|
start_ms: int,
|
||||||
|
end_ms: int,
|
||||||
|
confidence: Optional[float] = None,
|
||||||
|
duration_ms: Optional[int] = None,
|
||||||
|
) -> bool:
|
||||||
|
_ = (call_id, turn_index, speaker, content, start_ms, end_ms, confidence, duration_ms)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def finalize_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
status: str,
|
||||||
|
duration_seconds: int,
|
||||||
|
) -> bool:
|
||||||
|
_ = (call_id, status, duration_seconds)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def search_knowledge_context(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
kb_id: str,
|
||||||
|
query: str,
|
||||||
|
n_results: int = 5,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
return await self._delegate.search_knowledge_context(
|
||||||
|
kb_id=kb_id,
|
||||||
|
query=query,
|
||||||
|
n_results=n_results,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
return await self._delegate.fetch_tool_resource(tool_id)
|
||||||
|
|
||||||
|
|
||||||
|
class HttpBackendAdapter:
|
||||||
|
"""HTTP implementation of backend integration ports."""
|
||||||
|
|
||||||
|
def __init__(self, backend_url: str, timeout_sec: int = 10):
|
||||||
|
base_url = str(backend_url or "").strip().rstrip("/")
|
||||||
|
if not base_url:
|
||||||
|
raise ValueError("backend_url is required for HttpBackendAdapter")
|
||||||
|
self._base_url = base_url
|
||||||
|
self._timeout_sec = timeout_sec
|
||||||
|
|
||||||
|
def _timeout(self) -> aiohttp.ClientTimeout:
|
||||||
|
return aiohttp.ClientTimeout(total=self._timeout_sec)
|
||||||
|
|
||||||
|
async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Fetch assistant config payload from backend API.
|
||||||
|
|
||||||
|
Expected response shape:
|
||||||
|
{
|
||||||
|
"assistant": {...},
|
||||||
|
"voice": {...} | null
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
url = f"{self._base_url}/api/assistants/{assistant_id}/config"
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=self._timeout()) as session:
|
||||||
|
async with session.get(url) as resp:
|
||||||
|
if resp.status == 404:
|
||||||
|
logger.warning(f"Assistant config not found: {assistant_id}")
|
||||||
|
return None
|
||||||
|
resp.raise_for_status()
|
||||||
|
payload = await resp.json()
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
logger.warning("Assistant config payload is not a dict; ignoring")
|
||||||
|
return None
|
||||||
|
return payload
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def create_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
user_id: int,
|
||||||
|
assistant_id: Optional[str],
|
||||||
|
source: str = "debug",
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Create a call record via backend history API and return call_id."""
|
||||||
|
url = f"{self._base_url}/api/history"
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"user_id": user_id,
|
||||||
|
"assistant_id": assistant_id,
|
||||||
|
"source": source,
|
||||||
|
"status": "connected",
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=self._timeout()) as session:
|
||||||
|
async with session.post(url, json=payload) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = await resp.json()
|
||||||
|
call_id = str((data or {}).get("id") or "")
|
||||||
|
return call_id or None
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to create history call record: {exc}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def add_transcript(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
turn_index: int,
|
||||||
|
speaker: str,
|
||||||
|
content: str,
|
||||||
|
start_ms: int,
|
||||||
|
end_ms: int,
|
||||||
|
confidence: Optional[float] = None,
|
||||||
|
duration_ms: Optional[int] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""Append a transcript segment to backend history."""
|
||||||
|
if not call_id:
|
||||||
|
return False
|
||||||
|
|
||||||
|
url = f"{self._base_url}/api/history/{call_id}/transcripts"
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"turn_index": turn_index,
|
||||||
|
"speaker": speaker,
|
||||||
|
"content": content,
|
||||||
|
"confidence": confidence,
|
||||||
|
"start_ms": start_ms,
|
||||||
|
"end_ms": end_ms,
|
||||||
|
"duration_ms": duration_ms,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=self._timeout()) as session:
|
||||||
|
async with session.post(url, json=payload) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to append history transcript (call_id={call_id}, turn={turn_index}): {exc}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def finalize_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
status: str,
|
||||||
|
duration_seconds: int,
|
||||||
|
) -> bool:
|
||||||
|
"""Finalize a call record with status and duration."""
|
||||||
|
if not call_id:
|
||||||
|
return False
|
||||||
|
|
||||||
|
url = f"{self._base_url}/api/history/{call_id}"
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"status": status,
|
||||||
|
"duration_seconds": duration_seconds,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=self._timeout()) as session:
|
||||||
|
async with session.put(url, json=payload) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to finalize history call record ({call_id}): {exc}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def search_knowledge_context(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
kb_id: str,
|
||||||
|
query: str,
|
||||||
|
n_results: int = 5,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Search backend knowledge base and return retrieval results."""
|
||||||
|
if not kb_id or not query.strip():
|
||||||
|
return []
|
||||||
|
try:
|
||||||
|
safe_n_results = max(1, int(n_results))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
safe_n_results = 5
|
||||||
|
|
||||||
|
url = f"{self._base_url}/api/knowledge/search"
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"kb_id": kb_id,
|
||||||
|
"query": query,
|
||||||
|
"nResults": safe_n_results,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=self._timeout()) as session:
|
||||||
|
async with session.post(url, json=payload) as resp:
|
||||||
|
if resp.status == 404:
|
||||||
|
logger.warning(f"Knowledge base not found for retrieval: {kb_id}")
|
||||||
|
return []
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = await resp.json()
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return []
|
||||||
|
results = data.get("results", [])
|
||||||
|
if not isinstance(results, list):
|
||||||
|
return []
|
||||||
|
return [r for r in results if isinstance(r, dict)]
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Knowledge search failed (kb_id={kb_id}): {exc}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Fetch tool resource configuration from backend API."""
|
||||||
|
if not tool_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
url = f"{self._base_url}/api/tools/resources/{tool_id}"
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=self._timeout()) as session:
|
||||||
|
async with session.get(url) as resp:
|
||||||
|
if resp.status == 404:
|
||||||
|
return None
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = await resp.json()
|
||||||
|
return data if isinstance(data, dict) else None
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to fetch tool resource ({tool_id}): {exc}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def build_backend_adapter(
|
||||||
|
*,
|
||||||
|
backend_url: Optional[str],
|
||||||
|
backend_mode: str = "auto",
|
||||||
|
history_enabled: bool = True,
|
||||||
|
timeout_sec: int = 10,
|
||||||
|
) -> HttpBackendAdapter | NullBackendAdapter | HistoryDisabledBackendAdapter:
|
||||||
|
"""Create backend adapter implementation based on runtime settings."""
|
||||||
|
mode = str(backend_mode or "auto").strip().lower()
|
||||||
|
has_url = bool(str(backend_url or "").strip())
|
||||||
|
|
||||||
|
base_adapter: HttpBackendAdapter | NullBackendAdapter
|
||||||
|
if mode in {"disabled", "off", "none", "null", "engine_only", "engine-only"}:
|
||||||
|
base_adapter = NullBackendAdapter()
|
||||||
|
elif mode == "http":
|
||||||
|
if has_url:
|
||||||
|
base_adapter = HttpBackendAdapter(backend_url=str(backend_url), timeout_sec=timeout_sec)
|
||||||
|
else:
|
||||||
|
logger.warning("BACKEND_MODE=http but BACKEND_URL is empty; falling back to NullBackendAdapter")
|
||||||
|
base_adapter = NullBackendAdapter()
|
||||||
|
else:
|
||||||
|
if has_url:
|
||||||
|
base_adapter = HttpBackendAdapter(backend_url=str(backend_url), timeout_sec=timeout_sec)
|
||||||
|
else:
|
||||||
|
base_adapter = NullBackendAdapter()
|
||||||
|
|
||||||
|
if not history_enabled:
|
||||||
|
return HistoryDisabledBackendAdapter(base_adapter)
|
||||||
|
return base_adapter
|
||||||
|
|
||||||
|
|
||||||
|
def build_backend_adapter_from_settings() -> HttpBackendAdapter | NullBackendAdapter | HistoryDisabledBackendAdapter:
|
||||||
|
"""Create backend adapter using current app settings."""
|
||||||
|
return build_backend_adapter(
|
||||||
|
backend_url=settings.backend_url,
|
||||||
|
backend_mode=settings.backend_mode,
|
||||||
|
history_enabled=settings.history_enabled,
|
||||||
|
timeout_sec=settings.backend_timeout_sec,
|
||||||
|
)
|
||||||
@@ -1,56 +1,19 @@
|
|||||||
"""Backend API client for assistant config and history persistence."""
|
"""Compatibility wrappers around backend adapter implementations."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
import aiohttp
|
from app.backend_adapters import build_backend_adapter_from_settings
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
from app.config import settings
|
|
||||||
|
def _adapter():
|
||||||
|
return build_backend_adapter_from_settings()
|
||||||
|
|
||||||
|
|
||||||
async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]:
|
async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]:
|
||||||
"""Fetch assistant config payload from backend API.
|
"""Fetch assistant config payload from backend adapter."""
|
||||||
|
return await _adapter().fetch_assistant_config(assistant_id)
|
||||||
Expected response shape:
|
|
||||||
{
|
|
||||||
"assistant": {...},
|
|
||||||
"voice": {...} | null
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
if not settings.backend_url:
|
|
||||||
logger.warning("BACKEND_URL not set; skipping assistant config fetch")
|
|
||||||
return None
|
|
||||||
|
|
||||||
url = f"{settings.backend_url.rstrip('/')}/api/assistants/{assistant_id}/config"
|
|
||||||
timeout = aiohttp.ClientTimeout(total=settings.backend_timeout_sec)
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
||||||
async with session.get(url) as resp:
|
|
||||||
if resp.status == 404:
|
|
||||||
logger.warning(f"Assistant config not found: {assistant_id}")
|
|
||||||
return None
|
|
||||||
resp.raise_for_status()
|
|
||||||
payload = await resp.json()
|
|
||||||
if not isinstance(payload, dict):
|
|
||||||
logger.warning("Assistant config payload is not a dict; ignoring")
|
|
||||||
return None
|
|
||||||
return payload
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _backend_base_url() -> Optional[str]:
|
|
||||||
if not settings.backend_url:
|
|
||||||
return None
|
|
||||||
return settings.backend_url.rstrip("/")
|
|
||||||
|
|
||||||
|
|
||||||
def _timeout() -> aiohttp.ClientTimeout:
|
|
||||||
return aiohttp.ClientTimeout(total=settings.backend_timeout_sec)
|
|
||||||
|
|
||||||
|
|
||||||
async def create_history_call_record(
|
async def create_history_call_record(
|
||||||
@@ -60,28 +23,11 @@ async def create_history_call_record(
|
|||||||
source: str = "debug",
|
source: str = "debug",
|
||||||
) -> Optional[str]:
|
) -> Optional[str]:
|
||||||
"""Create a call record via backend history API and return call_id."""
|
"""Create a call record via backend history API and return call_id."""
|
||||||
base_url = _backend_base_url()
|
return await _adapter().create_call_record(
|
||||||
if not base_url:
|
user_id=user_id,
|
||||||
return None
|
assistant_id=assistant_id,
|
||||||
|
source=source,
|
||||||
url = f"{base_url}/api/history"
|
)
|
||||||
payload: Dict[str, Any] = {
|
|
||||||
"user_id": user_id,
|
|
||||||
"assistant_id": assistant_id,
|
|
||||||
"source": source,
|
|
||||||
"status": "connected",
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with aiohttp.ClientSession(timeout=_timeout()) as session:
|
|
||||||
async with session.post(url, json=payload) as resp:
|
|
||||||
resp.raise_for_status()
|
|
||||||
data = await resp.json()
|
|
||||||
call_id = str((data or {}).get("id") or "")
|
|
||||||
return call_id or None
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(f"Failed to create history call record: {exc}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
async def add_history_transcript(
|
async def add_history_transcript(
|
||||||
@@ -96,29 +42,16 @@ async def add_history_transcript(
|
|||||||
duration_ms: Optional[int] = None,
|
duration_ms: Optional[int] = None,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Append a transcript segment to backend history."""
|
"""Append a transcript segment to backend history."""
|
||||||
base_url = _backend_base_url()
|
return await _adapter().add_transcript(
|
||||||
if not base_url or not call_id:
|
call_id=call_id,
|
||||||
return False
|
turn_index=turn_index,
|
||||||
|
speaker=speaker,
|
||||||
url = f"{base_url}/api/history/{call_id}/transcripts"
|
content=content,
|
||||||
payload: Dict[str, Any] = {
|
start_ms=start_ms,
|
||||||
"turn_index": turn_index,
|
end_ms=end_ms,
|
||||||
"speaker": speaker,
|
confidence=confidence,
|
||||||
"content": content,
|
duration_ms=duration_ms,
|
||||||
"confidence": confidence,
|
)
|
||||||
"start_ms": start_ms,
|
|
||||||
"end_ms": end_ms,
|
|
||||||
"duration_ms": duration_ms,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with aiohttp.ClientSession(timeout=_timeout()) as session:
|
|
||||||
async with session.post(url, json=payload) as resp:
|
|
||||||
resp.raise_for_status()
|
|
||||||
return True
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(f"Failed to append history transcript (call_id={call_id}, turn={turn_index}): {exc}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
async def finalize_history_call_record(
|
async def finalize_history_call_record(
|
||||||
@@ -128,24 +61,11 @@ async def finalize_history_call_record(
|
|||||||
duration_seconds: int,
|
duration_seconds: int,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Finalize a call record with status and duration."""
|
"""Finalize a call record with status and duration."""
|
||||||
base_url = _backend_base_url()
|
return await _adapter().finalize_call_record(
|
||||||
if not base_url or not call_id:
|
call_id=call_id,
|
||||||
return False
|
status=status,
|
||||||
|
duration_seconds=duration_seconds,
|
||||||
url = f"{base_url}/api/history/{call_id}"
|
)
|
||||||
payload: Dict[str, Any] = {
|
|
||||||
"status": status,
|
|
||||||
"duration_seconds": duration_seconds,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with aiohttp.ClientSession(timeout=_timeout()) as session:
|
|
||||||
async with session.put(url, json=payload) as resp:
|
|
||||||
resp.raise_for_status()
|
|
||||||
return True
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(f"Failed to finalize history call record ({call_id}): {exc}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
async def search_knowledge_context(
|
async def search_knowledge_context(
|
||||||
@@ -155,57 +75,13 @@ async def search_knowledge_context(
|
|||||||
n_results: int = 5,
|
n_results: int = 5,
|
||||||
) -> List[Dict[str, Any]]:
|
) -> List[Dict[str, Any]]:
|
||||||
"""Search backend knowledge base and return retrieval results."""
|
"""Search backend knowledge base and return retrieval results."""
|
||||||
base_url = _backend_base_url()
|
return await _adapter().search_knowledge_context(
|
||||||
if not base_url:
|
kb_id=kb_id,
|
||||||
return []
|
query=query,
|
||||||
if not kb_id or not query.strip():
|
n_results=n_results,
|
||||||
return []
|
)
|
||||||
try:
|
|
||||||
safe_n_results = max(1, int(n_results))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
safe_n_results = 5
|
|
||||||
|
|
||||||
url = f"{base_url}/api/knowledge/search"
|
|
||||||
payload: Dict[str, Any] = {
|
|
||||||
"kb_id": kb_id,
|
|
||||||
"query": query,
|
|
||||||
"nResults": safe_n_results,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with aiohttp.ClientSession(timeout=_timeout()) as session:
|
|
||||||
async with session.post(url, json=payload) as resp:
|
|
||||||
if resp.status == 404:
|
|
||||||
logger.warning(f"Knowledge base not found for retrieval: {kb_id}")
|
|
||||||
return []
|
|
||||||
resp.raise_for_status()
|
|
||||||
data = await resp.json()
|
|
||||||
if not isinstance(data, dict):
|
|
||||||
return []
|
|
||||||
results = data.get("results", [])
|
|
||||||
if not isinstance(results, list):
|
|
||||||
return []
|
|
||||||
return [r for r in results if isinstance(r, dict)]
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(f"Knowledge search failed (kb_id={kb_id}): {exc}")
|
|
||||||
return []
|
|
||||||
|
|
||||||
|
|
||||||
async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]:
|
async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]:
|
||||||
"""Fetch tool resource configuration from backend API."""
|
"""Fetch tool resource configuration from backend API."""
|
||||||
base_url = _backend_base_url()
|
return await _adapter().fetch_tool_resource(tool_id)
|
||||||
if not base_url or not tool_id:
|
|
||||||
return None
|
|
||||||
|
|
||||||
url = f"{base_url}/api/tools/resources/{tool_id}"
|
|
||||||
try:
|
|
||||||
async with aiohttp.ClientSession(timeout=_timeout()) as session:
|
|
||||||
async with session.get(url) as resp:
|
|
||||||
if resp.status == 404:
|
|
||||||
return None
|
|
||||||
resp.raise_for_status()
|
|
||||||
data = await resp.json()
|
|
||||||
return data if isinstance(data, dict) else None
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(f"Failed to fetch tool resource ({tool_id}): {exc}")
|
|
||||||
return None
|
|
||||||
|
|||||||
@@ -95,6 +95,7 @@ _AGENT_SETTING_KEYS = {
|
|||||||
"duplex_system_prompt",
|
"duplex_system_prompt",
|
||||||
"barge_in_min_duration_ms",
|
"barge_in_min_duration_ms",
|
||||||
"barge_in_silence_tolerance_ms",
|
"barge_in_silence_tolerance_ms",
|
||||||
|
"tools",
|
||||||
}
|
}
|
||||||
_BASE_REQUIRED_AGENT_SETTING_KEYS = {
|
_BASE_REQUIRED_AGENT_SETTING_KEYS = {
|
||||||
"vad_type",
|
"vad_type",
|
||||||
@@ -239,6 +240,11 @@ def _normalize_agent_overrides(raw: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
"Section 'siliconflow' is no longer supported. "
|
"Section 'siliconflow' is no longer supported. "
|
||||||
"Move provider-specific fields into agent.llm / agent.asr / agent.tts."
|
"Move provider-specific fields into agent.llm / agent.asr / agent.tts."
|
||||||
)
|
)
|
||||||
|
if key == "tools":
|
||||||
|
if not isinstance(value, list):
|
||||||
|
raise ValueError("Agent config key 'tools' must be a list")
|
||||||
|
normalized["tools"] = value
|
||||||
|
continue
|
||||||
section_map = _AGENT_SECTION_KEY_MAP.get(key)
|
section_map = _AGENT_SECTION_KEY_MAP.get(key)
|
||||||
if section_map is None:
|
if section_map is None:
|
||||||
normalized[key] = value
|
normalized[key] = value
|
||||||
@@ -444,6 +450,10 @@ class Settings(BaseSettings):
|
|||||||
description="How much silence (ms) is tolerated during potential barge-in before reset"
|
description="How much silence (ms) is tolerated during potential barge-in before reset"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Optional tool declarations from agent YAML.
|
||||||
|
# Supports OpenAI function schema style entries and/or shorthand string names.
|
||||||
|
tools: List[Any] = Field(default_factory=list, description="Default tool definitions for runtime")
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
log_level: str = Field(default="INFO", description="Logging level")
|
log_level: str = Field(default="INFO", description="Logging level")
|
||||||
log_format: str = Field(default="json", description="Log format (json or text)")
|
log_format: str = Field(default="json", description="Log format (json or text)")
|
||||||
@@ -468,9 +478,21 @@ class Settings(BaseSettings):
|
|||||||
ws_require_auth: bool = Field(default=False, description="Require auth in hello message even when ws_api_key is not set")
|
ws_require_auth: bool = Field(default=False, description="Require auth in hello message even when ws_api_key is not set")
|
||||||
|
|
||||||
# Backend bridge configuration (for call/transcript persistence)
|
# Backend bridge configuration (for call/transcript persistence)
|
||||||
|
backend_mode: str = Field(
|
||||||
|
default="auto",
|
||||||
|
description="Backend integration mode: auto | http | disabled"
|
||||||
|
)
|
||||||
backend_url: Optional[str] = Field(default=None, description="Backend API base URL (e.g. http://localhost:8787)")
|
backend_url: Optional[str] = Field(default=None, description="Backend API base URL (e.g. http://localhost:8787)")
|
||||||
backend_timeout_sec: int = Field(default=10, description="Backend API request timeout in seconds")
|
backend_timeout_sec: int = Field(default=10, description="Backend API request timeout in seconds")
|
||||||
|
history_enabled: bool = Field(default=True, description="Enable history write bridge")
|
||||||
history_default_user_id: int = Field(default=1, description="Fallback user_id for history records")
|
history_default_user_id: int = Field(default=1, description="Fallback user_id for history records")
|
||||||
|
history_queue_max_size: int = Field(default=256, description="Max buffered transcript writes per session")
|
||||||
|
history_retry_max_attempts: int = Field(default=2, description="Retry attempts for each transcript write")
|
||||||
|
history_retry_backoff_sec: float = Field(default=0.2, description="Base retry backoff for transcript writes")
|
||||||
|
history_finalize_drain_timeout_sec: float = Field(
|
||||||
|
default=1.5,
|
||||||
|
description="Max wait before finalizing history when queue is still draining"
|
||||||
|
)
|
||||||
|
|
||||||
# Agent YAML metadata
|
# Agent YAML metadata
|
||||||
agent_config_path: Optional[str] = Field(default=None, description="Resolved agent YAML path")
|
agent_config_path: Optional[str] = Field(default=None, description="Resolved agent YAML path")
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ except ImportError:
|
|||||||
logger.warning("aiortc not available - WebRTC endpoint will be disabled")
|
logger.warning("aiortc not available - WebRTC endpoint will be disabled")
|
||||||
|
|
||||||
from app.config import settings
|
from app.config import settings
|
||||||
|
from app.backend_adapters import build_backend_adapter_from_settings
|
||||||
from core.transports import SocketTransport, WebRtcTransport, BaseTransport
|
from core.transports import SocketTransport, WebRtcTransport, BaseTransport
|
||||||
from core.session import Session
|
from core.session import Session
|
||||||
from processors.tracks import Resampled16kTrack
|
from processors.tracks import Resampled16kTrack
|
||||||
@@ -75,6 +76,7 @@ app.add_middleware(
|
|||||||
|
|
||||||
# Active sessions storage
|
# Active sessions storage
|
||||||
active_sessions: Dict[str, Session] = {}
|
active_sessions: Dict[str, Session] = {}
|
||||||
|
backend_gateway = build_backend_adapter_from_settings()
|
||||||
|
|
||||||
# Configure logging
|
# Configure logging
|
||||||
logger.remove()
|
logger.remove()
|
||||||
@@ -164,7 +166,7 @@ async def websocket_endpoint(websocket: WebSocket):
|
|||||||
|
|
||||||
# Create transport and session
|
# Create transport and session
|
||||||
transport = SocketTransport(websocket)
|
transport = SocketTransport(websocket)
|
||||||
session = Session(session_id, transport)
|
session = Session(session_id, transport, backend_gateway=backend_gateway)
|
||||||
active_sessions[session_id] = session
|
active_sessions[session_id] = session
|
||||||
|
|
||||||
logger.info(f"WebSocket connection established: {session_id}")
|
logger.info(f"WebSocket connection established: {session_id}")
|
||||||
@@ -243,7 +245,7 @@ async def webrtc_endpoint(websocket: WebSocket):
|
|||||||
|
|
||||||
# Create transport and session
|
# Create transport and session
|
||||||
transport = WebRtcTransport(websocket, pc)
|
transport = WebRtcTransport(websocket, pc)
|
||||||
session = Session(session_id, transport)
|
session = Session(session_id, transport, backend_gateway=backend_gateway)
|
||||||
active_sessions[session_id] = session
|
active_sessions[session_id] = session
|
||||||
|
|
||||||
logger.info(f"WebRTC connection established: {session_id}")
|
logger.info(f"WebRTC connection established: {session_id}")
|
||||||
|
|||||||
73
config/agents/tools.yaml
Normal file
73
config/agents/tools.yaml
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
# Agent behavior configuration with tool declarations.
|
||||||
|
# This profile is an example only.
|
||||||
|
|
||||||
|
agent:
|
||||||
|
vad:
|
||||||
|
type: silero
|
||||||
|
model_path: data/vad/silero_vad.onnx
|
||||||
|
threshold: 0.5
|
||||||
|
min_speech_duration_ms: 100
|
||||||
|
eou_threshold_ms: 800
|
||||||
|
|
||||||
|
llm:
|
||||||
|
# provider: openai | openai_compatible | siliconflow
|
||||||
|
provider: openai_compatible
|
||||||
|
model: deepseek-v3
|
||||||
|
temperature: 0.7
|
||||||
|
api_key: your_llm_api_key
|
||||||
|
api_url: https://api.qnaigc.com/v1
|
||||||
|
|
||||||
|
tts:
|
||||||
|
# provider: edge | openai_compatible | siliconflow
|
||||||
|
provider: openai_compatible
|
||||||
|
api_key: your_tts_api_key
|
||||||
|
api_url: https://api.siliconflow.cn/v1/audio/speech
|
||||||
|
model: FunAudioLLM/CosyVoice2-0.5B
|
||||||
|
voice: anna
|
||||||
|
speed: 1.0
|
||||||
|
|
||||||
|
asr:
|
||||||
|
# provider: buffered | openai_compatible | siliconflow
|
||||||
|
provider: openai_compatible
|
||||||
|
api_key: your_asr_api_key
|
||||||
|
api_url: https://api.siliconflow.cn/v1/audio/transcriptions
|
||||||
|
model: FunAudioLLM/SenseVoiceSmall
|
||||||
|
interim_interval_ms: 500
|
||||||
|
min_audio_ms: 300
|
||||||
|
start_min_speech_ms: 160
|
||||||
|
pre_speech_ms: 240
|
||||||
|
final_tail_ms: 120
|
||||||
|
|
||||||
|
duplex:
|
||||||
|
enabled: true
|
||||||
|
system_prompt: You are a helpful voice assistant with tool-calling support.
|
||||||
|
|
||||||
|
barge_in:
|
||||||
|
min_duration_ms: 200
|
||||||
|
silence_tolerance_ms: 60
|
||||||
|
|
||||||
|
# Tool declarations consumed by the engine at startup.
|
||||||
|
# - String form enables built-in/default tool schema when available.
|
||||||
|
# - Object form provides OpenAI function schema + executor hint.
|
||||||
|
tools:
|
||||||
|
- current_time
|
||||||
|
- calculator
|
||||||
|
- name: weather
|
||||||
|
description: Get weather by city name.
|
||||||
|
parameters:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
city:
|
||||||
|
type: string
|
||||||
|
description: City name, for example "San Francisco".
|
||||||
|
required: [city]
|
||||||
|
executor: server
|
||||||
|
- name: open_map
|
||||||
|
description: Open map app on the client device.
|
||||||
|
parameters:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
query:
|
||||||
|
type: string
|
||||||
|
required: [query]
|
||||||
|
executor: client
|
||||||
@@ -15,7 +15,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
@@ -86,7 +86,16 @@ class DuplexPipeline:
|
|||||||
tts_service: Optional[BaseTTSService] = None,
|
tts_service: Optional[BaseTTSService] = None,
|
||||||
asr_service: Optional[BaseASRService] = None,
|
asr_service: Optional[BaseASRService] = None,
|
||||||
system_prompt: Optional[str] = None,
|
system_prompt: Optional[str] = None,
|
||||||
greeting: Optional[str] = None
|
greeting: Optional[str] = None,
|
||||||
|
knowledge_searcher: Optional[
|
||||||
|
Callable[..., Awaitable[List[Dict[str, Any]]]]
|
||||||
|
] = None,
|
||||||
|
tool_resource_resolver: Optional[
|
||||||
|
Callable[[str], Awaitable[Optional[Dict[str, Any]]]]
|
||||||
|
] = None,
|
||||||
|
server_tool_executor: Optional[
|
||||||
|
Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
|
||||||
|
] = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Initialize duplex pipeline.
|
Initialize duplex pipeline.
|
||||||
@@ -127,6 +136,9 @@ class DuplexPipeline:
|
|||||||
self.llm_service = llm_service
|
self.llm_service = llm_service
|
||||||
self.tts_service = tts_service
|
self.tts_service = tts_service
|
||||||
self.asr_service = asr_service # Will be initialized in start()
|
self.asr_service = asr_service # Will be initialized in start()
|
||||||
|
self._knowledge_searcher = knowledge_searcher
|
||||||
|
self._tool_resource_resolver = tool_resource_resolver
|
||||||
|
self._server_tool_executor = server_tool_executor
|
||||||
|
|
||||||
# Track last sent transcript to avoid duplicates
|
# Track last sent transcript to avoid duplicates
|
||||||
self._last_sent_transcript = ""
|
self._last_sent_transcript = ""
|
||||||
@@ -194,7 +206,8 @@ class DuplexPipeline:
|
|||||||
self._runtime_barge_in_min_duration_ms: Optional[int] = None
|
self._runtime_barge_in_min_duration_ms: Optional[int] = None
|
||||||
self._runtime_knowledge: Dict[str, Any] = {}
|
self._runtime_knowledge: Dict[str, Any] = {}
|
||||||
self._runtime_knowledge_base_id: Optional[str] = None
|
self._runtime_knowledge_base_id: Optional[str] = None
|
||||||
self._runtime_tools: List[Any] = []
|
raw_default_tools = settings.tools if isinstance(settings.tools, list) else []
|
||||||
|
self._runtime_tools: List[Any] = list(raw_default_tools)
|
||||||
self._runtime_tool_executor: Dict[str, str] = {}
|
self._runtime_tool_executor: Dict[str, str] = {}
|
||||||
self._pending_tool_waiters: Dict[str, asyncio.Future] = {}
|
self._pending_tool_waiters: Dict[str, asyncio.Future] = {}
|
||||||
self._early_tool_results: Dict[str, Dict[str, Any]] = {}
|
self._early_tool_results: Dict[str, Dict[str, Any]] = {}
|
||||||
@@ -215,6 +228,20 @@ class DuplexPipeline:
|
|||||||
self._pending_llm_delta: str = ""
|
self._pending_llm_delta: str = ""
|
||||||
self._last_llm_delta_emit_ms: float = 0.0
|
self._last_llm_delta_emit_ms: float = 0.0
|
||||||
|
|
||||||
|
self._runtime_tool_executor = self._resolved_tool_executor_map()
|
||||||
|
|
||||||
|
if self._server_tool_executor is None:
|
||||||
|
if self._tool_resource_resolver:
|
||||||
|
async def _executor(call: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
return await execute_server_tool(
|
||||||
|
call,
|
||||||
|
tool_resource_fetcher=self._tool_resource_resolver,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._server_tool_executor = _executor
|
||||||
|
else:
|
||||||
|
self._server_tool_executor = execute_server_tool
|
||||||
|
|
||||||
logger.info(f"DuplexPipeline initialized for session {session_id}")
|
logger.info(f"DuplexPipeline initialized for session {session_id}")
|
||||||
|
|
||||||
def set_event_sequence_provider(self, provider: Callable[[], int]) -> None:
|
def set_event_sequence_provider(self, provider: Callable[[], int]) -> None:
|
||||||
@@ -345,7 +372,7 @@ class DuplexPipeline:
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
"tools": {
|
"tools": {
|
||||||
"allowlist": sorted(self._runtime_tool_executor.keys()),
|
"allowlist": self._resolved_tool_allowlist(),
|
||||||
},
|
},
|
||||||
"tracks": {
|
"tracks": {
|
||||||
"audio_in": self.track_audio_in,
|
"audio_in": self.track_audio_in,
|
||||||
@@ -559,6 +586,7 @@ class DuplexPipeline:
|
|||||||
base_url=llm_base_url,
|
base_url=llm_base_url,
|
||||||
model=llm_model,
|
model=llm_model,
|
||||||
knowledge_config=self._resolved_knowledge_config(),
|
knowledge_config=self._resolved_knowledge_config(),
|
||||||
|
knowledge_searcher=self._knowledge_searcher,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning("LLM provider unsupported or API key missing - using mock LLM")
|
logger.warning("LLM provider unsupported or API key missing - using mock LLM")
|
||||||
@@ -1140,6 +1168,23 @@ class DuplexPipeline:
|
|||||||
result[name] = executor
|
result[name] = executor
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def _resolved_tool_allowlist(self) -> List[str]:
|
||||||
|
names: set[str] = set()
|
||||||
|
for item in self._runtime_tools:
|
||||||
|
if isinstance(item, str):
|
||||||
|
name = item.strip()
|
||||||
|
if name:
|
||||||
|
names.add(name)
|
||||||
|
continue
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
fn = item.get("function")
|
||||||
|
if isinstance(fn, dict) and fn.get("name"):
|
||||||
|
names.add(str(fn.get("name")).strip())
|
||||||
|
elif item.get("name"):
|
||||||
|
names.add(str(item.get("name")).strip())
|
||||||
|
return sorted([name for name in names if name])
|
||||||
|
|
||||||
def _tool_name(self, tool_call: Dict[str, Any]) -> str:
|
def _tool_name(self, tool_call: Dict[str, Any]) -> str:
|
||||||
fn = tool_call.get("function")
|
fn = tool_call.get("function")
|
||||||
if isinstance(fn, dict):
|
if isinstance(fn, dict):
|
||||||
@@ -1491,7 +1536,7 @@ class DuplexPipeline:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
result = await asyncio.wait_for(
|
result = await asyncio.wait_for(
|
||||||
execute_server_tool(call),
|
self._server_tool_executor(call),
|
||||||
timeout=self._SERVER_TOOL_TIMEOUT_SECONDS,
|
timeout=self._SERVER_TOOL_TIMEOUT_SECONDS,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
|||||||
244
core/history_bridge.py
Normal file
244
core/history_bridge.py
Normal file
@@ -0,0 +1,244 @@
|
|||||||
|
"""Async history bridge for non-blocking transcript persistence."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _HistoryTranscriptJob:
|
||||||
|
call_id: str
|
||||||
|
turn_index: int
|
||||||
|
speaker: str
|
||||||
|
content: str
|
||||||
|
start_ms: int
|
||||||
|
end_ms: int
|
||||||
|
duration_ms: int
|
||||||
|
|
||||||
|
|
||||||
|
class SessionHistoryBridge:
|
||||||
|
"""Session-scoped buffered history writer with background retries."""
|
||||||
|
|
||||||
|
_STOP_SENTINEL = object()
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
history_writer: Any,
|
||||||
|
enabled: bool,
|
||||||
|
queue_max_size: int,
|
||||||
|
retry_max_attempts: int,
|
||||||
|
retry_backoff_sec: float,
|
||||||
|
finalize_drain_timeout_sec: float,
|
||||||
|
):
|
||||||
|
self._history_writer = history_writer
|
||||||
|
self._enabled = bool(enabled and history_writer is not None)
|
||||||
|
self._queue_max_size = max(1, int(queue_max_size))
|
||||||
|
self._retry_max_attempts = max(0, int(retry_max_attempts))
|
||||||
|
self._retry_backoff_sec = max(0.0, float(retry_backoff_sec))
|
||||||
|
self._finalize_drain_timeout_sec = max(0.0, float(finalize_drain_timeout_sec))
|
||||||
|
|
||||||
|
self._call_id: Optional[str] = None
|
||||||
|
self._turn_index: int = 0
|
||||||
|
self._started_mono: Optional[float] = None
|
||||||
|
self._finalized: bool = False
|
||||||
|
self._worker_task: Optional[asyncio.Task] = None
|
||||||
|
self._finalize_lock = asyncio.Lock()
|
||||||
|
self._queue: asyncio.Queue[_HistoryTranscriptJob | object] = asyncio.Queue(maxsize=self._queue_max_size)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def enabled(self) -> bool:
|
||||||
|
return self._enabled
|
||||||
|
|
||||||
|
@property
|
||||||
|
def call_id(self) -> Optional[str]:
|
||||||
|
return self._call_id
|
||||||
|
|
||||||
|
async def start_call(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
user_id: int,
|
||||||
|
assistant_id: Optional[str],
|
||||||
|
source: str,
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Create remote call record and start background worker."""
|
||||||
|
if not self._enabled or self._call_id:
|
||||||
|
return self._call_id
|
||||||
|
|
||||||
|
call_id = await self._history_writer.create_call_record(
|
||||||
|
user_id=user_id,
|
||||||
|
assistant_id=assistant_id,
|
||||||
|
source=source,
|
||||||
|
)
|
||||||
|
if not call_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
self._call_id = str(call_id)
|
||||||
|
self._turn_index = 0
|
||||||
|
self._finalized = False
|
||||||
|
self._started_mono = time.monotonic()
|
||||||
|
self._ensure_worker()
|
||||||
|
return self._call_id
|
||||||
|
|
||||||
|
def elapsed_ms(self) -> int:
|
||||||
|
if self._started_mono is None:
|
||||||
|
return 0
|
||||||
|
return max(0, int((time.monotonic() - self._started_mono) * 1000))
|
||||||
|
|
||||||
|
def enqueue_turn(self, *, role: str, text: str) -> bool:
|
||||||
|
"""Queue one transcript write without blocking the caller."""
|
||||||
|
if not self._enabled or not self._call_id or self._finalized:
|
||||||
|
return False
|
||||||
|
|
||||||
|
content = str(text or "").strip()
|
||||||
|
if not content:
|
||||||
|
return False
|
||||||
|
|
||||||
|
speaker = "human" if str(role or "").strip().lower() == "user" else "ai"
|
||||||
|
end_ms = self.elapsed_ms()
|
||||||
|
estimated_duration_ms = max(300, min(12000, len(content) * 80))
|
||||||
|
start_ms = max(0, end_ms - estimated_duration_ms)
|
||||||
|
|
||||||
|
job = _HistoryTranscriptJob(
|
||||||
|
call_id=self._call_id,
|
||||||
|
turn_index=self._turn_index,
|
||||||
|
speaker=speaker,
|
||||||
|
content=content,
|
||||||
|
start_ms=start_ms,
|
||||||
|
end_ms=end_ms,
|
||||||
|
duration_ms=max(1, end_ms - start_ms),
|
||||||
|
)
|
||||||
|
self._turn_index += 1
|
||||||
|
self._ensure_worker()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._queue.put_nowait(job)
|
||||||
|
return True
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
logger.warning(
|
||||||
|
"History queue full; dropping transcript call_id={} turn={}",
|
||||||
|
self._call_id,
|
||||||
|
job.turn_index,
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def finalize(self, *, status: str) -> bool:
|
||||||
|
"""Finalize history record once; waits briefly for queue drain."""
|
||||||
|
if not self._enabled or not self._call_id:
|
||||||
|
return False
|
||||||
|
|
||||||
|
async with self._finalize_lock:
|
||||||
|
if self._finalized:
|
||||||
|
return True
|
||||||
|
|
||||||
|
await self._drain_queue()
|
||||||
|
ok = await self._history_writer.finalize_call_record(
|
||||||
|
call_id=self._call_id,
|
||||||
|
status=status,
|
||||||
|
duration_seconds=self.duration_seconds(),
|
||||||
|
)
|
||||||
|
if ok:
|
||||||
|
self._finalized = True
|
||||||
|
await self._stop_worker()
|
||||||
|
return ok
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
"""Stop worker task and release queue resources."""
|
||||||
|
await self._stop_worker()
|
||||||
|
|
||||||
|
def duration_seconds(self) -> int:
|
||||||
|
if self._started_mono is None:
|
||||||
|
return 0
|
||||||
|
return max(0, int(time.monotonic() - self._started_mono))
|
||||||
|
|
||||||
|
def _ensure_worker(self) -> None:
|
||||||
|
if self._worker_task and not self._worker_task.done():
|
||||||
|
return
|
||||||
|
self._worker_task = asyncio.create_task(self._worker_loop())
|
||||||
|
|
||||||
|
async def _drain_queue(self) -> None:
|
||||||
|
if self._finalize_drain_timeout_sec <= 0:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._queue.join(), timeout=self._finalize_drain_timeout_sec)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.warning("History queue drain timed out after {}s", self._finalize_drain_timeout_sec)
|
||||||
|
|
||||||
|
async def _stop_worker(self) -> None:
|
||||||
|
task = self._worker_task
|
||||||
|
if not task:
|
||||||
|
return
|
||||||
|
if task.done():
|
||||||
|
self._worker_task = None
|
||||||
|
return
|
||||||
|
|
||||||
|
sent = False
|
||||||
|
try:
|
||||||
|
self._queue.put_nowait(self._STOP_SENTINEL)
|
||||||
|
sent = True
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not sent:
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._queue.put(self._STOP_SENTINEL), timeout=0.5)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(task, timeout=1.5)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self._worker_task = None
|
||||||
|
|
||||||
|
async def _worker_loop(self) -> None:
|
||||||
|
while True:
|
||||||
|
item = await self._queue.get()
|
||||||
|
try:
|
||||||
|
if item is self._STOP_SENTINEL:
|
||||||
|
return
|
||||||
|
|
||||||
|
assert isinstance(item, _HistoryTranscriptJob)
|
||||||
|
await self._write_with_retry(item)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("History worker write failed unexpectedly: {}", exc)
|
||||||
|
finally:
|
||||||
|
self._queue.task_done()
|
||||||
|
|
||||||
|
async def _write_with_retry(self, job: _HistoryTranscriptJob) -> bool:
|
||||||
|
for attempt in range(self._retry_max_attempts + 1):
|
||||||
|
ok = await self._history_writer.add_transcript(
|
||||||
|
call_id=job.call_id,
|
||||||
|
turn_index=job.turn_index,
|
||||||
|
speaker=job.speaker,
|
||||||
|
content=job.content,
|
||||||
|
start_ms=job.start_ms,
|
||||||
|
end_ms=job.end_ms,
|
||||||
|
duration_ms=job.duration_ms,
|
||||||
|
)
|
||||||
|
if ok:
|
||||||
|
return True
|
||||||
|
|
||||||
|
if attempt >= self._retry_max_attempts:
|
||||||
|
logger.warning(
|
||||||
|
"History write dropped after retries call_id={} turn={}",
|
||||||
|
job.call_id,
|
||||||
|
job.turn_index,
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if self._retry_backoff_sec > 0:
|
||||||
|
await asyncio.sleep(self._retry_backoff_sec * (2**attempt))
|
||||||
|
return False
|
||||||
17
core/ports/__init__.py
Normal file
17
core/ports/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
"""Port interfaces for engine-side integration boundaries."""
|
||||||
|
|
||||||
|
from core.ports.backend import (
|
||||||
|
AssistantConfigProvider,
|
||||||
|
BackendGateway,
|
||||||
|
HistoryWriter,
|
||||||
|
KnowledgeSearcher,
|
||||||
|
ToolResourceResolver,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"AssistantConfigProvider",
|
||||||
|
"BackendGateway",
|
||||||
|
"HistoryWriter",
|
||||||
|
"KnowledgeSearcher",
|
||||||
|
"ToolResourceResolver",
|
||||||
|
]
|
||||||
84
core/ports/backend.py
Normal file
84
core/ports/backend.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
"""Backend integration ports.
|
||||||
|
|
||||||
|
These interfaces define the boundary between engine runtime logic and
|
||||||
|
backend-side capabilities (config lookup, history persistence, retrieval,
|
||||||
|
and tool resource discovery).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Dict, List, Optional, Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class AssistantConfigProvider(Protocol):
|
||||||
|
"""Port for loading trusted assistant runtime configuration."""
|
||||||
|
|
||||||
|
async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Fetch assistant configuration payload."""
|
||||||
|
|
||||||
|
|
||||||
|
class HistoryWriter(Protocol):
|
||||||
|
"""Port for persisting call and transcript history."""
|
||||||
|
|
||||||
|
async def create_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
user_id: int,
|
||||||
|
assistant_id: Optional[str],
|
||||||
|
source: str = "debug",
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Create a call record and return backend call ID."""
|
||||||
|
|
||||||
|
async def add_transcript(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
turn_index: int,
|
||||||
|
speaker: str,
|
||||||
|
content: str,
|
||||||
|
start_ms: int,
|
||||||
|
end_ms: int,
|
||||||
|
confidence: Optional[float] = None,
|
||||||
|
duration_ms: Optional[int] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""Append one transcript turn segment."""
|
||||||
|
|
||||||
|
async def finalize_call_record(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
status: str,
|
||||||
|
duration_seconds: int,
|
||||||
|
) -> bool:
|
||||||
|
"""Finalize a call record."""
|
||||||
|
|
||||||
|
|
||||||
|
class KnowledgeSearcher(Protocol):
|
||||||
|
"""Port for RAG / knowledge retrieval operations."""
|
||||||
|
|
||||||
|
async def search_knowledge_context(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
kb_id: str,
|
||||||
|
query: str,
|
||||||
|
n_results: int = 5,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Search a knowledge source and return ranked snippets."""
|
||||||
|
|
||||||
|
|
||||||
|
class ToolResourceResolver(Protocol):
|
||||||
|
"""Port for resolving tool metadata/configuration."""
|
||||||
|
|
||||||
|
async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Fetch tool resource configuration."""
|
||||||
|
|
||||||
|
|
||||||
|
class BackendGateway(
|
||||||
|
AssistantConfigProvider,
|
||||||
|
HistoryWriter,
|
||||||
|
KnowledgeSearcher,
|
||||||
|
ToolResourceResolver,
|
||||||
|
Protocol,
|
||||||
|
):
|
||||||
|
"""Composite backend gateway interface used by engine services."""
|
||||||
|
|
||||||
@@ -9,15 +9,11 @@ from enum import Enum
|
|||||||
from typing import Optional, Dict, Any, List
|
from typing import Optional, Dict, Any, List
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from app.backend_client import (
|
from app.backend_adapters import build_backend_adapter_from_settings
|
||||||
fetch_assistant_config,
|
|
||||||
create_history_call_record,
|
|
||||||
add_history_transcript,
|
|
||||||
finalize_history_call_record,
|
|
||||||
)
|
|
||||||
from core.transports import BaseTransport
|
from core.transports import BaseTransport
|
||||||
from core.duplex_pipeline import DuplexPipeline
|
from core.duplex_pipeline import DuplexPipeline
|
||||||
from core.conversation import ConversationTurn
|
from core.conversation import ConversationTurn
|
||||||
|
from core.history_bridge import SessionHistoryBridge
|
||||||
from core.workflow_runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef
|
from core.workflow_runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef
|
||||||
from app.config import settings
|
from app.config import settings
|
||||||
from services.base import LLMMessage
|
from services.base import LLMMessage
|
||||||
@@ -76,7 +72,13 @@ class Session:
|
|||||||
"config_version_id",
|
"config_version_id",
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None):
|
def __init__(
|
||||||
|
self,
|
||||||
|
session_id: str,
|
||||||
|
transport: BaseTransport,
|
||||||
|
use_duplex: bool = None,
|
||||||
|
backend_gateway: Optional[Any] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Initialize session.
|
Initialize session.
|
||||||
|
|
||||||
@@ -88,12 +90,23 @@ class Session:
|
|||||||
self.id = session_id
|
self.id = session_id
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
|
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
|
||||||
|
self._backend_gateway = backend_gateway or build_backend_adapter_from_settings()
|
||||||
|
self._history_bridge = SessionHistoryBridge(
|
||||||
|
history_writer=self._backend_gateway,
|
||||||
|
enabled=settings.history_enabled,
|
||||||
|
queue_max_size=settings.history_queue_max_size,
|
||||||
|
retry_max_attempts=settings.history_retry_max_attempts,
|
||||||
|
retry_backoff_sec=settings.history_retry_backoff_sec,
|
||||||
|
finalize_drain_timeout_sec=settings.history_finalize_drain_timeout_sec,
|
||||||
|
)
|
||||||
|
|
||||||
self.pipeline = DuplexPipeline(
|
self.pipeline = DuplexPipeline(
|
||||||
transport=transport,
|
transport=transport,
|
||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
system_prompt=settings.duplex_system_prompt,
|
system_prompt=settings.duplex_system_prompt,
|
||||||
greeting=settings.duplex_greeting
|
greeting=settings.duplex_greeting,
|
||||||
|
knowledge_searcher=getattr(self._backend_gateway, "search_knowledge_context", None),
|
||||||
|
tool_resource_resolver=getattr(self._backend_gateway, "fetch_tool_resource", None),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Session state
|
# Session state
|
||||||
@@ -107,10 +120,6 @@ class Session:
|
|||||||
# Track IDs
|
# Track IDs
|
||||||
self.current_track_id: str = self.TRACK_CONTROL
|
self.current_track_id: str = self.TRACK_CONTROL
|
||||||
self._event_seq: int = 0
|
self._event_seq: int = 0
|
||||||
self._history_call_id: Optional[str] = None
|
|
||||||
self._history_turn_index: int = 0
|
|
||||||
self._history_call_started_mono: Optional[float] = None
|
|
||||||
self._history_finalized: bool = False
|
|
||||||
self._cleanup_lock = asyncio.Lock()
|
self._cleanup_lock = asyncio.Lock()
|
||||||
self._cleaned_up = False
|
self._cleaned_up = False
|
||||||
self.workflow_runner: Optional[WorkflowRunner] = None
|
self.workflow_runner: Optional[WorkflowRunner] = None
|
||||||
@@ -424,11 +433,12 @@ class Session:
|
|||||||
logger.info(f"Session {self.id} cleaning up")
|
logger.info(f"Session {self.id} cleaning up")
|
||||||
await self._finalize_history(status="connected")
|
await self._finalize_history(status="connected")
|
||||||
await self.pipeline.cleanup()
|
await self.pipeline.cleanup()
|
||||||
|
await self._history_bridge.shutdown()
|
||||||
await self.transport.close()
|
await self.transport.close()
|
||||||
|
|
||||||
async def _start_history_bridge(self, metadata: Dict[str, Any]) -> None:
|
async def _start_history_bridge(self, metadata: Dict[str, Any]) -> None:
|
||||||
"""Initialize backend history call record for this session."""
|
"""Initialize backend history call record for this session."""
|
||||||
if self._history_call_id:
|
if self._history_bridge.call_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
history_meta: Dict[str, Any] = {}
|
history_meta: Dict[str, Any] = {}
|
||||||
@@ -444,7 +454,7 @@ class Session:
|
|||||||
assistant_id = history_meta.get("assistantId", metadata.get("assistantId"))
|
assistant_id = history_meta.get("assistantId", metadata.get("assistantId"))
|
||||||
source = str(history_meta.get("source", metadata.get("source", "debug")))
|
source = str(history_meta.get("source", metadata.get("source", "debug")))
|
||||||
|
|
||||||
call_id = await create_history_call_record(
|
call_id = await self._history_bridge.start_call(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
assistant_id=str(assistant_id) if assistant_id else None,
|
assistant_id=str(assistant_id) if assistant_id else None,
|
||||||
source=source,
|
source=source,
|
||||||
@@ -452,10 +462,6 @@ class Session:
|
|||||||
if not call_id:
|
if not call_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
self._history_call_id = call_id
|
|
||||||
self._history_call_started_mono = time.monotonic()
|
|
||||||
self._history_turn_index = 0
|
|
||||||
self._history_finalized = False
|
|
||||||
logger.info(f"Session {self.id} history bridge enabled (call_id={call_id}, source={source})")
|
logger.info(f"Session {self.id} history bridge enabled (call_id={call_id}, source={source})")
|
||||||
|
|
||||||
async def _on_turn_complete(self, turn: ConversationTurn) -> None:
|
async def _on_turn_complete(self, turn: ConversationTurn) -> None:
|
||||||
@@ -467,48 +473,11 @@ class Session:
|
|||||||
elif role == "assistant":
|
elif role == "assistant":
|
||||||
await self._maybe_advance_workflow(turn.text.strip())
|
await self._maybe_advance_workflow(turn.text.strip())
|
||||||
|
|
||||||
if not self._history_call_id:
|
self._history_bridge.enqueue_turn(role=turn.role or "", text=turn.text or "")
|
||||||
return
|
|
||||||
if not turn.text or not turn.text.strip():
|
|
||||||
return
|
|
||||||
|
|
||||||
role = (turn.role or "").lower()
|
|
||||||
speaker = "human" if role == "user" else "ai"
|
|
||||||
|
|
||||||
end_ms = 0
|
|
||||||
if self._history_call_started_mono is not None:
|
|
||||||
end_ms = max(0, int((time.monotonic() - self._history_call_started_mono) * 1000))
|
|
||||||
estimated_duration_ms = max(300, min(12000, len(turn.text.strip()) * 80))
|
|
||||||
start_ms = max(0, end_ms - estimated_duration_ms)
|
|
||||||
|
|
||||||
turn_index = self._history_turn_index
|
|
||||||
await add_history_transcript(
|
|
||||||
call_id=self._history_call_id,
|
|
||||||
turn_index=turn_index,
|
|
||||||
speaker=speaker,
|
|
||||||
content=turn.text.strip(),
|
|
||||||
start_ms=start_ms,
|
|
||||||
end_ms=end_ms,
|
|
||||||
duration_ms=max(1, end_ms - start_ms),
|
|
||||||
)
|
|
||||||
self._history_turn_index += 1
|
|
||||||
|
|
||||||
async def _finalize_history(self, status: str) -> None:
|
async def _finalize_history(self, status: str) -> None:
|
||||||
"""Finalize history call record once."""
|
"""Finalize history call record once."""
|
||||||
if not self._history_call_id or self._history_finalized:
|
await self._history_bridge.finalize(status=status)
|
||||||
return
|
|
||||||
|
|
||||||
duration_seconds = 0
|
|
||||||
if self._history_call_started_mono is not None:
|
|
||||||
duration_seconds = max(0, int(time.monotonic() - self._history_call_started_mono))
|
|
||||||
|
|
||||||
ok = await finalize_history_call_record(
|
|
||||||
call_id=self._history_call_id,
|
|
||||||
status=status,
|
|
||||||
duration_seconds=duration_seconds,
|
|
||||||
)
|
|
||||||
if ok:
|
|
||||||
self._history_finalized = True
|
|
||||||
|
|
||||||
def _bootstrap_workflow(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
|
def _bootstrap_workflow(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""Parse workflow payload and return initial runtime overrides."""
|
"""Parse workflow payload and return initial runtime overrides."""
|
||||||
@@ -795,10 +764,12 @@ class Session:
|
|||||||
)
|
)
|
||||||
if assistant_id is None:
|
if assistant_id is None:
|
||||||
return {}
|
return {}
|
||||||
if not settings.backend_url:
|
|
||||||
|
provider = getattr(self._backend_gateway, "fetch_assistant_config", None)
|
||||||
|
if not callable(provider):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
payload = await fetch_assistant_config(str(assistant_id).strip())
|
payload = await provider(str(assistant_id).strip())
|
||||||
if not isinstance(payload, dict):
|
if not isinstance(payload, dict):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -4,11 +4,13 @@ import asyncio
|
|||||||
import ast
|
import ast
|
||||||
import operator
|
import operator
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any, Dict
|
from typing import Any, Awaitable, Callable, Dict, Optional
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
from app.backend_client import fetch_tool_resource
|
from app.backend_adapters import build_backend_adapter_from_settings
|
||||||
|
|
||||||
|
ToolResourceFetcher = Callable[[str], Awaitable[Optional[Dict[str, Any]]]]
|
||||||
|
|
||||||
_BIN_OPS = {
|
_BIN_OPS = {
|
||||||
ast.Add: operator.add,
|
ast.Add: operator.add,
|
||||||
@@ -170,11 +172,21 @@ def _extract_tool_args(tool_call: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
async def execute_server_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]:
|
async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Default tool resource resolver via backend adapter."""
|
||||||
|
adapter = build_backend_adapter_from_settings()
|
||||||
|
return await adapter.fetch_tool_resource(tool_id)
|
||||||
|
|
||||||
|
|
||||||
|
async def execute_server_tool(
|
||||||
|
tool_call: Dict[str, Any],
|
||||||
|
tool_resource_fetcher: Optional[ToolResourceFetcher] = None,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
"""Execute a server-side tool and return normalized result payload."""
|
"""Execute a server-side tool and return normalized result payload."""
|
||||||
call_id = str(tool_call.get("id") or "").strip()
|
call_id = str(tool_call.get("id") or "").strip()
|
||||||
tool_name = _extract_tool_name(tool_call)
|
tool_name = _extract_tool_name(tool_call)
|
||||||
args = _extract_tool_args(tool_call)
|
args = _extract_tool_args(tool_call)
|
||||||
|
resource_fetcher = tool_resource_fetcher or fetch_tool_resource
|
||||||
|
|
||||||
if tool_name == "calculator":
|
if tool_name == "calculator":
|
||||||
expression = str(args.get("expression") or "").strip()
|
expression = str(args.get("expression") or "").strip()
|
||||||
@@ -257,7 +269,7 @@ async def execute_server_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if tool_name and tool_name not in {"calculator", "code_interpreter", "current_time"}:
|
if tool_name and tool_name not in {"calculator", "code_interpreter", "current_time"}:
|
||||||
resource = await fetch_tool_resource(tool_name)
|
resource = await resource_fetcher(tool_name)
|
||||||
if resource and str(resource.get("category") or "") == "query":
|
if resource and str(resource.get("category") or "") == "query":
|
||||||
method = str(resource.get("http_method") or "GET").strip().upper()
|
method = str(resource.get("http_method") or "GET").strip().upper()
|
||||||
if method not in {"GET", "POST", "PUT", "PATCH", "DELETE"}:
|
if method not in {"GET", "POST", "PUT", "PATCH", "DELETE"}:
|
||||||
|
|||||||
47
docs/backend_integration.md
Normal file
47
docs/backend_integration.md
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
# Backend Integration and History Bridge
|
||||||
|
|
||||||
|
This engine uses adapter-based backend integration so core runtime logic can run
|
||||||
|
with or without an external backend service.
|
||||||
|
|
||||||
|
## Runtime Modes
|
||||||
|
|
||||||
|
Configure with environment variables:
|
||||||
|
|
||||||
|
- `BACKEND_MODE=auto|http|disabled`
|
||||||
|
- `BACKEND_URL`
|
||||||
|
- `BACKEND_TIMEOUT_SEC`
|
||||||
|
- `HISTORY_ENABLED=true|false`
|
||||||
|
|
||||||
|
Mode behavior:
|
||||||
|
|
||||||
|
- `auto`: use HTTP backend adapter only when `BACKEND_URL` is set.
|
||||||
|
- `http`: force HTTP backend adapter (falls back to null adapter when URL is missing).
|
||||||
|
- `disabled`: force null adapter and run engine-only.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
- Ports: `core/ports/backend.py`
|
||||||
|
- Adapters: `app/backend_adapters.py`
|
||||||
|
- Compatibility wrappers: `app/backend_client.py`
|
||||||
|
|
||||||
|
`Session` and `DuplexPipeline` receive backend capabilities via injected adapter
|
||||||
|
methods instead of hard-coding backend client imports.
|
||||||
|
|
||||||
|
## Async History Writes
|
||||||
|
|
||||||
|
Session history persistence is handled by `core/history_bridge.py`.
|
||||||
|
|
||||||
|
Design:
|
||||||
|
|
||||||
|
- transcript writes are queued with `put_nowait` (non-blocking turn path)
|
||||||
|
- background worker drains queue
|
||||||
|
- failed writes retry with exponential backoff
|
||||||
|
- finalize waits briefly for queue drain before sending call finalize
|
||||||
|
- finalize is idempotent
|
||||||
|
|
||||||
|
Related settings:
|
||||||
|
|
||||||
|
- `HISTORY_QUEUE_MAX_SIZE`
|
||||||
|
- `HISTORY_RETRY_MAX_ATTEMPTS`
|
||||||
|
- `HISTORY_RETRY_BACKOFF_SEC`
|
||||||
|
- `HISTORY_FINALIZE_DRAIN_TIMEOUT_SEC`
|
||||||
@@ -7,10 +7,10 @@ for real-time voice conversation.
|
|||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
import uuid
|
import uuid
|
||||||
from typing import AsyncIterator, Optional, List, Dict, Any
|
from typing import AsyncIterator, Optional, List, Dict, Any, Callable, Awaitable
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from app.backend_client import search_knowledge_context
|
from app.backend_adapters import build_backend_adapter_from_settings
|
||||||
from services.base import BaseLLMService, LLMMessage, LLMStreamEvent, ServiceState
|
from services.base import BaseLLMService, LLMMessage, LLMStreamEvent, ServiceState
|
||||||
|
|
||||||
# Try to import openai
|
# Try to import openai
|
||||||
@@ -37,6 +37,7 @@ class OpenAILLMService(BaseLLMService):
|
|||||||
base_url: Optional[str] = None,
|
base_url: Optional[str] = None,
|
||||||
system_prompt: Optional[str] = None,
|
system_prompt: Optional[str] = None,
|
||||||
knowledge_config: Optional[Dict[str, Any]] = None,
|
knowledge_config: Optional[Dict[str, Any]] = None,
|
||||||
|
knowledge_searcher: Optional[Callable[..., Awaitable[List[Dict[str, Any]]]]] = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Initialize OpenAI LLM service.
|
Initialize OpenAI LLM service.
|
||||||
@@ -60,6 +61,11 @@ class OpenAILLMService(BaseLLMService):
|
|||||||
self.client: Optional[AsyncOpenAI] = None
|
self.client: Optional[AsyncOpenAI] = None
|
||||||
self._cancel_event = asyncio.Event()
|
self._cancel_event = asyncio.Event()
|
||||||
self._knowledge_config: Dict[str, Any] = knowledge_config or {}
|
self._knowledge_config: Dict[str, Any] = knowledge_config or {}
|
||||||
|
if knowledge_searcher is None:
|
||||||
|
adapter = build_backend_adapter_from_settings()
|
||||||
|
self._knowledge_searcher = adapter.search_knowledge_context
|
||||||
|
else:
|
||||||
|
self._knowledge_searcher = knowledge_searcher
|
||||||
self._tool_schemas: List[Dict[str, Any]] = []
|
self._tool_schemas: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
_RAG_DEFAULT_RESULTS = 5
|
_RAG_DEFAULT_RESULTS = 5
|
||||||
@@ -224,7 +230,7 @@ class OpenAILLMService(BaseLLMService):
|
|||||||
n_results = self._coerce_int(cfg.get("nResults"), self._RAG_DEFAULT_RESULTS)
|
n_results = self._coerce_int(cfg.get("nResults"), self._RAG_DEFAULT_RESULTS)
|
||||||
n_results = max(1, min(n_results, self._RAG_MAX_RESULTS))
|
n_results = max(1, min(n_results, self._RAG_MAX_RESULTS))
|
||||||
|
|
||||||
results = await search_knowledge_context(
|
results = await self._knowledge_searcher(
|
||||||
kb_id=kb_id,
|
kb_id=kb_id,
|
||||||
query=latest_user,
|
query=latest_user,
|
||||||
n_results=n_results,
|
n_results=n_results,
|
||||||
|
|||||||
@@ -202,3 +202,51 @@ def test_agent_yaml_missing_env_reference_fails(monkeypatch, tmp_path):
|
|||||||
|
|
||||||
with pytest.raises(ValueError, match="Missing environment variable"):
|
with pytest.raises(ValueError, match="Missing environment variable"):
|
||||||
load_settings(argv=["--agent-config", str(file_path)])
|
load_settings(argv=["--agent-config", str(file_path)])
|
||||||
|
|
||||||
|
|
||||||
|
def test_agent_yaml_tools_list_is_loaded(monkeypatch, tmp_path):
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
file_path = tmp_path / "tools-agent.yaml"
|
||||||
|
_write_yaml(
|
||||||
|
file_path,
|
||||||
|
_full_agent_yaml()
|
||||||
|
+ """
|
||||||
|
|
||||||
|
tools:
|
||||||
|
- current_time
|
||||||
|
- name: weather
|
||||||
|
description: Get weather by city.
|
||||||
|
parameters:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
city:
|
||||||
|
type: string
|
||||||
|
required: [city]
|
||||||
|
executor: server
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
settings = load_settings(argv=["--agent-config", str(file_path)])
|
||||||
|
|
||||||
|
assert isinstance(settings.tools, list)
|
||||||
|
assert settings.tools[0] == "current_time"
|
||||||
|
assert settings.tools[1]["name"] == "weather"
|
||||||
|
assert settings.tools[1]["executor"] == "server"
|
||||||
|
|
||||||
|
|
||||||
|
def test_agent_yaml_tools_must_be_list(monkeypatch, tmp_path):
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
file_path = tmp_path / "bad-tools-agent.yaml"
|
||||||
|
_write_yaml(
|
||||||
|
file_path,
|
||||||
|
_full_agent_yaml()
|
||||||
|
+ """
|
||||||
|
|
||||||
|
tools:
|
||||||
|
weather:
|
||||||
|
executor: server
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Agent config key 'tools' must be a list"):
|
||||||
|
load_settings(argv=["--agent-config", str(file_path)])
|
||||||
|
|||||||
150
tests/test_backend_adapters.py
Normal file
150
tests/test_backend_adapters.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
import aiohttp
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.backend_adapters import (
|
||||||
|
HistoryDisabledBackendAdapter,
|
||||||
|
HttpBackendAdapter,
|
||||||
|
NullBackendAdapter,
|
||||||
|
build_backend_adapter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_build_backend_adapter_without_url_returns_null_adapter():
|
||||||
|
adapter = build_backend_adapter(
|
||||||
|
backend_url=None,
|
||||||
|
backend_mode="auto",
|
||||||
|
history_enabled=True,
|
||||||
|
timeout_sec=3,
|
||||||
|
)
|
||||||
|
assert isinstance(adapter, NullBackendAdapter)
|
||||||
|
|
||||||
|
assert await adapter.fetch_assistant_config("assistant_1") is None
|
||||||
|
assert (
|
||||||
|
await adapter.create_call_record(
|
||||||
|
user_id=1,
|
||||||
|
assistant_id="assistant_1",
|
||||||
|
source="debug",
|
||||||
|
)
|
||||||
|
is None
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
await adapter.add_transcript(
|
||||||
|
call_id="call_1",
|
||||||
|
turn_index=0,
|
||||||
|
speaker="human",
|
||||||
|
content="hi",
|
||||||
|
start_ms=0,
|
||||||
|
end_ms=100,
|
||||||
|
confidence=0.9,
|
||||||
|
duration_ms=100,
|
||||||
|
)
|
||||||
|
is False
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
await adapter.finalize_call_record(
|
||||||
|
call_id="call_1",
|
||||||
|
status="connected",
|
||||||
|
duration_seconds=2,
|
||||||
|
)
|
||||||
|
is False
|
||||||
|
)
|
||||||
|
assert await adapter.search_knowledge_context(kb_id="kb_1", query="hello", n_results=3) == []
|
||||||
|
assert await adapter.fetch_tool_resource("tool_1") is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_http_backend_adapter_create_call_record_posts_expected_payload(monkeypatch):
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
class _FakeResponse:
|
||||||
|
def __init__(self, status=200, payload=None):
|
||||||
|
self.status = status
|
||||||
|
self._payload = payload if payload is not None else {}
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc, tb):
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def json(self):
|
||||||
|
return self._payload
|
||||||
|
|
||||||
|
def raise_for_status(self):
|
||||||
|
if self.status >= 400:
|
||||||
|
raise RuntimeError("http_error")
|
||||||
|
|
||||||
|
class _FakeClientSession:
|
||||||
|
def __init__(self, timeout=None):
|
||||||
|
captured["timeout"] = timeout
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc, tb):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def post(self, url, json=None):
|
||||||
|
captured["url"] = url
|
||||||
|
captured["json"] = json
|
||||||
|
return _FakeResponse(status=200, payload={"id": "call_123"})
|
||||||
|
|
||||||
|
monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FakeClientSession)
|
||||||
|
|
||||||
|
adapter = build_backend_adapter(
|
||||||
|
backend_url="http://localhost:8100",
|
||||||
|
backend_mode="auto",
|
||||||
|
history_enabled=True,
|
||||||
|
timeout_sec=7,
|
||||||
|
)
|
||||||
|
assert isinstance(adapter, HttpBackendAdapter)
|
||||||
|
|
||||||
|
call_id = await adapter.create_call_record(
|
||||||
|
user_id=99,
|
||||||
|
assistant_id="assistant_9",
|
||||||
|
source="debug",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert call_id == "call_123"
|
||||||
|
assert captured["url"] == "http://localhost:8100/api/history"
|
||||||
|
assert captured["json"] == {
|
||||||
|
"user_id": 99,
|
||||||
|
"assistant_id": "assistant_9",
|
||||||
|
"source": "debug",
|
||||||
|
"status": "connected",
|
||||||
|
}
|
||||||
|
assert isinstance(captured["timeout"], aiohttp.ClientTimeout)
|
||||||
|
assert captured["timeout"].total == 7
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_backend_mode_disabled_forces_null_even_with_url():
|
||||||
|
adapter = build_backend_adapter(
|
||||||
|
backend_url="http://localhost:8100",
|
||||||
|
backend_mode="disabled",
|
||||||
|
history_enabled=True,
|
||||||
|
timeout_sec=7,
|
||||||
|
)
|
||||||
|
assert isinstance(adapter, NullBackendAdapter)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_history_disabled_wraps_backend_adapter():
|
||||||
|
adapter = build_backend_adapter(
|
||||||
|
backend_url="http://localhost:8100",
|
||||||
|
backend_mode="auto",
|
||||||
|
history_enabled=False,
|
||||||
|
timeout_sec=7,
|
||||||
|
)
|
||||||
|
assert isinstance(adapter, HistoryDisabledBackendAdapter)
|
||||||
|
assert await adapter.create_call_record(user_id=1, assistant_id="a1", source="debug") is None
|
||||||
|
assert await adapter.add_transcript(
|
||||||
|
call_id="c1",
|
||||||
|
turn_index=0,
|
||||||
|
speaker="human",
|
||||||
|
content="hi",
|
||||||
|
start_ms=0,
|
||||||
|
end_ms=10,
|
||||||
|
duration_ms=10,
|
||||||
|
) is False
|
||||||
147
tests/test_history_bridge.py
Normal file
147
tests/test_history_bridge.py
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from core.history_bridge import SessionHistoryBridge
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeHistoryWriter:
|
||||||
|
def __init__(self, *, add_delay_s: float = 0.0, add_result: bool = True):
|
||||||
|
self.add_delay_s = add_delay_s
|
||||||
|
self.add_result = add_result
|
||||||
|
self.created_call_ids = []
|
||||||
|
self.transcripts = []
|
||||||
|
self.finalize_calls = 0
|
||||||
|
self.finalize_statuses = []
|
||||||
|
self.finalize_at = None
|
||||||
|
self.last_transcript_at = None
|
||||||
|
|
||||||
|
async def create_call_record(self, *, user_id: int, assistant_id: str | None, source: str = "debug"):
|
||||||
|
_ = (user_id, assistant_id, source)
|
||||||
|
call_id = "call_test_1"
|
||||||
|
self.created_call_ids.append(call_id)
|
||||||
|
return call_id
|
||||||
|
|
||||||
|
async def add_transcript(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
call_id: str,
|
||||||
|
turn_index: int,
|
||||||
|
speaker: str,
|
||||||
|
content: str,
|
||||||
|
start_ms: int,
|
||||||
|
end_ms: int,
|
||||||
|
confidence: float | None = None,
|
||||||
|
duration_ms: int | None = None,
|
||||||
|
) -> bool:
|
||||||
|
_ = confidence
|
||||||
|
if self.add_delay_s > 0:
|
||||||
|
await asyncio.sleep(self.add_delay_s)
|
||||||
|
self.transcripts.append(
|
||||||
|
{
|
||||||
|
"call_id": call_id,
|
||||||
|
"turn_index": turn_index,
|
||||||
|
"speaker": speaker,
|
||||||
|
"content": content,
|
||||||
|
"start_ms": start_ms,
|
||||||
|
"end_ms": end_ms,
|
||||||
|
"duration_ms": duration_ms,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self.last_transcript_at = time.monotonic()
|
||||||
|
return self.add_result
|
||||||
|
|
||||||
|
async def finalize_call_record(self, *, call_id: str, status: str, duration_seconds: int) -> bool:
|
||||||
|
_ = (call_id, duration_seconds)
|
||||||
|
self.finalize_calls += 1
|
||||||
|
self.finalize_statuses.append(status)
|
||||||
|
self.finalize_at = time.monotonic()
|
||||||
|
return True
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_slow_backend_does_not_block_enqueue():
|
||||||
|
writer = _FakeHistoryWriter(add_delay_s=0.15, add_result=True)
|
||||||
|
bridge = SessionHistoryBridge(
|
||||||
|
history_writer=writer,
|
||||||
|
enabled=True,
|
||||||
|
queue_max_size=32,
|
||||||
|
retry_max_attempts=0,
|
||||||
|
retry_backoff_sec=0.01,
|
||||||
|
finalize_drain_timeout_sec=1.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
call_id = await bridge.start_call(user_id=1, assistant_id="assistant_1", source="debug")
|
||||||
|
assert call_id == "call_test_1"
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
queued = bridge.enqueue_turn(role="user", text="hello world")
|
||||||
|
elapsed_s = time.perf_counter() - t0
|
||||||
|
|
||||||
|
assert queued is True
|
||||||
|
assert elapsed_s < 0.02
|
||||||
|
|
||||||
|
await bridge.finalize(status="connected")
|
||||||
|
assert len(writer.transcripts) == 1
|
||||||
|
assert writer.finalize_calls == 1
|
||||||
|
finally:
|
||||||
|
await bridge.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failing_backend_retries_but_enqueue_remains_non_blocking():
|
||||||
|
writer = _FakeHistoryWriter(add_delay_s=0.01, add_result=False)
|
||||||
|
bridge = SessionHistoryBridge(
|
||||||
|
history_writer=writer,
|
||||||
|
enabled=True,
|
||||||
|
queue_max_size=32,
|
||||||
|
retry_max_attempts=2,
|
||||||
|
retry_backoff_sec=0.01,
|
||||||
|
finalize_drain_timeout_sec=0.5,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await bridge.start_call(user_id=1, assistant_id="assistant_1", source="debug")
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
assert bridge.enqueue_turn(role="assistant", text="retry me")
|
||||||
|
elapsed_s = time.perf_counter() - t0
|
||||||
|
assert elapsed_s < 0.02
|
||||||
|
|
||||||
|
await bridge.finalize(status="connected")
|
||||||
|
|
||||||
|
# Initial try + 2 retries
|
||||||
|
assert len(writer.transcripts) == 3
|
||||||
|
assert writer.finalize_calls == 1
|
||||||
|
finally:
|
||||||
|
await bridge.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_finalize_is_idempotent_and_waits_for_queue_drain():
|
||||||
|
writer = _FakeHistoryWriter(add_delay_s=0.05, add_result=True)
|
||||||
|
bridge = SessionHistoryBridge(
|
||||||
|
history_writer=writer,
|
||||||
|
enabled=True,
|
||||||
|
queue_max_size=32,
|
||||||
|
retry_max_attempts=0,
|
||||||
|
retry_backoff_sec=0.01,
|
||||||
|
finalize_drain_timeout_sec=1.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await bridge.start_call(user_id=1, assistant_id="assistant_1", source="debug")
|
||||||
|
assert bridge.enqueue_turn(role="user", text="first")
|
||||||
|
|
||||||
|
ok_1 = await bridge.finalize(status="connected")
|
||||||
|
ok_2 = await bridge.finalize(status="connected")
|
||||||
|
|
||||||
|
assert ok_1 is True
|
||||||
|
assert ok_2 is True
|
||||||
|
assert len(writer.transcripts) == 1
|
||||||
|
assert writer.finalize_calls == 1
|
||||||
|
assert writer.last_transcript_at is not None
|
||||||
|
assert writer.finalize_at is not None
|
||||||
|
assert writer.finalize_at >= writer.last_transcript_at
|
||||||
|
finally:
|
||||||
|
await bridge.shutdown()
|
||||||
@@ -92,6 +92,34 @@ def _build_pipeline(monkeypatch, llm_rounds: List[List[LLMStreamEvent]]) -> tupl
|
|||||||
return pipeline, events
|
return pipeline, events
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipeline_uses_default_tools_from_settings(monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"core.duplex_pipeline.settings.tools",
|
||||||
|
[
|
||||||
|
"current_time",
|
||||||
|
{
|
||||||
|
"name": "weather",
|
||||||
|
"description": "Get weather by city",
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {"city": {"type": "string"}},
|
||||||
|
"required": ["city"],
|
||||||
|
},
|
||||||
|
"executor": "server",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]])
|
||||||
|
|
||||||
|
cfg = pipeline.resolved_runtime_config()
|
||||||
|
assert cfg["tools"]["allowlist"] == ["current_time", "weather"]
|
||||||
|
|
||||||
|
schemas = pipeline._resolved_tool_schemas()
|
||||||
|
names = [s.get("function", {}).get("name") for s in schemas if isinstance(s, dict)]
|
||||||
|
assert "current_time" in names
|
||||||
|
assert "weather" in names
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ws_message_parses_tool_call_results():
|
async def test_ws_message_parses_tool_call_results():
|
||||||
msg = parse_client_message(
|
msg = parse_client_message(
|
||||||
|
|||||||
Reference in New Issue
Block a user