Compare commits

..

11 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
363a722370 example 02 2024-03-03 17:16:46 -08:00
Kwindla Hultman Kramer
0512765aeb example 01 2024-03-03 16:38:03 -08:00
Moishe Lettvin
2d8d9146f8 ... 2024-02-28 14:58:28 -05:00
Moishe Lettvin
9829f77052 sample 05 works 2024-02-28 13:49:08 -05:00
Moishe Lettvin
8071ee6b4b starting on sample 5 2024-02-28 13:12:08 -05:00
Moishe Lettvin
d26d23fed7 merge aggregator & test 2024-02-28 11:05:49 -05:00
Moishe Lettvin
38cb8ca6a3 ...? 2024-02-27 20:47:08 -05:00
Moishe Lettvin
d03dd62941 probably will undo most of this 2024-02-27 18:52:01 -05:00
Moishe Lettvin
fc19a55f04 ... 2024-02-27 13:51:55 -05:00
Moishe Lettvin
8f9c252af9 ... 2024-02-27 13:03:07 -05:00
Moishe Lettvin
cec2bdc15f playing around with this 2024-02-27 09:55:40 -05:00
21 changed files with 487 additions and 962 deletions

View File

@@ -12,16 +12,11 @@ dependencies = [
"daily-python",
"fal",
"faster_whisper",
"groq",
"google-cloud-texttospeech",
"numpy",
"openai",
"Pillow",
"pyht",
"python-dotenv",
"torch",
"torchaudio",
"pyaudio",
"typing-extensions"
]

View File

