Start UserIdleProcessor on speaking frame, fix bug not pushing EndFrame

This commit is contained in:
Mark Backman
2025-01-17 12:54:17 -05:00
parent 915e3bb3c7
commit 85f4663a41
2 changed files with 76 additions and 24 deletions

View File

@@ -56,6 +56,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Modified `UserIdleProcessor` to start monitoring only after the first
conversation activity (`UserStartedSpeakingFrame` or `BotSpeakingFrame`)
instead of immediately.
- Modified `OpenAIAssistantContextAggregator` to support controlled completions
and to emit context update callbacks via `FunctionCallResultProperties`.
@@ -79,6 +83,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed `UserIdleProcessor` not properly propagating `EndFrame`s through the
pipeline.
- Fixed an issue where websocket based TTS services could incorrectly terminate
their connection due to a retry counter not resetting.

View File

@@ -12,7 +12,6 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -20,10 +19,24 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class UserIdleProcessor(FrameProcessor):
"""This class is useful to check if the user is interacting with the bot
within a given timeout. If the timeout is reached before any interaction
occurred the provided callback will be called.
"""Monitors user inactivity and triggers callbacks after timeout periods.
Starts monitoring only after the first conversation activity (UserStartedSpeaking
or BotSpeaking).
Args:
callback: Function to call when user is idle
timeout: Seconds to wait before considering user idle
**kwargs: Additional arguments passed to FrameProcessor
Example:
async def handle_idle(processor: "UserIdleProcessor") -> None:
await send_reminder("Are you still there?")
processor = UserIdleProcessor(
callback=handle_idle,
timeout=5.0
)
"""
def __init__(
@@ -37,40 +50,72 @@ class UserIdleProcessor(FrameProcessor):
self._callback = callback
self._timeout = timeout
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = asyncio.Event()
def _create_idle_task(self):
"""Create the idle task if it hasn't been created yet."""
# Guard on None so repeated calls never schedule a second monitoring task.
if self._idle_task is None:
# Schedule _idle_task_handler on the processor's event loop and keep the
# task handle in self._idle_task so _stop() can cancel it later.
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _stop(self):
# NOTE(review): this view is a diff rendered without +/- markers; the two
# statements below look like the removed body (cancel with no None check, and
# an await with no CancelledError handling) — confirm against the commit.
self._idle_task.cancel()
await self._idle_task
"""Stops and cleans up the idle monitoring task."""
# Nothing to cancel when no task exists (never created, or already stopped).
if self._idle_task is not None:
self._idle_task.cancel()
try:
# Wait for the cancelled task to finish before dropping the handle.
await self._idle_task
except asyncio.CancelledError:
pass # Expected when task is cancelled
# Clear the handle so later `if self._idle_task` / `is None` checks see that
# no task is running.
self._idle_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes incoming frames and manages idle monitoring state.
Args:
frame: The frame to process
direction: Direction of the frame flow
"""
await super().process_frame(frame, direction)
# Check for end frames before processing
# NOTE(review): this view is a diff rendered without +/- markers; the
# StartFrame / elif branch directly below looks like the removed version of
# the end-frame handling that follows it — confirm against the commit.
if isinstance(frame, StartFrame):
self._create_idle_task()
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
# End/Cancel frames are forwarded first and monitoring is torn down afterwards;
# the early return keeps them from reaching the general push_frame call below.
if isinstance(frame, (EndFrame, CancelFrame)):
await self.push_frame(frame, direction) # Push the frame down the pipeline
if self._idle_task:
await self._stop() # Stop the idle task, if it exists
return
# Every other frame is passed along unchanged before idle bookkeeping runs.
await self.push_frame(frame, direction)
# NOTE(review): the unguarded speaking-frame handling in the next block looks
# like the removed version of the `_conversation_started`-guarded block
# further down — confirm against the commit.
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
# Start monitoring on first conversation activity
# The flag flips only once, and _create_idle_task() itself is a no-op when a
# task already exists.
if not self._conversation_started and isinstance(
frame, (UserStartedSpeakingFrame, BotSpeakingFrame)
):
self._conversation_started = True
self._create_idle_task()
# Only process these events if conversation has started
if self._conversation_started:
# We shouldn't call the idle callback if the user or the bot are speaking
# _interrupted is True between UserStartedSpeakingFrame and
# UserStoppedSpeakingFrame; presumably the idle handler consults it (not
# visible here). Setting _idle_event wakes the handler's timed wait.
if isinstance(frame, UserStartedSpeakingFrame):
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
async def cleanup(self):
# NOTE(review): this view is a diff rendered without +/- markers; the next
# four lines (an unguarded _stop() call and an older _create_idle_task that
# also rebuilds _idle_event) look like removed code — confirm against the
# commit.
await self._stop()
def _create_idle_task(self):
self._idle_event = asyncio.Event()
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
"""Cleans up resources when processor is shutting down."""
# The idle task may never have been created, or may already have been stopped
# and cleared, so only stop it when a handle is present.
if self._idle_task: # Only stop if task exists
await self._stop()
async def _idle_task_handler(self):
"""Monitors for idle timeout and triggers callbacks.
Runs in a loop until cancelled.
"""
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)