Use decoupled way for backend client

This commit is contained in:
Xin Wang
2026-02-25 17:05:40 +08:00
parent 1cd2da1042
commit 08319a4cc7
15 changed files with 1203 additions and 228 deletions

View File

@@ -15,7 +15,7 @@ import asyncio
import json
import time
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
import numpy as np
from loguru import logger
@@ -86,7 +86,16 @@ class DuplexPipeline:
tts_service: Optional[BaseTTSService] = None,
asr_service: Optional[BaseASRService] = None,
system_prompt: Optional[str] = None,
greeting: Optional[str] = None
greeting: Optional[str] = None,
knowledge_searcher: Optional[
Callable[..., Awaitable[List[Dict[str, Any]]]]
] = None,
tool_resource_resolver: Optional[
Callable[[str], Awaitable[Optional[Dict[str, Any]]]]
] = None,
server_tool_executor: Optional[
Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
] = None,
):
"""
Initialize duplex pipeline.
@@ -127,6 +136,9 @@ class DuplexPipeline:
self.llm_service = llm_service
self.tts_service = tts_service
self.asr_service = asr_service # Will be initialized in start()
self._knowledge_searcher = knowledge_searcher
self._tool_resource_resolver = tool_resource_resolver
self._server_tool_executor = server_tool_executor
# Track last sent transcript to avoid duplicates
self._last_sent_transcript = ""
@@ -215,6 +227,18 @@ class DuplexPipeline:
self._pending_llm_delta: str = ""
self._last_llm_delta_emit_ms: float = 0.0
if self._server_tool_executor is None:
if self._tool_resource_resolver:
async def _executor(call: Dict[str, Any]) -> Dict[str, Any]:
return await execute_server_tool(
call,
tool_resource_fetcher=self._tool_resource_resolver,
)
self._server_tool_executor = _executor
else:
self._server_tool_executor = execute_server_tool
logger.info(f"DuplexPipeline initialized for session {session_id}")
def set_event_sequence_provider(self, provider: Callable[[], int]) -> None:
@@ -559,6 +583,7 @@ class DuplexPipeline:
base_url=llm_base_url,
model=llm_model,
knowledge_config=self._resolved_knowledge_config(),
knowledge_searcher=self._knowledge_searcher,
)
else:
logger.warning("LLM provider unsupported or API key missing - using mock LLM")
@@ -1491,7 +1516,7 @@ class DuplexPipeline:
try:
result = await asyncio.wait_for(
execute_server_tool(call),
self._server_tool_executor(call),
timeout=self._SERVER_TOOL_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:

244
core/history_bridge.py Normal file
View File

@@ -0,0 +1,244 @@
"""Async history bridge for non-blocking transcript persistence."""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional
from loguru import logger
@dataclass
class _HistoryTranscriptJob:
call_id: str
turn_index: int
speaker: str
content: str
start_ms: int
end_ms: int
duration_ms: int
class SessionHistoryBridge:
"""Session-scoped buffered history writer with background retries."""
_STOP_SENTINEL = object()
def __init__(
self,
*,
history_writer: Any,
enabled: bool,
queue_max_size: int,
retry_max_attempts: int,
retry_backoff_sec: float,
finalize_drain_timeout_sec: float,
):
self._history_writer = history_writer
self._enabled = bool(enabled and history_writer is not None)
self._queue_max_size = max(1, int(queue_max_size))
self._retry_max_attempts = max(0, int(retry_max_attempts))
self._retry_backoff_sec = max(0.0, float(retry_backoff_sec))
self._finalize_drain_timeout_sec = max(0.0, float(finalize_drain_timeout_sec))
self._call_id: Optional[str] = None
self._turn_index: int = 0
self._started_mono: Optional[float] = None
self._finalized: bool = False
self._worker_task: Optional[asyncio.Task] = None
self._finalize_lock = asyncio.Lock()
self._queue: asyncio.Queue[_HistoryTranscriptJob | object] = asyncio.Queue(maxsize=self._queue_max_size)
@property
def enabled(self) -> bool:
return self._enabled
@property
def call_id(self) -> Optional[str]:
return self._call_id
async def start_call(
self,
*,
user_id: int,
assistant_id: Optional[str],
source: str,
) -> Optional[str]:
"""Create remote call record and start background worker."""
if not self._enabled or self._call_id:
return self._call_id
call_id = await self._history_writer.create_call_record(
user_id=user_id,
assistant_id=assistant_id,
source=source,
)
if not call_id:
return None
self._call_id = str(call_id)
self._turn_index = 0
self._finalized = False
self._started_mono = time.monotonic()
self._ensure_worker()
return self._call_id
def elapsed_ms(self) -> int:
if self._started_mono is None:
return 0
return max(0, int((time.monotonic() - self._started_mono) * 1000))
def enqueue_turn(self, *, role: str, text: str) -> bool:
"""Queue one transcript write without blocking the caller."""
if not self._enabled or not self._call_id or self._finalized:
return False
content = str(text or "").strip()
if not content:
return False
speaker = "human" if str(role or "").strip().lower() == "user" else "ai"
end_ms = self.elapsed_ms()
estimated_duration_ms = max(300, min(12000, len(content) * 80))
start_ms = max(0, end_ms - estimated_duration_ms)
job = _HistoryTranscriptJob(
call_id=self._call_id,
turn_index=self._turn_index,
speaker=speaker,
content=content,
start_ms=start_ms,
end_ms=end_ms,
duration_ms=max(1, end_ms - start_ms),
)
self._turn_index += 1
self._ensure_worker()
try:
self._queue.put_nowait(job)
return True
except asyncio.QueueFull:
logger.warning(
"History queue full; dropping transcript call_id={} turn={}",
self._call_id,
job.turn_index,
)
return False
async def finalize(self, *, status: str) -> bool:
"""Finalize history record once; waits briefly for queue drain."""
if not self._enabled or not self._call_id:
return False
async with self._finalize_lock:
if self._finalized:
return True
await self._drain_queue()
ok = await self._history_writer.finalize_call_record(
call_id=self._call_id,
status=status,
duration_seconds=self.duration_seconds(),
)
if ok:
self._finalized = True
await self._stop_worker()
return ok
async def shutdown(self) -> None:
"""Stop worker task and release queue resources."""
await self._stop_worker()
def duration_seconds(self) -> int:
if self._started_mono is None:
return 0
return max(0, int(time.monotonic() - self._started_mono))
def _ensure_worker(self) -> None:
if self._worker_task and not self._worker_task.done():
return
self._worker_task = asyncio.create_task(self._worker_loop())
async def _drain_queue(self) -> None:
if self._finalize_drain_timeout_sec <= 0:
return
try:
await asyncio.wait_for(self._queue.join(), timeout=self._finalize_drain_timeout_sec)
except asyncio.TimeoutError:
logger.warning("History queue drain timed out after {}s", self._finalize_drain_timeout_sec)
async def _stop_worker(self) -> None:
task = self._worker_task
if not task:
return
if task.done():
self._worker_task = None
return
sent = False
try:
self._queue.put_nowait(self._STOP_SENTINEL)
sent = True
except asyncio.QueueFull:
pass
if not sent:
try:
await asyncio.wait_for(self._queue.put(self._STOP_SENTINEL), timeout=0.5)
except asyncio.TimeoutError:
task.cancel()
try:
await asyncio.wait_for(task, timeout=1.5)
except asyncio.TimeoutError:
task.cancel()
try:
await task
except Exception:
pass
except asyncio.CancelledError:
pass
finally:
self._worker_task = None
async def _worker_loop(self) -> None:
while True:
item = await self._queue.get()
try:
if item is self._STOP_SENTINEL:
return
assert isinstance(item, _HistoryTranscriptJob)
await self._write_with_retry(item)
except Exception as exc:
logger.warning("History worker write failed unexpectedly: {}", exc)
finally:
self._queue.task_done()
async def _write_with_retry(self, job: _HistoryTranscriptJob) -> bool:
for attempt in range(self._retry_max_attempts + 1):
ok = await self._history_writer.add_transcript(
call_id=job.call_id,
turn_index=job.turn_index,
speaker=job.speaker,
content=job.content,
start_ms=job.start_ms,
end_ms=job.end_ms,
duration_ms=job.duration_ms,
)
if ok:
return True
if attempt >= self._retry_max_attempts:
logger.warning(
"History write dropped after retries call_id={} turn={}",
job.call_id,
job.turn_index,
)
return False
if self._retry_backoff_sec > 0:
await asyncio.sleep(self._retry_backoff_sec * (2**attempt))
return False

17
core/ports/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
"""Port interfaces for engine-side integration boundaries."""
from core.ports.backend import (
AssistantConfigProvider,
BackendGateway,
HistoryWriter,
KnowledgeSearcher,
ToolResourceResolver,
)
__all__ = [
"AssistantConfigProvider",
"BackendGateway",
"HistoryWriter",
"KnowledgeSearcher",
"ToolResourceResolver",
]

84
core/ports/backend.py Normal file
View File

@@ -0,0 +1,84 @@
"""Backend integration ports.
These interfaces define the boundary between engine runtime logic and
backend-side capabilities (config lookup, history persistence, retrieval,
and tool resource discovery).
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Protocol
class AssistantConfigProvider(Protocol):
"""Port for loading trusted assistant runtime configuration."""
async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
"""Fetch assistant configuration payload."""
class HistoryWriter(Protocol):
"""Port for persisting call and transcript history."""
async def create_call_record(
self,
*,
user_id: int,
assistant_id: Optional[str],
source: str = "debug",
) -> Optional[str]:
"""Create a call record and return backend call ID."""
async def add_transcript(
self,
*,
call_id: str,
turn_index: int,
speaker: str,
content: str,
start_ms: int,
end_ms: int,
confidence: Optional[float] = None,
duration_ms: Optional[int] = None,
) -> bool:
"""Append one transcript turn segment."""
async def finalize_call_record(
self,
*,
call_id: str,
status: str,
duration_seconds: int,
) -> bool:
"""Finalize a call record."""
class KnowledgeSearcher(Protocol):
"""Port for RAG / knowledge retrieval operations."""
async def search_knowledge_context(
self,
*,
kb_id: str,
query: str,
n_results: int = 5,
) -> List[Dict[str, Any]]:
"""Search a knowledge source and return ranked snippets."""
class ToolResourceResolver(Protocol):
"""Port for resolving tool metadata/configuration."""
async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
"""Fetch tool resource configuration."""
class BackendGateway(
AssistantConfigProvider,
HistoryWriter,
KnowledgeSearcher,
ToolResourceResolver,
Protocol,
):
"""Composite backend gateway interface used by engine services."""

