Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
41695806e8 process SystemFrames in a queue as high priority frames 2025-05-07 17:05:35 -07:00
3 changed files with 161 additions and 96 deletions

View File

@@ -77,8 +77,8 @@ class Frame:
@dataclass
class SystemFrame(Frame):
"""System frames are frames that are not internally queued by any of the
frame processors and should be processed immediately.
"""A frame that takes higher priority than other frames. System frames are
handled in order and are not affected by user interruptions.
"""
@@ -87,8 +87,9 @@ class SystemFrame(Frame):
@dataclass
class DataFrame(Frame):
"""Data frames are frames that will be processed in order and usually
contain data such as LLM context, text, audio or images.
"""A frame that is processed in order and usually contains data such as LLM
context, text, audio or images. Data frames are cancelled by user
interruptions.
"""
@@ -97,9 +98,9 @@ class DataFrame(Frame):
@dataclass
class ControlFrame(Frame):
"""Control frames are frames that, similar to data frames, will be processed
in order and usually contain control information such as frames to update
settings or to end the pipeline.
"""A frame that, like data frames, is processed in order and usually contains
control information, such as a request to update settings or to end the
pipeline after everything is flushed. Control frames are cancelled by user
interruptions.
"""
@@ -690,7 +691,7 @@ class FunctionCallResultFrame(SystemFrame):
@dataclass
class STTMuteFrame(SystemFrame):
"""System frame to mute/unmute the STT service."""
"""A frame to mute/unmute the STT service."""
mute: bool
@@ -796,7 +797,7 @@ class EndFrame(ControlFrame):
should be shut down. If the transport receives this frame, it will stop
sending frames to its output channel(s) and close all its threads. Note,
that this is a control frame, which means it will be received in the order it
was sent (unline system frames).
was sent.
"""

View File

@@ -5,6 +5,7 @@
#
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Coroutine, Optional
@@ -32,6 +33,51 @@ class FrameDirection(Enum):
UPSTREAM = 2
@dataclass
class FrameProcessorQueueItem:
    """One entry of a frame processor's queue.

    Bundles a frame with the direction it travels in and an optional
    coroutine callback, so the three can be moved between queues as a
    single object.
    """

    # The frame waiting to be processed.
    frame: Frame
    # Direction the frame is travelling in.
    direction: FrameDirection
    # Optional coroutine function awaited with (processor, frame, direction)
    # once the frame has been processed. Defaults to None so callers that
    # have no callback do not need to pass it explicitly.
    callback: Optional[
        Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
    ] = None
class FrameProcessorQueue:
    """Two-lane queue that hands out system frames before any other frame.

    Items whose frame is a `SystemFrame` go to an urgent queue; everything
    else goes to a regular queue. `get()` always drains the urgent queue
    first. Within each queue items keep their insertion order.
    """

    def __init__(self):
        self._queue = asyncio.Queue()
        self._urgent_queue = asyncio.Queue()
        # Set while at least one of the two queues may hold an item, so a
        # single consumer can wait on both queues at once.
        self._event = asyncio.Event()

    async def put(self, item: FrameProcessorQueueItem):
        """Enqueue `item`, routing system frames to the urgent queue."""
        if isinstance(item.frame, SystemFrame):
            await self._urgent_queue.put(item)
        else:
            await self._queue.put(item)
        self._event.set()

    async def get(self) -> FrameProcessorQueueItem:
        """Wait for the next item and return it, urgent items first."""
        while True:
            # Wait for an item in any of the queues.
            await self._event.wait()

            if not self._urgent_queue.empty():
                source = self._urgent_queue
            elif not self._queue.empty():
                source = self._queue
            else:
                # We were woken up, but `clear()` dropped the pending items
                # before we got to run. Never fall into a blocking get() on
                # a single queue here: that would make us miss items that
                # later arrive on the other queue. Go back to waiting on
                # the shared event instead.
                self._event.clear()
                continue

            # The queue was just checked to be non-empty and nothing has
            # been awaited since, so this cannot raise QueueEmpty.
            item = source.get_nowait()
            source.task_done()

            # Clear the event only if all queues are empty.
            if self._queue.empty() and self._urgent_queue.empty():
                self._event.clear()

            return item

    def clear(self):
        """Drop all queued non-system items; urgent items are kept."""
        self._queue = asyncio.Queue()
        # The regular queue is now empty, so the event only has to stay set
        # if urgent items are still pending.
        if self._urgent_queue.empty():
            self._event.clear()
