Compare commits

...

8 Commits

Author SHA1 Message Date
Mark Backman
66b56b1edf Remove 31 example and update 17 example 2025-01-17 11:21:13 -05:00
Mark Backman
93743fdcbc Update CHANGELOG 2025-01-17 10:51:31 -05:00
Mark Backman
d43bc51531 Reset the retry counter when the user speaks 2025-01-17 10:51:31 -05:00
Mark Backman
a4ddf0645f Fix issue cancelling task during cleanup 2025-01-17 10:51:31 -05:00
Mark Backman
15af5e0cd6 Improve docstrings and fix type hinting issue by casting callback to new style callback 2025-01-17 10:51:31 -05:00
Mark Backman
ddf3a940cb Only start monitoring for user idleness once a User or Bot StartedSpeakingFrame is processed 2025-01-17 10:51:30 -05:00
Mark Backman
195fd43712 Fix an issue where the UserIdleProcessor was blocking the EndFrame 2025-01-17 10:50:45 -05:00
Mark Backman
7c6f45e2bc Add callbacks to the UserIdleProcessor 2025-01-17 10:49:14 -05:00
3 changed files with 168 additions and 38 deletions

View File

@@ -56,6 +56,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated the `17-detect-user-idle.py` to show how to use the `retry_count`.
- Enhanced `UserIdleProcessor` with retry counting functionality. Callbacks now
support an optional `retry_count` parameter to implement escalating responses
to user inactivity.
- Modified `UserIdleProcessor` to start monitoring only after first
conversation activity (`UserStartedSpeakingFrame` or
`BotStartedSpeakingFrame`) instead of immediately.
- Modified `OpenAIAssistantContextAggregator` to support controlled completions
and to emit context update callbacks via `FunctionCallResultProperties`.
@@ -79,6 +89,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed `UserIdleProcessor` not properly propagating `EndFrame`s through the
pipeline.
- Fixed an issue where websocket based TTS services could incorrectly terminate
their connection due to a retry counter not resetting.

View File

@@ -14,7 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -63,16 +63,36 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
async def handle_user_idle(processor: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet for a while. Politely and concisely ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
messages.append(
{
"role": "system",
"content": "The user is still inactive. Concisely ask if they would like to continue the conversation.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
else:
# Third attempt: End the conversation
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
return False
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
pipeline = Pipeline(
[
@@ -102,6 +122,10 @@ async def main():
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)

View File

@@ -5,14 +5,14 @@
#
import asyncio
from typing import Awaitable, Callable
from functools import wraps
from typing import Awaitable, Callable, Union, cast
from pipecat.frames.frames import (
BotSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -20,63 +20,156 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class UserIdleProcessor(FrameProcessor):
"""This class is useful to check if the user is interacting with the bot
within a given timeout. If the timeout is reached before any interaction
occurred the provided callback will be called.
"""Monitors user inactivity and triggers callbacks after timeout periods.
Starts monitoring only after the first conversation activity (UserStartedSpeaking
or BotSpeaking). Supports both legacy and new-style callbacks for handling idle events.
Args:
callback: Function to call when user is idle. Can be either:
- Legacy: async def(processor) -> None
- New: async def(processor, retry_count: int) -> bool
timeout: Seconds to wait before considering user idle
**kwargs: Additional arguments passed to FrameProcessor
Example:
async def handle_idle(processor, retry_count: int) -> bool:
if retry_count <= 3:
await send_reminder()
return True
return False
processor = UserIdleProcessor(
callback=handle_idle,
timeout=5.0
)
"""
def __init__(
self,
*,
callback: Callable[["UserIdleProcessor"], Awaitable[None]],
callback: Union[
Callable[["UserIdleProcessor"], Awaitable[None]], # Old signature
Callable[["UserIdleProcessor", int], Awaitable[bool]], # New signature
],
timeout: float,
**kwargs,
):
super().__init__(**kwargs)
self._callback = callback
self._callback = self._wrap_callback(callback)
self._timeout = timeout
self._retry_count = 0
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = asyncio.Event()
def _wrap_callback(
self,
callback: Union[
Callable[["UserIdleProcessor"], Awaitable[None]],
Callable[["UserIdleProcessor", int], Awaitable[bool]],
],
) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]:
"""Wraps callback to support both old and new-style signatures.
Returns:
Wrapped callback that returns bool to indicate whether to continue monitoring
"""
# Cast the callback to the new-style signature for wraps
wrapped_cb = cast(Callable[["UserIdleProcessor", int], Awaitable[bool]], callback)
@wraps(wrapped_cb)
async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool:
# Check callback signature
import inspect
sig = inspect.signature(callback)
param_count = len(sig.parameters)
if param_count == 1:
# Old callback
await callback(processor) # type: ignore
return True # Always continue for backwards compatibility
else:
# New callback
return await callback(processor, retry_count) # type: ignore
return wrapper
def _create_idle_task(self):
"""Create the idle task if it hasn't been created yet."""
if self._idle_task is None:
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _stop(self):
self._idle_task.cancel()
await self._idle_task
"""Stops and cleans up the idle monitoring task."""
if self._idle_task is not None:
self._idle_task.cancel()
try:
await self._idle_task
except asyncio.CancelledError:
pass # Expected when task is cancelled
self._idle_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes incoming frames and manages idle monitoring state.
Args:
frame: The frame to process
direction: Direction of the frame flow
"""
await super().process_frame(frame, direction)
# Check for end frames before processing
if isinstance(frame, StartFrame):
self._create_idle_task()
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, (EndFrame, CancelFrame)):
await self.push_frame(frame, direction) # Push the frame down the pipeline
if self._idle_task:
await self._stop() # Stop the idle task, if it exists
return
await self.push_frame(frame, direction)
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
# Start monitoring on first conversation activity
if not self._conversation_started and isinstance(
frame, (UserStartedSpeakingFrame, BotSpeakingFrame)
):
self._conversation_started = True
self._create_idle_task()
# Only process these events if conversation has started
if self._conversation_started:
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._retry_count = 0
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
async def cleanup(self):
await self._stop()
def _create_idle_task(self):
self._idle_event = asyncio.Event()
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
"""Cleans up resources when processor is shutting down."""
if self._idle_task: # Only stop if task exists
await self._stop()
async def _idle_task_handler(self):
"""Monitors for idle timeout and triggers callbacks.
Runs in a loop until cancelled or callback indicates completion.
"""
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
if not self._interrupted:
await self._callback(self)
self._retry_count += 1
should_continue = await self._callback(self, self._retry_count)
if not should_continue:
await self._stop()
break
except asyncio.CancelledError:
break
finally: