frames: StartFrame is now a SystemFrame

This commit is contained in:
Aleix Conchillo Flaqué
2024-09-18 22:36:59 -07:00
parent 23d6eed5ea
commit f078d156de
6 changed files with 38 additions and 32 deletions

View File

@@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `StartFrame` is back a system frame so we make sure it's processed immediately
by all processors. `EndFrame` stays a control frame since it needs to be
ordered allowing the frames in the pipeline to be processed.
- Updated `MoondreamService` revision to `2024-08-26`.
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation

View File

@@ -248,6 +248,16 @@ class SystemFrame(Frame):
pass
@dataclass
class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class CancelFrame(SystemFrame):
"""Indicates that a pipeline needs to stop right away."""
@@ -338,16 +348,6 @@ class ControlFrame(Frame):
pass
@dataclass
class StartFrame(ControlFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class EndFrame(ControlFrame):
"""Indicates that a pipeline has ended and frame processors and pipelines

View File

@@ -337,7 +337,12 @@ class RTVIProcessor(FrameProcessor):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, CancelFrame):
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, CancelFrame):
await self._cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, ErrorFrame):
@@ -347,11 +352,6 @@ class RTVIProcessor(FrameProcessor):
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.

View File

@@ -66,18 +66,18 @@ class GStreamerPipelineSource(FrameProcessor):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, CancelFrame):
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, CancelFrame):
await self._cancel(frame)
await self.push_frame(frame, direction)
# All other system frames
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.

View File

@@ -71,7 +71,12 @@ class BaseInputTransport(FrameProcessor):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, CancelFrame):
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self.start(frame)
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, BotInterruptionFrame):
@@ -81,11 +86,6 @@ class BaseInputTransport(FrameProcessor):
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self.start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.

View File

@@ -129,7 +129,12 @@ class BaseOutputTransport(FrameProcessor):
# immediately. Other frames require order so they are put in the sink
# queue.
#
if isinstance(frame, CancelFrame):
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self.start(frame)
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame):
@@ -141,9 +146,6 @@ class BaseOutputTransport(FrameProcessor):
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
# Control frames.
elif isinstance(frame, StartFrame):
await self._sink_queue.put(frame)
await self.start(frame)
elif isinstance(frame, EndFrame):
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)