class FrameProcessor(BaseObject):
def __init__(
self,
@@ -69,18 +115,21 @@ class FrameProcessor(BaseObject):
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)
# Processors have an input queue. The input queue will be processed
# immediately (default) or it will block if `pause_processing_frames()`
# Processors receive frames on a streaming queue which are then
# processed by a streaming task. This guarantees that all frames are
# processed in the same task. By default, the streaming queue is
# processed immediately but it may block if `pause_processing_frames()`
# is called. To resume processing frames we need to call
# `resume_processing_frames()` which will wake up the event.
self.__should_block_frames = False
self.__input_event = asyncio.Event()
self.__input_frame_task: Optional[asyncio.Task] = None
self.__streaming_event = asyncio.Event()
self.__streaming_queue = FrameProcessorQueue()
self.__streaming_frame_task: Optional[asyncio.Task] = None
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are the
# exception to this rule. This create this task.
self.__push_frame_task: Optional[asyncio.Task] = None
self.__process_queue = asyncio.Queue()
self.__process_task: Optional[asyncio.Task] = None
self.__process_urgent_queue = asyncio.Queue()
self.__process_urgent_task: Optional[asyncio.Task] = None
@property
def id(self) -> int:
@@ -170,7 +219,8 @@ class FrameProcessor(BaseObject):
async def cleanup(self):
await super().cleanup()
await self.__cancel_input_task()
await self.__cancel_push_task()
await self.__cancel_process_task()
await self.__cancel_process_urgent_task()
def link(self, processor: "FrameProcessor"):
self._next = processor
@@ -215,7 +265,7 @@ class FrameProcessor(BaseObject):
await self.process_frame(frame, direction)
else:
# We queue everything else.
await self.__input_queue.put((frame, direction, callback))
await self.__streaming_queue.put(FrameProcessorQueueItem(frame, direction, callback))
async def pause_processing_frames(self):
logger.trace(f"{self}: pausing frame processing")
@@ -223,7 +273,7 @@ class FrameProcessor(BaseObject):
async def resume_processing_frames(self):
logger.trace(f"{self}: resuming frame processing")
self.__input_event.set()
self.__streaming_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
@@ -250,47 +300,6 @@ class FrameProcessor(BaseObject):
if not self._check_ready(frame):
return
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
async def __start(self, frame: StartFrame):
self.__create_input_task()
self.__create_push_task()
async def __cancel(self, frame: CancelFrame):
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_push_task()
#
# Handle interruptions
#
async def _start_interruption(self):
try:
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
raise
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
# Nothing to do right now.
pass
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
try:
timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next:
@@ -323,6 +332,49 @@ class FrameProcessor(BaseObject):
await self.push_error(ErrorFrame(str(e)))
raise
async def __start(self, frame: StartFrame):
    """Spin up the processor's background tasks.

    The regular process task, the urgent process task and the streaming
    (input) task are created, in that order. `frame` is not inspected.
    """
    task_factories = (
        self.__create_process_task,
        self.__create_process_urgent_task,
        self.__create_input_task,
    )
    for make_task in task_factories:
        make_task()
async def __cancel(self, frame: CancelFrame):
    """Flag the processor as cancelling and stop all of its tasks.

    The streaming (input) task is cancelled first, then the regular process
    task and finally the urgent process task. `frame` is not inspected.
    """
    self._cancelling = True
    cancellers = (
        self.__cancel_input_task,
        self.__cancel_process_task,
        self.__cancel_process_urgent_task,
    )
    for cancel_task in cancellers:
        await cancel_task()
#
# Handle interruptions
#
async def _start_interruption(self):
    """Drop pending non-system work and restart the processing tasks.

    The streaming task and the regular process task are cancelled, frame
    blocking is lifted and the non-system part of the streaming queue is
    discarded. The urgent process task is left running. If any of that
    fails, the error is logged, pushed as an `ErrorFrame` and re-raised;
    otherwise the process and streaming tasks are created again.
    """
    try:
        # Stop the streaming task.
        await self.__cancel_input_task()

        # Stop the task that processes regular frames. The task that
        # processes urgent frames is deliberately not cancelled.
        await self.__cancel_process_task()

        # An interruption means frames should no longer be held back.
        self.__should_block_frames = False

        # Discard what is queued for streaming; only system frames
        # survive the clear.
        self.__streaming_queue.clear()
    except Exception as e:
        logger.exception(f"Uncaught exception in {self}: {e}")
        await self.push_error(ErrorFrame(str(e)))
        raise
    else:
        # Everything was torn down cleanly: create new tasks.
        self.__create_process_task()
        self.__create_input_task()
async def _stop_interruption(self):
    """Hook for the end of an interruption; currently a no-op."""
    # Nothing to do right now.
    return None
def _check_ready(self, frame: Frame):
# If we are trying to push a frame but we still have no clock, it means
# we didn't process a StartFrame.
@@ -334,49 +386,60 @@ class FrameProcessor(BaseObject):
return True
def __create_input_task(self):
if not self.__input_frame_task:
self.__should_block_frames = False
self.__input_event.clear()
self.__input_queue = asyncio.Queue()
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
if not self.__streaming_frame_task:
self.__streaming_frame_task = self.create_task(self.__streaming_frame_task_handler())
async def __cancel_input_task(self):
if self.__input_frame_task:
await self.cancel_task(self.__input_frame_task)
self.__input_frame_task = None
if self.__streaming_frame_task:
await self.cancel_task(self.__streaming_frame_task)
self.__streaming_frame_task = None
async def __input_frame_task_handler(self):
def __create_process_task(self):
    """Start the regular process task on a brand-new queue.

    Does nothing if a process task already exists. Otherwise the previous
    process queue is replaced with an empty one and a task running
    `__process_task_handler` on that queue is created.
    """
    if self.__process_task:
        return

    fresh_queue = asyncio.Queue()
    self.__process_queue = fresh_queue
    self.__process_task = self.create_task(self.__process_task_handler(fresh_queue))
async def __cancel_process_task(self):
    """Cancel the regular process task, if any, and forget it."""
    if not self.__process_task:
        return

    await self.cancel_task(self.__process_task)
    self.__process_task = None
def __create_process_urgent_task(self):
    """Start the urgent process task unless it already exists.

    The task runs `__process_task_handler` on the existing urgent queue;
    unlike the regular process task, no new queue is created here.
    """
    if self.__process_urgent_task:
        return

    handler = self.__process_task_handler(self.__process_urgent_queue)
    self.__process_urgent_task = self.create_task(handler)
async def __cancel_process_urgent_task(self):
    """Cancel the urgent process task, if any, and forget it."""
    if not self.__process_urgent_task:
        return

    await self.cancel_task(self.__process_urgent_task)
    self.__process_urgent_task = None
async def __streaming_frame_task_handler(self):
while True:
if self.__should_block_frames:
logger.trace(f"{self}: frame processing paused")
await self.__input_event.wait()
self.__input_event.clear()
await self.__streaming_event.wait()
self.__streaming_event.clear()
self.__should_block_frames = False
logger.trace(f"{self}: frame processing resumed")
(frame, direction, callback) = await self.__input_queue.get()
item = await self.__streaming_queue.get()
if isinstance(item.frame, SystemFrame):
await self.__process_urgent_queue.put(item)
else:
await self.__process_queue.put(item)
async def __process_task_handler(self, queue: asyncio.Queue):
while True:
item = await queue.get()
# Process the frame.
await self.process_frame(frame, direction)
await self.process_frame(item.frame, item.direction)
# If this frame has an associated callback, call it now.
if callback:
await callback(self, frame, direction)
self.__input_queue.task_done()
def __create_push_task(self):
if not self.__push_frame_task:
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
if self.__push_frame_task:
await self.cancel_task(self.__push_frame_task)
self.__push_frame_task = None
async def __push_frame_task_handler(self):
while True:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()
if item.callback:
await item.callback(self, item.frame, item.direction)

View File

@@ -9,6 +9,7 @@ import unittest
from pipecat.frames.frames import (
EndFrame,
Frame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -57,8 +58,8 @@ class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
async def test_system_frame(self):
filter = FrameFilter(types=())
frames_to_send = [UserStartedSpeakingFrame()]
expected_down_frames = [UserStartedSpeakingFrame]
frames_to_send = [StartInterruptionFrame()]
expected_down_frames = [StartInterruptionFrame]
await run_test(
filter,
frames_to_send=frames_to_send,