From 1071dede1a4d25162eaa33325af2e762b8a7a76d Mon Sep 17 00:00:00 2001 From: Liza <65890040+lazeratops@users.noreply.github.com> Date: Fri, 19 Jan 2024 14:59:48 +0100 Subject: [PATCH 1/9] Only initialize Daily once (#5) --- src/dailyai/services/daily_transport_service.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 681c02a4e..3e8cfda18 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -1,6 +1,7 @@ import asyncio import inspect import logging +import threading import time import types @@ -21,6 +22,8 @@ from daily import ( ) class DailyTransportService(EventHandler): + _daily_initialized = False + _lock = threading.Lock() def __init__( self, room_url: str, @@ -114,7 +117,11 @@ class DailyTransportService(EventHandler): return decorator def configure_daily(self): - Daily.init() + # Only initialize Daily once + if not DailyTransportService._daily_initialized: + with DailyTransportService._lock: + Daily.init() + DailyTransportService._daily_initialized = True self.client = CallClient(event_handler=self) if self.mic_enabled: From 6ae733ebfecf63c36cbb66fdec6af20296ce4077 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Fri, 19 Jan 2024 13:06:15 -0500 Subject: [PATCH 2/9] Cleanup: no more sentence aggregator, let the TTS service deal with that; also removed the queue typing stuff from ai_services --- src/dailyai/queue_frame.py | 10 +- src/dailyai/services/ai_services.py | 165 +++++++----------- .../services/daily_transport_service.py | 2 +- src/dailyai/tests/test_ai_services.py | 92 +--------- .../02-llm-say-one-thing.py | 5 +- .../theoretical-to-real/03-still-frame.py | 2 +- .../04-utterance-and-speech.py | 10 +- .../05-sync-speech-and-text.py | 1 - .../06-listen-and-respond.py | 2 +- 9 files changed, 82 insertions(+), 207 deletions(-) diff --git 
a/src/dailyai/queue_frame.py b/src/dailyai/queue_frame.py index 7b4d1ca83..7e77ff89d 100644 --- a/src/dailyai/queue_frame.py +++ b/src/dailyai/queue_frame.py @@ -2,16 +2,14 @@ from enum import Enum from dataclasses import dataclass class FrameType(Enum): + NOOP = -1 START_STREAM = 0 END_STREAM = 1 AUDIO = 2 IMAGE = 3 - SENTENCE = 4 - TEXT_CHUNK = 5 - LLM_MESSAGE = 6 - APP_MESSAGE = 7 - IMAGE_DESCRIPTION = 8 - TRANSCRIPTION = 9 + TEXT = 4 + LLM_MESSAGE = 5 + APP_MESSAGE = 6 @dataclass(frozen=True) class QueueFrame: diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 83c1c7099..9ad8e0399 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -21,12 +21,6 @@ class AIService: def stop(self): pass - def allowed_input_frame_types(self) -> set[FrameType]: - return set() - - def possible_output_frame_types(self) -> set[FrameType]: - return set() - async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None: async for frame in self.run(frames): await queue.put(frame) @@ -39,76 +33,50 @@ class AIService: frames: Iterable[QueueFrame] | AsyncIterable[QueueFrame] | asyncio.Queue[QueueFrame], - requested_frame_types: set[FrameType] | None=None, ) -> AsyncGenerator[QueueFrame, None]: - if requested_frame_types and self.possible_output_frame_types().intersection(requested_frame_types) == set(): - raise Exception(f"Requested frame types {requested_frame_types} are not supported by this service.") + try: + if isinstance(frames, AsyncIterable): + async for frame in frames: + async for output_frame in self.process_frame(frame): + yield output_frame + elif isinstance(frames, Iterable): + for frame in frames: + async for output_frame in self.process_frame(frame): + yield output_frame + elif isinstance(frames, asyncio.Queue): + while True: + frame = await frames.get() + async for output_frame in self.process_frame(frame): + yield output_frame + if frame.frame_type == 
FrameType.END_STREAM: + break + else: + raise Exception("Frames must be an iterable or async iterable") - if not requested_frame_types: - requested_frame_types = self.possible_output_frame_types() - - if isinstance(frames, AsyncIterable): - async for frame in frames: - async for output_frame in self.process_frame(requested_frame_types, frame): - yield output_frame - elif isinstance(frames, Iterable): - for frame in frames: - async for output_frame in self.process_frame(requested_frame_types, frame): - yield output_frame - elif isinstance(frames, asyncio.Queue): - while True: - frame = await frames.get() - async for output_frame in self.process_frame(requested_frame_types, frame): - yield output_frame - if frame.frame_type == FrameType.END_STREAM: - break - else: - raise Exception("Frames must be an iterable or async iterable") + async for output_frame in self.finalize(): + yield output_frame + except Exception as e: + self.logger.error("Exception occurred while running AI service", e) + raise e @abstractmethod - async def process_frame(self, requested_frame_types:set[FrameType], frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: - # Yield something so the linter can deduce what should happen here. 
- yield QueueFrame(FrameType.END_STREAM, None) - -class SentenceAggregator(AIService): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.current_sentence = "" - - def allowed_input_frame_types(self) -> set[FrameType]: - return set([FrameType.TEXT_CHUNK, FrameType.SENTENCE]) - - def possible_output_frame_types(self) -> set[FrameType]: - return set([FrameType.SENTENCE]) - - async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if not FrameType.SENTENCE in requested_frame_types: - return - - if frame.frame_type == FrameType.TEXT_CHUNK: - if type(frame.frame_data) != str: - raise Exception( - "Sentence aggregator requires a string for the data field" - ) - - self.current_sentence += frame.frame_data - if self.current_sentence.endswith((".", "?", "!")): - sentence = self.current_sentence - self.current_sentence = "" - yield QueueFrame(FrameType.SENTENCE, sentence) - elif frame.frame_type == FrameType.END_STREAM: - if self.current_sentence: - yield QueueFrame(FrameType.SENTENCE, self.current_sentence) - elif frame.frame_type == FrameType.SENTENCE: - yield frame + async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: + # This is a trick for the interpreter (and linter) to know that this is a generator. + if False: + yield QueueFrame(FrameType.NOOP, None) + @abstractmethod + async def finalize(self) -> AsyncGenerator[QueueFrame, None]: + # This is a trick for the interpreter (and linter) to know that this is a generator. 
+ if False: + yield QueueFrame(FrameType.NOOP, None) class LLMService(AIService): def allowed_input_frame_types(self) -> set[FrameType]: - return set([FrameType.LLM_MESSAGE, FrameType.SENTENCE, FrameType.TRANSCRIPTION]) + return set([FrameType.LLM_MESSAGE]) def allowed_output_frame_types(self) -> set[FrameType]: - return set([FrameType.SENTENCE, FrameType.TEXT_CHUNK]) + return set([FrameType.TEXT]) @abstractmethod async def run_llm_async(self, messages) -> AsyncGenerator[str, None]: @@ -118,52 +86,58 @@ class LLMService(AIService): async def run_llm(self, messages) -> str: pass - async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: if frame.frame_type == FrameType.LLM_MESSAGE: if type(frame.frame_data) != list: raise Exception("LLM service requires a dict for the data field") messages: list[dict[str, str]] = frame.frame_data - if FrameType.SENTENCE in requested_frame_types: - yield QueueFrame(FrameType.SENTENCE, await self.run_llm(messages)) - else: - async for text_chunk in self.run_llm_async(messages): - yield QueueFrame(FrameType.TEXT_CHUNK, text_chunk) - - # TODO: handle other frame types! Need to aggregate into messages + async for text_chunk in self.run_llm_async(messages): + yield QueueFrame(FrameType.TEXT, text_chunk) class TTSService(AIService): + def __init__(self, aggregate_sentences=True): + super().__init__() + self.aggregate_sentences: bool = aggregate_sentences + self.current_sentence: str = "" + # Some TTS services require a specific sample rate. We default to 16k def get_mic_sample_rate(self): return 16000 - def allowed_input_frame_types(self) -> set[FrameType]: - return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK]) - - def possible_output_frame_types(self) -> set[FrameType]: - return set([FrameType.AUDIO]) - - # Converts the sentence to audio. 
Yields a list of audio frames that can + # Converts the text to audio. Yields a list of audio frames that can # be sent to the microphone device @abstractmethod - async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]: + async def run_tts(self, text) -> AsyncGenerator[bytes, None]: # yield empty bytes here, so linting can infer what this method does yield bytes() - async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if not FrameType.AUDIO in requested_frame_types: - return + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + if frame.frame_type != FrameType.TEXT or type(frame.frame_data) != str: + raise Exception(f"TTS service requires a string for the data field, got {frame.frame_type} and frame_data type {type(frame.frame_data)}") - if type(frame.frame_data) != str: - raise Exception("TTS service requires a string for the data field") + text: str | None = None + if not self.aggregate_sentences: + text = frame.frame_data + else: + self.current_sentence += frame.frame_data + if self.current_sentence.endswith((".", "?", "!")): + text = self.current_sentence + self.current_sentence = "" - async for audio_chunk in self.run_tts(frame.frame_data): - yield QueueFrame(FrameType.AUDIO, audio_chunk) + if text: + async for audio_chunk in self.run_tts(text): + yield QueueFrame(FrameType.AUDIO, audio_chunk) + + async def finalize(self): + if self.current_sentence: + async for audio_chunk in self.run_tts(self.current_sentence): + yield QueueFrame(FrameType.AUDIO, audio_chunk) # Convenience function to send the audio for a sentence to the given queue async def say(self, sentence, queue: asyncio.Queue): - await self.run_to_queue(queue, [QueueFrame(FrameType.SENTENCE, sentence)]) + await self.run_to_queue(queue, [QueueFrame(FrameType.TEXT, sentence)]) class ImageGenService(AIService): @@ -171,21 +145,12 @@ class ImageGenService(AIService): 
super().__init__(**kwargs) self.image_size = image_size - def allowed_input_frame_types(self) -> set[FrameType]: - return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK, FrameType.IMAGE_DESCRIPTION]) - - def possible_output_frame_types(self) -> set[FrameType]: - return set([FrameType.IMAGE]) - # Renders the image. Returns an Image object. @abstractmethod async def run_image_gen(self, sentence) -> tuple[str, bytes]: pass - async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if not FrameType.IMAGE in requested_frame_types: - return - + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: if type(frame.frame_data) != str: raise Exception("Image service requires a string for the data field") diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 3e8cfda18..5d300cad3 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -279,7 +279,7 @@ class DailyTransportService(EventHandler): def on_transcription_message(self, message:dict): if self.loop: - frame = QueueFrame(FrameType.TRANSCRIPTION, message) + frame = QueueFrame(FrameType.TEXT, message) asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop) def on_transcription_stopped(self, stopped_by, stopped_by_error): diff --git a/src/dailyai/tests/test_ai_services.py b/src/dailyai/tests/test_ai_services.py index 6467442a1..bfe8dbde8 100644 --- a/src/dailyai/tests/test_ai_services.py +++ b/src/dailyai/tests/test_ai_services.py @@ -3,25 +3,19 @@ import unittest from typing import AsyncGenerator, Generator -from dailyai.services.ai_services import AIService, SentenceAggregator +from dailyai.services.ai_services import AIService from dailyai.queue_frame import QueueFrame, FrameType class SimpleAIService(AIService): - def allowed_input_frame_types(self) -> 
set[FrameType]: - return set([FrameType.TEXT_CHUNK]) - - def possible_output_frame_types(self) -> set[FrameType]: - return set([FrameType.TEXT_CHUNK]) - - async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> QueueFrame | None: - return frame + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + yield frame class TestBaseAIService(unittest.IsolatedAsyncioTestCase): async def test_async_input(self): service = SimpleAIService() input_frames = [ - QueueFrame(FrameType.TEXT_CHUNK, "hello"), + QueueFrame(FrameType.TEXT, "hello"), QueueFrame(FrameType.END_STREAM, None), ] async def iterate_frames() -> AsyncGenerator[QueueFrame, None]: @@ -29,7 +23,7 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase): yield frame output_frames = [] - async for frame in service.run(set([FrameType.TEXT_CHUNK]), iterate_frames()): + async for frame in service.run(iterate_frames()): output_frames.append(frame) self.assertEqual(input_frames, output_frames) @@ -38,7 +32,7 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase): service = SimpleAIService() input_frames = [ - QueueFrame(FrameType.TEXT_CHUNK, "hello"), + QueueFrame(FrameType.TEXT, "hello"), QueueFrame(FrameType.END_STREAM, None), ] @@ -47,83 +41,11 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase): yield frame output_frames = [] - async for frame in service.run(set([FrameType.TEXT_CHUNK]), iterate_frames()): + async for frame in service.run(iterate_frames()): output_frames.append(frame) self.assertEqual(input_frames, output_frames) -class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase): - async def test_clause(self) -> None: - input_frames = [ - QueueFrame(FrameType.TEXT_CHUNK, "hello"), - QueueFrame(FrameType.END_STREAM, None), - ] - - service = SentenceAggregator() - output_frames = [] - async for frame in service.run(set([FrameType.SENTENCE]), input_frames): - output_frames.append(frame) - - self.assertEqual(1, 
len(output_frames)) - self.assertEqual(QueueFrame(FrameType.SENTENCE, "hello"), output_frames[0]) - - async def test_sentence(self) -> None: - input_frames = [ - QueueFrame(FrameType.TEXT_CHUNK, "hello, "), - QueueFrame(FrameType.TEXT_CHUNK, "world."), - QueueFrame(FrameType.END_STREAM, None), - ] - - service = SentenceAggregator() - output_frames = [] - async for frame in service.run(set([FrameType.SENTENCE]), input_frames): - output_frames.append(frame) - - self.assertEqual(1, len(output_frames)) - self.assertEqual(QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0]) - - async def test_sentence_and_clause(self) -> None: - input_frames = [ - QueueFrame(FrameType.TEXT_CHUNK, "hello, "), - QueueFrame(FrameType.TEXT_CHUNK, "world."), - QueueFrame(FrameType.TEXT_CHUNK, " How are"), - QueueFrame(FrameType.END_STREAM, None), - ] - - service = SentenceAggregator() - output_frames = [] - async for frame in service.run(set([FrameType.SENTENCE]), input_frames): - output_frames.append(frame) - - self.assertEqual(2, len(output_frames)) - self.assertEqual( - QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0] - ) - self.assertEqual( - QueueFrame(FrameType.SENTENCE, " How are"), output_frames[1] - ) - - async def test_two_sentences(self) -> None: - input_frames = [ - QueueFrame(FrameType.TEXT_CHUNK, "hello, "), - QueueFrame(FrameType.TEXT_CHUNK, "world."), - QueueFrame(FrameType.TEXT_CHUNK, " How are"), - QueueFrame(FrameType.TEXT_CHUNK, " you doing?"), - QueueFrame(FrameType.END_STREAM, None), - ] - - service = SentenceAggregator() - output_frames = [] - async for frame in service.run(set([FrameType.SENTENCE]), input_frames): - output_frames.append(frame) - - self.assertEqual(2, len(output_frames)) - self.assertEqual( - QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0] - ) - self.assertEqual(QueueFrame(FrameType.SENTENCE, " How are you doing?"), output_frames[1]) - - if __name__ == "__main__": unittest.main() diff --git 
a/src/samples/theoretical-to-real/02-llm-say-one-thing.py b/src/samples/theoretical-to-real/02-llm-say-one-thing.py index a2842315c..3ea0f2714 100644 --- a/src/samples/theoretical-to-real/02-llm-say-one-thing.py +++ b/src/samples/theoretical-to-real/02-llm-say-one-thing.py @@ -4,7 +4,6 @@ from typing import AsyncGenerator from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.daily_transport_service import DailyTransportService -from dailyai.services.ai_services import SentenceAggregator from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -28,9 +27,7 @@ async def main(room_url): tts_task = asyncio.create_task( tts.run_to_queue( transport.send_queue, - SentenceAggregator().run( - llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]) - ) + llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]) ) ) diff --git a/src/samples/theoretical-to-real/03-still-frame.py b/src/samples/theoretical-to-real/03-still-frame.py index 79261214d..c69b12547 100644 --- a/src/samples/theoretical-to-real/03-still-frame.py +++ b/src/samples/theoretical-to-real/03-still-frame.py @@ -23,7 +23,7 @@ async def main(room_url): imagegen = OpenAIImageGenService(image_size="1024x1024") image_task = asyncio.create_task( - imagegen.run_to_queue(transport.send_queue, [QueueFrame(FrameType.IMAGE_DESCRIPTION, "a cat in the style of picasso")]) + imagegen.run_to_queue(transport.send_queue, [QueueFrame(FrameType.TEXT, "a cat in the style of picasso")]) ) @transport.event_handler("on_participant_joined") diff --git a/src/samples/theoretical-to-real/04-utterance-and-speech.py b/src/samples/theoretical-to-real/04-utterance-and-speech.py index 93130d0c3..296d17e6d 100644 --- a/src/samples/theoretical-to-real/04-utterance-and-speech.py +++ b/src/samples/theoretical-to-real/04-utterance-and-speech.py @@ -2,7 +2,6 @@ import argparse import asyncio import re -from dailyai.services.ai_services import 
SentenceAggregator from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.queue_frame import QueueFrame, FrameType @@ -36,9 +35,7 @@ async def main(room_url:str): llm_response_task = asyncio.create_task( elevenlabs_tts.run_to_queue( buffer_queue, - SentenceAggregator().run( - llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]) - ), + llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]), True, ) ) @@ -48,10 +45,7 @@ async def main(room_url:str): if participant["id"] == transport.my_participant_id: return - await azure_tts.run_to_queue( - transport.send_queue, - [QueueFrame(FrameType.SENTENCE, "My friend the LLM is now going to tell a joke about llamas.")] - ) + await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue) async def buffer_to_send_queue(): while True: diff --git a/src/samples/theoretical-to-real/05-sync-speech-and-text.py b/src/samples/theoretical-to-real/05-sync-speech-and-text.py index e38020cec..d589e778a 100644 --- a/src/samples/theoretical-to-real/05-sync-speech-and-text.py +++ b/src/samples/theoretical-to-real/05-sync-speech-and-text.py @@ -5,7 +5,6 @@ from asyncio.queues import Queue import re from dailyai.queue_frame import QueueFrame, FrameType -from dailyai.services.ai_services import SentenceAggregator from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAIImageGenService diff --git a/src/samples/theoretical-to-real/06-listen-and-respond.py b/src/samples/theoretical-to-real/06-listen-and-respond.py index d4bcf492f..ef34d1832 100644 --- a/src/samples/theoretical-to-real/06-listen-and-respond.py +++ b/src/samples/theoretical-to-real/06-listen-and-respond.py @@ -33,7 +33,7 @@ async def main(room_url:str, token): sentence = "" async for frame in 
transport.get_receive_frames(): - if frame.frame_type != FrameType.TRANSCRIPTION: + if frame.frame_type != FrameType.TEXT: continue message = frame.frame_data From ccd2fa31e50b551a8706a297b72422b221e656ba Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Fri, 19 Jan 2024 13:57:52 -0500 Subject: [PATCH 3/9] Rename 'theoretical-to-real' samples to 'foundational' --- .../{theoretical-to-real => foundational}/01-say-one-thing.py | 0 .../{theoretical-to-real => foundational}/01a-greet-user.py | 0 .../{theoretical-to-real => foundational}/02-llm-say-one-thing.py | 0 .../{theoretical-to-real => foundational}/03-still-frame.py | 0 .../04-utterance-and-speech.py | 0 .../05-sync-speech-and-text.py | 0 .../06-listen-and-respond.py | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename src/samples/{theoretical-to-real => foundational}/01-say-one-thing.py (100%) rename src/samples/{theoretical-to-real => foundational}/01a-greet-user.py (100%) rename src/samples/{theoretical-to-real => foundational}/02-llm-say-one-thing.py (100%) rename src/samples/{theoretical-to-real => foundational}/03-still-frame.py (100%) rename src/samples/{theoretical-to-real => foundational}/04-utterance-and-speech.py (100%) rename src/samples/{theoretical-to-real => foundational}/05-sync-speech-and-text.py (100%) rename src/samples/{theoretical-to-real => foundational}/06-listen-and-respond.py (100%) diff --git a/src/samples/theoretical-to-real/01-say-one-thing.py b/src/samples/foundational/01-say-one-thing.py similarity index 100% rename from src/samples/theoretical-to-real/01-say-one-thing.py rename to src/samples/foundational/01-say-one-thing.py diff --git a/src/samples/theoretical-to-real/01a-greet-user.py b/src/samples/foundational/01a-greet-user.py similarity index 100% rename from src/samples/theoretical-to-real/01a-greet-user.py rename to src/samples/foundational/01a-greet-user.py diff --git a/src/samples/theoretical-to-real/02-llm-say-one-thing.py 
b/src/samples/foundational/02-llm-say-one-thing.py similarity index 100% rename from src/samples/theoretical-to-real/02-llm-say-one-thing.py rename to src/samples/foundational/02-llm-say-one-thing.py diff --git a/src/samples/theoretical-to-real/03-still-frame.py b/src/samples/foundational/03-still-frame.py similarity index 100% rename from src/samples/theoretical-to-real/03-still-frame.py rename to src/samples/foundational/03-still-frame.py diff --git a/src/samples/theoretical-to-real/04-utterance-and-speech.py b/src/samples/foundational/04-utterance-and-speech.py similarity index 100% rename from src/samples/theoretical-to-real/04-utterance-and-speech.py rename to src/samples/foundational/04-utterance-and-speech.py diff --git a/src/samples/theoretical-to-real/05-sync-speech-and-text.py b/src/samples/foundational/05-sync-speech-and-text.py similarity index 100% rename from src/samples/theoretical-to-real/05-sync-speech-and-text.py rename to src/samples/foundational/05-sync-speech-and-text.py diff --git a/src/samples/theoretical-to-real/06-listen-and-respond.py b/src/samples/foundational/06-listen-and-respond.py similarity index 100% rename from src/samples/theoretical-to-real/06-listen-and-respond.py rename to src/samples/foundational/06-listen-and-respond.py From b443fbdb60b05789c12e96711d54d23f4777cfaf Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Fri, 19 Jan 2024 16:20:08 -0500 Subject: [PATCH 4/9] Very rough draft at intro/overview in README --- README.md | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/README.md b/README.md index 74459f38a..9e782214e 100644 --- a/README.md +++ b/README.md @@ -53,3 +53,88 @@ If you have those environment variables stored in an .env file, you can quickly ```bash export $(grep -v '^#' .env | xargs) ``` + +## Overview +The Daily AI SDK allows you to build applications that can participate in WebRTC sessions and interact with AI Services. 
Some examples of what you can build with this: +* conversational bots that interact 1:1 with a user, using voice recognition and text-to-speech +* assistant bots that aggregate transcriptions from multiple participants in a meeting and provide realtime summaries or other AI-generated output. +* image-recognition bots +* etc +## Concepts +### Transport Service +The SDK provides one “transport service”, which is a wrapper around Daily’s `daily-python` client (tk add link). You can use this service to listen for events related to a WebRTC session, such as “a participant joined the meeting”. +The transport service also exposes a send queue and a receive queue. You can use the send queue to send audio and video to the WebRTC session, and you can listen to the receive queue to see audio, video and transcription data from the WebRTC session. +### AI Services +The AI Service classes provide wrappers around various AI providers, and allow you to query LLMs, convert text to speech and make images from text. The audio and images can then be placed on the transport service’s send queue, where they’ll be sent to the WebRTC session. +### Queue Frames +Communication between the transport service and AI services, and between various AI services, takes place in Queue Frames. These frames contain an indication of the type of data as well as the data itself. +## Using Transports, AI Services and Frames +AI Services all define a `.run` method. This method consumes and generates `QueueFrame` frames. The kinds of frames that can be consumed and generated depend on the kind of service. For instance, an LLM AI Service consumes `LLM_MESSAGE` frames (which define a history of interaction with an LLM) and emits `TEXT` frames (the response from the LLM). + +The `.run` method is an `AsyncIterable`, and it takes an `iterable`, `AsyncIterable` or `asyncio.Queue` that produces QueueFrames as a parameter. 
This makes it easy to chain AI Services, and consume input from the Transport’s `receive_queue`. + +AI Services also have a `.run_to_queue` method. This method is not an AsyncIterable, but instead sends processed QueueFrames to a queue. This makes it easy to send the output of an AI Service to the Transport’s `send_queue`. + +AI Services also define convenience functions that let you bypass creating QueueFrames for some simple cases (e.g., using the TTS service to convert a string to audio output and send that audio to the transport’s `send_queue`). See below for examples. +## Examples +### Say Something +The base TTS AI service exposes a `.say` method. After creating a transport and TTS service, you can use this method like so: +``` +transport = DailyTransportService(...) +tts = AzureTTSService() +await tts.say("hello world", transport.send_queue) +``` +This will call the TTS service to render the text to audio frames, then put the audio frames on the transport’s send queue. The transport will then send those frames along to the WebRTC session. + +### Speak an LLM response +Given a system prompt contained in a `messages` array, you can emit the LLM’s response as audio with a chain like this: +``` +transport = DailyTransportService(...) # setup parameters omitted +tts = AzureTTSService() +llm = AzureLLMService() +messages = [...] # system prompt omitted for brevity + +await tts.run_to_queue( + transport.send_queue, + llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]) +) +``` +In this code, the LLM service object sends the messages to Azure’s OpenAI implementation, which streams chunks back asynchronously. Those chunks are aggregated by the TTS Service to ensure the best audio response (TTS works best when it gets complete sentences, so it can inflect correctly), then sent to Azure’s TTS service, converted to audio frames, and sent to the WebRTC session via the Daily transport. 
+ +### Pre-cache an LLM response +Sometimes LLMs can be slower than we’d like for natural-feeling communication. Here’s an example where we take advantage of the time it takes to speak some pre-defined text to get a head start on the LLM response: + +(TK link to 04- sample) + +In this sample, we set up a buffer queue to receive the audio frames from the LLM response while we are joining the call, and start an asynchronous task to start filling this buffer: +``` + buffer_queue = asyncio.Queue() + llm_response_task = asyncio.create_task( + elevenlabs_tts.run_to_queue( + buffer_queue, + llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]), + True, + ) + ) +``` + +Then, when we’ve joined the call, we speak the static text: +``` + await azure_tts.say("My friend...", transport.send_queue) +``` + +As that text is being spoken, the asynchronous LLM task continues in the background. When the text is done, we pull the frames off the buffer queue and put them in the transport’s `send_queue`: +``` + async def buffer_to_send_queue(): + while True: + frame = await buffer_queue.get() + await transport.send_queue.put(frame) + buffer_queue.task_done() + if frame.frame_type == FrameType.END_STREAM: + break + + await asyncio.gather(llm_response_task, buffer_to_send_queue()) + +``` + +One thing to note here is the last parameter to `run_to_queue` in the first code snippet above: this causes the `run_to_queue` method to send an `END_STREAM` frame when it’s done rendering. This lets us know when to stop our `buffer_to_send_queue` task above. 
From 95c92e53040eebbb70f56906bc16aa808c7fede9 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Mon, 22 Jan 2024 10:59:13 -0500 Subject: [PATCH 5/9] Aggregators for LLM messages --- src/dailyai/queue_aggregators.py | 67 +++++++++++++++++++ src/dailyai/queue_frame.py | 5 +- src/dailyai/services/ai_services.py | 4 +- .../services/daily_transport_service.py | 6 +- .../foundational/06-listen-and-respond.py | 45 ++++++------- 5 files changed, 95 insertions(+), 32 deletions(-) create mode 100644 src/dailyai/queue_aggregators.py diff --git a/src/dailyai/queue_aggregators.py b/src/dailyai/queue_aggregators.py new file mode 100644 index 000000000..a7d815d85 --- /dev/null +++ b/src/dailyai/queue_aggregators.py @@ -0,0 +1,67 @@ +import asyncio + +from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.services.ai_services import AIService + +from typing import AsyncGenerator, List + +class QueueTee: + async def run_to_queue_and_generate( + self, + output_queue: asyncio.Queue, + generator: AsyncGenerator[QueueFrame, None] + ) -> AsyncGenerator[QueueFrame, None]: + async for frame in generator: + await output_queue.put(frame) + yield frame + + async def run_to_queues( + self, + output_queues: List[asyncio.Queue], + generator: AsyncGenerator[QueueFrame, None] + ): + async for frame in generator: + for queue in output_queues: + await queue.put(frame) + +class TranscriptionToLLMMessageAggregator(AIService): + def __init__(self, messages, bot_participant_id): + self.messages = messages + self.bot_participant_id = bot_participant_id + self.sentence = "" + + async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: + if frame.frame_type != FrameType.TRANSCRIPTION: + return + + message = frame.frame_data + if not isinstance(message, dict): + return + + if message["session_id"] == self.bot_participant_id: + return + + print("transcription to message", frame) + + # todo: we could differentiate between transcriptions from different participants + 
self.sentence += message["text"] + if self.sentence.endswith((".", "?", "!")): + self.messages.append({"role": "user", "content": self.sentence}) + self.sentence = "" + yield QueueFrame(FrameType.LLM_MESSAGE, self.messages) + + +class LLMResponseToLLMMessageAggregator(AIService): + def __init__(self, messages): + self.messages = messages + self.sentence = "" + + async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: + if frame.frame_type == FrameType.TEXT and isinstance(frame.frame_data, str): + print("llmresponse to message", frame) + self.sentence += frame.frame_data + if self.sentence.endswith((".", "?", "!")): + self.messages.append({"role": "assistant", "content": self.sentence}) + self.sentence = "" + + yield frame diff --git a/src/dailyai/queue_frame.py b/src/dailyai/queue_frame.py index 7e77ff89d..9d6ce6407 100644 --- a/src/dailyai/queue_frame.py +++ b/src/dailyai/queue_frame.py @@ -8,8 +8,9 @@ class FrameType(Enum): AUDIO = 2 IMAGE = 3 TEXT = 4 - LLM_MESSAGE = 5 - APP_MESSAGE = 6 + TRANSCRIPTION = 5 + LLM_MESSAGE = 6 + APP_MESSAGE = 7 @dataclass(frozen=True) class QueueFrame: diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 9ad8e0399..fc7002d92 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -7,11 +7,9 @@ from httpx import request from dailyai.queue_frame import QueueFrame, FrameType from abc import abstractmethod -from typing import AsyncGenerator, Iterable +from typing import AsyncGenerator, AsyncIterable, Iterable from dataclasses import dataclass -from typing import AsyncGenerator -from collections.abc import Iterable, AsyncIterable class AIService: diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 5d300cad3..f71e19268 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -57,6 +57,7 @@ class 
DailyTransportService(EventHandler): self.receive_queue = asyncio.Queue() self.other_participant_has_joined = False + self.my_participant_id = None self.camera_thread = None self.frame_consumer_thread = None @@ -150,6 +151,7 @@ class DailyTransportService(EventHandler): self.client.set_user_name(self.bot_name) self.client.join(self.room_url, self.token, completion=self.call_joined) + self.my_participant_id = self.client.participants()["local"]["id"] self.client.update_inputs( { @@ -193,8 +195,6 @@ class DailyTransportService(EventHandler): if self.token: self.client.start_transcription(self.transcription_settings) - self.my_participant_id = self.client.participants()["local"]["id"] - async def get_receive_frames(self): while True: frame = await self.receive_queue.get() @@ -279,7 +279,7 @@ class DailyTransportService(EventHandler): def on_transcription_message(self, message:dict): if self.loop: - frame = QueueFrame(FrameType.TEXT, message) + frame = QueueFrame(FrameType.TRANSCRIPTION, message) asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop) def on_transcription_stopped(self, stopped_by, stopped_by_error): diff --git a/src/samples/foundational/06-listen-and-respond.py b/src/samples/foundational/06-listen-and-respond.py index ef34d1832..50e8c1cc3 100644 --- a/src/samples/foundational/06-listen-and-respond.py +++ b/src/samples/foundational/06-listen-and-respond.py @@ -6,7 +6,10 @@ import urllib.parse from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_aggregators import ( + TranscriptionToLLMMessageAggregator, + LLMResponseToLLMMessageAggregator, +) async def main(room_url:str, token): global transport @@ -17,7 +20,7 @@ async def main(room_url:str, token): room_url, token, "Respond bot", - 1, + 5, ) transport.mic_enabled = True transport.mic_sample_rate = 16000 @@ 
-26,33 +29,27 @@ async def main(room_url:str, token): llm = AzureLLMService() tts = AzureTTSService() + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport): + await tts.say("Hi, I'm listening!", transport.send_queue) + async def handle_transcriptions(): messages = [ {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."}, ] - sentence = "" - async for frame in transport.get_receive_frames(): - if frame.frame_type != FrameType.TEXT: - continue - - message = frame.frame_data - if message["session_id"] == transport.my_participant_id: - continue - - # todo: we could differentiate between transcriptions from different participants - sentence += message["text"] - if sentence.endswith((".", "?", "!")): - messages.append({"role": "user", "content": sentence}) - sentence = '' - - full_response = "" - async for response in llm.run_llm_async_sentences(messages): - full_response += response - async for audio in tts.run_tts(response): - await transport.send_queue.put(QueueFrame(FrameType.AUDIO, audio)) - - messages.append({"role": "assistant", "content": full_response}) + tma_in = TranscriptionToLLMMessageAggregator(messages, transport.my_participant_id) + tma_out = LLMResponseToLLMMessageAggregator(messages) + await tts.run_to_queue( + transport.send_queue, + tma_out.run( + llm.run( + tma_in.run( + transport.get_receive_frames() + ) + ) + ) + ) transport.transcription_settings["extra"]["punctuate"] = True await asyncio.gather(transport.run(), handle_transcriptions()) From 3fda9b0ecbf1bfd238e3821f5b20849b9e3904f5 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Mon, 22 Jan 2024 16:02:35 -0500 Subject: [PATCH 6/9] Use more flexible aggregator --- src/dailyai/queue_aggregators.py | 46 ++++++++----------- 
src/dailyai/services/ai_services.py | 8 +++- .../foundational/06-listen-and-respond.py | 13 +++--- 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/src/dailyai/queue_aggregators.py b/src/dailyai/queue_aggregators.py index a7d815d85..2681b622d 100644 --- a/src/dailyai/queue_aggregators.py +++ b/src/dailyai/queue_aggregators.py @@ -24,44 +24,36 @@ class QueueTee: for queue in output_queues: await queue.put(frame) -class TranscriptionToLLMMessageAggregator(AIService): - def __init__(self, messages, bot_participant_id): +class LLMContextAggregator(AIService): + def __init__(self, messages: list[dict], role:str, bot_participant_id=None): self.messages = messages self.bot_participant_id = bot_participant_id + self.role = role self.sentence = "" async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if frame.frame_type != FrameType.TRANSCRIPTION: - return + content: str = "" - message = frame.frame_data - if not isinstance(message, dict): - return + if frame.frame_type == FrameType.TRANSCRIPTION: + message = frame.frame_data + if not isinstance(message, dict): + return - if message["session_id"] == self.bot_participant_id: - return + if message["session_id"] == self.bot_participant_id: + return - print("transcription to message", frame) + content = message["text"] + elif frame.frame_type == FrameType.TEXT: + if not isinstance(frame.frame_data, str): + return - # todo: we could differentiate between transcriptions from different participants - self.sentence += message["text"] + content = frame.frame_data + + # todo: we should differentiate between transcriptions from different participants + self.sentence += content if self.sentence.endswith((".", "?", "!")): - self.messages.append({"role": "user", "content": self.sentence}) + self.messages.append({"role": self.role, "content": self.sentence}) self.sentence = "" yield QueueFrame(FrameType.LLM_MESSAGE, self.messages) - -class LLMResponseToLLMMessageAggregator(AIService): - def 
__init__(self, messages): - self.messages = messages - self.sentence = "" - - async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if frame.frame_type == FrameType.TEXT and isinstance(frame.frame_data, str): - print("llmresponse to message", frame) - self.sentence += frame.frame_data - if self.sentence.endswith((".", "?", "!")): - self.messages.append({"role": "assistant", "content": self.sentence}) - self.sentence = "" - yield frame diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index fc7002d92..ab7652dde 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -112,8 +112,12 @@ class TTSService(AIService): yield bytes() async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if frame.frame_type != FrameType.TEXT or type(frame.frame_data) != str: - raise Exception(f"TTS service requires a string for the data field, got {frame.frame_type} and frame_data type {type(frame.frame_data)}") + if frame.frame_type != FrameType.TEXT: + yield frame + return + + if not isinstance(frame.frame_data, str): + raise(Exception(f"Invalid data type in frame type: {frame.frame_type}, type: {type(frame.frame_data)}")) text: str | None = None if not self.aggregate_sentences: diff --git a/src/samples/foundational/06-listen-and-respond.py b/src/samples/foundational/06-listen-and-respond.py index 50e8c1cc3..22337f414 100644 --- a/src/samples/foundational/06-listen-and-respond.py +++ b/src/samples/foundational/06-listen-and-respond.py @@ -6,10 +6,7 @@ import urllib.parse from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.queue_aggregators import ( - TranscriptionToLLMMessageAggregator, - LLMResponseToLLMMessageAggregator, -) +from dailyai.queue_aggregators import LLMContextAggregator async def main(room_url:str, token): global 
transport @@ -38,8 +35,12 @@ async def main(room_url:str, token): {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."}, ] - tma_in = TranscriptionToLLMMessageAggregator(messages, transport.my_participant_id) - tma_out = LLMResponseToLLMMessageAggregator(messages) + tma_in = LLMContextAggregator( + messages, "user", transport.my_participant_id + ) + tma_out = LLMContextAggregator( + messages, "assistant", transport.my_participant_id + ) await tts.run_to_queue( transport.send_queue, tma_out.run( From b51abd2969c0db8a74720541856a6ae2b3f4de41 Mon Sep 17 00:00:00 2001 From: Liza <65890040+lazeratops@users.noreply.github.com> Date: Tue, 23 Jan 2024 14:33:27 +0100 Subject: [PATCH 7/9] facilitate manual call management (#7) --- .gitignore | 1 + src/dailyai/services/daily_transport_service.py | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 62f24b6da..d23851756 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ env/ __pycache__/ *~ +venv #*# # Distribution / packaging diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index f71e19268..77756a5c2 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -30,6 +30,7 @@ class DailyTransportService(EventHandler): token: str | None, bot_name: str, duration: float = 10, + min_others_count: int = 1, ): super().__init__() self.bot_name: str = bot_name @@ -37,6 +38,7 @@ class DailyTransportService(EventHandler): self.token: str | None = token self.duration: float = duration self.expiration = time.time() + duration * 60 + self.min_others_count = min_others_count # This queue is used to marshal frames from the async send queue to the thread that emits audio & video. 
# We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions @@ -224,14 +226,14 @@ class DailyTransportService(EventHandler): async def run(self) -> None: self.configure_daily() - self.participant_left = False + self.do_shutdown = False async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames()) try: participant_count: int = len(self.client.participants()) self.logger.info(f"{participant_count} participants in room") - while time.time() < self.expiration and not self.participant_left and not self.stop_threads.is_set(): + while time.time() < self.expiration and not self.do_shutdown and not self.stop_threads.is_set(): await asyncio.sleep(1) except Exception as e: self.logger.error(f"Exception {e}") @@ -270,8 +272,8 @@ class DailyTransportService(EventHandler): self.on_first_other_participant_joined() def on_participant_left(self, participant, reason): - if len(self.client.participants()) < 2: - self.participant_left = True + if len(self.client.participants()) < self.min_others_count + 1: + self.do_shutdown = True pass def on_app_message(self, message, sender): From c134598a77faf9276506937d2c9a263281aad2c3 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Tue, 23 Jan 2024 09:33:51 -0500 Subject: [PATCH 8/9] Refactor QueueFrame --- src/dailyai/queue_aggregators.py | 22 ++----- src/dailyai/queue_frame.py | 37 +++++++++++ src/dailyai/services/ai_services.py | 61 ++++++++----------- .../services/daily_transport_service.py | 42 ++++++++----- src/dailyai/tests/test_ai_services.py | 11 ++-- src/samples/foundational/01-say-one-thing.py | 3 - .../foundational/02-llm-say-one-thing.py | 5 +- src/samples/foundational/03-still-frame.py | 4 +- .../foundational/04-utterance-and-speech.py | 6 +- .../foundational/05-sync-speech-and-text.py | 18 +++--- 10 files changed, 116 insertions(+), 93 deletions(-) diff --git a/src/dailyai/queue_aggregators.py b/src/dailyai/queue_aggregators.py index 2681b622d..ee6ca34bb 100644 --- 
a/src/dailyai/queue_aggregators.py +++ b/src/dailyai/queue_aggregators.py @@ -1,6 +1,6 @@ import asyncio -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame from dailyai.services.ai_services import AIService from typing import AsyncGenerator, List @@ -34,26 +34,14 @@ class LLMContextAggregator(AIService): async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: content: str = "" - if frame.frame_type == FrameType.TRANSCRIPTION: - message = frame.frame_data - if not isinstance(message, dict): - return + # TODO: split up transcription by participant + if isinstance(frame, TextQueueFrame): + content = frame.text - if message["session_id"] == self.bot_participant_id: - return - - content = message["text"] - elif frame.frame_type == FrameType.TEXT: - if not isinstance(frame.frame_data, str): - return - - content = frame.frame_data - - # todo: we should differentiate between transcriptions from different participants self.sentence += content if self.sentence.endswith((".", "?", "!")): self.messages.append({"role": self.role, "content": self.sentence}) self.sentence = "" - yield QueueFrame(FrameType.LLM_MESSAGE, self.messages) + yield LLMMessagesQueueFrame(self.messages) yield frame diff --git a/src/dailyai/queue_frame.py b/src/dailyai/queue_frame.py index 9d6ce6407..fb3b51d18 100644 --- a/src/dailyai/queue_frame.py +++ b/src/dailyai/queue_frame.py @@ -1,6 +1,8 @@ from enum import Enum from dataclasses import dataclass +from typing import Any +""" class FrameType(Enum): NOOP = -1 START_STREAM = 0 @@ -16,3 +18,38 @@ class FrameType(Enum): class QueueFrame: frame_type: FrameType frame_data: str | dict | bytes | list | None +""" +class QueueFrame: + pass + +class StartStreamQueueFrame(QueueFrame): + pass + +class EndStreamQueueFrame(QueueFrame): + pass + +@dataclass() +class AudioQueueFrame(QueueFrame): + data: bytes + +@dataclass() +class 
ImageQueueFrame(QueueFrame): + url: str | None + image: bytes + +@dataclass() +class TextQueueFrame(QueueFrame): + text: str + +@dataclass() +class TranscriptionQueueFrame(TextQueueFrame): + participantId: str + timestamp: str + +@dataclass() +class LLMMessagesQueueFrame(QueueFrame): + messages: list[dict[str,str]] # TODO: define this more concretely! + +class AppMessageQueueFrame(QueueFrame): + message: Any + participantId: str diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index ab7652dde..684912989 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -1,10 +1,14 @@ import asyncio import logging -import re -from httpx import request - -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import ( + AudioQueueFrame, + EndStreamQueueFrame, + ImageQueueFrame, + LLMMessagesQueueFrame, + QueueFrame, + TextQueueFrame, +) from abc import abstractmethod from typing import AsyncGenerator, AsyncIterable, Iterable @@ -24,7 +28,7 @@ class AIService: await queue.put(frame) if add_end_of_stream: - await queue.put(QueueFrame(FrameType.END_STREAM, None)) + await queue.put(EndStreamQueueFrame()) async def run( self, @@ -46,7 +50,7 @@ class AIService: frame = await frames.get() async for output_frame in self.process_frame(frame): yield output_frame - if frame.frame_type == FrameType.END_STREAM: + if isinstance(frame, EndStreamQueueFrame): break else: raise Exception("Frames must be an iterable or async iterable") @@ -61,21 +65,15 @@ class AIService: async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: # This is a trick for the interpreter (and linter) to know that this is a generator. if False: - yield QueueFrame(FrameType.NOOP, None) + yield QueueFrame() @abstractmethod async def finalize(self) -> AsyncGenerator[QueueFrame, None]: # This is a trick for the interpreter (and linter) to know that this is a generator. 
if False: - yield QueueFrame(FrameType.NOOP, None) + yield QueueFrame() class LLMService(AIService): - def allowed_input_frame_types(self) -> set[FrameType]: - return set([FrameType.LLM_MESSAGE]) - - def allowed_output_frame_types(self) -> set[FrameType]: - return set([FrameType.TEXT]) - @abstractmethod async def run_llm_async(self, messages) -> AsyncGenerator[str, None]: yield "" @@ -85,13 +83,9 @@ class LLMService(AIService): pass async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if frame.frame_type == FrameType.LLM_MESSAGE: - if type(frame.frame_data) != list: - raise Exception("LLM service requires a dict for the data field") - - messages: list[dict[str, str]] = frame.frame_data - async for text_chunk in self.run_llm_async(messages): - yield QueueFrame(FrameType.TEXT, text_chunk) + if isinstance(frame, LLMMessagesQueueFrame): + async for text_chunk in self.run_llm_async(frame.messages): + yield TextQueueFrame(text_chunk) class TTSService(AIService): @@ -112,34 +106,31 @@ class TTSService(AIService): yield bytes() async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if frame.frame_type != FrameType.TEXT: + if not isinstance(frame, TextQueueFrame): yield frame return - if not isinstance(frame.frame_data, str): - raise(Exception(f"Invalid data type in frame type: {frame.frame_type}, type: {type(frame.frame_data)}")) - text: str | None = None if not self.aggregate_sentences: - text = frame.frame_data + text = frame.text else: - self.current_sentence += frame.frame_data + self.current_sentence += frame.text if self.current_sentence.endswith((".", "?", "!")): text = self.current_sentence self.current_sentence = "" if text: async for audio_chunk in self.run_tts(text): - yield QueueFrame(FrameType.AUDIO, audio_chunk) + yield AudioQueueFrame(audio_chunk) async def finalize(self): if self.current_sentence: async for audio_chunk in self.run_tts(self.current_sentence): - yield 
QueueFrame(FrameType.AUDIO, audio_chunk) + yield AudioQueueFrame(audio_chunk) # Convenience function to send the audio for a sentence to the given queue async def say(self, sentence, queue: asyncio.Queue): - await self.run_to_queue(queue, [QueueFrame(FrameType.TEXT, sentence)]) + await self.run_to_queue(queue, [TextQueueFrame(sentence)]) class ImageGenService(AIService): @@ -149,15 +140,15 @@ class ImageGenService(AIService): # Renders the image. Returns an Image object. @abstractmethod - async def run_image_gen(self, sentence) -> tuple[str, bytes]: + async def run_image_gen(self, sentence:str) -> tuple[str, bytes]: pass async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if type(frame.frame_data) != str: - raise Exception("Image service requires a string for the data field") + if not isinstance(frame, TextQueueFrame): + return - (_, image_data) = await self.run_image_gen(frame.frame_data) - yield QueueFrame(FrameType.IMAGE, image_data) + (url, image_data) = await self.run_image_gen(frame.text) + yield ImageQueueFrame(url, image_data) @dataclass diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index f71e19268..4388df324 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -8,9 +8,16 @@ import types from functools import partial from queue import Queue, Empty -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import ( + AudioQueueFrame, + EndStreamQueueFrame, + ImageQueueFrame, + QueueFrame, + StartStreamQueueFrame, + TranscriptionQueueFrame, +) -from threading import Thread, Event, Timer +from threading import Thread, Event from daily import ( EventHandler, @@ -199,7 +206,7 @@ class DailyTransportService(EventHandler): while True: frame = await self.receive_queue.get() yield frame - if frame.frame_type == FrameType.END_STREAM: + if isinstance(frame, EndStreamQueueFrame): 
break def get_async_send_queue(self): @@ -210,7 +217,7 @@ class DailyTransportService(EventHandler): frame: QueueFrame | list = await self.send_queue.get() self.threadsafe_send_queue.put(frame) self.send_queue.task_done() - if type(frame) == QueueFrame and frame.frame_type == FrameType.END_STREAM: + if isinstance(frame, EndStreamQueueFrame): break async def wait_for_send_queue_to_empty(self): @@ -240,8 +247,8 @@ class DailyTransportService(EventHandler): self.stop_threads.set() - await self.receive_queue.put(QueueFrame(FrameType.END_STREAM, None)) - await self.send_queue.put(QueueFrame(FrameType.END_STREAM, None)) + await self.receive_queue.put(EndStreamQueueFrame()) + await self.send_queue.put(EndStreamQueueFrame()) await async_output_queue_marshal_task if self.camera_thread and self.camera_thread.is_alive(): @@ -279,7 +286,12 @@ class DailyTransportService(EventHandler): def on_transcription_message(self, message:dict): if self.loop: - frame = QueueFrame(FrameType.TRANSCRIPTION, message) + participantId = "" + if "participantId" in message: + participantId = message["participantId"] + elif "session_id" in message: + participantId = message["session_id"] + frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"]) asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop) def on_transcription_stopped(self, stopped_by, stopped_by_error): @@ -312,15 +324,15 @@ class DailyTransportService(EventHandler): while True: try: frames_or_frame: QueueFrame | list[QueueFrame] = self.threadsafe_send_queue.get() - if type(frames_or_frame) == QueueFrame: + if isinstance(frames_or_frame, QueueFrame): frames: list[QueueFrame] = [frames_or_frame] - elif type(frames_or_frame) == list: + elif isinstance(frames_or_frame, list): frames: list[QueueFrame] = frames_or_frame else: raise Exception("Unknown type in output queue") for frame in frames: - if frame.frame_type == FrameType.END_STREAM: + if isinstance(frame, EndStreamQueueFrame): 
self.logger.info("Stopping frame consumer thread") self.threadsafe_send_queue.task_done() return @@ -328,8 +340,8 @@ class DailyTransportService(EventHandler): # if interrupted, we just pull frames off the queue and discard them if not self.is_interrupted.is_set(): if frame: - if frame.frame_type == FrameType.AUDIO: - chunk = frame.frame_data + if isinstance(frame, AudioQueueFrame): + chunk = frame.data all_audio_frames.extend(chunk) @@ -338,8 +350,8 @@ class DailyTransportService(EventHandler): if l: self.mic.write_frames(bytes(b[:l])) b = b[l:] - elif frame.frame_type == FrameType.IMAGE: - self.set_image(frame.frame_data) + elif isinstance(frame, ImageQueueFrame): + self.set_image(frame.image) elif len(b): self.mic.write_frames(bytes(b)) b = bytearray() @@ -350,7 +362,7 @@ class DailyTransportService(EventHandler): ) self.interrupt_time = None - if frame.frame_type == FrameType.START_STREAM: + if isinstance(frame, StartStreamQueueFrame): self.is_interrupted.clear() self.threadsafe_send_queue.task_done() diff --git a/src/dailyai/tests/test_ai_services.py b/src/dailyai/tests/test_ai_services.py index bfe8dbde8..8bb1e2b00 100644 --- a/src/dailyai/tests/test_ai_services.py +++ b/src/dailyai/tests/test_ai_services.py @@ -4,7 +4,7 @@ import unittest from typing import AsyncGenerator, Generator from dailyai.services.ai_services import AIService -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TextQueueFrame class SimpleAIService(AIService): async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: @@ -15,8 +15,8 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase): service = SimpleAIService() input_frames = [ - QueueFrame(FrameType.TEXT, "hello"), - QueueFrame(FrameType.END_STREAM, None), + TextQueueFrame("hello"), + EndStreamQueueFrame() ] async def iterate_frames() -> AsyncGenerator[QueueFrame, None]: for frame in input_frames: @@ -31,10 +31,7 @@ class 
TestBaseAIService(unittest.IsolatedAsyncioTestCase): async def test_nonasync_input(self): service = SimpleAIService() - input_frames = [ - QueueFrame(FrameType.TEXT, "hello"), - QueueFrame(FrameType.END_STREAM, None), - ] + input_frames = [TextQueueFrame("hello"), EndStreamQueueFrame()] def iterate_frames() -> Generator[QueueFrame, None, None]: for frame in input_frames: diff --git a/src/samples/foundational/01-say-one-thing.py b/src/samples/foundational/01-say-one-thing.py index 27b6a3b3d..ac0778fbc 100644 --- a/src/samples/foundational/01-say-one-thing.py +++ b/src/samples/foundational/01-say-one-thing.py @@ -1,10 +1,7 @@ import argparse import asyncio -from typing import AsyncGenerator -from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.daily_transport_service import DailyTransportService -from dailyai.services.azure_ai_services import AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService async def main(room_url): diff --git a/src/samples/foundational/02-llm-say-one-thing.py b/src/samples/foundational/02-llm-say-one-thing.py index 3ea0f2714..a54c0ecb7 100644 --- a/src/samples/foundational/02-llm-say-one-thing.py +++ b/src/samples/foundational/02-llm-say-one-thing.py @@ -1,8 +1,7 @@ import argparse import asyncio -from typing import AsyncGenerator -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import LLMMessagesQueueFrame from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -27,7 +26,7 @@ async def main(room_url): tts_task = asyncio.create_task( tts.run_to_queue( transport.send_queue, - llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]) + llm.run([LLMMessagesQueueFrame(messages)]), ) ) diff --git a/src/samples/foundational/03-still-frame.py b/src/samples/foundational/03-still-frame.py index c69b12547..5ffdcc5ac 
100644 --- a/src/samples/foundational/03-still-frame.py +++ b/src/samples/foundational/03-still-frame.py @@ -1,7 +1,7 @@ import argparse import asyncio -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import TextQueueFrame from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.open_ai_services import OpenAIImageGenService @@ -23,7 +23,7 @@ async def main(room_url): imagegen = OpenAIImageGenService(image_size="1024x1024") image_task = asyncio.create_task( - imagegen.run_to_queue(transport.send_queue, [QueueFrame(FrameType.TEXT, "a cat in the style of picasso")]) + imagegen.run_to_queue(transport.send_queue, [TextQueueFrame("a cat in the style of picasso")]) ) @transport.event_handler("on_participant_joined") diff --git a/src/samples/foundational/04-utterance-and-speech.py b/src/samples/foundational/04-utterance-and-speech.py index 296d17e6d..32844cb6f 100644 --- a/src/samples/foundational/04-utterance-and-speech.py +++ b/src/samples/foundational/04-utterance-and-speech.py @@ -4,7 +4,7 @@ import re from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService async def main(room_url:str): @@ -35,7 +35,7 @@ async def main(room_url:str): llm_response_task = asyncio.create_task( elevenlabs_tts.run_to_queue( buffer_queue, - llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]), + llm.run([LLMMessagesQueueFrame(messages)]), True, ) ) @@ -52,7 +52,7 @@ async def main(room_url:str): frame = await buffer_queue.get() await transport.send_queue.put(frame) buffer_queue.task_done() - if frame.frame_type == FrameType.END_STREAM: + if isinstance(frame, EndStreamQueueFrame): break await 
asyncio.gather(llm_response_task, buffer_to_send_queue()) diff --git a/src/samples/foundational/05-sync-speech-and-text.py b/src/samples/foundational/05-sync-speech-and-text.py index d589e778a..7e6239288 100644 --- a/src/samples/foundational/05-sync-speech-and-text.py +++ b/src/samples/foundational/05-sync-speech-and-text.py @@ -1,13 +1,9 @@ import argparse import asyncio -from asyncio.queues import Queue -import re - -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.services.open_ai_services import OpenAIImageGenService from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.fal_ai_services import FalImageGenService @@ -48,14 +44,20 @@ async def main(room_url): ] image_description = await llm.run_llm(messages) + if not image_description: + return + to_speak = f"{month}: {image_description}" + audio_task = asyncio.create_task(get_all_audio(to_speak)) + image_task = asyncio.create_task(dalle.run_image_gen(image_description)) (audio, image_data) = await asyncio.gather( - get_all_audio(to_speak), dalle.run_image_gen(image_description) + audio_task, image_task ) return { "month": month, "text": image_description, + "image_url": image_data[0], "image": image_data[1], "audio": audio, } @@ -84,8 +86,8 @@ async def main(room_url): data = await month_data_task await transport.send_queue.put( [ - QueueFrame(FrameType.IMAGE, data["image"]), - QueueFrame(FrameType.AUDIO, data["audio"]), + ImageQueueFrame(data["image_url"], data["image"]), + AudioQueueFrame(data["audio"]), ] ) From ba89e41c5b6b9dec2b8ac454408cf0f8972fddf4 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Tue, 23 Jan 2024 09:37:15 -0500 Subject: [PATCH 9/9] remove commented-out code --- src/dailyai/queue_frame.py | 17 ----------------- 1 file changed, 17 
deletions(-) diff --git a/src/dailyai/queue_frame.py b/src/dailyai/queue_frame.py index fb3b51d18..d72345aaf 100644 --- a/src/dailyai/queue_frame.py +++ b/src/dailyai/queue_frame.py @@ -2,23 +2,6 @@ from enum import Enum from dataclasses import dataclass from typing import Any -""" -class FrameType(Enum): - NOOP = -1 - START_STREAM = 0 - END_STREAM = 1 - AUDIO = 2 - IMAGE = 3 - TEXT = 4 - TRANSCRIPTION = 5 - LLM_MESSAGE = 6 - APP_MESSAGE = 7 - -@dataclass(frozen=True) -class QueueFrame: - frame_type: FrameType - frame_data: str | dict | bytes | list | None -""" class QueueFrame: pass