diff --git a/CHANGELOG.md b/CHANGELOG.md index be58657a0..07696a739 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Modified `UserIdleProcessor` to start monitoring only after first + conversation activity (`UserStartedSpeakingFrame` or + `BotStartedSpeakingFrame`) instead of immediately. + - Modified `OpenAIAssistantContextAggregator` to support controlled completions and to emit context update callbacks via `FunctionCallResultProperties`. @@ -79,6 +83,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed `UserIdleProcessor` not properly propagating `EndFrame`s through the + pipeline. + - Fixed an issue where websocket based TTS services could incorrectly terminate their connection due to a retry counter not resetting. diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 8f40fd52e..3a7202c80 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -12,7 +12,6 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, Frame, - StartFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -20,10 +19,24 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class UserIdleProcessor(FrameProcessor): - """This class is useful to check if the user is interacting with the bot - within a given timeout. If the timeout is reached before any interaction - occurred the provided callback will be called. + """Monitors user inactivity and triggers callbacks after timeout periods. + Starts monitoring only after the first conversation activity (UserStartedSpeaking + or BotSpeaking). + + Args: + callback: Function to call when user is idle + timeout: Seconds to wait before considering user idle + **kwargs: Additional arguments passed to FrameProcessor + + Example: + async def handle_idle(processor: "UserIdleProcessor") -> None: + await send_reminder("Are you still there?") + + processor = UserIdleProcessor( + callback=handle_idle, + timeout=5.0 + ) """ def __init__( @@ -37,40 +50,72 @@ class UserIdleProcessor(FrameProcessor): self._callback = callback self._timeout = timeout self._interrupted = False + self._conversation_started = False + self._idle_task = None + self._idle_event = asyncio.Event() + + def _create_idle_task(self): + """Create the idle task if it hasn't been created yet.""" + if self._idle_task is None: + self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) async def _stop(self): - self._idle_task.cancel() - await self._idle_task + """Stops and cleans up the idle monitoring task.""" + if self._idle_task is not None: + self._idle_task.cancel() + try: + await self._idle_task + except asyncio.CancelledError: + pass # Expected when task is cancelled + self._idle_task = None async def process_frame(self, frame: Frame, direction: FrameDirection): + """Processes incoming frames and manages idle monitoring state. + + Args: + frame: The frame to process + direction: Direction of the frame flow + """ await super().process_frame(frame, direction) # Check for end frames before processing - if isinstance(frame, StartFrame): - self._create_idle_task() - elif isinstance(frame, (EndFrame, CancelFrame)): - await self._stop() + if isinstance(frame, (EndFrame, CancelFrame)): + await self.push_frame(frame, direction) # Push the frame down the pipeline + if self._idle_task: + await self._stop() # Stop the idle task, if it exists + return await self.push_frame(frame, direction) - # We shouldn't call the idle callback if the user or the bot are speaking - if isinstance(frame, UserStartedSpeakingFrame): - self._interrupted = True - self._idle_event.set() - elif isinstance(frame, UserStoppedSpeakingFrame): - self._interrupted = False - self._idle_event.set() - elif isinstance(frame, BotSpeakingFrame): - self._idle_event.set() + # Start monitoring on first conversation activity + if not self._conversation_started and isinstance( + frame, (UserStartedSpeakingFrame, BotSpeakingFrame) + ): + self._conversation_started = True + self._create_idle_task() + + # Only process these events if conversation has started + if self._conversation_started: + # We shouldn't call the idle callback if the user or the bot are speaking + if isinstance(frame, UserStartedSpeakingFrame): + self._interrupted = True + self._idle_event.set() + elif isinstance(frame, UserStoppedSpeakingFrame): + self._interrupted = False + self._idle_event.set() + elif isinstance(frame, BotSpeakingFrame): + self._idle_event.set() async def cleanup(self): - await self._stop() - - def _create_idle_task(self): - self._idle_event = asyncio.Event() - self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) + """Cleans up resources when processor is shutting down.""" + if self._idle_task: # Only stop if task exists + await self._stop() async def _idle_task_handler(self): + """Monitors for idle timeout and triggers callbacks. + + Runs in a loop until cancelled. + """ while True: try: await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)