diff --git a/CHANGELOG.md b/CHANGELOG.md index 2323b8d10..d4a97762f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- System frames are now queued. Before, system frames could be generated from + any task and would not guarantee any order which was causing undesired + behavior. Also, it was possible to get into some rare recursion issues because + of the way system frames were executed (they were executed in-place, meaning + calling `push_frame()` would finish after the system frame traversed all the + pipeline). This makes system frames more deterministic. + - Changed the default model for both `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` to `eleven_turbo_v2_5`. The rationale for this change is that the Turbo v2.5 model exhibits the most stable voice quality diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 4c9b57332..e587e563d 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -121,8 +121,8 @@ class Frame: class SystemFrame(Frame): """System frame class for immediate processing. - System frames are frames that are not internally queued by any of the - frame processors and should be processed immediately. + A frame that takes higher priority than other frames. System frames are + handled in order and are not affected by user interruptions. """ pass @@ -132,8 +132,9 @@ class SystemFrame(Frame): class DataFrame(Frame): """Data frame class for processing data in order. - Data frames are frames that will be processed in order and usually - contain data such as LLM context, text, audio or images. + A frame that is processed in order and usually contains data such as LLM + context, text, audio or images. Data frames are cancelled by user + interruptions. """ pass @@ -143,9 +144,11 @@ class DataFrame(Frame): class ControlFrame(Frame): """Control frame class for processing control information in order. - Control frames are frames that, similar to data frames, will be processed - in order and usually contain control information such as frames to update - settings or to end the pipeline. + A frame that, similar to data frames, is processed in order and usually + contains control information such as update settings or to end the pipeline + after everything is flushed. Control frames are cancelled by user + interruptions. + """ pass @@ -1206,7 +1209,7 @@ class EndFrame(ControlFrame): should be shut down. If the transport receives this frame, it will stop sending frames to its output channel(s) and close all its threads. Note, that this is a control frame, which means it will be received in the order it - was sent (unlike system frames). + was sent. """ pass diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index a3dfbab24..1640d589c 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -459,15 +459,20 @@ class PipelineTask(BasePipelineTask): # awaiting a task. pass finally: - # It's possibe that we get an asyncio.CancelledError from the - # outside, if so we need to make sure everything gets cancelled - # properly. - if cleanup_pipeline: - await self._cancel() - await self._cancel_tasks() - await self._cleanup(cleanup_pipeline) - if self._check_dangling_tasks: - self._print_dangling_tasks() + # We only cancel things cleanly if we know we are the ones + # cancelling. It's possibe that we get an asyncio.CancelledError + # from the outside, in which case it is very likely other tasks have + # been already cancelled (e.g. when python is shutting down) so we + # can't assume things are being cancelled nicely. + if self._cancelled: + await self._cancel_tasks() + await self._cleanup(cleanup_pipeline) + if self._check_dangling_tasks: + self._print_dangling_tasks() + else: + logger.warning( + f"Pipeline task {self} is not being cancelled properly (use cancel() method)" + ) self._finished = True async def queue_frame(self, frame: Frame): @@ -494,13 +499,17 @@ class PipelineTask(BasePipelineTask): async def _cancel(self): """Internal cancellation logic for the pipeline task.""" if not self._cancelled: - logger.debug(f"Canceling pipeline task {self}") + logger.debug(f"Cancelling pipeline task {self}") self._cancelled = True # Make sure everything is cleaned up downstream. This is sent # out-of-band from the main streaming task which is what we want since # we want to cancel right away. await self._source.push_frame(CancelFrame()) - # Only cancel the push task. Everything else will be cancelled in run(). + # Wait for CancelFrame to make it throught the pipeline. + await self._wait_for_pipeline_end() + # Only cancel the push task, we don't want to be able to process any + # other frame after cancel. Everything else will be cancelled in + # run(). if self._process_push_task: await self._task_manager.cancel_task(self._process_push_task) self._process_push_task = None @@ -542,6 +551,10 @@ class PipelineTask(BasePipelineTask): """Cancel all running pipeline tasks.""" await self._observer.stop() + if self._process_push_task: + await self._task_manager.cancel_task(self._process_push_task) + self._process_push_task = None + if self._process_up_task: await self._task_manager.cancel_task(self._process_up_task) self._process_up_task = None @@ -654,7 +667,7 @@ class PipelineTask(BasePipelineTask): while running: frame = await self._push_queue.get() await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM) - if isinstance(frame, (EndFrame, StopFrame)): + if isinstance(frame, (CancelFrame, EndFrame, StopFrame)): await self._wait_for_pipeline_end() running = not isinstance(frame, (CancelFrame, EndFrame, StopFrame)) cleanup_pipeline = not isinstance(frame, StopFrame) @@ -727,6 +740,7 @@ class PipelineTask(BasePipelineTask): self._pipeline_end_event.set() elif isinstance(frame, CancelFrame): await self._call_event_handler("on_pipeline_cancelled", frame) + self._pipeline_end_event.set() elif isinstance(frame, HeartbeatFrame): await self._heartbeat_queue.put(frame) self._down_queue.task_done() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index e6611a161..087b97c2f 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -14,7 +14,7 @@ management, and frame flow control mechanisms. import asyncio from dataclasses import dataclass from enum import Enum -from typing import Awaitable, Callable, Coroutine, List, Optional, Sequence +from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence from loguru import logger @@ -71,13 +71,104 @@ class FrameProcessorSetup: watchdog_timers_enabled: bool = False +class FrameProcessorQueue(WatchdogQueue): + """A priority queue for systems frames and other frames. + + This is a specialized queue for frame processors that separates and + prioritizes system frames over other frames. + + This queue uses two internal `WatchdogQueue` instances: + - One for system-level frames (`SystemFrame`) + - One for regular frames + + It ensures that `SystemFrame` objects are processed before any other + frames. Additionally, it uses an `asyncio.Event` to signal when new items + have been added to either queue, allowing consumers to wait efficiently when + the queue is empty. + + """ + + def __init__(self, manager: BaseTaskManager): + """Initialize the FrameProcessorQueue. + + Args: + manager (BaseTaskManager): The task manager used by the internal watchdog queues. + + """ + super().__init__(manager) + self.__event = WatchdogEvent(manager) + self.__main_queue = WatchdogQueue(manager) + self.__system_queue = WatchdogQueue(manager) + + async def put(self, item: Any): + """Put an item into the appropriate queue. + + System frames (`SystemFrame`) are placed into the system queue and all others + into the regular queue. Signals the event to wake up any waiting consumers. + + Args: + item (Any): The item to enqueue. + + """ + if isinstance(item, SystemFrame): + await self.__system_queue.put(item) + else: + await self.__main_queue.put(item) + self.__event.set() + + async def get(self) -> Any: + """Retrieve the next item from the queue. + + System frames are prioritized. If both queues are empty, this method + waits until an item is available. + + Returns: + Any: The next item from the system or main queue. + + """ + # Wait for an item in any of the queues if they are empty. + if self.__main_queue.empty() and self.__system_queue.empty(): + await self.__event.wait() + + # Prioritize system frames. + if self.__system_queue.qsize() > 0: + item = await self.__system_queue.get() + self.__system_queue.task_done() + else: + item = await self.__main_queue.get() + self.__main_queue.task_done() + + # Clear the event only if all queues are empty. + if self.__main_queue.empty() and self.__system_queue.empty(): + self.__event.clear() + + return item + + def cancel(self): + """Cancel both internal queues. + + This method is used to stop processing and release any pending tasks + in both the system and main queues. Typically used during shutdown + or cleanup to prevent further processing of frames. + + """ + self.__main_queue.cancel() + self.__system_queue.cancel() + + +FrameCallback = Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]] + + class FrameProcessor(BaseObject): """Base class for all frame processors in the pipeline. - Frame processors are the building blocks of Pipecat pipelines. They receive - frames, process them, and pass them to the next processor in the chain. - Each processor runs in its own task and can be linked to form complex - processing pipelines. + Frame processors are the building blocks of Pipecat pipelines, they can be + linked to form complex processing pipelines. They receive frames, process + them, and pass them to the next or previous processor in the chain. Each + frame processor guarantees frame ordering and processes frames in its own + task. System frames are also processed in a separate task which guarantees + frame priority. + """ def __init__( @@ -144,13 +235,27 @@ class FrameProcessor(BaseObject): self._metrics = metrics or FrameProcessorMetrics() self._metrics.set_processor_name(self.name) - # Processors have an input queue. The input queue will be processed - # immediately (default) or it will block if `pause_processing_frames()` - # is called. To resume processing frames we need to call + # Processors have an input priority queue which stores any type of + # frames in order. System frames have higher priority than any other + # frames, so they will be returned first from the queue. + # + # If a system frame is obtained it will be processed immediately any + # other type of frame (data and control) will be put in a separate queue + # for later processing. This guarantees that each frame processor will + # always process system frames before any other frame in the queue. + + # The input task that handles all types of frames. It processes system + # frames right away and queues non-system frames for later processing. + self.__input_frame_task: Optional[asyncio.Task] = None + + # The process task processes non-system frames. Non-system frames will + # be processed as soon as they are received by the processing task + # (default) or they will block if `pause_processing_frames()` is + # called. To resume processing frames we need to call # `resume_processing_frames()` which will wake up the event. self.__should_block_frames = False - self.__input_event = None - self.__input_frame_task: Optional[asyncio.Task] = None + self.__process_event = None + self.__process_frame_task: Optional[asyncio.Task] = None @property def id(self) -> int: @@ -373,6 +478,10 @@ class FrameProcessor(BaseObject): if self._enable_watchdog_timers else setup.watchdog_timers_enabled ) + + # Create processing tasks. + self.__create_input_task() + if self._metrics is not None: await self._metrics.setup(self._task_manager) @@ -380,6 +489,7 @@ class FrameProcessor(BaseObject): """Clean up processor resources.""" await super().cleanup() await self.__cancel_input_task() + await self.__cancel_process_task() if self._metrics is not None: await self._metrics.cleanup() @@ -434,9 +544,7 @@ class FrameProcessor(BaseObject): self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM, - callback: Optional[ - Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]] - ] = None, + callback: Optional[FrameCallback] = None, ): """Queue a frame for processing. @@ -449,12 +557,7 @@ class FrameProcessor(BaseObject): if self._cancelling: return - if isinstance(frame, SystemFrame): - # We don't want to queue system frames. - await self.process_frame(frame, direction) - else: - # We queue everything else. - await self.__input_queue.put((frame, direction, callback)) + await self.__input_queue.put((frame, direction, callback)) async def pause_processing_frames(self): """Pause processing of queued frames.""" @@ -464,8 +567,8 @@ class FrameProcessor(BaseObject): async def resume_processing_frames(self): """Resume processing of queued frames.""" logger.trace(f"{self}: resuming frame processing") - if self.__input_event: - self.__input_event.set() + if self.__process_event: + self.__process_event.set() async def process_frame(self, frame: Frame, direction: FrameDirection): """Process a frame. @@ -522,7 +625,8 @@ class FrameProcessor(BaseObject): self._enable_usage_metrics = frame.enable_usage_metrics self._interruption_strategies = frame.interruption_strategies self._report_only_initial_ttfb = frame.report_only_initial_ttfb - self.__create_input_task() + + self.__create_process_task() async def __cancel(self, frame: CancelFrame): """Handle the cancel frame to stop processor operation. @@ -531,7 +635,7 @@ class FrameProcessor(BaseObject): frame: The cancel frame. """ self._cancelling = True - await self.__cancel_input_task() + await self.__cancel_process_task() async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame): """Handle pause frame to pause processor operation. @@ -556,16 +660,16 @@ class FrameProcessor(BaseObject): # async def _start_interruption(self): - """Start handling an interruption by canceling current tasks.""" + """Start handling an interruption by cancelling current tasks.""" try: - # Cancel the input task. This will stop processing queued frames. - await self.__cancel_input_task() + # Cancel the process task. This will stop processing queued frames. + await self.__cancel_process_task() except Exception as e: logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}") await self.push_error(ErrorFrame(str(e))) - # Create a new input queue and task. - self.__create_input_task() + # Create a new process queue and task. + self.__create_process_task() async def _stop_interruption(self): """Stop handling an interruption.""" @@ -624,41 +728,73 @@ class FrameProcessor(BaseObject): return self.__started def __create_input_task(self): - """Create the input processing task.""" + """Create the frame input processing task.""" if not self.__input_frame_task: - self.__should_block_frames = False - if not self.__input_event: - self.__input_event = WatchdogEvent(self.task_manager) - self.__input_event.clear() - self.__input_queue = WatchdogQueue(self.task_manager) + self.__input_queue = FrameProcessorQueue(self.task_manager) self.__input_frame_task = self.create_task(self.__input_frame_task_handler()) async def __cancel_input_task(self): - """Cancel the input processing task.""" + """Cancel the frame input processing task.""" if self.__input_frame_task: self.__input_queue.cancel() await self.cancel_task(self.__input_frame_task) self.__input_frame_task = None + def __create_process_task(self): + """Create the non-system frame processing task.""" + if not self.__process_frame_task: + self.__should_block_frames = False + if not self.__process_event: + self.__process_event = WatchdogEvent(self.task_manager) + self.__process_event.clear() + self.__process_queue = WatchdogQueue(self.task_manager) + self.__process_frame_task = self.create_task(self.__process_frame_task_handler()) + + async def __cancel_process_task(self): + """Cancel the non-system frame processing task.""" + if self.__process_frame_task: + self.__process_queue.cancel() + await self.cancel_task(self.__process_frame_task) + self.__process_frame_task = None + + async def __process_frame( + self, frame: Frame, direction: FrameDirection, callback: FrameCallback + ): + try: + # Process the frame. + await self.process_frame(frame, direction) + # If this frame has an associated callback, call it now. + if callback: + await callback(self, frame, direction) + except Exception as e: + logger.exception(f"{self}: error processing frame: {e}") + await self.push_error(ErrorFrame(str(e))) + async def __input_frame_task_handler(self): - """Handle frames from the input queue.""" + """Handle frames from the input queue. + + It only processes system frames. Other frames are queue for another task + to execute. + + """ while True: - if self.__should_block_frames and self.__input_event: + (frame, direction, callback) = await self.__input_queue.get() + + if isinstance(frame, SystemFrame): + await self.__process_frame(frame, direction, callback) + else: + await self.__process_queue.put((frame, direction, callback)) + + async def __process_frame_task_handler(self): + """Handle non-system frames from the process queue.""" + while True: + if self.__should_block_frames and self.__process_event: logger.trace(f"{self}: frame processing paused") - await self.__input_event.wait() - self.__input_event.clear() + await self.__process_event.wait() + self.__process_event.clear() self.__should_block_frames = False logger.trace(f"{self}: frame processing resumed") - (frame, direction, callback) = await self.__input_queue.get() - try: - # Process the frame. - await self.process_frame(frame, direction) - # If this frame has an associated callback, call it now. - if callback: - await callback(self, frame, direction) - except Exception as e: - logger.exception(f"{self}: error processing frame: {e}") - await self.push_error(ErrorFrame(str(e))) - finally: - self.__input_queue.task_done() + (frame, direction, callback) = await self.__process_queue.get() + + await self.__process_frame(frame, direction, callback) diff --git a/src/pipecat/services/heygen/video.py b/src/pipecat/services/heygen/video.py index 237dfc36f..f32c8f392 100644 --- a/src/pipecat/services/heygen/video.py +++ b/src/pipecat/services/heygen/video.py @@ -183,7 +183,7 @@ class HeyGenVideoService(AIService): async def stop(self, frame: EndFrame): """Stop the HeyGen video service gracefully. - Performs cleanup by ending the conversation and canceling ongoing tasks + Performs cleanup by ending the conversation and cancelling ongoing tasks in a controlled manner. Args: @@ -241,7 +241,7 @@ class HeyGenVideoService(AIService): Manages the interruption flow by: 1. Setting the interruption flag 2. Signaling the client to interrupt current speech - 3. Canceling ongoing audio sending tasks + 3. Cancelling ongoing audio sending tasks 4. Creating a new send task 5. Activating the avatar's listening animation """