View File

@@ -9,15 +9,11 @@ from enum import Enum
from typing import Optional, Dict, Any, List
from loguru import logger
from app.backend_client import (
fetch_assistant_config,
create_history_call_record,
add_history_transcript,
finalize_history_call_record,
)
from app.backend_adapters import build_backend_adapter_from_settings
from core.transports import BaseTransport
from core.duplex_pipeline import DuplexPipeline
from core.conversation import ConversationTurn
from core.history_bridge import SessionHistoryBridge
from core.workflow_runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef
from app.config import settings
from services.base import LLMMessage
@@ -76,7 +72,13 @@ class Session:
"config_version_id",
}
def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None):
def __init__(
self,
session_id: str,
transport: BaseTransport,
use_duplex: bool = None,
backend_gateway: Optional[Any] = None,
):
"""
Initialize session.
@@ -88,12 +90,23 @@ class Session:
self.id = session_id
self.transport = transport
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
self._backend_gateway = backend_gateway or build_backend_adapter_from_settings()
self._history_bridge = SessionHistoryBridge(
history_writer=self._backend_gateway,
enabled=settings.history_enabled,
queue_max_size=settings.history_queue_max_size,
retry_max_attempts=settings.history_retry_max_attempts,
retry_backoff_sec=settings.history_retry_backoff_sec,
finalize_drain_timeout_sec=settings.history_finalize_drain_timeout_sec,
)
self.pipeline = DuplexPipeline(
transport=transport,
session_id=session_id,
system_prompt=settings.duplex_system_prompt,
greeting=settings.duplex_greeting
greeting=settings.duplex_greeting,
knowledge_searcher=getattr(self._backend_gateway, "search_knowledge_context", None),
tool_resource_resolver=getattr(self._backend_gateway, "fetch_tool_resource", None),
)
# Session state
@@ -107,10 +120,6 @@ class Session:
# Track IDs
self.current_track_id: str = self.TRACK_CONTROL
self._event_seq: int = 0
self._history_call_id: Optional[str] = None
self._history_turn_index: int = 0
self._history_call_started_mono: Optional[float] = None
self._history_finalized: bool = False
self._cleanup_lock = asyncio.Lock()
self._cleaned_up = False
self.workflow_runner: Optional[WorkflowRunner] = None
@@ -424,11 +433,12 @@ class Session:
logger.info(f"Session {self.id} cleaning up")
await self._finalize_history(status="connected")
await self.pipeline.cleanup()
await self._history_bridge.shutdown()
await self.transport.close()
async def _start_history_bridge(self, metadata: Dict[str, Any]) -> None:
"""Initialize backend history call record for this session."""
if self._history_call_id:
if self._history_bridge.call_id:
return
history_meta: Dict[str, Any] = {}
@@ -444,7 +454,7 @@ class Session:
assistant_id = history_meta.get("assistantId", metadata.get("assistantId"))
source = str(history_meta.get("source", metadata.get("source", "debug")))
call_id = await create_history_call_record(
call_id = await self._history_bridge.start_call(
user_id=user_id,
assistant_id=str(assistant_id) if assistant_id else None,
source=source,
@@ -452,10 +462,6 @@ class Session:
if not call_id:
return
self._history_call_id = call_id
self._history_call_started_mono = time.monotonic()
self._history_turn_index = 0
self._history_finalized = False
logger.info(f"Session {self.id} history bridge enabled (call_id={call_id}, source={source})")
async def _on_turn_complete(self, turn: ConversationTurn) -> None:
@@ -467,48 +473,11 @@ class Session:
elif role == "assistant":
await self._maybe_advance_workflow(turn.text.strip())
if not self._history_call_id:
return
if not turn.text or not turn.text.strip():
return
role = (turn.role or "").lower()
speaker = "human" if role == "user" else "ai"
end_ms = 0
if self._history_call_started_mono is not None:
end_ms = max(0, int((time.monotonic() - self._history_call_started_mono) * 1000))
estimated_duration_ms = max(300, min(12000, len(turn.text.strip()) * 80))
start_ms = max(0, end_ms - estimated_duration_ms)
turn_index = self._history_turn_index
await add_history_transcript(
call_id=self._history_call_id,
turn_index=turn_index,
speaker=speaker,
content=turn.text.strip(),
start_ms=start_ms,
end_ms=end_ms,
duration_ms=max(1, end_ms - start_ms),
)
self._history_turn_index += 1
self._history_bridge.enqueue_turn(role=turn.role or "", text=turn.text or "")
async def _finalize_history(self, status: str) -> None:
"""Finalize history call record once."""
if not self._history_call_id or self._history_finalized:
return
duration_seconds = 0
if self._history_call_started_mono is not None:
duration_seconds = max(0, int(time.monotonic() - self._history_call_started_mono))
ok = await finalize_history_call_record(
call_id=self._history_call_id,
status=status,
duration_seconds=duration_seconds,
)
if ok:
self._history_finalized = True
await self._history_bridge.finalize(status=status)
def _bootstrap_workflow(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""Parse workflow payload and return initial runtime overrides."""
@@ -795,10 +764,12 @@ class Session:
)
if assistant_id is None:
return {}
if not settings.backend_url:
provider = getattr(self._backend_gateway, "fetch_assistant_config", None)
if not callable(provider):
return {}
payload = await fetch_assistant_config(str(assistant_id).strip())
payload = await provider(str(assistant_id).strip())
if not isinstance(payload, dict):
return {}

