Compare commits
8 Commits
hush/prere
...
mb/user-id
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66b56b1edf | ||
|
|
93743fdcbc | ||
|
|
d43bc51531 | ||
|
|
a4ddf0645f | ||
|
|
15af5e0cd6 | ||
|
|
ddf3a940cb | ||
|
|
195fd43712 | ||
|
|
7c6f45e2bc |
13
CHANGELOG.md
13
CHANGELOG.md
@@ -56,6 +56,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated the `17-detect-user-idle.py` to show how to use the `retry_count`.
|
||||
|
||||
- Enhanced `UserIdleProcessor` with retry counting functionality. Callbacks now
|
||||
support an optional `retry_count` parameter to implement escalating responses
|
||||
to user inactivity.
|
||||
|
||||
- Modified `UserIdleProcessor` to start monitoring only after first
|
||||
conversation activity (`UserStartedSpeakingFrame` or
|
||||
`BotStartedSpeakingFrame`) instead of immediately.
|
||||
|
||||
- Modified `OpenAIAssistantContextAggregator` to support controlled completions
|
||||
and to emit context update callbacks via `FunctionCallResultProperties`.
|
||||
|
||||
@@ -79,6 +89,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `UserIdleProcessor` not properly propagating `EndFrame`s through the
|
||||
pipeline.
|
||||
|
||||
- Fixed an issue where websocket based TTS services could incorrectly terminate
|
||||
their connection due to a retry counter not resetting.
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -63,16 +63,36 @@ async def main():
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
async def user_idle_callback(user_idle: UserIdleProcessor):
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
async def handle_user_idle(processor: UserIdleProcessor, retry_count: int) -> bool:
|
||||
if retry_count == 1:
|
||||
# First attempt: Add a gentle prompt to the conversation
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "The user has been quiet for a while. Politely and concisely ask if they're still there.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
return True
|
||||
elif retry_count == 2:
|
||||
# Second attempt: More direct prompt
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "The user is still inactive. Concisely ask if they would like to continue the conversation.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
return True
|
||||
else:
|
||||
# Third attempt: End the conversation
|
||||
await user_idle.push_frame(
|
||||
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
|
||||
)
|
||||
await task.queue_frame(EndFrame())
|
||||
return False
|
||||
|
||||
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
|
||||
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
@@ -102,6 +122,10 @@ async def main():
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -5,14 +5,14 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import Awaitable, Callable
|
||||
from functools import wraps
|
||||
from typing import Awaitable, Callable, Union, cast
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
@@ -20,63 +20,156 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class UserIdleProcessor(FrameProcessor):
|
||||
"""This class is useful to check if the user is interacting with the bot
|
||||
within a given timeout. If the timeout is reached before any interaction
|
||||
occurred the provided callback will be called.
|
||||
"""Monitors user inactivity and triggers callbacks after timeout periods.
|
||||
|
||||
Starts monitoring only after the first conversation activity (UserStartedSpeaking
|
||||
or BotSpeaking). Supports both legacy and new-style callbacks for handling idle events.
|
||||
|
||||
Args:
|
||||
callback: Function to call when user is idle. Can be either:
|
||||
- Legacy: async def(processor) -> None
|
||||
- New: async def(processor, retry_count: int) -> bool
|
||||
timeout: Seconds to wait before considering user idle
|
||||
**kwargs: Additional arguments passed to FrameProcessor
|
||||
|
||||
Example:
|
||||
async def handle_idle(processor, retry_count: int) -> bool:
|
||||
if retry_count <= 3:
|
||||
await send_reminder()
|
||||
return True
|
||||
return False
|
||||
|
||||
processor = UserIdleProcessor(
|
||||
callback=handle_idle,
|
||||
timeout=5.0
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
callback: Callable[["UserIdleProcessor"], Awaitable[None]],
|
||||
callback: Union[
|
||||
Callable[["UserIdleProcessor"], Awaitable[None]], # Old signature
|
||||
Callable[["UserIdleProcessor", int], Awaitable[bool]], # New signature
|
||||
],
|
||||
timeout: float,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._callback = callback
|
||||
self._callback = self._wrap_callback(callback)
|
||||
self._timeout = timeout
|
||||
self._retry_count = 0
|
||||
self._interrupted = False
|
||||
self._conversation_started = False
|
||||
self._idle_task = None
|
||||
self._idle_event = asyncio.Event()
|
||||
|
||||
def _wrap_callback(
|
||||
self,
|
||||
callback: Union[
|
||||
Callable[["UserIdleProcessor"], Awaitable[None]],
|
||||
Callable[["UserIdleProcessor", int], Awaitable[bool]],
|
||||
],
|
||||
) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]:
|
||||
"""Wraps callback to support both old and new-style signatures.
|
||||
|
||||
Returns:
|
||||
Wrapped callback that returns bool to indicate whether to continue monitoring
|
||||
"""
|
||||
# Cast the callback to the new-style signature for wraps
|
||||
wrapped_cb = cast(Callable[["UserIdleProcessor", int], Awaitable[bool]], callback)
|
||||
|
||||
@wraps(wrapped_cb)
|
||||
async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool:
|
||||
# Check callback signature
|
||||
import inspect
|
||||
|
||||
sig = inspect.signature(callback)
|
||||
param_count = len(sig.parameters)
|
||||
|
||||
if param_count == 1:
|
||||
# Old callback
|
||||
await callback(processor) # type: ignore
|
||||
return True # Always continue for backwards compatibility
|
||||
else:
|
||||
# New callback
|
||||
return await callback(processor, retry_count) # type: ignore
|
||||
|
||||
return wrapper
|
||||
|
||||
def _create_idle_task(self):
|
||||
"""Create the idle task if it hasn't been created yet."""
|
||||
if self._idle_task is None:
|
||||
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._idle_task.cancel()
|
||||
await self._idle_task
|
||||
"""Stops and cleans up the idle monitoring task."""
|
||||
if self._idle_task is not None:
|
||||
self._idle_task.cancel()
|
||||
try:
|
||||
await self._idle_task
|
||||
except asyncio.CancelledError:
|
||||
pass # Expected when task is cancelled
|
||||
self._idle_task = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Processes incoming frames and manages idle monitoring state.
|
||||
|
||||
Args:
|
||||
frame: The frame to process
|
||||
direction: Direction of the frame flow
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Check for end frames before processing
|
||||
if isinstance(frame, StartFrame):
|
||||
self._create_idle_task()
|
||||
elif isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self._stop()
|
||||
if isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self.push_frame(frame, direction) # Push the frame down the pipeline
|
||||
if self._idle_task:
|
||||
await self._stop() # Stop the idle task, if it exists
|
||||
return
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
# We shouldn't call the idle callback if the user or the bot are speaking
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._interrupted = True
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._interrupted = False
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
# Start monitoring on first conversation activity
|
||||
if not self._conversation_started and isinstance(
|
||||
frame, (UserStartedSpeakingFrame, BotSpeakingFrame)
|
||||
):
|
||||
self._conversation_started = True
|
||||
self._create_idle_task()
|
||||
|
||||
# Only process these events if conversation has started
|
||||
if self._conversation_started:
|
||||
# We shouldn't call the idle callback if the user or the bot are speaking
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._retry_count = 0
|
||||
self._interrupted = True
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._interrupted = False
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
|
||||
async def cleanup(self):
|
||||
await self._stop()
|
||||
|
||||
def _create_idle_task(self):
|
||||
self._idle_event = asyncio.Event()
|
||||
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
|
||||
"""Cleans up resources when processor is shutting down."""
|
||||
if self._idle_task: # Only stop if task exists
|
||||
await self._stop()
|
||||
|
||||
async def _idle_task_handler(self):
|
||||
"""Monitors for idle timeout and triggers callbacks.
|
||||
|
||||
Runs in a loop until cancelled or callback indicates completion.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
|
||||
except asyncio.TimeoutError:
|
||||
if not self._interrupted:
|
||||
await self._callback(self)
|
||||
self._retry_count += 1
|
||||
should_continue = await self._callback(self, self._retry_count)
|
||||
if not should_continue:
|
||||
await self._stop()
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user