Merge branch 'main' into cb/09-bots-arguing

Chad Bailey
2024-01-23 20:33:57 +00:00
17 changed files with 337 additions and 301 deletions

.gitignore

@@ -2,6 +2,7 @@
env/
__pycache__/
*~
venv
#*#
# Distribution / packaging


@@ -53,3 +53,88 @@ If you have those environment variables stored in an .env file, you can quickly
```bash
export $(grep -v '^#' .env | xargs)
```
## Overview
The Daily AI SDK allows you to build applications that can participate in WebRTC sessions and interact with AI Services. Some examples of what you can build with this:
* conversational bots that interact 1:1 with a user, using voice recognition and text-to-speech
* assistant bots that aggregate transcriptions from multiple participants in a meeting and provide real-time summaries or other AI-generated output
* image-recognition bots
* etc
## Concepts
### Transport Service
The SDK provides one “transport service”, which is a wrapper around Daily's `daily-python` client (tk add link). You can use this service to listen for events related to a WebRTC session, such as “a participant joined the meeting”.
The transport service also exposes a send queue and a receive queue. You can use the send queue to send audio and video to the WebRTC session, and you can listen to the receive queue to get audio, video and transcription data from the WebRTC session.
### AI Services
The AI Service classes provide wrappers around various AI providers, and allow you to query LLMs, convert text to speech and make images from text. The audio and images can then be placed on the transport service's send queue, where they'll be sent to the WebRTC session.
### Queue Frames
Communication between the transport service and AI services, and between various AI services, takes place in Queue Frames. These frames contain an indication of the type of data as well as the data itself.
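As a rough illustration of the idea, here is a minimal sketch of typed frames. The class names mirror those in `dailyai.queue_frame` as it appears in this commit, but the sketch is trimmed down, and the `describe` helper is hypothetical and not part of the SDK:

```python
from dataclasses import dataclass


class QueueFrame:
    """Base class; the subclass indicates what kind of data the frame carries."""


class EndStreamQueueFrame(QueueFrame):
    """Marks the end of a stream and carries no data."""


@dataclass
class TextQueueFrame(QueueFrame):
    text: str


@dataclass
class AudioQueueFrame(QueueFrame):
    data: bytes


def describe(frame: QueueFrame) -> str:
    # Hypothetical helper: a consumer branches on the frame's class.
    if isinstance(frame, TextQueueFrame):
        return f"text: {frame.text}"
    if isinstance(frame, AudioQueueFrame):
        return f"audio: {len(frame.data)} bytes"
    if isinstance(frame, EndStreamQueueFrame):
        return "end of stream"
    return "unknown frame"
```

A service that only understands text can check `isinstance(frame, TextQueueFrame)` and pass every other frame along untouched.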
## Using Transports, AI Services and Frames
AI Services all define a `.run` method. This method consumes and generates `QueueFrame` frames. The kinds of frames that can be consumed and generated depend on the kind of service. For instance, an LLM AI Service consumes `LLM_MESSAGE` frames (which define a history of interaction with an LLM) and emits `TEXT` frames (the response from the LLM).
The `.run` method is an async generator, so its result is an `AsyncIterable`. It takes an `Iterable`, `AsyncIterable` or `asyncio.Queue` that produces QueueFrames as a parameter. This makes it easy to chain AI Services, and to consume input from the Transport's `receive_queue`.
AI Services also have a `.run_to_queue` method. This method is not an AsyncIterable, but instead sends processed QueueFrames to a queue. This makes it easy to send the output of an AI Service to the Transport's `send_queue`.
AI Services also define convenience functions that let you bypass creating QueueFrames for some simple cases (e.g., using the TTS service to convert a string to audio output and send that audio to the transport's `send_queue`). See below for examples.
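To make the chaining idea concrete, here is a self-contained sketch. `ToyService`, `Upper` and `Exclaim` are hypothetical stand-ins that use plain strings as frames; they are not the SDK's `AIService` classes and only imitate the `.run` / `.run_to_queue` shape described above:

```python
import asyncio
from collections.abc import AsyncIterable, Iterable
from typing import AsyncGenerator


class ToyService:
    """Hypothetical stand-in for an AI service (not the SDK's AIService)."""

    async def process_frame(self, frame: str) -> AsyncGenerator[str, None]:
        yield frame

    async def run(self, frames) -> AsyncGenerator[str, None]:
        # Accept either an async iterable or a plain iterable of frames.
        if isinstance(frames, AsyncIterable):
            async for frame in frames:
                async for out in self.process_frame(frame):
                    yield out
        elif isinstance(frames, Iterable):
            for frame in frames:
                async for out in self.process_frame(frame):
                    yield out
        else:
            raise TypeError("frames must be an iterable or async iterable")

    async def run_to_queue(self, queue: asyncio.Queue, frames) -> None:
        # Same processing as run(), but the output goes to a queue.
        async for out in self.run(frames):
            await queue.put(out)


class Upper(ToyService):
    async def process_frame(self, frame: str) -> AsyncGenerator[str, None]:
        yield frame.upper()


class Exclaim(ToyService):
    async def process_frame(self, frame: str) -> AsyncGenerator[str, None]:
        yield frame + "!"


async def demo() -> list[str]:
    send_queue: asyncio.Queue = asyncio.Queue()
    # The output of one service's run() is the input of the next.
    await Exclaim().run_to_queue(send_queue, Upper().run(["hello", "world"]))
    return [send_queue.get_nowait() for _ in range(send_queue.qsize())]
```

Running `asyncio.run(demo())` drains the queue into a list. Because `run` is an async generator, nothing is processed until the downstream service starts iterating.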
## Examples
### Say Something
The base TTS AI service exposes a `.say` method. After creating a transport and TTS service, you can use this method like so:
```
transport = DailyTransportService(...)
tts = AzureTTSService()
await tts.say("hello world", transport.send_queue)
```
This will call the TTS service to render the text to audio frames, then put the audio frames on the transport's send queue. The transport will then send those frames along to the WebRTC session.
### Speak an LLM response
Given a system prompt contained in a `messages` array, you can emit the LLM's response as audio with a chain like this:
```
transport = DailyTransportService(...) # setup parameters omitted
tts = AzureTTSService()
llm = AzureLLMService()
messages = [...] # system prompt omitted for brevity
await tts.run_to_queue(
    transport.send_queue,
    llm.run([LLMMessagesQueueFrame(messages)])
)
```
In this code, the LLM service object sends the messages to Azure's OpenAI implementation, which streams chunks back asynchronously. Those chunks are aggregated by the TTS Service to ensure the best audio response (TTS works best when it gets complete sentences, so it can inflect correctly), then sent to Azure's TTS service, converted to audio frames, and sent to the WebRTC session via the Daily transport.
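The aggregation step can be sketched in isolation. The function below is a hypothetical, simplified illustration rather than the SDK's code: it buffers streamed text chunks until the buffer ends with sentence punctuation, and flushes whatever is left when the stream ends:

```python
from typing import Iterable, Iterator


def aggregate_sentences(chunks: Iterable[str]) -> Iterator[str]:
    # Hypothetical helper: collect chunks until the buffer ends a sentence.
    current = ""
    for chunk in chunks:
        current += chunk
        if current.endswith((".", "?", "!")):
            yield current
            current = ""
    # Flush any trailing partial sentence once the input is exhausted.
    if current:
        yield current
```

Note that this only detects a sentence boundary when a chunk happens to end on the punctuation mark; punctuation in the middle of a chunk is not split.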
### Pre-cache an LLM response
Sometimes LLMs can be slower than we'd like for natural-feeling communication. Here's an example where we take advantage of the time it takes to speak some pre-defined text to get a head start on the LLM response:
(TK link to 04- sample)
In this sample, we set up a buffer queue to receive the audio frames from the LLM response while we are joining the call, and start an asynchronous task to begin filling this buffer:
```
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
    elevenlabs_tts.run_to_queue(
        buffer_queue,
        llm.run([LLMMessagesQueueFrame(messages)]),
        True,
    )
)
```
Then, when we've joined the call, we speak the static text:
```
await azure_tts.say("My friend...", transport.send_queue)
```
As that text is being spoken, the asynchronous LLM task continues in the background. When the text is done, we pull the frames off the buffer queue and put them in the transport's `send_queue`:
```
async def buffer_to_send_queue():
    while True:
        frame = await buffer_queue.get()
        await transport.send_queue.put(frame)
        buffer_queue.task_done()
        if isinstance(frame, EndStreamQueueFrame):
            break

await asyncio.gather(llm_response_task, buffer_to_send_queue())
```
One thing to note here is the last parameter to `run_to_queue` in the first code block above: passing `True` causes the `run_to_queue` method to send an `END_STREAM` frame when it's done rendering. This lets us know when to stop our `buffer_to_send_queue` task above.
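The buffering pattern can be tried without any SDK objects. In this sketch everything is a hypothetical stand-in: strings play the role of audio frames, the `END` object plays the role of the `END_STREAM` frame, and `produce` plays the role of the LLM and TTS task:

```python
import asyncio

END = object()  # hypothetical sentinel standing in for the END_STREAM frame


async def produce(buffer: asyncio.Queue) -> None:
    # Stands in for the background task that renders the LLM response.
    for frame in ("audio-1", "audio-2", "audio-3"):
        await asyncio.sleep(0)  # yield control, as a real service would
        await buffer.put(frame)
    await buffer.put(END)


async def drain(buffer: asyncio.Queue, send_queue: asyncio.Queue) -> None:
    # Move frames from the buffer to the send queue until the sentinel arrives.
    while True:
        frame = await buffer.get()
        await send_queue.put(frame)
        buffer.task_done()
        if frame is END:
            break


async def demo() -> list:
    buffer: asyncio.Queue = asyncio.Queue()
    send_queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(buffer))  # head start in the background
    await send_queue.put("static-intro")  # stands in for the pre-defined text
    await asyncio.gather(producer, drain(buffer, send_queue))
    return [send_queue.get_nowait() for _ in range(send_queue.qsize())]
```

Without the sentinel, `drain` would wait on `buffer.get()` forever after the last frame, so the `gather` call would never return.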


@@ -0,0 +1,47 @@
import asyncio
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, List
class QueueTee:
async def run_to_queue_and_generate(
self,
output_queue: asyncio.Queue,
generator: AsyncGenerator[QueueFrame, None]
) -> AsyncGenerator[QueueFrame, None]:
async for frame in generator:
await output_queue.put(frame)
yield frame
async def run_to_queues(
self,
output_queues: List[asyncio.Queue],
generator: AsyncGenerator[QueueFrame, None]
):
async for frame in generator:
for queue in output_queues:
await queue.put(frame)
class LLMContextAggregator(AIService):
def __init__(self, messages: list[dict], role:str, bot_participant_id=None):
self.messages = messages
self.bot_participant_id = bot_participant_id
self.role = role
self.sentence = ""
async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
content: str = ""
# TODO: split up transcription by participant
if isinstance(frame, TextQueueFrame):
content = frame.text
self.sentence += content
if self.sentence.endswith((".", "?", "!")):
self.messages.append({"role": self.role, "content": self.sentence})
self.sentence = ""
yield LLMMessagesQueueFrame(self.messages)
yield frame


@@ -1,19 +1,38 @@
from enum import Enum
from dataclasses import dataclass
from typing import Any
class FrameType(Enum):
START_STREAM = 0
END_STREAM = 1
AUDIO = 2
IMAGE = 3
SENTENCE = 4
TEXT_CHUNK = 5
LLM_MESSAGE = 6
APP_MESSAGE = 7
IMAGE_DESCRIPTION = 8
TRANSCRIPTION = 9
@dataclass(frozen=True)
class QueueFrame:
frame_type: FrameType
frame_data: str | dict | bytes | list | None
pass
class StartStreamQueueFrame(QueueFrame):
pass
class EndStreamQueueFrame(QueueFrame):
pass
@dataclass()
class AudioQueueFrame(QueueFrame):
data: bytes
@dataclass()
class ImageQueueFrame(QueueFrame):
url: str | None
image: bytes
@dataclass()
class TextQueueFrame(QueueFrame):
text: str
@dataclass()
class TranscriptionQueueFrame(TextQueueFrame):
participantId: str
timestamp: str
@dataclass()
class LLMMessagesQueueFrame(QueueFrame):
messages: list[dict[str,str]] # TODO: define this more concretely!
class AppMessageQueueFrame(QueueFrame):
message: Any
participantId: str


@@ -1,17 +1,19 @@
import asyncio
import logging
import re
from httpx import request
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.queue_frame import (
AudioQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
LLMMessagesQueueFrame,
QueueFrame,
TextQueueFrame,
)
from abc import abstractmethod
from typing import AsyncGenerator, Iterable
from typing import AsyncGenerator, AsyncIterable, Iterable
from dataclasses import dataclass
from typing import AsyncGenerator
from collections.abc import Iterable, AsyncIterable
class AIService:
@@ -21,95 +23,57 @@ class AIService:
def stop(self):
pass
def allowed_input_frame_types(self) -> set[FrameType]:
return set()
def possible_output_frame_types(self) -> set[FrameType]:
return set()
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
async for frame in self.run(frames):
await queue.put(frame)
if add_end_of_stream:
await queue.put(QueueFrame(FrameType.END_STREAM, None))
await queue.put(EndStreamQueueFrame())
async def run(
self,
frames: Iterable[QueueFrame]
| AsyncIterable[QueueFrame]
| asyncio.Queue[QueueFrame],
requested_frame_types: set[FrameType] | None=None,
) -> AsyncGenerator[QueueFrame, None]:
if requested_frame_types and self.possible_output_frame_types().intersection(requested_frame_types) == set():
raise Exception(f"Requested frame types {requested_frame_types} are not supported by this service.")
try:
if isinstance(frames, AsyncIterable):
async for frame in frames:
async for output_frame in self.process_frame(frame):
yield output_frame
elif isinstance(frames, Iterable):
for frame in frames:
async for output_frame in self.process_frame(frame):
yield output_frame
elif isinstance(frames, asyncio.Queue):
while True:
frame = await frames.get()
async for output_frame in self.process_frame(frame):
yield output_frame
if isinstance(frame, EndStreamQueueFrame):
break
else:
raise Exception("Frames must be an iterable or async iterable")
if not requested_frame_types:
requested_frame_types = self.possible_output_frame_types()
if isinstance(frames, AsyncIterable):
async for frame in frames:
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
elif isinstance(frames, Iterable):
for frame in frames:
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
elif isinstance(frames, asyncio.Queue):
while True:
frame = await frames.get()
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
if frame.frame_type == FrameType.END_STREAM:
break
else:
raise Exception("Frames must be an iterable or async iterable")
async for output_frame in self.finalize():
yield output_frame
except Exception as e:
self.logger.error("Exception occurred while running AI service", e)
raise e
@abstractmethod
async def process_frame(self, requested_frame_types:set[FrameType], frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
# Yield something so the linter can deduce what should happen here.
yield QueueFrame(FrameType.END_STREAM, None)
class SentenceAggregator(AIService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.current_sentence = ""
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK, FrameType.SENTENCE])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE])
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.SENTENCE in requested_frame_types:
return
if frame.frame_type == FrameType.TEXT_CHUNK:
if type(frame.frame_data) != str:
raise Exception(
"Sentence aggregator requires a string for the data field"
)
self.current_sentence += frame.frame_data
if self.current_sentence.endswith((".", "?", "!")):
sentence = self.current_sentence
self.current_sentence = ""
yield QueueFrame(FrameType.SENTENCE, sentence)
elif frame.frame_type == FrameType.END_STREAM:
if self.current_sentence:
yield QueueFrame(FrameType.SENTENCE, self.current_sentence)
elif frame.frame_type == FrameType.SENTENCE:
yield frame
async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
@abstractmethod
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
class LLMService(AIService):
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.LLM_MESSAGE, FrameType.SENTENCE, FrameType.TRANSCRIPTION])
def allowed_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TEXT_CHUNK])
@abstractmethod
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
yield ""
@@ -118,52 +82,55 @@ class LLMService(AIService):
async def run_llm(self, messages) -> str:
pass
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if frame.frame_type == FrameType.LLM_MESSAGE:
if type(frame.frame_data) != list:
raise Exception("LLM service requires a dict for the data field")
messages: list[dict[str, str]] = frame.frame_data
if FrameType.SENTENCE in requested_frame_types:
yield QueueFrame(FrameType.SENTENCE, await self.run_llm(messages))
else:
async for text_chunk in self.run_llm_async(messages):
yield QueueFrame(FrameType.TEXT_CHUNK, text_chunk)
# TODO: handle other frame types! Need to aggregate into messages
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
async for text_chunk in self.run_llm_async(frame.messages):
yield TextQueueFrame(text_chunk)
class TTSService(AIService):
def __init__(self, aggregate_sentences=True):
super().__init__()
self.aggregate_sentences: bool = aggregate_sentences
self.current_sentence: str = ""
# Some TTS services require a specific sample rate. We default to 16k
def get_mic_sample_rate(self):
return 16000
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.AUDIO])
# Converts the sentence to audio. Yields a list of audio frames that can
# Converts the text to audio. Yields a list of audio frames that can
# be sent to the microphone device
@abstractmethod
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
async def run_tts(self, text) -> AsyncGenerator[bytes, None]:
# yield empty bytes here, so linting can infer what this method does
yield bytes()
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.AUDIO in requested_frame_types:
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not isinstance(frame, TextQueueFrame):
yield frame
return
if type(frame.frame_data) != str:
raise Exception("TTS service requires a string for the data field")
text: str | None = None
if not self.aggregate_sentences:
text = frame.text
else:
self.current_sentence += frame.text
if self.current_sentence.endswith((".", "?", "!")):
text = self.current_sentence
self.current_sentence = ""
async for audio_chunk in self.run_tts(frame.frame_data):
yield QueueFrame(FrameType.AUDIO, audio_chunk)
if text:
async for audio_chunk in self.run_tts(text):
yield AudioQueueFrame(audio_chunk)
async def finalize(self):
if self.current_sentence:
async for audio_chunk in self.run_tts(self.current_sentence):
yield AudioQueueFrame(audio_chunk)
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
await self.run_to_queue(queue, [QueueFrame(FrameType.SENTENCE, sentence)])
await self.run_to_queue(queue, [TextQueueFrame(sentence)])
class ImageGenService(AIService):
@@ -171,26 +138,17 @@ class ImageGenService(AIService):
super().__init__(**kwargs)
self.image_size = image_size
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK, FrameType.IMAGE_DESCRIPTION])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.IMAGE])
# Renders the image. Returns an Image object.
@abstractmethod
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
async def run_image_gen(self, sentence:str) -> tuple[str, bytes]:
pass
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.IMAGE in requested_frame_types:
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not isinstance(frame, TextQueueFrame):
return
if type(frame.frame_data) != str:
raise Exception("Image service requires a string for the data field")
(_, image_data) = await self.run_image_gen(frame.frame_data)
yield QueueFrame(FrameType.IMAGE, image_data)
(url, image_data) = await self.run_image_gen(frame.text)
yield ImageQueueFrame(url, image_data)
@dataclass


@@ -1,15 +1,23 @@
import asyncio
import inspect
import logging
import threading
import time
import types
from functools import partial
from queue import Queue, Empty
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.queue_frame import (
AudioQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
QueueFrame,
StartStreamQueueFrame,
TranscriptionQueueFrame,
)
from threading import Thread, Event, Timer
from threading import Thread, Event
from daily import (
EventHandler,
@@ -21,12 +29,15 @@ from daily import (
)
class DailyTransportService(EventHandler):
_daily_initialized = False
_lock = threading.Lock()
def __init__(
self,
room_url: str,
token: str | None,
bot_name: str,
duration: float = 10,
min_others_count: int = 1,
):
super().__init__()
self.bot_name: str = bot_name
@@ -34,6 +45,7 @@ class DailyTransportService(EventHandler):
self.token: str | None = token
self.duration: float = duration
self.expiration = time.time() + duration * 60
self.min_others_count = min_others_count
# This queue is used to marshal frames from the async send queue to the thread that emits audio & video.
# We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions
@@ -54,6 +66,7 @@ class DailyTransportService(EventHandler):
self.receive_queue = asyncio.Queue()
self.other_participant_has_joined = False
self.my_participant_id = None
self.camera_thread = None
self.frame_consumer_thread = None
@@ -114,7 +127,11 @@ class DailyTransportService(EventHandler):
return decorator
def configure_daily(self):
Daily.init()
# Only initialize Daily once
if not DailyTransportService._daily_initialized:
with DailyTransportService._lock:
Daily.init()
DailyTransportService._daily_initialized = True
self.client = CallClient(event_handler=self)
if self.mic_enabled:
@@ -143,6 +160,7 @@ class DailyTransportService(EventHandler):
self.client.set_user_name(self.bot_name)
self.client.join(self.room_url, self.token, completion=self.call_joined)
self.my_participant_id = self.client.participants()["local"]["id"]
self.client.update_inputs(
{
@@ -186,13 +204,11 @@ class DailyTransportService(EventHandler):
if self.token:
self.client.start_transcription(self.transcription_settings)
self.my_participant_id = self.client.participants()["local"]["id"]
async def get_receive_frames(self):
while True:
frame = await self.receive_queue.get()
yield frame
if frame.frame_type == FrameType.END_STREAM:
if isinstance(frame, EndStreamQueueFrame):
break
def get_async_send_queue(self):
@@ -203,7 +219,7 @@ class DailyTransportService(EventHandler):
frame: QueueFrame | list = await self.send_queue.get()
self.threadsafe_send_queue.put(frame)
self.send_queue.task_done()
if type(frame) == QueueFrame and frame.frame_type == FrameType.END_STREAM:
if isinstance(frame, EndStreamQueueFrame):
break
async def wait_for_send_queue_to_empty(self):
@@ -217,14 +233,14 @@ class DailyTransportService(EventHandler):
async def run(self) -> None:
self.configure_daily()
self.participant_left = False
self.do_shutdown = False
async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames())
try:
participant_count: int = len(self.client.participants())
self.logger.info(f"{participant_count} participants in room")
while time.time() < self.expiration and not self.participant_left and not self.stop_threads.is_set():
while time.time() < self.expiration and not self.do_shutdown and not self.stop_threads.is_set():
await asyncio.sleep(1)
except Exception as e:
self.logger.error(f"Exception {e}")
@@ -233,8 +249,8 @@ class DailyTransportService(EventHandler):
self.stop_threads.set()
await self.receive_queue.put(QueueFrame(FrameType.END_STREAM, None))
await self.send_queue.put(QueueFrame(FrameType.END_STREAM, None))
await self.receive_queue.put(EndStreamQueueFrame())
await self.send_queue.put(EndStreamQueueFrame())
await async_output_queue_marshal_task
if self.camera_thread and self.camera_thread.is_alive():
@@ -263,8 +279,8 @@ class DailyTransportService(EventHandler):
self.on_first_other_participant_joined()
def on_participant_left(self, participant, reason):
if len(self.client.participants()) < 2:
self.participant_left = True
if len(self.client.participants()) < self.min_others_count + 1:
self.do_shutdown = True
pass
def on_app_message(self, message, sender):
@@ -272,7 +288,12 @@ class DailyTransportService(EventHandler):
def on_transcription_message(self, message:dict):
if self.loop:
frame = QueueFrame(FrameType.TRANSCRIPTION, message)
participantId = ""
if "participantId" in message:
participantId = message["participantId"]
elif "session_id" in message:
participantId = message["session_id"]
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
@@ -305,15 +326,15 @@ class DailyTransportService(EventHandler):
while True:
try:
frames_or_frame: QueueFrame | list[QueueFrame] = self.threadsafe_send_queue.get()
if type(frames_or_frame) == QueueFrame:
if isinstance(frames_or_frame, QueueFrame):
frames: list[QueueFrame] = [frames_or_frame]
elif type(frames_or_frame) == list:
elif isinstance(frames_or_frame, list):
frames: list[QueueFrame] = frames_or_frame
else:
raise Exception("Unknown type in output queue")
for frame in frames:
if frame.frame_type == FrameType.END_STREAM:
if isinstance(frame, EndStreamQueueFrame):
self.logger.info("Stopping frame consumer thread")
self.threadsafe_send_queue.task_done()
return
@@ -321,8 +342,8 @@ class DailyTransportService(EventHandler):
# if interrupted, we just pull frames off the queue and discard them
if not self.is_interrupted.is_set():
if frame:
if frame.frame_type == FrameType.AUDIO:
chunk = frame.frame_data
if isinstance(frame, AudioQueueFrame):
chunk = frame.data
all_audio_frames.extend(chunk)
@@ -331,8 +352,8 @@ class DailyTransportService(EventHandler):
if l:
self.mic.write_frames(bytes(b[:l]))
b = b[l:]
elif frame.frame_type == FrameType.IMAGE:
self.set_image(frame.frame_data)
elif isinstance(frame, ImageQueueFrame):
self.set_image(frame.image)
elif len(b):
self.mic.write_frames(bytes(b))
b = bytearray()
@@ -343,7 +364,7 @@ class DailyTransportService(EventHandler):
)
self.interrupt_time = None
if frame.frame_type == FrameType.START_STREAM:
if isinstance(frame, StartStreamQueueFrame):
self.is_interrupted.clear()
self.threadsafe_send_queue.task_done()


@@ -3,33 +3,27 @@ import unittest
from typing import AsyncGenerator, Generator
from dailyai.services.ai_services import AIService, SentenceAggregator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.ai_services import AIService
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TextQueueFrame
class SimpleAIService(AIService):
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK])
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> QueueFrame | None:
return frame
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
yield frame
class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
async def test_async_input(self):
service = SimpleAIService()
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello"),
QueueFrame(FrameType.END_STREAM, None),
TextQueueFrame("hello"),
EndStreamQueueFrame()
]
async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
for frame in input_frames:
yield frame
output_frames = []
async for frame in service.run(set([FrameType.TEXT_CHUNK]), iterate_frames()):
async for frame in service.run(iterate_frames()):
output_frames.append(frame)
self.assertEqual(input_frames, output_frames)
@@ -37,93 +31,18 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
async def test_nonasync_input(self):
service = SimpleAIService()
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello"),
QueueFrame(FrameType.END_STREAM, None),
]
input_frames = [TextQueueFrame("hello"), EndStreamQueueFrame()]
def iterate_frames() -> Generator[QueueFrame, None, None]:
for frame in input_frames:
yield frame
output_frames = []
async for frame in service.run(set([FrameType.TEXT_CHUNK]), iterate_frames()):
async for frame in service.run(iterate_frames()):
output_frames.append(frame)
self.assertEqual(input_frames, output_frames)
class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase):
async def test_clause(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello"),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(1, len(output_frames))
self.assertEqual(QueueFrame(FrameType.SENTENCE, "hello"), output_frames[0])
async def test_sentence(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello, "),
QueueFrame(FrameType.TEXT_CHUNK, "world."),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(1, len(output_frames))
self.assertEqual(QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0])
async def test_sentence_and_clause(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello, "),
QueueFrame(FrameType.TEXT_CHUNK, "world."),
QueueFrame(FrameType.TEXT_CHUNK, " How are"),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(2, len(output_frames))
self.assertEqual(
QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0]
)
self.assertEqual(
QueueFrame(FrameType.SENTENCE, " How are"), output_frames[1]
)
async def test_two_sentences(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello, "),
QueueFrame(FrameType.TEXT_CHUNK, "world."),
QueueFrame(FrameType.TEXT_CHUNK, " How are"),
QueueFrame(FrameType.TEXT_CHUNK, " you doing?"),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(2, len(output_frames))
self.assertEqual(
QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0]
)
self.assertEqual(QueueFrame(FrameType.SENTENCE, " How are you doing?"), output_frames[1])
if __name__ == "__main__":
unittest.main()


@@ -1,10 +1,7 @@
import argparse
import asyncio
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):


@@ -1,10 +1,8 @@
import argparse
import asyncio
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.queue_frame import LLMMessagesQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -28,9 +26,7 @@ async def main(room_url):
tts_task = asyncio.create_task(
tts.run_to_queue(
transport.send_queue,
SentenceAggregator().run(
llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)])
)
llm.run([LLMMessagesQueueFrame(messages)]),
)
)


@@ -1,7 +1,7 @@
import argparse
import asyncio
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.queue_frame import TextQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.open_ai_services import OpenAIImageGenService
@@ -23,7 +23,7 @@ async def main(room_url):
imagegen = OpenAIImageGenService(image_size="1024x1024")
image_task = asyncio.create_task(
imagegen.run_to_queue(transport.send_queue, [QueueFrame(FrameType.IMAGE_DESCRIPTION, "a cat in the style of picasso")])
imagegen.run_to_queue(transport.send_queue, [TextQueueFrame("a cat in the style of picasso")])
)
@transport.event_handler("on_participant_joined")


@@ -2,10 +2,9 @@ import argparse
import asyncio
import re
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url:str):
@@ -36,9 +35,7 @@ async def main(room_url:str):
llm_response_task = asyncio.create_task(
elevenlabs_tts.run_to_queue(
buffer_queue,
SentenceAggregator().run(
llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)])
),
llm.run([LLMMessagesQueueFrame(messages)]),
True,
)
)
@@ -48,17 +45,14 @@ async def main(room_url:str):
if participant["id"] == transport.my_participant_id:
return
await azure_tts.run_to_queue(
transport.send_queue,
[QueueFrame(FrameType.SENTENCE, "My friend the LLM is now going to tell a joke about llamas.")]
)
await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue)
async def buffer_to_send_queue():
while True:
frame = await buffer_queue.get()
await transport.send_queue.put(frame)
buffer_queue.task_done()
if frame.frame_type == FrameType.END_STREAM:
if isinstance(frame, EndStreamQueueFrame):
break
await asyncio.gather(llm_response_task, buffer_to_send_queue())


@@ -1,14 +1,9 @@
import argparse
import asyncio
from asyncio.queues import Queue
import re
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.ai_services import SentenceAggregator
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAIImageGenService
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fal_ai_services import FalImageGenService
@@ -50,14 +45,20 @@ async def main(room_url):
]
image_description = await llm.run_llm(messages)
if not image_description:
return
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
(audio, image_data) = await asyncio.gather(
get_all_audio(to_speak), dalle.run_image_gen(image_description)
audio_task, image_task
)
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
@@ -86,8 +87,8 @@ async def main(room_url):
data = await month_data_task
await transport.send_queue.put(
[
QueueFrame(FrameType.IMAGE, data["image"]),
QueueFrame(FrameType.AUDIO, data["audio"]),
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
)


@@ -6,7 +6,7 @@ import urllib.parse
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.queue_aggregators import LLMContextAggregator
async def main(room_url:str, token):
global transport
@@ -17,7 +17,7 @@ async def main(room_url:str, token):
room_url,
token,
"Respond bot",
1,
5,
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
@@ -26,33 +26,31 @@ async def main(room_url:str, token):
llm = AzureLLMService()
tts = AzureTTSService()
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
async def handle_transcriptions():
messages = [
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
]
sentence = ""
async for frame in transport.get_receive_frames():
if frame.frame_type != FrameType.TRANSCRIPTION:
continue
message = frame.frame_data
if message["session_id"] == transport.my_participant_id:
continue
# todo: we could differentiate between transcriptions from different participants
sentence += message["text"]
if sentence.endswith((".", "?", "!")):
messages.append({"role": "user", "content": sentence})
sentence = ''
full_response = ""
async for response in llm.run_llm_async_sentences(messages):
full_response += response
async for audio in tts.run_tts(response):
await transport.send_queue.put(QueueFrame(FrameType.AUDIO, audio))
messages.append({"role": "assistant", "content": full_response})
tma_in = LLMContextAggregator(
messages, "user", transport.my_participant_id
)
tma_out = LLMContextAggregator(
messages, "assistant", transport.my_participant_id
)
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
transport.get_receive_frames()
)
)
)
)
transport.transcription_settings["extra"]["punctuate"] = True
await asyncio.gather(transport.run(), handle_transcriptions())