From ddf3a940cb48405842e8bcf7b16b5ec989bdb254 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 17 Jan 2025 10:28:58 -0500 Subject: [PATCH] Only start monitoring for user idleness once a User or Bot StartedSpeakingFrame is processed --- src/pipecat/processors/user_idle_processor.py | 61 +++++++++++++------ 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index db3d32ffc..bf4a2fb94 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -13,7 +13,6 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, Frame, - StartFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -26,11 +25,15 @@ class UserIdleProcessor(FrameProcessor): If the timeout is reached before any interaction occurred the provided callback will be called. The callback can be either: - - async def callback(processor: UserIdleProcessor) -> None # Legacy + - async def callback(processor: UserIdleProcessor) -> None # Old - async def callback(processor: UserIdleProcessor, retry_count: int) -> bool # New The new style callback receives the current retry count and should return True to continue monitoring or False to stop. + + The processor starts monitoring for idle time only after receiving the first + UserStartedSpeakingFrame or BotSpeakingFrame, ensuring that idle detection + begins when the actual conversation starts. """ def __init__( @@ -48,6 +51,9 @@ class UserIdleProcessor(FrameProcessor): self._timeout = timeout self._retry_count = 0 self._interrupted = False + self._conversation_started = False + self._idle_task = None + self._idle_event = asyncio.Event() def _wrap_callback( self, @@ -74,9 +80,17 @@ class UserIdleProcessor(FrameProcessor): return wrapper + def _create_idle_task(self): + """Create the idle task if it hasn't been created yet.""" + if self._idle_task is None: + self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) + async def _stop(self): - self._idle_task.cancel() - await self._idle_task + """Stop the idle task if it exists""" + if self._idle_task is not None: + self._idle_task.cancel() + await self._idle_task + self._idle_task = None async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -84,27 +98,34 @@ class UserIdleProcessor(FrameProcessor): # Check for end frames before processing if isinstance(frame, (EndFrame, CancelFrame)): await self.push_frame(frame, direction) # Push the frame down the pipeline - await self._stop() # Then stop the idle task - return # Return early since we're ending + if self._idle_task: + await self._stop() # Stop the idle task, if it exists + return await self.push_frame(frame, direction) - # We shouldn't call the idle callback if the user or the bot are speaking - if isinstance(frame, UserStartedSpeakingFrame): - self._interrupted = True - self._idle_event.set() - elif isinstance(frame, UserStoppedSpeakingFrame): - self._interrupted = False - self._idle_event.set() - elif isinstance(frame, BotSpeakingFrame): - self._idle_event.set() + # Start monitoring on first conversation activity + if not self._conversation_started and isinstance( + frame, (UserStartedSpeakingFrame, BotSpeakingFrame) + ): + self._conversation_started = True + self._create_idle_task() + + # Only process these events if conversation has started + if self._conversation_started: + # We shouldn't call the idle callback if the user or the bot are speaking + if isinstance(frame, UserStartedSpeakingFrame): + self._interrupted = True + self._idle_event.set() + elif isinstance(frame, UserStoppedSpeakingFrame): + self._interrupted = False + self._idle_event.set() + elif isinstance(frame, BotSpeakingFrame): + self._idle_event.set() async def cleanup(self): - await self._stop() - - def _create_idle_task(self): - self._idle_event = asyncio.Event() - self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) + if self._idle_task: # Only stop if task exists + await self._stop() async def _idle_task_handler(self): while True: