From 61cb45d61b983574eef17c066f6324e3148d83e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 4 Aug 2025 17:12:25 -0700 Subject: [PATCH 1/3] PipelineTask: also wait on CancelFrame Before CancelFrames didn't need to be waited for because system frames were processed in-place and therefore calling push_frame() would finalize after it traversed all the pipeline. Now, system frames are queued so we need to wait until CancelFrame reaches the end of the pipeline. --- src/pipecat/pipeline/task.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index a3dfbab24..8b745c2a7 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -500,6 +500,8 @@ class PipelineTask(BasePipelineTask): # out-of-band from the main streaming task which is what we want since # we want to cancel right away. await self._source.push_frame(CancelFrame()) + # Wait for CancelFrame to make it throught the pipeline. + await self._wait_for_pipeline_end() # Only cancel the push task. Everything else will be cancelled in run(). if self._process_push_task: await self._task_manager.cancel_task(self._process_push_task) @@ -654,7 +656,7 @@ class PipelineTask(BasePipelineTask): while running: frame = await self._push_queue.get() await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM) - if isinstance(frame, (EndFrame, StopFrame)): + if isinstance(frame, (CancelFrame, EndFrame, StopFrame)): await self._wait_for_pipeline_end() running = not isinstance(frame, (CancelFrame, EndFrame, StopFrame)) cleanup_pipeline = not isinstance(frame, StopFrame) @@ -727,6 +729,7 @@ class PipelineTask(BasePipelineTask): self._pipeline_end_event.set() elif isinstance(frame, CancelFrame): await self._call_event_handler("on_pipeline_cancelled", frame) + self._pipeline_end_event.set() elif isinstance(frame, HeartbeatFrame): await self._heartbeat_queue.put(frame) self._down_queue.task_done() From 49a5a1e375fe0eca0e345c36bbbd94a85e56c91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 5 Aug 2025 11:25:29 -0700 Subject: [PATCH 2/3] PipelineTask: improve task cancellation --- src/pipecat/pipeline/task.py | 33 +++++++++++++++-------- src/pipecat/processors/frame_processor.py | 2 +- src/pipecat/services/heygen/video.py | 4 +-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 8b745c2a7..1640d589c 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -459,15 +459,20 @@ class PipelineTask(BasePipelineTask): # awaiting a task. pass finally: - # It's possibe that we get an asyncio.CancelledError from the - # outside, if so we need to make sure everything gets cancelled - # properly. - if cleanup_pipeline: - await self._cancel() - await self._cancel_tasks() - await self._cleanup(cleanup_pipeline) - if self._check_dangling_tasks: - self._print_dangling_tasks() + # We only cancel things cleanly if we know we are the ones + # cancelling. It's possibe that we get an asyncio.CancelledError + # from the outside, in which case it is very likely other tasks have + # been already cancelled (e.g. when python is shutting down) so we + # can't assume things are being cancelled nicely. + if self._cancelled: + await self._cancel_tasks() + await self._cleanup(cleanup_pipeline) + if self._check_dangling_tasks: + self._print_dangling_tasks() + else: + logger.warning( + f"Pipeline task {self} is not being cancelled properly (use cancel() method)" + ) self._finished = True async def queue_frame(self, frame: Frame): @@ -494,7 +499,7 @@ class PipelineTask(BasePipelineTask): async def _cancel(self): """Internal cancellation logic for the pipeline task.""" if not self._cancelled: - logger.debug(f"Canceling pipeline task {self}") + logger.debug(f"Cancelling pipeline task {self}") self._cancelled = True # Make sure everything is cleaned up downstream. This is sent # out-of-band from the main streaming task which is what we want since @@ -502,7 +507,9 @@ class PipelineTask(BasePipelineTask): await self._source.push_frame(CancelFrame()) # Wait for CancelFrame to make it throught the pipeline. await self._wait_for_pipeline_end() - # Only cancel the push task. Everything else will be cancelled in run(). + # Only cancel the push task, we don't want to be able to process any + # other frame after cancel. Everything else will be cancelled in + # run(). if self._process_push_task: await self._task_manager.cancel_task(self._process_push_task) self._process_push_task = None @@ -544,6 +551,10 @@ class PipelineTask(BasePipelineTask): """Cancel all running pipeline tasks.""" await self._observer.stop() + if self._process_push_task: + await self._task_manager.cancel_task(self._process_push_task) + self._process_push_task = None + if self._process_up_task: await self._task_manager.cancel_task(self._process_up_task) self._process_up_task = None diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index e6611a161..fda818572 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -556,7 +556,7 @@ class FrameProcessor(BaseObject): # async def _start_interruption(self): - """Start handling an interruption by canceling current tasks.""" + """Start handling an interruption by cancelling current tasks.""" try: # Cancel the input task. This will stop processing queued frames. await self.__cancel_input_task() diff --git a/src/pipecat/services/heygen/video.py b/src/pipecat/services/heygen/video.py index 237dfc36f..f32c8f392 100644 --- a/src/pipecat/services/heygen/video.py +++ b/src/pipecat/services/heygen/video.py @@ -183,7 +183,7 @@ class HeyGenVideoService(AIService): async def stop(self, frame: EndFrame): """Stop the HeyGen video service gracefully. - Performs cleanup by ending the conversation and canceling ongoing tasks + Performs cleanup by ending the conversation and cancelling ongoing tasks in a controlled manner. Args: @@ -241,7 +241,7 @@ class HeyGenVideoService(AIService): Manages the interruption flow by: 1. Setting the interruption flag 2. Signaling the client to interrupt current speech - 3. Canceling ongoing audio sending tasks + 3. Cancelling ongoing audio sending tasks 4. Creating a new send task 5. Activating the avatar's listening animation """ From a5ea6e1642d28c1a443a056ea12316f5c09ca3bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 4 Aug 2025 17:17:03 -0700 Subject: [PATCH 3/3] FrameProcessor: system frames are now queued System frames are now queued. Before, system frames could be generated from any task and would not guarantee any order which was causing undesired behavior. Also, it was possible to get into some rare recursion issues because of the way system frames were executed (they were executed in-place, meaning calling `push_frame()` would finish after the system frame traversed all the pipeline). This makes system frames more deterministic. --- CHANGELOG.md | 7 + src/pipecat/frames/frames.py | 19 +- src/pipecat/processors/frame_processor.py | 236 +++++++++++++++++----- 3 files changed, 204 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2323b8d10..d4a97762f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- System frames are now queued. Before, system frames could be generated from + any task and would not guarantee any order which was causing undesired + behavior. Also, it was possible to get into some rare recursion issues because + of the way system frames were executed (they were executed in-place, meaning + calling `push_frame()` would finish after the system frame traversed all the + pipeline). This makes system frames more deterministic. + - Changed the default model for both `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` to `eleven_turbo_v2_5`. The rationale for this change is that the Turbo v2.5 model exhibits the most stable voice quality diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 94a60787d..8f2764f5d 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -121,8 +121,8 @@ class Frame: class SystemFrame(Frame): """System frame class for immediate processing. - System frames are frames that are not internally queued by any of the - frame processors and should be processed immediately. + A frame that takes higher priority than other frames. System frames are + handled in order and are not affected by user interruptions. """ pass @@ -132,8 +132,9 @@ class SystemFrame(Frame): class DataFrame(Frame): """Data frame class for processing data in order. - Data frames are frames that will be processed in order and usually - contain data such as LLM context, text, audio or images. + A frame that is processed in order and usually contains data such as LLM + context, text, audio or images. Data frames are cancelled by user + interruptions. """ pass @@ -143,9 +144,11 @@ class DataFrame(Frame): class ControlFrame(Frame): """Control frame class for processing control information in order. - Control frames are frames that, similar to data frames, will be processed - in order and usually contain control information such as frames to update - settings or to end the pipeline. + A frame that, similar to data frames, is processed in order and usually + contains control information such as update settings or to end the pipeline + after everything is flushed. Control frames are cancelled by user + interruptions. + """ pass @@ -1206,7 +1209,7 @@ class EndFrame(ControlFrame): should be shut down. If the transport receives this frame, it will stop sending frames to its output channel(s) and close all its threads. Note, that this is a control frame, which means it will be received in the order it - was sent (unlike system frames). + was sent. """ pass diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index fda818572..087b97c2f 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -14,7 +14,7 @@ management, and frame flow control mechanisms. import asyncio from dataclasses import dataclass from enum import Enum -from typing import Awaitable, Callable, Coroutine, List, Optional, Sequence +from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence from loguru import logger @@ -71,13 +71,104 @@ class FrameProcessorSetup: watchdog_timers_enabled: bool = False +class FrameProcessorQueue(WatchdogQueue): + """A priority queue for systems frames and other frames. + + This is a specialized queue for frame processors that separates and + prioritizes system frames over other frames. + + This queue uses two internal `WatchdogQueue` instances: + - One for system-level frames (`SystemFrame`) + - One for regular frames + + It ensures that `SystemFrame` objects are processed before any other + frames. Additionally, it uses an `asyncio.Event` to signal when new items + have been added to either queue, allowing consumers to wait efficiently when + the queue is empty. + + """ + + def __init__(self, manager: BaseTaskManager): + """Initialize the FrameProcessorQueue. + + Args: + manager (BaseTaskManager): The task manager used by the internal watchdog queues. + + """ + super().__init__(manager) + self.__event = WatchdogEvent(manager) + self.__main_queue = WatchdogQueue(manager) + self.__system_queue = WatchdogQueue(manager) + + async def put(self, item: Any): + """Put an item into the appropriate queue. + + System frames (`SystemFrame`) are placed into the system queue and all others + into the regular queue. Signals the event to wake up any waiting consumers. + + Args: + item (Any): The item to enqueue. + + """ + if isinstance(item, SystemFrame): + await self.__system_queue.put(item) + else: + await self.__main_queue.put(item) + self.__event.set() + + async def get(self) -> Any: + """Retrieve the next item from the queue. + + System frames are prioritized. If both queues are empty, this method + waits until an item is available. + + Returns: + Any: The next item from the system or main queue. + + """ + # Wait for an item in any of the queues if they are empty. + if self.__main_queue.empty() and self.__system_queue.empty(): + await self.__event.wait() + + # Prioritize system frames. + if self.__system_queue.qsize() > 0: + item = await self.__system_queue.get() + self.__system_queue.task_done() + else: + item = await self.__main_queue.get() + self.__main_queue.task_done() + + # Clear the event only if all queues are empty. + if self.__main_queue.empty() and self.__system_queue.empty(): + self.__event.clear() + + return item + + def cancel(self): + """Cancel both internal queues. + + This method is used to stop processing and release any pending tasks + in both the system and main queues. Typically used during shutdown + or cleanup to prevent further processing of frames. + + """ + self.__main_queue.cancel() + self.__system_queue.cancel() + + +FrameCallback = Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]] + + class FrameProcessor(BaseObject): """Base class for all frame processors in the pipeline. - Frame processors are the building blocks of Pipecat pipelines. They receive - frames, process them, and pass them to the next processor in the chain. - Each processor runs in its own task and can be linked to form complex - processing pipelines. + Frame processors are the building blocks of Pipecat pipelines, they can be + linked to form complex processing pipelines. They receive frames, process + them, and pass them to the next or previous processor in the chain. Each + frame processor guarantees frame ordering and processes frames in its own + task. System frames are also processed in a separate task which guarantees + frame priority. + """ def __init__( @@ -144,13 +235,27 @@ class FrameProcessor(BaseObject): self._metrics = metrics or FrameProcessorMetrics() self._metrics.set_processor_name(self.name) - # Processors have an input queue. The input queue will be processed - # immediately (default) or it will block if `pause_processing_frames()` - # is called. To resume processing frames we need to call + # Processors have an input priority queue which stores any type of + # frames in order. System frames have higher priority than any other + # frames, so they will be returned first from the queue. + # + # If a system frame is obtained it will be processed immediately any + # other type of frame (data and control) will be put in a separate queue + # for later processing. This guarantees that each frame processor will + # always process system frames before any other frame in the queue. + + # The input task that handles all types of frames. It processes system + # frames right away and queues non-system frames for later processing. + self.__input_frame_task: Optional[asyncio.Task] = None + + # The process task processes non-system frames. Non-system frames will + # be processed as soon as they are received by the processing task + # (default) or they will block if `pause_processing_frames()` is + # called. To resume processing frames we need to call # `resume_processing_frames()` which will wake up the event. self.__should_block_frames = False - self.__input_event = None - self.__input_frame_task: Optional[asyncio.Task] = None + self.__process_event = None + self.__process_frame_task: Optional[asyncio.Task] = None @property def id(self) -> int: @@ -373,6 +478,10 @@ class FrameProcessor(BaseObject): if self._enable_watchdog_timers else setup.watchdog_timers_enabled ) + + # Create processing tasks. + self.__create_input_task() + if self._metrics is not None: await self._metrics.setup(self._task_manager) @@ -380,6 +489,7 @@ class FrameProcessor(BaseObject): """Clean up processor resources.""" await super().cleanup() await self.__cancel_input_task() + await self.__cancel_process_task() if self._metrics is not None: await self._metrics.cleanup() @@ -434,9 +544,7 @@ class FrameProcessor(BaseObject): self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM, - callback: Optional[ - Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]] - ] = None, + callback: Optional[FrameCallback] = None, ): """Queue a frame for processing. @@ -449,12 +557,7 @@ class FrameProcessor(BaseObject): if self._cancelling: return - if isinstance(frame, SystemFrame): - # We don't want to queue system frames. - await self.process_frame(frame, direction) - else: - # We queue everything else. - await self.__input_queue.put((frame, direction, callback)) + await self.__input_queue.put((frame, direction, callback)) async def pause_processing_frames(self): """Pause processing of queued frames.""" @@ -464,8 +567,8 @@ class FrameProcessor(BaseObject): async def resume_processing_frames(self): """Resume processing of queued frames.""" logger.trace(f"{self}: resuming frame processing") - if self.__input_event: - self.__input_event.set() + if self.__process_event: + self.__process_event.set() async def process_frame(self, frame: Frame, direction: FrameDirection): """Process a frame. @@ -522,7 +625,8 @@ class FrameProcessor(BaseObject): self._enable_usage_metrics = frame.enable_usage_metrics self._interruption_strategies = frame.interruption_strategies self._report_only_initial_ttfb = frame.report_only_initial_ttfb - self.__create_input_task() + + self.__create_process_task() async def __cancel(self, frame: CancelFrame): """Handle the cancel frame to stop processor operation. @@ -531,7 +635,7 @@ class FrameProcessor(BaseObject): frame: The cancel frame. """ self._cancelling = True - await self.__cancel_input_task() + await self.__cancel_process_task() async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame): """Handle pause frame to pause processor operation. @@ -558,14 +662,14 @@ class FrameProcessor(BaseObject): async def _start_interruption(self): """Start handling an interruption by cancelling current tasks.""" try: - # Cancel the input task. This will stop processing queued frames. - await self.__cancel_input_task() + # Cancel the process task. This will stop processing queued frames. + await self.__cancel_process_task() except Exception as e: logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}") await self.push_error(ErrorFrame(str(e))) - # Create a new input queue and task. - self.__create_input_task() + # Create a new process queue and task. + self.__create_process_task() async def _stop_interruption(self): """Stop handling an interruption.""" @@ -624,41 +728,73 @@ class FrameProcessor(BaseObject): return self.__started def __create_input_task(self): - """Create the input processing task.""" + """Create the frame input processing task.""" if not self.__input_frame_task: - self.__should_block_frames = False - if not self.__input_event: - self.__input_event = WatchdogEvent(self.task_manager) - self.__input_event.clear() - self.__input_queue = WatchdogQueue(self.task_manager) + self.__input_queue = FrameProcessorQueue(self.task_manager) self.__input_frame_task = self.create_task(self.__input_frame_task_handler()) async def __cancel_input_task(self): - """Cancel the input processing task.""" + """Cancel the frame input processing task.""" if self.__input_frame_task: self.__input_queue.cancel() await self.cancel_task(self.__input_frame_task) self.__input_frame_task = None + def __create_process_task(self): + """Create the non-system frame processing task.""" + if not self.__process_frame_task: + self.__should_block_frames = False + if not self.__process_event: + self.__process_event = WatchdogEvent(self.task_manager) + self.__process_event.clear() + self.__process_queue = WatchdogQueue(self.task_manager) + self.__process_frame_task = self.create_task(self.__process_frame_task_handler()) + + async def __cancel_process_task(self): + """Cancel the non-system frame processing task.""" + if self.__process_frame_task: + self.__process_queue.cancel() + await self.cancel_task(self.__process_frame_task) + self.__process_frame_task = None + + async def __process_frame( + self, frame: Frame, direction: FrameDirection, callback: FrameCallback + ): + try: + # Process the frame. + await self.process_frame(frame, direction) + # If this frame has an associated callback, call it now. + if callback: + await callback(self, frame, direction) + except Exception as e: + logger.exception(f"{self}: error processing frame: {e}") + await self.push_error(ErrorFrame(str(e))) + async def __input_frame_task_handler(self): - """Handle frames from the input queue.""" + """Handle frames from the input queue. + + It only processes system frames. Other frames are queue for another task + to execute. + + """ while True: - if self.__should_block_frames and self.__input_event: + (frame, direction, callback) = await self.__input_queue.get() + + if isinstance(frame, SystemFrame): + await self.__process_frame(frame, direction, callback) + else: + await self.__process_queue.put((frame, direction, callback)) + + async def __process_frame_task_handler(self): + """Handle non-system frames from the process queue.""" + while True: + if self.__should_block_frames and self.__process_event: logger.trace(f"{self}: frame processing paused") - await self.__input_event.wait() - self.__input_event.clear() + await self.__process_event.wait() + self.__process_event.clear() self.__should_block_frames = False logger.trace(f"{self}: frame processing resumed") - (frame, direction, callback) = await self.__input_queue.get() - try: - # Process the frame. - await self.process_frame(frame, direction) - # If this frame has an associated callback, call it now. - if callback: - await callback(self, frame, direction) - except Exception as e: - logger.exception(f"{self}: error processing frame: {e}") - await self.push_error(ErrorFrame(str(e))) - finally: - self.__input_queue.task_done() + (frame, direction, callback) = await self.__process_queue.get() + + await self.__process_frame(frame, direction, callback)