Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
3efc5fd4a9 introducing urgent frames 2025-04-01 15:25:41 -07:00
3 changed files with 259 additions and 171 deletions

View File

@@ -73,8 +73,24 @@ class Frame:
@dataclass
class SystemFrame(Frame):
"""System frames are frames that are not internally queued by any of the
frame processors and should be processed immediately.
"""A frame that bypasses the internal processing queues and is handled
immediately upon being pushed. Unlike other frames, system frames are not
queued by any frame processor. Instead, they are processed directly in the
task that pushed them, operating outside (i.e. out-of-band) the normal
streaming workflow of frame processors. System frames are not affected by
user interruptions.
"""
pass
@dataclass
class UrgentFrame(Frame):
"""A high-priority frame that is internally queued but always processed
before other frames. Unlike system frames, urgent frames are processed from
the frame processors' streaming task but are prioritized over other
frames. Urgent frames are not affected by user interruptions.
"""
@@ -83,8 +99,9 @@ class SystemFrame(Frame):
@dataclass
class DataFrame(Frame):
"""Data frames are frames that will be processed in order and usually
contain data such as LLM context, text, audio or images.
"""A frame that is processed in order and usually contains data such as LLM
context, text, audio or images. They are processed from the frame
processors' streaming task. Data frames are cancelled by user interruptions.
"""
@@ -93,9 +110,10 @@ class DataFrame(Frame):
@dataclass
class ControlFrame(Frame):
"""Control frames are frames that, similar to data frames, will be processed
in order and usually contain control information such as frames to update
settings or to end the pipeline.
"""A frame that, like data frames, is processed in order and usually contains
control information, such as a request to update settings or to end the pipeline after
everything is flushed. They are processed from the frame processors'
streaming task. Control frames are cancelled by user interruptions.
"""
@@ -478,38 +496,6 @@ class HeartbeatFrame(SystemFrame):
timestamp: int
@dataclass
class EndTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream.
"""
pass
@dataclass
class CancelTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream.
"""
pass
@dataclass
class StopTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that it should be stopped as
soon as possible (flushing all the queued frames) but that the pipeline
processors should be kept in a running state.
"""
pass
@dataclass
class StartInterruptionFrame(SystemFrame):
"""Emitted by VAD to indicate that a user has started speaking (i.e. is
@@ -534,8 +520,56 @@ class StopInterruptionFrame(SystemFrame):
pass
#
# Urgent frames
#
@dataclass
class UserStartedSpeakingFrame(SystemFrame):
class EndTaskFrame(UrgentFrame):
"""This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream.
"""
pass
@dataclass
class CancelTaskFrame(UrgentFrame):
"""This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream.
"""
pass
@dataclass
class StopTaskFrame(UrgentFrame):
"""This is used to notify the pipeline task that it should be stopped as
soon as possible (flushing all the queued frames) but that the pipeline
processors should be kept in a running state.
"""
pass
@dataclass
class BotInterruptionFrame(UrgentFrame):
"""Emitted when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
"""
pass
@dataclass
class UserStartedSpeakingFrame(UrgentFrame):
"""Emitted by VAD to indicate that a user has started speaking. This can be
used for interruptions or other times when detecting that someone is
speaking is more important than knowing what they're saying (as you will
@@ -547,57 +581,28 @@ class UserStartedSpeakingFrame(SystemFrame):
@dataclass
class UserStoppedSpeakingFrame(SystemFrame):
class UserStoppedSpeakingFrame(UrgentFrame):
"""Emitted by the VAD to indicate that a user stopped speaking."""
emulated: bool = False
@dataclass
class EmulateUserStartedSpeakingFrame(SystemFrame):
"""Emitted by internal processors upstream to emulate VAD behavior when a
user starts speaking.
"""
pass
@dataclass
class EmulateUserStoppedSpeakingFrame(SystemFrame):
"""Emitted by internal processors upstream to emulate VAD behavior when a
user stops speaking.
"""
pass
@dataclass
class BotInterruptionFrame(SystemFrame):
"""Emitted by when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
"""
pass
@dataclass
class BotStartedSpeakingFrame(SystemFrame):
class BotStartedSpeakingFrame(UrgentFrame):
"""Emitted upstream by transport outputs to indicate the bot started speaking."""
pass
@dataclass
class BotStoppedSpeakingFrame(SystemFrame):
class BotStoppedSpeakingFrame(UrgentFrame):
"""Emitted upstream by transport outputs to indicate the bot stopped speaking."""
pass
@dataclass
class BotSpeakingFrame(SystemFrame):
class BotSpeakingFrame(UrgentFrame):
"""Emitted upstream by transport outputs while the bot is still
speaking. This can be used, for example, to detect when a user is idle. That
is, while the bot is speaking we don't want to trigger any user idle timeout
@@ -609,14 +614,32 @@ class BotSpeakingFrame(SystemFrame):
@dataclass
class MetricsFrame(SystemFrame):
class EmulateUserStartedSpeakingFrame(UrgentFrame):
"""Emitted by internal processors upstream to emulate VAD behavior when a
user starts speaking.
"""
pass
@dataclass
class EmulateUserStoppedSpeakingFrame(UrgentFrame):
"""Emitted by internal processors upstream to emulate VAD behavior when a
user stops speaking.
"""
pass
@dataclass
class MetricsFrame(UrgentFrame):
"""Emitted by processors that can compute metrics like latencies."""
data: List[MetricsData]
@dataclass
class FunctionCallInProgressFrame(SystemFrame):
class FunctionCallInProgressFrame(UrgentFrame):
"""A frame signaling that a function call is in progress."""
function_name: str
@@ -626,7 +649,7 @@ class FunctionCallInProgressFrame(SystemFrame):
@dataclass
class FunctionCallCancelFrame(SystemFrame):
class FunctionCallCancelFrame(UrgentFrame):
"""A frame to signal a function call has been cancelled."""
function_name: str
@@ -642,7 +665,7 @@ class FunctionCallResultProperties:
@dataclass
class FunctionCallResultFrame(SystemFrame):
class FunctionCallResultFrame(UrgentFrame):
"""A frame containing the result of an LLM function (tool) call."""
function_name: str
@@ -653,14 +676,14 @@ class FunctionCallResultFrame(SystemFrame):
@dataclass
class STTMuteFrame(SystemFrame):
"""System frame to mute/unmute the STT service."""
class STTMuteFrame(UrgentFrame):
"""A frame to mute/unmute the STT service."""
mute: bool
@dataclass
class TransportMessageUrgentFrame(SystemFrame):
class TransportMessageUrgentFrame(UrgentFrame):
message: Any
def __str__(self):
@@ -668,7 +691,7 @@ class TransportMessageUrgentFrame(SystemFrame):
@dataclass
class UserImageRequestFrame(SystemFrame):
class UserImageRequestFrame(UrgentFrame):
"""A frame to request an image from the given user. The frame might be
generated by a function call in which case the corresponding fields will be
properly set.
@@ -685,7 +708,7 @@ class UserImageRequestFrame(SystemFrame):
@dataclass
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
class InputAudioRawFrame(UrgentFrame, AudioRawFrame):
"""A chunk of audio usually coming from an input transport."""
def __post_init__(self):
@@ -698,7 +721,7 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame):
@dataclass
class InputImageRawFrame(SystemFrame, ImageRawFrame):
class InputImageRawFrame(UrgentFrame, ImageRawFrame):
"""An image usually coming from an input transport."""
def __str__(self):
@@ -740,7 +763,7 @@ class EndFrame(ControlFrame):
should be shut down. If the transport receives this frame, it will stop
sending frames to its output channel(s) and close all its threads. Note,
that this is a control frame, which means it will be received in the order it
was sent (unline system frames).
was sent.
"""

View File

@@ -5,6 +5,7 @@
#
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Coroutine, Optional
@@ -19,6 +20,7 @@ from pipecat.frames.frames import (
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
UrgentFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
@@ -31,6 +33,51 @@ class FrameDirection(Enum):
UPSTREAM = 2
@dataclass
class FrameProcessorQueueItem:
    """A frame waiting to be processed, together with its delivery details."""

    # Frame to be processed.
    frame: Frame
    # Direction the frame is travelling in.
    direction: FrameDirection
    # Optional coroutine function awaited with (processor, frame, direction)
    # once the frame has been processed. Defaults to no callback.
    callback: Optional[Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]] = None
class FrameProcessorQueue:
    """Queue of `FrameProcessorQueueItem`s that serves urgent frames first.

    Items whose frame is an `UrgentFrame` are kept in their own FIFO queue and
    are always returned by `get()` before any other queued item. A single
    event signals that one of the two internal queues holds an item, so a
    consumer can wait on both queues at once.
    """

    def __init__(self):
        self._queue = asyncio.Queue()
        self._urgent_queue = asyncio.Queue()
        # Set by `put()`; cleared once both internal queues are empty.
        self._event = asyncio.Event()

    async def put(self, item: FrameProcessorQueueItem):
        """Enqueue `item` and wake up a consumer waiting in `get()`."""
        if isinstance(item.frame, UrgentFrame):
            target = self._urgent_queue
        else:
            target = self._queue
        await target.put(item)
        self._event.set()

    async def get(self) -> FrameProcessorQueueItem:
        """Wait for the next item and return it, urgent items first."""
        while True:
            # Wait for an item in any of the queues.
            await self._event.wait()

            # Never block on one of the internal queues: a consumer parked on
            # the normal queue would not see urgent items arriving later, and
            # would be orphaned if `clear()` replaced that queue.
            for queue in (self._urgent_queue, self._queue):
                try:
                    item = queue.get_nowait()
                except asyncio.QueueEmpty:
                    continue
                queue.task_done()
                self._clear_event_if_empty()
                return item

            # Woken up with nothing queued: reset the event and wait again.
            self._event.clear()

    def clear(self):
        """Drop every queued non-urgent item; queued urgent items are kept."""
        self._queue = asyncio.Queue()
        self._clear_event_if_empty()

    def _clear_event_if_empty(self):
        # Clear the event only if all queues are empty.
        if self._queue.empty() and self._urgent_queue.empty():
            self._event.clear()
class FrameProcessor(BaseObject):
def __init__(
self,
@@ -68,18 +115,21 @@ class FrameProcessor(BaseObject):
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)
# Processors have an input queue. The input queue will be processed
# immediately (default) or it will block if `pause_processing_frames()`
# Processors receive frames on a streaming queue which are then
# processed by a streaming task. This guarantees that all frames are
# processed in the same task. By default, the streaming queue is
# processed immediately but it may block if `pause_processing_frames()`
# is called. To resume processing frames we need to call
# `resume_processing_frames()` which will wake up the event.
self.__should_block_frames = False
self.__input_event = asyncio.Event()
self.__input_frame_task: Optional[asyncio.Task] = None
self.__streaming_event = asyncio.Event()
self.__streaming_queue = FrameProcessorQueue()
self.__streaming_frame_task: Optional[asyncio.Task] = None
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are the
# exception to this rule. This create this task.
self.__push_frame_task: Optional[asyncio.Task] = None
self.__process_queue = asyncio.Queue()
self.__process_task: Optional[asyncio.Task] = None
self.__process_urgent_queue = asyncio.Queue()
self.__process_urgent_task: Optional[asyncio.Task] = None
@property
def id(self) -> int:
@@ -169,7 +219,8 @@ class FrameProcessor(BaseObject):
async def cleanup(self):
await super().cleanup()
await self.__cancel_input_task()
await self.__cancel_push_task()
await self.__cancel_process_task()
await self.__cancel_process_urgent_task()
def link(self, processor: "FrameProcessor"):
self._next = processor
@@ -214,7 +265,7 @@ class FrameProcessor(BaseObject):
await self.process_frame(frame, direction)
else:
# We queue everything else.
await self.__input_queue.put((frame, direction, callback))
await self.__streaming_queue.put(FrameProcessorQueueItem(frame, direction, callback))
async def pause_processing_frames(self):
logger.trace(f"{self}: pausing frame processing")
@@ -222,7 +273,7 @@ class FrameProcessor(BaseObject):
async def resume_processing_frames(self):
logger.trace(f"{self}: resuming frame processing")
self.__input_event.set()
self.__streaming_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
@@ -249,47 +300,6 @@ class FrameProcessor(BaseObject):
if not self._check_ready(frame):
return
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
async def __start(self, frame: StartFrame):
self.__create_input_task()
self.__create_push_task()
async def __cancel(self, frame: CancelFrame):
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_push_task()
#
# Handle interruptions
#
async def _start_interruption(self):
try:
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
raise
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
# Nothing to do right now.
pass
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
try:
timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next:
@@ -311,6 +321,49 @@ class FrameProcessor(BaseObject):
await self.push_error(ErrorFrame(str(e)))
raise
async def __start(self, frame: StartFrame):
self.__create_process_task()
self.__create_process_urgent_task()
self.__create_input_task()
async def __cancel(self, frame: CancelFrame):
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_process_task()
await self.__cancel_process_urgent_task()
#
# Handle interruptions
#
async def _start_interruption(self):
try:
# Cancel the streaming task.
await self.__cancel_input_task()
# Cancel the task processing frames. We do not cancel the task that
# is processing urgent frames.
await self.__cancel_process_task()
# If there's an interruption we should not block frames anymore.
self.__should_block_frames = False
# Clear the streaming queue, since we don't want to process its
# frames anymore (except system and urgent frames).
self.__streaming_queue.clear()
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
raise
# Create new tasks.
self.__create_process_task()
self.__create_input_task()
async def _stop_interruption(self):
# Nothing to do right now.
pass
def _check_ready(self, frame: Frame):
# If we are trying to push a frame but we still have no clock, it means
# we didn't process a StartFrame.
@@ -322,49 +375,60 @@ class FrameProcessor(BaseObject):
return True
def __create_input_task(self):
if not self.__input_frame_task:
self.__should_block_frames = False
self.__input_event.clear()
self.__input_queue = asyncio.Queue()
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
if not self.__streaming_frame_task:
self.__streaming_frame_task = self.create_task(self.__streaming_frame_task_handler())
async def __cancel_input_task(self):
if self.__input_frame_task:
await self.cancel_task(self.__input_frame_task)
self.__input_frame_task = None
if self.__streaming_frame_task:
await self.cancel_task(self.__streaming_frame_task)
self.__streaming_frame_task = None
async def __input_frame_task_handler(self):
def __create_process_task(self):
if not self.__process_task:
self.__process_queue = asyncio.Queue()
self.__process_task = self.create_task(
self.__process_task_handler(self.__process_queue)
)
async def __cancel_process_task(self):
if self.__process_task:
await self.cancel_task(self.__process_task)
self.__process_task = None
def __create_process_urgent_task(self):
if not self.__process_urgent_task:
self.__process_urgent_task = self.create_task(
self.__process_task_handler(self.__process_urgent_queue)
)
async def __cancel_process_urgent_task(self):
if self.__process_urgent_task:
await self.cancel_task(self.__process_urgent_task)
self.__process_urgent_task = None
async def __streaming_frame_task_handler(self):
while True:
if self.__should_block_frames:
logger.trace(f"{self}: frame processing paused")
await self.__input_event.wait()
self.__input_event.clear()
await self.__streaming_event.wait()
self.__streaming_event.clear()
self.__should_block_frames = False
logger.trace(f"{self}: frame processing resumed")
(frame, direction, callback) = await self.__input_queue.get()
item = await self.__streaming_queue.get()
if isinstance(item.frame, UrgentFrame):
await self.__process_urgent_queue.put(item)
else:
await self.__process_queue.put(item)
async def __process_task_handler(self, queue: asyncio.Queue):
while True:
item = await queue.get()
# Process the frame.
await self.process_frame(frame, direction)
await self.process_frame(item.frame, item.direction)
# If this frame has an associated callback, call it now.
if callback:
await callback(self, frame, direction)
self.__input_queue.task_done()
def __create_push_task(self):
if not self.__push_frame_task:
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
if self.__push_frame_task:
await self.cancel_task(self.__push_frame_task)
self.__push_frame_task = None
async def __push_frame_task_handler(self):
while True:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()
if item.callback:
await item.callback(self, item.frame, item.direction)

View File

@@ -9,6 +9,7 @@ import unittest
from pipecat.frames.frames import (
EndFrame,
Frame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -57,8 +58,8 @@ class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
async def test_system_frame(self):
filter = FrameFilter(types=())
frames_to_send = [UserStartedSpeakingFrame()]
expected_down_frames = [UserStartedSpeakingFrame]
frames_to_send = [StartInterruptionFrame()]
expected_down_frames = [StartInterruptionFrame]
await run_test(
filter,
frames_to_send=frames_to_send,