@@ -1,29 +1,106 @@
import asyncio
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
from dailyai.services.ai_services import AIService
from attr import dataclass
from typing import AsyncGenerator, List
from dailyai.queue_frame import (
ControlQueueFrame,
EndStreamQueueFrame,
LLMMessagesQueueFrame,
QueueFrame,
TextQueueFrame,
TranscriptionQueueFrame,
)
from dailyai.services.ai_services import AIService, PipeService
from typing import Any, AsyncGenerator, Callable, List, Tuple
class QueueTee:
async def run_to_queue_and_generate(
self,
output_queue: asyncio.Queue,
generator: AsyncGenerator[QueueFrame, None]
) -> AsyncGenerator[QueueFrame, None]:
async for frame in generator:
await output_queue.put(frame)
yield frame
async def run_to_queues(
self,
output_queues: List[asyncio.Queue],
generator: AsyncGenerator[QueueFrame, None]
class QueueTee(PipeService):
def __init__(
self, sinks: list[PipeService], *args, **kwargs
):
async for frame in generator:
for queue in output_queues:
await queue.put(frame)
super().__init__(*args, **kwargs)
self.sinks: List[PipeService] = []
for sink in sinks:
sink.source_queue = asyncio.Queue()
self.sinks.append(sink)
async def process_queue(self):
if not self.source_queue:
return
while True:
frame: QueueFrame = await self.source_queue.get()
for sink in self.sinks:
if sink.source_queue:
await sink.source_queue.put(frame)
if isinstance(frame, EndStreamQueueFrame):
break
class QueueFrameAggregator(PipeService):
    """Pipe stage that folds incoming frames into a running aggregation.

    Behaviour is supplied by two callables:

    * ``aggregator(aggregation, frame)`` returns a
      ``(new_aggregation, frame_or_None)`` pair. The new aggregation is
      stored; the frame is emitted when it is truthy.
    * ``finalizer(aggregation)`` returns a last frame (or ``None``) built
      from whatever aggregation is left when ``finalize`` is called.

    Any remaining positional/keyword arguments are forwarded to
    ``PipeService.__init__``.
    """

    def __init__(
        self,
        aggregator: Callable[[Any, QueueFrame], Tuple[Any, QueueFrame | None]],
        finalizer: Callable[[Any], QueueFrame | None],
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        # No aggregation exists until the first frame has been processed.
        self.aggregation = None
        self.finalizer = finalizer
        self.aggregator = aggregator

    async def process_frame(
        self, frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Feed ``frame`` to the aggregator and emit its output frame, if any."""
        self.aggregation, emitted = self.aggregator(self.aggregation, frame)
        if not emitted:
            return
        yield emitted

    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Emit the finalizer's frame for the current aggregation, if any."""
        closing_frame = self.finalizer(self.aggregation)
        if not closing_frame:
            return
        yield closing_frame
class QueueMergeGateOnFirst(PipeService):
    """Merge several source queues into one sink, gated on each source's first frame.

    ``process_queue`` waits until every source queue has produced one frame,
    forwards those first frames, then drains the still-open sources
    concurrently into ``sink_queue`` and finally emits a single
    ``EndStreamQueueFrame``.

    Args:
        source_queues: queues to merge. Sources whose first frame is an
            ``EndStreamQueueFrame`` are removed from this list in place.
    """

    def __init__(
        self, source_queues: List[asyncio.Queue[QueueFrame]]
    ):
        super().__init__()
        self.source_queues = source_queues

    async def process_queue(self):
        """Run the gate-then-merge cycle once, until every source has ended."""
        first_frames: list[QueueFrame] = await asyncio.gather(
            *[source_queue.get() for source_queue in self.source_queues]
        )

        # Pair each first frame with its own queue and rebuild the list of
        # open sources. Popping by index while walking the frames would shift
        # the remaining indices, dropping the wrong queue (or raising
        # IndexError) as soon as more than one source had already ended.
        open_sources = []
        for source_queue, frame in zip(self.source_queues, first_frames):
            if isinstance(frame, EndStreamQueueFrame):
                continue
            open_sources.append(source_queue)
            await self.sink_queue.put(frame)
        # Slice-assign so the list object shared with the caller is updated,
        # as the original in-place removal did.
        self.source_queues[:] = open_sources

        async def pass_through(sink, source):
            # Forward frames from one source until it signals end of stream.
            while True:
                frame = await source.get()
                if isinstance(frame, EndStreamQueueFrame):
                    break
                await sink.put(frame)

        await asyncio.gather(
            *[pass_through(self.sink_queue, source) for source in self.source_queues]
        )

        # One end-of-stream marker for the merged output.
        await self.sink_queue.put(EndStreamQueueFrame())
class LLMContextAggregator(AIService):

View File

@@ -23,14 +23,6 @@ class LLMResponseEndQueueFrame(QueueFrame):
pass
class UserStartedSpeakingFrame(QueueFrame):
pass
class UserStoppedSpeakingFrame(QueueFrame):
pass
@dataclass()
class AudioQueueFrame(QueueFrame):
data: bytes
@@ -52,17 +44,6 @@ class TextQueueFrame(QueueFrame):
text: str
@dataclass()
class TextQueueOutOfBandFrame(TextQueueFrame):
outOfBand: bool = True
@dataclass()
class TTSCompletedFrame(QueueFrame):
text: str
outOfBand: bool = False
@dataclass()
class TranscriptionQueueFrame(TextQueueFrame):
participantId: str

View File

@@ -2,32 +2,73 @@ import asyncio
import io
import logging
import time
import datetime
import wave
from dailyai.queue_frame import (
QueueFrame,
AudioQueueFrame,
ControlQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
LLMMessagesQueueFrame,
LLMResponseEndQueueFrame,
QueueFrame,
TextQueueFrame,
TTSCompletedFrame,
TranscriptionQueueFrame,
UserStoppedSpeakingFrame
)
from abc import abstractmethod
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
from dataclasses import dataclass
class AbstractPipeService:
    """Root of the pipe-service hierarchy; owns the queue a stage writes to."""

    def __init__(self):
        # Every stage gets its own, initially empty, output queue.
        output_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()
        self.sink_queue = output_queue

    @abstractmethod
    async def process_queue(self):
        """Consume input and fill ``sink_queue``; this base version does nothing."""
        return None
class PipeService(AbstractPipeService):
    """Pipeline stage that reads frames from ``source_queue`` and writes to ``sink_queue``.

    Args:
        source_queue: queue to read frames from. When it is missing,
            ``process_queue`` returns immediately.
    """

    def __init__(
        self,
        source_queue: asyncio.Queue[QueueFrame] | None = None,
    ):
        super().__init__()
        self.source_queue = source_queue
        self.logger: logging.Logger = logging.getLogger("dailyai")

    async def process_queue(self):
        """Pump frames through ``process_frame`` until an end-of-stream frame is handled.

        When the input frame is an ``EndStreamQueueFrame``, the frames from
        ``finalize`` are queued ahead of the first frame ``process_frame``
        yields for it, and the method then returns.
        """
        if not self.source_queue:
            return

        while True:
            frame: QueueFrame = await self.source_queue.get()
            stream_ended = isinstance(frame, EndStreamQueueFrame)
            async for output_frame in self.process_frame(frame):
                if not stream_ended:
                    await self.sink_queue.put(output_frame)
                    continue
                # Flush pending work before passing the end marker downstream.
                async for final_frame in self.finalize():
                    await self.sink_queue.put(final_frame)
                await self.sink_queue.put(output_frame)
                return

    @abstractmethod
    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Default behaviour: pass the frame through unchanged."""
        yield frame

    @abstractmethod
    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Default behaviour: emit nothing at end of stream."""
        return
        # Unreachable, but its presence makes this function an async generator.
        yield
class AIService:
class AIService(PipeService):
def __init__(self):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger("dailyai")
def stop(self):
@@ -71,24 +112,8 @@ class AIService:
self.logger.error("Exception occurred while running AI service", e)
raise e
@abstractmethod
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
yield frame
@abstractmethod
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
class LLMService(AIService):
def __init__(self, context):
super().__init__()
self._context = context
@abstractmethod
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
yield ""
@@ -98,28 +123,17 @@ class LLMService(AIService):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
print(f"##### process frame got a frame, {type(frame)}")
if isinstance(frame, UserStoppedSpeakingFrame):
print(
f"### Got a user stopped speaking frame, context is {self._context}")
async for chunk in self.run_llm_async(self._context):
# if we get a string, wrap it in a frame
if isinstance(chunk, str):
yield TextQueueFrame(chunk)
# if we get a frame, pass it through
elif isinstance(chunk, QueueFrame):
print(f"### Got a frame chunk: {chunk}")
yield chunk
else:
print(f"### Got an unknown chunk: {chunk}")
if isinstance(frame, LLMMessagesQueueFrame):
async for text_chunk in self.run_llm_async(frame.messages):
yield TextQueueFrame(text_chunk)
yield LLMResponseEndQueueFrame()
else:
yield frame
class TTSService(AIService):
def __init__(self, aggregate_sentences=True):
super().__init__()
def __init__(self, aggregate_sentences=True, *args, **kwargs):
super().__init__(*args, **kwargs)
self.aggregate_sentences: bool = aggregate_sentences
self.current_sentence: str = ""
@@ -136,12 +150,6 @@ class TTSService(AIService):
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not isinstance(frame, TextQueueFrame):
# We don't want transcription frames, which are a subclass
yield frame
return
# TODO-CB: Clean this up
if isinstance(frame, TranscriptionQueueFrame):
yield frame
return
@@ -156,11 +164,7 @@ class TTSService(AIService):
if text:
async for audio_chunk in self.run_tts(text):
size = 8000
for i in range(0, len(audio_chunk), size):
yield AudioQueueFrame(audio_chunk[i: i+size])
print("### ABOUT TO YIELD TTS COMPLETED FRAME", frame)
yield TTSCompletedFrame(text, hasattr(frame, 'outOfBand') and frame.outOfBand)
yield AudioQueueFrame(audio_chunk)
async def finalize(self):
if self.current_sentence:
@@ -168,7 +172,10 @@ class TTSService(AIService):
yield AudioQueueFrame(audio_chunk)
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
async def say(self, sentence, queue: asyncio.Queue|None=None):
queue = queue or self.sink
if not queue:
raise Exception("No queue to send audio to")
await self.run_to_queue(queue, [TextQueueFrame(sentence)])
@@ -230,9 +237,8 @@ class FrameLogger(AIService):
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, (AudioQueueFrame, ImageQueueFrame)):
self.logger.info(
f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {type(frame)}")
self.logger.info(f"{self.prefix}: {type(frame)}")
else:
print(f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {frame}")
print(f"{self.prefix}: {frame}")
yield frame

View File

@@ -42,16 +42,23 @@ class AzureTTSService(TTSService):
yield result.audio_data[44:]
elif result.reason == ResultReason.Canceled:
cancellation_details = result.cancellation_details
self.logger.info("Speech synthesis canceled: {}".format(
cancellation_details.reason))
self.logger.info("Speech synthesis canceled: {}".format(cancellation_details.reason))
if cancellation_details.reason == CancellationReason.Error:
self.logger.info("Error details: {}".format(
cancellation_details.error_details))
self.logger.info("Error details: {}".format(cancellation_details.error_details))
class AzureLLMService(LLMService):
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model, context):
super().__init__(context)
def __init__(
self,
api_key,
endpoint,
model,
api_version="2023-12-01-preview",
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self._model: str = model
self._client = AsyncAzureOpenAI(
@@ -104,8 +111,7 @@ class AzureImageGenServiceREST(ImageGenService):
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}"
headers = {"api-key": self._api_key,
"Content-Type": "application/json"}
headers = {"api-key": self._api_key, "Content-Type": "application/json"}
body = {
# Enter your prompt text here
"prompt": sentence,

View File

@@ -1,23 +1,11 @@
from abc import abstractmethod
import asyncio
import copy
import functools
import itertools
import logging
import queue
import threading
import time
from typing import AsyncGenerator
import numpy as np
import pyaudio
import torch
import torchaudio
from enum import Enum
import datetime
import traceback
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.queue_frame import (
AudioQueueFrame,
@@ -26,59 +14,8 @@ from dailyai.queue_frame import (
QueueFrame,
SpriteQueueFrame,
StartStreamQueueFrame,
TranscriptionQueueFrame,
TTSCompletedFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
)
torch.set_num_threads(1)
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=False)
(get_speech_timestamps,
save_audio,
read_audio,
VADIterator,
collect_chunks) = utils
# Taken from utils_vad.py
def validate(model, inputs: torch.Tensor):
    """Call ``model`` on ``inputs`` with gradient tracking disabled and return the result."""
    with torch.no_grad():
        return model(inputs)
# Provided by Alexander Veysov
def int2float(sound):
    """Convert integer PCM samples to float32 scaled by 1/32768.

    Args:
        sound: NumPy array of integer samples (the caller in this file
            passes int16 data).

    Returns:
        A float32 array with singleton dimensions squeezed out.

    The scaling is applied unconditionally. The previous ``abs_max > 0``
    guard raised ``ValueError`` on an empty buffer and, because int16
    ``abs(-32768)`` wraps to a negative value, left an all-minimum buffer
    unscaled. Scaling an all-zero buffer is a no-op, so other inputs are
    unaffected.
    """
    scaled = sound.astype('float32')
    scaled *= 1/32768
    return scaled.squeeze()  # depends on the use case
FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 16000
CHUNK = int(SAMPLE_RATE / 10)
audio = pyaudio.PyAudio()
class VADState(Enum):
    # States of the voice-activity state machine that _vad steps once per
    # audio chunk.
    QUIET = 1  # idle; a speaking chunk moves to STARTING
    STARTING = 2  # counting speaking chunks; a non-speaking chunk returns to QUIET
    SPEAKING = 3  # start threshold reached; a non-speaking chunk moves to STOPPING
    STOPPING = 4  # counting non-speaking chunks; a speaking chunk returns to SPEAKING
class BaseTransportService():
@@ -94,17 +31,6 @@ class BaseTransportService():
self._speaker_enabled = kwargs.get("speaker_enabled") or False
self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000
self._fps = kwargs.get("fps") or 8
self._vad_start_s = kwargs.get("vad_start_s") or 0.2
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.5
self._context = kwargs.get("context") or []
self._vad_samples = 1536
vad_frame_s = self._vad_samples / SAMPLE_RATE
self._vad_start_frames = round(self._vad_start_s / vad_frame_s)
self._vad_stop_frames = round(self._vad_stop_s / vad_frame_s)
self._vad_starting_count = 0
self._vad_stopping_count = 0
self._vad_state = VADState.QUIET
duration_minutes = kwargs.get("duration_minutes") or 10
self._expiration = time.time() + duration_minutes * 60
@@ -115,8 +41,6 @@ class BaseTransportService():
self._threadsafe_send_queue = queue.Queue()
self._images = None
self._user_is_speaking = False
self._current_phrase = ""
try:
self._loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
@@ -128,94 +52,20 @@ class BaseTransportService():
self._logger: logging.Logger = logging.getLogger()
def update_messages(self, new_context: list[dict[str, str]], task: asyncio.Task | None):
if task:
if not task.cancelled():
self._current_phrase = ""
self._context = new_context
def append_to_context(self, role, chunk_or_text):
    """Add a message to ``self._context``, merging consecutive text from one role.

    Args:
        role: role name used for string input (compared with, and stored
            under, the ``"role"`` key).
        chunk_or_text: either a string, or a dict-like message that is
            appended as-is unless its ``"outOfBand"`` entry equals True.

    A string is appended to the last context entry's ``"content"`` when
    that entry has the same role; otherwise a new
    ``{"role", "content"}`` entry is added. An empty context is handled
    (it previously raised ``IndexError`` on ``self._context[-1]``).
    """
    print("IN APPEND", chunk_or_text)
    # if we get a non-string, append it to the context without further error checking
    # unless the outOfBand property is True
    if not isinstance(chunk_or_text, str):
        if not chunk_or_text.get("outOfBand") == True:
            self._context.append(chunk_or_text)
        return

    text = chunk_or_text
    # The context starts out empty, so there may be no previous entry yet.
    last_context_item = self._context[-1] if self._context else None
    print("TEXT", text)
    print("LAST CONTEXT ITEM", last_context_item)
    traceback.print_stack()
    if last_context_item and last_context_item['role'] == role:
        last_context_item['content'] += f" {text}"
    else:
        self._context.append({"role": role, "content": text})
async def run_pipeline(self, frame):
    """Await ``self._runner`` with ``frame``.

    ``self._runner`` is the object stored by ``run_conversation``; this
    method calls it and awaits the result.
    """
    print(f"starting to speak_after_delay, {frame}")
    # TODO-CB: This exception for missing class gets eaten!
    await self._runner(frame)
async def run_conversation(self, runner: Iterable[QueueFrame]
| AsyncIterable[QueueFrame]
| asyncio.Queue[QueueFrame],
) -> AsyncGenerator[QueueFrame, None]:
current_response_task = None
self._runner = runner
async for frame in self.get_receive_frames():
print(f"got frame of type: {type(frame)}, {frame}")
if isinstance(frame, EndStreamQueueFrame):
break
# elif not isinstance(frame, TranscriptionQueueFrame):
# continue
# TODO-CB: Verify this is an accurate replacement
# if hasattr(frame, 'participantId') and frame.participantId == self._my_participant_id:
if not isinstance(frame, UserStoppedSpeakingFrame):
continue
if current_response_task:
# TODO-CB: Maybe not always interrupt? Are there frame types we can pass through?
current_response_task.cancel()
self.interrupt()
# self._current_phrase += " " + frame.text
# current_llm_context = copy.deepcopy(self._context)
current_response_task = asyncio.create_task(
self.run_pipeline(
frame)
)
current_response_task.add_done_callback(
functools.partial(self.update_messages, self._context)
)
async def run(self):
self._prerun()
async_output_queue_marshal_task = asyncio.create_task(
self._marshal_frames())
async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames())
self._camera_thread = threading.Thread(
target=self._run_camera, daemon=True)
self._camera_thread = threading.Thread(target=self._run_camera, daemon=True)
self._camera_thread.start()
self._frame_consumer_thread = threading.Thread(
target=self._frame_consumer, daemon=True)
self._frame_consumer_thread = threading.Thread(target=self._frame_consumer, daemon=True)
self._frame_consumer_thread.start()
if self._speaker_enabled:
# TODO-CB: This is interesting
# self._receive_audio_thread = threading.Thread(
# target=self._receive_audio, daemon=True)
# self._receive_audio_thread.start()
self._vad_thread = threading.Thread(target=self._vad, daemon=True)
self._vad_thread.start()
self._receive_audio_thread = threading.Thread(target=self._receive_audio, daemon=True)
self._receive_audio_thread.start()
try:
while (
@@ -232,7 +82,6 @@ class BaseTransportService():
self._stop_threads.set()
await self.send_queue.put(EndStreamQueueFrame())
await async_output_queue_marshal_task
await self.send_queue.join()
self._frame_consumer_thread.join()
@@ -272,61 +121,6 @@ class BaseTransportService():
def _prerun(self):
pass
def _vad(self):
    """Voice-activity-detection loop; runs until ``self._stop_threads`` is set.

    Each iteration reads ``self._vad_samples`` audio frames, scores them
    with the module-level silero ``model``, and advances the ``VADState``
    machine. ``UserStartedSpeakingFrame`` / ``UserStoppedSpeakingFrame`` are
    queued on ``self.receive_queue`` (via ``self._loop``) once the start or
    stop chunk-count thresholds are reached.
    """
    # CB: Starting silero VAD stuff
    # TODO-CB: Probably need to force virtual speaker creation if we're
    # going to build this in?
    # TODO-CB: pyaudio installation
    while not self._stop_threads.is_set():
        audio_chunk = self.read_audio_frames(self._vad_samples)
        # Interpret the raw bytes as int16 samples, then convert to float32.
        audio_int16 = np.frombuffer(audio_chunk, np.int16)
        audio_float32 = int2float(audio_int16)
        # NOTE(review): 16000 is presumably the sample rate (matches SAMPLE_RATE) -- confirm.
        new_confidence = model(
            torch.from_numpy(audio_float32), 16000).item()
        # A chunk counts as speech when the model's score exceeds 0.5.
        speaking = new_confidence > 0.5

        if speaking:
            match self._vad_state:
                case VADState.QUIET:
                    self._vad_state = VADState.STARTING
                    self._vad_starting_count = 1
                case VADState.STARTING:
                    self._vad_starting_count += 1
                case VADState.STOPPING:
                    # Speech resumed before the stop threshold was reached.
                    self._vad_state = VADState.SPEAKING
                    self._vad_stopping_count = 0
        else:
            match self._vad_state:
                case VADState.STARTING:
                    # Speech ended before the start threshold was reached.
                    self._vad_state = VADState.QUIET
                    self._vad_starting_count = 0
                case VADState.SPEAKING:
                    self._vad_state = VADState.STOPPING
                    self._vad_stopping_count = 1
                case VADState.STOPPING:
                    self._vad_stopping_count += 1

        # Enough consecutive speaking chunks: announce the start of speech
        # and interrupt whatever is currently being output.
        if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
            print(
                f'!!! {datetime.datetime.utcnow().isoformat()} queueing start frame')
            asyncio.run_coroutine_threadsafe(
                self.receive_queue.put(
                    UserStartedSpeakingFrame()), self._loop
            )
            print(f"!!! VAD started, calling interrupt")
            self.interrupt()
            self._vad_state = VADState.SPEAKING
            self._vad_starting_count = 0
        # Enough consecutive non-speaking chunks: announce the end of speech.
        if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
            print(
                f'!!! {datetime.datetime.utcnow().isoformat()} queueing stop frame')
            asyncio.run_coroutine_threadsafe(
                self.receive_queue.put(
                    UserStoppedSpeakingFrame()), self._loop
            )
            self._vad_state = VADState.QUIET
            self._vad_stopping_count = 0
async def _marshal_frames(self):
while True:
frame: QueueFrame | list = await self.send_queue.get()
@@ -336,7 +130,6 @@ class BaseTransportService():
break
def interrupt(self):
print(f"!!! setting interrupt")
self._is_interrupted.set()
async def get_receive_frames(self) -> AsyncGenerator[QueueFrame, None]:
@@ -411,6 +204,7 @@ class BaseTransportService():
if frame:
if isinstance(frame, AudioQueueFrame):
chunk = frame.data
all_audio_frames.extend(chunk)
b.extend(chunk)
@@ -418,27 +212,21 @@ class BaseTransportService():
len(b) % smallest_write_size
)
if truncated_length:
self.write_frame_to_mic(
bytes(b[:truncated_length]))
self.write_frame_to_mic(bytes(b[:truncated_length]))
b = b[truncated_length:]
elif isinstance(frame, ImageQueueFrame):
self._set_image(frame.image)
elif isinstance(frame, SpriteQueueFrame):
self._set_images(frame.images)
elif isinstance(frame, TTSCompletedFrame) and not frame.outOfBand:
self.append_to_context(
"assistant", frame.text)
elif len(b):
self.write_frame_to_mic(bytes(b))
b = bytearray()
else:
# if there are leftover audio bytes, write them now; failing to do so
# can cause static in the audio stream.
print(f"!!! interrupted, flushing audio")
if len(b):
truncated_length = len(b) - (len(b) % 160)
self.write_frame_to_mic(
bytes(b[:truncated_length]))
self.write_frame_to_mic(bytes(b[:truncated_length]))
b = bytearray()
if isinstance(frame, StartStreamQueueFrame):
@@ -451,6 +239,5 @@ class BaseTransportService():
b = bytearray()
except Exception as e:
self._logger.error(
f"Exception in frame_consumer: {e}, {len(b)}")
self._logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
raise e

View File

@@ -1,4 +1,18 @@
from dailyai.services.base_transport_service import BaseTransportService
import asyncio
import inspect
import logging
import signal
import threading
import types
from functools import partial
from dailyai.queue_frame import (
TranscriptionQueueFrame,
)
from threading import Event
from daily import (
EventHandler,
CallClient,
@@ -7,61 +21,8 @@ from daily import (
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
)
from threading import Event
from dailyai.queue_frame import (
TranscriptionQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
)
from functools import partial
import types
import pyaudio
import torchaudio
import asyncio
import inspect
import io
import logging
import numpy as np
import signal
import threading
import torch
torch.set_num_threads(1)
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=False)
(get_speech_timestamps,
save_audio,
read_audio,
VADIterator,
collect_chunks) = utils
# Taken from utils_vad.py
def validate(model, inputs: torch.Tensor):
    """Call ``model`` on ``inputs`` with gradient tracking disabled and return the result."""
    with torch.no_grad():
        return model(inputs)
# Provided by Alexander Veysov
def int2float(sound):
    """Convert integer PCM samples to float32 scaled by 1/32768.

    Args:
        sound: NumPy array of integer samples.

    Returns:
        A float32 array with singleton dimensions squeezed out.

    The scaling is applied unconditionally. The previous ``abs_max > 0``
    guard raised ``ValueError`` on an empty buffer and, because int16
    ``abs(-32768)`` wraps to a negative value, left an all-minimum buffer
    unscaled. Scaling an all-zero buffer is a no-op, so other inputs are
    unaffected.
    """
    scaled = sound.astype('float32')
    scaled *= 1/32768
    return scaled.squeeze()  # depends on the use case
FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 16000
CHUNK = int(SAMPLE_RATE / 10)
audio = pyaudio.PyAudio()
from dailyai.services.base_transport_service import BaseTransportService
class DailyTransportService(BaseTransportService, EventHandler):
@@ -84,8 +45,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
start_transcription: bool = False,
**kwargs,
):
# This will call BaseTransportService.__init__ method, not EventHandler
super().__init__(**kwargs)
super().__init__(**kwargs) # This will call BaseTransportService.__init__ method, not EventHandler
self._room_url: str = room_url
self._bot_name: str = bot_name
@@ -120,8 +80,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
if self._loop:
asyncio.run_coroutine_threadsafe(
handler(*args, **kwargs), self._loop)
asyncio.run_coroutine_threadsafe(handler(*args, **kwargs), self._loop)
else:
raise Exception(
"No event loop to run coroutine. In order to use async event handlers, you must run the DailyTransportService in an asyncio event loop.")
@@ -133,8 +92,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
def add_event_handler(self, event_name: str, handler):
if not event_name.startswith("on_"):
raise Exception(
f"Event handler {event_name} must start with 'on_'")
raise Exception(f"Event handler {event_name} must start with 'on_'")
methods = inspect.getmembers(self, predicate=inspect.ismethod)
if event_name not in [method[0] for method in methods]:
@@ -147,8 +105,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
handler, self)]
setattr(self, event_name, partial(self._patch_method, event_name))
else:
self._event_handlers[event_name].append(
types.MethodType(handler, self))
self._event_handlers[event_name].append(types.MethodType(handler, self))
def event_handler(self, event_name: str):
def decorator(handler):
@@ -192,8 +149,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
Daily.select_speaker_device("speaker")
self.client.set_user_name(self._bot_name)
self.client.join(self._room_url, self._token,
completion=self.call_joined)
self.client.join(self._room_url, self._token, completion=self.call_joined)
self._my_participant_id = self.client.participants()["local"]["id"]
self.client.update_inputs(
@@ -249,7 +205,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
def _post_run(self):
self.client.leave()
def on_first_other_participant_joined(self):
def on_first_other_participant_joined(self, participant):
pass
def call_joined(self, join_data, client_error):
@@ -270,47 +226,24 @@ class DailyTransportService(BaseTransportService, EventHandler):
def on_participant_joined(self, participant):
if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
self._other_participant_has_joined = True
self.on_first_other_participant_joined()
self.on_first_other_participant_joined(participant)
def on_participant_left(self, participant, reason):
if len(self.client.participants()) < self._min_others_count + 1:
self._stop_threads.set()
async def insert_speech(self, text, sender, date):
    """Inject ``text`` into the receive path as if it had been spoken.

    Queues a ``UserStartedSpeakingFrame``, waits 0.3 s, delivers a
    transcription message through ``on_transcription_message``, waits
    another 0.3 s, then queues a ``UserStoppedSpeakingFrame``.

    Args:
        text: transcript text to deliver.
        sender: not used by this method.
        date: value passed through as the message's ``"timestamp"``.
    """
    await self.receive_queue.put(UserStartedSpeakingFrame())
    await asyncio.sleep(0.3)
    # frame = TranscriptionQueueFrame(text, sender, date)
    # await self.receive_queue.put(frame)
    # NOTE(review): participantId is a hard-coded constant rather than
    # ``sender`` -- confirm this is intentional.
    self.on_transcription_message({
        "text": text,
        "participantId": "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
        "timestamp": date
    })
    await asyncio.sleep(0.3)
    await self.receive_queue.put(UserStoppedSpeakingFrame())
def on_app_message(self, message, sender):
if self._loop:
print("APP MESSAGE", message)
asyncio.run_coroutine_threadsafe(
self.insert_speech(message["message"], sender, message["date"]), self._loop)
pass
def on_transcription_message(self, message: dict):
if self._loop:
print(f"transcription: {message}")
participantId = ""
if "participantId" in message:
participantId = message["participantId"]
elif "session_id" in message:
participantId = message["session_id"]
frame = TranscriptionQueueFrame(
message["text"], participantId, message["timestamp"])
if self._my_participant_id and participantId != self._my_participant_id:
self.append_to_context("user", message["text"])
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(frame), self._loop)
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self._loop)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
pass

View File

@@ -12,12 +12,13 @@ class ElevenLabsTTSService(TTSService):
def __init__(
self,
*,
aiohttp_session: aiohttp.ClientSession,
api_key,
voice_id,
*args,
**kwargs
):
super().__init__()
super().__init__(*args, **kwargs)
self._api_key = api_key
self._voice_id = voice_id

View File

@@ -32,8 +32,7 @@ class FalImageGenService(ImageGenService):
handler = fal.apps.submit(
"110602490-fast-sdxl",
arguments={
"prompt": sentence,
"seed": 23
"prompt": sentence
},
)
for event in handler.iter_events():

View File

@@ -1,122 +0,0 @@
import aiohttp
from PIL import Image
import io
from openai import AsyncOpenAI
import asyncio
import json
from collections.abc import AsyncGenerator
from dailyai.services.ai_services import LLMService, ImageGenService
from dailyai.queue_frame import (TextQueueFrame, TextQueueOutOfBandFrame)
class FireworksLLMService(LLMService):
    """LLM service that talks to Fireworks through the OpenAI-compatible client.

    Args:
        api_key: key passed to ``AsyncOpenAI``.
        model: model name sent with every completion request.
        tools: tool definitions sent with every completion request. Defaults
            to a fresh empty list per instance.
        context: conversation context handed to ``LLMService``.
        change_appearance: coroutine function called with the image prompt
            extracted from a completed tool call.
        transport: object whose ``append_to_context`` is called when a tool
            call completes.
    """

    def __init__(self, *, api_key, model="", tools=None, context, change_appearance, transport=""):
        super().__init__(context)
        self._model = model
        # A literal ``[]`` default would be shared by every instance.
        self._tools = [] if tools is None else tools
        self._change_appearance = change_appearance
        # NOTE(review): the "" default has no append_to_context method, so a
        # tool call without a real transport raises AttributeError -- confirm
        # callers always pass one.
        self._transport = transport
        # Strong references to fire-and-forget tasks; the event loop only
        # keeps weak ones, so an unreferenced task may be collected mid-run.
        self._background_tasks = set()
        self._client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.fireworks.ai/inference/v1"
        )

    async def get_response(self, messages, stream):
        """Create a chat completion for ``messages`` and return the client's response."""
        print("GET RESPONSE ... WHEN DO WE EXPECT THIS TO BE CALLED?")
        return await self._client.chat.completions.create(
            stream=stream,
            messages=messages,
            model=self._model,
            temperature=0.1,
            tools=self._tools
        )

    async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
        """Stream a completion, yielding text chunks as they arrive.

        Tool-call fragments are accumulated across chunks. When a chunk
        carries a finish reason and a tool call was named, the call is
        recorded on the transport's context, ``change_appearance`` is
        started in the background, and an out-of-band text frame is yielded.
        """
        print("IN ASYNC")
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        chunks = await self._client.chat.completions.create(
            model=self._model,
            stream=True,  # BLARGH
            messages=messages,
            temperature=0.1,
            tools=self._tools
        )

        tool_call = {}
        async for chunk in chunks:
            print(f"CHUNK: {chunk}")
            if len(chunk.choices) == 0:
                continue

            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

            if chunk.choices[0].delta.tool_calls:
                print(f"TOOL CALLS: {chunk.choices[0].delta.tool_calls[0]}")
                # The chunk that names the function starts a new accumulation.
                if chunk.choices[0].delta.tool_calls[0].function.name:
                    tool_call["id"] = chunk.choices[0].delta.tool_calls[0].id
                    tool_call["name"] = chunk.choices[0].delta.tool_calls[0].function.name
                    tool_call["arguments"] = ''
                if chunk.choices[0].delta.tool_calls[0].function.arguments:
                    tool_call["arguments"] += chunk.choices[0].delta.tool_calls[0].function.arguments

            if chunk.choices[0].finish_reason:
                print(f"TOOL CALLS ACCUM -- {tool_call}")
                if tool_call.get("name"):
                    # hard coding tool call action for now. we should assemble the tool call
                    # from the streaming response, then yield it to the pipeline.
                    # this approach works for the first few change appearance requests but
                    # then the model starts refusing. need to read more about function
                    # calling, try this with the OpenAI APIs, and talk to the Fireworks people.
                    self._transport.append_to_context("assistant", {
                        # pipeline will append the content to this context after it goes
                        # through tts. we need to manually append the tool call, though
                        "content": "",
                        "role": "assistant",
                        "tool_calls": [
                            {
                                "id": tool_call["id"],
                                "type": "function",
                                "index": 0,
                                "function": {
                                    "name": tool_call["name"],
                                    "arguments": tool_call["arguments"]
                                },
                            }
                        ],
                    })
                    self._transport.append_to_context("tool", {
                        "content": "image generated by prompt arguments: " + tool_call["arguments"],
                        "role": "tool",
                        "tool_call_id": tool_call["id"]
                    })
                    self._transport.append_to_context("assistant", {
                        "content": f"call to {tool_call['name']} function succeeded",
                        "role": "assistant",
                    })
                    print("APPENDED TO CONTEXT")
                    image_prompt = json.loads(
                        tool_call["arguments"]).get("appearance")
                    print("IMAGE PROMPT", image_prompt)
                    # Keep the task referenced until it finishes.
                    appearance_task = asyncio.create_task(
                        self._change_appearance(image_prompt))
                    self._background_tasks.add(appearance_task)
                    appearance_task.add_done_callback(self._background_tasks.discard)
                    yield TextQueueOutOfBandFrame("Sure, let me work on that for you!")

    async def run_llm(self, messages) -> str | None:
        """Run a non-streaming completion and return the first choice's text, or None."""
        print("--> IN SYNC ... WHEN DO WE EXPECT THIS TO BE CALLED?")
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
        if response and len(response.choices) > 0:
            return response.choices[0].message.content
        else:
            return None

View File

@@ -1,33 +0,0 @@
import os
import groq
from groq import AsyncGroq
from dailyai.services.ai_services import LLMService
from collections.abc import AsyncGenerator
class GroqLLMService(LLMService):
    """LLM service backed by Groq's async chat-completions client.

    Args:
        api_key: Groq API key, passed to the client. It was previously
            accepted but never used.
        model: model name sent with every request.
        context: conversation context handed to ``LLMService``.
    """

    def __init__(self, *, api_key, model="mixtral-8x7b-32768", context):
        super().__init__(context)
        self._model = model
        # Hand the caller's key to the client instead of relying only on
        # whatever happens to be set in the process environment.
        self._client = AsyncGroq(api_key=api_key)

    async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
        """Request one (non-streamed) completion and yield its text, if any.

        API errors are reported on stdout and otherwise swallowed, so the
        generator simply yields nothing when the request fails.
        """
        print(f"messages are {messages}")
        try:
            resp = await self._client.chat.completions.create(messages=messages, model=self._model)
            print(f"got chunks from groq: {resp}")
            if resp.choices[0].message.content:
                yield resp.choices[0].message.content
        except groq.APIConnectionError as e:
            print("The server could not be reached")
            print(e.__cause__)  # an underlying Exception, likely raised within httpx.
        except groq.RateLimitError as e:
            print("A 429 status code was received; we should back off a bit.")
        except groq.APIStatusError as e:
            print("Another non-200-range status code was received")
            print(e.status_code)
            print(e.response)

View File

@@ -10,8 +10,8 @@ from dailyai.services.ai_services import LLMService, ImageGenService
class OpenAILLMService(LLMService):
def __init__(self, *, api_key, model="gpt-4-turbo-preview", context):
super().__init__(context)
def __init__(self, *, api_key, model="gpt-4"):
super().__init__()
self._model = model
self._client = AsyncOpenAI(api_key=api_key)

View File

@@ -11,12 +11,13 @@ class PlayHTAIService(TTSService):
def __init__(
self,
*,
api_key,
user_id,
voice_url
voice_url,
*args,
**kwargs,
):
super().__init__()
super().__init__(*args, **kwargs)
self.speech_key = api_key
self.user_id = user_id

View File

@@ -0,0 +1,124 @@
import asyncio
import unittest
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
from dailyai.services.ai_services import PipeService
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
class IncomingPipeService(PipeService):
    """Pipe service given a fresh sink queue that the tests fill directly."""

    def __init__(self) -> None:
        super().__init__()
        # Replace whatever the base initializer set up: the tests put frames
        # straight into this queue and wire downstream pipes to it.
        self.sink_queue: asyncio.Queue = asyncio.Queue()
class QueueTeeTest(unittest.IsolatedAsyncioTestCase):
    """Checks that a QueueTee hands every source frame to each of its sinks."""

    async def test_queue_tee(self):
        """Send a text frame and an end-of-stream frame through a two-sink tee."""
        origin = IncomingPipeService()
        upstream = PipeService(origin.sink_queue)
        sinks = [PipeService(), PipeService()]
        tee = QueueTee(source_queue=upstream.sink_queue, sinks=sinks)

        for frame in (TextQueueFrame("test"), EndStreamQueueFrame()):
            origin.sink_queue.put_nowait(frame)

        # Run every stage concurrently until each has handled its queue.
        stages = [origin, upstream, *sinks, tee]
        await asyncio.gather(*[stage.process_queue() for stage in stages])

        # Each sink must have produced exactly the text frame followed by the
        # end-of-stream frame.
        for sink in sinks:
            produced = sink.sink_queue
            self.assertEqual(produced.qsize(), 2)

            first = produced.get_nowait()
            self.assertIsInstance(first, TextQueueFrame)
            if isinstance(first, TextQueueFrame):  # narrows the type for checkers
                self.assertEqual(first.text, "test")

            self.assertIsInstance(produced.get_nowait(), EndStreamQueueFrame)
class QueueFrameAggregatorTest(unittest.IsolatedAsyncioTestCase):
    """Checks QueueFrameAggregator with a sentence-joining aggregator."""

    async def test_queue_frame_aggregator(self):
        """Text fragments are merged into sentences; the remainder is flushed."""

        def aggregate_sentences(accumulation, frame):
            # Returns (new accumulation, frame to emit or None).
            text = accumulation or ""
            if isinstance(frame, TextQueueFrame):
                text += frame.text
                if text.endswith((".", "!", "?")):
                    # Sentence complete: emit it and start over.
                    return ("", TextQueueFrame(text))
            return (text, None)

        def finalize_sentences(accumulation):
            # Whatever is left over is emitted as one last text frame.
            return TextQueueFrame(accumulation)

        origin = IncomingPipeService()
        aggregator_pipe = QueueFrameAggregator(
            source_queue=origin.sink_queue,
            aggregator=aggregate_sentences,
            finalizer=finalize_sentences,
        )

        fragments = ("testing, ", "one.", "two.", "three.", "can you ", "hear me")
        for fragment in fragments:
            origin.sink_queue.put_nowait(TextQueueFrame(fragment))
        origin.sink_queue.put_nowait(EndStreamQueueFrame())

        await asyncio.gather(
            origin.process_queue(),
            aggregator_pipe.process_queue(),
        )

        results = aggregator_pipe.sink_queue
        # Four text frames plus the end-of-stream frame.
        self.assertEqual(results.qsize(), 5)

        expected_text = ["testing, one.", "two.", "three.", "can you hear me"]
        for expected in expected_text:
            frame = results.get_nowait()
            print(frame)
            self.assertIsInstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):  # narrows the type for checkers
                self.assertEqual(frame.text, expected)

        self.assertIsInstance(results.get_nowait(), EndStreamQueueFrame)
class QueueMergeGateOnFirstTest(unittest.IsolatedAsyncioTestCase):
    """Checks the output order produced by QueueMergeGateOnFirst."""

    async def test_queue_merge_gate_on_first(self):
        """The second source's frame is enqueued first but expected second.

        "pipe2.1" is put on the second queue before anything is put on the
        first one, yet the merged output is expected to be "pipe1.1",
        "pipe2.1", "pipe1.2" and then a single end-of-stream frame.
        """
        pipe1 = IncomingPipeService()
        pipe2 = IncomingPipeService()
        merge_pipe = QueueMergeGateOnFirst(
            source_queues=[pipe1.sink_queue, pipe2.sink_queue],
        )

        # Set once the second pipe has received its text frame; the first
        # pipe is only fed after that.
        second_pipe_fed = asyncio.Event()

        async def add_items_to_first_pipe():
            await second_pipe_fed.wait()
            for item in (
                TextQueueFrame("pipe1.1"),
                TextQueueFrame("pipe1.2"),
                EndStreamQueueFrame(),
            ):
                await pipe1.sink_queue.put(item)

        async def add_items_to_second_pipe():
            await pipe2.sink_queue.put(TextQueueFrame("pipe2.1"))
            second_pipe_fed.set()
            await pipe2.sink_queue.put(EndStreamQueueFrame())

        processors = [pipe.process_queue() for pipe in [pipe1, pipe2, merge_pipe]]
        await asyncio.gather(
            *processors,
            add_items_to_first_pipe(),
            add_items_to_second_pipe(),
        )

        merged = merge_pipe.sink_queue
        self.assertEqual(merged.qsize(), 4)

        for expected in ("pipe1.1", "pipe2.1", "pipe1.2"):
            frame = merged.get_nowait()
            assert isinstance(frame, TextQueueFrame)
            if isinstance(frame, TextQueueFrame):
                self.assertEqual(frame.text, expected)

        frame = merged.get_nowait()
        assert isinstance(frame, EndStreamQueueFrame)

View File

