Only start monitoring for user idleness once a User or Bot StartedSpeakingFrame is processed
This commit is contained in:
@@ -13,7 +13,6 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
@@ -26,11 +25,15 @@ class UserIdleProcessor(FrameProcessor):
|
||||
If the timeout is reached before any interaction occurred the provided callback will be called.
|
||||
|
||||
The callback can be either:
|
||||
- async def callback(processor: UserIdleProcessor) -> None # Legacy
|
||||
- async def callback(processor: UserIdleProcessor) -> None # Old
|
||||
- async def callback(processor: UserIdleProcessor, retry_count: int) -> bool # New
|
||||
|
||||
The new style callback receives the current retry count and should return True
|
||||
to continue monitoring or False to stop.
|
||||
|
||||
The processor starts monitoring for idle time only after receiving the first
|
||||
UserStartedSpeakingFrame or BotSpeakingFrame, ensuring that idle detection
|
||||
begins when the actual conversation starts.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -48,6 +51,9 @@ class UserIdleProcessor(FrameProcessor):
|
||||
self._timeout = timeout
|
||||
self._retry_count = 0
|
||||
self._interrupted = False
|
||||
self._conversation_started = False
|
||||
self._idle_task = None
|
||||
self._idle_event = asyncio.Event()
|
||||
|
||||
def _wrap_callback(
|
||||
self,
|
||||
@@ -74,9 +80,17 @@ class UserIdleProcessor(FrameProcessor):
|
||||
|
||||
return wrapper
|
||||
|
||||
def _create_idle_task(self):
|
||||
"""Create the idle task if it hasn't been created yet."""
|
||||
if self._idle_task is None:
|
||||
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._idle_task.cancel()
|
||||
await self._idle_task
|
||||
"""Stop the idle task if it exists"""
|
||||
if self._idle_task is not None:
|
||||
self._idle_task.cancel()
|
||||
await self._idle_task
|
||||
self._idle_task = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -84,27 +98,34 @@ class UserIdleProcessor(FrameProcessor):
|
||||
# Check for end frames before processing
|
||||
if isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self.push_frame(frame, direction) # Push the frame down the pipeline
|
||||
await self._stop() # Then stop the idle task
|
||||
return # Return early since we're ending
|
||||
if self._idle_task:
|
||||
await self._stop() # Stop the idle task, if it exists
|
||||
return
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
# We shouldn't call the idle callback if the user or the bot are speaking
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._interrupted = True
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._interrupted = False
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
# Start monitoring on first conversation activity
|
||||
if not self._conversation_started and isinstance(
|
||||
frame, (UserStartedSpeakingFrame, BotSpeakingFrame)
|
||||
):
|
||||
self._conversation_started = True
|
||||
self._create_idle_task()
|
||||
|
||||
# Only process these events if conversation has started
|
||||
if self._conversation_started:
|
||||
# We shouldn't call the idle callback if the user or the bot are speaking
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._interrupted = True
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._interrupted = False
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
|
||||
async def cleanup(self):
|
||||
await self._stop()
|
||||
|
||||
def _create_idle_task(self):
|
||||
self._idle_event = asyncio.Event()
|
||||
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
|
||||
if self._idle_task: # Only stop if task exists
|
||||
await self._stop()
|
||||
|
||||
async def _idle_task_handler(self):
|
||||
while True:
|
||||
|
||||
Reference in New Issue
Block a user