diff --git a/src/dailyai/pipeline/frame_processor.py b/src/dailyai/pipeline/frame_processor.py index b435e9f06..5bebd9be2 100644 --- a/src/dailyai/pipeline/frame_processor.py +++ b/src/dailyai/pipeline/frame_processor.py @@ -3,6 +3,19 @@ from typing import AsyncGenerator from dailyai.pipeline.frames import ControlQueueFrame, QueueFrame +""" +This is the base class for all frame processors. Frame processors consume a frame +and yield 0 or more frames. Generally frame processors are used as part of a pipeline, +where frames come from a source queue, are processed by a series of frame processors, +then placed on a sink queue. + +By convention, FrameProcessors should immediately yield any frames they don't process. + +Stateful FrameProcessors should watch for the EndStreamQueueFrame and finalize their +output, eg. yielding an unfinished sentence if they're aggregating LLM output to full +sentences. EndStreamQueueFrame is also a chance to clean up any services that need to +be closed, del'd, etc. +""" class FrameProcessor: @abstractmethod diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index 780d31750..4cb2fb3f2 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -4,6 +4,12 @@ from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import EndParallelPipeQueueFrame, EndStreamQueueFrame, QueueFrame +""" +This class manages a pipe of FrameProcessors, and runs them in sequence. The "source" +and "sink" queues are managed by the caller. You can use this class stand-alone to +perform specialized processing, or you can use the Transport's run_pipeline method to +instantiate and run a pipeline with the Transport's sink and source queues. +""" class Pipeline: def __init__( diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 4860f245d..13b069a1c 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -7,7 +7,6 @@ from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import ( AudioQueueFrame, - ControlQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame,