Compare commits

...

11 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
363a722370 example 02 2024-03-03 17:16:46 -08:00
Kwindla Hultman Kramer
0512765aeb example 01 2024-03-03 16:38:03 -08:00
Moishe Lettvin
2d8d9146f8 ... 2024-02-28 14:58:28 -05:00
Moishe Lettvin
9829f77052 sample 05 works 2024-02-28 13:49:08 -05:00
Moishe Lettvin
8071ee6b4b starting on sample 5 2024-02-28 13:12:08 -05:00
Moishe Lettvin
d26d23fed7 merge aggregator & test 2024-02-28 11:05:49 -05:00
Moishe Lettvin
38cb8ca6a3 ...? 2024-02-27 20:47:08 -05:00
Moishe Lettvin
d03dd62941 probably will undo most of this 2024-02-27 18:52:01 -05:00
Moishe Lettvin
fc19a55f04 ... 2024-02-27 13:51:55 -05:00
Moishe Lettvin
8f9c252af9 ... 2024-02-27 13:03:07 -05:00
Moishe Lettvin
cec2bdc15f playing around with this 2024-02-27 09:55:40 -05:00
12 changed files with 431 additions and 160 deletions

View File

@@ -1,29 +1,106 @@
import asyncio
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
from dailyai.services.ai_services import AIService
from attr import dataclass
from typing import AsyncGenerator, List
from dailyai.queue_frame import (
ControlQueueFrame,
EndStreamQueueFrame,
LLMMessagesQueueFrame,
QueueFrame,
TextQueueFrame,
TranscriptionQueueFrame,
)
from dailyai.services.ai_services import AIService, PipeService
from typing import Any, AsyncGenerator, Callable, List, Tuple
class QueueTee:
async def run_to_queue_and_generate(
self,
output_queue: asyncio.Queue,
generator: AsyncGenerator[QueueFrame, None]
) -> AsyncGenerator[QueueFrame, None]:
async for frame in generator:
await output_queue.put(frame)
yield frame
async def run_to_queues(
self,
output_queues: List[asyncio.Queue],
generator: AsyncGenerator[QueueFrame, None]
class QueueTee(PipeService):
def __init__(
self, sinks: list[PipeService], *args, **kwargs
):
async for frame in generator:
for queue in output_queues:
await queue.put(frame)
super().__init__(*args, **kwargs)
self.sinks: List[PipeService] = []
for sink in sinks:
sink.source_queue = asyncio.Queue()
self.sinks.append(sink)
async def process_queue(self):
if not self.source_queue:
return
while True:
frame: QueueFrame = await self.source_queue.get()
for sink in self.sinks:
if sink.source_queue:
await sink.source_queue.put(frame)
if isinstance(frame, EndStreamQueueFrame):
break
class QueueFrameAggregator(PipeService):
    """Pipe stage that folds incoming frames into a running aggregation.

    ``aggregator`` is called as ``aggregator(aggregation, frame)`` for every
    frame and must return ``(new_aggregation, frame_or_None)``; a truthy
    frame is emitted downstream. ``finalizer`` is called as
    ``finalizer(aggregation)`` from :meth:`finalize` and may return one last
    frame to emit. The aggregation starts out as ``None``.
    """

    def __init__(
        self,
        aggregator: Callable[[Any, QueueFrame], Tuple[Any, QueueFrame | None]],
        finalizer: Callable[[Any], QueueFrame | None],
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.finalizer = finalizer
        self.aggregator = aggregator

    async def process_frame(
        self, frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Feed ``frame`` to the aggregator and yield whatever it releases."""
        self.aggregation, released = self.aggregator(self.aggregation, frame)
        if not released:
            return
        yield released

    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Yield the finalizer's frame for the current aggregation, if any."""
        leftover = self.finalizer(self.aggregation)
        if not leftover:
            return
        yield leftover
class QueueMergeGateOnFirst(PipeService):
    """Merge several source queues, gated on the first frame of each.

    Nothing is forwarded until every source has produced its first frame.
    Those first frames are then written to the sink in source order, after
    which the remaining frames of all sources are passed through
    concurrently. A single EndStreamQueueFrame is written once every source
    has ended.
    """

    def __init__(
        self, source_queues: List[asyncio.Queue[QueueFrame]]
    ):
        super().__init__()
        # Copy so that dropping finished sources never mutates the caller's list.
        self.source_queues = list(source_queues)

    async def process_queue(self):
        """Run the gate until every source queue has delivered its end marker."""
        first_frames: list[QueueFrame] = await asyncio.gather(
            *[source_queue.get() for source_queue in self.source_queues]
        )

        # Build the list of still-open sources instead of popping by index while
        # iterating: popping shifts the later indices, which removed the wrong
        # queue (or raised IndexError) when more than one source ended at once.
        open_sources = []
        for source_queue, frame in zip(self.source_queues, first_frames):
            if isinstance(frame, EndStreamQueueFrame):
                # This source is already finished; drop it and swallow its marker.
                continue
            open_sources.append(source_queue)
            await self.sink_queue.put(frame)
        self.source_queues = open_sources

        async def pass_through(sink, source):
            # Forward frames until this source ends; the end marker itself is
            # not forwarded because one is emitted for the whole merge below.
            while True:
                frame = await source.get()
                if isinstance(frame, EndStreamQueueFrame):
                    break
                await sink.put(frame)

        await asyncio.gather(
            *[pass_through(self.sink_queue, source) for source in self.source_queues]
        )

        await self.sink_queue.put(EndStreamQueueFrame())
class LLMContextAggregator(AIService):

View File

@@ -6,7 +6,6 @@ import wave
from dailyai.queue_frame import (
AudioQueueFrame,
ControlQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
LLMMessagesQueueFrame,
@@ -18,12 +17,58 @@ from dailyai.queue_frame import (
from abc import abstractmethod
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
from dataclasses import dataclass
class AbstractPipeService:
    """Base of all pipe stages: owns the queue that the stage writes to."""

    def __init__(self):
        # Output side of the stage; every instance gets its own fresh queue.
        self.sink_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()

    @abstractmethod
    async def process_queue(self):
        """Run the stage. The base implementation does nothing."""
class PipeService(AbstractPipeService):
    """Pipe stage that reads frames from ``source_queue`` and writes to ``sink_queue``.

    Subclasses customise the stage by overriding :meth:`process_frame`
    (called for each incoming frame) and :meth:`finalize` (called once when
    the end of the stream is reached).
    """

    def __init__(
        self,
        source_queue: asyncio.Queue[QueueFrame] | None = None,
    ):
        super().__init__()
        self.logger: logging.Logger = logging.getLogger("dailyai")
        self.source_queue = source_queue

    async def process_queue(self):
        """Pump frames from the source queue to the sink queue until end of stream.

        Returns immediately when no source queue is set. On an
        EndStreamQueueFrame, the frames from :meth:`finalize` are written
        first, followed by a single end-of-stream output, and the method
        returns.
        """
        if not self.source_queue:
            return

        while True:
            frame: QueueFrame = await self.source_queue.get()

            if not isinstance(frame, EndStreamQueueFrame):
                async for output_frame in self.process_frame(frame):
                    await self.sink_queue.put(output_frame)
                continue

            # End of stream. Take the first frame process_frame() produces for
            # the end marker; if it produces none, forward the marker itself.
            # Previously a process_frame() that yielded nothing here meant
            # finalize() never ran and this loop waited on the queue forever.
            end_output = frame
            async for output_frame in self.process_frame(frame):
                end_output = output_frame
                break

            # Flush whatever the stage is still holding before the end marker.
            async for final_frame in self.finalize():
                await self.sink_queue.put(final_frame)
            await self.sink_queue.put(end_output)
            return

    @abstractmethod
    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Yield the output frames for ``frame``; the default passes it through."""
        yield frame

    @abstractmethod
    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Yield any frames still pending at end of stream; the default yields none."""
        # This is a trick for the interpreter (and linter) to know that this is a generator.
        if False:
            yield QueueFrame()
class AIService:
class AIService(PipeService):
def __init__(self):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger("dailyai")
def stop(self):
@@ -67,17 +112,6 @@ class AIService:
self.logger.error("Exception occurred while running AI service", e)
raise e
@abstractmethod
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
yield frame
@abstractmethod
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
class LLMService(AIService):
@abstractmethod
@@ -98,8 +132,8 @@ class LLMService(AIService):
class TTSService(AIService):
def __init__(self, aggregate_sentences=True):
super().__init__()
def __init__(self, aggregate_sentences=True, *args, **kwargs):
super().__init__(*args, **kwargs)
self.aggregate_sentences: bool = aggregate_sentences
self.current_sentence: str = ""
@@ -138,7 +172,10 @@ class TTSService(AIService):
yield AudioQueueFrame(audio_chunk)
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
async def say(self, sentence, queue: asyncio.Queue|None=None):
queue = queue or self.sink
if not queue:
raise Exception("No queue to send audio to")
await self.run_to_queue(queue, [TextQueueFrame(sentence)])

View File

@@ -48,8 +48,17 @@ class AzureTTSService(TTSService):
class AzureLLMService(LLMService):
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
super().__init__()
def __init__(
self,
api_key,
endpoint,
model,
api_version="2023-12-01-preview",
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self._model: str = model
self._client = AsyncAzureOpenAI(

View File

@@ -82,7 +82,6 @@ class BaseTransportService():
self._stop_threads.set()
await self.send_queue.put(EndStreamQueueFrame())
await async_output_queue_marshal_task
await self.send_queue.join()
self._frame_consumer_thread.join()

View File

@@ -205,7 +205,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
def _post_run(self):
self.client.leave()
def on_first_other_participant_joined(self):
def on_first_other_participant_joined(self, participant):
pass
def call_joined(self, join_data, client_error):
@@ -226,7 +226,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
def on_participant_joined(self, participant):
if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
self._other_participant_has_joined = True
self.on_first_other_participant_joined()
self.on_first_other_participant_joined(participant)
def on_participant_left(self, participant, reason):
if len(self.client.participants()) < self._min_others_count + 1:

View File

@@ -12,12 +12,13 @@ class ElevenLabsTTSService(TTSService):
def __init__(
self,
*,
aiohttp_session: aiohttp.ClientSession,
api_key,
voice_id,
*args,
**kwargs
):
super().__init__()
super().__init__(*args, **kwargs)
self._api_key = api_key
self._voice_id = voice_id

View File

@@ -11,12 +11,13 @@ class PlayHTAIService(TTSService):
def __init__(
self,
*,
api_key,
user_id,
voice_url
voice_url,
*args,
**kwargs,
):
super().__init__()
super().__init__(*args, **kwargs)
self.speech_key = api_key
self.user_id = user_id

View File

@@ -0,0 +1,124 @@
import asyncio
import unittest
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
from dailyai.services.ai_services import PipeService
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
class IncomingPipeService(PipeService):
    """Source-only stage for tests: frames are placed directly on its sink queue."""

    def __init__(self):
        super().__init__()
        # Start from a fresh, empty output queue.
        self.sink_queue = asyncio.Queue()
class QueueTeeTest(unittest.IsolatedAsyncioTestCase):
    """Checks that QueueTee copies every frame to each of its sinks."""

    async def test_queue_tee(self):
        originpipe = IncomingPipeService()
        inpipe1 = PipeService(originpipe.sink_queue)
        outpipe1 = PipeService()
        outpipe2 = PipeService()
        teepipe = QueueTee(source_queue=inpipe1.sink_queue, sinks=[outpipe1, outpipe2])

        for queued in (TextQueueFrame("test"), EndStreamQueueFrame()):
            originpipe.sink_queue.put_nowait(queued)

        stages = [originpipe, inpipe1, outpipe1, outpipe2, teepipe]
        await asyncio.gather(*[stage.process_queue() for stage in stages])

        # Both sinks must have received the text frame followed by the end marker.
        for outpipe in (outpipe1, outpipe2):
            self.assertEqual(outpipe.sink_queue.qsize(), 2)
            first = outpipe.sink_queue.get_nowait()
            self.assertIsInstance(first, TextQueueFrame)
            if isinstance(first, TextQueueFrame):
                self.assertEqual(first.text, "test")
            last = outpipe.sink_queue.get_nowait()
            self.assertIsInstance(last, EndStreamQueueFrame)
class QueueFrameAggregatorTest(unittest.IsolatedAsyncioTestCase):
    """Checks that QueueFrameAggregator regroups text fragments into sentences."""

    async def test_queue_frame_aggregator(self):
        def aggregate_sentences(accumulation, frame):
            # Accumulate text until it ends in sentence punctuation, then
            # release the sentence and start over with an empty accumulation.
            if not accumulation:
                accumulation = ""
            if isinstance(frame, TextQueueFrame):
                accumulation += frame.text
                if accumulation.endswith((".", "!", "?")):
                    return ("", TextQueueFrame(accumulation))
            return (accumulation, None)

        def finalize_sentences(accumulation):
            # Whatever is left over at end of stream is emitted as-is.
            return TextQueueFrame(accumulation)

        originpipe = IncomingPipeService()
        aggregator_pipe = QueueFrameAggregator(
            source_queue=originpipe.sink_queue,
            aggregator=aggregate_sentences,
            finalizer=finalize_sentences,
        )

        originpipe.sink_queue.put_nowait(TextQueueFrame("testing, "))
        originpipe.sink_queue.put_nowait(TextQueueFrame("one."))
        originpipe.sink_queue.put_nowait(TextQueueFrame("two."))
        originpipe.sink_queue.put_nowait(TextQueueFrame("three."))
        originpipe.sink_queue.put_nowait(TextQueueFrame("can you "))
        originpipe.sink_queue.put_nowait(TextQueueFrame("hear me"))
        originpipe.sink_queue.put_nowait(EndStreamQueueFrame())

        await asyncio.gather(originpipe.process_queue(), aggregator_pipe.process_queue())

        # Four text frames plus the end-of-stream marker.
        self.assertEqual(aggregator_pipe.sink_queue.qsize(), 5)

        expected_text = ["testing, one.", "two.", "three.", "can you hear me"]
        for expectation in expected_text:
            frame = aggregator_pipe.sink_queue.get_nowait()
            self.assertIsInstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):
                self.assertEqual(frame.text, expectation)

        self.assertIsInstance(aggregator_pipe.sink_queue.get_nowait(), EndStreamQueueFrame)
class QueueMergeGateOnFirstTest(unittest.IsolatedAsyncioTestCase):
    """Checks that QueueMergeGateOnFirst holds output until every source has a frame."""

    async def test_queue_merge_gate_on_first(self):
        pipe1 = IncomingPipeService()
        pipe2 = IncomingPipeService()

        merge_pipe = QueueMergeGateOnFirst(
            source_queues=[pipe1.sink_queue, pipe2.sink_queue],
        )

        # The event forces pipe2 to produce its first frame before pipe1 does.
        evt = asyncio.Event()

        async def add_items_to_first_pipe():
            await evt.wait()
            await pipe1.sink_queue.put(TextQueueFrame("pipe1.1"))
            await pipe1.sink_queue.put(TextQueueFrame("pipe1.2"))
            await pipe1.sink_queue.put(EndStreamQueueFrame())

        async def add_items_to_second_pipe():
            await pipe2.sink_queue.put(TextQueueFrame("pipe2.1"))
            evt.set()
            await pipe2.sink_queue.put(EndStreamQueueFrame())

        await asyncio.gather(
            *[pipe.process_queue() for pipe in [pipe1, pipe2, merge_pipe]],
            add_items_to_first_pipe(),
            add_items_to_second_pipe())

        self.assertEqual(merge_pipe.sink_queue.qsize(), 4)

        # First frames come out in source order, then the rest, then one end marker.
        # unittest assertions are used throughout because bare asserts are
        # stripped when Python runs with -O.
        for expected_text in ("pipe1.1", "pipe2.1", "pipe1.2"):
            frame = merge_pipe.sink_queue.get_nowait()
            self.assertIsInstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):
                self.assertEqual(frame.text, expected_text)

        frame = merge_pipe.sink_queue.get_nowait()
        self.assertIsInstance(frame, EndStreamQueueFrame)

View File

@@ -0,0 +1,30 @@
import asyncio
import unittest
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
from dailyai.services.ai_services import PipeService
class TestPipeService(unittest.IsolatedAsyncioTestCase):
    """Checks that frames travel unchanged through a chain of PipeService stages."""

    class IncomingPipeService(PipeService):
        """Source-only stage: the test puts frames directly on its sink queue."""

        def __init__(self):
            super().__init__()
            self.sink_queue = asyncio.Queue()

    async def test_pipe_chain(self):
        pipe1 = TestPipeService.IncomingPipeService()
        pipe2 = PipeService(pipe1.sink_queue)
        pipe3 = PipeService(pipe2.sink_queue)

        for queued in (TextQueueFrame("test"), EndStreamQueueFrame()):
            await pipe1.sink_queue.put(queued)

        await asyncio.gather(
            pipe1.process_queue(),
            pipe2.process_queue(),
            pipe3.process_queue(),
        )

        # The last stage must hold the text frame followed by the end marker.
        self.assertEqual(pipe3.sink_queue.qsize(), 2)

        text_frame = await pipe3.sink_queue.get()
        self.assertIsInstance(text_frame, TextQueueFrame)
        if isinstance(text_frame, TextQueueFrame):
            self.assertEqual(text_frame.text, "test")

        end_frame = await pipe3.sink_queue.get()
        self.assertIsInstance(end_frame, EndStreamQueueFrame)

View File

@@ -1,6 +1,7 @@
import asyncio
import aiohttp
import os
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -28,36 +29,40 @@ async def main(room_url):
mic_enabled=True
)
"""
tts = ElevenLabsTTSService(
source_queue=asyncio.Queue(),
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
"""
tts = PlayHTAIService(
sink=transport.send_queue,
api_key=os.getenv("PLAY_HT_API_KEY"),
user_id=os.getenv("PLAY_HT_USER_ID"),
voice_url=os.getenv("PLAY_HT_VOICE_URL"),
)
"""
tts.sink_queue = transport.send_queue
# Register an event handler so we can play the audio when the participant joins.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport, participant):
nonlocal tts
if participant["info"]["isLocal"]:
return
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
# todo: update the tts.say() convenience method to use the new queue architecture
await tts.source_queue.put(
TextQueueFrame("Hello there, " +
participant["info"]["userName"] + "!")
)
await tts.source_queue.put(EndStreamQueueFrame())
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
await transport.run()
del(tts)
# todo: commented out because it seems to exit a little early, before
# the audio is finished playing
# await transport.stop_when_done()
await asyncio.gather(transport.run(), tts.process_queue())
if __name__ == "__main__":
(url, token) = configure()

View File

@@ -3,7 +3,7 @@ import os
import aiohttp
from dailyai.queue_frame import LLMMessagesQueueFrame
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -23,35 +23,35 @@ async def main(room_url):
mic_enabled=True
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
llm = AzureLLMService(
source_queue=asyncio.Queue(),
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"))
# llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
tts_task = asyncio.create_task(
tts.run_to_queue(
transport.send_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
)
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts_task
await transport.stop_when_done()
tts = ElevenLabsTTSService(
await transport.run()
source_queue=llm.sink_queue, # this should really be a sentence aggregator?
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
tts.sink_queue = transport.send_queue
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport, participant):
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
await llm.source_queue.put(LLMMessagesQueueFrame(messages))
await llm.source_queue.put(EndStreamQueueFrame())
# todo: commented out because it exits before audio plays
# await transport.stop_when_done()
await asyncio.gather(transport.run(), llm.process_queue(), tts.process_queue())
if __name__ == "__main__":

View File

@@ -1,8 +1,11 @@
import asyncio
from typing import Any, AsyncGenerator, Callable, Tuple
import aiohttp
import os
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.queue_frame import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseEndQueueFrame, QueueFrame, TextQueueFrame
from dailyai.services.ai_services import PipeService
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.daily_transport_service import DailyTransportService
@@ -27,34 +30,70 @@ async def main(room_url):
camera_height=1024
)
"""
/ TTS \
Month prompt -> LLM -> Fork -> -> Gate -> Transport
\ Aggregate -> ImageGen /
"""
month_description_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()
llm = AzureLLMService(
source_queue=month_description_queue,
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"))
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV")
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
dalle = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"))
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
key_secret=os.getenv("FAL_KEY_SECRET"),
)
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
def aggregator(
accumulation, frame: QueueFrame
) -> tuple[Any, QueueFrame | None]:
if not accumulation:
accumulation = ""
return all_audio
if isinstance(frame, TextQueueFrame):
accumulation += frame.text
return (accumulation, None)
elif isinstance(frame, LLMResponseEndQueueFrame):
return ("", TextQueueFrame(accumulation))
else:
return (accumulation, frame)
async def get_month_data(month):
# This queue service takes chunks from LLM output and merges them into one text frame
# that will be used to prompt the image service.
llm_aggregator_for_image = QueueFrameAggregator(aggregator=aggregator, finalizer=lambda x: None)
# Set the source queue for the image service to the sink of the aggregator service
dalle.source_queue = llm_aggregator_for_image.sink_queue
# This queue service takes the output from the LLM and sends it to the TTS service and
# the aggregator for the image generation service.
tee = QueueTee(source_queue=llm.sink_queue, sinks=[tts, llm_aggregator_for_image])
# This queue service takes input from the TTS service and the image service, and waits
# to forward any audio frames until the image generation is complete. It will send
# the image first, then the audio frames; this ensures that the image is shown before
# the audio associated with the image is played.
tts_image_gate = QueueMergeGateOnFirst([dalle.sink_queue, tts.sink_queue])
# We send the output of this queue service to the transport output.
tts_image_gate.sink_queue = transport.send_queue
# Queue up all the months in the LLM service source queue
months = ["January"] #, "February"]
for month in months:
messages = [
{
"role": "system",
@@ -62,72 +101,21 @@ async def main(room_url):
}
]
image_description = await llm.run_llm(messages)
if not image_description:
return
await month_description_queue.put(LLMMessagesQueueFrame(messages))
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
print(f"about to gather tasks for {month}")
(audio, image_data) = await asyncio.gather(
audio_task, image_task
)
print(f"about to return from get_month_data for {month}")
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
await month_description_queue.put(EndStreamQueueFrame())
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June"
pipeline = [
llm,
[tee,
[tts,
[llm_aggregator_for_image, dalle]
],
tts_image_gate],
transport,
]
"""
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
"""
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
print(f"month_data_task: {month_data_task}")
try:
data = await month_data_task
except Exception:
print("OMG EXCEPTION!!!!")
if data:
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
await transport.run()
await asyncio.gather(transport.run(), *[service.process_queue() for service in [llm, tts, dalle, tee, tts_image_gate, llm_aggregator_for_image]])
if __name__ == "__main__":
(url, token) = configure()