Review notes (text above the "diff --git" line is ignored by git apply and patch).

* The received copy of this patch had its newlines and leading whitespace
  collapsed. It is re-flowed here to one diff record per line. Indentation and
  the position of blank context lines are reconstructed from the hunk line
  counts, so confirm with `git apply --check` against the real pre-image
  (or apply with --ignore-whitespace).
* Behavior change versus the received patch: the on_user_turn_started handler
  and the cancel/reset at the top of on_user_turn_stopped are no longer
  removed. Arming on BotStoppedSpeakingFrame and cancelling only on
  BotStartedSpeakingFrame leaves the idle timer running while the user is
  talking (and while the LLM is producing its reply), so a long user turn
  would be answered with the idle prompt. User activity must still cancel the
  timer and reset the prompt count.
* The hunk headers after the handler insertion are recomputed for the smaller
  change. The post-image blob id on the index line is the one from the
  received patch and is stale.

diff --git a/engine/pipeline.py b/engine/pipeline.py
index c4b1b0c..0c64139 100644
--- a/engine/pipeline.py
+++ b/engine/pipeline.py
@@ -8,6 +8,9 @@ from loguru import logger
 from pipecat.audio.vad.silero import SileroVADAnalyzer
 from pipecat.audio.vad.vad_analyzer import VADParams
 from pipecat.frames.frames import (
+    BotStartedSpeakingFrame,
+    BotStoppedSpeakingFrame,
+    Frame,
     LLMRunFrame,
     OutputTransportMessageUrgentFrame,
     TTSSpeakFrame,
@@ -189,7 +192,9 @@ async def run_pipeline_with_serializer(
         ),
         idle_timeout_secs=config.session.inactivity_timeout_sec,
     )
+    task.set_reached_upstream_filter((BotStartedSpeakingFrame, BotStoppedSpeakingFrame))
     idle_prompt_count = 0
+    idle_prompt_speaking = False
     idle_prompt_task: asyncio.Task | None = None
 
     async def cancel_idle_prompt_timer() -> None:
@@ -200,7 +205,7 @@ async def run_pipeline_with_serializer(
             await task.cancel_task(timer)
 
     async def run_idle_prompt_timer() -> None:
-        nonlocal idle_prompt_count, idle_prompt_task
+        nonlocal idle_prompt_count, idle_prompt_speaking, idle_prompt_task
         try:
             await asyncio.sleep(config.turn.idle_prompt_timeout_sec)
 
@@ -215,6 +220,7 @@ async def run_pipeline_with_serializer(
                 "User idle prompt triggered "
                 f"count={idle_prompt_count}/{config.turn.idle_prompt_max_count}"
             )
+            idle_prompt_speaking = True
             await task.queue_frames([TTSSpeakFrame(text)])
         finally:
             if idle_prompt_task is asyncio.current_task():
@@ -267,6 +273,18 @@ async def run_pipeline_with_serializer(
         await cancel_idle_prompt_timer()
         await task.cancel()
 
+    @task.event_handler("on_frame_reached_upstream")
+    async def on_frame_reached_upstream(_task, frame: Frame):
+        nonlocal idle_prompt_count, idle_prompt_speaking
+        if isinstance(frame, BotStartedSpeakingFrame):
+            await cancel_idle_prompt_timer()
+        elif isinstance(frame, BotStoppedSpeakingFrame):
+            if idle_prompt_speaking:
+                idle_prompt_speaking = False
+            else:
+                idle_prompt_count = 0
+            await arm_idle_prompt_timer()
+
     @user_aggregator.event_handler("on_user_turn_started")
     async def on_user_turn_started(_aggregator, _strategy):
         nonlocal idle_prompt_count
@@ -296,10 +314,6 @@ async def run_pipeline_with_serializer(
     # NOTE: assistant turn started/final events are emitted by
     # ProductTextStreamProcessor, upstream of TTS, so text streams to the
     # client ahead of audio. This logger is kept for server-side visibility.
-    @assistant_aggregator.event_handler("on_assistant_turn_started")
-    async def on_assistant_turn_started(_aggregator):
-        await cancel_idle_prompt_timer()
-
     @assistant_aggregator.event_handler("on_assistant_turn_stopped")
     async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage):
         logger.info(f"Assistant: {message.content}")
@@ -309,7 +323,6 @@ async def run_pipeline_with_serializer(
             committed_text=message.content or "",
         )
         text_stream.take_interrupted_stream_text()
-        await arm_idle_prompt_timer()
 
     runner = PipelineRunner(handle_sigint=False)
     await runner.run(task)