View File

@@ -4,11 +4,13 @@ import asyncio
import ast
import operator
from datetime import datetime
from typing import Any, Dict
from typing import Any, Awaitable, Callable, Dict, Optional
import aiohttp
from app.backend_client import fetch_tool_resource
from app.backend_adapters import build_backend_adapter_from_settings
ToolResourceFetcher = Callable[[str], Awaitable[Optional[Dict[str, Any]]]]
_BIN_OPS = {
ast.Add: operator.add,
@@ -170,11 +172,21 @@ def _extract_tool_args(tool_call: Dict[str, Any]) -> Dict[str, Any]:
return {}
async def execute_server_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]:
async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]:
"""Default tool resource resolver via backend adapter."""
adapter = build_backend_adapter_from_settings()
return await adapter.fetch_tool_resource(tool_id)
async def execute_server_tool(
tool_call: Dict[str, Any],
tool_resource_fetcher: Optional[ToolResourceFetcher] = None,
) -> Dict[str, Any]:
"""Execute a server-side tool and return normalized result payload."""
call_id = str(tool_call.get("id") or "").strip()
tool_name = _extract_tool_name(tool_call)
args = _extract_tool_args(tool_call)
resource_fetcher = tool_resource_fetcher or fetch_tool_resource
if tool_name == "calculator":
expression = str(args.get("expression") or "").strip()
@@ -257,7 +269,7 @@ async def execute_server_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]:
}
if tool_name and tool_name not in {"calculator", "code_interpreter", "current_time"}:
resource = await fetch_tool_resource(tool_name)
resource = await resource_fetcher(tool_name)
if resource and str(resource.get("category") or "") == "query":
method = str(resource.get("http_method") or "GET").strip().upper()
if method not in {"GET", "POST", "PUT", "PATCH", "DELETE"}: