Add presence probe configuration to Assistant model and API. Introduce new fields for enabling presence probes, idle and cooldown durations, maximum prompts, context inclusion, and custom questions. Update schemas, routers, and frontend components to support these features, along with corresponding tests to ensure functionality.

This commit is contained in:
Xin Wang
2026-02-28 15:47:53 +08:00
parent 0821d73e7c
commit 8f1317860f
11 changed files with 1006 additions and 3 deletions

View File

@@ -74,6 +74,25 @@ class DuplexPipeline:
_LLM_DELTA_THROTTLE_MS = 80
_ASR_CAPTURE_MAX_MS = 15000
_OPENER_PRE_ROLL_MS = 180
_PRESENCE_PROBE_LOOP_INTERVAL_SECONDS = 1.0
_PRESENCE_PROBE_MIN_IDLE_SECONDS = 5
_PRESENCE_PROBE_MAX_IDLE_SECONDS = 3600
_PRESENCE_PROBE_DEFAULT_IDLE_SECONDS = 30
_PRESENCE_PROBE_MIN_COOLDOWN_SECONDS = 5
_PRESENCE_PROBE_MAX_COOLDOWN_SECONDS = 7200
_PRESENCE_PROBE_DEFAULT_COOLDOWN_SECONDS = 90
_PRESENCE_PROBE_DEFAULT_MAX_PROMPTS = 2
_PRESENCE_PROBE_MAX_PROMPTS_CAP = 10
_PRESENCE_PROBE_CONTEXT_CHARS = 36
_PRESENCE_PROBE_AWAY_VALUE_HINTS = (
"away",
"later",
"not_now",
"leave",
"离开",
"稍后",
"不在",
)
_DEFAULT_TOOL_SCHEMAS: Dict[str, Dict[str, Any]] = {
"current_time": {
"name": "current_time",
@@ -168,6 +187,23 @@ class DuplexPipeline:
"required": ["msg"],
},
},
"choice_prompt": {
"name": "choice_prompt",
"description": "Show a multiple-choice prompt dialog on client side",
"parameters": {
"type": "object",
"properties": {
"question": {"type": "string", "description": "Question text to ask"},
"options": {
"type": "array",
"description": "Selectable options",
"items": {"type": "string"},
"minItems": 2,
},
},
"required": ["question", "options"],
},
},
}
_DEFAULT_CLIENT_EXECUTORS = frozenset({
"turn_on_camera",
@@ -176,6 +212,7 @@ class DuplexPipeline:
"decrease_volume",
"voice_message_prompt",
"text_msg_prompt",
"choice_prompt",
})
def __init__(
@@ -308,6 +345,16 @@ class DuplexPipeline:
self._runtime_barge_in_min_duration_ms: Optional[int] = None
self._runtime_knowledge: Dict[str, Any] = {}
self._runtime_knowledge_base_id: Optional[str] = None
self._runtime_presence_probe: Dict[str, Any] = {
"enabled": False,
"idleSeconds": self._PRESENCE_PROBE_DEFAULT_IDLE_SECONDS,
"cooldownSeconds": self._PRESENCE_PROBE_DEFAULT_COOLDOWN_SECONDS,
"maxPrompts": self._PRESENCE_PROBE_DEFAULT_MAX_PROMPTS,
"includeContext": True,
"question": "",
"waitForResponse": True,
"options": self._default_presence_probe_options(),
}
raw_default_tools = settings.tools if isinstance(settings.tools, list) else []
self._runtime_tools: List[Any] = list(raw_default_tools)
self._runtime_tool_executor: Dict[str, str] = {}
@@ -333,6 +380,13 @@ class DuplexPipeline:
self._current_tts_id: Optional[str] = None
self._pending_llm_delta: str = ""
self._last_llm_delta_emit_ms: float = 0.0
now_ms = time.monotonic() * 1000.0
self._last_user_activity_ms: float = now_ms
self._last_presence_probe_ms: float = 0.0
self._presence_probe_attempts: int = 0
self._presence_probe_seq: int = 0
self._active_presence_probe_call_id: Optional[str] = None
self._presence_probe_task: Optional[asyncio.Task] = None
self._runtime_tool_executor = self._resolved_tool_executor_map()
self._runtime_tool_default_args = self._resolved_tool_default_args_map()
@@ -432,7 +486,10 @@ class DuplexPipeline:
opener_audio = metadata.get("openerAudio")
if isinstance(opener_audio, dict):
self._runtime_opener_audio = dict(opener_audio)
kb_id = str(knowledge.get("kbId") or knowledge.get("knowledgeBaseId") or "").strip()
knowledge_payload = knowledge if isinstance(knowledge, dict) else {}
kb_id = str(
knowledge_payload.get("kbId") or knowledge_payload.get("knowledgeBaseId") or ""
).strip()
if kb_id:
self._runtime_knowledge_base_id = kb_id
@@ -452,10 +509,20 @@ class DuplexPipeline:
self._runtime_tool_display_names = {}
self._runtime_tool_wait_for_response = {}
if "presenceProbe" in metadata or "presence_probe" in metadata:
raw_presence_probe = metadata.get("presenceProbe")
if raw_presence_probe is None and "presence_probe" in metadata:
raw_presence_probe = metadata.get("presence_probe")
self._runtime_presence_probe = self._resolved_presence_probe_config(raw_presence_probe)
self._presence_probe_attempts = 0
self._last_presence_probe_ms = 0.0
self._active_presence_probe_call_id = None
if self.llm_service and hasattr(self.llm_service, "set_knowledge_config"):
self.llm_service.set_knowledge_config(self._resolved_knowledge_config())
if self.llm_service and hasattr(self.llm_service, "set_tool_schemas"):
self.llm_service.set_tool_schemas(self._resolved_tool_schemas())
self._refresh_presence_probe_task()
def resolved_runtime_config(self) -> Dict[str, Any]:
"""Return current effective runtime configuration without secrets."""
@@ -505,6 +572,14 @@ class DuplexPipeline:
"tools": {
"allowlist": self._resolved_tool_allowlist(),
},
"presenceProbe": {
"enabled": bool(self._runtime_presence_probe.get("enabled")),
"idleSeconds": float(self._runtime_presence_probe.get("idleSeconds") or 0),
"cooldownSeconds": float(self._runtime_presence_probe.get("cooldownSeconds") or 0),
"maxPrompts": int(self._runtime_presence_probe.get("maxPrompts") or 0),
"includeContext": bool(self._runtime_presence_probe.get("includeContext", True)),
"waitForResponse": bool(self._runtime_presence_probe.get("waitForResponse", True)),
},
"tracks": {
"audio_in": self.track_audio_in,
"audio_out": self.track_audio_out,
@@ -710,6 +785,308 @@ class DuplexPipeline:
chunk_ms = max(1, settings.chunk_size_ms)
return max(1, int(np.ceil(self._barge_in_silence_tolerance_ms / chunk_ms)))
def _default_presence_probe_options(self) -> List[Dict[str, str]]:
    """Built-in two-choice probe options: stay in the call or step away."""
    stay = {"id": "still_here", "label": "我在,继续", "value": "continue"}
    leave = {"id": "away", "label": "我先离开", "value": "away"}
    return [stay, leave]
def _normalize_presence_probe_options(self, raw_options: Any) -> List[Dict[str, str]]:
    """Normalize a caller-supplied option list into ``[{id, label, value}, ...]``.

    Scalar entries become both label and value; dict entries may carry
    ``label``/``text``/``name``, an optional ``id`` and an optional
    ``value``.  Entries without a usable label are dropped, duplicate ids
    get a ``_N`` suffix, and anything that leaves fewer than two options
    falls back to the built-in defaults.
    """
    if not isinstance(raw_options, list):
        return self._default_presence_probe_options()

    options: List[Dict[str, str]] = []
    seen_ids: set[str] = set()
    for position, entry in enumerate(raw_options):
        fallback_id = f"opt_{position + 1}"
        if isinstance(entry, (str, int, float, bool)):
            label = str(entry).strip()
            option_id = fallback_id
            value = label
        elif isinstance(entry, dict):
            label = str(entry.get("label") or entry.get("text") or entry.get("name") or "").strip()
            option_id = str(entry.get("id") or fallback_id).strip() or fallback_id
            raw_value = entry.get("value")
            value = label if raw_value is None else str(raw_value)
        else:
            # Unsupported entry type: no label, so it is skipped below.
            label = ""
            option_id = fallback_id
            value = ""
        if not label:
            continue
        if option_id in seen_ids:
            suffix = 2
            while f"{option_id}_{suffix}" in seen_ids:
                suffix += 1
            option_id = f"{option_id}_{suffix}"
        seen_ids.add(option_id)
        options.append({"id": option_id, "label": label, "value": value})

    if len(options) < 2:
        return self._default_presence_probe_options()
    return options
def _coerce_bounded_float(
    self,
    raw_value: Any,
    *,
    default_value: float,
    min_value: float,
    max_value: float,
) -> float:
    """Parse ``raw_value`` as a float and clamp it to [min_value, max_value].

    Accepts numbers and numeric strings; any other type or an unparsable
    string falls back to ``default_value``.  Bug fix: NaN compares False
    against both bounds, so it previously escaped the clamp and was
    returned verbatim — it now falls back to the default.  Infinities
    still clamp to the nearest bound, as before.
    """
    import math  # local import keeps the fix self-contained

    if isinstance(raw_value, (int, float)):
        parsed = float(raw_value)
    elif isinstance(raw_value, str):
        try:
            parsed = float(raw_value.strip())
        except ValueError:
            parsed = default_value
    else:
        parsed = default_value
    if math.isnan(parsed):
        parsed = default_value
    return min(max(parsed, min_value), max_value)
def _coerce_bounded_int(
    self,
    raw_value: Any,
    *,
    default_value: int,
    min_value: int,
    max_value: int,
) -> int:
    """Parse ``raw_value`` as an int and clamp it to [min_value, max_value].

    Accepts numbers (floats are truncated) and integer strings; any other
    type or an unparsable string falls back to ``default_value``.  Bug
    fix: ``int(float('nan'))`` raises ValueError and ``int(float('inf'))``
    raises OverflowError — both previously propagated and crashed the
    caller; they now fall back to the default.
    """
    if isinstance(raw_value, (int, float)):
        try:
            parsed = int(raw_value)
        except (ValueError, OverflowError):  # nan / inf inputs
            parsed = default_value
    elif isinstance(raw_value, str):
        try:
            parsed = int(raw_value.strip())
        except ValueError:
            parsed = default_value
    else:
        parsed = default_value
    return min(max(parsed, min_value), max_value)
def _resolved_presence_probe_config(self, raw_config: Any) -> Dict[str, Any]:
    """Validate a raw presence-probe payload into the runtime config dict.

    Accepts both camelCase and snake_case keys, clamps the numeric fields
    to their allowed ranges, truncates the custom question to 160 chars
    and normalizes the option list.  A non-dict payload yields the
    built-in defaults (feature disabled).
    """
    if not isinstance(raw_config, dict):
        return {
            "enabled": False,
            "idleSeconds": float(self._PRESENCE_PROBE_DEFAULT_IDLE_SECONDS),
            "cooldownSeconds": float(self._PRESENCE_PROBE_DEFAULT_COOLDOWN_SECONDS),
            "maxPrompts": int(self._PRESENCE_PROBE_DEFAULT_MAX_PROMPTS),
            "includeContext": True,
            "question": "",
            "waitForResponse": True,
            "options": self._default_presence_probe_options(),
        }

    def _field(camel: str, snake: str) -> Any:
        # camelCase spelling wins when both are present.
        return raw_config.get(camel, raw_config.get(snake))

    enabled_flag = self._coerce_bool(raw_config.get("enabled"))
    include_flag = self._coerce_bool(_field("includeContext", "include_context"))
    wait_flag = self._coerce_bool(_field("waitForResponse", "wait_for_response"))
    question_text = str(raw_config.get("question") or "").strip()[:160]

    return {
        # bool(None) is False, matching the old "default off" behavior.
        "enabled": bool(enabled_flag) if enabled_flag is not None else False,
        "idleSeconds": self._coerce_bounded_float(
            _field("idleSeconds", "idle_seconds"),
            default_value=float(self._PRESENCE_PROBE_DEFAULT_IDLE_SECONDS),
            min_value=float(self._PRESENCE_PROBE_MIN_IDLE_SECONDS),
            max_value=float(self._PRESENCE_PROBE_MAX_IDLE_SECONDS),
        ),
        "cooldownSeconds": self._coerce_bounded_float(
            _field("cooldownSeconds", "cooldown_seconds"),
            default_value=float(self._PRESENCE_PROBE_DEFAULT_COOLDOWN_SECONDS),
            min_value=float(self._PRESENCE_PROBE_MIN_COOLDOWN_SECONDS),
            max_value=float(self._PRESENCE_PROBE_MAX_COOLDOWN_SECONDS),
        ),
        "maxPrompts": self._coerce_bounded_int(
            _field("maxPrompts", "max_prompts"),
            default_value=self._PRESENCE_PROBE_DEFAULT_MAX_PROMPTS,
            min_value=1,
            max_value=self._PRESENCE_PROBE_MAX_PROMPTS_CAP,
        ),
        "includeContext": include_flag if include_flag is not None else True,
        "question": question_text,
        "waitForResponse": wait_flag if wait_flag is not None else True,
        "options": self._normalize_presence_probe_options(raw_config.get("options")),
    }
def _presence_probe_enabled(self) -> bool:
    """True when the presence-probe feature is switched on at runtime."""
    config = self._runtime_presence_probe
    return bool(config.get("enabled"))
def _presence_probe_idle_ms(self) -> float:
    """Configured idle threshold in milliseconds, floored at 1000 ms."""
    seconds = float(self._runtime_presence_probe.get("idleSeconds") or 0.0)
    return max(1000.0, seconds * 1000.0)
def _presence_probe_cooldown_ms(self) -> float:
    """Configured cooldown between probes in milliseconds, floored at 1000 ms."""
    seconds = float(self._runtime_presence_probe.get("cooldownSeconds") or 0.0)
    return max(1000.0, seconds * 1000.0)
def _presence_probe_max_prompts(self) -> int:
    """Per-session prompt budget, never below one."""
    configured = int(self._runtime_presence_probe.get("maxPrompts") or 1)
    return max(1, configured)
def _presence_probe_wait_for_response(self) -> bool:
    """Whether the probe should wait for the user's answer (defaults to True)."""
    configured = self._runtime_presence_probe.get("waitForResponse", True)
    return bool(configured)
def _touch_user_activity(self) -> None:
    """Record user activity now and drop any outstanding probe prompt."""
    # Any sign of life answers a pending probe implicitly.
    self._active_presence_probe_call_id = None
    self._last_user_activity_ms = time.monotonic() * 1000.0
def _presence_probe_in_progress(self) -> bool:
    """True while a probe prompt has been issued and not yet resolved."""
    active_call = self._active_presence_probe_call_id
    return bool(active_call)
def _has_active_turn(self) -> bool:
    """True when a conversation-turn task exists and is still running."""
    task = self._current_turn_task
    if task is None:
        return False
    return not task.done()
def _presence_probe_due(self, now_ms: float) -> bool:
    """Decide whether a presence probe should fire at ``now_ms``.

    All guards must pass: feature enabled, prompt budget not exhausted,
    no probe or turn in flight, bot not speaking and no interrupt
    pending, conversation idle with at least one completed turn, no
    outstanding client tool calls, the idle threshold exceeded, and the
    cooldown since the previous probe elapsed.
    """
    if not self._presence_probe_enabled():
        return False
    if self._presence_probe_attempts >= self._presence_probe_max_prompts():
        return False
    if self._presence_probe_in_progress():
        return False
    if self._has_active_turn():
        return False
    if self._is_bot_speaking or self._interrupt_event.is_set():
        return False
    if self.conversation.state != ConversationState.IDLE:
        return False
    if self._pending_client_tool_call_ids:
        return False
    # Never probe before the user has spoken at least once.
    if self.conversation.turn_count <= 0:
        return False
    idle_elapsed = now_ms - self._last_user_activity_ms
    if idle_elapsed < self._presence_probe_idle_ms():
        return False
    last_probe_ms = self._last_presence_probe_ms
    if last_probe_ms > 0.0 and now_ms - last_probe_ms < self._presence_probe_cooldown_ms():
        return False
    return True
def _clip_presence_context(self, text: str) -> str:
    """Collapse whitespace in ``text`` and truncate it for a probe question.

    Returns "" for blank input; snippets longer than
    ``_PRESENCE_PROBE_CONTEXT_CHARS`` are cut and suffixed with "...".
    """
    compact = " ".join(str(text or "").split())
    if not compact:
        return ""
    limit = self._PRESENCE_PROBE_CONTEXT_CHARS
    if len(compact) <= limit:
        return compact
    return compact[:limit] + "..."
def _build_presence_probe_question(self) -> str:
    """Choose the probe question: custom text, then context-aware, then generic."""
    configured = str(self._runtime_presence_probe.get("question") or "").strip()
    if configured:
        return configured
    if bool(self._runtime_presence_probe.get("includeContext", True)):
        # Prefer referencing what the assistant last said, then the user.
        assistant_snippet = self._clip_presence_context(self.conversation.last_assistant_text or "")
        if assistant_snippet:
            return f"我们刚聊到“{assistant_snippet}”,你还在吗?"
        user_snippet = self._clip_presence_context(self.conversation.last_user_text or "")
        if user_snippet:
            return f"关于你刚才说的“{user_snippet}”,你还在吗?"
    return "我还在这边,你还在线吗?"
async def _run_presence_probe_once(self, now_ms: Optional[float] = None) -> bool:
    """Emit a single presence-probe prompt if all preconditions hold.

    Args:
        now_ms: Monotonic timestamp in ms; defaults to the current clock.

    Returns:
        True when a probe was sent, False when `_presence_probe_due`
        declined.
    """
    current_ms = now_ms if now_ms is not None else (time.monotonic() * 1000.0)
    if not self._presence_probe_due(current_ms):
        return False
    probe_id = self._new_id("presence_probe", self._presence_probe_seq + 1)
    self._presence_probe_seq += 1
    # Mark the probe in flight and charge the budget/cooldown BEFORE any
    # awaits, so a concurrent check cannot fire a second probe.
    self._active_presence_probe_call_id = probe_id
    self._last_presence_probe_ms = current_ms
    self._presence_probe_attempts += 1
    question = self._build_presence_probe_question()
    # The probe is delivered as a regular assistant turn/response pair so
    # clients render it like any other reply.
    probe_turn_id = self._start_turn()
    probe_response_id = self._start_response()
    try:
        await self._send_event(
            {
                **ev(
                    "assistant.response.final",
                    trackId=self.track_audio_out,
                    text=question,
                ),
                "turn_id": probe_turn_id,
                "response_id": probe_response_id,
            },
            priority=20,
        )
        if self._tts_output_enabled():
            await self._speak(question, audio_event_priority=30)
        logger.info(
            "[PresenceProbe] sent probe_id={} idle_ms={} question={}",
            probe_id,
            int(max(0.0, current_ms - self._last_user_activity_ms)),
            question,
        )
        return True
    finally:
        # Always clear in-flight markers, even if sending/TTS raised,
        # so the next due-check is not blocked forever.
        self._active_presence_probe_call_id = None
        self._current_response_id = None
        self._current_tts_id = None
async def _presence_probe_loop(self) -> None:
    """Background loop polling once per interval for a due presence probe."""
    try:
        while self._running:
            if self._presence_probe_enabled():
                try:
                    await self._run_presence_probe_once()
                except asyncio.CancelledError:
                    # Propagate cancellation immediately.
                    raise
                except Exception as exc:
                    logger.warning(f"Presence probe iteration failed: {exc}")
            await asyncio.sleep(self._PRESENCE_PROBE_LOOP_INTERVAL_SECONDS)
    except asyncio.CancelledError:
        logger.debug("Presence probe loop cancelled")
        raise
def _refresh_presence_probe_task(self) -> None:
    """Start or stop the background probe loop to match the current config.

    Cancels the loop when the pipeline is stopped or the feature is
    disabled; otherwise (re)creates the task when none is alive.  When no
    event loop is running this is a no-op.
    """
    existing = self._presence_probe_task
    should_run = self._running and self._presence_probe_enabled()
    if not should_run:
        if existing is not None and not existing.done():
            existing.cancel()
        self._presence_probe_task = None
        return
    if existing is not None and not existing.done():
        return  # loop already alive; nothing to do
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Called outside an event loop (e.g. config applied before start).
        return
    self._presence_probe_task = asyncio.create_task(
        self._presence_probe_loop(),
        name=f"presence_probe_{self.session_id}",
    )
async def _shutdown_presence_probe_task(self) -> None:
    """Cancel the probe loop task (if alive) and wait for it to terminate."""
    task, self._presence_probe_task = self._presence_probe_task, None
    if task is None or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancel; swallow so cleanup continues.
        pass
async def _generate_runtime_greeting(self) -> Optional[str]:
if not self.llm_service:
return None
@@ -871,6 +1248,8 @@ class DuplexPipeline:
logger.info("DuplexPipeline services connected")
if not self._outbound_task or self._outbound_task.done():
self._outbound_task = asyncio.create_task(self._outbound_loop())
self._touch_user_activity()
self._refresh_presence_probe_task()
except Exception as e:
logger.error(f"Failed to start pipeline: {e}")
@@ -1182,6 +1561,8 @@ class DuplexPipeline:
# 2. Check for barge-in (user speaking while bot speaking)
# Filter false interruptions by requiring minimum speech duration
if vad_status == "Speech":
self._touch_user_activity()
if self._is_bot_speaking and self._barge_in_enabled():
if vad_status == "Speech":
# User is speaking while bot is speaking
@@ -1279,6 +1660,7 @@ class DuplexPipeline:
return
logger.info(f"Processing text input: {text[:50]}...")
self._touch_user_activity()
# Cancel any current speaking
await self._stop_current_speech()
@@ -1312,6 +1694,8 @@ class DuplexPipeline:
self._last_sent_transcript = text
if is_final:
if text.strip():
self._touch_user_activity()
self._pending_transcript_delta = ""
self._last_transcript_delta_emit_ms = 0.0
await self._send_event(
@@ -1433,6 +1817,7 @@ class DuplexPipeline:
return
logger.info(f"[EOU] Detected - user said: {user_text[:100]}...")
self._touch_user_activity()
self._finalize_utterance()
# For ASR backends that already emitted final via callback,
@@ -2364,6 +2749,7 @@ class DuplexPipeline:
return
logger.info("Barge-in detected - interrupting bot speech")
self._touch_user_activity()
# Reset barge-in tracking
self._barge_in_speech_start_time = None
@@ -2438,6 +2824,7 @@ class DuplexPipeline:
logger.info(f"Cleaning up DuplexPipeline for session {self.session_id}")
self._running = False
await self._shutdown_presence_probe_task()
await self._stop_current_speech()
if self._outbound_task and not self._outbound_task.done():
await self._enqueue_outbound("stop", None, priority=-1000)