Save text records to db

This commit is contained in:
Xin Wang
2026-02-09 14:10:19 +08:00
parent d4b645568e
commit 97984014e8
4 changed files with 213 additions and 2 deletions

View File

@@ -1,4 +1,4 @@
"""Backend API client for assistant config.""" """Backend API client for assistant config and history persistence."""
from __future__ import annotations from __future__ import annotations
@@ -42,3 +42,107 @@ async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]:
logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}") logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}")
return None return None
def _backend_base_url() -> Optional[str]:
if not settings.backend_url:
return None
return settings.backend_url.rstrip("/")
def _timeout() -> aiohttp.ClientTimeout:
return aiohttp.ClientTimeout(total=settings.backend_timeout_sec)
async def create_history_call_record(
*,
user_id: int,
assistant_id: Optional[str],
source: str = "debug",
) -> Optional[str]:
"""Create a call record via backend history API and return call_id."""
base_url = _backend_base_url()
if not base_url:
return None
url = f"{base_url}/api/history"
payload: Dict[str, Any] = {
"user_id": user_id,
"assistant_id": assistant_id,
"source": source,
"status": "connected",
}
try:
async with aiohttp.ClientSession(timeout=_timeout()) as session:
async with session.post(url, json=payload) as resp:
resp.raise_for_status()
data = await resp.json()
call_id = str((data or {}).get("id") or "")
return call_id or None
except Exception as exc:
logger.warning(f"Failed to create history call record: {exc}")
return None
async def add_history_transcript(
*,
call_id: str,
turn_index: int,
speaker: str,
content: str,
start_ms: int,
end_ms: int,
confidence: Optional[float] = None,
duration_ms: Optional[int] = None,
) -> bool:
"""Append a transcript segment to backend history."""
base_url = _backend_base_url()
if not base_url or not call_id:
return False
url = f"{base_url}/api/history/{call_id}/transcripts"
payload: Dict[str, Any] = {
"turn_index": turn_index,
"speaker": speaker,
"content": content,
"confidence": confidence,
"start_ms": start_ms,
"end_ms": end_ms,
"duration_ms": duration_ms,
}
try:
async with aiohttp.ClientSession(timeout=_timeout()) as session:
async with session.post(url, json=payload) as resp:
resp.raise_for_status()
return True
except Exception as exc:
logger.warning(f"Failed to append history transcript (call_id={call_id}, turn={turn_index}): {exc}")
return False
async def finalize_history_call_record(
*,
call_id: str,
status: str,
duration_seconds: int,
) -> bool:
"""Finalize a call record with status and duration."""
base_url = _backend_base_url()
if not base_url or not call_id:
return False
url = f"{base_url}/api/history/{call_id}"
payload: Dict[str, Any] = {
"status": status,
"duration_seconds": duration_seconds,
}
try:
async with aiohttp.ClientSession(timeout=_timeout()) as session:
async with session.put(url, json=payload) as resp:
resp.raise_for_status()
return True
except Exception as exc:
logger.warning(f"Failed to finalize history call record ({call_id}): {exc}")
return False

View File

@@ -91,6 +91,11 @@ class Settings(BaseSettings):
ws_api_key: Optional[str] = Field(default=None, description="Optional API key required for WS hello auth") ws_api_key: Optional[str] = Field(default=None, description="Optional API key required for WS hello auth")
ws_require_auth: bool = Field(default=False, description="Require auth in hello message even when ws_api_key is not set") ws_require_auth: bool = Field(default=False, description="Require auth in hello message even when ws_api_key is not set")
# Backend bridge configuration (for call/transcript persistence)
backend_url: Optional[str] = Field(default=None, description="Backend API base URL (e.g. http://localhost:8787)")
backend_timeout_sec: int = Field(default=10, description="Backend API request timeout in seconds")
history_default_user_id: int = Field(default=1, description="Fallback user_id for history records")
@property @property
def chunk_size_bytes(self) -> int: def chunk_size_bytes(self) -> int:
"""Calculate chunk size in bytes based on sample rate and duration.""" """Calculate chunk size in bytes based on sample rate and duration."""

View File

