From a5c7b02a736491b7c8a28a68de2a5a9affd3138e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 20 Nov 2024 10:23:51 +0100 Subject: [PATCH] frames: input frames are now system frames Input frames from a transport should be processed fast and there's no need for them to be queued internally in each element. --- src/pipecat/frames/frames.py | 289 +++++++++++++++++++---------------- 1 file changed, 157 insertions(+), 132 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index e057e97da..d10a1cd21 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -21,6 +21,8 @@ def format_pts(pts: int | None): @dataclass class Frame: + """Base frame class.""" + id: int = field(init=False) name: str = field(init=False) pts: Optional[int] = field(init=False) @@ -35,13 +37,47 @@ class Frame: @dataclass -class DataFrame(Frame): +class SystemFrame(Frame): + """System frames are frames that are not internally queued by any of the + frame processors and should be processed immediately. + + """ + pass @dataclass -class AudioRawFrame(DataFrame): - """A chunk of audio.""" +class DataFrame(Frame): + """Data frames are frames that will be processed in order and usually + contain data such as LLM context, text, audio or images. + + """ + + pass + + +@dataclass +class ControlFrame(Frame): + """Control frames are frames that, similar to data frames, will be processed + in order and usually contain control information such as frames to update + settings or to end the pipeline. + + """ + + pass + + +# +# Data frames. +# + + +@dataclass +class OutputAudioRawFrame(DataFrame): + """A chunk of audio. Will be played by the output transport if the + transport's microphone has been enabled. 
+ + """ audio: bytes sample_rate: int @@ -57,32 +93,8 @@ class AudioRawFrame(DataFrame): @dataclass -class InputAudioRawFrame(AudioRawFrame): - """A chunk of audio usually coming from an input transport.""" - - pass - - -@dataclass -class OutputAudioRawFrame(AudioRawFrame): - """A chunk of audio. Will be played by the output transport if the - transport's microphone has been enabled. - - """ - - pass - - -@dataclass -class TTSAudioRawFrame(OutputAudioRawFrame): - """A chunk of output audio generated by a TTS service.""" - - pass - - -@dataclass -class ImageRawFrame(DataFrame): - """An image. Will be shown by the transport if the transport's camera is +class OutputImageRawFrame(DataFrame): + """An image that will be shown by the transport if the transport's camera is enabled. """ @@ -97,47 +109,16 @@ class ImageRawFrame(DataFrame): @dataclass -class InputImageRawFrame(ImageRawFrame): +class TTSAudioRawFrame(OutputAudioRawFrame): + """A chunk of output audio generated by a TTS service.""" + pass -@dataclass -class OutputImageRawFrame(ImageRawFrame): - pass - - -@dataclass -class UserImageRawFrame(InputImageRawFrame): - """An image associated to a user. Will be shown by the transport if the - transport's camera is enabled. - - """ - - user_id: str - - def __str__(self): - pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" - - -@dataclass -class VisionImageRawFrame(InputImageRawFrame): - """An image with an associated text to ask for a description of it. Will be - shown by the transport if the transport's camera is enabled. - - """ - - text: str | None - - def __str__(self): - pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})" - - @dataclass class URLImageRawFrame(OutputImageRawFrame): - """An image with an associated URL. Will be shown by the transport if the - transport's camera is enabled. 
+ """An output image with an associated URL. These images are usually + generated by third-party services that provide a URL to download the image. """ @@ -149,14 +130,14 @@ class URLImageRawFrame(OutputImageRawFrame): @dataclass -class SpriteFrame(Frame): +class SpriteFrame(DataFrame): """An animated sprite. Will be shown by the transport if the transport's camera is enabled. Will play at the framerate specified in the transport's `camera_out_framerate` constructor parameter. """ - images: List[ImageRawFrame] + images: List[OutputImageRawFrame] def __str__(self): pts = format_pts(self.pts) @@ -166,7 +147,7 @@ class SpriteFrame(Frame): @dataclass class TextFrame(DataFrame): """A chunk of text. Emitted by LLM services, consumed by TTS services, can - be used to send text through pipelines. + be used to send text through processors. """ @@ -177,41 +158,13 @@ class TextFrame(DataFrame): return f"{self.name}(pts: {pts}, text: [{self.text}])" -@dataclass -class TranscriptionFrame(TextFrame): - """A text frame with transcription-specific data. Will be placed in the - transport's receive queue when a participant speaks. - - """ - - user_id: str - timestamp: str - language: Language | None = None - - def __str__(self): - return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" - - -@dataclass -class InterimTranscriptionFrame(TextFrame): - """A text frame with interim transcription-specific data. Will be placed in - the transport's receive queue when a participant speaks.""" - - user_id: str - timestamp: str - language: Language | None = None - - def __str__(self): - return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" - - @dataclass class LLMMessagesFrame(DataFrame): """A frame containing a list of LLM messages. Used to signal that an LLM - service should run a chat completion and emit an LLMStartFrames, TextFrames - and an LLMEndFrame. 
Note that the messages property on this class is - mutable, and will be be updated by various ResponseAggregator frame - processors. + service should run a chat completion and emit an LLMFullResponseStartFrame, + TextFrames and an LLMFullResponseEndFrame. Note that the `messages` + property in this class is mutable, and will be updated by various + aggregators. """ @@ -220,7 +173,7 @@ class LLMMessagesFrame(DataFrame): @dataclass class LLMMessagesAppendFrame(DataFrame): - """A frame containing a list of LLM messages that neeed to be added to the + """A frame containing a list of LLM messages that need to be added to the current context. """ @@ -274,17 +227,6 @@ class TransportMessageFrame(DataFrame): return f"{self.name}(message: {self.message})" -@dataclass -class FunctionCallResultFrame(DataFrame): - """A frame containing the result of an LLM function (tool) call.""" - - function_name: str - tool_call_id: str - arguments: str - result: Any - run_llm: bool = True - - # # App frames. Application user-defined frames. 
# @@ -300,11 +242,6 @@ class AppFrame(Frame): # -@dataclass -class SystemFrame(Frame): - pass - - @dataclass class StartFrame(SystemFrame): """This is the first frame that should be pushed down a pipeline.""" @@ -461,14 +398,10 @@ class BotSpeakingFrame(SystemFrame): @dataclass -class UserImageRequestFrame(SystemFrame): - """A frame user to request an image from the given user.""" +class MetricsFrame(SystemFrame): + """Emitted by processor that can compute metrics like latencies.""" - user_id: str - context: Optional[Any] = None - - def __str__(self): - return f"{self.name}, user: {self.user_id}" + data: List[MetricsData] @dataclass @@ -480,6 +413,17 @@ class FunctionCallInProgressFrame(SystemFrame): arguments: str +@dataclass +class FunctionCallResultFrame(SystemFrame): + """A frame containing the result of an LLM function (tool) call.""" + + function_name: str + tool_call_id: str + arguments: str + result: Any + run_llm: bool = True + + @dataclass class TransportMessageUrgentFrame(SystemFrame): message: Any @@ -489,10 +433,96 @@ class TransportMessageUrgentFrame(SystemFrame): @dataclass -class MetricsFrame(SystemFrame): - """Emitted by processor that can compute metrics like latencies.""" +class TranscriptionFrame(SystemFrame): + """A text frame with transcription-specific data. Will be placed in the + transport's receive queue when a participant speaks. - data: List[MetricsData] + """ + + text: str + user_id: str + timestamp: str + language: Language | None = None + + def __str__(self): + return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" + + +@dataclass +class InterimTranscriptionFrame(SystemFrame): + """A text frame with interim transcription-specific data. 
Will be placed in + the transport's receive queue when a participant speaks.""" + + text: str + user_id: str + timestamp: str + language: Language | None = None + + def __str__(self): + return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})" + + +@dataclass +class UserImageRequestFrame(SystemFrame): + """A frame used to request an image from the given user.""" + + user_id: str + context: Optional[Any] = None + + def __str__(self): + return f"{self.name}, user: {self.user_id}" + + +@dataclass +class InputAudioRawFrame(SystemFrame): + """A chunk of audio usually coming from an input transport.""" + + audio: bytes + sample_rate: int + num_channels: int + + def __post_init__(self): + super().__post_init__() + self.num_frames = int(len(self.audio) / (self.num_channels * 2)) + + def __str__(self): + pts = format_pts(self.pts) + return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})" + + +@dataclass +class InputImageRawFrame(SystemFrame): + """An image usually coming from an input transport.""" + + image: bytes + size: Tuple[int, int] + format: str | None + + def __str__(self): + pts = format_pts(self.pts) + return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})" + + +@dataclass +class UserImageRawFrame(InputImageRawFrame): + """An image associated to a user.""" + + user_id: str + + def __str__(self): + pts = format_pts(self.pts) + return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})" + + +@dataclass +class VisionImageRawFrame(InputImageRawFrame): + """An image with an associated text to ask for a description of it.""" + + text: str | None + + def __str__(self): + pts = format_pts(self.pts) + return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})" # @@ -500,11 +530,6 @@ class MetricsFrame(SystemFrame): # 
-@dataclass -class ControlFrame(Frame): - pass - - @dataclass class EndFrame(ControlFrame): """Indicates that a pipeline has ended and frame processors and pipelines