diff --git a/src/dailyai/async_processor/async_processor.py b/src/dailyai/async_processor/async_processor.py index b0ff21a7f..9c4b4d0aa 100644 --- a/src/dailyai/async_processor/async_processor.py +++ b/src/dailyai/async_processor/async_processor.py @@ -9,7 +9,7 @@ from queue import Queue, PriorityQueue, Empty from threading import Event, Semaphore, Thread from typing import Any, Generator, Iterator, Optional, Type -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.message_handler.message_handler import MessageHandler from dailyai.services.ai_services import AIServiceConfig @@ -268,10 +268,10 @@ class LLMResponse(OrchestratorResponse): if out.strip(): yield out.strip() - def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]: - return [OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame)] + def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]: + return [QueueFrame(FrameType.AUDIO_FRAME, audio_frame)] - def get_frames_from_chunk(self, chunk) -> Generator[list[OutputQueueFrame], Any, None]: + def get_frames_from_chunk(self, chunk) -> Generator[list[QueueFrame], Any, None]: for audio_frame in self.services.tts.run_tts(chunk): yield self.get_frames_from_tts_response(audio_frame) @@ -317,7 +317,7 @@ class LLMResponse(OrchestratorResponse): break if not self.has_sent_first_frame: - self.output_queue.put(OutputQueueFrame(FrameType.START_STREAM, None)) + self.output_queue.put(QueueFrame(FrameType.START_STREAM, None)) self.has_sent_first_frame = True for frame in frames: diff --git a/src/dailyai/orchestrator.py b/src/dailyai/orchestrator.py index 19b885d1c..707a2a058 100644 --- a/src/dailyai/orchestrator.py +++ b/src/dailyai/orchestrator.py @@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import ( OrchestratorResponse, LLMResponse, ) -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.ai_services import AIServiceConfig from dailyai.message_handler.message_handler import MessageHandler @@ -197,7 +197,7 @@ class Orchestrator(EventHandler): self.logger.info("Camera thread stopped") self.logger.info("Put stop in output queue") - self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None)) + self.output_queue.put(QueueFrame(FrameType.END_STREAM, None)) self.frame_consumer_thread.join() self.logger.info("Orchestrator stopped.") @@ -367,7 +367,7 @@ class Orchestrator(EventHandler): all_audio_frames = bytearray() while True: try: - frame:OutputQueueFrame = self.output_queue.get() + frame:QueueFrame = self.output_queue.get() if frame.frame_type == FrameType.END_STREAM: self.logger.info("Stopping frame consumer thread") return diff --git a/src/dailyai/output_queue.py b/src/dailyai/output_queue.py deleted file mode 100644 index bfd1760d4..000000000 --- a/src/dailyai/output_queue.py +++ /dev/null @@ -1,14 +0,0 @@ -from enum import Enum -from dataclasses import dataclass - -class FrameType(Enum): - AUDIO_FRAME = 1 - IMAGE_FRAME = 2 - START_STREAM = 3 - END_STREAM = 4 - - -@dataclass(frozen=True) -class OutputQueueFrame: - frame_type: FrameType - frame_data: bytes | None diff --git a/src/dailyai/queue_frame.py b/src/dailyai/queue_frame.py new file mode 100644 index 000000000..9033992e7 --- /dev/null +++ b/src/dailyai/queue_frame.py @@ -0,0 +1,18 @@ +from enum import Enum +from dataclasses import dataclass + +class FrameType(Enum): + START_STREAM = 0 + END_STREAM = 1 + AUDIO_FRAME = 2 + IMAGE_FRAME = 3 + SENTENCE_FRAME = 4 + TEXT_CHUNK_FRAME = 5 + LLM_MESSAGE_FRAME = 6 + APP_MESSAGE_FRAME = 7 + IMAGE_DESCRIPTION = 8 + +@dataclass(frozen=True) +class QueueFrame: + frame_type: FrameType + frame_data: str | dict | bytes | list | None diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 60f04cba5..4c39a5cd3 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -1,23 +1,56 @@ +import asyncio import logging +import re +from tkinter import END + +from dailyai.queue_frame import QueueFrame, FrameType -from asyncio import Queue from abc import abstractmethod from typing import AsyncGenerator from dataclasses import dataclass - class AIService: - def __init__(self): - self.logger = logging.getLogger("dailyai") - def close(self): + def __init__( + self, + input_queue: asyncio.Queue[QueueFrame] | None = None, + output_queue: asyncio.Queue[QueueFrame] | None = None, + ): + self.logger = logging.getLogger("dailyai") + self.input_queue: asyncio.Queue[QueueFrame] | None = input_queue + self.output_queue: asyncio.Queue[QueueFrame] | None = output_queue + + def stop(self): pass + async def run(self) -> None: + if self.input_queue is None or self.output_queue is None: + raise Exception("Input and output queues must be set before using the run method.") + + while True: + frame = await self.input_queue.get() + self.logger.debug(f"{self.__class__.__name__} got frame:", frame.frame_type) + if frame.frame_type == FrameType.END_STREAM: + self.input_queue.task_done() + await self.output_queue.put(QueueFrame(FrameType.END_STREAM, None)) + break + + output_frame = await self.process_frame(frame) + if output_frame: + await self.output_queue.put(output_frame) + self.input_queue.task_done() + + @abstractmethod + async def process_frame(self, frame) -> QueueFrame | None: + pass + + class LLMService(AIService): # Generate a set of responses to a prompt. Yields a list of responses. @abstractmethod async def run_llm_async(self, messages) -> AsyncGenerator[str, None]: - pass + # Adding a yield here lets the linter know what this method actually does + yield "" # Generate a responses to a prompt. Returns the response @abstractmethod @@ -26,6 +59,29 @@ class LLMService(AIService): ) -> str or None: pass + async def run_llm_async_sentences(self, messages) -> AsyncGenerator[str, None]: + current_text = "" + async for text in self.run_llm_async(messages): + current_text += text + if re.match(r"^.*[.!?]$", text): + yield current_text + current_text = "" + + if current_text: + yield current_text + + async def process_frame(self, frame:QueueFrame) -> QueueFrame | None: + if not self.output_queue: + raise Exception("Output queue must be set before using the run method.") + + if frame.frame_type == FrameType.LLM_MESSAGE_FRAME: + if type(frame.frame_data) != list: + raise Exception("LLM service requires a dict for the data field") + + messages: list[dict[str, str]] = frame.frame_data + async for message in self.run_llm_async_sentences(messages): + await self.output_queue.put(QueueFrame(FrameType.SENTENCE_FRAME, message)) + class TTSService(AIService): # Some TTS services require a specific sample rate. We default to 16k @@ -36,7 +92,20 @@ class TTSService(AIService): # be sent to the microphone device @abstractmethod async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]: - pass + # yield empty bytes here, so linting can infer what this method does + yield bytes() + + async def process_frame(self, frame:QueueFrame) -> QueueFrame | None: + if not self.output_queue: + raise Exception("Output queue must be set before using the run method.") + + if frame.frame_type == FrameType.SENTENCE_FRAME: + if type(frame.frame_data) != str: + raise Exception("TTS service requires a string for the data field") + + text = frame.frame_data + async for audio in self.run_tts(text): + await self.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio)) class ImageGenService(AIService): diff --git a/src/dailyai/services/azure_ai_services.py b/src/dailyai/services/azure_ai_services.py index 4ae1e25aa..452797be0 100644 --- a/src/dailyai/services/azure_ai_services.py +++ b/src/dailyai/services/azure_ai_services.py @@ -16,8 +16,8 @@ from PIL import Image from azure.cognitiveservices.speech import SpeechSynthesizer, SpeechConfig, ResultReason, CancellationReason class AzureTTSService(TTSService): - def __init__(self, speech_key=None, speech_region=None): - super().__init__() + def __init__(self, input_queue=None, output_queue=None, speech_key=None, speech_region=None): + super().__init__(input_queue, output_queue) speech_key = speech_key or os.getenv("AZURE_SPEECH_SERVICE_KEY") speech_region = speech_region or os.getenv("AZURE_SPEECH_SERVICE_REGION") @@ -48,8 +48,8 @@ class AzureTTSService(TTSService): self.logger.info("Error details: {}".format(cancellation_details.error_details)) class AzureLLMService(LLMService): - def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None): - super().__init__() + def __init__(self, input_queue=None, output_queue=None, api_key=None, azure_endpoint=None, api_version=None, model=None): + super().__init__(input_queue, output_queue) api_key = api_key or os.getenv("AZURE_CHATGPT_KEY") azure_endpoint = azure_endpoint or os.getenv("AZURE_CHATGPT_ENDPOINT") diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 485ee1772..1f1fe483b 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -7,7 +7,7 @@ import types from functools import partial from queue import Queue, Empty -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from threading import Thread, Event, Timer @@ -45,9 +45,29 @@ class DailyTransportService(EventHandler): self.camera_height = 768 self.camera_enabled = False + self.other_participant_has_joined = False + self.camera_thread = None self.frame_consumer_thread = None + self.transcription_settings = { + "language": "en", + "tier": "nova", + "model": "2-conversationalai", + "profanity_filter": True, + "redact": False, + "extra": { + "endpointing": True, + "punctuate": False, + }, + } + + # This queue is used to marshal frames from the async output queue to the sync output queue + # We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions + # a chance to run while waiting for queue items -- but also to maintain thread safety for the + # primary output queue. + self.async_output_queue = asyncio.Queue() + self.logger: logging.Logger = logging.getLogger("dailyai") self.event_handlers = {} @@ -162,27 +182,34 @@ class DailyTransportService(EventHandler): ) if self.token: - self.client.start_transcription( - { - "language": "en", - "tier": "nova", - "model": "2-conversationalai", - "profanity_filter": True, - "redact": False, - "extra": { - "endpointing": True, - "punctuate": False, - }, - } - ) + self.transcription_queue = asyncio.Queue() + self.client.start_transcription(self.transcription_settings) self.my_participant_id = self.client.participants()["local"]["id"] + async def get_transcriptions(self): + while True: + transcript = await self.transcription_queue.get() + yield transcript + + def get_async_output_queue(self): + return self.async_output_queue + + async def marshal_frames(self): + while True: + frame = await self.async_output_queue.get() + self.output_queue.put(frame) + self.async_output_queue.task_done() + if frame.frame_type == FrameType.END_STREAM: + break + async def run(self) -> None: self.configure_daily() self.participant_left = False + async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames()) + try: participant_count: int = len(self.client.participants()) self.logger.info(f"{participant_count} participants in room") @@ -194,15 +221,21 @@ class DailyTransportService(EventHandler): self.client.leave() self.stop_threads.set() + + await self.async_output_queue.put(QueueFrame(FrameType.END_STREAM, None)) + await async_output_queue_marshal_task + if self.camera_thread and self.camera_thread.is_alive(): self.camera_thread.join() if self.frame_consumer_thread and self.frame_consumer_thread.is_alive(): - self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None)) self.frame_consumer_thread.join() def stop(self): self.stop_threads.set() + def on_first_other_participant_joined(self): + pass + def call_joined(self, join_data, client_error): self.logger.info(f"Call_joined: {join_data}, {client_error}") @@ -213,7 +246,9 @@ class DailyTransportService(EventHandler): pass def on_participant_joined(self, participant): - pass + if not self.other_participant_has_joined and participant["id"] != self.my_participant_id: + self.other_participant_has_joined = True + self.on_first_other_participant_joined() def on_participant_left(self, participant, reason): if len(self.client.participants()) < 2: @@ -224,7 +259,10 @@ class DailyTransportService(EventHandler): pass def on_transcription_message(self, message): - pass + print("got transcription", message) + if self.loop: + asyncio.run_coroutine_threadsafe(self.transcription_queue.put(message["text"]), self.loop) + print("put transcription in queue", message) def on_transcription_stopped(self, stopped_by, stopped_by_error): pass @@ -255,11 +293,11 @@ class DailyTransportService(EventHandler): all_audio_frames = bytearray() while True: try: - frames_or_frame: OutputQueueFrame | list[OutputQueueFrame] = self.output_queue.get() - if type(frames_or_frame) == OutputQueueFrame: - frames: list[OutputQueueFrame] = [frames_or_frame] + frames_or_frame: QueueFrame | list[QueueFrame] = self.output_queue.get() + if type(frames_or_frame) == QueueFrame: + frames: list[QueueFrame] = [frames_or_frame] elif type(frames_or_frame) == list: - frames: list[OutputQueueFrame] = frames_or_frame + frames: list[QueueFrame] = frames_or_frame else: raise Exception("Unknown type in output queue") diff --git a/src/dailyai/services/elevenlabs_ai_service.py b/src/dailyai/services/elevenlabs_ai_service.py index 51feb22f3..0d9ec8b54 100644 --- a/src/dailyai/services/elevenlabs_ai_service.py +++ b/src/dailyai/services/elevenlabs_ai_service.py @@ -9,11 +9,11 @@ from dailyai.services.ai_services import TTSService class ElevenLabsTTSService(TTSService): - def __init__(self): - super().__init__() + def __init__(self, input_queue=None, output_queue=None, api_key=None, voice_id=None): + super().__init__(input_queue, output_queue) - self.api_key = os.getenv("ELEVENLABS_API_KEY") - self.voice_id = os.getenv("ELEVENLABS_VOICE_ID") + self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY") + self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID") async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]: async with aiohttp.ClientSession() as session: diff --git a/src/dailyai/services/open_ai_services.py b/src/dailyai/services/open_ai_services.py index 9d17a7c87..8f2b6154a 100644 --- a/src/dailyai/services/open_ai_services.py +++ b/src/dailyai/services/open_ai_services.py @@ -66,9 +66,14 @@ class OpenAIImageGenService(ImageGenService): size=size ) image_url = image.data[0].url - response = requests.get(image_url) + if not image_url: + raise Exception("No image provided in response", image) - dalle_stream = io.BytesIO(response.content) - dalle_im = Image.open(dalle_stream) + # Load the image from the url + async with aiohttp.ClientSession() as session: + async with session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + return (image_url, image.tobytes()) return (image_url, dalle_im.tobytes()) diff --git a/src/dailyai/tests/test_asyncprocessor.py b/src/dailyai/tests/test_asyncprocessor.py index e61dd525f..e9196c520 100644 --- a/src/dailyai/tests/test_asyncprocessor.py +++ b/src/dailyai/tests/test_asyncprocessor.py @@ -11,7 +11,7 @@ from dailyai.async_processor.async_processor import ( LLMResponse, ) from dailyai.message_handler.message_handler import MessageHandler -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.ai_services import ( AIServiceConfig, ImageGenService, @@ -71,7 +71,7 @@ class TestResponse(unittest.TestCase): output_queue.task_done() while expected_words: - actual_word:OutputQueueFrame = output_queue.get() + actual_word:QueueFrame = output_queue.get() word = expected_words.pop(0) self.assertEqual(actual_word.frame_type, FrameType.AUDIO_FRAME) self.assertEqual(actual_word.frame_data, bytes(word, "utf-8")) @@ -127,7 +127,7 @@ class TestResponse(unittest.TestCase): expected_words = ["Hello", "there.", "How", "are", "you?", "I", "hope", "you", "are", "well."] while expected_words and not stop_processing_output_queue.is_set(): try: - actual_word:OutputQueueFrame = output_queue.get_nowait() + actual_word:QueueFrame = output_queue.get_nowait() if actual_word.frame_type == FrameType.AUDIO_FRAME: time.sleep(0.1) word = expected_words.pop(0) diff --git a/src/samples/static-sprite/sprite-sample.py b/src/samples/static-sprite/sprite-sample.py index b5ec4f5f0..9413a1534 100644 --- a/src/samples/static-sprite/sprite-sample.py +++ b/src/samples/static-sprite/sprite-sample.py @@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import ( OrchestratorResponse ) from dailyai.orchestrator import OrchestratorConfig, Orchestrator -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.message_handler.message_handler import MessageHandler from dailyai.services.ai_services import AIServiceConfig from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService @@ -40,7 +40,7 @@ class StaticSpriteResponse(OrchestratorResponse): self.image_bytes = img.tobytes() def do_play(self) -> None: - self.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, self.image_bytes)) + self.output_queue.put(QueueFrame(FrameType.IMAGE_FRAME, self.image_bytes)) class IntroSpriteResponse(StaticSpriteResponse): @@ -71,10 +71,10 @@ class AnimatedSpriteLLMResponse(LLMResponse): with Image.open(full_path) as img: self.image_bytes.append(img.tobytes()) - def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]: + def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]: return [ - OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame), - OutputQueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes)) + QueueFrame(FrameType.AUDIO_FRAME, audio_frame), + QueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes)) ] diff --git a/src/samples/theoretical-to-real/01-say-one-thing.py b/src/samples/theoretical-to-real/01-say-one-thing.py index fd9ce43f4..f26de3ba3 100644 --- a/src/samples/theoretical-to-real/01-say-one-thing.py +++ b/src/samples/theoretical-to-real/01-say-one-thing.py @@ -2,7 +2,7 @@ import argparse import asyncio from typing import AsyncGenerator -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureTTSService @@ -37,7 +37,7 @@ async def main(room_url): if participant["info"]["isLocal"]: return async for audio in audio_generator: - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio)) # wait for the output queue to be empty, then leave the meeting transport.output_queue.join() diff --git a/src/samples/theoretical-to-real/01a-greet-user.py b/src/samples/theoretical-to-real/01a-greet-user.py index d3edb9dd0..aadc73fd3 100644 --- a/src/samples/theoretical-to-real/01a-greet-user.py +++ b/src/samples/theoretical-to-real/01a-greet-user.py @@ -2,7 +2,7 @@ import asyncio import time from typing import AsyncGenerator -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureTTSService from dailyai.services.deepgram_ai_services import DeepgramTTSService @@ -41,13 +41,13 @@ async def main(room_url): audio_generator: AsyncGenerator[bytes, None] = tts.run_tts(f"Hello there, {participant['info']['userName']}!") async for audio in audio_generator: - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio)) print("setting up call state handler") @transport.event_handler("on_call_state_updated") async def on_call_joined(transport, state): print(f"call state callback: {state}") - + await transport.run() diff --git a/src/samples/theoretical-to-real/02-llm-say-one-thing.py b/src/samples/theoretical-to-real/02-llm-say-one-thing.py index 7b717d4ea..f09103bfb 100644 --- a/src/samples/theoretical-to-real/02-llm-say-one-thing.py +++ b/src/samples/theoretical-to-real/02-llm-say-one-thing.py @@ -1,16 +1,12 @@ import argparse import asyncio -import re from typing import AsyncGenerator -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -local_joined = False -participant_joined = False - async def main(room_url): meeting_duration_minutes = 1 transport = DailyTransportService( @@ -21,27 +17,26 @@ async def main(room_url): ) transport.mic_enabled = True - tts = ElevenLabsTTSService() - llm = AzureLLMService() + text_to_llm_queue = asyncio.Queue() + llm_to_tts_queue = asyncio.Queue() + + tts = ElevenLabsTTSService( + llm_to_tts_queue, transport.get_async_output_queue(), voice_id="29vD33N1CtxCmqQRPOHJ" + ) + llm = AzureLLMService(text_to_llm_queue, llm_to_tts_queue) messages = [{ "role": "system", - "content": "You are an LLM in a WebRTC session, and your text will be converted to audio. Introduce yourself." + "content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world." }] - llm_generator: AsyncGenerator[str, None] = llm.run_llm_async(messages) + await text_to_llm_queue.put(QueueFrame(FrameType.LLM_MESSAGE_FRAME, messages)) + await text_to_llm_queue.put(QueueFrame(FrameType.END_STREAM, None)) - @transport.event_handler("on_participant_joined") - async def on_participant_joined(transport, participant): - if participant["id"] == transport.my_participant_id: - return + llm_task = asyncio.create_task(llm.run()) - current_text = "" - async for text in llm_generator: - current_text += text - if re.match(r"^.*[.!?]$", text): - async for audio in tts.run_tts(current_text): - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) - current_text = "" + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport): + await asyncio.gather(llm_task, tts.run()) # wait for the output queue to be empty, then leave the meeting transport.output_queue.join() @@ -56,6 +51,5 @@ if __name__ == "__main__": "-u", "--url", type=str, required=True, help="URL of the Daily room to join" ) - args: argparse.Namespace = parser.parse_args() - + args, unknown = parser.parse_known_args() asyncio.run(main(args.url)) diff --git a/src/samples/theoretical-to-real/03-still-frame.py b/src/samples/theoretical-to-real/03-still-frame.py index 1ec5dccfb..2255dc47b 100644 --- a/src/samples/theoretical-to-real/03-still-frame.py +++ b/src/samples/theoretical-to-real/03-still-frame.py @@ -1,7 +1,7 @@ import argparse import asyncio -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.open_ai_services import OpenAIImageGenService @@ -27,7 +27,7 @@ async def main(room_url): @transport.event_handler("on_participant_joined") async def on_participant_joined(transport, participant): (_, image_bytes) = await image_task - transport.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, image_bytes)) + transport.output_queue.put(QueueFrame(FrameType.IMAGE_FRAME, image_bytes)) await transport.run() diff --git a/src/samples/theoretical-to-real/04-utterance-and-speech.py b/src/samples/theoretical-to-real/04-utterance-and-speech.py index 880845834..0e4cb9caf 100644 --- a/src/samples/theoretical-to-real/04-utterance-and-speech.py +++ b/src/samples/theoretical-to-real/04-utterance-and-speech.py @@ -4,7 +4,7 @@ import re from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType async def main(room_url:str): global transport @@ -32,16 +32,20 @@ async def main(room_url:str): # queue two pieces of speech: one specified as a text literal, # and one generated by an llm. We'll kick off the llm first, and let # it generate a response while we're speaking the literal string. + # + # Note that in this case, we don't use `run_llm_async` because we're + # taking advantage of the time spent speaking the first phrase to generate + # the entire LLM response, and this happens asynchronously in a task. llm_response_task = asyncio.create_task(llm.run_llm( [{"role": "system", "content": "tell the user a joke about llamas"}] )) async for audio_chunk in tts.run_tts("My friend the LLM is now going to tell a joke about llamas."): - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio_chunk)) + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio_chunk)) llm_response = await llm_response_task async for audio_chunk in tts.run_tts(llm_response): - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio_chunk)) + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio_chunk)) # wait for the output queue to be empty, then leave the meeting diff --git a/src/samples/theoretical-to-real/05-queued.py b/src/samples/theoretical-to-real/05-queued.py index 5c61f8fe5..60711f4bf 100644 --- a/src/samples/theoretical-to-real/05-queued.py +++ b/src/samples/theoretical-to-real/05-queued.py @@ -4,7 +4,7 @@ import asyncio from asyncio.queues import Queue import re -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAIImageGenService @@ -25,7 +25,7 @@ async def main(room_url): transport.camera_height = 1024 llm = AzureLLMService() - tts = ElevenLabsTTSService() + tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV") dalle = OpenAIImageGenService() # Get a complete audio chunk from the given text. Splitting this into its own @@ -39,9 +39,9 @@ async def main(room_url): async def get_month_data(month): image_text = "" - current_clause = "" tts_tasks = [] - async for text in llm.run_llm_async( + first_sentence = True + async for sentence in llm.run_llm_async_sentences( [ { "role": "system", @@ -49,18 +49,24 @@ async def main(room_url): } ] ): - image_text += text - current_clause += text - if re.match(r"^.*[.!?]$", text): - tts_tasks.append(get_all_audio(current_clause)) - current_clause = "" + image_text += sentence + + if first_sentence: + sentence = f"{month}: {sentence}" + else: + first_sentence = False + + tts_tasks.append(get_all_audio(sentence)) tts_tasks.insert(0, dalle.run_image_gen(image_text, "1024x1024")) + print(f"waiting for tasks to finish for {month}") data = await asyncio.gather( *tts_tasks ) + print(f"done gathering tts tasks for {month}") + return { "month": month, "text": image_text, @@ -83,11 +89,8 @@ async def main(room_url): "December", ] - @transport.event_handler("on_participant_joined") - async def on_participant_joined(transport, participant): - if participant["id"] == transport.my_participant_id: - return - + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport): # This will play the months in the order they're completed. The benefit # is we'll have as little delay as possible before the first month, and # likely no delay between months, but the months won't display in order. @@ -95,12 +98,12 @@ async def main(room_url): data = await month_data_task transport.output_queue.put( [ - OutputQueueFrame(FrameType.IMAGE_FRAME, data["image"]), - OutputQueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]), + QueueFrame(FrameType.IMAGE_FRAME, data["image"]), + QueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]), ] ) for audio in data["audio"][1:]: - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio)) # wait for the output queue to be empty, then leave the meeting transport.output_queue.join() @@ -116,6 +119,6 @@ if __name__=="__main__": "-u", "--url", type=str, required=True, help="URL of the Daily room to join" ) - args: argparse.Namespace = parser.parse_args() + args, unknown = parser.parse_known_args() asyncio.run(main(args.url)) diff --git a/src/samples/theoretical-to-real/05-sync-speech-and-text.py b/src/samples/theoretical-to-real/05-sync-speech-and-text.py index e03fc15c6..b66caa89c 100644 --- a/src/samples/theoretical-to-real/05-sync-speech-and-text.py +++ b/src/samples/theoretical-to-real/05-sync-speech-and-text.py @@ -5,7 +5,7 @@ import asyncio from asyncio.queues import Queue import re -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAILLMService, OpenAIImageGenService @@ -97,12 +97,12 @@ async def main(room_url): data = await month_data_task transport.output_queue.put( [ - OutputQueueFrame(FrameType.IMAGE_FRAME, data["image"]), - OutputQueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]), + QueueFrame(FrameType.IMAGE_FRAME, data["image"]), + QueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]), ] ) for audio in data["audio"][1:]: - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio)) # wait for the output queue to be empty, then leave the meeting transport.output_queue.join() diff --git a/src/samples/theoretical-to-real/06-listen-and-respond.py b/src/samples/theoretical-to-real/06-listen-and-respond.py index 8dde23eae..034e608f0 100644 --- a/src/samples/theoretical-to-real/06-listen-and-respond.py +++ b/src/samples/theoretical-to-real/06-listen-and-respond.py @@ -6,7 +6,7 @@ import urllib.parse from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.output_queue import OutputQueueFrame, FrameType +from dailyai.queue_frame import QueueFrame, FrameType async def main(room_url:str, token): global transport @@ -26,28 +26,28 @@ async def main(room_url:str, token): llm = AzureLLMService() tts = AzureTTSService() - transcribed_message = "" - transcription_timeout = None + async def handle_transcriptions(): + messages = [ + {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."}, + ] - @transport.event_handler("on_participant_joined") - async def on_joined(transport, participant): - if participant["id"] == transport.my_participant_id: - return + sentence = "" + async for message in transport.get_transcriptions(): + sentence += message + if sentence.endswith((".", "?", "!")): + messages.append({"role": "user", "content": sentence}) + sentence = '' - async for audio_chunk in tts.run_tts("If you say something, I will respond."): - transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio_chunk)) + full_response = "" + async for response in llm.run_llm_async_sentences(messages): + full_response += response + async for audio in tts.run_tts(response): + transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio)) - @transport.event_handler("on_transcription_message") - async def on_transcription_message(transport, message) -> None: - nonlocal transcribed_message - nonlocal transcription_timeout - print(message) - if message["session_id"] != transport.my_participant_id: - transcribed_message += message['text'] + messages.append({"role": "assistant", "content": full_response}) - print("message received", transcribed_message) - - await transport.run() + transport.transcription_settings["extra"]["punctuate"] = True + await asyncio.gather(transport.run(), handle_transcriptions()) if __name__ == "__main__":