From ee3786fe155a426843707d465b302f3419d569f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 24 Sep 2024 19:10:22 -0700 Subject: [PATCH] frames: add EndTaskFrame and CancelTaskFrame --- CHANGELOG.md | 4 ++++ src/pipecat/frames/frames.py | 21 +++++++++++++++++++++ src/pipecat/pipeline/task.py | 16 ++++++++++++++-- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1d571da5..fb08aab2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are + meant to be pushed upstream to tell the pipeline task to stop nicely or + immediately respectively. + - Added configurable LLM parameters (e.g., temperature, top_p, max_tokens, seed) for OpenAI, Anthropic, and Together AI services along with corresponding setter functions. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 768e53e39..273aad214 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -339,6 +339,27 @@ class FatalErrorFrame(ErrorFrame): fatal: bool = field(default=True, init=False) +@dataclass +class EndTaskFrame(SystemFrame): + """This is used to notify the pipeline task that the pipeline should be + closed nicely (flushing all the queued frames) by pushing an EndFrame + downstream. + + """ + + pass + + +@dataclass +class CancelTaskFrame(SystemFrame): + """This is used to notify the pipeline task that the pipeline should be + stopped immediately by pushing a CancelFrame downstream. + + """ + + pass + + @dataclass class StopTaskFrame(SystemFrame): """Indicates that a pipeline task should be stopped but that the pipeline diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 2b46c47c2..f79ff6f39 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -14,7 +14,9 @@ from pipecat.clocks.base_clock import BaseClock from pipecat.clocks.system_clock import SystemClock from pipecat.frames.frames import ( CancelFrame, + CancelTaskFrame, EndFrame, + EndTaskFrame, ErrorFrame, Frame, MetricsFrame, @@ -52,7 +54,13 @@ class Source(FrameProcessor): await self.push_frame(frame, direction) async def _handle_upstream_frame(self, frame: Frame): - if isinstance(frame, ErrorFrame): + if isinstance(frame, EndTaskFrame): + # Tell the task we should end nicely. + await self._up_queue.put(EndTaskFrame()) + elif isinstance(frame, CancelTaskFrame): + # Tell the task we should end right away. + await self._up_queue.put(CancelTaskFrame()) + elif isinstance(frame, ErrorFrame): logger.error(f"Error running app: {frame}") if frame.fatal: # Cancel all tasks downstream. @@ -165,7 +173,11 @@ class PipelineTask: while True: try: frame = await self._up_queue.get() - if isinstance(frame, StopTaskFrame): + if isinstance(frame, EndTaskFrame): + await self.queue_frame(EndFrame()) + elif isinstance(frame, CancelTaskFrame): + await self.queue_frame(CancelFrame()) + elif isinstance(frame, StopTaskFrame): await self.queue_frame(StopTaskFrame()) self._up_queue.task_done() except asyncio.CancelledError: