Merge pull request #503 from pipecat-ai/aleix/end-cancel-task-frames

frames: add EndTaskFrame and CancelTaskFrame
This commit is contained in:
Aleix Conchillo Flaqué
2024-09-24 23:59:10 -07:00
committed by GitHub
3 changed files with 39 additions and 2 deletions

View File

@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.
- Added configurable LLM parameters (e.g., temperature, top_p, max_tokens, seed)
for OpenAI, Anthropic, and Together AI services along with corresponding
setter functions.

View File

@@ -339,6 +339,27 @@ class FatalErrorFrame(ErrorFrame):
fatal: bool = field(default=True, init=False)
@dataclass
class EndTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream.
"""
pass
@dataclass
class CancelTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream.
"""
pass
@dataclass
class StopTaskFrame(SystemFrame):
"""Indicates that a pipeline task should be stopped but that the pipeline

View File

@@ -14,7 +14,9 @@ from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
CancelFrame,
CancelTaskFrame,
EndFrame,
EndTaskFrame,
ErrorFrame,
Frame,
MetricsFrame,
@@ -52,7 +54,13 @@ class Source(FrameProcessor):
await self.push_frame(frame, direction)
async def _handle_upstream_frame(self, frame: Frame):
if isinstance(frame, ErrorFrame):
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self._up_queue.put(EndTaskFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
await self._up_queue.put(CancelTaskFrame())
elif isinstance(frame, ErrorFrame):
logger.error(f"Error running app: {frame}")
if frame.fatal:
# Cancel all tasks downstream.
@@ -165,7 +173,11 @@ class PipelineTask:
while True:
try:
frame = await self._up_queue.get()
if isinstance(frame, StopTaskFrame):
if isinstance(frame, EndTaskFrame):
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
await self.queue_frame(StopTaskFrame())
self._up_queue.task_done()
except asyncio.CancelledError: