Merge pull request #1081 from pipecat-ai/mb/user-idle-add-retry

Added retry functionality and a new callback to the UserIdleProcessor
This commit is contained in:
Mark Backman
2025-01-27 10:30:45 -05:00
committed by GitHub
3 changed files with 103 additions and 23 deletions

View File

@@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- It is now possible to specify the period of the `PipelineTask` heartbeat
frames with `heartbeats_period_secs`.
- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models
- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models
for meeting token creation in `get_token` method of `DailyRESTHelper`.
- Added `enable_recording` and `geo` parameters to `DailyRoomProperties`.
@@ -21,6 +21,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Enhanced `UserIdleProcessor` with retry functionality and control over idle
monitoring via new callback signature `(processor, retry_count) -> bool`.
Updated the `17-detect-user-idle.py` to show how to use the `retry_count`.
- Add defensive error handling for `OpenAIRealtimeBetaLLMService`'s audio
truncation. Audio truncation errors during interruptions now log a warning
and allow the session to continue instead of throwing an exception.

View File

@@ -14,7 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -63,16 +63,36 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
messages.append(
{
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
else:
# Third attempt: End the conversation
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
return False
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
pipeline = Pipeline(
[

View File

@@ -5,7 +5,8 @@
#
import asyncio
from typing import Awaitable, Callable
import inspect
from typing import Awaitable, Callable, Union
from pipecat.frames.frames import (
BotSpeakingFrame,
@@ -25,11 +26,23 @@ class UserIdleProcessor(FrameProcessor):
or BotSpeaking).
Args:
callback: Function to call when user is idle
callback: Function to call when user is idle. Can be either:
- Basic callback(processor) -> None
- Retry callback(processor, retry_count) -> bool
Return True to continue monitoring for idle events,
Return False to stop the idle monitoring task
timeout: Seconds to wait before considering user idle
**kwargs: Additional arguments passed to FrameProcessor
Example:
# Retry callback:
async def handle_idle(processor: "UserIdleProcessor", retry_count: int) -> bool:
if retry_count < 3:
await send_reminder("Are you still there?")
return True
return False
# Basic callback:
async def handle_idle(processor: "UserIdleProcessor") -> None:
await send_reminder("Are you still there?")
@@ -42,24 +55,62 @@ class UserIdleProcessor(FrameProcessor):
def __init__(
self,
*,
callback: Callable[["UserIdleProcessor"], Awaitable[None]],
callback: Union[
Callable[["UserIdleProcessor"], Awaitable[None]], # Basic
Callable[["UserIdleProcessor", int], Awaitable[bool]], # Retry
],
timeout: float,
**kwargs,
):
super().__init__(**kwargs)
self._callback = callback
self._callback = self._wrap_callback(callback)
self._timeout = timeout
self._retry_count = 0
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = asyncio.Event()
def _create_idle_task(self):
"""Create the idle task if it hasn't been created yet."""
def _wrap_callback(
self,
callback: Union[
Callable[["UserIdleProcessor"], Awaitable[None]],
Callable[["UserIdleProcessor", int], Awaitable[bool]],
],
) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]:
"""Wraps callback to support both basic and retry signatures.
Args:
callback: The callback function to wrap.
Returns:
A wrapped callback that returns bool to indicate whether to continue monitoring.
"""
sig = inspect.signature(callback)
param_count = len(sig.parameters)
async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool:
if param_count == 1:
# Basic callback
await callback(processor) # type: ignore
return True
else:
# Retry callback
return await callback(processor, retry_count) # type: ignore
return wrapper
def _create_idle_task(self) -> None:
"""Creates the idle task if it hasn't been created yet."""
if self._idle_task is None:
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _stop(self):
@property
def retry_count(self) -> int:
"""Returns the current retry count."""
return self._retry_count
async def _stop(self) -> None:
"""Stops and cleans up the idle monitoring task."""
if self._idle_task is not None:
self._idle_task.cancel()
@@ -69,7 +120,7 @@ class UserIdleProcessor(FrameProcessor):
pass # Expected when task is cancelled
self._idle_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Processes incoming frames and manages idle monitoring state.
Args:
@@ -98,6 +149,7 @@ class UserIdleProcessor(FrameProcessor):
if self._conversation_started:
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._retry_count = 0 # Reset retry count when user speaks
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
@@ -106,22 +158,26 @@ class UserIdleProcessor(FrameProcessor):
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
async def cleanup(self):
async def cleanup(self) -> None:
"""Cleans up resources when processor is shutting down."""
if self._idle_task: # Only stop if task exists
await self._stop()
async def _idle_task_handler(self):
async def _idle_task_handler(self) -> None:
"""Monitors for idle timeout and triggers callbacks.
Runs in a loop until cancelled.
Runs in a loop until cancelled or callback indicates completion.
"""
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
if not self._interrupted:
await self._callback(self)
self._retry_count += 1
should_continue = await self._callback(self, self._retry_count)
if not should_continue:
await self._stop()
break
except asyncio.CancelledError:
break
finally: