Compare commits

...

1 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
41695806e8 process SystemFrames in a queue as high priority frames 2025-05-07 17:05:35 -07:00
3 changed files with 161 additions and 96 deletions

View File

@@ -77,8 +77,8 @@ class Frame:
@dataclass @dataclass
class SystemFrame(Frame): class SystemFrame(Frame):
"""System frames are frames that are not internally queued by any of the """A frame that takes higher priority than other frames. System frames are
frame processors and should be processed immediately. handled in order and are not affected by user interruptions.
""" """
@@ -87,8 +87,9 @@ class SystemFrame(Frame):
@dataclass @dataclass
class DataFrame(Frame): class DataFrame(Frame):
"""Data frames are frames that will be processed in order and usually """A frame that is processed in order and usually contains data such as LLM
contain data such as LLM context, text, audio or images. context, text, audio or images. Data frames are cancelled by user
interruptions.
""" """
@@ -97,9 +98,9 @@ class DataFrame(Frame):
@dataclass @dataclass
class ControlFrame(Frame): class ControlFrame(Frame):
"""Control frames are frames that, similar to data frames, will be processed """A frame that, as data frames, is processed in order and usually contains
in order and usually contain control information such as frames to update control information such as update settings or to end the pipeline after
settings or to end the pipeline. everything is flushed. Control frames are cancelled by user interruptions.
""" """
@@ -690,7 +691,7 @@ class FunctionCallResultFrame(SystemFrame):
@dataclass @dataclass
class STTMuteFrame(SystemFrame): class STTMuteFrame(SystemFrame):
"""System frame to mute/unmute the STT service.""" """A frame to mute/unmute the STT service."""
mute: bool mute: bool
@@ -796,7 +797,7 @@ class EndFrame(ControlFrame):
should be shut down. If the transport receives this frame, it will stop should be shut down. If the transport receives this frame, it will stop
sending frames to its output channel(s) and close all its threads. Note, sending frames to its output channel(s) and close all its threads. Note,
that this is a control frame, which means it will received in the order it that this is a control frame, which means it will received in the order it
was sent (unline system frames). was sent.
""" """

View File