@@ -0,0 +1,30 @@
import asyncio
import unittest
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
from dailyai.services.ai_services import PipeService
class TestPipeService(unittest.IsolatedAsyncioTestCase):
    """Checks that frames pass unchanged through a chain of PipeServices."""

    class IncomingPipeService(PipeService):
        """Head of the chain; its sink queue is filled directly by the test."""

        def __init__(self):
            super().__init__()
            self.sink_queue = asyncio.Queue()

    async def test_pipe_chain(self):
        """A text frame and an end-of-stream frame reach the last pipe's sink."""
        head = TestPipeService.IncomingPipeService()
        middle = PipeService(head.sink_queue)
        tail = PipeService(middle.sink_queue)

        for item in (TextQueueFrame("test"), EndStreamQueueFrame()):
            await head.sink_queue.put(item)

        await asyncio.gather(
            head.process_queue(),
            middle.process_queue(),
            tail.process_queue(),
        )

        output = tail.sink_queue
        self.assertEqual(output.qsize(), 2)

        first = await output.get()
        self.assertIsInstance(first, TextQueueFrame)
        if isinstance(first, TextQueueFrame):  # narrows the type for checkers
            self.assertEqual(first.text, "test")

        last = await output.get()
        self.assertIsInstance(last, EndStreamQueueFrame)

View File

