|
|
|
|
@@ -5,6 +5,7 @@
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from enum import Enum
|
|
|
|
|
from typing import Awaitable, Callable, Coroutine, Optional
|
|
|
|
|
|
|
|
|
|
@@ -32,6 +33,51 @@ class FrameDirection(Enum):
|
|
|
|
|
UPSTREAM = 2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class FrameProcessorQueueItem:
|
|
|
|
|
frame: Frame
|
|
|
|
|
direction: FrameDirection
|
|
|
|
|
callback: Optional[Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FrameProcessorQueue:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self._queue = asyncio.Queue()
|
|
|
|
|
self._urgent_queue = asyncio.Queue()
|
|
|
|
|
self._event = asyncio.Event()
|
|
|
|
|
|
|
|
|
|
async def put(self, item: FrameProcessorQueueItem):
|
|
|
|
|
if isinstance(item.frame, SystemFrame):
|
|
|
|
|
await self._urgent_queue.put(item)
|
|
|
|
|
else:
|
|
|
|
|
await self._queue.put(item)
|
|
|
|
|
self._event.set()
|
|
|
|
|
|
|
|
|
|
async def get(self) -> FrameProcessorQueueItem:
|
|
|
|
|
# Wait for an item in any of the queues.
|
|
|
|
|
await self._event.wait()
|
|
|
|
|
|
|
|
|
|
if self._urgent_queue.empty():
|
|
|
|
|
item = await self._queue.get()
|
|
|
|
|
self._queue.task_done()
|
|
|
|
|
else:
|
|
|
|
|
item = await self._urgent_queue.get()
|
|
|
|
|
self._urgent_queue.task_done()
|
|
|
|
|
|
|
|
|
|
# Clear the event only if all queues are empty.
|
|
|
|
|
if self._queue.empty() and self._urgent_queue.empty():
|
|
|
|
|
self._event.clear()
|
|
|
|
|
|
|
|
|
|
return item
|
|
|
|
|
|
|
|
|
|
def clear(self):
|
|
|
|
|
self._queue = asyncio.Queue()
|
|
|
|
|
|
|
|
|
|
# Clear the event only if all queues are empty.
|
|
|
|
|
if self._queue.empty() and self._urgent_queue.empty():
|
|
|
|
|
self._event.clear()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FrameProcessor(BaseObject):
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
@@ -69,18 +115,21 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
self._metrics = metrics or FrameProcessorMetrics()
|
|
|
|
|
self._metrics.set_processor_name(self.name)
|
|
|
|
|
|
|
|
|
|
# Processors have an input queue. The input queue will be processed
|
|
|
|
|
# immediately (default) or it will block if `pause_processing_frames()`
|
|
|
|
|
# Processors receive frames on a streaming queue which are then
|
|
|
|
|
# processed by a streaming task. This guarantees that all frames are
|
|
|
|
|
# processed in the same task. By default, the streaming queue is
|
|
|
|
|
# processed immediately but it may block if `pause_processing_frames()`
|
|
|
|
|
# is called. To resume processing frames we need to call
|
|
|
|
|
# `resume_processing_frames()` which will wake up the event.
|
|
|
|
|
self.__should_block_frames = False
|
|
|
|
|
self.__input_event = asyncio.Event()
|
|
|
|
|
self.__input_frame_task: Optional[asyncio.Task] = None
|
|
|
|
|
self.__streaming_event = asyncio.Event()
|
|
|
|
|
self.__streaming_queue = FrameProcessorQueue()
|
|
|
|
|
self.__streaming_frame_task: Optional[asyncio.Task] = None
|
|
|
|
|
|
|
|
|
|
# Every processor in Pipecat should only output frames from a single
|
|
|
|
|
# task. This avoid problems like audio overlapping. System frames are the
|
|
|
|
|
# exception to this rule. This create this task.
|
|
|
|
|
self.__push_frame_task: Optional[asyncio.Task] = None
|
|
|
|
|
self.__process_queue = asyncio.Queue()
|
|
|
|
|
self.__process_task: Optional[asyncio.Task] = None
|
|
|
|
|
self.__process_urgent_queue = asyncio.Queue()
|
|
|
|
|
self.__process_urgent_task: Optional[asyncio.Task] = None
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def id(self) -> int:
|
|
|
|
|
@@ -170,7 +219,8 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
async def cleanup(self):
|
|
|
|
|
await super().cleanup()
|
|
|
|
|
await self.__cancel_input_task()
|
|
|
|
|
await self.__cancel_push_task()
|
|
|
|
|
await self.__cancel_process_task()
|
|
|
|
|
await self.__cancel_process_urgent_task()
|
|
|
|
|
|
|
|
|
|
def link(self, processor: "FrameProcessor"):
|
|
|
|
|
self._next = processor
|
|
|
|
|
@@ -215,7 +265,7 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
await self.process_frame(frame, direction)
|
|
|
|
|
else:
|
|
|
|
|
# We queue everything else.
|
|
|
|
|
await self.__input_queue.put((frame, direction, callback))
|
|
|
|
|
await self.__streaming_queue.put(FrameProcessorQueueItem(frame, direction, callback))
|
|
|
|
|
|
|
|
|
|
async def pause_processing_frames(self):
|
|
|
|
|
logger.trace(f"{self}: pausing frame processing")
|
|
|
|
|
@@ -223,7 +273,7 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
|
|
|
|
|
async def resume_processing_frames(self):
|
|
|
|
|
logger.trace(f"{self}: resuming frame processing")
|
|
|
|
|
self.__input_event.set()
|
|
|
|
|
self.__streaming_event.set()
|
|
|
|
|
|
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
|
|
|
|
if isinstance(frame, StartFrame):
|
|
|
|
|
@@ -250,47 +300,6 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
if not self._check_ready(frame):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if isinstance(frame, SystemFrame):
|
|
|
|
|
await self.__internal_push_frame(frame, direction)
|
|
|
|
|
else:
|
|
|
|
|
await self.__push_queue.put((frame, direction))
|
|
|
|
|
|
|
|
|
|
async def __start(self, frame: StartFrame):
|
|
|
|
|
self.__create_input_task()
|
|
|
|
|
self.__create_push_task()
|
|
|
|
|
|
|
|
|
|
async def __cancel(self, frame: CancelFrame):
|
|
|
|
|
self._cancelling = True
|
|
|
|
|
await self.__cancel_input_task()
|
|
|
|
|
await self.__cancel_push_task()
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# Handle interruptions
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
async def _start_interruption(self):
|
|
|
|
|
try:
|
|
|
|
|
# Cancel the push frame task. This will stop pushing frames downstream.
|
|
|
|
|
await self.__cancel_push_task()
|
|
|
|
|
|
|
|
|
|
# Cancel the input task. This will stop processing queued frames.
|
|
|
|
|
await self.__cancel_input_task()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Uncaught exception in {self}: {e}")
|
|
|
|
|
await self.push_error(ErrorFrame(str(e)))
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
# Create a new input queue and task.
|
|
|
|
|
self.__create_input_task()
|
|
|
|
|
|
|
|
|
|
# Create a new output queue and task.
|
|
|
|
|
self.__create_push_task()
|
|
|
|
|
|
|
|
|
|
async def _stop_interruption(self):
|
|
|
|
|
# Nothing to do right now.
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
|
|
|
|
|
try:
|
|
|
|
|
timestamp = self._clock.get_time() if self._clock else 0
|
|
|
|
|
if direction == FrameDirection.DOWNSTREAM and self._next:
|
|
|
|
|
@@ -323,6 +332,49 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
await self.push_error(ErrorFrame(str(e)))
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
async def __start(self, frame: StartFrame):
|
|
|
|
|
self.__create_process_task()
|
|
|
|
|
self.__create_process_urgent_task()
|
|
|
|
|
self.__create_input_task()
|
|
|
|
|
|
|
|
|
|
async def __cancel(self, frame: CancelFrame):
|
|
|
|
|
self._cancelling = True
|
|
|
|
|
await self.__cancel_input_task()
|
|
|
|
|
await self.__cancel_process_task()
|
|
|
|
|
await self.__cancel_process_urgent_task()
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# Handle interruptions
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
async def _start_interruption(self):
|
|
|
|
|
try:
|
|
|
|
|
# Cancel the streaming task.
|
|
|
|
|
await self.__cancel_input_task()
|
|
|
|
|
|
|
|
|
|
# Cancel the task processing frames. We do not cancel the task that
|
|
|
|
|
# is processing urgent frames.
|
|
|
|
|
await self.__cancel_process_task()
|
|
|
|
|
|
|
|
|
|
# If there's an interruption we should not block frames anymore.
|
|
|
|
|
self.__should_block_frames = False
|
|
|
|
|
|
|
|
|
|
# Clear the streaming queue, since we don't want to process its
|
|
|
|
|
# frame anymore (except system and urgent frames).
|
|
|
|
|
self.__streaming_queue.clear()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Uncaught exception in {self}: {e}")
|
|
|
|
|
await self.push_error(ErrorFrame(str(e)))
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
# Create a new tasks.
|
|
|
|
|
self.__create_process_task()
|
|
|
|
|
self.__create_input_task()
|
|
|
|
|
|
|
|
|
|
async def _stop_interruption(self):
|
|
|
|
|
# Nothing to do right now.
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def _check_ready(self, frame: Frame):
|
|
|
|
|
# If we are trying to push a frame but we still have no clock, it means
|
|
|
|
|
# we didn't process a StartFrame.
|
|
|
|
|
@@ -334,49 +386,60 @@ class FrameProcessor(BaseObject):
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def __create_input_task(self):
|
|
|
|
|
if not self.__input_frame_task:
|
|
|
|
|
self.__should_block_frames = False
|
|
|
|
|
self.__input_event.clear()
|
|
|
|
|
self.__input_queue = asyncio.Queue()
|
|
|
|
|
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
|
|
|
|
|
if not self.__streaming_frame_task:
|
|
|
|
|
self.__streaming_frame_task = self.create_task(self.__streaming_frame_task_handler())
|
|
|
|
|
|
|
|
|
|
async def __cancel_input_task(self):
|
|
|
|
|
if self.__input_frame_task:
|
|
|
|
|
await self.cancel_task(self.__input_frame_task)
|
|
|
|
|
self.__input_frame_task = None
|
|
|
|
|
if self.__streaming_frame_task:
|
|
|
|
|
await self.cancel_task(self.__streaming_frame_task)
|
|
|
|
|
self.__streaming_frame_task = None
|
|
|
|
|
|
|
|
|
|
async def __input_frame_task_handler(self):
|
|
|
|
|
def __create_process_task(self):
|
|
|
|
|
if not self.__process_task:
|
|
|
|
|
self.__process_queue = asyncio.Queue()
|
|
|
|
|
self.__process_task = self.create_task(
|
|
|
|
|
self.__process_task_handler(self.__process_queue)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def __cancel_process_task(self):
|
|
|
|
|
if self.__process_task:
|
|
|
|
|
await self.cancel_task(self.__process_task)
|
|
|
|
|
self.__process_task = None
|
|
|
|
|
|
|
|
|
|
def __create_process_urgent_task(self):
|
|
|
|
|
if not self.__process_urgent_task:
|
|
|
|
|
self.__process_urgent_task = self.create_task(
|
|
|
|
|
self.__process_task_handler(self.__process_urgent_queue)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def __cancel_process_urgent_task(self):
|
|
|
|
|
if self.__process_urgent_task:
|
|
|
|
|
await self.cancel_task(self.__process_urgent_task)
|
|
|
|
|
self.__process_urgent_task = None
|
|
|
|
|
|
|
|
|
|
async def __streaming_frame_task_handler(self):
|
|
|
|
|
while True:
|
|
|
|
|
if self.__should_block_frames:
|
|
|
|
|
logger.trace(f"{self}: frame processing paused")
|
|
|
|
|
await self.__input_event.wait()
|
|
|
|
|
self.__input_event.clear()
|
|
|
|
|
await self.__streaming_event.wait()
|
|
|
|
|
self.__streaming_event.clear()
|
|
|
|
|
self.__should_block_frames = False
|
|
|
|
|
logger.trace(f"{self}: frame processing resumed")
|
|
|
|
|
|
|
|
|
|
(frame, direction, callback) = await self.__input_queue.get()
|
|
|
|
|
item = await self.__streaming_queue.get()
|
|
|
|
|
|
|
|
|
|
if isinstance(item.frame, SystemFrame):
|
|
|
|
|
await self.__process_urgent_queue.put(item)
|
|
|
|
|
else:
|
|
|
|
|
await self.__process_queue.put(item)
|
|
|
|
|
|
|
|
|
|
async def __process_task_handler(self, queue: asyncio.Queue):
|
|
|
|
|
while True:
|
|
|
|
|
item = await queue.get()
|
|
|
|
|
|
|
|
|
|
# Process the frame.
|
|
|
|
|
await self.process_frame(frame, direction)
|
|
|
|
|
await self.process_frame(item.frame, item.direction)
|
|
|
|
|
|
|
|
|
|
# If this frame has an associated callback, call it now.
|
|
|
|
|
if callback:
|
|
|
|
|
await callback(self, frame, direction)
|
|
|
|
|
|
|
|
|
|
self.__input_queue.task_done()
|
|
|
|
|
|
|
|
|
|
def __create_push_task(self):
|
|
|
|
|
if not self.__push_frame_task:
|
|
|
|
|
self.__push_queue = asyncio.Queue()
|
|
|
|
|
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
|
|
|
|
|
|
|
|
|
|
async def __cancel_push_task(self):
|
|
|
|
|
if self.__push_frame_task:
|
|
|
|
|
await self.cancel_task(self.__push_frame_task)
|
|
|
|
|
self.__push_frame_task = None
|
|
|
|
|
|
|
|
|
|
async def __push_frame_task_handler(self):
|
|
|
|
|
while True:
|
|
|
|
|
(frame, direction) = await self.__push_queue.get()
|
|
|
|
|
await self.__internal_push_frame(frame, direction)
|
|
|
|
|
self.__push_queue.task_done()
|
|
|
|
|
if item.callback:
|
|
|
|
|
await item.callback(self, item.frame, item.direction)
|
|
|
|
|
|