Compare commits
10 Commits
khk/queues
...
khk-functi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d6d674ff6 | ||
|
|
1e552958aa | ||
|
|
17edfe98bd | ||
|
|
5100a7599b | ||
|
|
18c2b37358 | ||
|
|
0244f358d2 | ||
|
|
85fe6c0580 | ||
|
|
ae7482ed18 | ||
|
|
90d928be99 | ||
|
|
0703b926a3 |
@@ -12,11 +12,16 @@ dependencies = [
|
||||
"daily-python",
|
||||
"fal",
|
||||
"faster_whisper",
|
||||
"groq",
|
||||
"google-cloud-texttospeech",
|
||||
"numpy",
|
||||
"openai",
|
||||
"Pillow",
|
||||
"pyht",
|
||||
"python-dotenv",
|
||||
"torch",
|
||||
"torchaudio",
|
||||
"pyaudio",
|
||||
"typing-extensions"
|
||||
]
|
||||
|
||||
|
||||
@@ -1,106 +1,29 @@
|
||||
import asyncio
|
||||
|
||||
from attr import dataclass
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
|
||||
from dailyai.services.ai_services import AIService
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
ControlQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import AIService, PipeService
|
||||
|
||||
from typing import Any, AsyncGenerator, Callable, List, Tuple
|
||||
from typing import AsyncGenerator, List
|
||||
|
||||
|
||||
class QueueTee(PipeService):
|
||||
def __init__(
|
||||
self, sinks: list[PipeService], *args, **kwargs
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.sinks: List[PipeService] = []
|
||||
for sink in sinks:
|
||||
sink.source_queue = asyncio.Queue()
|
||||
self.sinks.append(sink)
|
||||
|
||||
async def process_queue(self):
|
||||
if not self.source_queue:
|
||||
return
|
||||
|
||||
while True:
|
||||
frame: QueueFrame = await self.source_queue.get()
|
||||
for sink in self.sinks:
|
||||
if sink.source_queue:
|
||||
await sink.source_queue.put(frame)
|
||||
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
|
||||
class QueueFrameAggregator(PipeService):
|
||||
|
||||
def __init__(
|
||||
class QueueTee:
|
||||
async def run_to_queue_and_generate(
|
||||
self,
|
||||
aggregator: Callable[[Any, QueueFrame], Tuple[Any, QueueFrame | None]],
|
||||
finalizer: Callable[[Any], QueueFrame | None],
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.aggregator = aggregator
|
||||
self.finalizer = finalizer
|
||||
self.aggregation = None
|
||||
|
||||
async def process_frame(
|
||||
self, frame: QueueFrame
|
||||
output_queue: asyncio.Queue,
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
output_frame: QueueFrame | None = None
|
||||
(self.aggregation, output_frame) = self.aggregator(
|
||||
self.aggregation, frame
|
||||
)
|
||||
if output_frame:
|
||||
yield output_frame
|
||||
async for frame in generator:
|
||||
await output_queue.put(frame)
|
||||
yield frame
|
||||
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
output_frame = self.finalizer(self.aggregation)
|
||||
if output_frame:
|
||||
yield output_frame
|
||||
|
||||
class QueueMergeGateOnFirst(PipeService):
|
||||
|
||||
def __init__(
|
||||
self, source_queues: List[asyncio.Queue[QueueFrame]]
|
||||
async def run_to_queues(
|
||||
self,
|
||||
output_queues: List[asyncio.Queue],
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
):
|
||||
super().__init__()
|
||||
self.source_queues = source_queues
|
||||
|
||||
async def process_queue(self):
|
||||
(frames): list[QueueFrame] = await asyncio.gather(
|
||||
*[source_queue.get() for source_queue in self.source_queues]
|
||||
)
|
||||
for idx, frame in enumerate(frames):
|
||||
# if the frame we got from a source is an EndStreamQueueFrame, remove that source
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
self.source_queues.pop(idx)
|
||||
else:
|
||||
await self.sink_queue.put(frame)
|
||||
|
||||
async def pass_through(sink, source):
|
||||
while True:
|
||||
frame = await source.get()
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
else:
|
||||
await sink.put(frame)
|
||||
|
||||
await asyncio.gather(
|
||||
*[pass_through(self.sink_queue, source) for source in self.source_queues]
|
||||
)
|
||||
|
||||
await self.sink_queue.put(EndStreamQueueFrame())
|
||||
async for frame in generator:
|
||||
for queue in output_queues:
|
||||
await queue.put(frame)
|
||||
|
||||
|
||||
class LLMContextAggregator(AIService):
|
||||
|
||||
@@ -23,6 +23,14 @@ class LLMResponseEndQueueFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class UserStartedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class UserStoppedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class AudioQueueFrame(QueueFrame):
|
||||
data: bytes
|
||||
@@ -44,6 +52,17 @@ class TextQueueFrame(QueueFrame):
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TextQueueOutOfBandFrame(TextQueueFrame):
|
||||
outOfBand: bool = True
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TTSCompletedFrame(QueueFrame):
|
||||
text: str
|
||||
outOfBand: bool = False
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TranscriptionQueueFrame(TextQueueFrame):
|
||||
participantId: str
|
||||
|
||||
@@ -2,73 +2,32 @@ import asyncio
|
||||
import io
|
||||
import logging
|
||||
import time
|
||||
import datetime
|
||||
import wave
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
QueueFrame,
|
||||
AudioQueueFrame,
|
||||
ControlQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
LLMResponseEndQueueFrame,
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
TTSCompletedFrame,
|
||||
TranscriptionQueueFrame,
|
||||
UserStoppedSpeakingFrame
|
||||
)
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
|
||||
|
||||
class AbstractPipeService:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
):
|
||||
self.sink_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()
|
||||
|
||||
@abstractmethod
|
||||
async def process_queue(self):
|
||||
pass
|
||||
|
||||
class PipeService(AbstractPipeService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
source_queue: asyncio.Queue[QueueFrame] | None = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.logger: logging.Logger = logging.getLogger("dailyai")
|
||||
self.source_queue = source_queue
|
||||
|
||||
async def process_queue(self):
|
||||
if not self.source_queue:
|
||||
return
|
||||
|
||||
while True:
|
||||
frame: QueueFrame = await self.source_queue.get()
|
||||
async for output_frame in self.process_frame(frame):
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
async for final_frame in self.finalize():
|
||||
await self.sink_queue.put(final_frame)
|
||||
await self.sink_queue.put(output_frame)
|
||||
return
|
||||
|
||||
await self.sink_queue.put(output_frame)
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
yield frame
|
||||
|
||||
@abstractmethod
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
# This is a trick for the interpreter (and linter) to know that this is a generator.
|
||||
if False:
|
||||
yield QueueFrame()
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
class AIService(PipeService):
|
||||
class AIService:
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger("dailyai")
|
||||
|
||||
def stop(self):
|
||||
@@ -112,8 +71,24 @@ class AIService(PipeService):
|
||||
self.logger.error("Exception occurred while running AI service", e)
|
||||
raise e
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, ControlQueueFrame):
|
||||
yield frame
|
||||
|
||||
@abstractmethod
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
# This is a trick for the interpreter (and linter) to know that this is a generator.
|
||||
if False:
|
||||
yield QueueFrame()
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
|
||||
def __init__(self, context):
|
||||
super().__init__()
|
||||
self._context = context
|
||||
|
||||
@abstractmethod
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
yield ""
|
||||
@@ -123,17 +98,28 @@ class LLMService(AIService):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, LLMMessagesQueueFrame):
|
||||
async for text_chunk in self.run_llm_async(frame.messages):
|
||||
yield TextQueueFrame(text_chunk)
|
||||
print(f"##### process frame got a frame, {type(frame)}")
|
||||
if isinstance(frame, UserStoppedSpeakingFrame):
|
||||
print(
|
||||
f"### Got a user stopped speaking frame, context is {self._context}")
|
||||
async for chunk in self.run_llm_async(self._context):
|
||||
# if we get a string, wrap it in a frame
|
||||
if isinstance(chunk, str):
|
||||
yield TextQueueFrame(chunk)
|
||||
# if we get a frame, pass it through
|
||||
elif isinstance(chunk, QueueFrame):
|
||||
print(f"### Got a frame chunk: {chunk}")
|
||||
yield chunk
|
||||
else:
|
||||
print(f"### Got an unknown chunk: {chunk}")
|
||||
yield LLMResponseEndQueueFrame()
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class TTSService(AIService):
|
||||
def __init__(self, aggregate_sentences=True, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
def __init__(self, aggregate_sentences=True):
|
||||
super().__init__()
|
||||
self.aggregate_sentences: bool = aggregate_sentences
|
||||
self.current_sentence: str = ""
|
||||
|
||||
@@ -150,6 +136,12 @@ class TTSService(AIService):
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
# We don't want transcription frames, which are a subclass
|
||||
yield frame
|
||||
return
|
||||
|
||||
# TODO-CB: Clean this up
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
@@ -164,7 +156,11 @@ class TTSService(AIService):
|
||||
|
||||
if text:
|
||||
async for audio_chunk in self.run_tts(text):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
size = 8000
|
||||
for i in range(0, len(audio_chunk), size):
|
||||
yield AudioQueueFrame(audio_chunk[i: i+size])
|
||||
print("### ABOUT TO YIELD TTS COMPLETED FRAME", frame)
|
||||
yield TTSCompletedFrame(text, hasattr(frame, 'outOfBand') and frame.outOfBand)
|
||||
|
||||
async def finalize(self):
|
||||
if self.current_sentence:
|
||||
@@ -172,10 +168,7 @@ class TTSService(AIService):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
|
||||
# Convenience function to send the audio for a sentence to the given queue
|
||||
async def say(self, sentence, queue: asyncio.Queue|None=None):
|
||||
queue = queue or self.sink
|
||||
if not queue:
|
||||
raise Exception("No queue to send audio to")
|
||||
async def say(self, sentence, queue: asyncio.Queue):
|
||||
await self.run_to_queue(queue, [TextQueueFrame(sentence)])
|
||||
|
||||
|
||||
@@ -237,8 +230,9 @@ class FrameLogger(AIService):
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, (AudioQueueFrame, ImageQueueFrame)):
|
||||
self.logger.info(f"{self.prefix}: {type(frame)}")
|
||||
self.logger.info(
|
||||
f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {type(frame)}")
|
||||
else:
|
||||
print(f"{self.prefix}: {frame}")
|
||||
print(f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {frame}")
|
||||
|
||||
yield frame
|
||||
|
||||
@@ -42,23 +42,16 @@ class AzureTTSService(TTSService):
|
||||
yield result.audio_data[44:]
|
||||
elif result.reason == ResultReason.Canceled:
|
||||
cancellation_details = result.cancellation_details
|
||||
self.logger.info("Speech synthesis canceled: {}".format(cancellation_details.reason))
|
||||
self.logger.info("Speech synthesis canceled: {}".format(
|
||||
cancellation_details.reason))
|
||||
if cancellation_details.reason == CancellationReason.Error:
|
||||
self.logger.info("Error details: {}".format(cancellation_details.error_details))
|
||||
self.logger.info("Error details: {}".format(
|
||||
cancellation_details.error_details))
|
||||
|
||||
|
||||
class AzureLLMService(LLMService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_key,
|
||||
endpoint,
|
||||
model,
|
||||
api_version="2023-12-01-preview",
|
||||
*args,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model, context):
|
||||
super().__init__(context)
|
||||
self._model: str = model
|
||||
|
||||
self._client = AsyncAzureOpenAI(
|
||||
@@ -111,7 +104,8 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}"
|
||||
headers = {"api-key": self._api_key, "Content-Type": "application/json"}
|
||||
headers = {"api-key": self._api_key,
|
||||
"Content-Type": "application/json"}
|
||||
body = {
|
||||
# Enter your prompt text here
|
||||
"prompt": sentence,
|
||||
|
||||
@@ -1,11 +1,23 @@
|
||||
from abc import abstractmethod
|
||||
import asyncio
|
||||
import copy
|
||||
import functools
|
||||
import itertools
|
||||
import logging
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from typing import AsyncGenerator
|
||||
import numpy as np
|
||||
import pyaudio
|
||||
import torch
|
||||
import torchaudio
|
||||
from enum import Enum
|
||||
import datetime
|
||||
import traceback
|
||||
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
AudioQueueFrame,
|
||||
@@ -14,8 +26,59 @@ from dailyai.queue_frame import (
|
||||
QueueFrame,
|
||||
SpriteQueueFrame,
|
||||
StartStreamQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
TTSCompletedFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame
|
||||
)
|
||||
|
||||
torch.set_num_threads(1)
|
||||
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
model='silero_vad',
|
||||
force_reload=False)
|
||||
|
||||
(get_speech_timestamps,
|
||||
save_audio,
|
||||
read_audio,
|
||||
VADIterator,
|
||||
collect_chunks) = utils
|
||||
|
||||
# Taken from utils_vad.py
|
||||
|
||||
|
||||
def validate(model,
|
||||
inputs: torch.Tensor):
|
||||
with torch.no_grad():
|
||||
outs = model(inputs)
|
||||
return outs
|
||||
|
||||
# Provided by Alexander Veysov
|
||||
|
||||
|
||||
def int2float(sound):
|
||||
abs_max = np.abs(sound).max()
|
||||
sound = sound.astype('float32')
|
||||
if abs_max > 0:
|
||||
sound *= 1/32768
|
||||
sound = sound.squeeze() # depends on the use case
|
||||
return sound
|
||||
|
||||
|
||||
FORMAT = pyaudio.paInt16
|
||||
CHANNELS = 1
|
||||
SAMPLE_RATE = 16000
|
||||
CHUNK = int(SAMPLE_RATE / 10)
|
||||
|
||||
audio = pyaudio.PyAudio()
|
||||
|
||||
|
||||
class VADState(Enum):
|
||||
QUIET = 1
|
||||
STARTING = 2
|
||||
SPEAKING = 3
|
||||
STOPPING = 4
|
||||
|
||||
|
||||
class BaseTransportService():
|
||||
|
||||
@@ -31,6 +94,17 @@ class BaseTransportService():
|
||||
self._speaker_enabled = kwargs.get("speaker_enabled") or False
|
||||
self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000
|
||||
self._fps = kwargs.get("fps") or 8
|
||||
self._vad_start_s = kwargs.get("vad_start_s") or 0.2
|
||||
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.5
|
||||
self._context = kwargs.get("context") or []
|
||||
|
||||
self._vad_samples = 1536
|
||||
vad_frame_s = self._vad_samples / SAMPLE_RATE
|
||||
self._vad_start_frames = round(self._vad_start_s / vad_frame_s)
|
||||
self._vad_stop_frames = round(self._vad_stop_s / vad_frame_s)
|
||||
self._vad_starting_count = 0
|
||||
self._vad_stopping_count = 0
|
||||
self._vad_state = VADState.QUIET
|
||||
|
||||
duration_minutes = kwargs.get("duration_minutes") or 10
|
||||
self._expiration = time.time() + duration_minutes * 60
|
||||
@@ -41,6 +115,8 @@ class BaseTransportService():
|
||||
self._threadsafe_send_queue = queue.Queue()
|
||||
|
||||
self._images = None
|
||||
self._user_is_speaking = False
|
||||
self._current_phrase = ""
|
||||
|
||||
try:
|
||||
self._loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
|
||||
@@ -52,20 +128,94 @@ class BaseTransportService():
|
||||
|
||||
self._logger: logging.Logger = logging.getLogger()
|
||||
|
||||
def update_messages(self, new_context: list[dict[str, str]], task: asyncio.Task | None):
|
||||
if task:
|
||||
if not task.cancelled():
|
||||
self._current_phrase = ""
|
||||
self._context = new_context
|
||||
|
||||
def append_to_context(self, role, chunk_or_text):
|
||||
print("IN APPEND", chunk_or_text)
|
||||
# if we get a non-string, append it to the context without further error checking
|
||||
# unless the outOfBand property is True
|
||||
if not isinstance(chunk_or_text, str):
|
||||
|
||||
if not chunk_or_text.get("outOfBand") == True:
|
||||
self._context.append(chunk_or_text)
|
||||
return
|
||||
|
||||
text = chunk_or_text
|
||||
last_context_item = self._context[-1]
|
||||
|
||||
print("TEXT", text)
|
||||
print("LAST CONTEXT ITEM", last_context_item)
|
||||
traceback.print_stack()
|
||||
|
||||
if last_context_item and last_context_item['role'] == role:
|
||||
last_context_item['content'] += f" {text}"
|
||||
else:
|
||||
self._context.append({"role": role, "content": text})
|
||||
|
||||
async def run_pipeline(self, frame):
|
||||
print(f"starting to speak_after_delay, {frame}")
|
||||
# TODO-CB: This exception for missing class gets eaten!
|
||||
await self._runner(frame)
|
||||
|
||||
async def run_conversation(self, runner: Iterable[QueueFrame]
|
||||
| AsyncIterable[QueueFrame]
|
||||
| asyncio.Queue[QueueFrame],
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
current_response_task = None
|
||||
self._runner = runner
|
||||
|
||||
async for frame in self.get_receive_frames():
|
||||
print(f"got frame of type: {type(frame)}, {frame}")
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
# elif not isinstance(frame, TranscriptionQueueFrame):
|
||||
# continue
|
||||
# TODO-CB: Verify this is an accurate replacement
|
||||
# if hasattr(frame, 'participantId') and frame.participantId == self._my_participant_id:
|
||||
if not isinstance(frame, UserStoppedSpeakingFrame):
|
||||
continue
|
||||
|
||||
if current_response_task:
|
||||
# TODO-CB: Maybe not always interrupt? Are there frame types we can pass through?
|
||||
current_response_task.cancel()
|
||||
self.interrupt()
|
||||
|
||||
# self._current_phrase += " " + frame.text
|
||||
# current_llm_context = copy.deepcopy(self._context)
|
||||
current_response_task = asyncio.create_task(
|
||||
self.run_pipeline(
|
||||
frame)
|
||||
)
|
||||
current_response_task.add_done_callback(
|
||||
functools.partial(self.update_messages, self._context)
|
||||
)
|
||||
|
||||
async def run(self):
|
||||
self._prerun()
|
||||
|
||||
async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames())
|
||||
async_output_queue_marshal_task = asyncio.create_task(
|
||||
self._marshal_frames())
|
||||
|
||||
self._camera_thread = threading.Thread(target=self._run_camera, daemon=True)
|
||||
self._camera_thread = threading.Thread(
|
||||
target=self._run_camera, daemon=True)
|
||||
self._camera_thread.start()
|
||||
|
||||
self._frame_consumer_thread = threading.Thread(target=self._frame_consumer, daemon=True)
|
||||
self._frame_consumer_thread = threading.Thread(
|
||||
target=self._frame_consumer, daemon=True)
|
||||
self._frame_consumer_thread.start()
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._receive_audio_thread = threading.Thread(target=self._receive_audio, daemon=True)
|
||||
self._receive_audio_thread.start()
|
||||
# TODO-CB: This is interesting
|
||||
# self._receive_audio_thread = threading.Thread(
|
||||
# target=self._receive_audio, daemon=True)
|
||||
# self._receive_audio_thread.start()
|
||||
|
||||
self._vad_thread = threading.Thread(target=self._vad, daemon=True)
|
||||
self._vad_thread.start()
|
||||
|
||||
try:
|
||||
while (
|
||||
@@ -82,6 +232,7 @@ class BaseTransportService():
|
||||
|
||||
self._stop_threads.set()
|
||||
|
||||
await self.send_queue.put(EndStreamQueueFrame())
|
||||
await async_output_queue_marshal_task
|
||||
await self.send_queue.join()
|
||||
self._frame_consumer_thread.join()
|
||||
@@ -121,6 +272,61 @@ class BaseTransportService():
|
||||
def _prerun(self):
|
||||
pass
|
||||
|
||||
def _vad(self):
|
||||
# CB: Starting silero VAD stuff
|
||||
# TODO-CB: Probably need to force virtual speaker creation if we're
|
||||
# going to build this in?
|
||||
# TODO-CB: pyaudio installation
|
||||
while not self._stop_threads.is_set():
|
||||
audio_chunk = self.read_audio_frames(self._vad_samples)
|
||||
audio_int16 = np.frombuffer(audio_chunk, np.int16)
|
||||
audio_float32 = int2float(audio_int16)
|
||||
new_confidence = model(
|
||||
torch.from_numpy(audio_float32), 16000).item()
|
||||
speaking = new_confidence > 0.5
|
||||
|
||||
if speaking:
|
||||
match self._vad_state:
|
||||
case VADState.QUIET:
|
||||
self._vad_state = VADState.STARTING
|
||||
self._vad_starting_count = 1
|
||||
case VADState.STARTING:
|
||||
self._vad_starting_count += 1
|
||||
case VADState.STOPPING:
|
||||
self._vad_state = VADState.SPEAKING
|
||||
self._vad_stopping_count = 0
|
||||
else:
|
||||
match self._vad_state:
|
||||
case VADState.STARTING:
|
||||
self._vad_state = VADState.QUIET
|
||||
self._vad_starting_count = 0
|
||||
case VADState.SPEAKING:
|
||||
self._vad_state = VADState.STOPPING
|
||||
self._vad_stopping_count = 1
|
||||
case VADState.STOPPING:
|
||||
self._vad_stopping_count += 1
|
||||
|
||||
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
|
||||
print(
|
||||
f'!!! {datetime.datetime.utcnow().isoformat()} queueing start frame')
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStartedSpeakingFrame()), self._loop
|
||||
)
|
||||
print(f"!!! VAD started, calling interrupt")
|
||||
self.interrupt()
|
||||
self._vad_state = VADState.SPEAKING
|
||||
self._vad_starting_count = 0
|
||||
if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
|
||||
print(
|
||||
f'!!! {datetime.datetime.utcnow().isoformat()} queueing stop frame')
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStoppedSpeakingFrame()), self._loop
|
||||
)
|
||||
self._vad_state = VADState.QUIET
|
||||
self._vad_stopping_count = 0
|
||||
|
||||
async def _marshal_frames(self):
|
||||
while True:
|
||||
frame: QueueFrame | list = await self.send_queue.get()
|
||||
@@ -130,6 +336,7 @@ class BaseTransportService():
|
||||
break
|
||||
|
||||
def interrupt(self):
|
||||
print(f"!!! setting interrupt")
|
||||
self._is_interrupted.set()
|
||||
|
||||
async def get_receive_frames(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
@@ -204,7 +411,6 @@ class BaseTransportService():
|
||||
if frame:
|
||||
if isinstance(frame, AudioQueueFrame):
|
||||
chunk = frame.data
|
||||
|
||||
all_audio_frames.extend(chunk)
|
||||
|
||||
b.extend(chunk)
|
||||
@@ -212,21 +418,27 @@ class BaseTransportService():
|
||||
len(b) % smallest_write_size
|
||||
)
|
||||
if truncated_length:
|
||||
self.write_frame_to_mic(bytes(b[:truncated_length]))
|
||||
self.write_frame_to_mic(
|
||||
bytes(b[:truncated_length]))
|
||||
b = b[truncated_length:]
|
||||
elif isinstance(frame, ImageQueueFrame):
|
||||
self._set_image(frame.image)
|
||||
elif isinstance(frame, SpriteQueueFrame):
|
||||
self._set_images(frame.images)
|
||||
elif isinstance(frame, TTSCompletedFrame) and not frame.outOfBand:
|
||||
self.append_to_context(
|
||||
"assistant", frame.text)
|
||||
elif len(b):
|
||||
self.write_frame_to_mic(bytes(b))
|
||||
b = bytearray()
|
||||
else:
|
||||
# if there are leftover audio bytes, write them now; failing to do so
|
||||
# can cause static in the audio stream.
|
||||
print(f"!!! interrupted, flushing audio")
|
||||
if len(b):
|
||||
truncated_length = len(b) - (len(b) % 160)
|
||||
self.write_frame_to_mic(bytes(b[:truncated_length]))
|
||||
self.write_frame_to_mic(
|
||||
bytes(b[:truncated_length]))
|
||||
b = bytearray()
|
||||
|
||||
if isinstance(frame, StartStreamQueueFrame):
|
||||
@@ -239,5 +451,6 @@ class BaseTransportService():
|
||||
|
||||
b = bytearray()
|
||||
except Exception as e:
|
||||
self._logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
|
||||
self._logger.error(
|
||||
f"Exception in frame_consumer: {e}, {len(b)}")
|
||||
raise e
|
||||
|
||||
@@ -1,18 +1,4 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import signal
|
||||
import threading
|
||||
import types
|
||||
|
||||
from functools import partial
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
|
||||
from threading import Event
|
||||
|
||||
from dailyai.services.base_transport_service import BaseTransportService
|
||||
from daily import (
|
||||
EventHandler,
|
||||
CallClient,
|
||||
@@ -21,8 +7,61 @@ from daily import (
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
from threading import Event
|
||||
from dailyai.queue_frame import (
|
||||
TranscriptionQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
|
||||
)
|
||||
from functools import partial
|
||||
import types
|
||||
import pyaudio
|
||||
import torchaudio
|
||||
import asyncio
|
||||
import inspect
|
||||
import io
|
||||
import logging
|
||||
import numpy as np
|
||||
import signal
|
||||
import threading
|
||||
import torch
|
||||
torch.set_num_threads(1)
|
||||
|
||||
from dailyai.services.base_transport_service import BaseTransportService
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
model='silero_vad',
|
||||
force_reload=False)
|
||||
|
||||
(get_speech_timestamps,
|
||||
save_audio,
|
||||
read_audio,
|
||||
VADIterator,
|
||||
collect_chunks) = utils
|
||||
|
||||
# Taken from utils_vad.py
|
||||
|
||||
|
||||
def validate(model,
|
||||
inputs: torch.Tensor):
|
||||
with torch.no_grad():
|
||||
outs = model(inputs)
|
||||
return outs
|
||||
|
||||
# Provided by Alexander Veysov
|
||||
|
||||
|
||||
def int2float(sound):
|
||||
abs_max = np.abs(sound).max()
|
||||
sound = sound.astype('float32')
|
||||
if abs_max > 0:
|
||||
sound *= 1/32768
|
||||
sound = sound.squeeze() # depends on the use case
|
||||
return sound
|
||||
|
||||
|
||||
FORMAT = pyaudio.paInt16
|
||||
CHANNELS = 1
|
||||
SAMPLE_RATE = 16000
|
||||
CHUNK = int(SAMPLE_RATE / 10)
|
||||
|
||||
audio = pyaudio.PyAudio()
|
||||
|
||||
|
||||
class DailyTransportService(BaseTransportService, EventHandler):
|
||||
@@ -45,7 +84,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
start_transcription: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs) # This will call BaseTransportService.__init__ method, not EventHandler
|
||||
# This will call BaseTransportService.__init__ method, not EventHandler
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._room_url: str = room_url
|
||||
self._bot_name: str = bot_name
|
||||
@@ -80,7 +120,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
for handler in self._event_handlers[event_name]:
|
||||
if inspect.iscoroutinefunction(handler):
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(handler(*args, **kwargs), self._loop)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
handler(*args, **kwargs), self._loop)
|
||||
else:
|
||||
raise Exception(
|
||||
"No event loop to run coroutine. In order to use async event handlers, you must run the DailyTransportService in an asyncio event loop.")
|
||||
@@ -92,7 +133,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
|
||||
def add_event_handler(self, event_name: str, handler):
|
||||
if not event_name.startswith("on_"):
|
||||
raise Exception(f"Event handler {event_name} must start with 'on_'")
|
||||
raise Exception(
|
||||
f"Event handler {event_name} must start with 'on_'")
|
||||
|
||||
methods = inspect.getmembers(self, predicate=inspect.ismethod)
|
||||
if event_name not in [method[0] for method in methods]:
|
||||
@@ -105,7 +147,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
handler, self)]
|
||||
setattr(self, event_name, partial(self._patch_method, event_name))
|
||||
else:
|
||||
self._event_handlers[event_name].append(types.MethodType(handler, self))
|
||||
self._event_handlers[event_name].append(
|
||||
types.MethodType(handler, self))
|
||||
|
||||
def event_handler(self, event_name: str):
|
||||
def decorator(handler):
|
||||
@@ -149,7 +192,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
Daily.select_speaker_device("speaker")
|
||||
|
||||
self.client.set_user_name(self._bot_name)
|
||||
self.client.join(self._room_url, self._token, completion=self.call_joined)
|
||||
self.client.join(self._room_url, self._token,
|
||||
completion=self.call_joined)
|
||||
self._my_participant_id = self.client.participants()["local"]["id"]
|
||||
|
||||
self.client.update_inputs(
|
||||
@@ -205,7 +249,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
def _post_run(self):
|
||||
self.client.leave()
|
||||
|
||||
def on_first_other_participant_joined(self, participant):
|
||||
def on_first_other_participant_joined(self):
|
||||
pass
|
||||
|
||||
def call_joined(self, join_data, client_error):
|
||||
@@ -226,24 +270,47 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
def on_participant_joined(self, participant):
|
||||
if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
|
||||
self._other_participant_has_joined = True
|
||||
self.on_first_other_participant_joined(participant)
|
||||
self.on_first_other_participant_joined()
|
||||
|
||||
def on_participant_left(self, participant, reason):
|
||||
if len(self.client.participants()) < self._min_others_count + 1:
|
||||
self._stop_threads.set()
|
||||
|
||||
async def insert_speech(self, text, sender, date):
|
||||
await self.receive_queue.put(UserStartedSpeakingFrame())
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# frame = TranscriptionQueueFrame(text, sender, date)
|
||||
# await self.receive_queue.put(frame)
|
||||
self.on_transcription_message({
|
||||
"text": text,
|
||||
"participantId": "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
|
||||
"timestamp": date
|
||||
})
|
||||
|
||||
await asyncio.sleep(0.3)
|
||||
await self.receive_queue.put(UserStoppedSpeakingFrame())
|
||||
|
||||
def on_app_message(self, message, sender):
|
||||
pass
|
||||
if self._loop:
|
||||
print("APP MESSAGE", message)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.insert_speech(message["message"], sender, message["date"]), self._loop)
|
||||
|
||||
def on_transcription_message(self, message: dict):
|
||||
if self._loop:
|
||||
print(f"transcription: {message}")
|
||||
participantId = ""
|
||||
if "participantId" in message:
|
||||
participantId = message["participantId"]
|
||||
elif "session_id" in message:
|
||||
participantId = message["session_id"]
|
||||
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
|
||||
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self._loop)
|
||||
frame = TranscriptionQueueFrame(
|
||||
message["text"], participantId, message["timestamp"])
|
||||
if self._my_participant_id and participantId != self._my_participant_id:
|
||||
self.append_to_context("user", message["text"])
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop)
|
||||
|
||||
def on_transcription_stopped(self, stopped_by, stopped_by_error):
|
||||
pass
|
||||
|
||||
@@ -12,13 +12,12 @@ class ElevenLabsTTSService(TTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
api_key,
|
||||
voice_id,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
super().__init__()
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
|
||||
@@ -32,7 +32,8 @@ class FalImageGenService(ImageGenService):
|
||||
handler = fal.apps.submit(
|
||||
"110602490-fast-sdxl",
|
||||
arguments={
|
||||
"prompt": sentence
|
||||
"prompt": sentence,
|
||||
"seed": 23
|
||||
},
|
||||
)
|
||||
for event in handler.iter_events():
|
||||
|
||||
122
src/dailyai/services/fireworks_ai_services.py
Normal file
122
src/dailyai/services/fireworks_ai_services.py
Normal file
@@ -0,0 +1,122 @@
|
||||
import aiohttp
|
||||
from PIL import Image
|
||||
import io
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from dailyai.services.ai_services import LLMService, ImageGenService
|
||||
|
||||
from dailyai.queue_frame import (TextQueueFrame, TextQueueOutOfBandFrame)
|
||||
|
||||
|
||||
class FireworksLLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="", tools=[], context, change_appearance, transport=""):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
self._tools = tools
|
||||
self._change_appearance = change_appearance
|
||||
self._transport = transport
|
||||
self._client = AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url="https://api.fireworks.ai/inference/v1"
|
||||
)
|
||||
|
||||
async def get_response(self, messages, stream):
|
||||
print("GET RESPONSE ... WHEN DO WE EXPECT THIS TO BE CALLED?")
|
||||
return await self._client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self._model,
|
||||
temperature=0.1,
|
||||
tools=self._tools
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
print("IN ASYNC")
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
chunks = await self._client.chat.completions.create(
|
||||
model=self._model,
|
||||
stream=True, # BLARGH
|
||||
messages=messages,
|
||||
temperature=0.1,
|
||||
tools=self._tools
|
||||
)
|
||||
|
||||
tool_call = {}
|
||||
|
||||
async for chunk in chunks:
|
||||
print(f"CHUNK: {chunk}")
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
if chunk.choices[0].delta.tool_calls:
|
||||
print(f"TOOL CALLS: {chunk.choices[0].delta.tool_calls[0]}")
|
||||
if chunk.choices[0].delta.tool_calls[0].function.name:
|
||||
tool_call["id"] = chunk.choices[0].delta.tool_calls[0].id
|
||||
tool_call["name"] = chunk.choices[0].delta.tool_calls[0].function.name
|
||||
tool_call["arguments"] = ''
|
||||
if chunk.choices[0].delta.tool_calls[0].function.arguments:
|
||||
tool_call["arguments"] += chunk.choices[0].delta.tool_calls[0].function.arguments
|
||||
|
||||
if chunk.choices[0].finish_reason:
|
||||
print(f"TOOL CALLS ACCUM -- {tool_call}")
|
||||
if tool_call.get("name"):
|
||||
# hard coding tool call action for now. we should assemble the tool call
|
||||
# from the streaming response, then yield it to the pipeline.
|
||||
# this approach works for the first few change appearance requests but
|
||||
# then the model starts refusing. need to read more about function
|
||||
# calling, try this with the OpenAI APIs, and talk to the Fireworks people.
|
||||
self._transport.append_to_context("assistant", {
|
||||
# pipeline will append the content to this context after it goes
|
||||
# through tts. we need to manually append the tool call, though
|
||||
"content": "",
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": tool_call["id"],
|
||||
"type": "function",
|
||||
"index": 0,
|
||||
"function": {
|
||||
"name": tool_call["name"],
|
||||
"arguments": tool_call["arguments"]
|
||||
},
|
||||
}
|
||||
],
|
||||
})
|
||||
self._transport.append_to_context("tool", {
|
||||
"content": "image generated by prompt arguments: " + tool_call["arguments"],
|
||||
"role": "tool",
|
||||
"tool_call_id": tool_call["id"]
|
||||
})
|
||||
self._transport.append_to_context("assistant", {
|
||||
"content": f"call to {tool_call['name']} function succeeded",
|
||||
"role": "assistant",
|
||||
})
|
||||
print("APPENDED TO CONTEXT")
|
||||
image_prompt = json.loads(
|
||||
tool_call["arguments"]).get("appearance")
|
||||
print("IMAGE PROMPT", image_prompt)
|
||||
asyncio.create_task(
|
||||
self._change_appearance(image_prompt))
|
||||
yield TextQueueOutOfBandFrame("Sure, let me work on that for you!")
|
||||
# yield {"content": "Sure, let me work on that for you!"}
|
||||
# yield "Sure, let me work on that for you!"
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
print("--> IN SYNC ... WHEN DO WE EXPECT THIS TO BE CALLED?")
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
33
src/dailyai/services/groq_ai_services.py
Normal file
33
src/dailyai/services/groq_ai_services.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import os
|
||||
import groq
|
||||
from groq import AsyncGroq
|
||||
from dailyai.services.ai_services import LLMService
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
|
||||
class GroqLLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="mixtral-8x7b-32768", context):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
# os.environ["GROQ_SECRET_ACCESS_KEY"] = api_key
|
||||
|
||||
self._client = AsyncGroq()
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
print(f"messages are {messages}")
|
||||
try:
|
||||
resp = await self._client.chat.completions.create(messages=messages, model=self._model)
|
||||
print(f"got chunks from groq: {resp}")
|
||||
|
||||
if resp.choices[0].message.content:
|
||||
yield resp.choices[0].message.content
|
||||
except groq.APIConnectionError as e:
|
||||
print("The server could not be reached")
|
||||
print(e.__cause__) # an underlying Exception, likely raised within httpx.
|
||||
except groq.RateLimitError as e:
|
||||
print("A 429 status code was received; we should back off a bit.")
|
||||
except groq.APIStatusError as e:
|
||||
print("Another non-200-range status code was received")
|
||||
print(e.status_code)
|
||||
print(e.response)
|
||||
|
||||
@@ -10,8 +10,8 @@ from dailyai.services.ai_services import LLMService, ImageGenService
|
||||
|
||||
|
||||
class OpenAILLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="gpt-4"):
|
||||
super().__init__()
|
||||
def __init__(self, *, api_key, model="gpt-4-turbo-preview", context):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
self._client = AsyncOpenAI(api_key=api_key)
|
||||
|
||||
|
||||
@@ -11,13 +11,12 @@ class PlayHTAIService(TTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key,
|
||||
user_id,
|
||||
voice_url,
|
||||
*args,
|
||||
**kwargs,
|
||||
voice_url
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
super().__init__()
|
||||
|
||||
self.speech_key = api_key
|
||||
self.user_id = user_id
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import PipeService
|
||||
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
|
||||
|
||||
class IncomingPipeService(PipeService):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.sink_queue = asyncio.Queue()
|
||||
|
||||
class QueueTeeTest(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
async def test_queue_tee(self):
|
||||
originpipe = IncomingPipeService()
|
||||
inpipe1 = PipeService(originpipe.sink_queue)
|
||||
outpipe1 = PipeService()
|
||||
outpipe2 = PipeService()
|
||||
teepipe = QueueTee(source_queue=inpipe1.sink_queue, sinks=[outpipe1, outpipe2])
|
||||
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("test"))
|
||||
originpipe.sink_queue.put_nowait(EndStreamQueueFrame())
|
||||
|
||||
await asyncio.gather(*[pipe.process_queue() for pipe in [originpipe, inpipe1, outpipe1, outpipe2, teepipe]])
|
||||
|
||||
def validateOutputPipe(pipe: PipeService):
|
||||
self.assertEqual(pipe.sink_queue.qsize(), 2)
|
||||
frame = pipe.sink_queue.get_nowait()
|
||||
self.assertIsInstance(frame, TextQueueFrame)
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
self.assertEqual(frame.text, "test")
|
||||
self.assertIsInstance(pipe.sink_queue.get_nowait(), EndStreamQueueFrame)
|
||||
|
||||
validateOutputPipe(outpipe1)
|
||||
validateOutputPipe(outpipe2)
|
||||
|
||||
class QueueFrameAggregatorTest(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_queue_frame_aggregator(self):
|
||||
def aggregate_sentences(accumulation, frame):
|
||||
if not accumulation:
|
||||
accumulation = ""
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
accumulation += frame.text
|
||||
if accumulation.endswith((".", "!", "?")):
|
||||
return ("", TextQueueFrame(accumulation))
|
||||
return (accumulation, None)
|
||||
|
||||
def finalize_sentences(accumulation):
|
||||
return TextQueueFrame(accumulation)
|
||||
|
||||
originpipe = IncomingPipeService()
|
||||
aggregator_pipe = QueueFrameAggregator(
|
||||
source_queue=originpipe.sink_queue,
|
||||
aggregator=aggregate_sentences,
|
||||
finalizer=finalize_sentences,
|
||||
)
|
||||
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("testing, "))
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("one."))
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("two."))
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("three."))
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("can you "))
|
||||
originpipe.sink_queue.put_nowait(TextQueueFrame("hear me"))
|
||||
originpipe.sink_queue.put_nowait(EndStreamQueueFrame())
|
||||
|
||||
await asyncio.gather(originpipe.process_queue(), aggregator_pipe.process_queue())
|
||||
|
||||
self.assertEqual(aggregator_pipe.sink_queue.qsize(), 5)
|
||||
expected_text = ["testing, one.", "two.", "three.", "can you hear me"]
|
||||
for exepectation in expected_text:
|
||||
frame = aggregator_pipe.sink_queue.get_nowait()
|
||||
print(frame)
|
||||
self.assertIsInstance(frame, TextQueueFrame)
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
self.assertEqual(frame.text, exepectation)
|
||||
|
||||
self.assertIsInstance(aggregator_pipe.sink_queue.get_nowait(), EndStreamQueueFrame)
|
||||
|
||||
class QueueMergeGateOnFirstTest(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_queue_merge_gate_on_first(self):
|
||||
pipe1 = IncomingPipeService()
|
||||
pipe2 = IncomingPipeService()
|
||||
|
||||
merge_pipe = QueueMergeGateOnFirst(
|
||||
source_queues=[pipe1.sink_queue, pipe2.sink_queue],
|
||||
)
|
||||
|
||||
evt = asyncio.Event()
|
||||
|
||||
async def add_items_to_first_pipe():
|
||||
await evt.wait()
|
||||
await pipe1.sink_queue.put(TextQueueFrame("pipe1.1"))
|
||||
await pipe1.sink_queue.put(TextQueueFrame("pipe1.2"))
|
||||
await pipe1.sink_queue.put(EndStreamQueueFrame())
|
||||
|
||||
async def add_items_to_second_pipe():
|
||||
await pipe2.sink_queue.put(TextQueueFrame("pipe2.1"))
|
||||
evt.set()
|
||||
await pipe2.sink_queue.put(EndStreamQueueFrame())
|
||||
|
||||
await asyncio.gather(
|
||||
*[pipe.process_queue() for pipe in [pipe1, pipe2, merge_pipe]],
|
||||
add_items_to_first_pipe(),
|
||||
add_items_to_second_pipe())
|
||||
|
||||
self.assertEqual(merge_pipe.sink_queue.qsize(), 4)
|
||||
frame = merge_pipe.sink_queue.get_nowait()
|
||||
assert isinstance(frame, TextQueueFrame)
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
self.assertEqual(frame.text, "pipe1.1")
|
||||
|
||||
frame = merge_pipe.sink_queue.get_nowait()
|
||||
assert isinstance(frame, TextQueueFrame)
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
self.assertEqual(frame.text, "pipe2.1")
|
||||
|
||||
frame = merge_pipe.sink_queue.get_nowait()
|
||||
assert isinstance(frame, TextQueueFrame)
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
self.assertEqual(frame.text, "pipe1.2")
|
||||
|
||||
frame = merge_pipe.sink_queue.get_nowait()
|
||||
assert isinstance(frame, EndStreamQueueFrame)
|
||||
@@ -1,30 +0,0 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import PipeService
|
||||
|
||||
class TestPipeService(unittest.IsolatedAsyncioTestCase):
|
||||
class IncomingPipeService(PipeService):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.sink_queue = asyncio.Queue()
|
||||
|
||||
async def test_pipe_chain(self):
|
||||
pipe1 = TestPipeService.IncomingPipeService()
|
||||
pipe2 = PipeService(pipe1.sink_queue)
|
||||
pipe3 = PipeService(pipe2.sink_queue)
|
||||
|
||||
await pipe1.sink_queue.put(TextQueueFrame("test"))
|
||||
await pipe1.sink_queue.put(EndStreamQueueFrame())
|
||||
|
||||
await asyncio.gather(pipe1.process_queue(), pipe2.process_queue(), pipe3.process_queue())
|
||||
|
||||
self.assertEqual(pipe3.sink_queue.qsize(), 2)
|
||||
frame = await pipe3.sink_queue.get()
|
||||
self.assertIsInstance(frame, TextQueueFrame)
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
self.assertEqual(frame.text, "test")
|
||||
|
||||
frame = await pipe3.sink_queue.get()
|
||||
self.assertIsInstance(frame, EndStreamQueueFrame)
|
||||
@@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TextQueueFrame
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
@@ -29,40 +28,36 @@ async def main(room_url):
|
||||
mic_enabled=True
|
||||
)
|
||||
|
||||
"""
|
||||
tts = ElevenLabsTTSService(
|
||||
source_queue=asyncio.Queue(),
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
"""
|
||||
tts = PlayHTAIService(
|
||||
sink=transport.send_queue,
|
||||
api_key=os.getenv("PLAY_HT_API_KEY"),
|
||||
user_id=os.getenv("PLAY_HT_USER_ID"),
|
||||
voice_url=os.getenv("PLAY_HT_VOICE_URL"),
|
||||
)
|
||||
"""
|
||||
|
||||
tts.sink_queue = transport.send_queue
|
||||
|
||||
# Register an event handler so we can play the audio when the participant joins.
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport, participant):
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
nonlocal tts
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
|
||||
# todo: update the tts.say() convenience method to use the new queue architecture
|
||||
await tts.source_queue.put(
|
||||
TextQueueFrame("Hello there, " +
|
||||
participant["info"]["userName"] + "!")
|
||||
await tts.say(
|
||||
"Hello there, " + participant["info"]["userName"] + "!",
|
||||
transport.send_queue,
|
||||
)
|
||||
await tts.source_queue.put(EndStreamQueueFrame())
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
# todo: commented out because it seems to exit a little early, before
|
||||
# the audio is finished playing
|
||||
# await transport.stop_when_done()
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
del(tts)
|
||||
|
||||
await asyncio.gather(transport.run(), tts.process_queue())
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
|
||||
@@ -3,7 +3,7 @@ import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
@@ -20,38 +20,39 @@ async def main(room_url):
|
||||
None,
|
||||
"Say One Thing From an LLM",
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
mic_enabled=True
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
source_queue=asyncio.Queue(),
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
mic_enabled=True,
|
||||
speaker_enabled=True
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
|
||||
source_queue=llm.sink_queue, # this should really be a sentence aggregator?
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
# llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
tts_task = asyncio.create_task(
|
||||
tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
)
|
||||
)
|
||||
|
||||
tts.sink_queue = transport.send_queue
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport, participant):
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
await llm.source_queue.put(LLMMessagesQueueFrame(messages))
|
||||
await llm.source_queue.put(EndStreamQueueFrame())
|
||||
# todo: commented out because it exits before audio plays
|
||||
# await transport.stop_when_done()
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts_task
|
||||
await transport.stop_when_done()
|
||||
|
||||
await asyncio.gather(transport.run(), llm.process_queue(), tts.process_queue())
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
import asyncio
|
||||
from typing import Any, AsyncGenerator, Callable, Tuple
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.queue_aggregators import QueueFrameAggregator, QueueMergeGateOnFirst, QueueTee
|
||||
|
||||
from dailyai.queue_frame import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseEndQueueFrame, QueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import PipeService
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
@@ -30,70 +27,34 @@ async def main(room_url):
|
||||
camera_height=1024
|
||||
)
|
||||
|
||||
"""
|
||||
|
||||
/ TTS \
|
||||
Month prompt -> LLM -> Fork -> -> Gate -> Transport
|
||||
\ Aggregate -> ImageGen /
|
||||
"""
|
||||
|
||||
month_description_queue: asyncio.Queue[QueueFrame] = asyncio.Queue()
|
||||
llm = AzureLLMService(
|
||||
source_queue=month_description_queue,
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="ErXwobaYiN019PkySvjV")
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
|
||||
dalle = FalImageGenService(
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"),
|
||||
)
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
|
||||
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
|
||||
|
||||
def aggregator(
|
||||
accumulation, frame: QueueFrame
|
||||
) -> tuple[Any, QueueFrame | None]:
|
||||
if not accumulation:
|
||||
accumulation = ""
|
||||
# Get a complete audio chunk from the given text. Splitting this into its own
|
||||
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
|
||||
async def get_all_audio(text):
|
||||
all_audio = bytearray()
|
||||
async for audio in tts.run_tts(text):
|
||||
all_audio.extend(audio)
|
||||
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
accumulation += frame.text
|
||||
return (accumulation, None)
|
||||
elif isinstance(frame, LLMResponseEndQueueFrame):
|
||||
return ("", TextQueueFrame(accumulation))
|
||||
else:
|
||||
return (accumulation, frame)
|
||||
return all_audio
|
||||
|
||||
# This queue service takes chunks from LLM output and merges them into one text frame
|
||||
# that will be used to prompt the image service.
|
||||
llm_aggregator_for_image = QueueFrameAggregator(aggregator=aggregator, finalizer=lambda x: None)
|
||||
|
||||
# Set the source queue for the image service to the sink of the aggregator service
|
||||
dalle.source_queue = llm_aggregator_for_image.sink_queue
|
||||
|
||||
# This queue service takes the output from the LLM and sends it to the TTS service and
|
||||
# the aggregator for the image generation service.
|
||||
tee = QueueTee(source_queue=llm.sink_queue, sinks=[tts, llm_aggregator_for_image])
|
||||
|
||||
# This queue service takes input from the TTS service and the image service, and waits
|
||||
# to forward any audio frames until the image generation is complete. It will send
|
||||
# the image first, then the audio frames; this ensures that the image is shown before
|
||||
# the audio associated with the image is played.
|
||||
tts_image_gate = QueueMergeGateOnFirst([dalle.sink_queue, tts.sink_queue])
|
||||
|
||||
# We send the image of this queue service to the transport output.
|
||||
tts_image_gate.sink_queue = transport.send_queue
|
||||
|
||||
# Queue up all the months in the LLM service source queue
|
||||
months = ["January"] #, "February"]
|
||||
for month in months:
|
||||
async def get_month_data(month):
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
@@ -101,21 +62,72 @@ async def main(room_url):
|
||||
}
|
||||
]
|
||||
|
||||
await month_description_queue.put(LLMMessagesQueueFrame(messages))
|
||||
image_description = await llm.run_llm(messages)
|
||||
if not image_description:
|
||||
return
|
||||
|
||||
await month_description_queue.put(EndStreamQueueFrame())
|
||||
to_speak = f"{month}: {image_description}"
|
||||
audio_task = asyncio.create_task(get_all_audio(to_speak))
|
||||
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
|
||||
print(f"about to gather tasks for {month}")
|
||||
(audio, image_data) = await asyncio.gather(
|
||||
audio_task, image_task
|
||||
)
|
||||
print(f"about to return from get_month_data for {month}")
|
||||
return {
|
||||
"month": month,
|
||||
"text": image_description,
|
||||
"image_url": image_data[0],
|
||||
"image": image_data[1],
|
||||
"audio": audio,
|
||||
}
|
||||
|
||||
pipeline = [
|
||||
llm,
|
||||
[tee,
|
||||
[tts,
|
||||
[llm_aggregator_for_image, dalle]
|
||||
],
|
||||
tts_image_gate],
|
||||
transport,
|
||||
months: list[str] = [
|
||||
"January",
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June"
|
||||
]
|
||||
"""
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June",
|
||||
"July",
|
||||
"August",
|
||||
"September",
|
||||
"October",
|
||||
"November",
|
||||
"December",
|
||||
"""
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# This will play the months in the order they're completed. The benefit
|
||||
# is we'll have as little delay as possible before the first month, and
|
||||
# likely no delay between months, but the months won't display in order.
|
||||
for month_data_task in asyncio.as_completed(month_tasks):
|
||||
print(f"month_data_task: {month_data_task}")
|
||||
try:
|
||||
data = await month_data_task
|
||||
except Exception:
|
||||
print("OMG EXCEPTION!!!!")
|
||||
if data:
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageQueueFrame(data["image_url"], data["image"]),
|
||||
AudioQueueFrame(data["audio"]),
|
||||
]
|
||||
)
|
||||
|
||||
await asyncio.gather(transport.run(), *[service.process_queue() for service in [llm, tts, dalle, tee, tts_image_gate, llm_aggregator_for_image]])
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
|
||||
|
||||
await transport.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
|
||||
@@ -5,9 +5,16 @@ from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from examples.foundational.support.runner import configure
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
context = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
@@ -16,7 +23,9 @@ async def main(room_url: str, token):
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False
|
||||
camera_enabled=False,
|
||||
speaker_enabled=True,
|
||||
context=context
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
@@ -26,33 +35,33 @@ async def main(room_url: str, token):
|
||||
tts = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
fl = FrameLogger("transport")
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserContextAggregator(messages, transport._my_participant_id)
|
||||
tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id)
|
||||
tma_in = LLMUserContextAggregator(
|
||||
context, transport._my_participant_id)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
context, transport._my_participant_id)
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
transport.get_receive_frames()
|
||||
fl.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.services.groq_ai_services import GroqLLMService
|
||||
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
context = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
# TODO-CB: Should this be VAD enabled or something?
|
||||
speaker_enabled=True,
|
||||
context=context
|
||||
)
|
||||
|
||||
# llm = AzureLLMService(
|
||||
# api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
# endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
# model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
# context=context)
|
||||
llm = OpenAILLMService(
|
||||
context=context, api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
# llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), context=context)
|
||||
# tts = AzureTTSService(
|
||||
# api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
# region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
|
||||
fl = FrameLogger("just outside the innermost layer")
|
||||
|
||||
async def run_response(in_frame):
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
# tma_out.run(
|
||||
llm.run(
|
||||
# tma_in.run(
|
||||
fl.run(
|
||||
[StartStreamQueueFrame(), in_frame]
|
||||
)
|
||||
# )
|
||||
)
|
||||
# ),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), transport.run_conversation(run_response))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
160
src/khk-hackathon/06d-listen.py
Normal file
160
src/khk-hackathon/06d-listen.py
Normal file
@@ -0,0 +1,160 @@
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TranscriptionQueueFrame, TextQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.fireworks_ai_services import FireworksLLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
command_line_prompt = ' '.join(sys.argv[1:])
|
||||
|
||||
system_prompt = """
|
||||
You are a friendly robot character with a cartoon body with head, torso, arms, feet,
|
||||
and legs.
|
||||
|
||||
You can change your appearance using the `change_appearance` function call.
|
||||
You can add or remove items from your body, change
|
||||
your color, and more. You can use function calling to change your appearance.
|
||||
|
||||
When changing your appearance, please create a prompt as an argument to the function.
|
||||
The prompt will help the image generation model
|
||||
create a new appearance for you. Include as much detail as possible. Include the
|
||||
keywords "robot", "friendly", "cartoon", "smiling", "happy", "animated".
|
||||
The initial image prompt you are adding to or changing is
|
||||
"A friendly cartoon robot, smiling and happy, animated."
|
||||
|
||||
Do not include the image model prompt in your response. The prompt must be passed to the function
|
||||
as a parameter.
|
||||
"""
|
||||
|
||||
change_appearance_function = {
|
||||
"name": "change_appearance",
|
||||
"description": "Call this function when the users want you to change your appearance.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"appearance": {
|
||||
"type": "string",
|
||||
"description": "The new appearance for the robot, in the form of a prompt for an generative AI diffusion model."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tools = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": change_appearance_function
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
context = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": system_prompt,
|
||||
},
|
||||
]
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=30,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=1024,
|
||||
# TODO-CB: Should this be VAD enabled or something?
|
||||
speaker_enabled=True,
|
||||
context=context
|
||||
)
|
||||
|
||||
imagegen = FalImageGenService(
|
||||
image_size="512x512",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
|
||||
async def change_appearance(appearance):
|
||||
await asyncio.create_task(
|
||||
imagegen.run_to_queue(
|
||||
transport.send_queue, [
|
||||
TextQueueFrame(appearance)]))
|
||||
|
||||
llm = FireworksLLMService(
|
||||
context=context,
|
||||
api_key=os.getenv("FIREWORKS_API_KEY"),
|
||||
model="accounts/fireworks/models/firefunction-v1",
|
||||
# TODO - how can we modify tools list on the fly?
|
||||
tools=tools,
|
||||
change_appearance=change_appearance,
|
||||
transport=transport
|
||||
)
|
||||
tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv(
|
||||
"DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
|
||||
fl = FrameLogger("just outside the innermost layer")
|
||||
|
||||
async def run_response(in_frame):
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
# tma_out.run(
|
||||
llm.run(
|
||||
# tma_in.run(
|
||||
fl.run(
|
||||
[StartStreamQueueFrame(), in_frame]
|
||||
)
|
||||
# )
|
||||
)
|
||||
# ),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await change_appearance("A friendly cartoon robot, smiling and happy, animated.")
|
||||
return
|
||||
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
await transport.receive_queue.put(UserStartedSpeakingFrame())
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
transport.on_transcription_message({
|
||||
"text": command_line_prompt,
|
||||
"participantId": "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
|
||||
"timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
|
||||
})
|
||||
# putting the frame into the queue directly doesn't seem to work
|
||||
# await transport.receive_queue.put(
|
||||
# TranscriptionQueueFrame(
|
||||
# "tell me a joke.",
|
||||
# "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
|
||||
# datetime.utcnow().strftime(
|
||||
# '%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
|
||||
# ))
|
||||
await asyncio.sleep(0.1)
|
||||
await transport.receive_queue.put(UserStoppedSpeakingFrame())
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
|
||||
await asyncio.gather(transport.run(), transport.run_conversation(run_response))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
Reference in New Issue
Block a user