From f078d156de65f674effa7df8418c168c00a288cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 18 Sep 2024 22:36:59 -0700 Subject: [PATCH] frames: StartFrame is now a SystemFrame --- CHANGELOG.md | 4 ++++ src/pipecat/frames/frames.py | 20 +++++++++---------- src/pipecat/processors/frameworks/rtvi.py | 12 +++++------ .../processors/gstreamer/pipeline_source.py | 12 +++++------ src/pipecat/transports/base_input.py | 12 +++++------ src/pipecat/transports/base_output.py | 10 ++++++---- 6 files changed, 38 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2618edab..e1c6bbadc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `StartFrame` is back a system frame so we make sure it's processed immediately + by all processors. `EndFrame` stays a control frame since it needs to be + ordered allowing the frames in the pipeline to be processed. + - Updated `MoondreamService` revision to `2024-08-26`. - `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 51770dff1..a400d68d9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -248,6 +248,16 @@ class SystemFrame(Frame): pass +@dataclass +class StartFrame(SystemFrame): + """This is the first frame that should be pushed down a pipeline.""" + clock: BaseClock + allow_interruptions: bool = False + enable_metrics: bool = False + enable_usage_metrics: bool = False + report_only_initial_ttfb: bool = False + + @dataclass class CancelFrame(SystemFrame): """Indicates that a pipeline needs to stop right away.""" @@ -338,16 +348,6 @@ class ControlFrame(Frame): pass -@dataclass -class StartFrame(ControlFrame): - """This is the first frame that should be pushed down a pipeline.""" - clock: BaseClock - allow_interruptions: bool = False - enable_metrics: bool = False - enable_usage_metrics: bool = False - report_only_initial_ttfb: bool = False - - @dataclass class EndFrame(ControlFrame): """Indicates that a pipeline has ended and frame processors and pipelines diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index d32f0f640..66adb9ad0 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -337,7 +337,12 @@ class RTVIProcessor(FrameProcessor): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, ErrorFrame): @@ -347,11 +352,6 @@ class RTVIProcessor(FrameProcessor): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 5f0ee089b..8d46105a7 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -66,18 +66,18 @@ class GStreamerPipelineSource(FrameProcessor): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 8836fbd1e..de1ec8884 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -71,7 +71,12 @@ class BaseInputTransport(FrameProcessor): await super().process_frame(frame, direction) # Specific system frames - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self.start(frame) + elif isinstance(frame, CancelFrame): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, BotInterruptionFrame): @@ -81,11 +86,6 @@ class BaseInputTransport(FrameProcessor): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames - elif isinstance(frame, StartFrame): - # Push StartFrame before start(), because we want StartFrame to be - # processed by every processor before any other frame is processed. - await self.push_frame(frame, direction) - await self.start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index bc683721a..461f0567d 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -129,7 +129,12 @@ class BaseOutputTransport(FrameProcessor): # immediately. Other frames require order so they are put in the sink # queue. # - if isinstance(frame, CancelFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self.start(frame) + elif isinstance(frame, CancelFrame): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): @@ -141,9 +146,6 @@ class BaseOutputTransport(FrameProcessor): elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames. - elif isinstance(frame, StartFrame): - await self._sink_queue.put(frame) - await self.start(frame) elif isinstance(frame, EndFrame): await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) await self._sink_queue.put(frame)