Compare commits
11 Commits
v0.0.44
...
khk/queues
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
363a722370 | ||
|
|
0512765aeb | ||
|
|
2d8d9146f8 | ||
|
|
9829f77052 | ||
|
|
8071ee6b4b | ||
|
|
d26d23fed7 | ||
|
|
38cb8ca6a3 | ||
|
|
d03dd62941 | ||
|
|
fc19a55f04 | ||
|
|
8f9c252af9 | ||
|
|
cec2bdc15f |
@@ -1,29 +1,106 @@
|
||||
import asyncio
|
||||
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
|
||||
from dailyai.services.ai_services import AIService
|
||||
from attr import dataclass
|
||||
|
||||
from typing import AsyncGenerator, List
|
||||
from dailyai.queue_frame import (
|
||||
ControlQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import AIService, PipeService
|
||||
|
||||
from typing import Any, AsyncGenerator, Callable, List, Tuple
|
||||
|
||||
|
||||
class QueueTee:
|
||||
async def run_to_queue_and_generate(
|
||||
self,
|
||||
output_queue: asyncio.Queue,
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
async for frame in generator:
|
||||
await output_queue.put(frame)
|
||||
yield frame
|
||||
|
||||
async def run_to_queues(
|
||||
self,
|
||||
output_queues: List[asyncio.Queue],
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
class QueueTee(PipeService):
|
||||
def __init__(
|
||||
self, sinks: list[PipeService], *args, **kwargs
|
||||
):
|
||||
async for frame in generator:
|
||||
for queue in output_queues:
|
||||
await queue.put(frame)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.sinks: List[PipeService] = []
|
||||
for sink in sinks:
|
||||
sink.source_queue = asyncio.Queue()
|
||||
self.sinks.append(sink)
|
||||
|
||||
async def process_queue(self):
    """Copy every frame from the source queue to each sink's source queue.

    Returns immediately when this service has no source queue. Otherwise
    frames are forwarded until an EndStreamQueueFrame has been delivered to
    the sinks, at which point the loop ends.
    """
    if not self.source_queue:
        return

    stream_ended = False
    while not stream_ended:
        frame: QueueFrame = await self.source_queue.get()

        # Sinks that have no source queue are skipped.
        for target in self.sinks:
            if target.source_queue:
                await target.source_queue.put(frame)

        # The end-of-stream frame is forwarded like any other frame before
        # the loop stops.
        stream_ended = isinstance(frame, EndStreamQueueFrame)
|
||||
|
||||
|
||||
class QueueFrameAggregator(PipeService):
    """Pipe service that folds incoming frames through caller-supplied functions.

    ``aggregator`` is called with the current aggregation state and each
    incoming frame; it returns the new state and, optionally, a frame to
    emit. ``finalizer`` is called with the state from ``finalize`` and may
    return one last frame to emit.
    """

    def __init__(
        self,
        aggregator: Callable[[Any, QueueFrame], Tuple[Any, QueueFrame | None]],
        finalizer: Callable[[Any], QueueFrame | None],
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.aggregator = aggregator
        self.finalizer = finalizer
        # State handed back to the aggregator on every frame; starts as None.
        self.aggregation = None

    async def process_frame(
        self, frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Run the aggregator on ``frame`` and yield its output frame, if any."""
        new_state, emitted = self.aggregator(self.aggregation, frame)
        self.aggregation = new_state
        if emitted:
            yield emitted

    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Yield the frame produced by the finalizer from the final state, if any."""
        closing_frame = self.finalizer(self.aggregation)
        if closing_frame:
            yield closing_frame
|
||||
|
||||
class QueueMergeGateOnFirst(PipeService):
    """Merge several source queues into the sink queue, gated on first frames.

    Nothing is forwarded until one frame has been received from every source.
    Those first frames are then written to the sink queue in source order,
    after which the remaining frames of each source are forwarded
    concurrently. A single EndStreamQueueFrame is written once every source
    has ended.
    """

    def __init__(
        self, source_queues: List[asyncio.Queue[QueueFrame]]
    ):
        super().__init__()
        self.source_queues = source_queues

    async def process_queue(self):
        """Forward the gated first frames, then drain every remaining source."""
        first_frames: list[QueueFrame] = await asyncio.gather(
            *[source_queue.get() for source_queue in self.source_queues]
        )

        # A source whose first frame is already the end of its stream has
        # nothing left to forward. Build the list of live sources instead of
        # popping by index while iterating: popping shifts the later indices,
        # which removes the wrong queue (or raises IndexError) when more than
        # one source ends immediately.
        live_sources = []
        for source_queue, frame in zip(self.source_queues, first_frames):
            if isinstance(frame, EndStreamQueueFrame):
                continue
            await self.sink_queue.put(frame)
            live_sources.append(source_queue)
        self.source_queues = live_sources

        async def pass_through(sink, source):
            # Forward frames until this source ends; its end-of-stream frame
            # is dropped because one merged end frame is emitted below.
            while True:
                frame = await source.get()
                if isinstance(frame, EndStreamQueueFrame):
                    break
                await sink.put(frame)

        await asyncio.gather(
            *[pass_through(self.sink_queue, source) for source in self.source_queues]
        )

        await self.sink_queue.put(EndStreamQueueFrame())
|
||||
|
||||
|
||||
class LLMContextAggregator(AIService):
|
||||
|
||||
@@ -6,7 +6,6 @@ import wave
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
AudioQueueFrame,
|
||||
ControlQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
@@ -18,12 +17,58 @@ from dailyai.queue_frame import (
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
|
||||
from dataclasses import dataclass
|
||||
|
||||
class AbstractPipeService:
    """Base for pipe services: owns the sink queue that output frames go to.

    Note that the class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` marker below documents intent but does not prevent
    instantiation.
    """

    def __init__(self):
        # Every service gets its own, initially empty, output queue.
        self.sink_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()

    @abstractmethod
    async def process_queue(self):
        """Process frames for this service; the base implementation does nothing."""
|
||||
|
||||
class PipeService(AbstractPipeService):
    """Service that reads frames from a source queue and writes to its sink queue.

    Each frame taken from ``source_queue`` is passed to ``process_frame`` and
    whatever that yields is put on ``sink_queue``. When an
    EndStreamQueueFrame arrives, the frames from ``finalize`` are emitted
    before the end-of-stream output and processing stops.
    """

    def __init__(
        self,
        source_queue: asyncio.Queue[QueueFrame] | None = None,
    ):
        super().__init__()
        self.logger: logging.Logger = logging.getLogger("dailyai")
        self.source_queue = source_queue

    async def process_queue(self):
        """Pump frames from the source queue to the sink queue until end of stream.

        Returns immediately when no source queue is configured.
        """
        if not self.source_queue:
            return

        while True:
            frame: QueueFrame = await self.source_queue.get()

            if not isinstance(frame, EndStreamQueueFrame):
                async for output_frame in self.process_frame(frame):
                    await self.sink_queue.put(output_frame)
                continue

            # End of stream. This is handled outside the process_frame loop
            # so that finalize() still runs, and the loop still terminates,
            # when process_frame yields nothing for the end frame.
            end_outputs = [
                output_frame async for output_frame in self.process_frame(frame)
            ]
            async for final_frame in self.finalize():
                await self.sink_queue.put(final_frame)

            # Finalized frames go first, then whatever process_frame produced
            # for the end frame. If it produced nothing, forward the end frame
            # itself so downstream services are told the stream is over.
            for output_frame in end_outputs or [frame]:
                await self.sink_queue.put(output_frame)
            return

    @abstractmethod
    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Yield the output frames for ``frame``; by default the frame itself."""
        yield frame

    @abstractmethod
    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Yield any frames to emit at end of stream; by default none."""
        # This is a trick for the interpreter (and linter) to know that this is a generator.
        if False:
            yield QueueFrame()
|
||||
|
||||
|
||||
class AIService:
|
||||
class AIService(PipeService):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.logger = logging.getLogger("dailyai")
|
||||
|
||||
def stop(self):
|
||||
@@ -67,17 +112,6 @@ class AIService:
|
||||
self.logger.error("Exception occurred while running AI service", e)
|
||||
raise e
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, ControlQueueFrame):
|
||||
yield frame
|
||||
|
||||
@abstractmethod
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
# This is a trick for the interpreter (and linter) to know that this is a generator.
|
||||
if False:
|
||||
yield QueueFrame()
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
@abstractmethod
|
||||
@@ -98,8 +132,8 @@ class LLMService(AIService):
|
||||
|
||||
|
||||
class TTSService(AIService):
|
||||
def __init__(self, aggregate_sentences=True):
|
||||
super().__init__()
|
||||
def __init__(self, aggregate_sentences=True, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.aggregate_sentences: bool = aggregate_sentences
|
||||
self.current_sentence: str = ""
|
||||
|
||||
@@ -138,7 +172,10 @@ class TTSService(AIService):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
|
||||
# Convenience function to send the audio for a sentence to the given queue
|
||||
async def say(self, sentence, queue: asyncio.Queue):
|
||||
async def say(self, sentence, queue: asyncio.Queue | None = None):
    """Send the audio for ``sentence`` to ``queue``.

    Args:
        sentence: Text to speak; it is wrapped in a TextQueueFrame.
        queue: Destination queue. When omitted, this service's own
            ``sink_queue`` is used.

    Raises:
        Exception: If there is no queue to send the audio to.
    """
    # Fall back to the sink queue that the pipe-service base class creates;
    # the previous fallback read ``self.sink``, which nothing assigns.
    if queue is None:
        queue = self.sink_queue
    if not queue:
        raise Exception("No queue to send audio to")
    await self.run_to_queue(queue, [TextQueueFrame(sentence)])
|
||||
|
||||
|
||||
|
||||
@@ -48,8 +48,17 @@ class AzureTTSService(TTSService):
|
||||
|
||||
|
||||
class AzureLLMService(LLMService):
|
||||
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
|
||||
super().__init__()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_key,
|
||||
endpoint,
|
||||
model,
|
||||
api_version="2023-12-01-preview",
|
||||
*args,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._model: str = model
|
||||
|
||||
self._client = AsyncAzureOpenAI(
|
||||
|
||||
@@ -82,7 +82,6 @@ class BaseTransportService():
|
||||
|
||||
self._stop_threads.set()
|
||||
|
||||
await self.send_queue.put(EndStreamQueueFrame())
|
||||
await async_output_queue_marshal_task
|
||||
await self.send_queue.join()
|
||||
self._frame_consumer_thread.join()
|
||||
|
||||
@@ -205,7 +205,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
def _post_run(self):
|
||||
self.client.leave()
|
||||
|
||||
def on_first_other_participant_joined(self):
|
||||
def on_first_other_participant_joined(self, participant):
|
||||
pass
|
||||
|
||||
def call_joined(self, join_data, client_error):
|
||||
@@ -226,7 +226,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
def on_participant_joined(self, participant):
|
||||
if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
|
||||
self._other_participant_has_joined = True
|
||||
self.on_first_other_participant_joined()
|
||||
self.on_first_other_participant_joined(participant)
|
||||
|
||||
def on_participant_left(self, participant, reason):
|
||||
if len(self.client.participants()) < self._min_others_count + 1:
|
||||
|
||||
@@ -12,12 +12,13 @@ class ElevenLabsTTSService(TTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
api_key,
|
||||
voice_id,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
super().__init__()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
|
||||
@@ -11,12 +11,13 @@ class PlayHTAIService(TTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key,
|
||||
user_id,
|
||||
voice_url
|
||||
voice_url,
|
||||
*args,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.speech_key = api_key
|
||||
self.user_id = user_id
|
||||
|
||||
124
src/dailyai/tests/test_pipe_aggregators.py
Normal file
124
src/dailyai/tests/test_pipe_aggregators.py
Normal file
@@ -0,0 +1,124 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import PipeService
|
||||
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
|
||||
|
||||
class IncomingPipeService(PipeService):
    """Pipe service built without a source queue; tests feed its sink queue directly."""

    def __init__(self):
        super().__init__()
        # Start from a fresh, empty output queue for the test to fill.
        self.sink_queue = asyncio.Queue()
|
||||
|
||||
class QueueTeeTest(unittest.IsolatedAsyncioTestCase):
    """Checks that QueueTee delivers every frame to each of its sinks."""

    async def test_queue_tee(self):
        # origin -> inpipe1 -> tee -> (outpipe1, outpipe2)
        originpipe = IncomingPipeService()
        inpipe1 = PipeService(originpipe.sink_queue)
        outpipe1 = PipeService()
        outpipe2 = PipeService()
        teepipe = QueueTee(source_queue=inpipe1.sink_queue, sinks=[outpipe1, outpipe2])

        for queued in (TextQueueFrame("test"), EndStreamQueueFrame()):
            originpipe.sink_queue.put_nowait(queued)

        pipes = [originpipe, inpipe1, outpipe1, outpipe2, teepipe]
        await asyncio.gather(*[pipe.process_queue() for pipe in pipes])

        # Both sinks must have produced the text frame followed by the end frame.
        for pipe in (outpipe1, outpipe2):
            self.assertEqual(pipe.sink_queue.qsize(), 2)

            frame = pipe.sink_queue.get_nowait()
            self.assertIsInstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):
                self.assertEqual(frame.text, "test")

            self.assertIsInstance(pipe.sink_queue.get_nowait(), EndStreamQueueFrame)
|
||||
|
||||
class QueueFrameAggregatorTest(unittest.IsolatedAsyncioTestCase):
    """Checks that QueueFrameAggregator can merge text chunks into sentences."""

    async def test_queue_frame_aggregator(self):
        def aggregate_sentences(accumulation, frame):
            # Append text to the running sentence and emit it once it ends
            # with sentence punctuation; anything else emits nothing.
            if not accumulation:
                accumulation = ""
            if isinstance(frame, TextQueueFrame):
                accumulation += frame.text
                if accumulation.endswith((".", "!", "?")):
                    return ("", TextQueueFrame(accumulation))
            return (accumulation, None)

        def finalize_sentences(accumulation):
            # Flush whatever text is left when the stream ends.
            return TextQueueFrame(accumulation)

        originpipe = IncomingPipeService()
        aggregator_pipe = QueueFrameAggregator(
            source_queue=originpipe.sink_queue,
            aggregator=aggregate_sentences,
            finalizer=finalize_sentences,
        )

        chunks = ["testing, ", "one.", "two.", "three.", "can you ", "hear me"]
        for chunk in chunks:
            originpipe.sink_queue.put_nowait(TextQueueFrame(chunk))
        originpipe.sink_queue.put_nowait(EndStreamQueueFrame())

        await asyncio.gather(originpipe.process_queue(), aggregator_pipe.process_queue())

        # Four sentences plus the end-of-stream frame.
        self.assertEqual(aggregator_pipe.sink_queue.qsize(), 5)
        expected_text = ["testing, one.", "two.", "three.", "can you hear me"]
        for expectation in expected_text:
            frame = aggregator_pipe.sink_queue.get_nowait()
            self.assertIsInstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):
                self.assertEqual(frame.text, expectation)

        self.assertIsInstance(aggregator_pipe.sink_queue.get_nowait(), EndStreamQueueFrame)
|
||||
|
||||
class QueueMergeGateOnFirstTest(unittest.IsolatedAsyncioTestCase):
    """Checks the ordering of frames merged by QueueMergeGateOnFirst."""

    async def test_queue_merge_gate_on_first(self):
        pipe1 = IncomingPipeService()
        pipe2 = IncomingPipeService()

        merge_pipe = QueueMergeGateOnFirst(
            source_queues=[pipe1.sink_queue, pipe2.sink_queue],
        )

        # Set once the second pipe has queued its first frame, so the first
        # pipe only starts producing after that.
        evt = asyncio.Event()

        async def add_items_to_first_pipe():
            await evt.wait()
            await pipe1.sink_queue.put(TextQueueFrame("pipe1.1"))
            await pipe1.sink_queue.put(TextQueueFrame("pipe1.2"))
            await pipe1.sink_queue.put(EndStreamQueueFrame())

        async def add_items_to_second_pipe():
            await pipe2.sink_queue.put(TextQueueFrame("pipe2.1"))
            evt.set()
            await pipe2.sink_queue.put(EndStreamQueueFrame())

        await asyncio.gather(
            *[pipe.process_queue() for pipe in [pipe1, pipe2, merge_pipe]],
            add_items_to_first_pipe(),
            add_items_to_second_pipe())

        self.assertEqual(merge_pipe.sink_queue.qsize(), 4)

        # unittest assertions are used instead of bare ``assert`` so the
        # checks still run under ``python -O`` and report like the other tests.
        for expected in ["pipe1.1", "pipe2.1", "pipe1.2"]:
            frame = merge_pipe.sink_queue.get_nowait()
            self.assertIsInstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):
                self.assertEqual(frame.text, expected)

        frame = merge_pipe.sink_queue.get_nowait()
        self.assertIsInstance(frame, EndStreamQueueFrame)
|
||||
30
src/dailyai/tests/test_pipe_service.py
Normal file
30
src/dailyai/tests/test_pipe_service.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import PipeService
|
||||
|
||||
class TestPipeService(unittest.IsolatedAsyncioTestCase):
    """Checks that frames travel unchanged through a chain of PipeServices."""

    class IncomingPipeService(PipeService):
        """Head of the chain: has no source queue, its sink queue is filled by the test."""

        def __init__(self):
            super().__init__()
            self.sink_queue = asyncio.Queue()

    async def test_pipe_chain(self):
        # pipe1 -> pipe2 -> pipe3, each reading the previous pipe's sink queue.
        pipe1 = TestPipeService.IncomingPipeService()
        pipe2 = PipeService(pipe1.sink_queue)
        pipe3 = PipeService(pipe2.sink_queue)

        for queued in (TextQueueFrame("test"), EndStreamQueueFrame()):
            await pipe1.sink_queue.put(queued)

        chain = [pipe1, pipe2, pipe3]
        await asyncio.gather(*[pipe.process_queue() for pipe in chain])

        self.assertEqual(pipe3.sink_queue.qsize(), 2)

        text_frame = await pipe3.sink_queue.get()
        self.assertIsInstance(text_frame, TextQueueFrame)
        if isinstance(text_frame, TextQueueFrame):
            self.assertEqual(text_frame.text, "test")

        end_frame = await pipe3.sink_queue.get()
        self.assertIsInstance(end_frame, EndStreamQueueFrame)
|
||||
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
@@ -28,36 +29,40 @@ async def main(room_url):
|
||||
mic_enabled=True
|
||||
)
|
||||
|
||||
"""
|
||||
tts = ElevenLabsTTSService(
|
||||
source_queue=asyncio.Queue(),
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
"""
|
||||
tts = PlayHTAIService(
|
||||
sink=transport.send_queue,
|
||||
api_key=os.getenv("PLAY_HT_API_KEY"),
|
||||
user_id=os.getenv("PLAY_HT_USER_ID"),
|
||||
voice_url=os.getenv("PLAY_HT_VOICE_URL"),
|
||||
)
|
||||
"""
|
||||
|
||||
tts.sink_queue = transport.send_queue
|
||||
|
||||
# Register an event handler so we can play the audio when the participant joins.
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport, participant):
|
||||
nonlocal tts
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
|
||||
await tts.say(
|
||||
"Hello there, " + participant["info"]["userName"] + "!",
|
||||
transport.send_queue,
|
||||
# todo: update the tts.say() convenience method to use the new queue architecture
|
||||
await tts.source_queue.put(
|
||||
TextQueueFrame("Hello there, " +
|
||||
participant["info"]["userName"] + "!")
|
||||
)
|
||||
await tts.source_queue.put(EndStreamQueueFrame())
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
del(tts)
|
||||
# todo: commented out because it seems to exit a little early, before
|
||||
# the audio is finished playing
|
||||
# await transport.stop_when_done()
|
||||
|
||||
await asyncio.gather(transport.run(), tts.process_queue())
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
|
||||
@@ -3,7 +3,7 @@ import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
@@ -23,35 +23,35 @@ async def main(room_url):
|
||||
mic_enabled=True
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
|
||||
|
||||
llm = AzureLLMService(
|
||||
source_queue=asyncio.Queue(),
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
# llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
tts_task = asyncio.create_task(
|
||||
tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
)
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts_task
|
||||
await transport.stop_when_done()
|
||||
tts = ElevenLabsTTSService(
|
||||
|
||||
await transport.run()
|
||||
source_queue=llm.sink_queue, # this should really be a sentence aggregator?
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
tts.sink_queue = transport.send_queue
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport, participant):
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
await llm.source_queue.put(LLMMessagesQueueFrame(messages))
|
||||
await llm.source_queue.put(EndStreamQueueFrame())
|
||||
# todo: commented out because it exits before audio plays
|
||||
# await transport.stop_when_done()
|
||||
|
||||
await asyncio.gather(transport.run(), llm.process_queue(), tts.process_queue())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import asyncio
|
||||
from typing import Any, AsyncGenerator, Callable, Tuple
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
|
||||
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
from dailyai.queue_frame import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseEndQueueFrame, QueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import PipeService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
@@ -27,34 +30,70 @@ async def main(room_url):
|
||||
camera_height=1024
|
||||
)
|
||||
|
||||
"""
|
||||
|
||||
/ TTS \
|
||||
Month prompt -> LLM -> Fork -> -> Gate -> Transport
|
||||
\ Aggregate -> ImageGen /
|
||||
"""
|
||||
|
||||
month_description_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()
|
||||
llm = AzureLLMService(
|
||||
source_queue=month_description_queue,
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="ErXwobaYiN019PkySvjV")
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
|
||||
dalle = FalImageGenService(
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
|
||||
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"),
|
||||
)
|
||||
|
||||
# Get a complete audio chunk from the given text. Splitting this into its own
|
||||
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
|
||||
async def get_all_audio(text):
|
||||
all_audio = bytearray()
|
||||
async for audio in tts.run_tts(text):
|
||||
all_audio.extend(audio)
|
||||
def aggregator(
|
||||
accumulation, frame: QueueFrame
|
||||
) -> tuple[Any, QueueFrame | None]:
|
||||
if not accumulation:
|
||||
accumulation = ""
|
||||
|
||||
return all_audio
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
accumulation += frame.text
|
||||
return (accumulation, None)
|
||||
elif isinstance(frame, LLMResponseEndQueueFrame):
|
||||
return ("", TextQueueFrame(accumulation))
|
||||
else:
|
||||
return (accumulation, frame)
|
||||
|
||||
async def get_month_data(month):
|
||||
# This queue service takes chunks from LLM output and merges them into one text frame
|
||||
# that will be used to prompt the image service.
|
||||
llm_aggregator_for_image = QueueFrameAggregator(aggregator=aggregator, finalizer=lambda x: None)
|
||||
|
||||
# Set the source queue for the image service to the sink of the aggregator service
|
||||
dalle.source_queue = llm_aggregator_for_image.sink_queue
|
||||
|
||||
# This queue service takes the output from the LLM and sends it to the TTS service and
|
||||
# the aggregator for the image generation service.
|
||||
tee = QueueTee(source_queue=llm.sink_queue, sinks=[tts, llm_aggregator_for_image])
|
||||
|
||||
# This queue service takes input from the TTS service and the image service, and waits
|
||||
# to forward any audio frames until the image generation is complete. It will send
|
||||
# the image first, then the audio frames; this ensures that the image is shown before
|
||||
# the audio associated with the image is played.
|
||||
tts_image_gate = QueueMergeGateOnFirst([dalle.sink_queue, tts.sink_queue])
|
||||
|
||||
# We send the output of this queue service to the transport's send queue.
|
||||
tts_image_gate.sink_queue = transport.send_queue
|
||||
|
||||
# Queue up all the months in the LLM service source queue
|
||||
months = ["January"] #, "February"]
|
||||
for month in months:
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
@@ -62,72 +101,21 @@ async def main(room_url):
|
||||
}
|
||||
]
|
||||
|
||||
image_description = await llm.run_llm(messages)
|
||||
if not image_description:
|
||||
return
|
||||
await month_description_queue.put(LLMMessagesQueueFrame(messages))
|
||||
|
||||
to_speak = f"{month}: {image_description}"
|
||||
audio_task = asyncio.create_task(get_all_audio(to_speak))
|
||||
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
|
||||
print(f"about to gather tasks for {month}")
|
||||
(audio, image_data) = await asyncio.gather(
|
||||
audio_task, image_task
|
||||
)
|
||||
print(f"about to return from get_month_data for {month}")
|
||||
return {
|
||||
"month": month,
|
||||
"text": image_description,
|
||||
"image_url": image_data[0],
|
||||
"image": image_data[1],
|
||||
"audio": audio,
|
||||
}
|
||||
await month_description_queue.put(EndStreamQueueFrame())
|
||||
|
||||
months: list[str] = [
|
||||
"January",
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June"
|
||||
pipeline = [
|
||||
llm,
|
||||
[tee,
|
||||
[tts,
|
||||
[llm_aggregator_for_image, dalle]
|
||||
],
|
||||
tts_image_gate],
|
||||
transport,
|
||||
]
|
||||
"""
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June",
|
||||
"July",
|
||||
"August",
|
||||
"September",
|
||||
"October",
|
||||
"November",
|
||||
"December",
|
||||
"""
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# This will play the months in the order they're completed. The benefit
|
||||
# is we'll have as little delay as possible before the first month, and
|
||||
# likely no delay between months, but the months won't display in order.
|
||||
for month_data_task in asyncio.as_completed(month_tasks):
|
||||
print(f"month_data_task: {month_data_task}")
|
||||
try:
|
||||
data = await month_data_task
|
||||
except Exception:
|
||||
print("OMG EXCEPTION!!!!")
|
||||
if data:
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageQueueFrame(data["image_url"], data["image"]),
|
||||
AudioQueueFrame(data["audio"]),
|
||||
]
|
||||
)
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
|
||||
|
||||
await transport.run()
|
||||
await asyncio.gather(transport.run(), *[service.process_queue() for service in [llm, tts, dalle, tee, tts_image_gate, llm_aggregator_for_image]])
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
|
||||
Reference in New Issue
Block a user