diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index f58dc957f..1ef7bdb4a 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1742,7 +1742,7 @@ class ServiceSwitcherRequestMetadataFrame(ControlFrame): @dataclass -class TaskFrame(SystemFrame): +class TaskFrame(ControlFrame): """Base frame for task frames. This is a base class for frames that are meant to be sent and handled @@ -1756,7 +1756,21 @@ class TaskFrame(SystemFrame): @dataclass -class EndTaskFrame(TaskFrame): +class TaskSystemFrame(SystemFrame): + """Base frame for task system frames. + + This is a base class for frames that are meant to be sent and handled + upstream by the pipeline task. This might result in a corresponding frame + sent downstream (e.g. `InterruptionTaskFrame` / `InterruptionFrame` or + `EndTaskFrame` / `EndFrame`). + + """ + + pass + + +@dataclass +class EndTaskFrame(TaskFrame, UninterruptibleFrame): """Frame to request graceful pipeline task closure. This is used to notify the pipeline task that the pipeline should be @@ -1774,7 +1788,20 @@ class EndTaskFrame(TaskFrame): @dataclass -class CancelTaskFrame(TaskFrame): +class StopTaskFrame(TaskFrame, UninterruptibleFrame): + """Frame to request pipeline task stop while keeping processors running. + + This is used to notify the pipeline task that it should be stopped as + soon as possible (flushing all the queued frames) but that the pipeline + processors should be kept in a running state. This frame should be pushed + upstream. + """ + + pass + + +@dataclass +class CancelTaskFrame(TaskSystemFrame): """Frame to request immediate pipeline task cancellation. This is used to notify the pipeline task that the pipeline should be @@ -1792,20 +1819,7 @@ class CancelTaskFrame(TaskFrame): @dataclass -class StopTaskFrame(TaskFrame): - """Frame to request pipeline task stop while keeping processors running. - - This is used to notify the pipeline task that it should be stopped as - soon as possible (flushing all the queued frames) but that the pipeline - processors should be kept in a running state. This frame should be pushed - upstream. - """ - - pass - - -@dataclass -class InterruptionTaskFrame(TaskFrame): +class InterruptionTaskFrame(TaskSystemFrame): """Frame indicating the pipeline should be interrupted. This frame should be pushed upstream to indicate the pipeline should be diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index e795961a1..56df719d5 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -876,22 +876,22 @@ class PipelineTask(BasePipelineTask): if isinstance(frame, EndTaskFrame): # Tell the task we should end nicely. - logger.debug(f"{self}: received end task frame {frame}") + logger.debug(f"{self}: received end task frame upstream {frame}") await self.queue_frame(EndFrame(reason=frame.reason)) elif isinstance(frame, CancelTaskFrame): # Tell the task we should end right away. - logger.debug(f"{self}: received cancel task frame {frame}") + logger.debug(f"{self}: received cancel task frame upstream {frame}") await self.queue_frame(CancelFrame(reason=frame.reason)) elif isinstance(frame, StopTaskFrame): # Tell the task we should stop nicely. - logger.debug(f"{self}: received stop task frame {frame}") + logger.debug(f"{self}: received stop task frame upstream {frame}") await self.queue_frame(StopFrame()) elif isinstance(frame, InterruptionTaskFrame): # Tell the task we should interrupt the pipeline. Note that we are # bypassing the push queue and directly queue into the # pipeline. This is in case the push task is blocked waiting for a # pipeline-ending frame to finish traversing the pipeline. - logger.debug(f"{self}: received interruption task frame {frame}") + logger.debug(f"{self}: received interruption task frame upstream {frame}") await self._pipeline.queue_frame(InterruptionFrame()) elif isinstance(frame, ErrorFrame): await self._call_event_handler("on_pipeline_error", frame) @@ -934,6 +934,18 @@ class PipelineTask(BasePipelineTask): self._pipeline_end_event.set() elif isinstance(frame, HeartbeatFrame): await self._heartbeat_queue.put(frame) + elif isinstance(frame, EndTaskFrame): + logger.debug(f"{self}: received end task frame downstream {frame}") + await self.queue_frame(EndTaskFrame(reason=frame.reason), FrameDirection.UPSTREAM) + elif isinstance(frame, StopTaskFrame): + logger.debug(f"{self}: received stop task frame downstream {frame}") + await self.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM) + elif isinstance(frame, CancelTaskFrame): + logger.debug(f"{self}: received cancel task frame downstream {frame}") + await self.queue_frame(CancelTaskFrame(reason=frame.reason), FrameDirection.UPSTREAM) + elif isinstance(frame, InterruptionTaskFrame): + logger.debug(f"{self}: received interruption task frame downstream {frame}") + await self.queue_frame(InterruptionTaskFrame(), FrameDirection.UPSTREAM) async def _heartbeat_push_handler(self): """Push heartbeat frames at regular intervals."""