diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 72acf1a2a..4e98425c9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -73,8 +73,24 @@ class Frame: @dataclass class SystemFrame(Frame): - """System frames are frames that are not internally queued by any of the - frame processors and should be processed immediately. + """A frame that bypasses the internal processing queues and is handled + immediately upon being pushed. Unlike other frames, system frames are not + queued by any frame processor. Instead, they are processed directly in the + task that pushed them, operating outside (i.e. out-of-band) the normal + streaming workflow of frame processors. System frames are not affected by + user interruptions. + + """ + + pass + + +@dataclass +class UrgentFrame(Frame): + """A high-priority frame that is internally queued but always processed + before other frames. Unlike system frames, urgent frames are processed from + the frame processors' streaming task but are prioritized over other + frames. Urgent frames are not affected by user interruptions. """ @@ -83,8 +99,9 @@ class SystemFrame(Frame): @dataclass class DataFrame(Frame): - """Data frames are frames that will be processed in order and usually - contain data such as LLM context, text, audio or images. + """A frame that is processed in order and usually contains data such as LLM + context, text, audio or images. They are processed from the frame + processors' streaming task. Data frames are cancelled by user interruptions. """ @@ -93,9 +110,10 @@ class DataFrame(Frame): @dataclass class ControlFrame(Frame): - """Control frames are frames that, similar to data frames, will be processed - in order and usually contain control information such as frames to update - settings or to end the pipeline. + """A frame that, as data frames, is processed in order and usually contains + control information such as update settings or to end the pipeline after + everything is flushed. They are processed from the frame processors' + streaming task. Control frames are cancelled by user interruptions. """ @@ -478,38 +496,6 @@ class HeartbeatFrame(SystemFrame): timestamp: int -@dataclass -class EndTaskFrame(SystemFrame): - """This is used to notify the pipeline task that the pipeline should be - closed nicely (flushing all the queued frames) by pushing an EndFrame - downstream. - - """ - - pass - - -@dataclass -class CancelTaskFrame(SystemFrame): - """This is used to notify the pipeline task that the pipeline should be - stopped immediately by pushing a CancelFrame downstream. - - """ - - pass - - -@dataclass -class StopTaskFrame(SystemFrame): - """This is used to notify the pipeline task that it should be stopped as - soon as possible (flushing all the queued frames) but that the pipeline - processors should be kept in a running state. - - """ - - pass - - @dataclass class StartInterruptionFrame(SystemFrame): """Emitted by VAD to indicate that a user has started speaking (i.e. is @@ -534,8 +520,56 @@ class StopInterruptionFrame(SystemFrame): pass +# +# Urgent frames +# + + @dataclass -class UserStartedSpeakingFrame(SystemFrame): +class EndTaskFrame(UrgentFrame): + """This is used to notify the pipeline task that the pipeline should be + closed nicely (flushing all the queued frames) by pushing an EndFrame + downstream. + + """ + + pass + + +@dataclass +class CancelTaskFrame(UrgentFrame): + """This is used to notify the pipeline task that the pipeline should be + stopped immediately by pushing a CancelFrame downstream. + + """ + + pass + + +@dataclass +class StopTaskFrame(UrgentFrame): + """This is used to notify the pipeline task that it should be stopped as + soon as possible (flushing all the queued frames) but that the pipeline + processors should be kept in a running state. + + """ + + pass + + +@dataclass +class BotInterruptionFrame(UrgentFrame): + """Emitted by when the bot should be interrupted. This will mainly cause the + same actions as if the user interrupted except that the + UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated. + + """ + + pass + + +@dataclass +class UserStartedSpeakingFrame(UrgentFrame): """Emitted by VAD to indicate that a user has started speaking. This can be used for interruptions or other times when detecting that someone is speaking is more important than knowing what they're saying (as you will @@ -547,57 +581,28 @@ class UserStartedSpeakingFrame(SystemFrame): @dataclass -class UserStoppedSpeakingFrame(SystemFrame): +class UserStoppedSpeakingFrame(UrgentFrame): """Emitted by the VAD to indicate that a user stopped speaking.""" emulated: bool = False @dataclass -class EmulateUserStartedSpeakingFrame(SystemFrame): - """Emitted by internal processors upstream to emulate VAD behavior when a - user starts speaking. - """ - - pass - - -@dataclass -class EmulateUserStoppedSpeakingFrame(SystemFrame): - """Emitted by internal processors upstream to emulate VAD behavior when a - user stops speaking. - """ - - pass - - -@dataclass -class BotInterruptionFrame(SystemFrame): - """Emitted by when the bot should be interrupted. This will mainly cause the - same actions as if the user interrupted except that the - UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated. - - """ - - pass - - -@dataclass -class BotStartedSpeakingFrame(SystemFrame): +class BotStartedSpeakingFrame(UrgentFrame): """Emitted upstream by transport outputs to indicate the bot started speaking.""" pass @dataclass -class BotStoppedSpeakingFrame(SystemFrame): +class BotStoppedSpeakingFrame(UrgentFrame): """Emitted upstream by transport outputs to indicate the bot stopped speaking.""" pass @dataclass -class BotSpeakingFrame(SystemFrame): +class BotSpeakingFrame(UrgentFrame): """Emitted upstream by transport outputs while the bot is still speaking. This can be used, for example, to detect when a user is idle. That is, while the bot is speaking we don't want to trigger any user idle timeout @@ -609,14 +614,32 @@ class BotSpeakingFrame(SystemFrame): @dataclass -class MetricsFrame(SystemFrame): +class EmulateUserStartedSpeakingFrame(UrgentFrame): + """Emitted by internal processors upstream to emulate VAD behavior when a + user starts speaking. + """ + + pass + + +@dataclass +class EmulateUserStoppedSpeakingFrame(UrgentFrame): + """Emitted by internal processors upstream to emulate VAD behavior when a + user stops speaking. + """ + + pass + + +@dataclass +class MetricsFrame(UrgentFrame): """Emitted by processor that can compute metrics like latencies.""" data: List[MetricsData] @dataclass -class FunctionCallInProgressFrame(SystemFrame): +class FunctionCallInProgressFrame(UrgentFrame): """A frame signaling that a function call is in progress.""" function_name: str @@ -626,7 +649,7 @@ class FunctionCallInProgressFrame(SystemFrame): @dataclass -class FunctionCallCancelFrame(SystemFrame): +class FunctionCallCancelFrame(UrgentFrame): """A frame to signal a function call has been cancelled.""" function_name: str @@ -642,7 +665,7 @@ class FunctionCallResultProperties: @dataclass -class FunctionCallResultFrame(SystemFrame): +class FunctionCallResultFrame(UrgentFrame): """A frame containing the result of an LLM function (tool) call.""" function_name: str @@ -653,14 +676,14 @@ class FunctionCallResultFrame(SystemFrame): @dataclass -class STTMuteFrame(SystemFrame): - """System frame to mute/unmute the STT service.""" +class STTMuteFrame(UrgentFrame): + """A frame to mute/unmute the STT service.""" mute: bool @dataclass -class TransportMessageUrgentFrame(SystemFrame): +class TransportMessageUrgentFrame(UrgentFrame): message: Any def __str__(self): @@ -668,7 +691,7 @@ class TransportMessageUrgentFrame(SystemFrame): @dataclass -class UserImageRequestFrame(SystemFrame): +class UserImageRequestFrame(UrgentFrame): """A frame to request an image from the given user. The frame might be generated by a function call in which case the corresponding fields will be properly set. @@ -685,7 +708,7 @@ class UserImageRequestFrame(SystemFrame): @dataclass -class InputAudioRawFrame(SystemFrame, AudioRawFrame): +class InputAudioRawFrame(UrgentFrame, AudioRawFrame): """A chunk of audio usually coming from an input transport.""" def __post_init__(self): @@ -698,7 +721,7 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame): @dataclass -class InputImageRawFrame(SystemFrame, ImageRawFrame): +class InputImageRawFrame(UrgentFrame, ImageRawFrame): """An image usually coming from an input transport.""" def __str__(self): @@ -740,7 +763,7 @@ class EndFrame(ControlFrame): should be shut down. If the transport receives this frame, it will stop sending frames to its output channel(s) and close all its threads. Note, that this is a control frame, which means it will received in the order it - was sent (unline system frames). + was sent. """ diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 590698e7f..a076b34ff 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -5,6 +5,7 @@ # import asyncio +from dataclasses import dataclass from enum import Enum from typing import Awaitable, Callable, Coroutine, Optional @@ -19,6 +20,7 @@ from pipecat.frames.frames import ( StartInterruptionFrame, StopInterruptionFrame, SystemFrame, + UrgentFrame, ) from pipecat.metrics.metrics import LLMTokenUsage, MetricsData from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics @@ -31,6 +33,51 @@ class FrameDirection(Enum): UPSTREAM = 2 +@dataclass +class FrameProcessorQueueItem: + frame: Frame + direction: FrameDirection + callback: Optional[Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]] + + +class FrameProcessorQueue: + def __init__(self): + self._queue = asyncio.Queue() + self._urgent_queue = asyncio.Queue() + self._event = asyncio.Event() + + async def put(self, item: FrameProcessorQueueItem): + if isinstance(item.frame, UrgentFrame): + await self._urgent_queue.put(item) + else: + await self._queue.put(item) + self._event.set() + + async def get(self) -> FrameProcessorQueueItem: + # Wait for an item in any of the queues. + await self._event.wait() + + if self._urgent_queue.empty(): + item = await self._queue.get() + self._queue.task_done() + else: + item = await self._urgent_queue.get() + self._urgent_queue.task_done() + + # Clear the event only if all queues are empty. + if self._queue.empty() and self._urgent_queue.empty(): + self._event.clear() + + return item + + def clear(self): + self._queue = asyncio.Queue() + + # Clear the event only if all queues are empty. + if self._queue.empty() and self._urgent_queue.empty(): + self._event.clear() + + class FrameProcessor(BaseObject): def __init__( self, @@ -68,18 +115,21 @@ class FrameProcessor(BaseObject): self._metrics = metrics or FrameProcessorMetrics() self._metrics.set_processor_name(self.name) - # Processors have an input queue. The input queue will be processed - # immediately (default) or it will block if `pause_processing_frames()` + # Processors receive frames on a streaming queue which are then + # processed by a streaming task. This guarantees that all frames are + # processed in the same task. By default, the streaming queue is + # processed immediately but it may block if `pause_processing_frames()` # is called. To resume processing frames we need to call # `resume_processing_frames()` which will wake up the event. self.__should_block_frames = False - self.__input_event = asyncio.Event() - self.__input_frame_task: Optional[asyncio.Task] = None + self.__streaming_event = asyncio.Event() + self.__streaming_queue = FrameProcessorQueue() + self.__streaming_frame_task: Optional[asyncio.Task] = None - # Every processor in Pipecat should only output frames from a single - # task. This avoid problems like audio overlapping. System frames are the - # exception to this rule. This create this task. - self.__push_frame_task: Optional[asyncio.Task] = None + self.__process_queue = asyncio.Queue() + self.__process_task: Optional[asyncio.Task] = None + self.__process_urgent_queue = asyncio.Queue() + self.__process_urgent_task: Optional[asyncio.Task] = None @property def id(self) -> int: @@ -169,7 +219,8 @@ class FrameProcessor(BaseObject): async def cleanup(self): await super().cleanup() await self.__cancel_input_task() - await self.__cancel_push_task() + await self.__cancel_process_task() + await self.__cancel_process_urgent_task() def link(self, processor: "FrameProcessor"): self._next = processor @@ -214,7 +265,7 @@ class FrameProcessor(BaseObject): await self.process_frame(frame, direction) else: # We queue everything else. - await self.__input_queue.put((frame, direction, callback)) + await self.__streaming_queue.put(FrameProcessorQueueItem(frame, direction, callback)) async def pause_processing_frames(self): logger.trace(f"{self}: pausing frame processing") @@ -222,7 +273,7 @@ class FrameProcessor(BaseObject): async def resume_processing_frames(self): logger.trace(f"{self}: resuming frame processing") - self.__input_event.set() + self.__streaming_event.set() async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, StartFrame): @@ -249,47 +300,6 @@ class FrameProcessor(BaseObject): if not self._check_ready(frame): return - if isinstance(frame, SystemFrame): - await self.__internal_push_frame(frame, direction) - else: - await self.__push_queue.put((frame, direction)) - - async def __start(self, frame: StartFrame): - self.__create_input_task() - self.__create_push_task() - - async def __cancel(self, frame: CancelFrame): - self._cancelling = True - await self.__cancel_input_task() - await self.__cancel_push_task() - - # - # Handle interruptions - # - - async def _start_interruption(self): - try: - # Cancel the push frame task. This will stop pushing frames downstream. - await self.__cancel_push_task() - - # Cancel the input task. This will stop processing queued frames. - await self.__cancel_input_task() - except Exception as e: - logger.exception(f"Uncaught exception in {self}: {e}") - await self.push_error(ErrorFrame(str(e))) - raise - - # Create a new input queue and task. - self.__create_input_task() - - # Create a new output queue and task. - self.__create_push_task() - - async def _stop_interruption(self): - # Nothing to do right now. - pass - - async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: timestamp = self._clock.get_time() if self._clock else 0 if direction == FrameDirection.DOWNSTREAM and self._next: @@ -311,6 +321,49 @@ class FrameProcessor(BaseObject): await self.push_error(ErrorFrame(str(e))) raise + async def __start(self, frame: StartFrame): + self.__create_process_task() + self.__create_process_urgent_task() + self.__create_input_task() + + async def __cancel(self, frame: CancelFrame): + self._cancelling = True + await self.__cancel_input_task() + await self.__cancel_process_task() + await self.__cancel_process_urgent_task() + + # + # Handle interruptions + # + + async def _start_interruption(self): + try: + # Cancel the streaming task. + await self.__cancel_input_task() + + # Cancel the task processing frames. We do not cancel the task that + # is processing urgent frames. + await self.__cancel_process_task() + + # If there's an interruption we should not block frames anymore. + self.__should_block_frames = False + + # Clear the streaming queue, since we don't want to process its + # frame anymore (except system and urgent frames). + self.__streaming_queue.clear() + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) + raise + + # Create a new tasks. + self.__create_process_task() + self.__create_input_task() + + async def _stop_interruption(self): + # Nothing to do right now. + pass + def _check_ready(self, frame: Frame): # If we are trying to push a frame but we still have no clock, it means # we didn't process a StartFrame. @@ -322,49 +375,60 @@ class FrameProcessor(BaseObject): return True def __create_input_task(self): - if not self.__input_frame_task: - self.__should_block_frames = False - self.__input_event.clear() - self.__input_queue = asyncio.Queue() - self.__input_frame_task = self.create_task(self.__input_frame_task_handler()) + if not self.__streaming_frame_task: + self.__streaming_frame_task = self.create_task(self.__streaming_frame_task_handler()) async def __cancel_input_task(self): - if self.__input_frame_task: - await self.cancel_task(self.__input_frame_task) - self.__input_frame_task = None + if self.__streaming_frame_task: + await self.cancel_task(self.__streaming_frame_task) + self.__streaming_frame_task = None - async def __input_frame_task_handler(self): + def __create_process_task(self): + if not self.__process_task: + self.__process_queue = asyncio.Queue() + self.__process_task = self.create_task( + self.__process_task_handler(self.__process_queue) + ) + + async def __cancel_process_task(self): + if self.__process_task: + await self.cancel_task(self.__process_task) + self.__process_task = None + + def __create_process_urgent_task(self): + if not self.__process_urgent_task: + self.__process_urgent_task = self.create_task( + self.__process_task_handler(self.__process_urgent_queue) + ) + + async def __cancel_process_urgent_task(self): + if self.__process_urgent_task: + await self.cancel_task(self.__process_urgent_task) + self.__process_urgent_task = None + + async def __streaming_frame_task_handler(self): while True: if self.__should_block_frames: logger.trace(f"{self}: frame processing paused") - await self.__input_event.wait() - self.__input_event.clear() + await self.__streaming_event.wait() + self.__streaming_event.clear() self.__should_block_frames = False logger.trace(f"{self}: frame processing resumed") - (frame, direction, callback) = await self.__input_queue.get() + item = await self.__streaming_queue.get() + + if isinstance(item.frame, UrgentFrame): + await self.__process_urgent_queue.put(item) + else: + await self.__process_queue.put(item) + + async def __process_task_handler(self, queue: asyncio.Queue): + while True: + item = await queue.get() # Process the frame. - await self.process_frame(frame, direction) + await self.process_frame(item.frame, item.direction) # If this frame has an associated callback, call it now. - if callback: - await callback(self, frame, direction) - - self.__input_queue.task_done() - - def __create_push_task(self): - if not self.__push_frame_task: - self.__push_queue = asyncio.Queue() - self.__push_frame_task = self.create_task(self.__push_frame_task_handler()) - - async def __cancel_push_task(self): - if self.__push_frame_task: - await self.cancel_task(self.__push_frame_task) - self.__push_frame_task = None - - async def __push_frame_task_handler(self): - while True: - (frame, direction) = await self.__push_queue.get() - await self.__internal_push_frame(frame, direction) - self.__push_queue.task_done() + if item.callback: + await item.callback(self, item.frame, item.direction) diff --git a/tests/test_filters.py b/tests/test_filters.py index a47903232..a205b78cf 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -9,6 +9,7 @@ import unittest from pipecat.frames.frames import ( EndFrame, Frame, + StartInterruptionFrame, TextFrame, TranscriptionFrame, UserStartedSpeakingFrame, @@ -57,8 +58,8 @@ class TestFrameFilter(unittest.IsolatedAsyncioTestCase): async def test_system_frame(self): filter = FrameFilter(types=()) - frames_to_send = [UserStartedSpeakingFrame()] - expected_down_frames = [UserStartedSpeakingFrame] + frames_to_send = [StartInterruptionFrame()] + expected_down_frames = [StartInterruptionFrame] await run_test( filter, frames_to_send=frames_to_send,