frames: input frames are now system frames

Input frames from a transport should be processed fast and there's no need for
them to be queued internally in each element.
This commit is contained in:
Aleix Conchillo Flaqué
2024-11-20 10:23:51 +01:00
parent 6b9223d87e
commit a5c7b02a73

View File

@@ -21,6 +21,8 @@ def format_pts(pts: int | None):
@dataclass
class Frame:
"""Base frame class."""
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
@@ -35,13 +37,47 @@ class Frame:
@dataclass
class DataFrame(Frame):
class SystemFrame(Frame):
"""System frames are frames that are not internally queued by any of the
frame processors and should be processed immediately.
"""
pass
@dataclass
class AudioRawFrame(DataFrame):
"""A chunk of audio."""
class DataFrame(Frame):
"""Data frames are frames that will be processed in order and usually
contain data such as LLM context, text, audio or images.
"""
pass
@dataclass
class ControlFrame(Frame):
    """Base class for control frames.

    Like data frames, control frames are processed in order. They usually
    carry control information, for example frames that update settings or
    that end the pipeline.
    """
#
# Data frames.
#
@dataclass
class OutputAudioRawFrame(DataFrame):
"""A chunk of audio. Will be played by the output transport if the
transport's microphone has been enabled.
"""
audio: bytes
sample_rate: int
@@ -57,32 +93,8 @@ class AudioRawFrame(DataFrame):
@dataclass
class InputAudioRawFrame(AudioRawFrame):
"""A chunk of audio usually coming from an input transport."""
pass
@dataclass
class OutputAudioRawFrame(AudioRawFrame):
"""A chunk of audio. Will be played by the output transport if the
transport's microphone has been enabled.
"""
pass
@dataclass
class TTSAudioRawFrame(OutputAudioRawFrame):
"""A chunk of output audio generated by a TTS service."""
pass
@dataclass
class ImageRawFrame(DataFrame):
"""An image. Will be shown by the transport if the transport's camera is
class OutputImageRawFrame(DataFrame):
"""An image that will be shown by the transport if the transport's camera is
enabled.
"""
@@ -97,47 +109,16 @@ class ImageRawFrame(DataFrame):
@dataclass
class InputImageRawFrame(ImageRawFrame):
class TTSAudioRawFrame(OutputAudioRawFrame):
"""A chunk of output audio generated by a TTS service."""
pass
@dataclass
class OutputImageRawFrame(ImageRawFrame):
pass
@dataclass
class UserImageRawFrame(InputImageRawFrame):
"""An image associated to a user. Will be shown by the transport if the
transport's camera is enabled.
"""
user_id: str
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
@dataclass
class VisionImageRawFrame(InputImageRawFrame):
"""An image with an associated text to ask for a description of it. Will be
shown by the transport if the transport's camera is enabled.
"""
text: str | None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})"
@dataclass
class URLImageRawFrame(OutputImageRawFrame):
"""An image with an associated URL. Will be shown by the transport if the
transport's camera is enabled.
"""An output image with an associated URL. These images are usually
generated by third-party services that provide a URL to download the image.
"""
@@ -149,14 +130,14 @@ class URLImageRawFrame(OutputImageRawFrame):
@dataclass
class SpriteFrame(Frame):
class SpriteFrame(DataFrame):
"""An animated sprite. Will be shown by the transport if the transport's
camera is enabled. Will play at the framerate specified in the transport's
`camera_out_framerate` constructor parameter.
"""
images: List[ImageRawFrame]
images: List[OutputImageRawFrame]
def __str__(self):
pts = format_pts(self.pts)
@@ -166,7 +147,7 @@ class SpriteFrame(Frame):
@dataclass
class TextFrame(DataFrame):
"""A chunk of text. Emitted by LLM services, consumed by TTS services, can
be used to send text through pipelines.
be used to send text through processors.
"""
@@ -177,41 +158,13 @@ class TextFrame(DataFrame):
return f"{self.name}(pts: {pts}, text: [{self.text}])"
@dataclass
class TranscriptionFrame(TextFrame):
"""A text frame with transcription-specific data. Will be placed in the
transport's receive queue when a participant speaks.
"""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
class InterimTranscriptionFrame(TextFrame):
"""A text frame with interim transcription-specific data. Will be placed in
the transport's receive queue when a participant speaks."""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
class LLMMessagesFrame(DataFrame):
"""A frame containing a list of LLM messages. Used to signal that an LLM
service should run a chat completion and emit an LLMStartFrames, TextFrames
and an LLMEndFrame. Note that the messages property on this class is
mutable, and will be be updated by various ResponseAggregator frame
processors.
service should run a chat completion and emit an LLMFullResponseStartFrame,
TextFrames and an LLMFullResponseEndFrame. Note that the `messages`
property in this class is mutable, and will be updated by various
aggregators.
"""
@@ -220,7 +173,7 @@ class LLMMessagesFrame(DataFrame):
@dataclass
class LLMMessagesAppendFrame(DataFrame):
"""A frame containing a list of LLM messages that neeed to be added to the
"""A frame containing a list of LLM messages that need to be added to the
current context.
"""
@@ -274,17 +227,6 @@ class TransportMessageFrame(DataFrame):
return f"{self.name}(message: {self.message})"
@dataclass
class FunctionCallResultFrame(DataFrame):
"""A frame containing the result of an LLM function (tool) call."""
function_name: str
tool_call_id: str
arguments: str
result: Any
run_llm: bool = True
#
# App frames. Application user-defined frames.
#
@@ -300,11 +242,6 @@ class AppFrame(Frame):
#
@dataclass
class SystemFrame(Frame):
pass
@dataclass
class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
@@ -461,14 +398,10 @@ class BotSpeakingFrame(SystemFrame):
@dataclass
class UserImageRequestFrame(SystemFrame):
"""A frame user to request an image from the given user."""
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies."""
user_id: str
context: Optional[Any] = None
def __str__(self):
return f"{self.name}, user: {self.user_id}"
data: List[MetricsData]
@dataclass
@@ -480,6 +413,17 @@ class FunctionCallInProgressFrame(SystemFrame):
arguments: str
@dataclass
class FunctionCallResultFrame(SystemFrame):
    """Carries the result of an LLM function (tool) call."""

    # Name of the function (tool) and the id of the call it answers.
    function_name: str
    tool_call_id: str
    # Arguments of the call, kept as a string.
    arguments: str
    # Value produced by the function; any type is accepted.
    result: Any
    # NOTE(review): presumably tells the consumer whether to run the LLM
    # again with this result -- confirm against the code that reads it.
    run_llm: bool = True
@dataclass
class TransportMessageUrgentFrame(SystemFrame):
message: Any
@@ -489,10 +433,96 @@ class TransportMessageUrgentFrame(SystemFrame):
@dataclass
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies."""
class TranscriptionFrame(SystemFrame):
"""A text frame with transcription-specific data. Will be placed in the
transport's receive queue when a participant speaks.
data: List[MetricsData]
"""
text: str
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
class InterimTranscriptionFrame(SystemFrame):
    """Interim (not yet final) transcription text for a participant.

    Will be placed in the transport's receive queue when a participant
    speaks.
    """

    text: str
    user_id: str
    timestamp: str
    language: Language | None = None

    def __str__(self):
        # Assemble the comma-separated detail list first, then wrap it with
        # the frame name; the resulting text matches the single-f-string form.
        details = ", ".join(
            (
                f"user: {self.user_id}",
                f"text: [{self.text}]",
                f"language: {self.language}",
                f"timestamp: {self.timestamp}",
            )
        )
        return f"{self.name}({details})"
@dataclass
class UserImageRequestFrame(SystemFrame):
    """A frame used to request an image from the given user."""

    # Identifier of the user the image is requested from.
    user_id: str
    # Optional, free-form value attached to the request.
    context: Optional[Any] = None

    def __str__(self):
        requested_from = self.user_id
        return f"{self.name}, user: {requested_from}"
@dataclass
class InputAudioRawFrame(SystemFrame):
    """A chunk of raw audio, usually coming from an input transport."""

    audio: bytes
    sample_rate: int
    num_channels: int

    def __post_init__(self):
        super().__post_init__()
        # One audio frame spans `num_channels * 2` bytes of `audio`.
        # NOTE(review): the factor 2 presumably means 16-bit samples -- confirm.
        bytes_per_frame = self.num_channels * 2
        self.num_frames = int(len(self.audio) / bytes_per_frame)

    def __str__(self):
        # Build the detail list piece by piece; the final text is the same
        # as formatting everything in one f-string.
        details = ", ".join(
            (
                f"pts: {format_pts(self.pts)}",
                f"size: {len(self.audio)}",
                f"frames: {self.num_frames}",
                f"sample_rate: {self.sample_rate}",
                f"channels: {self.num_channels}",
            )
        )
        return f"{self.name}({details})"
@dataclass
class InputImageRawFrame(SystemFrame):
    """A raw image, usually coming from an input transport."""

    # Raw image bytes.
    image: bytes
    # Two-integer size of the image.
    # NOTE(review): presumably (width, height) -- confirm with producers.
    size: Tuple[int, int]
    # Image format name, or None when not specified.
    format: str | None

    def __str__(self):
        details = ", ".join(
            (
                f"pts: {format_pts(self.pts)}",
                f"size: {self.size}",
                f"format: {self.format}",
            )
        )
        return f"{self.name}({details})"
@dataclass
class UserImageRawFrame(InputImageRawFrame):
    """An input image that is associated with a specific user."""

    # Identifier of the user the image belongs to.
    user_id: str

    def __str__(self):
        details = ", ".join(
            (
                f"pts: {format_pts(self.pts)}",
                f"user: {self.user_id}",
                f"size: {self.size}",
                f"format: {self.format}",
            )
        )
        return f"{self.name}({details})"
@dataclass
class VisionImageRawFrame(InputImageRawFrame):
    """An input image paired with a text asking for a description of it."""

    # Text accompanying the image; may be None.
    text: str | None

    def __str__(self):
        details = ", ".join(
            (
                f"pts: {format_pts(self.pts)}",
                f"text: [{self.text}]",
                f"size: {self.size}",
                f"format: {self.format}",
            )
        )
        return f"{self.name}({details})"
#
@@ -500,11 +530,6 @@ class MetricsFrame(SystemFrame):
#
@dataclass
class ControlFrame(Frame):
pass
@dataclass
class EndFrame(ControlFrame):
"""Indicates that a pipeline has ended and frame processors and pipelines