Merge pull request #2358 from pipecat-ai/aleix/system-frames-queued

system frames are now queued
This commit is contained in:
Aleix Conchillo Flaqué
2025-08-05 15:09:52 -07:00
committed by GitHub
5 changed files with 233 additions and 73 deletions

View File

@@ -48,6 +48,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- System frames are now queued. Before, system frames could be generated from
any task with no ordering guarantee, which was causing undesired behavior.
Also, it was possible to run into rare recursion issues because of the way
system frames were executed (they were executed in-place, meaning a call to
`push_frame()` would only return after the system frame had traversed the
whole pipeline). Queuing makes system frames more deterministic.
- Changed the default model for both `ElevenLabsTTSService` and
`ElevenLabsHttpTTSService` to `eleven_turbo_v2_5`. The rationale for this
change is that the Turbo v2.5 model exhibits the most stable voice quality

View File

@@ -121,8 +121,8 @@ class Frame:
class SystemFrame(Frame):
"""System frame class for immediate processing.
System frames are frames that are not internally queued by any of the
frame processors and should be processed immediately.
A frame that takes higher priority than other frames. System frames are
handled in order and are not affected by user interruptions.
"""
pass
@@ -132,8 +132,9 @@ class SystemFrame(Frame):
class DataFrame(Frame):
"""Data frame class for processing data in order.
Data frames are frames that will be processed in order and usually
contain data such as LLM context, text, audio or images.
A frame that is processed in order and usually contains data such as LLM
context, text, audio or images. Data frames are cancelled by user
interruptions.
"""
pass
@@ -143,9 +144,11 @@ class DataFrame(Frame):
class ControlFrame(Frame):
"""Control frame class for processing control information in order.
Control frames are frames that, similar to data frames, will be processed
in order and usually contain control information such as frames to update
settings or to end the pipeline.
A frame that, similar to data frames, is processed in order and usually
contains control information, such as a request to update settings or to end
the pipeline after everything is flushed. Control frames are cancelled by user
interruptions.
"""
pass
@@ -1206,7 +1209,7 @@ class EndFrame(ControlFrame):
should be shut down. If the transport receives this frame, it will stop
sending frames to its output channel(s) and close all its threads. Note,
that this is a control frame, which means it will be received in the order it
was sent (unlike system frames).
was sent.
"""
pass

View File

@@ -459,15 +459,20 @@ class PipelineTask(BasePipelineTask):
# awaiting a task.
pass
finally:
# It's possibe that we get an asyncio.CancelledError from the
# outside, if so we need to make sure everything gets cancelled
# properly.
if cleanup_pipeline:
await self._cancel()
await self._cancel_tasks()
await self._cleanup(cleanup_pipeline)
if self._check_dangling_tasks:
self._print_dangling_tasks()
# We only cancel things cleanly if we know we are the ones
# cancelling. It's possible that we get an asyncio.CancelledError
# from the outside, in which case it is very likely other tasks have
# already been cancelled (e.g. when Python is shutting down) so we
# can't assume things are being cancelled nicely.
if self._cancelled:
await self._cancel_tasks()
await self._cleanup(cleanup_pipeline)
if self._check_dangling_tasks:
self._print_dangling_tasks()
else:
logger.warning(
f"Pipeline task {self} is not being cancelled properly (use cancel() method)"
)
self._finished = True
async def queue_frame(self, frame: Frame):
@@ -494,13 +499,17 @@ class PipelineTask(BasePipelineTask):
async def _cancel(self):
"""Internal cancellation logic for the pipeline task."""
if not self._cancelled:
logger.debug(f"Canceling pipeline task {self}")
logger.debug(f"Cancelling pipeline task {self}")
self._cancelled = True
# Make sure everything is cleaned up downstream. This is sent
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._source.push_frame(CancelFrame())
# Only cancel the push task. Everything else will be cancelled in run().
# Wait for CancelFrame to make it through the pipeline.
await self._wait_for_pipeline_end()
# Only cancel the push task, we don't want to be able to process any
# other frame after cancel. Everything else will be cancelled in
# run().
if self._process_push_task:
await self._task_manager.cancel_task(self._process_push_task)
self._process_push_task = None
@@ -542,6 +551,10 @@ class PipelineTask(BasePipelineTask):
"""Cancel all running pipeline tasks."""
await self._observer.stop()
if self._process_push_task:
await self._task_manager.cancel_task(self._process_push_task)
self._process_push_task = None
if self._process_up_task:
await self._task_manager.cancel_task(self._process_up_task)
self._process_up_task = None
@@ -654,7 +667,7 @@ class PipelineTask(BasePipelineTask):
while running:
frame = await self._push_queue.get()
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, (EndFrame, StopFrame)):
if isinstance(frame, (CancelFrame, EndFrame, StopFrame)):
await self._wait_for_pipeline_end()
running = not isinstance(frame, (CancelFrame, EndFrame, StopFrame))
cleanup_pipeline = not isinstance(frame, StopFrame)
@@ -727,6 +740,7 @@ class PipelineTask(BasePipelineTask):
self._pipeline_end_event.set()
elif isinstance(frame, CancelFrame):
await self._call_event_handler("on_pipeline_cancelled", frame)
self._pipeline_end_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
self._down_queue.task_done()

View File

@@ -14,7 +14,7 @@ management, and frame flow control mechanisms.
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Coroutine, List, Optional, Sequence
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence
from loguru import logger
@@ -71,13 +71,104 @@ class FrameProcessorSetup:
watchdog_timers_enabled: bool = False
class FrameProcessorQueue(WatchdogQueue):
    """Two-level queue that hands out system frames before anything else.

    Items are stored in one of two internal `WatchdogQueue` instances:

    - a system queue, holding `SystemFrame` instances
    - a main queue, holding every other item

    `get()` always drains the system queue first. A `WatchdogEvent` is set on
    every `put()` and cleared once both internal queues are empty, so a
    consumer calling `get()` on an empty queue waits on that event until an
    item is added.
    """

    def __init__(self, manager: BaseTaskManager):
        """Create the internal event and the two internal queues.

        Args:
            manager (BaseTaskManager): The task manager handed to the internal
                watchdog event and watchdog queues.
        """
        super().__init__(manager)
        self.__event = WatchdogEvent(manager)
        self.__main_queue = WatchdogQueue(manager)
        self.__system_queue = WatchdogQueue(manager)

    async def put(self, item: Any):
        """Enqueue an item and signal that an item is available.

        `SystemFrame` instances go to the system queue; everything else goes
        to the main queue.

        Args:
            item (Any): The item to enqueue.
        """
        target = self.__system_queue if isinstance(item, SystemFrame) else self.__main_queue
        await target.put(item)
        self.__event.set()

    async def get(self) -> Any:
        """Return the next item, preferring system frames.

        If both internal queues are empty, waits for the internal event to be
        set before reading.

        Returns:
            Any: The next item from the system queue if it holds any item,
            otherwise the next item from the main queue.
        """
        # Nothing queued anywhere: wait until put() signals the event.
        if self.__main_queue.empty() and self.__system_queue.empty():
            await self.__event.wait()

        # The system queue wins whenever it holds at least one item.
        source = self.__system_queue if self.__system_queue.qsize() > 0 else self.__main_queue
        item = await source.get()
        source.task_done()

        # Only reset the event once there is nothing left in either queue.
        if self.__main_queue.empty() and self.__system_queue.empty():
            self.__event.clear()

        return item

    def cancel(self):
        """Cancel both internal queues (main queue first, then system queue)."""
        self.__main_queue.cancel()
        self.__system_queue.cancel()
FrameCallback = Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
class FrameProcessor(BaseObject):
"""Base class for all frame processors in the pipeline.
Frame processors are the building blocks of Pipecat pipelines. They receive
frames, process them, and pass them to the next processor in the chain.
Each processor runs in its own task and can be linked to form complex
processing pipelines.
Frame processors are the building blocks of Pipecat pipelines, they can be
linked to form complex processing pipelines. They receive frames, process
them, and pass them to the next or previous processor in the chain. Each
frame processor guarantees frame ordering and processes frames in its own
task. System frames are also processed in a separate task which guarantees
frame priority.
"""
def __init__(
@@ -144,13 +235,27 @@ class FrameProcessor(BaseObject):
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)
# Processors have an input queue. The input queue will be processed
# immediately (default) or it will block if `pause_processing_frames()`
# is called. To resume processing frames we need to call
# Processors have an input priority queue which stores any type of
# frames in order. System frames have higher priority than any other
# frames, so they will be returned first from the queue.
#
# If a system frame is obtained it will be processed immediately; any
# other type of frame (data and control) will be put in a separate queue
# for later processing. This guarantees that each frame processor will
# always process system frames before any other frame in the queue.
# The input task that handles all types of frames. It processes system
# frames right away and queues non-system frames for later processing.
self.__input_frame_task: Optional[asyncio.Task] = None
# The process task processes non-system frames. Non-system frames will
# be processed as soon as they are received by the processing task
# (default) or they will block if `pause_processing_frames()` is
# called. To resume processing frames we need to call
# `resume_processing_frames()` which will wake up the event.
self.__should_block_frames = False
self.__input_event = None
self.__input_frame_task: Optional[asyncio.Task] = None
self.__process_event = None
self.__process_frame_task: Optional[asyncio.Task] = None
@property
def id(self) -> int:
@@ -373,6 +478,10 @@ class FrameProcessor(BaseObject):
if self._enable_watchdog_timers
else setup.watchdog_timers_enabled
)
# Create processing tasks.
self.__create_input_task()
if self._metrics is not None:
await self._metrics.setup(self._task_manager)
@@ -380,6 +489,7 @@ class FrameProcessor(BaseObject):
"""Clean up processor resources."""
await super().cleanup()
await self.__cancel_input_task()
await self.__cancel_process_task()
if self._metrics is not None:
await self._metrics.cleanup()
@@ -434,9 +544,7 @@ class FrameProcessor(BaseObject):
self,
frame: Frame,
direction: FrameDirection = FrameDirection.DOWNSTREAM,
callback: Optional[
Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
] = None,
callback: Optional[FrameCallback] = None,
):
"""Queue a frame for processing.
@@ -449,12 +557,7 @@ class FrameProcessor(BaseObject):
if self._cancelling:
return
if isinstance(frame, SystemFrame):
# We don't want to queue system frames.
await self.process_frame(frame, direction)
else:
# We queue everything else.
await self.__input_queue.put((frame, direction, callback))
await self.__input_queue.put((frame, direction, callback))
async def pause_processing_frames(self):
"""Pause processing of queued frames."""
@@ -464,8 +567,8 @@ class FrameProcessor(BaseObject):
async def resume_processing_frames(self):
"""Resume processing of queued frames."""
logger.trace(f"{self}: resuming frame processing")
if self.__input_event:
self.__input_event.set()
if self.__process_event:
self.__process_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process a frame.
@@ -522,7 +625,8 @@ class FrameProcessor(BaseObject):
self._enable_usage_metrics = frame.enable_usage_metrics
self._interruption_strategies = frame.interruption_strategies
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
self.__create_input_task()
self.__create_process_task()
async def __cancel(self, frame: CancelFrame):
"""Handle the cancel frame to stop processor operation.
@@ -531,7 +635,7 @@ class FrameProcessor(BaseObject):
frame: The cancel frame.
"""
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_process_task()
async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame):
"""Handle pause frame to pause processor operation.
@@ -556,16 +660,16 @@ class FrameProcessor(BaseObject):
#
async def _start_interruption(self):
"""Start handling an interruption by canceling current tasks."""
"""Start handling an interruption by cancelling current tasks."""
try:
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
# Cancel the process task. This will stop processing queued frames.
await self.__cancel_process_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
await self.push_error(ErrorFrame(str(e)))
# Create a new input queue and task.
self.__create_input_task()
# Create a new process queue and task.
self.__create_process_task()
async def _stop_interruption(self):
"""Stop handling an interruption."""
@@ -624,41 +728,73 @@ class FrameProcessor(BaseObject):
return self.__started
def __create_input_task(self):
"""Create the input processing task."""
"""Create the frame input processing task."""
if not self.__input_frame_task:
self.__should_block_frames = False
if not self.__input_event:
self.__input_event = WatchdogEvent(self.task_manager)
self.__input_event.clear()
self.__input_queue = WatchdogQueue(self.task_manager)
self.__input_queue = FrameProcessorQueue(self.task_manager)
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
async def __cancel_input_task(self):
"""Cancel the input processing task."""
"""Cancel the frame input processing task."""
if self.__input_frame_task:
self.__input_queue.cancel()
await self.cancel_task(self.__input_frame_task)
self.__input_frame_task = None
def __create_process_task(self):
    """Start the task that processes non-system frames, if not running.

    Does nothing when a process task already exists. Otherwise it resets the
    blocking flag, makes sure the process event exists and is cleared, creates
    a fresh process queue and spawns the handler task.
    """
    if self.__process_frame_task:
        return

    self.__should_block_frames = False

    # The event is created once and reused; it is cleared on every start.
    if not self.__process_event:
        self.__process_event = WatchdogEvent(self.task_manager)
    self.__process_event.clear()

    self.__process_queue = WatchdogQueue(self.task_manager)
    handler = self.__process_frame_task_handler()
    self.__process_frame_task = self.create_task(handler)
async def __cancel_process_task(self):
    """Stop the task that processes non-system frames, if one is running.

    Cancels the process queue first, then cancels the task and forgets it.
    """
    if not self.__process_frame_task:
        return

    self.__process_queue.cancel()
    await self.cancel_task(self.__process_frame_task)
    self.__process_frame_task = None
async def __process_frame(
    self, frame: Frame, direction: FrameDirection, callback: Optional[FrameCallback]
):
    """Process one frame and then run its callback, if it has one.

    Any exception raised by `process_frame()` or by the callback is logged
    and pushed as an `ErrorFrame` instead of propagating to the caller.

    Args:
        frame: The frame to process.
        direction: The direction the frame is travelling in.
        callback: Optional coroutine function awaited with
            `(self, frame, direction)` after the frame has been processed.
    """
    try:
        # Process the frame.
        await self.process_frame(frame, direction)
        # If this frame has an associated callback, call it now.
        if callback:
            await callback(self, frame, direction)
    except Exception as e:
        # Log the error and push it as an ErrorFrame rather than re-raising.
        logger.exception(f"{self}: error processing frame: {e}")
        await self.push_error(ErrorFrame(str(e)))
async def __input_frame_task_handler(self):
"""Handle frames from the input queue."""
"""Handle frames from the input queue.
It only processes system frames. Other frames are queued for another task
to execute.
"""
while True:
if self.__should_block_frames and self.__input_event:
(frame, direction, callback) = await self.__input_queue.get()
if isinstance(frame, SystemFrame):
await self.__process_frame(frame, direction, callback)
else:
await self.__process_queue.put((frame, direction, callback))
async def __process_frame_task_handler(self):
"""Handle non-system frames from the process queue."""
while True:
if self.__should_block_frames and self.__process_event:
logger.trace(f"{self}: frame processing paused")
await self.__input_event.wait()
self.__input_event.clear()
await self.__process_event.wait()
self.__process_event.clear()
self.__should_block_frames = False
logger.trace(f"{self}: frame processing resumed")
(frame, direction, callback) = await self.__input_queue.get()
try:
# Process the frame.
await self.process_frame(frame, direction)
# If this frame has an associated callback, call it now.
if callback:
await callback(self, frame, direction)
except Exception as e:
logger.exception(f"{self}: error processing frame: {e}")
await self.push_error(ErrorFrame(str(e)))
finally:
self.__input_queue.task_done()
(frame, direction, callback) = await self.__process_queue.get()
await self.__process_frame(frame, direction, callback)

View File

@@ -183,7 +183,7 @@ class HeyGenVideoService(AIService):
async def stop(self, frame: EndFrame):
"""Stop the HeyGen video service gracefully.
Performs cleanup by ending the conversation and canceling ongoing tasks
Performs cleanup by ending the conversation and cancelling ongoing tasks
in a controlled manner.
Args:
@@ -241,7 +241,7 @@ class HeyGenVideoService(AIService):
Manages the interruption flow by:
1. Setting the interruption flag
2. Signaling the client to interrupt current speech
3. Canceling ongoing audio sending tasks
3. Cancelling ongoing audio sending tasks
4. Creating a new send task
5. Activating the avatar's listening animation
"""