Enhance DuplexPipeline to track assistant activity and improve presence-probe logic. Introduce a method that updates the assistant-activity timestamp, and change the presence-probe conditions to consider both user and assistant activity. Update tests to validate the new behavior and to ensure contextual prompts are generated correctly.

This commit is contained in:
Xin Wang
2026-02-28 16:09:05 +08:00
parent 8f1317860f
commit 20abf72ded
3 changed files with 100 additions and 9 deletions

View File

@@ -382,6 +382,7 @@ class DuplexPipeline:
self._last_llm_delta_emit_ms: float = 0.0 self._last_llm_delta_emit_ms: float = 0.0
now_ms = time.monotonic() * 1000.0 now_ms = time.monotonic() * 1000.0
self._last_user_activity_ms: float = now_ms self._last_user_activity_ms: float = now_ms
self._last_assistant_activity_ms: float = now_ms
self._last_presence_probe_ms: float = 0.0 self._last_presence_probe_ms: float = 0.0
self._presence_probe_attempts: int = 0 self._presence_probe_attempts: int = 0
self._presence_probe_seq: int = 0 self._presence_probe_seq: int = 0
@@ -947,6 +948,9 @@ class DuplexPipeline:
self._last_user_activity_ms = time.monotonic() * 1000.0 self._last_user_activity_ms = time.monotonic() * 1000.0
self._active_presence_probe_call_id = None self._active_presence_probe_call_id = None
def _touch_assistant_activity(self) -> None:
self._last_assistant_activity_ms = time.monotonic() * 1000.0
def _presence_probe_in_progress(self) -> bool: def _presence_probe_in_progress(self) -> bool:
return bool(self._active_presence_probe_call_id) return bool(self._active_presence_probe_call_id)
@@ -970,7 +974,8 @@ class DuplexPipeline:
return False return False
if self.conversation.turn_count <= 0: if self.conversation.turn_count <= 0:
return False return False
if now_ms - self._last_user_activity_ms < self._presence_probe_idle_ms(): last_activity_ms = max(self._last_user_activity_ms, self._last_assistant_activity_ms)
if now_ms - last_activity_ms < self._presence_probe_idle_ms():
return False return False
if self._last_presence_probe_ms > 0.0 and now_ms - self._last_presence_probe_ms < self._presence_probe_cooldown_ms(): if self._last_presence_probe_ms > 0.0 and now_ms - self._last_presence_probe_ms < self._presence_probe_cooldown_ms():
return False return False
@@ -984,12 +989,52 @@ class DuplexPipeline:
return compact return compact
return f"{compact[:self._PRESENCE_PROBE_CONTEXT_CHARS]}..." return f"{compact[:self._PRESENCE_PROBE_CONTEXT_CHARS]}..."
def _build_presence_probe_question(self) -> str: async def _build_presence_probe_question(self) -> str:
manual_question = str(self._runtime_presence_probe.get("question") or "").strip() manual_question = str(self._runtime_presence_probe.get("question") or "").strip()
if manual_question: if manual_question:
return manual_question return manual_question
include_context = bool(self._runtime_presence_probe.get("includeContext", True)) include_context = bool(self._runtime_presence_probe.get("includeContext", True))
if include_context and self.llm_service:
last_user = self._clip_presence_context(self.conversation.last_user_text or "")
last_assistant = self._clip_presence_context(self.conversation.last_assistant_text or "")
context_lines: List[str] = []
if last_user:
context_lines.append(f"用户:{last_user}")
if last_assistant:
context_lines.append(f"助手:{last_assistant}")
context_blob = "\n".join(context_lines).strip()
try:
generated = await self.llm_service.generate(
[
LLMMessage(
role="system",
content=(
"你是语音助手。目标是在用户长时间静默时发起一次自然、礼貌、简短的在线确认。"
"请只输出一句中文问句,不要解释,不要使用引号,不要使用 markdown。"
"优先沿用最近上下文,不要机械重复固定模板。"
),
),
LLMMessage(
role="user",
content=(
"请基于以下最近对话生成一句在线确认问句不超过22个汉字\n"
f"{context_blob or '(无明显上下文)'}"
),
),
],
temperature=0.7,
max_tokens=64,
)
cleaned = str(generated or "").strip().strip('"').strip("'")
if cleaned:
cleaned = cleaned.replace("\n", " ").strip()
if len(cleaned) > 60:
cleaned = cleaned[:60]
return cleaned
except Exception as exc:
logger.warning(f"Presence probe LLM question generation failed: {exc}")
if include_context: if include_context:
last_assistant = self._clip_presence_context(self.conversation.last_assistant_text or "") last_assistant = self._clip_presence_context(self.conversation.last_assistant_text or "")
if last_assistant: if last_assistant:
@@ -1010,10 +1055,11 @@ class DuplexPipeline:
self._last_presence_probe_ms = current_ms self._last_presence_probe_ms = current_ms
self._presence_probe_attempts += 1 self._presence_probe_attempts += 1
question = self._build_presence_probe_question() question = await self._build_presence_probe_question()
probe_turn_id = self._start_turn() probe_turn_id = self._start_turn()
probe_response_id = self._start_response() probe_response_id = self._start_response()
try: try:
await self.conversation.add_assistant_turn(question)
await self._send_event( await self._send_event(
{ {
**ev( **ev(
@@ -1028,11 +1074,12 @@ class DuplexPipeline:
) )
if self._tts_output_enabled(): if self._tts_output_enabled():
await self._speak(question, audio_event_priority=30) await self._speak(question, audio_event_priority=30)
self._touch_assistant_activity()
logger.info( logger.info(
"[PresenceProbe] sent probe_id={} idle_ms={} question={}", "[PresenceProbe] sent probe_id={} idle_ms={} question={}",
probe_id, probe_id,
int(max(0.0, current_ms - self._last_user_activity_ms)), int(max(0.0, current_ms - max(self._last_user_activity_ms, self._last_assistant_activity_ms))),
question, question,
) )
return True return True
@@ -1300,6 +1347,7 @@ class DuplexPipeline:
if self._tts_output_enabled() and not used_preloaded_audio: if self._tts_output_enabled() and not used_preloaded_audio:
# Keep opener text ahead of opener voice start. # Keep opener text ahead of opener voice start.
await self._speak(greeting_to_speak, audio_event_priority=30) await self._speak(greeting_to_speak, audio_event_priority=30)
self._touch_assistant_activity()
async def _play_preloaded_opener_audio(self) -> bool: async def _play_preloaded_opener_audio(self) -> bool:
""" """
@@ -2564,6 +2612,7 @@ class DuplexPipeline:
self._barge_in_silence_frames = 0 self._barge_in_silence_frames = 0
self._current_response_id = None self._current_response_id = None
self._current_tts_id = None self._current_tts_id = None
self._touch_assistant_activity()
async def _speak_sentence( async def _speak_sentence(
self, self,

View File

@@ -62,6 +62,12 @@ class _FakeLLM:
self._rounds = rounds self._rounds = rounds
self._call_index = 0 self._call_index = 0
async def generate(self, messages, temperature=0.7, max_tokens=None):
    """Fake non-streaming LLM completion for presence-probe tests.

    Concatenates every message's ``content`` into one prompt string and
    returns a canned "are you still online?" question; a context-aware
    variant is returned when the prompt mentions an order number (订单号).
    The ``temperature`` and ``max_tokens`` arguments are accepted for
    interface compatibility and ignored.
    """
    pieces = (str(getattr(msg, "content", "")) for msg in messages)
    prompt = " ".join(pieces)
    mentions_order = "订单号" in prompt
    return "关于订单号这块,你还在线吗?" if mentions_order else "你还在线吗?"
async def generate_stream(self, _messages, temperature=0.7, max_tokens=None): async def generate_stream(self, _messages, temperature=0.7, max_tokens=None):
idx = self._call_index idx = self._call_index
self._call_index += 1 self._call_index += 1
@@ -384,7 +390,9 @@ async def test_presence_probe_emits_contextual_direct_prompt(monkeypatch):
) )
await pipeline._shutdown_presence_probe_task() await pipeline._shutdown_presence_probe_task()
await pipeline.conversation.add_assistant_turn("请把你的订单号告诉我,我继续帮你处理。") await pipeline.conversation.add_assistant_turn("请把你的订单号告诉我,我继续帮你处理。")
pipeline._last_user_activity_ms = (time.monotonic() * 1000.0) - 8000.0 now_ms = time.monotonic() * 1000.0
pipeline._last_user_activity_ms = now_ms - 8000.0
pipeline._last_assistant_activity_ms = now_ms - 8000.0
fired = await pipeline._run_presence_probe_once() fired = await pipeline._run_presence_probe_once()
@@ -392,6 +400,7 @@ async def test_presence_probe_emits_contextual_direct_prompt(monkeypatch):
probe_text_events = [e for e in events if e.get("type") == "assistant.response.final"] probe_text_events = [e for e in events if e.get("type") == "assistant.response.final"]
assert probe_text_events assert probe_text_events
assert "订单号" in str(probe_text_events[-1].get("text") or "") assert "订单号" in str(probe_text_events[-1].get("text") or "")
assert "订单号" in str(pipeline.conversation.last_assistant_text or "")
assert any(e.get("type") == "output.audio.start" for e in events) assert any(e.get("type") == "output.audio.start" for e in events)
assert not any(e.get("type") == "assistant.tool_call" for e in events) assert not any(e.get("type") == "assistant.tool_call" for e in events)
@@ -412,7 +421,9 @@ async def test_presence_probe_respects_max_prompts_limit(monkeypatch):
) )
await pipeline._shutdown_presence_probe_task() await pipeline._shutdown_presence_probe_task()
await pipeline.conversation.add_assistant_turn("我们继续。") await pipeline.conversation.add_assistant_turn("我们继续。")
pipeline._last_user_activity_ms = (time.monotonic() * 1000.0) - 8000.0 now_ms = time.monotonic() * 1000.0
pipeline._last_user_activity_ms = now_ms - 8000.0
pipeline._last_assistant_activity_ms = now_ms - 8000.0
first_fired = await pipeline._run_presence_probe_once() first_fired = await pipeline._run_presence_probe_once()
second_fired = await pipeline._run_presence_probe_once( second_fired = await pipeline._run_presence_probe_once(
@@ -441,7 +452,9 @@ async def test_presence_probe_text_mode_emits_text_only(monkeypatch):
) )
await pipeline._shutdown_presence_probe_task() await pipeline._shutdown_presence_probe_task()
await pipeline.conversation.add_assistant_turn("我们继续。") await pipeline.conversation.add_assistant_turn("我们继续。")
pipeline._last_user_activity_ms = (time.monotonic() * 1000.0) - 8000.0 now_ms = time.monotonic() * 1000.0
pipeline._last_user_activity_ms = now_ms - 8000.0
pipeline._last_assistant_activity_ms = now_ms - 8000.0
fired = await pipeline._run_presence_probe_once() fired = await pipeline._run_presence_probe_once()
@@ -451,6 +464,28 @@ async def test_presence_probe_text_mode_emits_text_only(monkeypatch):
assert not any(e.get("type") == "output.audio.start" for e in events) assert not any(e.get("type") == "output.audio.start" for e in events)
@pytest.mark.asyncio
async def test_presence_probe_does_not_count_assistant_speaking_time_as_idle(monkeypatch):
    """Recent assistant activity must suppress the presence probe.

    The user has been silent for 30s (well past idleSeconds=10), but the
    assistant was active only 2s ago — the probe's due-check takes the max
    of the two activity timestamps, so it must report not-due.
    """
    pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]])
    pipeline.apply_runtime_overrides(
        {
            "presenceProbe": {
                "enabled": True,
                "idleSeconds": 10,
                "cooldownSeconds": 5,
                "maxPrompts": 1,
            }
        }
    )
    # Stop the background probe task so only the explicit due-check below runs.
    await pipeline._shutdown_presence_probe_task()
    # turn_count must be > 0 for the probe to be considered at all.
    await pipeline.conversation.add_assistant_turn("我们继续。")
    now_ms = time.monotonic() * 1000.0
    pipeline._last_user_activity_ms = now_ms - 30_000.0       # user idle 30s > 10s threshold
    pipeline._last_assistant_activity_ms = now_ms - 2_000.0   # assistant active 2s ago < threshold
    assert pipeline._presence_probe_due(now_ms) is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_server_calculator_emits_tool_result(monkeypatch): async def test_server_calculator_emits_tool_result(monkeypatch):
pipeline, events = _build_pipeline( pipeline, events = _build_pipeline(

View File

@@ -3815,8 +3815,15 @@ export const DebugDrawer: React.FC<{
if (!finalText) return prev; if (!finalText) return prev;
const last = prev[prev.length - 1]; const last = prev[prev.length - 1];
if (last?.role === 'model') { if (last?.role === 'model') {
if (last.text === finalText) return prev; const sameResponse = Boolean(
if (finalText.startsWith(last.text) || last.text.startsWith(finalText)) { responseId
&& last.responseId
&& responseId === last.responseId
);
const bothWithoutResponseId = !responseId && !last.responseId;
const canMergeTail = sameResponse || bothWithoutResponseId;
if (canMergeTail && last.text === finalText) return prev;
if (canMergeTail && (finalText.startsWith(last.text) || last.text.startsWith(finalText))) {
const next = [...prev]; const next = [...prev];
next[next.length - 1] = { ...last, text: finalText }; next[next.length - 1] = { ...last, text: finalText };
return next; return next;