From ceeb93dd4645f493e63c2daabf60e9c9bcee22b1 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Thu, 18 Jan 2024 11:58:08 -0500 Subject: [PATCH] Clean up example 5 --- .../services/daily_transport_service.py | 7 +-- .../theoretical-to-real/01-say-one-thing.py | 2 +- .../02-llm-say-one-thing.py | 3 +- .../04-utterance-and-speech.py | 2 +- .../05-sync-speech-and-text.py | 44 +++++-------------- 5 files changed, 17 insertions(+), 41 deletions(-) diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 5349036e5..681c02a4e 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -206,11 +206,12 @@ class DailyTransportService(EventHandler): if type(frame) == QueueFrame and frame.frame_type == FrameType.END_STREAM: break - def wait_for_send_queue_to_empty(self): + async def wait_for_send_queue_to_empty(self): + await self.send_queue.join() self.threadsafe_send_queue.join() - def stop_when_done(self): - self.wait_for_send_queue_to_empty() + async def stop_when_done(self): + await self.wait_for_send_queue_to_empty() self.stop() async def run(self) -> None: diff --git a/src/samples/theoretical-to-real/01-say-one-thing.py b/src/samples/theoretical-to-real/01-say-one-thing.py index 9a95bbd9b..27b6a3b3d 100644 --- a/src/samples/theoretical-to-real/01-say-one-thing.py +++ b/src/samples/theoretical-to-real/01-say-one-thing.py @@ -38,7 +38,7 @@ async def main(room_url): ) # wait for the output queue to be empty, then leave the meeting - transport.stop_when_done() + await transport.stop_when_done() await transport.run() diff --git a/src/samples/theoretical-to-real/02-llm-say-one-thing.py b/src/samples/theoretical-to-real/02-llm-say-one-thing.py index 425eacf2f..a2842315c 100644 --- a/src/samples/theoretical-to-real/02-llm-say-one-thing.py +++ b/src/samples/theoretical-to-real/02-llm-say-one-thing.py @@ -37,8 +37,7 @@ async def main(room_url): @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): await tts_task - - transport.stop_when_done() + await transport.stop_when_done() await transport.run() diff --git a/src/samples/theoretical-to-real/04-utterance-and-speech.py b/src/samples/theoretical-to-real/04-utterance-and-speech.py index bf831c29b..93130d0c3 100644 --- a/src/samples/theoretical-to-real/04-utterance-and-speech.py +++ b/src/samples/theoretical-to-real/04-utterance-and-speech.py @@ -63,7 +63,7 @@ async def main(room_url:str): await asyncio.gather(llm_response_task, buffer_to_send_queue()) - transport.stop_when_done() + await transport.stop_when_done() await transport.run() diff --git a/src/samples/theoretical-to-real/05-sync-speech-and-text.py b/src/samples/theoretical-to-real/05-sync-speech-and-text.py index b0cd1e7c7..e38020cec 100644 --- a/src/samples/theoretical-to-real/05-sync-speech-and-text.py +++ b/src/samples/theoretical-to-real/05-sync-speech-and-text.py @@ -41,9 +41,6 @@ async def main(room_url): return all_audio async def get_month_data(month): - image_text = "" - tts_tasks = [] - first_sentence = True messages = [ { "role": "system", @@ -51,41 +48,23 @@ async def main(room_url): } ] - async for frame in SentenceAggregator().run(llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)])): - if type(frame.frame_data) != str: - raise Exception("LLM service requires a string for the data field") - - sentence: str = frame.frame_data - image_text += sentence - - if first_sentence: - sentence = f"{month}: {sentence}" - else: - first_sentence = False - - tts_tasks.append(get_all_audio(sentence)) - - tts_tasks.insert(0, dalle.run_image_gen(image_text)) - - print(f"waiting for tasks to finish for {month}") - data = await asyncio.gather( - *tts_tasks + image_description = await llm.run_llm(messages) + to_speak = f"{month}: {image_description}" + (audio, image_data) = await asyncio.gather( + get_all_audio(to_speak), dalle.run_image_gen(image_description) ) - print(f"done gathering tts tasks for {month}") - return { "month": month, - "text": image_text, - "image": data[0][1], - "audio": data[1:], + "text": image_description, + "image": image_data[1], + "audio": audio, } months: list[str] = [ "January", "February", - "March"] - """ + "March", "April", "May", "June", @@ -96,7 +75,6 @@ async def main(room_url): "November", "December", ] - """ @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): @@ -108,14 +86,12 @@ async def main(room_url): await transport.send_queue.put( [ QueueFrame(FrameType.IMAGE, data["image"]), - QueueFrame(FrameType.AUDIO, data["audio"][0]), + QueueFrame(FrameType.AUDIO, data["audio"]), ] ) - for audio in data["audio"][1:]: - await transport.send_queue.put(QueueFrame(FrameType.AUDIO, audio)) # wait for the output queue to be empty, then leave the meeting - transport.stop_when_done() + await transport.stop_when_done() month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]