Merge pull request #2 from daily-co/queueing
Adding queue transportation to services
This commit is contained in:
@@ -9,7 +9,7 @@ from queue import Queue, PriorityQueue, Empty
|
||||
from threading import Event, Semaphore, Thread
|
||||
from typing import Any, Generator, Iterator, Optional, Type
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
|
||||
@@ -268,10 +268,10 @@ class LLMResponse(OrchestratorResponse):
|
||||
if out.strip():
|
||||
yield out.strip()
|
||||
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
|
||||
return [OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame)]
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
|
||||
return [QueueFrame(FrameType.AUDIO_FRAME, audio_frame)]
|
||||
|
||||
def get_frames_from_chunk(self, chunk) -> Generator[list[OutputQueueFrame], Any, None]:
|
||||
def get_frames_from_chunk(self, chunk) -> Generator[list[QueueFrame], Any, None]:
|
||||
for audio_frame in self.services.tts.run_tts(chunk):
|
||||
yield self.get_frames_from_tts_response(audio_frame)
|
||||
|
||||
@@ -317,7 +317,7 @@ class LLMResponse(OrchestratorResponse):
|
||||
break
|
||||
|
||||
if not self.has_sent_first_frame:
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.START_STREAM, None))
|
||||
self.output_queue.put(QueueFrame(FrameType.START_STREAM, None))
|
||||
self.has_sent_first_frame = True
|
||||
|
||||
for frame in frames:
|
||||
|
||||
@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
|
||||
OrchestratorResponse,
|
||||
LLMResponse,
|
||||
)
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
|
||||
@@ -197,7 +197,7 @@ class Orchestrator(EventHandler):
|
||||
self.logger.info("Camera thread stopped")
|
||||
|
||||
self.logger.info("Put stop in output queue")
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None))
|
||||
self.output_queue.put(QueueFrame(FrameType.END_STREAM, None))
|
||||
|
||||
self.frame_consumer_thread.join()
|
||||
self.logger.info("Orchestrator stopped.")
|
||||
@@ -367,7 +367,7 @@ class Orchestrator(EventHandler):
|
||||
all_audio_frames = bytearray()
|
||||
while True:
|
||||
try:
|
||||
frame:OutputQueueFrame = self.output_queue.get()
|
||||
frame:QueueFrame = self.output_queue.get()
|
||||
if frame.frame_type == FrameType.END_STREAM:
|
||||
self.logger.info("Stopping frame consumer thread")
|
||||
return
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
|
||||
class FrameType(Enum):
|
||||
AUDIO_FRAME = 1
|
||||
IMAGE_FRAME = 2
|
||||
START_STREAM = 3
|
||||
END_STREAM = 4
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OutputQueueFrame:
|
||||
frame_type: FrameType
|
||||
frame_data: bytes | None
|
||||
18
src/dailyai/queue_frame.py
Normal file
18
src/dailyai/queue_frame.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
|
||||
class FrameType(Enum):
    """Tag describing what kind of payload a ``QueueFrame`` carries."""

    # Stream control markers.
    START_STREAM = 0
    END_STREAM = 1

    # Media payloads.
    AUDIO_FRAME = 2
    IMAGE_FRAME = 3

    # Text and message payloads.
    SENTENCE_FRAME = 4
    TEXT_CHUNK_FRAME = 5
    LLM_MESSAGE_FRAME = 6
    APP_MESSAGE_FRAME = 7
    IMAGE_DESCRIPTION = 8
|
||||
|
||||
@dataclass(frozen=True)
class QueueFrame:
    """Immutable (frame_type, frame_data) pair passed between queues."""

    # Discriminator telling consumers how to interpret ``frame_data``.
    frame_type: FrameType
    # Payload for the frame; control frames such as END_STREAM carry None.
    frame_data: str | dict | bytes | list | None
|
||||
@@ -1,23 +1,56 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from tkinter import END
|
||||
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
from asyncio import Queue
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
class AIService:
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger("dailyai")
|
||||
|
||||
def close(self):
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: asyncio.Queue[QueueFrame] | None = None,
|
||||
output_queue: asyncio.Queue[QueueFrame] | None = None,
|
||||
):
|
||||
self.logger = logging.getLogger("dailyai")
|
||||
self.input_queue: asyncio.Queue[QueueFrame] | None = input_queue
|
||||
self.output_queue: asyncio.Queue[QueueFrame] | None = output_queue
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
async def run(self) -> None:
    """Pump frames from the input queue through ``process_frame``.

    Each frame read from ``self.input_queue`` is handed to
    ``process_frame``; a truthy result is forwarded to
    ``self.output_queue``. An END_STREAM frame is not processed: a fresh
    END_STREAM frame is put on the output queue and the loop exits.
    ``task_done()`` is called on the input queue once per consumed frame.

    Raises:
        Exception: if either queue has not been set.
    """
    if self.input_queue is None or self.output_queue is None:
        raise Exception("Input and output queues must be set before using the run method.")

    while True:
        frame = await self.input_queue.get()
        # Use %-style placeholders: passing an extra positional argument
        # without a placeholder makes logging emit a formatting error
        # instead of the message.
        self.logger.debug("%s got frame: %s", self.__class__.__name__, frame.frame_type)

        if frame.frame_type == FrameType.END_STREAM:
            self.input_queue.task_done()
            # Propagate end-of-stream to whatever consumes our output.
            await self.output_queue.put(QueueFrame(FrameType.END_STREAM, None))
            break

        output_frame = await self.process_frame(frame)
        if output_frame:
            await self.output_queue.put(output_frame)
        self.input_queue.task_done()
|
||||
|
||||
@abstractmethod
async def process_frame(self, frame) -> QueueFrame | None:
    """Handle one input frame; subclasses override this.

    The value returned is forwarded to the output queue by ``run`` when
    it is truthy. This base implementation returns None.
    """
    return None
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
# Generate a set of responses to a prompt. Yields a list of responses.
|
||||
@abstractmethod
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
    """Stream response text for ``messages``; subclasses override this.

    The placeholder body yields a single empty string so that linters
    and type checkers see this method as an async generator.
    """
    yield ""
|
||||
|
||||
# Generate a response to a prompt. Returns the response.
|
||||
@abstractmethod
|
||||
@@ -26,6 +59,29 @@ class LLMService(AIService):
|
||||
) -> str or None:
|
||||
pass
|
||||
|
||||
async def run_llm_async_sentences(self, messages) -> AsyncGenerator[str, None]:
    """Regroup the streamed output of ``run_llm_async`` into sentences.

    Text chunks are accumulated until a chunk ends with ``.``, ``!`` or
    ``?`` (optionally followed by one trailing newline), at which point
    the accumulated text is yielded and the buffer is reset. Any text
    left over when the stream ends is yielded as a final item.

    Args:
        messages: passed through unchanged to ``run_llm_async``.

    Yields:
        The accumulated text, one sentence (or trailing remainder) at a time.
    """
    current_text = ""
    async for text in self.run_llm_async(messages):
        current_text += text
        # Only the end of the chunk matters. Anchoring with ``^.*`` would
        # fail on chunks containing an embedded newline, because ``.``
        # does not match "\n" by default.
        if re.search(r"[.!?]$", text):
            yield current_text
            current_text = ""

    if current_text:
        yield current_text
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> QueueFrame | None:
    """Turn an LLM_MESSAGE_FRAME into a stream of SENTENCE_FRAMEs.

    The frame's data is passed to ``run_llm_async_sentences`` and every
    sentence it yields is put directly on ``self.output_queue``. Frames
    of any other type are ignored.

    Returns:
        Always None; output is written to the queue rather than returned.

    Raises:
        Exception: if no output queue is set, or if the frame data is
            not a list.
    """
    if self.output_queue is None:
        raise Exception("Output queue must be set before using the run method.")

    if frame.frame_type != FrameType.LLM_MESSAGE_FRAME:
        return None

    if not isinstance(frame.frame_data, list):
        # The payload is the message list, so the error must say "list".
        raise Exception("LLM service requires a list for the data field")

    messages: list[dict[str, str]] = frame.frame_data
    async for message in self.run_llm_async_sentences(messages):
        await self.output_queue.put(QueueFrame(FrameType.SENTENCE_FRAME, message))

    return None
|
||||
|
||||
|
||||
class TTSService(AIService):
|
||||
# Some TTS services require a specific sample rate. We default to 16k
|
||||
@@ -36,7 +92,20 @@ class TTSService(AIService):
|
||||
# be sent to the microphone device
|
||||
@abstractmethod
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
    """Stream audio bytes for ``sentence``; subclasses override this.

    The placeholder body yields one empty ``bytes`` object so that
    linters can infer that this method is an async generator.
    """
    yield b""
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> QueueFrame | None:
    """Turn a SENTENCE_FRAME into a stream of AUDIO_FRAMEs.

    The frame's text is passed to ``run_tts`` and every audio chunk it
    yields is put directly on ``self.output_queue``. Frames of any other
    type are ignored.

    Returns:
        Always None; output is written to the queue rather than returned.

    Raises:
        Exception: if no output queue is set, or if the frame data is
            not a string.
    """
    if self.output_queue is None:
        raise Exception("Output queue must be set before using the run method.")

    if frame.frame_type != FrameType.SENTENCE_FRAME:
        return None

    if not isinstance(frame.frame_data, str):
        raise Exception("TTS service requires a string for the data field")

    text = frame.frame_data
    async for audio in self.run_tts(text):
        await self.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))

    return None
|
||||
|
||||
|
||||
class ImageGenService(AIService):
|
||||
|
||||
@@ -16,8 +16,8 @@ from PIL import Image
|
||||
from azure.cognitiveservices.speech import SpeechSynthesizer, SpeechConfig, ResultReason, CancellationReason
|
||||
|
||||
class AzureTTSService(TTSService):
|
||||
def __init__(self, speech_key=None, speech_region=None):
|
||||
super().__init__()
|
||||
def __init__(self, input_queue=None, output_queue=None, speech_key=None, speech_region=None):
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
speech_key = speech_key or os.getenv("AZURE_SPEECH_SERVICE_KEY")
|
||||
speech_region = speech_region or os.getenv("AZURE_SPEECH_SERVICE_REGION")
|
||||
@@ -48,8 +48,8 @@ class AzureTTSService(TTSService):
|
||||
self.logger.info("Error details: {}".format(cancellation_details.error_details))
|
||||
|
||||
class AzureLLMService(LLMService):
|
||||
def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None):
|
||||
super().__init__()
|
||||
def __init__(self, input_queue=None, output_queue=None, api_key=None, azure_endpoint=None, api_version=None, model=None):
|
||||
super().__init__(input_queue, output_queue)
|
||||
api_key = api_key or os.getenv("AZURE_CHATGPT_KEY")
|
||||
|
||||
azure_endpoint = azure_endpoint or os.getenv("AZURE_CHATGPT_ENDPOINT")
|
||||
|
||||
@@ -7,7 +7,7 @@ import types
|
||||
from functools import partial
|
||||
from queue import Queue, Empty
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
from threading import Thread, Event, Timer
|
||||
|
||||
@@ -45,9 +45,29 @@ class DailyTransportService(EventHandler):
|
||||
self.camera_height = 768
|
||||
self.camera_enabled = False
|
||||
|
||||
self.other_participant_has_joined = False
|
||||
|
||||
self.camera_thread = None
|
||||
self.frame_consumer_thread = None
|
||||
|
||||
self.transcription_settings = {
|
||||
"language": "en",
|
||||
"tier": "nova",
|
||||
"model": "2-conversationalai",
|
||||
"profanity_filter": True,
|
||||
"redact": False,
|
||||
"extra": {
|
||||
"endpointing": True,
|
||||
"punctuate": False,
|
||||
},
|
||||
}
|
||||
|
||||
# This queue is used to marshal frames from the async output queue to the sync output queue
|
||||
# We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions
|
||||
# a chance to run while waiting for queue items -- but also to maintain thread safety for the
|
||||
# primary output queue.
|
||||
self.async_output_queue = asyncio.Queue()
|
||||
|
||||
self.logger: logging.Logger = logging.getLogger("dailyai")
|
||||
|
||||
self.event_handlers = {}
|
||||
@@ -162,27 +182,34 @@ class DailyTransportService(EventHandler):
|
||||
)
|
||||
|
||||
if self.token:
|
||||
self.client.start_transcription(
|
||||
{
|
||||
"language": "en",
|
||||
"tier": "nova",
|
||||
"model": "2-conversationalai",
|
||||
"profanity_filter": True,
|
||||
"redact": False,
|
||||
"extra": {
|
||||
"endpointing": True,
|
||||
"punctuate": False,
|
||||
},
|
||||
}
|
||||
)
|
||||
self.transcription_queue = asyncio.Queue()
|
||||
self.client.start_transcription(self.transcription_settings)
|
||||
|
||||
self.my_participant_id = self.client.participants()["local"]["id"]
|
||||
|
||||
async def get_transcriptions(self):
    """Yield items from ``self.transcription_queue`` forever.

    Each iteration awaits the next queued item and yields it; the
    generator never finishes on its own.
    """
    while True:
        yield await self.transcription_queue.get()
|
||||
|
||||
def get_async_output_queue(self):
    """Return the object stored in ``self.async_output_queue``."""
    queue = self.async_output_queue
    return queue
|
||||
|
||||
async def marshal_frames(self):
    """Copy frames from ``async_output_queue`` to ``output_queue``.

    Every frame, including the terminating END_STREAM frame, is put on
    ``self.output_queue`` and acknowledged with ``task_done()`` on the
    async queue. The loop ends after an END_STREAM frame is forwarded.
    """
    finished = False
    while not finished:
        item = await self.async_output_queue.get()
        self.output_queue.put(item)
        self.async_output_queue.task_done()
        finished = item.frame_type == FrameType.END_STREAM
|
||||
|
||||
async def run(self) -> None:
|
||||
self.configure_daily()
|
||||
|
||||
self.participant_left = False
|
||||
|
||||
async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames())
|
||||
|
||||
try:
|
||||
participant_count: int = len(self.client.participants())
|
||||
self.logger.info(f"{participant_count} participants in room")
|
||||
@@ -194,15 +221,21 @@ class DailyTransportService(EventHandler):
|
||||
self.client.leave()
|
||||
|
||||
self.stop_threads.set()
|
||||
|
||||
await self.async_output_queue.put(QueueFrame(FrameType.END_STREAM, None))
|
||||
await async_output_queue_marshal_task
|
||||
|
||||
if self.camera_thread and self.camera_thread.is_alive():
|
||||
self.camera_thread.join()
|
||||
if self.frame_consumer_thread and self.frame_consumer_thread.is_alive():
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None))
|
||||
self.frame_consumer_thread.join()
|
||||
|
||||
def stop(self):
    """Set the ``stop_threads`` event."""
    stop_event = self.stop_threads
    stop_event.set()
|
||||
|
||||
def on_first_other_participant_joined(self):
|
||||
pass
|
||||
|
||||
def call_joined(self, join_data, client_error):
|
||||
self.logger.info(f"Call_joined: {join_data}, {client_error}")
|
||||
|
||||
@@ -213,7 +246,9 @@ class DailyTransportService(EventHandler):
|
||||
pass
|
||||
|
||||
def on_participant_joined(self, participant):
|
||||
pass
|
||||
if not self.other_participant_has_joined and participant["id"] != self.my_participant_id:
|
||||
self.other_participant_has_joined = True
|
||||
self.on_first_other_participant_joined()
|
||||
|
||||
def on_participant_left(self, participant, reason):
|
||||
if len(self.client.participants()) < 2:
|
||||
@@ -224,7 +259,10 @@ class DailyTransportService(EventHandler):
|
||||
pass
|
||||
|
||||
def on_transcription_message(self, message):
|
||||
pass
|
||||
print("got transcription", message)
|
||||
if self.loop:
|
||||
asyncio.run_coroutine_threadsafe(self.transcription_queue.put(message["text"]), self.loop)
|
||||
print("put transcription in queue", message)
|
||||
|
||||
def on_transcription_stopped(self, stopped_by, stopped_by_error):
|
||||
pass
|
||||
@@ -255,11 +293,11 @@ class DailyTransportService(EventHandler):
|
||||
all_audio_frames = bytearray()
|
||||
while True:
|
||||
try:
|
||||
frames_or_frame: OutputQueueFrame | list[OutputQueueFrame] = self.output_queue.get()
|
||||
if type(frames_or_frame) == OutputQueueFrame:
|
||||
frames: list[OutputQueueFrame] = [frames_or_frame]
|
||||
frames_or_frame: QueueFrame | list[QueueFrame] = self.output_queue.get()
|
||||
if type(frames_or_frame) == QueueFrame:
|
||||
frames: list[QueueFrame] = [frames_or_frame]
|
||||
elif type(frames_or_frame) == list:
|
||||
frames: list[OutputQueueFrame] = frames_or_frame
|
||||
frames: list[QueueFrame] = frames_or_frame
|
||||
else:
|
||||
raise Exception("Unknown type in output queue")
|
||||
|
||||
|
||||
@@ -9,11 +9,11 @@ from dailyai.services.ai_services import TTSService
|
||||
|
||||
|
||||
class ElevenLabsTTSService(TTSService):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
def __init__(self, input_queue=None, output_queue=None, api_key=None, voice_id=None):
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.api_key = os.getenv("ELEVENLABS_API_KEY")
|
||||
self.voice_id = os.getenv("ELEVENLABS_VOICE_ID")
|
||||
self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
|
||||
self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID")
|
||||
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
|
||||
@@ -66,9 +66,14 @@ class OpenAIImageGenService(ImageGenService):
|
||||
size=size
|
||||
)
|
||||
image_url = image.data[0].url
|
||||
response = requests.get(image_url)
|
||||
if not image_url:
|
||||
raise Exception("No image provided in response", image)
|
||||
|
||||
dalle_stream = io.BytesIO(response.content)
|
||||
dalle_im = Image.open(dalle_stream)
|
||||
# Load the image from the url
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
return (image_url, dalle_im.tobytes())
|
||||
|
||||
@@ -11,7 +11,7 @@ from dailyai.async_processor.async_processor import (
|
||||
LLMResponse,
|
||||
)
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.ai_services import (
|
||||
AIServiceConfig,
|
||||
ImageGenService,
|
||||
@@ -71,7 +71,7 @@ class TestResponse(unittest.TestCase):
|
||||
output_queue.task_done()
|
||||
|
||||
while expected_words:
|
||||
actual_word:OutputQueueFrame = output_queue.get()
|
||||
actual_word:QueueFrame = output_queue.get()
|
||||
word = expected_words.pop(0)
|
||||
self.assertEqual(actual_word.frame_type, FrameType.AUDIO_FRAME)
|
||||
self.assertEqual(actual_word.frame_data, bytes(word, "utf-8"))
|
||||
@@ -127,7 +127,7 @@ class TestResponse(unittest.TestCase):
|
||||
expected_words = ["Hello", "there.", "How", "are", "you?", "I", "hope", "you", "are", "well."]
|
||||
while expected_words and not stop_processing_output_queue.is_set():
|
||||
try:
|
||||
actual_word:OutputQueueFrame = output_queue.get_nowait()
|
||||
actual_word:QueueFrame = output_queue.get_nowait()
|
||||
if actual_word.frame_type == FrameType.AUDIO_FRAME:
|
||||
time.sleep(0.1)
|
||||
word = expected_words.pop(0)
|
||||
|
||||
@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
|
||||
OrchestratorResponse
|
||||
)
|
||||
from dailyai.orchestrator import OrchestratorConfig, Orchestrator
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
|
||||
@@ -40,7 +40,7 @@ class StaticSpriteResponse(OrchestratorResponse):
|
||||
self.image_bytes = img.tobytes()
|
||||
|
||||
def do_play(self) -> None:
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, self.image_bytes))
|
||||
self.output_queue.put(QueueFrame(FrameType.IMAGE_FRAME, self.image_bytes))
|
||||
|
||||
|
||||
class IntroSpriteResponse(StaticSpriteResponse):
|
||||
@@ -71,10 +71,10 @@ class AnimatedSpriteLLMResponse(LLMResponse):
|
||||
with Image.open(full_path) as img:
|
||||
self.image_bytes.append(img.tobytes())
|
||||
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
|
||||
return [
|
||||
OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame),
|
||||
OutputQueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes))
|
||||
QueueFrame(FrameType.AUDIO_FRAME, audio_frame),
|
||||
QueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes))
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import argparse
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureTTSService
|
||||
|
||||
@@ -37,7 +37,7 @@ async def main(room_url):
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
async for audio in audio_generator:
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
transport.output_queue.join()
|
||||
|
||||
@@ -2,7 +2,7 @@ import asyncio
|
||||
import time
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureTTSService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
@@ -41,13 +41,13 @@ async def main(room_url):
|
||||
audio_generator: AsyncGenerator[bytes, None] = tts.run_tts(f"Hello there, {participant['info']['userName']}!")
|
||||
|
||||
async for audio in audio_generator:
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
print("setting up call state handler")
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_joined(transport, state):
|
||||
print(f"call state callback: {state}")
|
||||
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
|
||||
@@ -1,16 +1,12 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import re
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
local_joined = False
|
||||
participant_joined = False
|
||||
|
||||
async def main(room_url):
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
@@ -21,27 +17,26 @@ async def main(room_url):
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
|
||||
tts = ElevenLabsTTSService()
|
||||
llm = AzureLLMService()
|
||||
text_to_llm_queue = asyncio.Queue()
|
||||
llm_to_tts_queue = asyncio.Queue()
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
llm_to_tts_queue, transport.get_async_output_queue(), voice_id="29vD33N1CtxCmqQRPOHJ"
|
||||
)
|
||||
llm = AzureLLMService(text_to_llm_queue, llm_to_tts_queue)
|
||||
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and your text will be converted to audio. Introduce yourself."
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
llm_generator: AsyncGenerator[str, None] = llm.run_llm_async(messages)
|
||||
await text_to_llm_queue.put(QueueFrame(FrameType.LLM_MESSAGE_FRAME, messages))
|
||||
await text_to_llm_queue.put(QueueFrame(FrameType.END_STREAM, None))
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
if participant["id"] == transport.my_participant_id:
|
||||
return
|
||||
llm_task = asyncio.create_task(llm.run())
|
||||
|
||||
current_text = ""
|
||||
async for text in llm_generator:
|
||||
current_text += text
|
||||
if re.match(r"^.*[.!?]$", text):
|
||||
async for audio in tts.run_tts(current_text):
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
current_text = ""
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await asyncio.gather(llm_task, tts.run())
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
transport.output_queue.join()
|
||||
@@ -56,6 +51,5 @@ if __name__ == "__main__":
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args: argparse.Namespace = parser.parse_args()
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
asyncio.run(main(args.url))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
|
||||
@@ -27,7 +27,7 @@ async def main(room_url):
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
(_, image_bytes) = await image_task
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, image_bytes))
|
||||
transport.output_queue.put(QueueFrame(FrameType.IMAGE_FRAME, image_bytes))
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import re
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
async def main(room_url:str):
|
||||
global transport
|
||||
@@ -32,16 +32,20 @@ async def main(room_url:str):
|
||||
# queue two pieces of speech: one specified as a text literal,
|
||||
# and one generated by an llm. We'll kick off the llm first, and let
|
||||
# it generate a response while we're speaking the literal string.
|
||||
#
|
||||
# Note that in this case, we don't use `run_llm_async` because we're
|
||||
# taking advantage of the time spent speaking the first phrase to generate
|
||||
# the entire LLM response, and this happens asynchronously in a task.
|
||||
llm_response_task = asyncio.create_task(llm.run_llm(
|
||||
[{"role": "system", "content": "tell the user a joke about llamas"}]
|
||||
))
|
||||
|
||||
async for audio_chunk in tts.run_tts("My friend the LLM is now going to tell a joke about llamas."):
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio_chunk))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio_chunk))
|
||||
|
||||
llm_response = await llm_response_task
|
||||
async for audio_chunk in tts.run_tts(llm_response):
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio_chunk))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio_chunk))
|
||||
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
|
||||
@@ -4,7 +4,7 @@ import asyncio
|
||||
from asyncio.queues import Queue
|
||||
import re
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
@@ -25,7 +25,7 @@ async def main(room_url):
|
||||
transport.camera_height = 1024
|
||||
|
||||
llm = AzureLLMService()
|
||||
tts = ElevenLabsTTSService()
|
||||
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
|
||||
dalle = OpenAIImageGenService()
|
||||
|
||||
# Get a complete audio chunk from the given text. Splitting this into its own
|
||||
@@ -39,9 +39,9 @@ async def main(room_url):
|
||||
|
||||
async def get_month_data(month):
|
||||
image_text = ""
|
||||
current_clause = ""
|
||||
tts_tasks = []
|
||||
async for text in llm.run_llm_async(
|
||||
first_sentence = True
|
||||
async for sentence in llm.run_llm_async_sentences(
|
||||
[
|
||||
{
|
||||
"role": "system",
|
||||
@@ -49,18 +49,24 @@ async def main(room_url):
|
||||
}
|
||||
]
|
||||
):
|
||||
image_text += text
|
||||
current_clause += text
|
||||
if re.match(r"^.*[.!?]$", text):
|
||||
tts_tasks.append(get_all_audio(current_clause))
|
||||
current_clause = ""
|
||||
image_text += sentence
|
||||
|
||||
if first_sentence:
|
||||
sentence = f"{month}: {sentence}"
|
||||
else:
|
||||
first_sentence = False
|
||||
|
||||
tts_tasks.append(get_all_audio(sentence))
|
||||
|
||||
tts_tasks.insert(0, dalle.run_image_gen(image_text, "1024x1024"))
|
||||
|
||||
print(f"waiting for tasks to finish for {month}")
|
||||
data = await asyncio.gather(
|
||||
*tts_tasks
|
||||
)
|
||||
|
||||
print(f"done gathering tts tasks for {month}")
|
||||
|
||||
return {
|
||||
"month": month,
|
||||
"text": image_text,
|
||||
@@ -83,11 +89,8 @@ async def main(room_url):
|
||||
"December",
|
||||
]
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
if participant["id"] == transport.my_participant_id:
|
||||
return
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# This will play the months in the order they're completed. The benefit
|
||||
# is we'll have as little delay as possible before the first month, and
|
||||
# likely no delay between months, but the months won't display in order.
|
||||
@@ -95,12 +98,12 @@ async def main(room_url):
|
||||
data = await month_data_task
|
||||
transport.output_queue.put(
|
||||
[
|
||||
OutputQueueFrame(FrameType.IMAGE_FRAME, data["image"]),
|
||||
OutputQueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]),
|
||||
QueueFrame(FrameType.IMAGE_FRAME, data["image"]),
|
||||
QueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]),
|
||||
]
|
||||
)
|
||||
for audio in data["audio"][1:]:
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
transport.output_queue.join()
|
||||
@@ -116,6 +119,6 @@ if __name__=="__main__":
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args: argparse.Namespace = parser.parse_args()
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
|
||||
@@ -5,7 +5,7 @@ import asyncio
|
||||
from asyncio.queues import Queue
|
||||
import re
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService, OpenAIImageGenService
|
||||
@@ -97,12 +97,12 @@ async def main(room_url):
|
||||
data = await month_data_task
|
||||
transport.output_queue.put(
|
||||
[
|
||||
OutputQueueFrame(FrameType.IMAGE_FRAME, data["image"]),
|
||||
OutputQueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]),
|
||||
QueueFrame(FrameType.IMAGE_FRAME, data["image"]),
|
||||
QueueFrame(FrameType.AUDIO_FRAME, data["audio"][0]),
|
||||
]
|
||||
)
|
||||
for audio in data["audio"][1:]:
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
transport.output_queue.join()
|
||||
|
||||
@@ -6,7 +6,7 @@ import urllib.parse
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
async def main(room_url:str, token):
|
||||
global transport
|
||||
@@ -26,28 +26,28 @@ async def main(room_url:str, token):
|
||||
llm = AzureLLMService()
|
||||
tts = AzureTTSService()
|
||||
|
||||
transcribed_message = ""
|
||||
transcription_timeout = None
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
|
||||
]
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_joined(transport, participant):
|
||||
if participant["id"] == transport.my_participant_id:
|
||||
return
|
||||
sentence = ""
|
||||
async for message in transport.get_transcriptions():
|
||||
sentence += message
|
||||
if sentence.endswith((".", "?", "!")):
|
||||
messages.append({"role": "user", "content": sentence})
|
||||
sentence = ''
|
||||
|
||||
async for audio_chunk in tts.run_tts("If you say something, I will respond."):
|
||||
transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio_chunk))
|
||||
full_response = ""
|
||||
async for response in llm.run_llm_async_sentences(messages):
|
||||
full_response += response
|
||||
async for audio in tts.run_tts(response):
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
@transport.event_handler("on_transcription_message")
|
||||
async def on_transcription_message(transport, message) -> None:
|
||||
nonlocal transcribed_message
|
||||
nonlocal transcription_timeout
|
||||
print(message)
|
||||
if message["session_id"] != transport.my_participant_id:
|
||||
transcribed_message += message['text']
|
||||
messages.append({"role": "assistant", "content": full_response})
|
||||
|
||||
print("message received", transcribed_message)
|
||||
|
||||
await transport.run()
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user