@@ -5,6 +5,7 @@
# #
import asyncio import asyncio
from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Awaitable, Callable, Coroutine, Optional from typing import Awaitable, Callable, Coroutine, Optional
@@ -32,6 +33,51 @@ class FrameDirection(Enum):
UPSTREAM = 2 UPSTREAM = 2
@dataclass
class FrameProcessorQueueItem:
frame: Frame
direction: FrameDirection
callback: Optional[Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]]
class FrameProcessorQueue:
def __init__(self):
self._queue = asyncio.Queue()
self._urgent_queue = asyncio.Queue()
self._event = asyncio.Event()
async def put(self, item: FrameProcessorQueueItem):
if isinstance(item.frame, SystemFrame):
await self._urgent_queue.put(item)
else:
await self._queue.put(item)
self._event.set()
async def get(self) -> FrameProcessorQueueItem:
# Wait for an item in any of the queues.
await self._event.wait()
if self._urgent_queue.empty():
item = await self._queue.get()
self._queue.task_done()
else:
item = await self._urgent_queue.get()
self._urgent_queue.task_done()
# Clear the event only if all queues are empty.
if self._queue.empty() and self._urgent_queue.empty():
self._event.clear()
return item
def clear(self):
self._queue = asyncio.Queue()
# Clear the event only if all queues are empty.
if self._queue.empty() and self._urgent_queue.empty():
self._event.clear()
class FrameProcessor(BaseObject): class FrameProcessor(BaseObject):
def __init__( def __init__(
self, self,
@@ -69,18 +115,21 @@ class FrameProcessor(BaseObject):
self._metrics = metrics or FrameProcessorMetrics() self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name) self._metrics.set_processor_name(self.name)
# Processors have an input queue. The input queue will be processed # Processors receive frames on a streaming queue which are then
# immediately (default) or it will block if `pause_processing_frames()` # processed by a streaming task. This guarantees that all frames are
# processed in the same task. By default, the streaming queue is
# processed immediately but it may block if `pause_processing_frames()`
# is called. To resume processing frames we need to call # is called. To resume processing frames we need to call
# `resume_processing_frames()` which will wake up the event. # `resume_processing_frames()` which will wake up the event.
self.__should_block_frames = False self.__should_block_frames = False
self.__input_event = asyncio.Event() self.__streaming_event = asyncio.Event()
self.__input_frame_task: Optional[asyncio.Task] = None self.__streaming_queue = FrameProcessorQueue()
self.__streaming_frame_task: Optional[asyncio.Task] = None
# Every processor in Pipecat should only output frames from a single self.__process_queue = asyncio.Queue()
# task. This avoid problems like audio overlapping. System frames are the self.__process_task: Optional[asyncio.Task] = None
# exception to this rule. This create this task. self.__process_urgent_queue = asyncio.Queue()
self.__push_frame_task: Optional[asyncio.Task] = None self.__process_urgent_task: Optional[asyncio.Task] = None
@property @property
def id(self) -> int: def id(self) -> int:
@@ -170,7 +219,8 @@ class FrameProcessor(BaseObject):
async def cleanup(self): async def cleanup(self):
await super().cleanup() await super().cleanup()
await self.__cancel_input_task() await self.__cancel_input_task()
await self.__cancel_push_task() await self.__cancel_process_task()
await self.__cancel_process_urgent_task()
def link(self, processor: "FrameProcessor"): def link(self, processor: "FrameProcessor"):
self._next = processor self._next = processor
@@ -215,7 +265,7 @@ class FrameProcessor(BaseObject):
await self.process_frame(frame, direction) await self.process_frame(frame, direction)
else: else:
# We queue everything else. # We queue everything else.
await self.__input_queue.put((frame, direction, callback)) await self.__streaming_queue.put(FrameProcessorQueueItem(frame, direction, callback))
async def pause_processing_frames(self): async def pause_processing_frames(self):
logger.trace(f"{self}: pausing frame processing") logger.trace(f"{self}: pausing frame processing")
@@ -223,7 +273,7 @@ class FrameProcessor(BaseObject):
async def resume_processing_frames(self): async def resume_processing_frames(self):
logger.trace(f"{self}: resuming frame processing") logger.trace(f"{self}: resuming frame processing")
self.__input_event.set() self.__streaming_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection): async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame): if isinstance(frame, StartFrame):
@@ -250,47 +300,6 @@ class FrameProcessor(BaseObject):
if not self._check_ready(frame): if not self._check_ready(frame):
return return
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
async def __start(self, frame: StartFrame):
self.__create_input_task()
self.__create_push_task()
async def __cancel(self, frame: CancelFrame):
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_push_task()
#
# Handle interruptions
#
async def _start_interruption(self):
try:
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
raise
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
# Nothing to do right now.
pass
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
try: try:
timestamp = self._clock.get_time() if self._clock else 0 timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next: if direction == FrameDirection.DOWNSTREAM and self._next:
@@ -323,6 +332,49 @@ class FrameProcessor(BaseObject):
await self.push_error(ErrorFrame(str(e))) await self.push_error(ErrorFrame(str(e)))
raise raise
async def __start(self, frame: StartFrame):
self.__create_process_task()
self.__create_process_urgent_task()
self.__create_input_task()
async def __cancel(self, frame: CancelFrame):
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_process_task()
await self.__cancel_process_urgent_task()
#
# Handle interruptions
#
async def _start_interruption(self):
try:
# Cancel the streaming task.
await self.__cancel_input_task()
# Cancel the task processing frames. We do not cancel the task that
# is processing urgent frames.
await self.__cancel_process_task()
# If there's an interruption we should not block frames anymore.
self.__should_block_frames = False
# Clear the streaming queue, since we don't want to process its
# frame anymore (except system and urgent frames).
self.__streaming_queue.clear()
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
raise
# Create a new tasks.
self.__create_process_task()
self.__create_input_task()
async def _stop_interruption(self):
# Nothing to do right now.
pass
def _check_ready(self, frame: Frame): def _check_ready(self, frame: Frame):
# If we are trying to push a frame but we still have no clock, it means # If we are trying to push a frame but we still have no clock, it means
# we didn't process a StartFrame. # we didn't process a StartFrame.
@@ -334,49 +386,60 @@ class FrameProcessor(BaseObject):
return True return True
def __create_input_task(self): def __create_input_task(self):
if not self.__input_frame_task: if not self.__streaming_frame_task:
self.__should_block_frames = False self.__streaming_frame_task = self.create_task(self.__streaming_frame_task_handler())
self.__input_event.clear()
self.__input_queue = asyncio.Queue()
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
async def __cancel_input_task(self): async def __cancel_input_task(self):
if self.__input_frame_task: if self.__streaming_frame_task:
await self.cancel_task(self.__input_frame_task) await self.cancel_task(self.__streaming_frame_task)
self.__input_frame_task = None self.__streaming_frame_task = None
async def __input_frame_task_handler(self): def __create_process_task(self):
if not self.__process_task:
self.__process_queue = asyncio.Queue()
self.__process_task = self.create_task(
self.__process_task_handler(self.__process_queue)
)
async def __cancel_process_task(self):
if self.__process_task:
await self.cancel_task(self.__process_task)
self.__process_task = None
def __create_process_urgent_task(self):
if not self.__process_urgent_task:
self.__process_urgent_task = self.create_task(
self.__process_task_handler(self.__process_urgent_queue)
)
async def __cancel_process_urgent_task(self):
if self.__process_urgent_task:
await self.cancel_task(self.__process_urgent_task)
self.__process_urgent_task = None
async def __streaming_frame_task_handler(self):
while True: while True:
if self.__should_block_frames: if self.__should_block_frames:
logger.trace(f"{self}: frame processing paused") logger.trace(f"{self}: frame processing paused")
await self.__input_event.wait() await self.__streaming_event.wait()
self.__input_event.clear() self.__streaming_event.clear()
self.__should_block_frames = False self.__should_block_frames = False
logger.trace(f"{self}: frame processing resumed") logger.trace(f"{self}: frame processing resumed")
(frame, direction, callback) = await self.__input_queue.get() item = await self.__streaming_queue.get()
if isinstance(item.frame, SystemFrame):
await self.__process_urgent_queue.put(item)
else:
await self.__process_queue.put(item)
async def __process_task_handler(self, queue: asyncio.Queue):
while True:
item = await queue.get()
# Process the frame. # Process the frame.
await self.process_frame(frame, direction) await self.process_frame(item.frame, item.direction)
# If this frame has an associated callback, call it now. # If this frame has an associated callback, call it now.
if callback: if item.callback:
await callback(self, frame, direction) await item.callback(self, item.frame, item.direction)
self.__input_queue.task_done()
def __create_push_task(self):
if not self.__push_frame_task:
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
if self.__push_frame_task:
await self.cancel_task(self.__push_frame_task)
self.__push_frame_task = None
async def __push_frame_task_handler(self):
while True:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()

View File

@@ -9,6 +9,7 @@ import unittest
from pipecat.frames.frames import ( from pipecat.frames.frames import (
EndFrame, EndFrame,
Frame, Frame,
StartInterruptionFrame,
TextFrame, TextFrame,
TranscriptionFrame, TranscriptionFrame,
UserStartedSpeakingFrame, UserStartedSpeakingFrame,
@@ -57,8 +58,8 @@ class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
async def test_system_frame(self): async def test_system_frame(self):
filter = FrameFilter(types=()) filter = FrameFilter(types=())
frames_to_send = [UserStartedSpeakingFrame()] frames_to_send = [StartInterruptionFrame()]
expected_down_frames = [UserStartedSpeakingFrame] expected_down_frames = [StartInterruptionFrame]
await run_test( await run_test(
filter, filter,
frames_to_send=frames_to_send, frames_to_send=frames_to_send,