@@ -1,6 +1,7 @@
import asyncio
import aiohttp
import os
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -28,36 +29,40 @@ async def main(room_url):
mic_enabled=True
)
"""
tts = ElevenLabsTTSService(
source_queue=asyncio.Queue(),
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
"""
tts = PlayHTAIService(
sink=transport.send_queue,
api_key=os.getenv("PLAY_HT_API_KEY"),
user_id=os.getenv("PLAY_HT_USER_ID"),
voice_url=os.getenv("PLAY_HT_VOICE_URL"),
)
"""
tts.sink_queue = transport.send_queue
# Register an event handler so we can play the audio when the participant joins.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport, participant):
nonlocal tts
if participant["info"]["isLocal"]:
return
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
# todo: update the tts.say() convenience method to use the new queue architecture
await tts.source_queue.put(
TextQueueFrame("Hello there, " +
participant["info"]["userName"] + "!")
)
await tts.source_queue.put(EndStreamQueueFrame())
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
await transport.run()
del(tts)
# todo: commented out because it seems to exit a little early, before
# the audio is finished playing
# await transport.stop_when_done()
await asyncio.gather(transport.run(), tts.process_queue())
if __name__ == "__main__":
(url, token) = configure()

View File

@@ -3,7 +3,7 @@ import os
import aiohttp
from dailyai.queue_frame import LLMMessagesQueueFrame
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -20,39 +20,38 @@ async def main(room_url):
None,
"Say One Thing From an LLM",
duration_minutes=meeting_duration_minutes,
mic_enabled=True,
speaker_enabled=True
mic_enabled=True
)
llm = AzureLLMService(
source_queue=asyncio.Queue(),
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
tts = ElevenLabsTTSService(
source_queue=llm.sink_queue, # this should really be a sentence aggregator?
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"))
# llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
tts_task = asyncio.create_task(
tts.run_to_queue(
transport.send_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
)
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts_task
await transport.stop_when_done()
tts.sink_queue = transport.send_queue
await transport.run()
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport, participant):
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
await llm.source_queue.put(LLMMessagesQueueFrame(messages))
await llm.source_queue.put(EndStreamQueueFrame())
# todo: commented out because it exits before audio plays
# await transport.stop_when_done()
await asyncio.gather(transport.run(), llm.process_queue(), tts.process_queue())
if __name__ == "__main__":

View File

@@ -1,8 +1,11 @@
import asyncio
from typing import Any, AsyncGenerator, Callable, Tuple
import aiohttp
import os
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.queue_frame import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseEndQueueFrame, QueueFrame, TextQueueFrame
from dailyai.services.ai_services import PipeService
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.daily_transport_service import DailyTransportService
@@ -27,34 +30,70 @@ async def main(room_url):
camera_height=1024
)
"""
/ TTS \
Month prompt -> LLM -> Fork -> -> Gate -> Transport
\ Aggregate -> ImageGen /
"""
month_description_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()
llm = AzureLLMService(
source_queue=month_description_queue,
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"))
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV")
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
dalle = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"))
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
key_secret=os.getenv("FAL_KEY_SECRET"),
)
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
def aggregator(
accumulation, frame: QueueFrame
) -> tuple[Any, QueueFrame | None]:
if not accumulation:
accumulation = ""
return all_audio
if isinstance(frame, TextQueueFrame):
accumulation += frame.text
return (accumulation, None)
elif isinstance(frame, LLMResponseEndQueueFrame):
return ("", TextQueueFrame(accumulation))
else:
return (accumulation, frame)
async def get_month_data(month):
# This queue service takes chunks from LLM output and merges them into one text frame
# that will be used to prompt the image service.
llm_aggregator_for_image = QueueFrameAggregator(aggregator=aggregator, finalizer=lambda x: None)
# Set the source queue for the image service to the sink of the aggregator service
dalle.source_queue = llm_aggregator_for_image.sink_queue
# This queue service takes the output from the LLM and sends it to the TTS service and
# the aggregator for the image generation service.
tee = QueueTee(source_queue=llm.sink_queue, sinks=[tts, llm_aggregator_for_image])
# This queue service takes input from the TTS service and the image service, and waits
# to forward any audio frames until the image generation is complete. It will send
# the image first, then the audio frames; this ensures that the image is shown before
# the audio associated with the image is played.
tts_image_gate = QueueMergeGateOnFirst([dalle.sink_queue, tts.sink_queue])
# We send the output of this queue service to the transport output.
tts_image_gate.sink_queue = transport.send_queue
# Queue up all the months in the LLM service source queue
months = ["January"] #, "February"]
for month in months:
messages = [
{
"role": "system",
@@ -62,72 +101,21 @@ async def main(room_url):
}
]
image_description = await llm.run_llm(messages)
if not image_description:
return
await month_description_queue.put(LLMMessagesQueueFrame(messages))
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
print(f"about to gather tasks for {month}")
(audio, image_data) = await asyncio.gather(
audio_task, image_task
)
print(f"about to return from get_month_data for {month}")
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
await month_description_queue.put(EndStreamQueueFrame())
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June"
pipeline = [
llm,
[tee,
[tts,
[llm_aggregator_for_image, dalle]
],
tts_image_gate],
transport,
]
"""
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
"""
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
print(f"month_data_task: {month_data_task}")
try:
data = await month_data_task
except Exception:
print("OMG EXCEPTION!!!!")
if data:
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
await transport.run()
await asyncio.gather(transport.run(), *[service.process_queue() for service in [llm, tts, dalle, tee, tts_image_gate, llm_aggregator_for_image]])
if __name__ == "__main__":
(url, token) = configure()

View File

@@ -5,16 +5,9 @@ from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
from examples.foundational.support.runner import configure
from dailyai.services.ai_services import FrameLogger
async def main(room_url: str, token):
context = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
},
]
transport = DailyTransportService(
room_url,
token,
@@ -23,9 +16,7 @@ async def main(room_url: str, token):
start_transcription=True,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=False,
speaker_enabled=True,
context=context
camera_enabled=False
)
llm = AzureLLMService(
@@ -35,33 +26,33 @@ async def main(room_url: str, token):
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"))
fl = FrameLogger("transport")
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
async def handle_transcriptions():
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserContextAggregator(
context, transport._my_participant_id)
tma_out = LLMAssistantContextAggregator(
context, transport._my_participant_id)
tma_in = LLMUserContextAggregator(messages, transport._my_participant_id)
tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id)
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
fl.run(
transport.get_receive_frames()
)
transport.get_receive_frames()
)
)
)
)
transport.transcription_settings["extra"]["punctuate"] = True
transport.transcription_settings["extra"]["endpointing"] = True
await asyncio.gather(transport.run(), handle_transcriptions())

View File

@@ -1,83 +0,0 @@
import asyncio
import aiohttp
import os
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.ai_services import FrameLogger
from dailyai.services.groq_ai_services import GroqLLMService
from examples.foundational.support.runner import configure
async def main(room_url: str, token):
    """Run the "Respond bot" example.

    Builds a Daily transport with transcription enabled, an OpenAI LLM and an
    ElevenLabs TTS service, then runs the transport together with its
    conversation loop. Each incoming frame is logged, sent to the LLM, and the
    LLM output is spoken onto the transport's send queue.

    Args:
        room_url: URL of the room, passed to the transport.
        token: Token passed to the transport.
    """
    async with aiohttp.ClientSession() as session:
        system_message = {
            "role": "system",
            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
        }
        # The same list object is shared by the transport and the LLM.
        context = [system_message]

        transport = DailyTransportService(
            room_url,
            token,
            "Respond bot",
            duration_minutes=5,
            start_transcription=True,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=False,
            # TODO-CB: Should this be VAD enabled or something?
            speaker_enabled=True,
            context=context,
        )

        # Other backends tried here during development: AzureLLMService and
        # GroqLLMService for the LLM, AzureTTSService and DeepgramTTSService
        # for speech.
        llm = OpenAILLMService(
            context=context,
            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
        )
        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        )
        fl = FrameLogger("just outside the innermost layer")

        async def run_response(in_frame):
            # frame logger -> LLM -> TTS -> transport send queue. The context
            # aggregators (tma_in / tma_out) are not part of this chain.
            logged_frames = fl.run([StartStreamQueueFrame(), in_frame])
            await tts.run_to_queue(transport.send_queue, llm.run(logged_frames))

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            await tts.say("Hi, I'm listening!", transport.send_queue)

        transcription_extras = transport.transcription_settings["extra"]
        transcription_extras["endpointing"] = True
        transcription_extras["punctuate"] = True

        await asyncio.gather(
            transport.run(),
            transport.run_conversation(run_response),
        )
if __name__ == "__main__":
    # configure() supplies the room URL and token that main() expects.
    url, token = configure()
    asyncio.run(main(url, token))

View File

@@ -1,160 +0,0 @@
from datetime import datetime
import asyncio
import aiohttp
import os
import sys
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
from dailyai.queue_frame import StartStreamQueueFrame, TranscriptionQueueFrame, TextQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fireworks_ai_services import FireworksLLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.ai_services import FrameLogger
from dailyai.services.fal_ai_services import FalImageGenService
from examples.foundational.support.runner import configure
# Everything after the script name on the command line, joined by spaces.
command_line_prompt = " ".join(sys.argv[1:])

# System message for the robot character: describes the persona and tells the
# model to change its look through the `change_appearance` function.
system_prompt = """
You are a friendly robot character with a cartoon body with head, torso, arms, feet,
and legs.
You can change your appearance using the `change_appearance` function call.
You can add or remove items from your body, change
your color, and more. You can use function calling to change your appearance.
When changing your appearance, please create a prompt as an argument to the function.
The prompt will help the image generation model
create a new appearance for you. Include as much detail as possible. Include the
keywords "robot", "friendly", "cartoon", "smiling", "happy", "animated".
The initial image prompt you are adding to or changing is
"A friendly cartoon robot, smiling and happy, animated."
Do not include the image model prompt in your response. The prompt must be passed to the function
as a parameter.
"""

# Function schema offered to the LLM; its single string property "appearance"
# carries the image-generation prompt.
change_appearance_function = {
    "name": "change_appearance",
    "description": "Call this function when the users want you to change your appearance.",
    "parameters": {
        "type": "object",
        "properties": {
            "appearance": {
                "type": "string",
                "description": "The new appearance for the robot, in the form of a prompt for an generative AI diffusion model.",
            },
        },
    },
}

# Tool list handed to the LLM service: just the function above.
tools = [{"type": "function", "function": change_appearance_function}]
async def main(room_url: str, token):
    """Run the appearance-changing robot example.

    Builds a camera-enabled Daily transport, a Fal image generator, a
    Fireworks LLM that is given the `change_appearance` tool, and a Deepgram
    TTS service, then runs the transport together with its conversation loop.

    Args:
        room_url: URL of the room, passed to the transport.
        token: Token passed to the transport.
    """
    async with aiohttp.ClientSession() as session:
        # The same list object is shared by the transport and the LLM.
        context = [{"role": "system", "content": system_prompt}]

        transport = DailyTransportService(
            room_url,
            token,
            "Respond bot",
            duration_minutes=30,
            start_transcription=True,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=True,
            camera_width=1024,
            camera_height=1024,
            # TODO-CB: Should this be VAD enabled or something?
            speaker_enabled=True,
            context=context,
        )

        imagegen = FalImageGenService(
            image_size="512x512",
            aiohttp_session=session,
            key_id=os.getenv("FAL_KEY_ID"),
            key_secret=os.getenv("FAL_KEY_SECRET"),
        )

        async def change_appearance(appearance):
            # Feed the prompt to the image generator as a single text frame;
            # the generator writes to the transport's send queue.
            prompt_frames = [TextQueueFrame(appearance)]
            render = asyncio.create_task(
                imagegen.run_to_queue(transport.send_queue, prompt_frames)
            )
            await render

        llm = FireworksLLMService(
            context=context,
            api_key=os.getenv("FIREWORKS_API_KEY"),
            model="accounts/fireworks/models/firefunction-v1",
            # TODO - how can we modify tools list on the fly?
            tools=tools,
            change_appearance=change_appearance,
            transport=transport,
        )
        tts = DeepgramTTSService(
            aiohttp_session=session,
            api_key=os.getenv("DEEPGRAM_API_KEY"),
            voice=os.getenv("DEEPGRAM_VOICE"),
        )
        fl = FrameLogger("just outside the innermost layer")

        async def run_response(in_frame):
            # frame logger -> LLM -> TTS -> transport send queue. The context
            # aggregators (tma_in / tma_out) are not part of this chain.
            logged_frames = fl.run([StartStreamQueueFrame(), in_frame])
            await tts.run_to_queue(transport.send_queue, llm.run(logged_frames))

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            await change_appearance("A friendly cartoon robot, smiling and happy, animated.")
            return

            # NOTE(review): everything below is unreachable because of the
            # return above. It greets the user and then injects the
            # command-line prompt as if a participant had spoken it. Confirm
            # whether the early return is intentional.
            await tts.say("Hi, I'm listening!", transport.send_queue)
            await asyncio.sleep(1)
            await transport.receive_queue.put(UserStartedSpeakingFrame())
            await asyncio.sleep(0.1)
            # Putting a TranscriptionQueueFrame into the receive queue directly
            # doesn't seem to work, so the transcription callback is invoked
            # instead.
            transport.on_transcription_message({
                "text": command_line_prompt,
                "participantId": "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
                "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
            })
            await asyncio.sleep(0.1)
            await transport.receive_queue.put(UserStoppedSpeakingFrame())

        transcription_extras = transport.transcription_settings["extra"]
        transcription_extras["endpointing"] = True
        transcription_extras["punctuate"] = True

        await asyncio.gather(
            transport.run(),
            transport.run_conversation(run_response),
        )
if __name__ == "__main__":
    # configure() supplies the room URL and token that main() expects.
    url, token = configure()
    asyncio.run(main(url, token))