From d0076dd4ee07596769a65448d714c3a7badb6b37 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Thu, 7 Mar 2024 18:59:24 -0500 Subject: [PATCH 1/3] Optimize pipeline processing so we don't wait for the completion of one generator to move onto the next. --- src/dailyai/pipeline/pipeline.py | 31 +++++++++---------- src/dailyai/services/ai_services.py | 1 + src/dailyai/tests/test_pipeline.py | 1 - .../foundational/05-sync-speech-and-image.py | 15 ++++++++- .../foundational/06-listen-and-respond.py | 2 +- 5 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/dailyai/pipeline/pipeline.py b/src/dailyai/pipeline/pipeline.py index 8e6c13d2b..e755377ef 100644 --- a/src/dailyai/pipeline/pipeline.py +++ b/src/dailyai/pipeline/pipeline.py @@ -48,6 +48,16 @@ class Pipeline: raise ValueError("Source queue not set") yield await self.source.get() + async def run_pipeline_recursively( + self, initial_frame: Frame, processors: List[FrameProcessor] + ) -> AsyncGenerator[Frame, None]: + if processors: + async for frame in processors[0].process_frame(initial_frame): + async for final_frame in self.run_pipeline_recursively(frame, processors[1:]): + yield final_frame + else: + yield initial_frame + async def run_pipeline(self): """ Run the pipeline. Take each frame from the source queue, pass it to the first frame_processor, pass the output of that frame_processor to the @@ -65,23 +75,12 @@ class Pipeline: try: while True: - frame_generators = [self.get_next_source_frame()] - for processor in self.processors: - next_frame_generators = [] - for frame_generator in frame_generators: - async for frame in frame_generator: - next_frame_generators.append(processor.process_frame(frame)) - frame_generators = next_frame_generators + initial_frame = await self.source.get() + async for frame in self.run_pipeline_recursively(initial_frame, self.processors): + await self.sink.put(frame) - for frame_generator in frame_generators: - async for frame in frame_generator: - await self.sink.put(frame) - if isinstance( - frame, EndFrame - ) or isinstance( - frame, EndPipeFrame - ): - return + if isinstance(initial_frame, EndFrame) or isinstance(initial_frame, EndPipeFrame): + break except asyncio.CancelledError: # this means there's been an interruption, do any cleanup necessary here. for processor in self.processors: diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index e738b6cdb..28b50dcac 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -138,6 +138,7 @@ class TTSService(AIService): text = frame.text else: self.current_sentence += frame.text + print(f"current sentence: {self.current_sentence}") if self.current_sentence.endswith((".", "?", "!")): text = self.current_sentence self.current_sentence = "" diff --git a/src/dailyai/tests/test_pipeline.py b/src/dailyai/tests/test_pipeline.py index b5c9edd05..0d7f43f3b 100644 --- a/src/dailyai/tests/test_pipeline.py +++ b/src/dailyai/tests/test_pipeline.py @@ -1,5 +1,4 @@ import asyncio -from doctest import OutputChecker import unittest from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer from dailyai.pipeline.frames import EndFrame, TextFrame diff --git a/src/examples/foundational/05-sync-speech-and-image.py b/src/examples/foundational/05-sync-speech-and-image.py index ee02f6ed3..cb72642d1 100644 --- a/src/examples/foundational/05-sync-speech-and-image.py +++ b/src/examples/foundational/05-sync-speech-and-image.py @@ -47,7 +47,20 @@ async def main(room_url): source_queue = asyncio.Queue() - for month in ["January", "February"]: + for month in [ + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", + ]: messages = [ { "role": "system", diff --git a/src/examples/foundational/06-listen-and-respond.py b/src/examples/foundational/06-listen-and-respond.py index ce117a9a9..90fcfb396 100644 --- a/src/examples/foundational/06-listen-and-respond.py +++ b/src/examples/foundational/06-listen-and-respond.py @@ -51,8 +51,8 @@ async def main(room_url: str, token): tma_in, llm, fl2, + tts, tma_out, - tts ], ) await transport.run_uninterruptible_pipeline(pipeline) From edd93bc4cb32cab777b7caf60e66dab17a65edb0 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Thu, 7 Mar 2024 19:05:03 -0500 Subject: [PATCH 2/3] remove errant print statement --- src/dailyai/services/ai_services.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 28b50dcac..e738b6cdb 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -138,7 +138,6 @@ class TTSService(AIService): text = frame.text else: self.current_sentence += frame.text - print(f"current sentence: {self.current_sentence}") if self.current_sentence.endswith((".", "?", "!")): text = self.current_sentence self.current_sentence = "" From 196279e34259be221050bb7a37167b648986ca69 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Thu, 7 Mar 2024 19:24:27 -0500 Subject: [PATCH 3/3] Add endframe to sample 4 --- src/examples/foundational/04-utterance-and-speech.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/examples/foundational/04-utterance-and-speech.py b/src/examples/foundational/04-utterance-and-speech.py index 3b67fb149..242a5c21c 100644 --- a/src/examples/foundational/04-utterance-and-speech.py +++ b/src/examples/foundational/04-utterance-and-speech.py @@ -44,7 +44,8 @@ async def main(room_url: str): buffer_queue = asyncio.Queue() source_queue = asyncio.Queue() pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts]) - source_queue.put_nowait(LLMMessagesQueueFrame(messages)) + await source_queue.put(LLMMessagesQueueFrame(messages)) + await source_queue.put(EndFrame()) pipeline_run_task = pipeline.run_pipeline() @transport.event_handler("on_first_other_participant_joined")