handle EndTaskFrame, StopTaskFrame and CancelTaskFrame downstream

EndTaskFrame and StopTaskFrame are now ControlFrames instead of
SystemFrames, so they flow through the pipeline and queue behind
pending work. This prevents races where EndFrame could overtake
in-flight frames (e.g. function call responses).

CancelTaskFrame and InterruptionTaskFrame remain SystemFrames
(via new TaskSystemFrame base): since they need immediate propagation.

The sink now catches EndTaskFrame, StopTaskFrame and CancelTaskFrame
downstream and re-queues it upstream to the task, ensuring the full
pipeline drains before shutdown begins.
This commit is contained in:
Aleix Conchillo Flaqué
2026-03-11 23:16:15 -07:00
parent 374bfd4068
commit 2eccd28cf0
2 changed files with 47 additions and 21 deletions

View File

@@ -1742,7 +1742,7 @@ class ServiceSwitcherRequestMetadataFrame(ControlFrame):
@dataclass
class TaskFrame(SystemFrame):
class TaskFrame(ControlFrame):
"""Base frame for task frames.
This is a base class for frames that are meant to be sent and handled
@@ -1756,7 +1756,21 @@ class TaskFrame(SystemFrame):
@dataclass
class EndTaskFrame(TaskFrame):
class TaskSystemFrame(SystemFrame):
"""Base frame for task system frames.
This is a base class for frames that are meant to be sent and handled
upstream by the pipeline task. This might result in a corresponding frame
sent downstream (e.g. `InterruptionTaskFrame` / `InterruptionFrame` or
`EndTaskFrame` / `EndFrame`).
"""
pass
@dataclass
class EndTaskFrame(TaskFrame, UninterruptibleFrame):
"""Frame to request graceful pipeline task closure.
This is used to notify the pipeline task that the pipeline should be
@@ -1774,7 +1788,20 @@ class EndTaskFrame(TaskFrame):
@dataclass
class CancelTaskFrame(TaskFrame):
class StopTaskFrame(TaskFrame, UninterruptibleFrame):
"""Frame to request pipeline task stop while keeping processors running.
This is used to notify the pipeline task that it should be stopped as
soon as possible (flushing all the queued frames) but that the pipeline
processors should be kept in a running state. This frame should be pushed
upstream.
"""
pass
@dataclass
class CancelTaskFrame(TaskSystemFrame):
"""Frame to request immediate pipeline task cancellation.
This is used to notify the pipeline task that the pipeline should be
@@ -1792,20 +1819,7 @@ class CancelTaskFrame(TaskFrame):
@dataclass
class StopTaskFrame(TaskFrame):
"""Frame to request pipeline task stop while keeping processors running.
This is used to notify the pipeline task that it should be stopped as
soon as possible (flushing all the queued frames) but that the pipeline
processors should be kept in a running state. This frame should be pushed
upstream.
"""
pass
@dataclass
class InterruptionTaskFrame(TaskFrame):
class InterruptionTaskFrame(TaskSystemFrame):
"""Frame indicating the pipeline should be interrupted.
This frame should be pushed upstream to indicate the pipeline should be

View File

@@ -876,22 +876,22 @@ class PipelineTask(BasePipelineTask):
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
logger.debug(f"{self}: received end task frame {frame}")
logger.debug(f"{self}: received end task frame upstream {frame}")
await self.queue_frame(EndFrame(reason=frame.reason))
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
logger.debug(f"{self}: received cancel task frame {frame}")
logger.debug(f"{self}: received cancel task frame upstream {frame}")
await self.queue_frame(CancelFrame(reason=frame.reason))
elif isinstance(frame, StopTaskFrame):
# Tell the task we should stop nicely.
logger.debug(f"{self}: received stop task frame {frame}")
logger.debug(f"{self}: received stop task frame upstream {frame}")
await self.queue_frame(StopFrame())
elif isinstance(frame, InterruptionTaskFrame):
# Tell the task we should interrupt the pipeline. Note that we are
# bypassing the push queue and directly queue into the
# pipeline. This is in case the push task is blocked waiting for a
# pipeline-ending frame to finish traversing the pipeline.
logger.debug(f"{self}: received interruption task frame {frame}")
logger.debug(f"{self}: received interruption task frame upstream {frame}")
await self._pipeline.queue_frame(InterruptionFrame())
elif isinstance(frame, ErrorFrame):
await self._call_event_handler("on_pipeline_error", frame)
@@ -934,6 +934,18 @@ class PipelineTask(BasePipelineTask):
self._pipeline_end_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
elif isinstance(frame, EndTaskFrame):
logger.debug(f"{self}: received end task frame downstream {frame}")
await self.queue_frame(EndTaskFrame(reason=frame.reason), FrameDirection.UPSTREAM)
elif isinstance(frame, StopTaskFrame):
logger.debug(f"{self}: received stop task frame downstream {frame}")
await self.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
elif isinstance(frame, CancelTaskFrame):
logger.debug(f"{self}: received cancel task frame downstream {frame}")
await self.queue_frame(CancelTaskFrame(reason=frame.reason), FrameDirection.UPSTREAM)
elif isinstance(frame, InterruptionTaskFrame):
logger.debug(f"{self}: received interruption task frame downstream {frame}")
await self.queue_frame(InterruptionTaskFrame(), FrameDirection.UPSTREAM)
async def _heartbeat_push_handler(self):
"""Push heartbeat frames at regular intervals."""