Compare commits
10 Commits
cb/valoran
...
khk-functi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d6d674ff6 | ||
|
|
1e552958aa | ||
|
|
17edfe98bd | ||
|
|
5100a7599b | ||
|
|
18c2b37358 | ||
|
|
0244f358d2 | ||
|
|
85fe6c0580 | ||
|
|
ae7482ed18 | ||
|
|
90d928be99 | ||
|
|
0703b926a3 |
@@ -12,6 +12,7 @@ dependencies = [
|
||||
"daily-python",
|
||||
"fal",
|
||||
"faster_whisper",
|
||||
"groq",
|
||||
"google-cloud-texttospeech",
|
||||
"numpy",
|
||||
"openai",
|
||||
@@ -19,6 +20,7 @@ dependencies = [
|
||||
"pyht",
|
||||
"python-dotenv",
|
||||
"torch",
|
||||
"torchaudio",
|
||||
"pyaudio",
|
||||
"typing-extensions"
|
||||
]
|
||||
|
||||
@@ -3,7 +3,7 @@ import copy
|
||||
import functools
|
||||
from typing import AsyncGenerator, Awaitable, Callable
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame, UserStartedSpeakingFrame
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame
|
||||
|
||||
|
||||
class InterruptibleConversationWrapper:
|
||||
@@ -63,10 +63,9 @@ class InterruptibleConversationWrapper:
|
||||
if frame.participantId == self._my_participant_id:
|
||||
continue
|
||||
|
||||
if current_response_task and isinstance(frame, UserStartedSpeakingFrame):
|
||||
if current_response_task:
|
||||
current_response_task.cancel()
|
||||
self._interrupt()
|
||||
|
||||
|
||||
self._current_phrase += " " + frame.text
|
||||
current_llm_messages = copy.deepcopy(self._messages)
|
||||
|
||||
@@ -52,7 +52,7 @@ class LLMContextAggregator(AIService):
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
if frame.participantId == self.bot_participant_id:
|
||||
return
|
||||
print(f"@@@ tma got a frame: {frame.text}")
|
||||
|
||||
# The common case for "pass through" is receiving frames from the LLM that we'll
|
||||
# use to update the "assistant" LLM messages, but also passing the text frames
|
||||
# along to a TTS service to be spoken to the user.
|
||||
@@ -65,11 +65,8 @@ class LLMContextAggregator(AIService):
|
||||
# though we check it above
|
||||
self.sentence += frame.text
|
||||
if self.sentence.endswith((".", "?", "!")):
|
||||
self.messages.append(
|
||||
{"role": self.role, "content": self.sentence})
|
||||
self.messages.append({"role": self.role, "content": self.sentence})
|
||||
self.sentence = ""
|
||||
# for message in self.messages:
|
||||
# print(f"{message['role']}: {message['content']}")
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
else:
|
||||
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
|
||||
@@ -81,8 +78,6 @@ class LLMContextAggregator(AIService):
|
||||
# Send any dangling words that weren't finished with punctuation.
|
||||
if self.complete_sentences and self.sentence:
|
||||
self.messages.append({"role": self.role, "content": self.sentence})
|
||||
# for message in self.messages:
|
||||
# print(f"{message['role']}: {message['content']}")
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
|
||||
|
||||
|
||||
@@ -23,15 +23,12 @@ class LLMResponseEndQueueFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class ChatMessageQueueFrame(QueueFrame):
|
||||
message: str
|
||||
class UserStartedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMFunctionCallFrame(QueueFrame):
|
||||
function_name: str
|
||||
arguments: str
|
||||
class UserStoppedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
@@ -55,6 +52,17 @@ class TextQueueFrame(QueueFrame):
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TextQueueOutOfBandFrame(TextQueueFrame):
|
||||
outOfBand: bool = True
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TTSCompletedFrame(QueueFrame):
|
||||
text: str
|
||||
outOfBand: bool = False
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TranscriptionQueueFrame(TextQueueFrame):
|
||||
participantId: str
|
||||
@@ -69,9 +77,3 @@ class LLMMessagesQueueFrame(QueueFrame):
|
||||
class AppMessageQueueFrame(QueueFrame):
|
||||
message: Any
|
||||
participantId: str
|
||||
|
||||
class UserStartedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
class UserStoppedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
@@ -2,20 +2,22 @@ import asyncio
|
||||
import io
|
||||
import logging
|
||||
import time
|
||||
import datetime
|
||||
import wave
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
QueueFrame,
|
||||
AudioQueueFrame,
|
||||
ControlQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
LLMResponseEndQueueFrame,
|
||||
LLMFunctionCallFrame,
|
||||
ChatMessageQueueFrame,
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
TTSCompletedFrame,
|
||||
TranscriptionQueueFrame,
|
||||
UserStoppedSpeakingFrame
|
||||
)
|
||||
|
||||
from abc import abstractmethod
|
||||
@@ -43,7 +45,7 @@ class AIService:
|
||||
frames: Iterable[QueueFrame]
|
||||
| AsyncIterable[QueueFrame]
|
||||
| asyncio.Queue[QueueFrame],
|
||||
**kwargs) -> AsyncGenerator[QueueFrame, None]:
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
try:
|
||||
if isinstance(frames, AsyncIterable):
|
||||
async for frame in frames:
|
||||
@@ -82,6 +84,11 @@ class AIService:
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
|
||||
def __init__(self, context):
|
||||
super().__init__()
|
||||
self._context = context
|
||||
|
||||
@abstractmethod
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
yield ""
|
||||
@@ -90,25 +97,21 @@ class LLMService(AIService):
|
||||
async def run_llm(self, messages) -> str:
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: QueueFrame, tool_choice: str = None) -> AsyncGenerator[QueueFrame, None]:
|
||||
function_name = ""
|
||||
arguments = ""
|
||||
if isinstance(frame, LLMMessagesQueueFrame):
|
||||
async for text_chunk in self.run_llm_async(frame.messages, tool_choice):
|
||||
if isinstance(text_chunk, str):
|
||||
yield TextQueueFrame(text_chunk)
|
||||
elif text_chunk.function:
|
||||
if text_chunk.function.name:
|
||||
# function_name += text_chunk.function.name
|
||||
yield LLMFunctionCallFrame(function_name=text_chunk.function.name, arguments=None)
|
||||
if text_chunk.function.arguments:
|
||||
# arguments += text_chunk.function.arguments
|
||||
yield LLMFunctionCallFrame(function_name=None, arguments=text_chunk.function.arguments)
|
||||
|
||||
if (function_name and arguments):
|
||||
# yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments)
|
||||
function_name = ""
|
||||
arguments = ""
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
print(f"##### process frame got a frame, {type(frame)}")
|
||||
if isinstance(frame, UserStoppedSpeakingFrame):
|
||||
print(
|
||||
f"### Got a user stopped speaking frame, context is {self._context}")
|
||||
async for chunk in self.run_llm_async(self._context):
|
||||
# if we get a string, wrap it in a frame
|
||||
if isinstance(chunk, str):
|
||||
yield TextQueueFrame(chunk)
|
||||
# if we get a frame, pass it through
|
||||
elif isinstance(chunk, QueueFrame):
|
||||
print(f"### Got a frame chunk: {chunk}")
|
||||
yield chunk
|
||||
else:
|
||||
print(f"### Got an unknown chunk: {chunk}")
|
||||
yield LLMResponseEndQueueFrame()
|
||||
else:
|
||||
yield frame
|
||||
@@ -133,6 +136,12 @@ class TTSService(AIService):
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
# We don't want transcription frames, which are a subclass
|
||||
yield frame
|
||||
return
|
||||
|
||||
# TODO-CB: Clean this up
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
@@ -146,9 +155,12 @@ class TTSService(AIService):
|
||||
self.current_sentence = ""
|
||||
|
||||
if text:
|
||||
# yield ChatMessageQueueFrame(message=text)
|
||||
async for audio_chunk in self.run_tts(text):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
size = 8000
|
||||
for i in range(0, len(audio_chunk), size):
|
||||
yield AudioQueueFrame(audio_chunk[i: i+size])
|
||||
print("### ABOUT TO YIELD TTS COMPLETED FRAME", frame)
|
||||
yield TTSCompletedFrame(text, hasattr(frame, 'outOfBand') and frame.outOfBand)
|
||||
|
||||
async def finalize(self):
|
||||
if self.current_sentence:
|
||||
@@ -218,8 +230,9 @@ class FrameLogger(AIService):
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, (AudioQueueFrame, ImageQueueFrame)):
|
||||
self.logger.info(f"{self.prefix}: {type(frame)}")
|
||||
self.logger.info(
|
||||
f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {type(frame)}")
|
||||
else:
|
||||
self.logger.info(f"{self.prefix}: {frame}")
|
||||
print(f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {frame}")
|
||||
|
||||
yield frame
|
||||
|
||||
@@ -42,14 +42,16 @@ class AzureTTSService(TTSService):
|
||||
yield result.audio_data[44:]
|
||||
elif result.reason == ResultReason.Canceled:
|
||||
cancellation_details = result.cancellation_details
|
||||
self.logger.info("Speech synthesis canceled: {}".format(cancellation_details.reason))
|
||||
self.logger.info("Speech synthesis canceled: {}".format(
|
||||
cancellation_details.reason))
|
||||
if cancellation_details.reason == CancellationReason.Error:
|
||||
self.logger.info("Error details: {}".format(cancellation_details.error_details))
|
||||
self.logger.info("Error details: {}".format(
|
||||
cancellation_details.error_details))
|
||||
|
||||
|
||||
class AzureLLMService(LLMService):
|
||||
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
|
||||
super().__init__()
|
||||
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model, context):
|
||||
super().__init__(context)
|
||||
self._model: str = model
|
||||
|
||||
self._client = AsyncAzureOpenAI(
|
||||
@@ -102,7 +104,8 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}"
|
||||
headers = {"api-key": self._api_key, "Content-Type": "application/json"}
|
||||
headers = {"api-key": self._api_key,
|
||||
"Content-Type": "application/json"}
|
||||
body = {
|
||||
# Enter your prompt text here
|
||||
"prompt": sentence,
|
||||
|
||||
@@ -1,32 +1,37 @@
|
||||
from abc import abstractmethod
|
||||
import asyncio
|
||||
import copy
|
||||
import functools
|
||||
import itertools
|
||||
import logging
|
||||
import numpy as np
|
||||
import pyaudio
|
||||
import torch
|
||||
import torchaudio
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from typing import AsyncGenerator
|
||||
import numpy as np
|
||||
import pyaudio
|
||||
import torch
|
||||
import torchaudio
|
||||
from enum import Enum
|
||||
import datetime
|
||||
import traceback
|
||||
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
AudioQueueFrame,
|
||||
ChatMessageQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
QueueFrame,
|
||||
SpriteQueueFrame,
|
||||
StartStreamQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
TTSCompletedFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame
|
||||
)
|
||||
|
||||
|
||||
torch.set_num_threads(1)
|
||||
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
@@ -74,6 +79,7 @@ class VADState(Enum):
|
||||
SPEAKING = 3
|
||||
STOPPING = 4
|
||||
|
||||
|
||||
class BaseTransportService():
|
||||
|
||||
def __init__(
|
||||
@@ -89,13 +95,9 @@ class BaseTransportService():
|
||||
self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000
|
||||
self._fps = kwargs.get("fps") or 8
|
||||
self._vad_start_s = kwargs.get("vad_start_s") or 0.2
|
||||
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
|
||||
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.5
|
||||
self._context = kwargs.get("context") or []
|
||||
self._vad_enabled = kwargs.get("vad_enabled") or False
|
||||
|
||||
if self._vad_enabled and self._speaker_enabled:
|
||||
raise Exception("Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False.")
|
||||
|
||||
|
||||
self._vad_samples = 1536
|
||||
vad_frame_s = self._vad_samples / SAMPLE_RATE
|
||||
self._vad_start_frames = round(self._vad_start_s / vad_frame_s)
|
||||
@@ -103,8 +105,7 @@ class BaseTransportService():
|
||||
self._vad_starting_count = 0
|
||||
self._vad_stopping_count = 0
|
||||
self._vad_state = VADState.QUIET
|
||||
self._user_is_speaking = False
|
||||
|
||||
|
||||
duration_minutes = kwargs.get("duration_minutes") or 10
|
||||
self._expiration = time.time() + duration_minutes * 60
|
||||
|
||||
@@ -114,6 +115,8 @@ class BaseTransportService():
|
||||
self._threadsafe_send_queue = queue.Queue()
|
||||
|
||||
self._images = None
|
||||
self._user_is_speaking = False
|
||||
self._current_phrase = ""
|
||||
|
||||
try:
|
||||
self._loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
|
||||
@@ -124,41 +127,63 @@ class BaseTransportService():
|
||||
self._is_interrupted = threading.Event()
|
||||
|
||||
self._logger: logging.Logger = logging.getLogger()
|
||||
|
||||
|
||||
def update_messages(self, new_context: list[dict[str, str]], task: asyncio.Task | None):
|
||||
if task:
|
||||
if not task.cancelled():
|
||||
self._current_phrase = ""
|
||||
self._context = new_context
|
||||
|
||||
|
||||
|
||||
|
||||
def append_to_context(self, role, chunk_or_text):
|
||||
print("IN APPEND", chunk_or_text)
|
||||
# if we get a non-string, append it to the context without further error checking
|
||||
# unless the outOfBand property is True
|
||||
if not isinstance(chunk_or_text, str):
|
||||
|
||||
if not chunk_or_text.get("outOfBand") == True:
|
||||
self._context.append(chunk_or_text)
|
||||
return
|
||||
|
||||
text = chunk_or_text
|
||||
last_context_item = self._context[-1]
|
||||
|
||||
print("TEXT", text)
|
||||
print("LAST CONTEXT ITEM", last_context_item)
|
||||
traceback.print_stack()
|
||||
|
||||
if last_context_item and last_context_item['role'] == role:
|
||||
last_context_item['content'] += f" {text}"
|
||||
else:
|
||||
self._context.append({"role": role, "content": text})
|
||||
|
||||
async def run_pipeline(self, frame):
|
||||
print(f"starting to speak_after_delay, {frame}")
|
||||
# TODO-CB: This exception for missing class gets eaten!
|
||||
await self._runner(frame)
|
||||
|
||||
|
||||
async def run_conversation(self, runner: Iterable[QueueFrame]
|
||||
| AsyncIterable[QueueFrame]
|
||||
| asyncio.Queue[QueueFrame],
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
current_response_task = None
|
||||
self._runner = runner
|
||||
|
||||
|
||||
async for frame in self.get_receive_frames():
|
||||
print(f"got frame of type: {type(frame)}, {frame}")
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
# elif not isinstance(frame, TranscriptionQueueFrame):
|
||||
# continue
|
||||
# TODO-CB: Verify this is an accurate replacement
|
||||
# if hasattr(frame, 'participantId') and frame.participantId == self._my_participant_id:
|
||||
# if not isinstance(frame, UserStoppedSpeakingFrame):
|
||||
# continue
|
||||
|
||||
if current_response_task and isinstance(frame, UserStartedSpeakingFrame):
|
||||
if not isinstance(frame, UserStoppedSpeakingFrame):
|
||||
continue
|
||||
|
||||
if current_response_task:
|
||||
# TODO-CB: Maybe not always interrupt? Are there frame types we can pass through?
|
||||
current_response_task.cancel()
|
||||
self.interrupt()
|
||||
|
||||
|
||||
# self._current_phrase += " " + frame.text
|
||||
# current_llm_context = copy.deepcopy(self._context)
|
||||
current_response_task = asyncio.create_task(
|
||||
@@ -184,11 +209,11 @@ class BaseTransportService():
|
||||
self._frame_consumer_thread.start()
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._receive_audio_thread = threading.Thread(
|
||||
target=self._receive_audio, daemon=True)
|
||||
self._receive_audio_thread.start()
|
||||
|
||||
if self._vad_enabled:
|
||||
# TODO-CB: This is interesting
|
||||
# self._receive_audio_thread = threading.Thread(
|
||||
# target=self._receive_audio, daemon=True)
|
||||
# self._receive_audio_thread.start()
|
||||
|
||||
self._vad_thread = threading.Thread(target=self._vad, daemon=True)
|
||||
self._vad_thread.start()
|
||||
|
||||
@@ -214,10 +239,6 @@ class BaseTransportService():
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._receive_audio_thread.join()
|
||||
|
||||
if self._vad_enabled:
|
||||
self._vad_thread.join()
|
||||
|
||||
|
||||
def _post_run(self):
|
||||
# Note that this function must be idempotent! It can be called multiple times
|
||||
@@ -250,7 +271,7 @@ class BaseTransportService():
|
||||
@abstractmethod
|
||||
def _prerun(self):
|
||||
pass
|
||||
|
||||
|
||||
def _vad(self):
|
||||
# CB: Starting silero VAD stuff
|
||||
# TODO-CB: Probably need to force virtual speaker creation if we're
|
||||
@@ -263,6 +284,7 @@ class BaseTransportService():
|
||||
new_confidence = model(
|
||||
torch.from_numpy(audio_float32), 16000).item()
|
||||
speaking = new_confidence > 0.5
|
||||
|
||||
if speaking:
|
||||
match self._vad_state:
|
||||
case VADState.QUIET:
|
||||
@@ -283,25 +305,28 @@ class BaseTransportService():
|
||||
self._vad_stopping_count = 1
|
||||
case VADState.STOPPING:
|
||||
self._vad_stopping_count += 1
|
||||
|
||||
|
||||
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
|
||||
print("##### VAD START")
|
||||
print(
|
||||
f'!!! {datetime.datetime.utcnow().isoformat()} queueing start frame')
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStartedSpeakingFrame()), self._loop
|
||||
)
|
||||
print(f"!!! VAD started, calling interrupt")
|
||||
self.interrupt()
|
||||
self._vad_state = VADState.SPEAKING
|
||||
self._vad_starting_count = 0
|
||||
if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
|
||||
print("##### VAD STOP")
|
||||
print(
|
||||
f'!!! {datetime.datetime.utcnow().isoformat()} queueing stop frame')
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStoppedSpeakingFrame()), self._loop
|
||||
)
|
||||
self._vad_state = VADState.QUIET
|
||||
self._vad_stopping_count = 0
|
||||
|
||||
|
||||
async def _marshal_frames(self):
|
||||
while True:
|
||||
frame: QueueFrame | list = await self.send_queue.get()
|
||||
@@ -311,7 +336,7 @@ class BaseTransportService():
|
||||
break
|
||||
|
||||
def interrupt(self):
|
||||
print(f"!!!!! INTERRUPT")
|
||||
print(f"!!! setting interrupt")
|
||||
self._is_interrupted.set()
|
||||
|
||||
async def get_receive_frames(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
@@ -335,6 +360,7 @@ class BaseTransportService():
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop
|
||||
)
|
||||
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(EndStreamQueueFrame()), self._loop
|
||||
)
|
||||
@@ -361,19 +387,13 @@ class BaseTransportService():
|
||||
self._logger.info("🎬 Starting frame consumer thread")
|
||||
b = bytearray()
|
||||
smallest_write_size = 3200
|
||||
largest_write_size = 8000
|
||||
all_audio_frames = bytearray()
|
||||
while True:
|
||||
try:
|
||||
frames_or_frame: QueueFrame | list[QueueFrame] = (
|
||||
self._threadsafe_send_queue.get()
|
||||
)
|
||||
if isinstance(frames_or_frame, AudioQueueFrame) and len(frames_or_frame.data) > largest_write_size:
|
||||
# subdivide large audio frames to enable interruption
|
||||
frames = []
|
||||
for i in range(0, len(frames_or_frame.data), largest_write_size):
|
||||
frames.append(AudioQueueFrame(frames_or_frame.data[i : i+largest_write_size]))
|
||||
elif isinstance(frames_or_frame, QueueFrame):
|
||||
if isinstance(frames_or_frame, QueueFrame):
|
||||
frames: list[QueueFrame] = [frames_or_frame]
|
||||
elif isinstance(frames_or_frame, list):
|
||||
frames: list[QueueFrame] = frames_or_frame
|
||||
@@ -391,7 +411,6 @@ class BaseTransportService():
|
||||
if frame:
|
||||
if isinstance(frame, AudioQueueFrame):
|
||||
chunk = frame.data
|
||||
|
||||
all_audio_frames.extend(chunk)
|
||||
|
||||
b.extend(chunk)
|
||||
@@ -406,14 +425,16 @@ class BaseTransportService():
|
||||
self._set_image(frame.image)
|
||||
elif isinstance(frame, SpriteQueueFrame):
|
||||
self._set_images(frame.images)
|
||||
elif isinstance(frame, ChatMessageQueueFrame):
|
||||
self._send_chat_message(frame)
|
||||
elif isinstance(frame, TTSCompletedFrame) and not frame.outOfBand:
|
||||
self.append_to_context(
|
||||
"assistant", frame.text)
|
||||
elif len(b):
|
||||
self.write_frame_to_mic(bytes(b))
|
||||
b = bytearray()
|
||||
else:
|
||||
# if there are leftover audio bytes, write them now; failing to do so
|
||||
# can cause static in the audio stream.
|
||||
print(f"!!! interrupted, flushing audio")
|
||||
if len(b):
|
||||
truncated_length = len(b) - (len(b) % 160)
|
||||
self.write_frame_to_mic(
|
||||
@@ -430,6 +451,6 @@ class BaseTransportService():
|
||||
|
||||
b = bytearray()
|
||||
except Exception as e:
|
||||
print(
|
||||
self._logger.error(
|
||||
f"Exception in frame_consumer: {e}, {len(b)}")
|
||||
raise e
|
||||
|
||||
@@ -1,18 +1,4 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import signal
|
||||
import threading
|
||||
import types
|
||||
|
||||
from functools import partial
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
|
||||
from threading import Event
|
||||
|
||||
from dailyai.services.base_transport_service import BaseTransportService
|
||||
from daily import (
|
||||
EventHandler,
|
||||
CallClient,
|
||||
@@ -21,8 +7,61 @@ from daily import (
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
from threading import Event
|
||||
from dailyai.queue_frame import (
|
||||
TranscriptionQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
|
||||
)
|
||||
from functools import partial
|
||||
import types
|
||||
import pyaudio
|
||||
import torchaudio
|
||||
import asyncio
|
||||
import inspect
|
||||
import io
|
||||
import logging
|
||||
import numpy as np
|
||||
import signal
|
||||
import threading
|
||||
import torch
|
||||
torch.set_num_threads(1)
|
||||
|
||||
from dailyai.services.base_transport_service import BaseTransportService
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
model='silero_vad',
|
||||
force_reload=False)
|
||||
|
||||
(get_speech_timestamps,
|
||||
save_audio,
|
||||
read_audio,
|
||||
VADIterator,
|
||||
collect_chunks) = utils
|
||||
|
||||
# Taken from utils_vad.py
|
||||
|
||||
|
||||
def validate(model,
|
||||
inputs: torch.Tensor):
|
||||
with torch.no_grad():
|
||||
outs = model(inputs)
|
||||
return outs
|
||||
|
||||
# Provided by Alexander Veysov
|
||||
|
||||
|
||||
def int2float(sound):
|
||||
abs_max = np.abs(sound).max()
|
||||
sound = sound.astype('float32')
|
||||
if abs_max > 0:
|
||||
sound *= 1/32768
|
||||
sound = sound.squeeze() # depends on the use case
|
||||
return sound
|
||||
|
||||
|
||||
FORMAT = pyaudio.paInt16
|
||||
CHANNELS = 1
|
||||
SAMPLE_RATE = 16000
|
||||
CHUNK = int(SAMPLE_RATE / 10)
|
||||
|
||||
audio = pyaudio.PyAudio()
|
||||
|
||||
|
||||
class DailyTransportService(BaseTransportService, EventHandler):
|
||||
@@ -31,7 +70,6 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
|
||||
_speaker_enabled: bool
|
||||
_speaker_sample_rate: int
|
||||
_vad_enabled: bool
|
||||
|
||||
# This is necessary to override EventHandler's __new__ method.
|
||||
def __new__(cls, *args, **kwargs):
|
||||
@@ -46,7 +84,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
start_transcription: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs) # This will call BaseTransportService.__init__ method, not EventHandler
|
||||
# This will call BaseTransportService.__init__ method, not EventHandler
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._room_url: str = room_url
|
||||
self._bot_name: str = bot_name
|
||||
@@ -146,61 +185,55 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
"camera", width=self._camera_width, height=self._camera_height, color_format="RGB"
|
||||
)
|
||||
|
||||
if self._speaker_enabled or self._vad_enabled:
|
||||
if self._speaker_enabled:
|
||||
self._speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
|
||||
"speaker", sample_rate=self._speaker_sample_rate, channels=1
|
||||
)
|
||||
Daily.select_speaker_device("speaker")
|
||||
|
||||
self.client.set_user_name(self._bot_name)
|
||||
|
||||
self.client.join(
|
||||
self._room_url,
|
||||
self._token,
|
||||
completion=self.call_joined,
|
||||
client_settings={
|
||||
"inputs": {
|
||||
"camera": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "camera",
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "mic",
|
||||
"customConstraints": {
|
||||
"autoGainControl": {"exact": False},
|
||||
"echoCancellation": {"exact": False},
|
||||
"noiseSuppression": {"exact": False},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"publishing": {
|
||||
"camera": {
|
||||
"sendSettings": {
|
||||
"maxQuality": "low",
|
||||
"encodings": {
|
||||
"low": {
|
||||
"maxBitrate": 250000,
|
||||
"scaleResolutionDownBy": 1.333,
|
||||
"maxFramerate": 8,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
self.client.join(self._room_url, self._token,
|
||||
completion=self.call_joined)
|
||||
self._my_participant_id = self.client.participants()["local"]["id"]
|
||||
|
||||
self.client.update_subscription_profiles({
|
||||
"base": {
|
||||
"camera": "unsubscribed",
|
||||
self.client.update_inputs(
|
||||
{
|
||||
"camera": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "camera",
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "mic",
|
||||
"customConstraints": {
|
||||
"autoGainControl": {"exact": False},
|
||||
"echoCancellation": {"exact": False},
|
||||
"noiseSuppression": {"exact": False},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
self.client.update_publishing(
|
||||
{
|
||||
"camera": {
|
||||
"sendSettings": {
|
||||
"maxQuality": "low",
|
||||
"encodings": {
|
||||
"low": {
|
||||
"maxBitrate": 250000,
|
||||
"scaleResolutionDownBy": 1.333,
|
||||
"maxFramerate": 8,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if self._token and self._start_transcription:
|
||||
self.client.start_transcription(self.transcription_settings)
|
||||
@@ -239,23 +272,34 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
self._other_participant_has_joined = True
|
||||
self.on_first_other_participant_joined()
|
||||
|
||||
"""
|
||||
def on_participant_left(self, participant, reason):
|
||||
if len(self.client.participants()) < self._min_others_count + 1:
|
||||
self._stop_threads.set()
|
||||
"""
|
||||
|
||||
async def insert_speech(self, text, sender, date):
|
||||
await self.receive_queue.put(UserStartedSpeakingFrame())
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# frame = TranscriptionQueueFrame(text, sender, date)
|
||||
# await self.receive_queue.put(frame)
|
||||
self.on_transcription_message({
|
||||
"text": text,
|
||||
"participantId": "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
|
||||
"timestamp": date
|
||||
})
|
||||
|
||||
await asyncio.sleep(0.3)
|
||||
await self.receive_queue.put(UserStoppedSpeakingFrame())
|
||||
|
||||
def on_app_message(self, message, sender):
|
||||
print(f"app message: {message}")
|
||||
if self._loop:
|
||||
frame = TranscriptionQueueFrame(
|
||||
message["message"], message["name"], message["date"])
|
||||
print("APP MESSAGE", message)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop)
|
||||
pass
|
||||
self.insert_speech(message["message"], sender, message["date"]), self._loop)
|
||||
|
||||
def on_transcription_message(self, message: dict):
|
||||
if self._loop:
|
||||
print(f"transcription: {message}")
|
||||
participantId = ""
|
||||
if "participantId" in message:
|
||||
participantId = message["participantId"]
|
||||
@@ -263,6 +307,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
participantId = message["session_id"]
|
||||
frame = TranscriptionQueueFrame(
|
||||
message["text"], participantId, message["timestamp"])
|
||||
if self._my_participant_id and participantId != self._my_participant_id:
|
||||
self.append_to_context("user", message["text"])
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop)
|
||||
|
||||
@@ -274,11 +320,3 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
|
||||
def on_transcription_started(self, status):
|
||||
pass
|
||||
|
||||
def _send_chat_message(self, frame):
|
||||
self.client.send_app_message(
|
||||
{'message': frame.message, 'event': 'chat-msg', 'name': self._bot_name, 'date': time.time(), 'room': 'main-room'})
|
||||
|
||||
def stop(self):
|
||||
super().stop()
|
||||
self.client.leave()
|
||||
|
||||
@@ -26,8 +26,7 @@ class ElevenLabsTTSService(TTSService):
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
|
||||
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
|
||||
querystring = {"output_format": "pcm_16000",
|
||||
"optimize_streaming_latency": 2}
|
||||
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
|
||||
headers = {
|
||||
"xi-api-key": self._api_key,
|
||||
"Content-Type": "application/json",
|
||||
|
||||
@@ -32,7 +32,8 @@ class FalImageGenService(ImageGenService):
|
||||
handler = fal.apps.submit(
|
||||
"110602490-fast-sdxl",
|
||||
arguments={
|
||||
"prompt": sentence
|
||||
"prompt": sentence,
|
||||
"seed": 23
|
||||
},
|
||||
)
|
||||
for event in handler.iter_events():
|
||||
|
||||
122
src/dailyai/services/fireworks_ai_services.py
Normal file
122
src/dailyai/services/fireworks_ai_services.py
Normal file
@@ -0,0 +1,122 @@
|
||||
import aiohttp
|
||||
from PIL import Image
|
||||
import io
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from dailyai.services.ai_services import LLMService, ImageGenService
|
||||
|
||||
from dailyai.queue_frame import (TextQueueFrame, TextQueueOutOfBandFrame)
|
||||
|
||||
|
||||
class FireworksLLMService(LLMService):
    """LLM service that talks to Fireworks AI through its OpenAI-compatible API.

    Besides streaming text, ``run_llm_async`` assembles a streamed tool call
    and, once the response finishes, records it in the transport's context
    and kicks off ``change_appearance`` in the background.
    """

    def __init__(self, *, api_key, model="", tools=None, context, change_appearance, transport=""):
        """Create the service.

        Args:
            api_key: Fireworks API key handed to the OpenAI-compatible client.
            model: Model identifier sent with every completion request.
            tools: Tool definitions sent with every request. The list object
                is stored as-is (not copied) so callers may mutate it later.
                Defaults to a fresh empty list per instance.
            context: Passed straight through to ``LLMService.__init__``.
            change_appearance: Coroutine function called with the
                ``appearance`` argument of a completed tool call.
            transport: Object whose ``append_to_context`` is called when a
                tool call completes. NOTE(review): the ``""`` default cannot
                satisfy that call; callers that use tools must pass a real
                transport.
        """
        super().__init__(context)
        self._model = model
        # A `tools=[]` default would be one list shared by every instance.
        self._tools = [] if tools is None else tools
        self._change_appearance = change_appearance
        self._transport = transport
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._background_tasks = set()
        self._client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.fireworks.ai/inference/v1"
        )

    async def get_response(self, messages, stream):
        """Issue one chat completion request and return the client's result."""
        print("GET RESPONSE ... WHEN DO WE EXPECT THIS TO BE CALLED?")
        return await self._client.chat.completions.create(
            stream=stream,
            messages=messages,
            model=self._model,
            temperature=0.1,
            tools=self._tools
        )

    async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
        """Stream a completion, yielding text and handling one tool call.

        Yields each non-empty content delta as a string. If the stream
        contained a tool call, then on the chunk carrying ``finish_reason``
        the call is appended to the transport context, ``change_appearance``
        is scheduled, and a ``TextQueueOutOfBandFrame`` acknowledgement is
        yielded (so the generator does not yield strings exclusively).
        """
        print("IN ASYNC")
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        chunks = await self._client.chat.completions.create(
            model=self._model,
            stream=True,
            messages=messages,
            temperature=0.1,
            tools=self._tools
        )

        # Accumulates the id, name and argument fragments of the tool call.
        tool_call = {}

        async for chunk in chunks:
            print(f"CHUNK: {chunk}")
            if len(chunk.choices) == 0:
                continue

            choice = chunk.choices[0]
            delta = choice.delta

            if delta.content:
                yield delta.content

            if delta.tool_calls:
                # Only the first tool call of each chunk is considered.
                call = delta.tool_calls[0]
                print(f"TOOL CALLS: {call}")
                if call.function.name:
                    # A chunk carrying a name starts a new call.
                    tool_call["id"] = call.id
                    tool_call["name"] = call.function.name
                    tool_call["arguments"] = ''
                if call.function.arguments:
                    # Tolerate an argument fragment that arrives before any
                    # name instead of raising KeyError.
                    tool_call["arguments"] = tool_call.get("arguments", "") + call.function.arguments

            if choice.finish_reason:
                print(f"TOOL CALLS ACCUM -- {tool_call}")
                if tool_call.get("name"):
                    # hard coding tool call action for now. we should assemble the tool call
                    # from the streaming response, then yield it to the pipeline.
                    # this approach works for the first few change appearance requests but
                    # then the model starts refusing. need to read more about function
                    # calling, try this with the OpenAI APIs, and talk to the Fireworks people.
                    self._transport.append_to_context("assistant", {
                        # pipeline will append the content to this context after it goes
                        # through tts. we need to manually append the tool call, though
                        "content": "",
                        "role": "assistant",
                        "tool_calls": [
                            {
                                "id": tool_call["id"],
                                "type": "function",
                                "index": 0,
                                "function": {
                                    "name": tool_call["name"],
                                    "arguments": tool_call["arguments"]
                                },
                            }
                        ],
                    })
                    self._transport.append_to_context("tool", {
                        "content": "image generated by prompt arguments: " + tool_call["arguments"],
                        "role": "tool",
                        "tool_call_id": tool_call["id"]
                    })
                    self._transport.append_to_context("assistant", {
                        "content": f"call to {tool_call['name']} function succeeded",
                        "role": "assistant",
                    })
                    print("APPENDED TO CONTEXT")
                    image_prompt = json.loads(
                        tool_call["arguments"]).get("appearance")
                    print("IMAGE PROMPT", image_prompt)
                    # Keep a reference until the task is done (see __init__).
                    task = asyncio.create_task(
                        self._change_appearance(image_prompt))
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)
                    yield TextQueueOutOfBandFrame("Sure, let me work on that for you!")

    async def run_llm(self, messages) -> str | None:
        """Run one non-streaming completion.

        Returns the first choice's message content, or None when the response
        is empty or has no choices. Tools are not sent on this path.
        """
        print("--> IN SYNC ... WHEN DO WE EXPECT THIS TO BE CALLED?")
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
        if response and len(response.choices) > 0:
            return response.choices[0].message.content
        return None
|
||||
33
src/dailyai/services/groq_ai_services.py
Normal file
33
src/dailyai/services/groq_ai_services.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import os
|
||||
import groq
|
||||
from groq import AsyncGroq
|
||||
from dailyai.services.ai_services import LLMService
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
|
||||
class GroqLLMService(LLMService):
    """LLM service that gets chat completions from Groq."""

    def __init__(self, *, api_key, model="mixtral-8x7b-32768", context):
        """Create the service.

        Args:
            api_key: Groq API key. It is now passed to the client explicitly;
                previously it was accepted and silently ignored.
            model: Model identifier sent with every completion request.
            context: Passed straight through to ``LLMService.__init__``.
        """
        super().__init__(context)
        self._model = model
        # NOTE(review): when api_key is None the SDK is expected to fall back
        # to its own environment variable - confirm against the groq docs.
        self._client = AsyncGroq(api_key=api_key)

    async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
        """Request one (non-streamed) completion and yield its text, if any.

        API errors are reported on stdout and swallowed, so on failure the
        generator simply yields nothing.
        """
        print(f"messages are {messages}")
        try:
            resp = await self._client.chat.completions.create(messages=messages, model=self._model)
            print(f"got chunks from groq: {resp}")

            # Guard against a response with no choices before indexing.
            if resp.choices and resp.choices[0].message.content:
                yield resp.choices[0].message.content
        except groq.APIConnectionError as e:
            print("The server could not be reached")
            print(e.__cause__)  # an underlying Exception, likely raised within httpx.
        except groq.RateLimitError:
            print("A 429 status code was received; we should back off a bit.")
        except groq.APIStatusError as e:
            print("Another non-200-range status code was received")
            print(e.status_code)
            print(e.response)
|
||||
|
||||
@@ -10,39 +10,28 @@ from dailyai.services.ai_services import LLMService, ImageGenService
|
||||
|
||||
|
||||
class OpenAILLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="gpt-4", tools=None):
|
||||
super().__init__()
|
||||
def __init__(self, *, api_key, model="gpt-4-turbo-preview", context):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
self._tools = tools
|
||||
self._client = AsyncOpenAI(api_key=api_key)
|
||||
|
||||
async def get_response(self, messages, stream):
|
||||
return await self._client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self._model,
|
||||
tools=self._tools
|
||||
model=self._model
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
print("---")
|
||||
print(f"tools: {self._tools}")
|
||||
print("---")
|
||||
print(f"messages: {messages_for_log}")
|
||||
print("-----")
|
||||
if self._tools:
|
||||
tools = self._tools
|
||||
else:
|
||||
tools = None
|
||||
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages, tools=tools, tool_choice=tool_choice)
|
||||
|
||||
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
|
||||
async for chunk in chunks:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
if chunk.choices[0].delta.tool_calls:
|
||||
yield chunk.choices[0].delta.tool_calls[0]
|
||||
elif chunk.choices[0].delta.content:
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
|
||||
@@ -17,8 +17,7 @@ class CloudflareAIService(AIService):
|
||||
|
||||
# base endpoint, used by the others
|
||||
def run(self, model, input):
|
||||
response = requests.post(
|
||||
f"{self.api_base_url}{model}", headers=self.headers, json=input)
|
||||
response = requests.post(f"{self.api_base_url}{model}", headers=self.headers, json=input)
|
||||
return response.json()
|
||||
|
||||
# https://developers.cloudflare.com/workers-ai/models/llm/
|
||||
|
||||
@@ -20,7 +20,8 @@ async def main(room_url):
|
||||
None,
|
||||
"Say One Thing From an LLM",
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
mic_enabled=True
|
||||
mic_enabled=True,
|
||||
speaker_enabled=True
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
|
||||
@@ -1,97 +1,68 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import os
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from examples.foundational.support.runner import configure
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, TranscriptionQueueFrame, QueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import FrameLogger, AIService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
|
||||
class TranscriptFilter(AIService):
    """Frame processor that forwards only other participants' transcriptions.

    Every frame that is not a ``TranscriptionQueueFrame``, and every
    transcription whose ``participantId`` equals ``bot_participant_id``,
    is dropped.
    """

    def __init__(self, bot_participant_id=None):
        super().__init__()
        self.bot_participant_id = bot_participant_id
        print(f"Filtering transcripts from : {self.bot_participant_id}")

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Yield ``frame`` only if it is a transcription not made by the bot."""
        # Guard clauses: anything that is not a foreign transcription ends
        # the generator without yielding.
        if not isinstance(frame, TranscriptionQueueFrame):
            return
        if frame.participantId == self.bot_participant_id:
            return
        yield frame
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
context = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
speaker_enabled=True,
|
||||
context=context
|
||||
)
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
5,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False
|
||||
)
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
fl = FrameLogger("transport")
|
||||
|
||||
# llm = AzureLLMService(api_key=os.getenv("AZURE_CHATGPT_API_KEY"), endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"), model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
tts = ElevenLabsTTSService(aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), voice_id="EXAVITQu4vr4xnSDxMaL")
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": """You are Valerie, an agent for a company called Valorant Health. Your job is to help users get access to health care. You're talking to Chad Bailey, a 40 year old male who needs to see a doctor.
|
||||
async def handle_transcriptions():
|
||||
|
||||
You need to do three things, in this order:
|
||||
|
||||
1. Confirm the user's identity.
|
||||
2. Find out what kinds of doctors the user needs to see.
|
||||
3. Get the name of their insurance company.
|
||||
|
||||
Start by introducing yourself and asking the user to verify their identity by providing their date of birth. Once their identity is confirmed, move on to step 2, then to step 3.
|
||||
|
||||
Once you have collected all of that information, respond with a JSON object containing the answers."""}
|
||||
]
|
||||
tma_in = LLMUserContextAggregator(messages, transport._my_participant_id)
|
||||
tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id)
|
||||
# checklist = ChecklistProcessor(messages, llm)
|
||||
|
||||
async def handle_transcriptions():
|
||||
tf = TranscriptFilter(transport._my_participant_id)
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
tf.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
tma_in = LLMUserContextAggregator(
|
||||
context, transport._my_participant_id)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
context, transport._my_participant_id)
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
fl.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
fl = FrameLogger("first other participant")
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
fl.run(
|
||||
tma_out.run(
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
)
|
||||
)
|
||||
)
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,120 +0,0 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import os
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from examples.foundational.support.runner import configure
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, TranscriptionQueueFrame, QueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import FrameLogger, AIService
|
||||
|
||||
class TranscriptFilter(AIService):
    """Frame processor that forwards only other participants' transcriptions.

    Every frame that is not a ``TranscriptionQueueFrame``, and every
    transcription whose ``participantId`` equals ``bot_participant_id``,
    is dropped.
    """

    def __init__(self, bot_participant_id=None):
        super().__init__()
        self.bot_participant_id = bot_participant_id
        print(f"Filtering transcripts from : {self.bot_participant_id}")

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Yield ``frame`` only if it is a transcription not made by the bot."""
        # Guard clauses: anything that is not a foreign transcription ends
        # the generator without yielding.
        if not isinstance(frame, TranscriptionQueueFrame):
            return
        if frame.participantId == self.bot_participant_id:
            return
        yield frame
|
||||
|
||||
class ChecklistProcessor(AIService):
    """Walks the LLM through a fixed checklist of conversation steps.

    The LLM is instructed to answer with the sentinel text ``ABC`` when a
    step is complete. On seeing that sentinel the processor appends the next
    step's instructions to the shared message list and re-prompts the LLM;
    every other frame is passed through unchanged.
    """

    def __init__(self, messages, llm, *args, **kwargs):
        """Create the processor and seed ``messages`` with the first step.

        Args:
            messages: Shared list of chat messages; mutated in place.
            llm: LLM service whose ``process_frame`` is used to re-prompt.
            *args, **kwargs: Forwarded to ``AIService.__init__``.
        """
        super().__init__(*args, **kwargs)
        self._current_step = 0
        self._messages = messages
        self._llm = llm
        self._id = "You are Valerie, an agent for a company called Valorant Health. Your job is to help users get access to health care. You're talking to Chad Bailey, a 40 year old male who needs to see a doctor."
        self._steps = [
            "Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday. After the user has confirmed their identity, respond only with ABC.",
            "Now that the user has confirmed their identity, ask them to describe what kind of doctor they need to see. When the user has responded with at least one kind of doctor, respond only with ABC.",
            "Next, you need to ask the user what kind of health insurance they have. Once the user has told you what insurance company they use, respond only with ABC.",
            "Tell the user goodbye.",
            ""
        ]
        messages.append({"role": "system", "content": f"{self._id} {self._steps[0]}"})

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Advance the checklist on an ``ABC`` text frame; pass others through."""
        is_text = isinstance(frame, TextQueueFrame)
        if is_text:
            print(f"got a text frame: {frame.text}")

        if not (is_text and frame.text == "ABC"):
            yield frame
            return

        next_step = self._current_step + 1
        if next_step >= len(self._steps):
            # No steps left: swallow the sentinel instead of indexing past
            # the end of the list.
            return

        self._current_step = next_step
        self._messages.append({"role": "system", "content": self._steps[next_step]})
        yield LLMMessagesQueueFrame(self._messages)
        print("past llmmessagesqueueframe yield")
        # Use the injected LLM rather than a module-level global, and a
        # separate loop name so the `frame` parameter is not shadowed.
        async for llm_frame in self._llm.process_frame(LLMMessagesQueueFrame(self._messages)):
            yield llm_frame
|
||||
|
||||
async def main(room_url: str, token):
    """Join the room and run the checklist conversation bot until it ends.

    Builds the transport, LLM and TTS services (published as module
    globals), wires transcription -> context -> LLM -> checklist -> TTS, and
    has the LLM speak first when another participant joins.
    """
    async with aiohttp.ClientSession() as session:
        global transport, llm, tts

        transport = DailyTransportService(room_url, token, "Respond bot", 5)
        transport.mic_enabled = True
        transport.mic_sample_rate = 16000
        transport.camera_enabled = False

        # The Azure LLM/TTS services were previously used here in place of
        # OpenAI and ElevenLabs.
        llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id="EXAVITQu4vr4xnSDxMaL",
        )

        # One message list shared by both aggregators and the checklist.
        messages = []
        tma_in = LLMUserContextAggregator(messages, transport._my_participant_id)
        tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id)
        checklist = ChecklistProcessor(messages, llm)

        async def handle_transcriptions():
            tf = TranscriptFilter(transport._my_participant_id)
            send_queue = transport.send_queue
            # Chain the stages innermost-first: each stage consumes the
            # frames produced by the one before it.
            frames = transport.get_receive_frames()
            for stage in (tf, tma_in, llm, tma_out, checklist):
                frames = stage.run(frames)
            await tts.run_to_queue(send_queue, frames)

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            fl = FrameLogger("first other participant")
            send_queue = transport.send_queue
            opening = llm.run([LLMMessagesQueueFrame(messages)])
            await tts.run_to_queue(send_queue, fl.run(tma_out.run(opening)))

        transport.transcription_settings["extra"]["punctuate"] = True
        await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # configure() supplies the room URL and token that main() expects.
    room_url, room_token = configure()
    asyncio.run(main(room_url, room_token))
|
||||
@@ -1,483 +0,0 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import json
|
||||
import random
|
||||
import os
|
||||
import wave
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from support.runner import configure
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, TranscriptionQueueFrame, QueueFrame, TextQueueFrame, LLMFunctionCallFrame, LLMResponseEndQueueFrame, StartStreamQueueFrame, AudioQueueFrame
|
||||
from dailyai.services.ai_services import FrameLogger, AIService
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
|
||||
# Audio frames for each sound effect, keyed by file name including the
# extension (e.g. "clack.wav").
sounds = {}
sound_files = [
    'clack-short.wav',
    'clack.wav',
    'clack-short-quiet.wav'
]

script_dir = os.path.dirname(__file__)

for file in sound_files:
    # Sound effects live in the "assets" directory next to this script.
    full_path = os.path.join(script_dir, "assets", file)
    # readframes(-1) is used here to read the whole file in one call.
    with wave.open(full_path) as audio_file:
        sounds[file] = audio_file.readframes(-1)
|
||||
|
||||
def _string_param(description):
    """Return a schema entry for a string parameter."""
    return {"type": "string", "description": description}


def _object_list(item_properties):
    """Return a schema entry for an array of objects with these properties."""
    return {
        "type": "array",
        "items": {"type": "object", "properties": item_properties},
    }


def _function_tool(name, description, properties):
    """Return a function-tool definition taking one object of ``properties``."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {"type": "object", "properties": properties},
        },
    }


# Each builder returns a fresh dict on every call, so `tools` and `steps`
# never share (and cannot accidentally co-mutate) a definition.
def _verify_birthday_tool():
    return _function_tool(
        "verify_birthday",
        "Use this function to verify the user has provided their correct birthday.",
        {
            "birthday": _string_param(
                "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function."
            )
        },
    )


def _list_prescriptions_tool(medication_key):
    # The top-level `tools` list names the medication field "name" while the
    # per-step definition names it "medication"; both spellings are kept.
    return _function_tool(
        "list_prescriptions",
        "Once the user has provided a list of their prescription medications, call this function.",
        {
            "prescriptions": _object_list({
                medication_key: _string_param("The medication's name"),
                "dosage": _string_param("The prescription's dosage"),
            })
        },
    )


def _list_allergies_tool():
    return _function_tool(
        "list_allergies",
        "Once the user has provided a list of their allergies, call this function.",
        {"allergies": _object_list({"name": _string_param("What the user is allergic to")})},
    )


def _list_conditions_tool():
    return _function_tool(
        "list_conditions",
        "Once the user has provided a list of their medical conditions, call this function.",
        {"conditions": _object_list({"name": _string_param("The user's medical condition")})},
    )


def _list_visit_reasons_tool():
    return _function_tool(
        "list_visit_reasons",
        "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.",
        {"visit_reasons": _object_list({"name": _string_param("The user's reason for visiting the doctor")})},
    )


# Every function the assistant may call over the whole conversation.
tools = [
    _verify_birthday_tool(),
    _list_prescriptions_tool("name"),
    _list_allergies_tool(),
    _list_conditions_tool(),
    _list_visit_reasons_tool(),
]

# Ordered conversation script. Each step carries the system prompt to append,
# a `run_async` flag, and the tools offered during that step. The first step
# also carries a `failed` prompt.
steps = [
    {
        "prompt": "Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
        "run_async": False,
        "failed": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
        "tools": [_verify_birthday_tool()],
    },
    {
        "prompt": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
        "run_async": True,
        "tools": [_list_prescriptions_tool("medication")],
    },
    {
        "prompt": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
        "run_async": True,
        "tools": [_list_allergies_tool()],
    },
    {
        "prompt": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
        "run_async": True,
        "tools": [_list_conditions_tool()],
    },
    {
        "prompt": "Finally, ask the user the reason for their doctor visit today. Once they answer, double-check to make sure they don't have any other health concerns. After that, call the list_visit_reasons function.",
        "run_async": True,
        "tools": [_list_visit_reasons_tool()],
    },
    {"prompt": "Now, thank the user and end the conversation.", "run_async": True, "tools": []},
    {"prompt": "", "run_async": True, "tools": []},
]

# Index into `steps` of the step currently in progress.
current_step = 0
|
||||
|
||||
class TranscriptFilter(AIService):
    """Frame processor that forwards only other participants' transcriptions.

    Every frame that is not a ``TranscriptionQueueFrame``, and every
    transcription whose ``participantId`` equals ``bot_participant_id``,
    is dropped.
    """

    def __init__(self, bot_participant_id=None):
        super().__init__()
        self.bot_participant_id = bot_participant_id
        print(f"Filtering transcripts from : {self.bot_participant_id}")

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Yield ``frame`` only if it is a transcription not made by the bot."""
        # Guard clauses: anything that is not a foreign transcription ends
        # the generator without yielding.
        if not isinstance(frame, TranscriptionQueueFrame):
            return
        if frame.participantId == self.bot_participant_id:
            return
        yield frame
|
||||
|
||||
|
||||
class ChecklistProcessor(AIService):
    """Drives a scripted intake conversation using LLM function calls.

    For the current entry of the module-level ``steps`` list, the processor
    exposes that step's tools to the LLM, collects the streamed function
    call (name first, then argument fragments), runs the matching handler
    when the response ends, and re-prompts the LLM with the next step's
    instructions.
    """

    # Names the LLM is allowed to dispatch to. Without this, a model-supplied
    # name could resolve to any attribute of the processor.
    _FUNCTION_HANDLERS = frozenset({
        "verify_birthday",
        "list_prescriptions",
        "list_allergies",
        "list_conditions",
        "list_visit_reasons",
    })

    def __init__(self, messages, llm, tools, *args, **kwargs):
        """Create the processor and seed ``messages`` with the first prompt.

        Args:
            messages: Shared list of chat messages; mutated in place.
            llm: LLM service whose ``process_frame`` is used to re-prompt.
            tools: Shared tool list; rewritten in place on every frame with
                the current step's tools.
            *args, **kwargs: Forwarded to ``AIService.__init__``.
        """
        super().__init__(*args, **kwargs)
        self._messages = messages
        self._llm = llm
        self._tools = tools
        self._function_name = ""
        self._arguments = ""
        self._id = "You are Jessica, an agent for a company called Tri-County Advanced Optimum Health Solution Specialists. Your job is to collect important information from the user before they visit a doctor. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous."
        self._acks = [ "One sec.", "Let me confirm that.", "Thanks.", "OK."]

        messages.append(
            {"role": "system", "content": f"{self._id} {steps[0]['prompt']}"})

    def verify_birthday(self, args):
        """Return True when the supplied birthday matches the expected one."""
        return args['birthday'] == "1983-08-19"

    def list_prescriptions(self, args):
        print(f"Prescriptions: {args['prescriptions']}")

    def list_allergies(self, args):
        print(f"Allergies: {args['allergies']}")

    def list_conditions(self, args):
        print(f"Medical Conditions: {args['conditions']}")

    def list_visit_reasons(self, args):
        print(f"Visit Reasons: {args['visit_reasons']}")

    async def _prompt_llm(self, content):
        """Append a system message, then yield the context and the LLM reply.

        Tool choice is forced to "none" so the reply is speech only.
        """
        self._messages.append({"role": "system", "content": content})
        yield LLMMessagesQueueFrame(self._messages)
        # Use the injected LLM rather than a module-level global.
        async for llm_frame in self._llm.process_frame(LLMMessagesQueueFrame(self._messages), tool_choice="none"):
            yield llm_frame

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Handle function-call and response-end frames; pass others through."""
        global current_step
        this_step = steps[current_step]
        # TODO-CB: forcing a global here :/
        # The LLM service holds this same list object, so rewriting it in
        # place changes which tools the next request offers.
        self._tools.clear()
        self._tools.extend(this_step['tools'])

        if isinstance(frame, LLMFunctionCallFrame) and frame.function_name:
            print(f"FUNCTION CALL: {frame}")
            self._function_name = frame.function_name
            if this_step['run_async']:
                # Get the LLM talking about the next step before getting the rest
                # of the function call completion
                current_step += 1
                async for out in self._prompt_llm(steps[current_step]['prompt']):
                    yield out
            else:
                # Insert a quick response while we run the function
                yield AudioQueueFrame(sounds["clack-short-quiet.wav"])
        elif isinstance(frame, LLMFunctionCallFrame) and frame.arguments:
            self._arguments += frame.arguments
        elif isinstance(frame, LLMResponseEndQueueFrame):
            print(
                f"%%% got a response end. function_name is {self._function_name}, arguments is {self._arguments}")
            print(f"%%%% messages is {self._messages}")

            if self._function_name and self._arguments:
                if self._function_name not in self._FUNCTION_HANDLERS:
                    # Same exception type an unknown attribute raised before,
                    # now also covering names that are not handlers.
                    raise AttributeError(
                        f"LLM requested unknown function {self._function_name!r}")
                fn = getattr(self, self._function_name)
                print(f"fn is: {fn}")
                result = fn(json.loads(self._arguments))
                self._function_name = ""
                self._arguments = ""
                if not this_step['run_async']:
                    # Synchronous steps advance only when the handler
                    # reports success; otherwise the `failed` prompt is used.
                    if result:
                        current_step += 1
                        next_prompt = steps[current_step]['prompt']
                    else:
                        next_prompt = this_step['failed']
                    async for out in self._prompt_llm(next_prompt):
                        yield out
                    print(f"VERIFY RESULT: {result}")
        else:
            yield frame
|
||||
|
||||
|
||||
async def main(room_url: str, token):
    """Run the checklist bot in the given Daily room until the call ends.

    Builds the transport, LLM, and TTS services (published as module
    globals ``transport``, ``llm`` and ``tts``), wires the response
    pipeline, and runs the transport together with its conversation loop.

    Args:
        room_url: URL of the room to join.
        token: Token passed to the transport alongside the URL.
    """
    global transport
    global llm
    global tts

    async with aiohttp.ClientSession() as session:
        transport = DailyTransportService(
            room_url,
            token,
            "Respond bot",
            5,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=False,
            start_transcription=True,
            vad_enabled=True
        )

        # Both lists are shared by reference with the services built below.
        messages = []
        tools = []

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
            model="gpt-4-turbo-preview",
            tools=tools)
        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id="XrExE9yKIg1WjnnlVkGX")  # matilda

        tma_in = LLMUserContextAggregator(
            messages, transport._my_participant_id)
        tma_out = LLMAssistantContextAggregator(
            messages, transport._my_participant_id)
        checklist = ChecklistProcessor(messages, llm, tools)

        async def run_response(user_speech, tma_in, tma_out):
            # Stages, innermost first: user aggregator -> LLM ->
            # assistant aggregator -> checklist -> TTS -> send queue.
            user_frames = tma_in.run(
                [StartStreamQueueFrame(), TextQueueFrame(user_speech)])
            llm_frames = llm.run(user_frames)
            assistant_frames = tma_out.run(llm_frames)
            await tts.run_to_queue(
                transport.send_queue,
                checklist.run(assistant_frames)
            )

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            # Open the conversation from the current message list as soon
            # as someone else joins.
            fl = FrameLogger("first other participant")
            llm_frames = llm.run([LLMMessagesQueueFrame(messages)])
            await tts.run_to_queue(
                transport.send_queue,
                fl.run(tma_out.run(llm_frames))
            )

        transport.transcription_settings["extra"]["endpointing"] = True
        transport.transcription_settings["extra"]["punctuate"] = True

        try:
            await asyncio.gather(transport.run(), transport.run_conversation(run_response))
        except (asyncio.CancelledError, KeyboardInterrupt):
            print('whoops')
            transport.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # configure() supplies the room URL and token; main() runs the bot.
    url, token = configure()
    asyncio.run(main(url, token))
|
||||
@@ -0,0 +1,83 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.services.groq_ai_services import GroqLLMService
|
||||
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str, token):
    """Run a bot that answers each user utterance with LLM-generated speech.

    Args:
        room_url: URL of the room to join.
        token: Token passed to the transport alongside the URL.
    """
    system_message = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
    }

    async with aiohttp.ClientSession() as session:
        # The same list object is handed to both the transport and the LLM.
        context = [system_message]

        transport = DailyTransportService(
            room_url,
            token,
            "Respond bot",
            duration_minutes=5,
            start_transcription=True,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=False,
            # TODO-CB: Should this be VAD enabled or something?
            speaker_enabled=True,
            context=context
        )

        # AzureLLMService / GroqLLMService were tried here as alternatives.
        llm = OpenAILLMService(
            context=context,
            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))

        # AzureTTSService / DeepgramTTSService were tried here as alternatives.
        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"))

        fl = FrameLogger("just outside the innermost layer")

        async def run_response(in_frame):
            # Stages, innermost first: frame logger -> LLM -> TTS -> send queue.
            logged_frames = fl.run([StartStreamQueueFrame(), in_frame])
            llm_frames = llm.run(logged_frames)
            await tts.run_to_queue(transport.send_queue, llm_frames)

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            await tts.say("Hi, I'm listening!", transport.send_queue)

        extra_settings = transport.transcription_settings["extra"]
        extra_settings["endpointing"] = True
        extra_settings["punctuate"] = True

        await asyncio.gather(transport.run(), transport.run_conversation(run_response))
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # configure() supplies the room URL and token; main() runs the bot.
    url, token = configure()
    asyncio.run(main(url, token))
|
||||
@@ -6,7 +6,6 @@ from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
160
src/khk-hackathon/06d-listen.py
Normal file
160
src/khk-hackathon/06d-listen.py
Normal file
@@ -0,0 +1,160 @@
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TranscriptionQueueFrame, TextQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.fireworks_ai_services import FireworksLLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
# Everything after the script name on the command line, joined with single
# spaces; an empty string when no extra arguments are given.
command_line_prompt = ' '.join(sys.argv[1:])

# System message for the LLM: describes the robot persona and tells the
# model to pass an image-generation prompt to `change_appearance` rather
# than speaking it.
system_prompt = """
You are a friendly robot character with a cartoon body with head, torso, arms, feet,
and legs.

You can change your appearance using the `change_appearance` function call.
You can add or remove items from your body, change
your color, and more. You can use function calling to change your appearance.

When changing your appearance, please create a prompt as an argument to the function.
The prompt will help the image generation model
create a new appearance for you. Include as much detail as possible. Include the
keywords "robot", "friendly", "cartoon", "smiling", "happy", "animated".
The initial image prompt you are adding to or changing is
"A friendly cartoon robot, smiling and happy, animated."

Do not include the image model prompt in your response. The prompt must be passed to the function
as a parameter.
"""
|
||||
|
||||
# Function schema advertised to the LLM for changing the robot's look.
change_appearance_function = {
    "name": "change_appearance",
    "description": "Call this function when the users want you to change your appearance.",
    "parameters": {
        "type": "object",
        "properties": {
            "appearance": {
                "type": "string",
                "description": "The new appearance for the robot, in the form of a prompt for an generative AI diffusion model."
            }
        },
        # The handler takes `appearance` as a mandatory positional argument,
        # so tell the model it may not be omitted.
        "required": ["appearance"]
    }
}

# Tool list handed to the LLM service; wraps the schema above.
tools = [
    {
        "type": "function",
        "function": change_appearance_function
    }
]
|
||||
|
||||
|
||||
async def main(room_url: str, token):
    """Run the robot bot: spoken replies plus LLM-driven appearance changes.

    The LLM service is given a ``change_appearance`` callback that sends an
    image-generation prompt through ``FalImageGenService`` to the
    transport's send queue. When the first other participant joins, the
    initial robot image is generated.

    Args:
        room_url: URL of the room to join.
        token: Token passed to the transport alongside the URL.
    """
    async with aiohttp.ClientSession() as session:
        # The same list object is handed to both the transport and the LLM.
        context = [
            {
                "role": "system",
                "content": system_prompt,
            },
        ]
        transport = DailyTransportService(
            room_url,
            token,
            "Respond bot",
            duration_minutes=30,
            start_transcription=True,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=True,
            camera_width=1024,
            camera_height=1024,
            # TODO-CB: Should this be VAD enabled or something?
            speaker_enabled=True,
            context=context
        )

        imagegen = FalImageGenService(
            image_size="512x512",
            aiohttp_session=session,
            key_id=os.getenv("FAL_KEY_ID"),
            key_secret=os.getenv("FAL_KEY_SECRET"))

        async def change_appearance(appearance):
            # Awaited directly: wrapping the coroutine in a task only to
            # await it on the next line added nothing.
            await imagegen.run_to_queue(
                transport.send_queue, [TextQueueFrame(appearance)])

        llm = FireworksLLMService(
            context=context,
            api_key=os.getenv("FIREWORKS_API_KEY"),
            model="accounts/fireworks/models/firefunction-v1",
            # TODO - how can we modify tools list on the fly?
            tools=tools,
            change_appearance=change_appearance,
            transport=transport
        )
        tts = DeepgramTTSService(
            aiohttp_session=session,
            api_key=os.getenv("DEEPGRAM_API_KEY"),
            voice=os.getenv("DEEPGRAM_VOICE"))
        fl = FrameLogger("just outside the innermost layer")

        async def run_response(in_frame):
            # Stages, innermost first: frame logger -> LLM -> TTS -> send queue.
            logged_frames = fl.run([StartStreamQueueFrame(), in_frame])
            llm_frames = llm.run(logged_frames)
            await tts.run_to_queue(transport.send_queue, llm_frames)

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            # NOTE: a spoken greeting and an injection of
            # `command_line_prompt` as a fake transcription used to follow
            # this call, but they sat behind an unconditional `return` and
            # never ran; that unreachable code has been removed.
            await change_appearance("A friendly cartoon robot, smiling and happy, animated.")

        transport.transcription_settings["extra"]["endpointing"] = True
        transport.transcription_settings["extra"]["punctuate"] = True

        await asyncio.gather(transport.run(), transport.run_conversation(run_response))
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # configure() supplies the room URL and token; main() runs the bot.
    url, token = configure()
    asyncio.run(main(url, token))
|
||||
Reference in New Issue
Block a user