@@ -2,12 +2,19 @@
import uuid import uuid
import json import json
import time
from enum import Enum from enum import Enum
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from loguru import logger from loguru import logger
from app.backend_client import (
create_history_call_record,
add_history_transcript,
finalize_history_call_record,
)
from core.transports import BaseTransport from core.transports import BaseTransport
from core.duplex_pipeline import DuplexPipeline from core.duplex_pipeline import DuplexPipeline
from core.conversation import ConversationTurn
from app.config import settings from app.config import settings
from models.ws_v1 import ( from models.ws_v1 import (
parse_client_message, parse_client_message,
@@ -67,6 +74,12 @@ class Session:
# Track IDs # Track IDs
self.current_track_id: Optional[str] = str(uuid.uuid4()) self.current_track_id: Optional[str] = str(uuid.uuid4())
self._history_call_id: Optional[str] = None
self._history_turn_index: int = 0
self._history_call_started_mono: Optional[float] = None
self._history_finalized: bool = False
self.pipeline.conversation.on_turn_complete(self._on_turn_complete)
logger.info(f"Session {self.id} created (duplex={self.use_duplex})") logger.info(f"Session {self.id} created (duplex={self.use_duplex})")
@@ -206,8 +219,13 @@ class Session:
await self._send_error("client", "Duplicate session.start", "protocol.order") await self._send_error("client", "Duplicate session.start", "protocol.order")
return return
metadata = message.metadata or {}
# Create history call record early so later turn callbacks can append transcripts.
await self._start_history_bridge(metadata)
# Apply runtime service/prompt overrides from backend if provided # Apply runtime service/prompt overrides from backend if provided
self.pipeline.apply_runtime_overrides(message.metadata) self.pipeline.apply_runtime_overrides(metadata)
# Start duplex pipeline # Start duplex pipeline
if not self._pipeline_started: if not self._pipeline_started:
@@ -241,6 +259,7 @@ class Session:
reason=stop_reason, reason=stop_reason,
) )
) )
await self._finalize_history(status="connected")
await self.transport.close() await self.transport.close()
async def _send_error(self, sender: str, error_message: str, code: str) -> None: async def _send_error(self, sender: str, error_message: str, code: str) -> None:
@@ -270,5 +289,83 @@ class Session:
async def cleanup(self) -> None: async def cleanup(self) -> None:
"""Cleanup session resources.""" """Cleanup session resources."""
logger.info(f"Session {self.id} cleaning up") logger.info(f"Session {self.id} cleaning up")
await self._finalize_history(status="connected")
await self.pipeline.cleanup() await self.pipeline.cleanup()
await self.transport.close() await self.transport.close()
async def _start_history_bridge(self, metadata: Dict[str, Any]) -> None:
"""Initialize backend history call record for this session."""
if self._history_call_id:
return
history_meta: Dict[str, Any] = {}
if isinstance(metadata.get("history"), dict):
history_meta = metadata["history"]
raw_user_id = history_meta.get("userId", metadata.get("userId", settings.history_default_user_id))
try:
user_id = int(raw_user_id)
except (TypeError, ValueError):
user_id = settings.history_default_user_id
assistant_id = history_meta.get("assistantId", metadata.get("assistantId"))
source = str(history_meta.get("source", metadata.get("source", "debug")))
call_id = await create_history_call_record(
user_id=user_id,
assistant_id=str(assistant_id) if assistant_id else None,
source=source,
)
if not call_id:
return
self._history_call_id = call_id
self._history_call_started_mono = time.monotonic()
self._history_turn_index = 0
self._history_finalized = False
logger.info(f"Session {self.id} history bridge enabled (call_id={call_id}, source={source})")
async def _on_turn_complete(self, turn: ConversationTurn) -> None:
"""Persist completed turns to backend call transcripts."""
if not self._history_call_id:
return
if not turn.text or not turn.text.strip():
return
role = (turn.role or "").lower()
speaker = "human" if role == "user" else "ai"
end_ms = 0
if self._history_call_started_mono is not None:
end_ms = max(0, int((time.monotonic() - self._history_call_started_mono) * 1000))
estimated_duration_ms = max(300, min(12000, len(turn.text.strip()) * 80))
start_ms = max(0, end_ms - estimated_duration_ms)
turn_index = self._history_turn_index
await add_history_transcript(
call_id=self._history_call_id,
turn_index=turn_index,
speaker=speaker,
content=turn.text.strip(),
start_ms=start_ms,
end_ms=end_ms,
duration_ms=max(1, end_ms - start_ms),
)
self._history_turn_index += 1
async def _finalize_history(self, status: str) -> None:
"""Finalize history call record once."""
if not self._history_call_id or self._history_finalized:
return
duration_seconds = 0
if self._history_call_started_mono is not None:
duration_seconds = max(0, int(time.monotonic() - self._history_call_started_mono))
ok = await finalize_history_call_record(
call_id=self._history_call_id,
status=status,
duration_seconds=duration_seconds,
)
if ok:
self._history_finalized = True

View File

@@ -1420,6 +1420,11 @@ export const DebugDrawer: React.FC<{
systemPrompt: assistant.prompt || '', systemPrompt: assistant.prompt || '',
greeting: assistant.opener || '', greeting: assistant.opener || '',
services, services,
history: {
assistantId: assistant.id,
userId: 1,
source: 'debug',
},
}, },
}; };