From 6ebb20ae509c127806d5bce12f1f6bcab66aea85 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Fri, 29 May 2026 12:22:55 +0800 Subject: [PATCH] Update idle timer --- engine/pipeline.py | 76 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/engine/pipeline.py b/engine/pipeline.py index 75361dd..c4b1b0c 100644 --- a/engine/pipeline.py +++ b/engine/pipeline.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import uuid from loguru import logger @@ -146,7 +147,6 @@ async def run_pipeline_with_serializer( user_params=LLMUserAggregatorParams( vad_analyzer=SileroVADAnalyzer(params=vad_params), user_turn_strategies=user_turn_strategies, - user_idle_timeout=config.turn.idle_prompt_timeout_sec, ), ) @@ -190,6 +190,55 @@ async def run_pipeline_with_serializer( idle_timeout_secs=config.session.inactivity_timeout_sec, ) idle_prompt_count = 0 + idle_prompt_task: asyncio.Task | None = None + + async def cancel_idle_prompt_timer() -> None: + nonlocal idle_prompt_task + timer = idle_prompt_task + idle_prompt_task = None + if timer and not timer.done(): + await task.cancel_task(timer) + + async def run_idle_prompt_timer() -> None: + nonlocal idle_prompt_count, idle_prompt_task + try: + await asyncio.sleep(config.turn.idle_prompt_timeout_sec) + + text = config.turn.idle_prompt_text.strip() + if not text or config.turn.idle_prompt_max_count <= 0: + return + if idle_prompt_count >= config.turn.idle_prompt_max_count: + return + + idle_prompt_count += 1 + logger.info( + "User idle prompt triggered " + f"count={idle_prompt_count}/{config.turn.idle_prompt_max_count}" + ) + await task.queue_frames([TTSSpeakFrame(text)]) + finally: + if idle_prompt_task is asyncio.current_task(): + idle_prompt_task = None + + async def arm_idle_prompt_timer() -> None: + nonlocal idle_prompt_task + await cancel_idle_prompt_timer() + + text = config.turn.idle_prompt_text.strip() + if ( + config.turn.idle_prompt_timeout_sec <= 0 + or config.turn.idle_prompt_max_count <= 0 + or not text + or idle_prompt_count >= config.turn.idle_prompt_max_count + ): + return + + logger.debug( + "Arming user idle prompt timer " + f"timeout={config.turn.idle_prompt_timeout_sec}s " + f"count={idle_prompt_count}/{config.turn.idle_prompt_max_count}" + ) + idle_prompt_task = task.create_task(run_idle_prompt_timer()) @transport.event_handler("on_client_connected") async def on_client_connected(_transport, _client): @@ -209,17 +258,26 @@ async def run_pipeline_with_serializer( @transport.event_handler("on_client_disconnected") async def on_client_disconnected(_transport, _client): logger.info(f"{client_label} websocket client disconnected") + await cancel_idle_prompt_timer() await task.cancel() @transport.event_handler("on_session_timeout") async def on_session_timeout(_transport, _client): logger.info(f"{client_label} websocket session timed out") + await cancel_idle_prompt_timer() await task.cancel() + @user_aggregator.event_handler("on_user_turn_started") + async def on_user_turn_started(_aggregator, _strategy): + nonlocal idle_prompt_count + idle_prompt_count = 0 + await cancel_idle_prompt_timer() + @user_aggregator.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(_aggregator, _strategy, message: UserTurnStoppedMessage): nonlocal idle_prompt_count idle_prompt_count = 0 + await cancel_idle_prompt_timer() logger.info(f"User: {message.content}") text = (message.content or "").strip() if not text: @@ -235,20 +293,13 @@ async def run_pipeline_with_serializer( ) ) - @user_aggregator.event_handler("on_user_turn_idle") - async def on_user_turn_idle(_aggregator): - nonlocal idle_prompt_count - text = config.turn.idle_prompt_text.strip() - if not text or idle_prompt_count >= config.turn.idle_prompt_max_count: - return - - idle_prompt_count += 1 - logger.info("User idle prompt triggered") - await task.queue_frames([TTSSpeakFrame(text)]) - # NOTE: assistant turn started/final events are emitted by # ProductTextStreamProcessor, upstream of TTS, so text streams to the # client ahead of audio. This logger is kept for server-side visibility. + @assistant_aggregator.event_handler("on_assistant_turn_started") + async def on_assistant_turn_started(_aggregator): + await cancel_idle_prompt_timer() + @assistant_aggregator.event_handler("on_assistant_turn_stopped") async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage): logger.info(f"Assistant: {message.content}") @@ -258,6 +309,7 @@ async def run_pipeline_with_serializer( committed_text=message.content or "", ) text_stream.take_interrupted_stream_text() + await arm_idle_prompt_timer() runner = PipelineRunner(handle_sigint=False) await runner.run(task)