This commit is contained in:
Xin Wang
2026-05-29 12:37:41 +08:00

View File

@@ -8,6 +8,9 @@ from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
LLMRunFrame,
OutputTransportMessageUrgentFrame,
TTSSpeakFrame,
@@ -189,7 +192,9 @@ async def run_pipeline_with_serializer(
),
idle_timeout_secs=config.session.inactivity_timeout_sec,
)
task.set_reached_upstream_filter((BotStartedSpeakingFrame, BotStoppedSpeakingFrame))
idle_prompt_count = 0
idle_prompt_speaking = False
idle_prompt_task: asyncio.Task | None = None
async def cancel_idle_prompt_timer() -> None:
@@ -200,7 +205,7 @@ async def run_pipeline_with_serializer(
await task.cancel_task(timer)
async def run_idle_prompt_timer() -> None:
nonlocal idle_prompt_count, idle_prompt_task
nonlocal idle_prompt_count, idle_prompt_speaking, idle_prompt_task
try:
await asyncio.sleep(config.turn.idle_prompt_timeout_sec)
@@ -215,6 +220,7 @@ async def run_pipeline_with_serializer(
"User idle prompt triggered "
f"count={idle_prompt_count}/{config.turn.idle_prompt_max_count}"
)
idle_prompt_speaking = True
await task.queue_frames([TTSSpeakFrame(text)])
finally:
if idle_prompt_task is asyncio.current_task():
@@ -267,17 +273,20 @@ async def run_pipeline_with_serializer(
await cancel_idle_prompt_timer()
await task.cancel()
@user_aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(_aggregator, _strategy):
nonlocal idle_prompt_count
idle_prompt_count = 0
await cancel_idle_prompt_timer()
@task.event_handler("on_frame_reached_upstream")
async def on_frame_reached_upstream(_task, frame: Frame):
nonlocal idle_prompt_count, idle_prompt_speaking
if isinstance(frame, BotStartedSpeakingFrame):
await cancel_idle_prompt_timer()
elif isinstance(frame, BotStoppedSpeakingFrame):
if idle_prompt_speaking:
idle_prompt_speaking = False
else:
idle_prompt_count = 0
await arm_idle_prompt_timer()
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(_aggregator, _strategy, message: UserTurnStoppedMessage):
nonlocal idle_prompt_count
idle_prompt_count = 0
await cancel_idle_prompt_timer()
logger.info(f"User: {message.content}")
text = (message.content or "").strip()
if not text:
@@ -296,10 +305,6 @@ async def run_pipeline_with_serializer(
# NOTE: assistant turn started/final events are emitted by
# ProductTextStreamProcessor, upstream of TTS, so text streams to the
# client ahead of audio. This logger is kept for server-side visibility.
@assistant_aggregator.event_handler("on_assistant_turn_started")
async def on_assistant_turn_started(_aggregator):
await cancel_idle_prompt_timer()
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage):
logger.info(f"Assistant: {message.content}")
@@ -309,7 +314,6 @@ async def run_pipeline_with_serializer(
committed_text=message.content or "",
)
text_stream.take_interrupted_stream_text()
await arm_idle_prompt_timer()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)