Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
358166f347 | ||
|
|
c006c123b2 | ||
|
|
cf302fb765 |
@@ -5,6 +5,7 @@ from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
|
||||
from dailyai.pipeline.frames import (
|
||||
EndFrame,
|
||||
AudioFrame,
|
||||
EndPipeFrame,
|
||||
Frame,
|
||||
ImageFrame,
|
||||
@@ -14,7 +15,7 @@ from dailyai.pipeline.frames import (
|
||||
TextFrame,
|
||||
TranscriptionQueueFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.ai_services import AIService
|
||||
@@ -23,6 +24,7 @@ from typing import AsyncGenerator, Callable, Coroutine, List
|
||||
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
|
||||
class ResponseAggregator(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
@@ -44,9 +46,7 @@ class ResponseAggregator(FrameProcessor):
|
||||
self._accumulator_frame = accumulator_frame
|
||||
self._pass_through = pass_through
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if not self.messages:
|
||||
return
|
||||
|
||||
@@ -54,9 +54,13 @@ class ResponseAggregator(FrameProcessor):
|
||||
self.aggregating = True
|
||||
elif isinstance(frame, self._end_frame):
|
||||
self.aggregating = False
|
||||
self.messages.append({"role": self._role, "content": self.aggregation})
|
||||
self.aggregation = ""
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
# Sometimes VAD triggers quickly on and off. If we don't get any transcription,
|
||||
# it creates empty LLM message queue frames
|
||||
if len(self.aggregation) > 0:
|
||||
self.messages.append({"role": self._role, "content": self.aggregation})
|
||||
self.aggregation = ""
|
||||
yield self._end_frame()
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
elif isinstance(frame, self._accumulator_frame) and self.aggregating:
|
||||
self.aggregation += f" {frame.text}"
|
||||
if self._pass_through:
|
||||
@@ -64,6 +68,7 @@ class ResponseAggregator(FrameProcessor):
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class LLMResponseAggregator(ResponseAggregator):
|
||||
def __init__(self, messages: list[dict]):
|
||||
super().__init__(
|
||||
@@ -71,9 +76,10 @@ class LLMResponseAggregator(ResponseAggregator):
|
||||
role="assistant",
|
||||
start_frame=LLMResponseStartFrame,
|
||||
end_frame=LLMResponseEndFrame,
|
||||
accumulator_frame=TextFrame
|
||||
accumulator_frame=TextFrame,
|
||||
)
|
||||
|
||||
|
||||
class UserResponseAggregator(ResponseAggregator):
|
||||
def __init__(self, messages: list[dict]):
|
||||
super().__init__(
|
||||
@@ -82,7 +88,7 @@ class UserResponseAggregator(ResponseAggregator):
|
||||
start_frame=UserStartedSpeakingFrame,
|
||||
end_frame=UserStoppedSpeakingFrame,
|
||||
accumulator_frame=TranscriptionQueueFrame,
|
||||
pass_through=False
|
||||
pass_through=False,
|
||||
)
|
||||
|
||||
|
||||
@@ -103,9 +109,7 @@ class LLMContextAggregator(AIService):
|
||||
self.complete_sentences = complete_sentences
|
||||
self.pass_through = pass_through
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
# We don't do anything with non-text frames, pass it along to next in the pipeline.
|
||||
if not isinstance(frame, TextFrame):
|
||||
yield frame
|
||||
@@ -137,6 +141,7 @@ class LLMContextAggregator(AIService):
|
||||
self.messages.append({"role": self.role, "content": frame.text})
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
|
||||
|
||||
class LLMUserContextAggregator(LLMContextAggregator):
|
||||
def __init__(
|
||||
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
|
||||
@@ -176,12 +181,11 @@ class SentenceAggregator(FrameProcessor):
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
|
||||
Hello, world.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.aggregation = ""
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, TextFrame):
|
||||
m = re.search("(.*[?.!])(.*)", frame.text)
|
||||
if m:
|
||||
@@ -233,12 +237,11 @@ class LLMFullResponseAggregator(FrameProcessor):
|
||||
Hello, world. I am an LLM.
|
||||
LLMResponseEndFrame
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.aggregation = ""
|
||||
|
||||
async def process_frame(
|
||||
self, frame: Frame
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, TextFrame):
|
||||
self.aggregation += frame.text
|
||||
elif isinstance(frame, LLMResponseEndFrame):
|
||||
@@ -274,8 +277,9 @@ class StatelessTextTransformer(FrameProcessor):
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class ParallelPipeline(FrameProcessor):
|
||||
""" Run multiple pipelines in parallel.
|
||||
"""Run multiple pipelines in parallel.
|
||||
|
||||
This class takes frames from its source queue and sends them to each
|
||||
sub-pipeline. Each sub-pipeline emits its frames into this class's
|
||||
@@ -292,6 +296,7 @@ class ParallelPipeline(FrameProcessor):
|
||||
Since frame handlers pass through unhandled frames by convention, this
|
||||
class de-dupes frames in its sink before yielding them.
|
||||
"""
|
||||
|
||||
def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
|
||||
self.sources = [asyncio.Queue() for _ in pipeline_definitions]
|
||||
self.sink: asyncio.Queue[Frame] = asyncio.Queue()
|
||||
@@ -327,6 +332,7 @@ class ParallelPipeline(FrameProcessor):
|
||||
if not isinstance(frame, EndPipeFrame):
|
||||
yield frame
|
||||
|
||||
|
||||
class GatedAggregator(FrameProcessor):
|
||||
"""Accumulate frames, with custom functions to start and stop accumulation.
|
||||
Yields gate-opening frame before any accumulated frames, then ensuing frames
|
||||
@@ -352,6 +358,7 @@ class GatedAggregator(FrameProcessor):
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Goodbye.")))
|
||||
Goodbye.
|
||||
"""
|
||||
|
||||
def __init__(self, gate_open_fn, gate_close_fn, start_open):
|
||||
self.gate_open_fn = gate_open_fn
|
||||
self.gate_close_fn = gate_close_fn
|
||||
|
||||
@@ -5,7 +5,9 @@ from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
|
||||
class Frame:
|
||||
pass
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}"
|
||||
|
||||
|
||||
class ControlFrame(Frame):
|
||||
# Control frames should contain no instance data, so
|
||||
@@ -21,10 +23,21 @@ class StartFrame(ControlFrame):
|
||||
class EndFrame(ControlFrame):
|
||||
pass
|
||||
|
||||
|
||||
class EndPipeFrame(ControlFrame):
|
||||
pass
|
||||
|
||||
|
||||
class PipelineStartedFrame(ControlFrame):
|
||||
"""
|
||||
Used by the transport to indicate that execution of a pipeline is starting
|
||||
(or restarting). It should be the first frame your app receives when it
|
||||
starts, or when an interruptible pipeline has been interrupted.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class LLMResponseStartFrame(ControlFrame):
|
||||
pass
|
||||
|
||||
@@ -37,22 +50,34 @@ class LLMResponseEndFrame(ControlFrame):
|
||||
class AudioFrame(Frame):
|
||||
data: bytes
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}, size: {len(self.data)} B"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class ImageFrame(Frame):
|
||||
url: str | None
|
||||
image: bytes
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}, url: {self.url}, image size: {len(self.image)} B"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SpriteFrame(Frame):
|
||||
images: list[bytes]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}, list size: {len(self.images)}"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TextFrame(Frame):
|
||||
text: str
|
||||
|
||||
def __str__(self):
|
||||
return f'{self.__class__.__name__}: "{self.text}"'
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TranscriptionQueueFrame(TextFrame):
|
||||
@@ -74,15 +99,28 @@ class AppMessageQueueFrame(Frame):
|
||||
message: Any
|
||||
participantId: str
|
||||
|
||||
|
||||
class UserStartedSpeakingFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
class UserStoppedSpeakingFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
class BotStartedSpeakingFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
class BotStoppedSpeakingFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMFunctionStartFrame(Frame):
|
||||
function_name: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMFunctionCallFrame(Frame):
|
||||
function_name: str
|
||||
|
||||
@@ -19,7 +19,7 @@ class Pipeline:
|
||||
source: asyncio.Queue | None = None,
|
||||
sink: asyncio.Queue[Frame] | None = None,
|
||||
):
|
||||
""" Create a new pipeline. By default neither the source nor sink
|
||||
"""Create a new pipeline. By default neither the source nor sink
|
||||
queues are set, so you'll need to pass them to this constructor or
|
||||
call set_source and set_sink before using the pipeline. Note that
|
||||
the transport's run_*_pipeline methods will set the source and sink
|
||||
@@ -30,18 +30,18 @@ class Pipeline:
|
||||
self.sink: asyncio.Queue[Frame] | None = sink
|
||||
|
||||
def set_source(self, source: asyncio.Queue[Frame]):
|
||||
""" Set the source queue for this pipeline. Frames from this queue
|
||||
"""Set the source queue for this pipeline. Frames from this queue
|
||||
will be processed by each frame_processor in the pipeline, in order
|
||||
from first to last. """
|
||||
from first to last."""
|
||||
self.source = source
|
||||
|
||||
def set_sink(self, sink: asyncio.Queue[Frame]):
|
||||
""" Set the sink queue for this pipeline. After the last frame_processor
|
||||
"""Set the sink queue for this pipeline. After the last frame_processor
|
||||
has processed a frame, its output will be placed on this queue."""
|
||||
self.sink = sink
|
||||
|
||||
async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
|
||||
""" Convenience function to get the next frame from the source queue. This
|
||||
"""Convenience function to get the next frame from the source queue. This
|
||||
lets us consistently have an AsyncGenerator yield frames, from either the
|
||||
source queue or a frame_processor."""
|
||||
if self.source is None:
|
||||
@@ -53,13 +53,15 @@ class Pipeline:
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
if processors:
|
||||
async for frame in processors[0].process_frame(initial_frame):
|
||||
async for final_frame in self.run_pipeline_recursively(frame, processors[1:]):
|
||||
async for final_frame in self.run_pipeline_recursively(
|
||||
frame, processors[1:]
|
||||
):
|
||||
yield final_frame
|
||||
else:
|
||||
yield initial_frame
|
||||
|
||||
async def run_pipeline(self):
|
||||
""" Run the pipeline. Take each frame from the source queue, pass it to
|
||||
"""Run the pipeline. Take each frame from the source queue, pass it to
|
||||
the first frame_processor, pass the output of that frame_processor to the
|
||||
next in the list, etc. until the last frame_processor has processed the
|
||||
resulting frames, then place those frames in the sink queue.
|
||||
@@ -68,7 +70,8 @@ class Pipeline:
|
||||
|
||||
This method will exit when an EndStreamQueueFrame is placed on the sink queue.
|
||||
No more frames will be placed on the sink queue after an EndStreamQueueFrame, even
|
||||
if it's not the last frame yielded by the last frame_processor in the pipeline.."""
|
||||
if it's not the last frame yielded by the last frame_processor in the pipeline.
|
||||
"""
|
||||
|
||||
if self.source is None or self.sink is None:
|
||||
raise ValueError("Source or sink queue not set")
|
||||
@@ -76,13 +79,26 @@ class Pipeline:
|
||||
try:
|
||||
while True:
|
||||
initial_frame = await self.source.get()
|
||||
async for frame in self.run_pipeline_recursively(initial_frame, self.processors):
|
||||
async for frame in self.run_pipeline_recursively(
|
||||
initial_frame, self.processors
|
||||
):
|
||||
await self.sink.put(frame)
|
||||
|
||||
if isinstance(initial_frame, EndFrame) or isinstance(initial_frame, EndPipeFrame):
|
||||
if isinstance(initial_frame, EndFrame) or isinstance(
|
||||
initial_frame, EndPipeFrame
|
||||
):
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
# this means there's been an interruption, do any cleanup necessary here.
|
||||
for processor in self.processors:
|
||||
await processor.interrupted()
|
||||
pass
|
||||
|
||||
async def queue_frames(self, frames: Frame | List[Frame]):
|
||||
"""Insert frames directly into a pipeline. This is typically used inside a transport
|
||||
participant_joined callback to prompt a bot to start a conversation, for example.
|
||||
"""
|
||||
if not isinstance(frames, List):
|
||||
frames = [frames]
|
||||
for f in frames:
|
||||
await self.source.put(f)
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
Pillow==10.1.0
|
||||
typing_extensions==4.9.0
|
||||
faster-whisper==0.10.0
|
||||
@@ -16,12 +16,13 @@ from dailyai.pipeline.frames import (
|
||||
LLMFunctionCallFrame,
|
||||
Frame,
|
||||
TextFrame,
|
||||
TranscriptionQueueFrame
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List
|
||||
|
||||
|
||||
class AIService(FrameProcessor):
|
||||
|
||||
def __init__(self):
|
||||
@@ -30,7 +31,9 @@ class AIService(FrameProcessor):
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
|
||||
async def run_to_queue(
|
||||
self, queue: asyncio.Queue, frames, add_end_of_stream=False
|
||||
) -> None:
|
||||
async for frame in self.run(frames):
|
||||
await queue.put(frame)
|
||||
|
||||
@@ -39,9 +42,7 @@ class AIService(FrameProcessor):
|
||||
|
||||
async def run(
|
||||
self,
|
||||
frames: Iterable[Frame]
|
||||
| AsyncIterable[Frame]
|
||||
| asyncio.Queue[Frame],
|
||||
frames: Iterable[Frame] | AsyncIterable[Frame] | asyncio.Queue[Frame],
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
try:
|
||||
if isinstance(frames, AsyncIterable):
|
||||
@@ -67,7 +68,8 @@ class AIService(FrameProcessor):
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
""" This class is a no-op but serves as a base class for LLM services. """
|
||||
"""This class is a no-op but serves as a base class for LLM services."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
@@ -105,7 +107,7 @@ class TTSService(AIService):
|
||||
text = frame.text
|
||||
else:
|
||||
self.current_sentence += frame.text
|
||||
if self.current_sentence.endswith((".", "?", "!")):
|
||||
if self.current_sentence.strip().endswith((".", "?", "!")):
|
||||
text = self.current_sentence
|
||||
self.current_sentence = ""
|
||||
|
||||
@@ -118,7 +120,9 @@ class TTSService(AIService):
|
||||
|
||||
# Convenience function to send the audio for a sentence to the given queue
|
||||
async def say(self, sentence, queue: asyncio.Queue):
|
||||
await self.run_to_queue(queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()])
|
||||
await self.run_to_queue(
|
||||
queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()]
|
||||
)
|
||||
|
||||
|
||||
class ImageGenService(AIService):
|
||||
@@ -169,7 +173,7 @@ class STTService(AIService):
|
||||
ww.close()
|
||||
content.seek(0)
|
||||
text = await self.run_stt(content)
|
||||
yield TranscriptionQueueFrame(text, '', str(time.time()))
|
||||
yield TranscriptionQueueFrame(text, "", str(time.time()))
|
||||
|
||||
|
||||
class FrameLogger(AIService):
|
||||
|
||||
@@ -17,43 +17,40 @@ from dailyai.pipeline.frames import (
|
||||
EndFrame,
|
||||
ImageFrame,
|
||||
Frame,
|
||||
PipelineStartedFrame,
|
||||
SpriteFrame,
|
||||
StartFrame,
|
||||
TranscriptionQueueFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
torch.set_num_threads(1)
|
||||
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
model='silero_vad',
|
||||
force_reload=False)
|
||||
model, utils = torch.hub.load(
|
||||
repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
|
||||
)
|
||||
|
||||
(get_speech_timestamps,
|
||||
save_audio,
|
||||
read_audio,
|
||||
VADIterator,
|
||||
collect_chunks) = utils
|
||||
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils
|
||||
|
||||
# Taken from utils_vad.py
|
||||
|
||||
|
||||
def validate(model,
|
||||
inputs: torch.Tensor):
|
||||
def validate(model, inputs: torch.Tensor):
|
||||
with torch.no_grad():
|
||||
outs = model(inputs)
|
||||
return outs
|
||||
|
||||
|
||||
# Provided by Alexander Veysov
|
||||
|
||||
|
||||
def int2float(sound):
|
||||
abs_max = np.abs(sound).max()
|
||||
sound = sound.astype('float32')
|
||||
sound = sound.astype("float32")
|
||||
if abs_max > 0:
|
||||
sound *= 1/32768
|
||||
sound *= 1 / 32768
|
||||
sound = sound.squeeze() # depends on the use case
|
||||
return sound
|
||||
|
||||
@@ -73,7 +70,7 @@ class VADState(Enum):
|
||||
STOPPING = 4
|
||||
|
||||
|
||||
class BaseTransportService():
|
||||
class BaseTransportService:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -94,7 +91,8 @@ class BaseTransportService():
|
||||
|
||||
if self._vad_enabled and self._speaker_enabled:
|
||||
raise Exception(
|
||||
"Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False.")
|
||||
"Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False."
|
||||
)
|
||||
|
||||
self._vad_samples = 1536
|
||||
vad_frame_s = self._vad_samples / SAMPLE_RATE
|
||||
@@ -130,20 +128,20 @@ class BaseTransportService():
|
||||
async def run(self):
|
||||
self._prerun()
|
||||
|
||||
async_output_queue_marshal_task = asyncio.create_task(
|
||||
self._marshal_frames())
|
||||
async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames())
|
||||
|
||||
self._camera_thread = threading.Thread(
|
||||
target=self._run_camera, daemon=True)
|
||||
self._camera_thread = threading.Thread(target=self._run_camera, daemon=True)
|
||||
self._camera_thread.start()
|
||||
|
||||
self._frame_consumer_thread = threading.Thread(
|
||||
target=self._frame_consumer, daemon=True)
|
||||
target=self._frame_consumer, daemon=True
|
||||
)
|
||||
self._frame_consumer_thread.start()
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._receive_audio_thread = threading.Thread(
|
||||
target=self._receive_audio, daemon=True)
|
||||
target=self._receive_audio, daemon=True
|
||||
)
|
||||
self._receive_audio_thread.start()
|
||||
|
||||
if self._vad_enabled:
|
||||
@@ -151,10 +149,7 @@ class BaseTransportService():
|
||||
self._vad_thread.start()
|
||||
|
||||
try:
|
||||
while (
|
||||
time.time() < self._expiration
|
||||
and not self._stop_threads.is_set()
|
||||
):
|
||||
while time.time() < self._expiration and not self._stop_threads.is_set():
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
self._logger.error(f"Exception {e}")
|
||||
@@ -278,8 +273,7 @@ class BaseTransportService():
|
||||
audio_chunk = self.read_audio_frames(self._vad_samples)
|
||||
audio_int16 = np.frombuffer(audio_chunk, np.int16)
|
||||
audio_float32 = int2float(audio_int16)
|
||||
new_confidence = model(
|
||||
torch.from_numpy(audio_float32), 16000).item()
|
||||
new_confidence = model(torch.from_numpy(audio_float32), 16000).item()
|
||||
speaking = new_confidence > 0.5
|
||||
|
||||
if speaking:
|
||||
@@ -303,18 +297,22 @@ class BaseTransportService():
|
||||
case VADState.STOPPING:
|
||||
self._vad_stopping_count += 1
|
||||
|
||||
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
|
||||
if (
|
||||
self._vad_state == VADState.STARTING
|
||||
and self._vad_starting_count >= self._vad_start_frames
|
||||
):
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStartedSpeakingFrame()), self._loop
|
||||
self.receive_queue.put(UserStartedSpeakingFrame()), self._loop
|
||||
)
|
||||
# self.interrupt()
|
||||
self._vad_state = VADState.SPEAKING
|
||||
self._vad_starting_count = 0
|
||||
if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
|
||||
if (
|
||||
self._vad_state == VADState.STOPPING
|
||||
and self._vad_stopping_count >= self._vad_stop_frames
|
||||
):
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStoppedSpeakingFrame()), self._loop
|
||||
self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop
|
||||
)
|
||||
self._vad_state = VADState.QUIET
|
||||
self._vad_stopping_count = 0
|
||||
@@ -353,9 +351,7 @@ class BaseTransportService():
|
||||
self.receive_queue.put(frame), self._loop
|
||||
)
|
||||
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(EndFrame()), self._loop
|
||||
)
|
||||
asyncio.run_coroutine_threadsafe(self.receive_queue.put(EndFrame()), self._loop)
|
||||
|
||||
def _set_image(self, image: bytes):
|
||||
self._images = itertools.cycle([image])
|
||||
@@ -382,15 +378,17 @@ class BaseTransportService():
|
||||
largest_write_size = 8000
|
||||
while True:
|
||||
try:
|
||||
frames_or_frame: Frame | list[Frame] = (
|
||||
self._threadsafe_send_queue.get()
|
||||
)
|
||||
if isinstance(frames_or_frame, AudioFrame) and len(frames_or_frame.data) > largest_write_size:
|
||||
frames_or_frame: Frame | list[Frame] = self._threadsafe_send_queue.get()
|
||||
if (
|
||||
isinstance(frames_or_frame, AudioFrame)
|
||||
and len(frames_or_frame.data) > largest_write_size
|
||||
):
|
||||
# subdivide large audio frames to enable interruption
|
||||
frames = []
|
||||
for i in range(0, len(frames_or_frame.data), largest_write_size):
|
||||
frames.append(AudioFrame(
|
||||
frames_or_frame.data[i: i+largest_write_size]))
|
||||
frames.append(
|
||||
AudioFrame(frames_or_frame.data[i : i + largest_write_size])
|
||||
)
|
||||
elif isinstance(frames_or_frame, Frame):
|
||||
frames: list[Frame] = [frames_or_frame]
|
||||
elif isinstance(frames_or_frame, list):
|
||||
@@ -419,8 +417,7 @@ class BaseTransportService():
|
||||
len(b) % smallest_write_size
|
||||
)
|
||||
if truncated_length:
|
||||
self.write_frame_to_mic(
|
||||
bytes(b[:truncated_length]))
|
||||
self.write_frame_to_mic(bytes(b[:truncated_length]))
|
||||
b = b[truncated_length:]
|
||||
elif isinstance(frame, ImageFrame):
|
||||
self._set_image(frame.image)
|
||||
@@ -434,12 +431,15 @@ class BaseTransportService():
|
||||
# can cause static in the audio stream.
|
||||
if len(b):
|
||||
truncated_length = len(b) - (len(b) % 160)
|
||||
self.write_frame_to_mic(
|
||||
bytes(b[:truncated_length]))
|
||||
self.write_frame_to_mic(bytes(b[:truncated_length]))
|
||||
b = bytearray()
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
self._is_interrupted.clear()
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(PipelineStartedFrame()),
|
||||
self._loop,
|
||||
)
|
||||
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
@@ -453,6 +453,5 @@ class BaseTransportService():
|
||||
|
||||
b = bytearray()
|
||||
except Exception as e:
|
||||
self._logger.error(
|
||||
f"Exception in frame_consumer: {e}, {len(b)}")
|
||||
self._logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
|
||||
raise e
|
||||
|
||||
BIN
src/examples/starter-apps/assets/ding3.wav
Normal file
BIN
src/examples/starter-apps/assets/grandma-listening.png
Normal file
|
After Width: | Height: | Size: 1.1 MiB |
BIN
src/examples/starter-apps/assets/grandma-writing.png
Normal file
|
After Width: | Height: | Size: 1.1 MiB |
BIN
src/examples/starter-apps/assets/listening.wav
Normal file
BIN
src/examples/starter-apps/assets/robot01.png
Normal file
|
After Width: | Height: | Size: 759 KiB |
BIN
src/examples/starter-apps/assets/robot010.png
Normal file
|
After Width: | Height: | Size: 884 KiB |
BIN
src/examples/starter-apps/assets/robot011.png
Normal file
|
After Width: | Height: | Size: 876 KiB |
BIN
src/examples/starter-apps/assets/robot012.png
Normal file
|
After Width: | Height: | Size: 881 KiB |
BIN
src/examples/starter-apps/assets/robot013.png
Normal file
|
After Width: | Height: | Size: 866 KiB |
BIN
src/examples/starter-apps/assets/robot014.png
Normal file
|
After Width: | Height: | Size: 874 KiB |
BIN
src/examples/starter-apps/assets/robot015.png
Normal file
|
After Width: | Height: | Size: 882 KiB |
BIN
src/examples/starter-apps/assets/robot016.png
Normal file
|
After Width: | Height: | Size: 885 KiB |
BIN
src/examples/starter-apps/assets/robot017.png
Normal file
|
After Width: | Height: | Size: 888 KiB |
BIN
src/examples/starter-apps/assets/robot018.png
Normal file
|
After Width: | Height: | Size: 890 KiB |
BIN
src/examples/starter-apps/assets/robot019.png
Normal file
|
After Width: | Height: | Size: 898 KiB |
BIN
src/examples/starter-apps/assets/robot02.png
Normal file
|
After Width: | Height: | Size: 836 KiB |
BIN
src/examples/starter-apps/assets/robot020.png
Normal file
|
After Width: | Height: | Size: 903 KiB |
BIN
src/examples/starter-apps/assets/robot021.png
Normal file
|
After Width: | Height: | Size: 908 KiB |
BIN
src/examples/starter-apps/assets/robot022.png
Normal file
|
After Width: | Height: | Size: 908 KiB |
BIN
src/examples/starter-apps/assets/robot023.png
Normal file
|
After Width: | Height: | Size: 905 KiB |
BIN
src/examples/starter-apps/assets/robot024.png
Normal file
|
After Width: | Height: | Size: 903 KiB |
BIN
src/examples/starter-apps/assets/robot025.png
Normal file
|
After Width: | Height: | Size: 866 KiB |
BIN
src/examples/starter-apps/assets/robot03.png
Normal file
|
After Width: | Height: | Size: 849 KiB |
BIN
src/examples/starter-apps/assets/robot04.png
Normal file
|
After Width: | Height: | Size: 866 KiB |
BIN
src/examples/starter-apps/assets/robot05.png
Normal file
|
After Width: | Height: | Size: 866 KiB |
BIN
src/examples/starter-apps/assets/robot06.png
Normal file
|
After Width: | Height: | Size: 864 KiB |
BIN
src/examples/starter-apps/assets/robot07.png
Normal file
|
After Width: | Height: | Size: 858 KiB |
BIN
src/examples/starter-apps/assets/robot08.png
Normal file
|
After Width: | Height: | Size: 875 KiB |
BIN
src/examples/starter-apps/assets/robot09.png
Normal file
|
After Width: | Height: | Size: 881 KiB |
BIN
src/examples/starter-apps/assets/talking.wav
Normal file
150
src/examples/starter-apps/chatbot.py
Normal file
@@ -0,0 +1,150 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from PIL import Image
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMResponseAggregator,
|
||||
LLMUserContextAggregator,
|
||||
UserResponseAggregator,
|
||||
)
|
||||
from dailyai.pipeline.frames import (
|
||||
ImageFrame,
|
||||
SpriteFrame,
|
||||
Frame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
AudioFrame,
|
||||
PipelineStartedFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import AIService
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
sprites = []
|
||||
|
||||
script_dir = os.path.dirname(__file__)
|
||||
|
||||
for i in range(1, 26):
|
||||
# Build the full path to the image file
|
||||
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
|
||||
# Get the filename without the extension to use as the dictionary key
|
||||
# Open the image and convert it to bytes
|
||||
with Image.open(full_path) as img:
|
||||
sprites.append(img.tobytes())
|
||||
|
||||
flipped = sprites[::-1]
|
||||
sprites.extend(flipped)
|
||||
# When the bot isn't talking, show a static image of the robot listening
|
||||
quiet_frame = ImageFrame("", sprites[0])
|
||||
talking_frame = SpriteFrame(images=sprites)
|
||||
|
||||
|
||||
class TalkingAnimation(AIService):
|
||||
"""
|
||||
This class starts a talking animation when it receives the first AudioFrame,
|
||||
and then returns to a "quiet" sprite when it sees a LLMResponseEndFrame.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._is_talking = False
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, AudioFrame):
|
||||
if not self._is_talking:
|
||||
yield talking_frame
|
||||
yield frame
|
||||
self._is_talking = True
|
||||
else:
|
||||
yield frame
|
||||
elif isinstance(frame, LLMResponseEndFrame):
|
||||
yield quiet_frame
|
||||
yield frame
|
||||
self._is_talking = False
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class AnimationInitializer(AIService):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, PipelineStartedFrame):
|
||||
yield quiet_frame
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=576,
|
||||
vad_enabled=True,
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="pNInz6obpgDQGcFmaJgB",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"), model="gpt-4-turbo-preview"
|
||||
)
|
||||
|
||||
ta = TalkingAnimation()
|
||||
ai = AnimationInitializer()
|
||||
pipeline = Pipeline([ai, llm, tts, ta])
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
|
||||
},
|
||||
]
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
print(f"!!! in here, pipeline.source is {pipeline.source}")
|
||||
await pipeline.queue_frames(LLMMessagesQueueFrame(messages))
|
||||
|
||||
async def run_conversation():
|
||||
|
||||
await transport.run_interruptible_pipeline(
|
||||
pipeline,
|
||||
post_processor=LLMResponseAggregator(messages),
|
||||
pre_processor=UserResponseAggregator(messages),
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), run_conversation())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
@@ -384,17 +384,13 @@ async def main(room_url: str, token):
|
||||
checklist = ChecklistProcessor(context, llm)
|
||||
fl = FrameLogger("FRAME LOGGER 1:")
|
||||
fl2 = FrameLogger("FRAME LOGGER 2:")
|
||||
pipeline = Pipeline(processors=[fl, llm, fl2, checklist, tts])
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
fl = FrameLogger("first other participant")
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([OpenAILLMContextFrame(context)]),
|
||||
)
|
||||
await pipeline.queue_frames([OpenAILLMContextFrame(context)])
|
||||
|
||||
async def handle_intake():
|
||||
pipeline = Pipeline(processors=[fl, llm, fl2, checklist, tts])
|
||||
await transport.run_interruptible_pipeline(
|
||||
pipeline,
|
||||
post_processor=OpenAIAssistantContextAggregator(context),
|
||||
|
||||
291
src/examples/starter-apps/storybot.py
Normal file
@@ -0,0 +1,291 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import wave
|
||||
from typing import AsyncGenerator
|
||||
from PIL import Image
|
||||
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
ParallelPipeline,
|
||||
UserResponseAggregator,
|
||||
LLMResponseAggregator,
|
||||
)
|
||||
from examples.support.runner import configure
|
||||
from dailyai.pipeline.frames import (
|
||||
LLMMessagesQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
Frame,
|
||||
TextFrame,
|
||||
LLMFunctionCallFrame,
|
||||
LLMFunctionStartFrame,
|
||||
LLMResponseEndFrame,
|
||||
StartFrame,
|
||||
AudioFrame,
|
||||
SpriteFrame,
|
||||
ImageFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import FrameLogger, AIService
|
||||
|
||||
# Plain string on purpose: the %(...)s placeholders are expanded by the
# logging module, so no f-string is needed.
logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)

# Assets are read once at import time. Both dicts are keyed by the asset's
# file name *including* its extension, e.g. sounds["ding3.wav"].
sounds = {}
images = {}
sound_files = ["talking.wav", "listening.wav", "ding3.wav"]
image_files = ["grandma-writing.png", "grandma-listening.png"]
script_dir = os.path.dirname(__file__)

for file in sound_files:
    # Assets live in the "assets" directory next to this script.
    full_path = os.path.join(script_dir, "assets", file)
    # Read all audio frames of the WAV file as bytes.
    with wave.open(full_path) as audio_file:
        sounds[file] = audio_file.readframes(-1)

for file in image_files:
    full_path = os.path.join(script_dir, "assets", file)
    # Keep the bytes returned by Image.tobytes() for later ImageFrames.
    with Image.open(full_path) as img:
        images[file] = img.tobytes()
|
||||
|
||||
|
||||
class StoryStartFrame(TextFrame):
    """Text frame carrying the text buffered up to a [start] marker."""
|
||||
|
||||
|
||||
class StoryPageFrame(TextFrame):
    """Text frame carrying one page (sentence) of the story."""
|
||||
|
||||
|
||||
class StoryPromptFrame(TextFrame):
    """Text frame carrying the text left over when the LLM response ends."""
|
||||
|
||||
|
||||
class StoryProcessor(FrameProcessor):
    """Split the streamed LLM response into story-specific frames.

    The system prompt asks the LLM to mark up its response like this:

        A comment about the user's choice
        [start]   (the story begins)
        A sentence of the story
        [break]   (after each sentence/'page' of the story)
        [prompt]  (between the story and the question)
        Question about the next part of the story

    Incoming TextFrames are buffered until a marker shows up. The buffered
    text, with the marker removed and newlines turned into spaces, is then
    re-emitted as a StoryStartFrame or StoryPageFrame. Whatever is buffered
    when the LLM response ends is emitted as a StoryPromptFrame.
    """

    def __init__(self, messages, story):
        """Store the shared state.

        Args:
            messages: kept on the instance; not read by this class.
            story: list that every emitted story page is appended to.
        """
        self._messages = messages
        self._text = ""
        self._story = story

    def _take_buffer(self, marker):
        """Return the buffered text without `marker` and clear the buffer.

        `marker` is the lowercase marker name (e.g. "break"); both the
        lowercase and the capitalized spelling are stripped.
        """
        text = self._text.replace(f"[{marker.capitalize()}]", "")
        text = text.replace(f"[{marker}]", "")
        text = text.replace("\n", " ")
        # Always clear: once a marker is consumed, the text before it must
        # not leak into the next frame. Previously the [prompt] branch kept
        # the buffer, so a page flushed there was repeated inside the
        # StoryPromptFrame emitted at the end of the response.
        self._text = ""
        return text

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Translate LLM output frames into story frames plus image/audio cues.

        UserStoppedSpeakingFrame and TextFrame inputs are consumed (not
        forwarded); LLMResponseEndFrame and every other frame type are
        forwarded.
        """
        if isinstance(frame, UserStoppedSpeakingFrame):
            yield ImageFrame(None, images["grandma-writing.png"])
            yield AudioFrame(sounds["talking.wav"])

        elif isinstance(frame, TextFrame):
            self._text += frame.text

            if re.search(r"\[[sS]tart\]", self._text):
                # Then we have the intro. Send it to speech ASAP
                intro = self._take_buffer("start")
                if len(intro) > 2:
                    yield ImageFrame(None, images["grandma-writing.png"])
                    yield StoryStartFrame(intro)
                    yield AudioFrame(sounds["ding3.wav"])

            elif re.search(r"\[[bB]reak\]", self._text):
                # Then it's a page of the story.
                page = self._take_buffer("break")
                if len(page) > 2:
                    self._story.append(page)
                    yield StoryPageFrame(page)
                    yield AudioFrame(sounds["ding3.wav"])

            elif re.search(r"\[[pP]rompt\]", self._text):
                # Question time: flush any text seen so far as a story page.
                # The question itself is collected afterwards and emitted
                # when the LLM response ends.
                page = self._take_buffer("prompt")
                if len(page) > 2:
                    self._story.append(page)
                    yield StoryPageFrame(page)

            # No marker yet: keep buffering.

        elif isinstance(frame, LLMResponseEndFrame):
            yield ImageFrame(None, images["grandma-writing.png"])
            yield StoryPromptFrame(self._text)
            self._text = ""
            yield frame
            yield ImageFrame(None, images["grandma-listening.png"])
            yield AudioFrame(sounds["listening.wav"])

        else:
            # pass through everything that's not handled above
            yield frame
|
||||
|
||||
|
||||
class StoryImageGenerator(FrameProcessor):
    """Generate an illustration for each StoryPageFrame.

    For every story page, the LLM is asked to write an image prompt based on
    the shared `story` list. That prompt is handed to the image service, and
    the resulting frames are emitted ahead of the page itself.
    """

    def __init__(self, story, llm, img):
        """Keep references to the story list, the LLM and the image service."""
        self._story = story
        self._llm = llm
        self._img = img

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Emit image-service frames followed by the page; forward other frames."""
        # Guard clause: only story pages get an illustration.
        if not isinstance(frame, StoryPageFrame):
            yield frame
            return

        if len(self._story) == 1:
            prompt = f'You are an illustrator for a children\'s story book. Generate a prompt for DALL-E to create an illustration for the first page of the book, which reads: "{self._story[0]}"\n\n Your response should start with the phrase "Children\'s book illustration of".'
        else:
            prompt = f"You are an illustrator for a children's story book. Here is the story so far:\n\n\"{' '.join(self._story[:-1])}\"\n\nGenerate a prompt for DALL-E to create an illustration for the next page. Here's the sentence for the next page:\n\n\"{self._story[-1:][0]}\"\n\n Your response should start with the phrase \"Children's book illustration of\"."

        llm_request = LLMMessagesQueueFrame([{"role": "system", "content": prompt}])

        # Collect the streamed text pieces of the LLM's answer.
        image_prompt = ""
        async for llm_frame in self._llm.process_frame(llm_request):
            if not isinstance(llm_frame, TextFrame):
                continue
            image_prompt += llm_frame.text

        async for image_frame in self._img.process_frame(TextFrame(image_prompt)):
            yield image_frame

        # Yield the original StoryPageFrame for basic image/audio sync
        yield frame
|
||||
|
||||
|
||||
async def main(room_url: str, token):
    """Set up the Storybot services and run the story pipeline.

    Args:
        room_url: passed to DailyTransportService as the room to join.
        token: passed to DailyTransportService together with `room_url`.
    """
    async with aiohttp.ClientSession() as session:
        # The transport and services are published as module globals.
        global transport, llm, tts

        story_system_prompt = "You are a storytelling grandma who loves to make up fantastic, fun, and educational stories for children between the ages of 5 and 10 years old. Your stories are full of friendly, magical creatures. Your stories are never scary. Each sentence of your story will become a page in a storybook. Stop after 3-4 sentences and give the child a choice to make that will influence the next part of the story. Once the child responds, start by saying something nice about the choice they made, then include [start] in your response. Include [break] after each sentence of the story. Include [prompt] between the story and the prompt."
        messages = [{"role": "system", "content": story_system_prompt}]

        # Shared between the story processor and the image generator.
        story = []

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
            model="gpt-4-1106-preview",
        )
        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id="Xb7hH8MSUJpSbSDYk0k2",
        )
        image_service = FalImageGenService(
            image_size="1024x1024",
            aiohttp_session=session,
            key_id=os.getenv("FAL_KEY_ID"),
            key_secret=os.getenv("FAL_KEY_SECRET"),
        )
        llm_aggregator = LLMResponseAggregator(messages)
        user_aggregator = UserResponseAggregator(messages)
        story_processor = StoryProcessor(messages, story)
        image_generator = StoryImageGenerator(story, llm, image_service)

        transport = DailyTransportService(
            room_url,
            token,
            "Storybot",
            5,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=True,
            camera_width=1024,
            camera_height=1024,
            start_transcription=True,
            vad_enabled=True,
            vad_stop_s=1.5,
        )

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            # We're being a bit tricky here by using a special system prompt to
            # ask the user for a story topic. After their initial response, we'll
            # use a different system prompt to create story pages.
            intro_prompt = "You are a storytelling grandma who loves to make up fantastic, fun, and educational stories for children between the ages of 5 and 10 years old. Your stories are full of friendly, magical creatures. Your stories are never scary. Begin by asking what a child wants you to tell a story about. Keep your reponse to only a few sentences."
            intro_messages = [{"role": "system", "content": intro_prompt}]
            intro_aggregator = LLMAssistantContextAggregator(messages)
            opening_frames = [
                ImageFrame(None, images["grandma-listening.png"]),
                LLMMessagesQueueFrame(intro_messages),
                AudioFrame(sounds["listening.wav"]),
            ]
            await tts.run_to_queue(
                transport.send_queue,
                intro_aggregator.run(llm.run(opening_frames)),
            )

        async def storytime():
            frame_logger = FrameLogger("### After Image Generation")
            stages = [
                user_aggregator,
                llm,
                story_processor,
                image_generator,
                frame_logger,
                tts,
                llm_aggregator,
            ]
            await transport.run_uninterruptible_pipeline(Pipeline(processors=stages))

        extra_settings = transport.transcription_settings["extra"]
        extra_settings["endpointing"] = True
        extra_settings["punctuate"] = True

        try:
            await asyncio.gather(transport.run(), storytime())
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("whoops")
            transport.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # configure() supplies the room URL and token that main() expects.
    url, token = configure()
    asyncio.run(main(url, token))
|
||||
@@ -1,409 +0,0 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import wave
|
||||
from typing import AsyncGenerator
|
||||
from PIL import Image
|
||||
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
UserResponseAggregator,
|
||||
LLMResponseAggregator,
|
||||
)
|
||||
from examples.support.runner import configure
|
||||
from dailyai.pipeline.frames import (
|
||||
LLMMessagesQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
Frame,
|
||||
TextFrame,
|
||||
LLMFunctionCallFrame,
|
||||
LLMFunctionStartFrame,
|
||||
LLMResponseEndFrame,
|
||||
StartFrame,
|
||||
AudioFrame,
|
||||
SpriteFrame,
|
||||
ImageFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import FrameLogger, AIService
|
||||
|
||||
# Plain string on purpose: the %(...)s placeholders are expanded by the
# logging module, so no f-string is needed.
logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)

# Sounds are read once at import time and keyed by file name *including*
# the extension, e.g. sounds["clack.wav"].
sounds = {}
sound_files = ["clack-short.wav", "clack.wav", "clack-short-quiet.wav"]

script_dir = os.path.dirname(__file__)

for file in sound_files:
    # Assets live in the "assets" directory next to this script.
    full_path = os.path.join(script_dir, "assets", file)
    # Read all audio frames of the WAV file as bytes.
    with wave.open(full_path) as audio_file:
        sounds[file] = audio_file.readframes(-1)
|
||||
|
||||
|
||||
def _string_param(description):
    """Return the schema dict for a string parameter."""
    return {"type": "string", "description": description}


def _object_array(item_properties):
    """Return the schema dict for an array of objects with the given properties."""
    return {
        "type": "array",
        "items": {"type": "object", "properties": item_properties},
    }


def _function_tool(name, description, properties):
    """Return a function-tool dict whose parameters are an object schema."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {"type": "object", "properties": properties},
        },
    }


def _step(prompt, tools, run_async=True, failed=None):
    """Return one checklist step; key order is prompt, run_async, [failed], tools."""
    step = {"prompt": prompt, "run_async": run_async}
    if failed is not None:
        step["failed"] = failed
    step["tools"] = tools
    return step


# The intake checklist, in order. Each step has the prompt given to the LLM,
# the tools offered during that step, and a `run_async` flag. The first step
# also has the prompt used when its function call reports failure.
steps = [
    _step(
        "Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
        [
            _function_tool(
                "verify_birthday",
                "Use this function to verify the user has provided their correct birthday.",
                {
                    "birthday": _string_param(
                        "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function."
                    )
                },
            )
        ],
        run_async=False,
        failed="The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
    ),
    _step(
        "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
        [
            _function_tool(
                "list_prescriptions",
                "Once the user has provided a list of their prescription medications, call this function.",
                {
                    "prescriptions": _object_array(
                        {
                            "medication": _string_param("The medication's name"),
                            "dosage": _string_param("The prescription's dosage"),
                        }
                    )
                },
            )
        ],
    ),
    _step(
        "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
        [
            _function_tool(
                "list_allergies",
                "Once the user has provided a list of their allergies, call this function.",
                {
                    "allergies": _object_array(
                        {"name": _string_param("What the user is allergic to")}
                    )
                },
            )
        ],
    ),
    _step(
        "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
        [
            _function_tool(
                "list_conditions",
                "Once the user has provided a list of their medical conditions, call this function.",
                {
                    "conditions": _object_array(
                        {"name": _string_param("The user's medical condition")}
                    )
                },
            )
        ],
    ),
    _step(
        "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
        [
            _function_tool(
                "list_visit_reasons",
                "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.",
                {
                    "visit_reasons": _object_array(
                        {
                            "name": _string_param(
                                "The user's reason for visiting the doctor"
                            )
                        }
                    )
                },
            )
        ],
    ),
    _step("Now, thank the user and end the conversation.", []),
    # Trailing empty step, so the index can advance past the last real step.
    _step("", []),
]

# Index into `steps` of the step currently in progress.
current_step = 0
|
||||
|
||||
|
||||
class TranscriptFilter(AIService):
    """Forward only transcription frames that did not come from the bot.

    Frames of any other type are dropped, as are transcription frames whose
    `participantId` equals `bot_participant_id`.
    """

    def __init__(self, bot_participant_id=None):
        super().__init__()
        self.bot_participant_id = bot_participant_id

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Yield `frame` only if it is a transcription from another participant."""
        if not isinstance(frame, TranscriptionQueueFrame):
            return
        if frame.participantId == self.bot_participant_id:
            return
        yield frame
|
||||
|
||||
|
||||
class ChecklistProcessor(AIService):
    """Drive the intake checklist in `steps` from the LLM's function calls.

    On every frame, the shared `tools` list is replaced with the tools of
    the current step. Function-start and function-call frames advance the
    module-level `current_step`, append the next system prompt to
    `messages`, and stream the LLM's follow-up response. All other frames
    are forwarded.
    """

    def __init__(self, messages, llm, tools, *args, **kwargs):
        """Store the shared state and seed the conversation.

        Args:
            messages: shared message list; the first system prompt is
                appended to it here.
            llm: LLM service used to generate the follow-up responses.
            tools: shared tool list, rewritten in place on every frame.
        """
        super().__init__(*args, **kwargs)
        self._messages = messages
        self._llm = llm
        self._tools = tools
        self._id = "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous."
        self._acks = ["One sec.", "Let me confirm that.", "Thanks.", "OK."]

        # Create an allowlist of functions that the LLM can call
        self._functions = [
            "verify_birthday",
            "list_prescriptions",
            "list_allergies",
            "list_conditions",
            "list_visit_reasons",
        ]

        messages.append(
            {"role": "system", "content": f"{self._id} {steps[0]['prompt']}"}
        )

    def verify_birthday(self, args):
        """Return True when the supplied birthday matches the expected date."""
        return args["birthday"] == "1983-01-01"

    def list_prescriptions(self, args):
        """Placeholder handler; the arguments are currently ignored."""
        # print(f"--- Prescriptions: {args['prescriptions']}\n")
        pass

    def list_allergies(self, args):
        """Placeholder handler; the arguments are currently ignored."""
        # print(f"--- Allergies: {args['allergies']}\n")
        pass

    def list_conditions(self, args):
        """Placeholder handler; the arguments are currently ignored."""
        # print(f"--- Medical Conditions: {args['conditions']}")
        pass

    def list_visit_reasons(self, args):
        """Placeholder handler; the arguments are currently ignored."""
        # print(f"Visit Reasons: {args['visit_reasons']}")
        pass

    async def _prompt_llm(self, content):
        """Append a system message, then emit the context and the LLM's reply.

        Uses the LLM passed to the constructor with tool_choice="none".
        """
        self._messages.append({"role": "system", "content": content})
        yield LLMMessagesQueueFrame(self._messages)
        async for llm_frame in self._llm.process_frame(
            LLMMessagesQueueFrame(self._messages), tool_choice="none"
        ):
            yield llm_frame

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Handle one frame; see the class docstring for the overall flow.

        Raises:
            Exception: if the LLM calls a function that is not in the
                allowlist.
        """
        global current_step
        this_step = steps[current_step]
        # TODO-CB: forcing a global here :/
        self._tools.clear()
        self._tools.extend(this_step["tools"])

        if isinstance(frame, LLMFunctionStartFrame):
            print(f"... Preparing function call: {frame.function_name}")
            self._function_name = frame.function_name
            if this_step["run_async"]:
                # Get the LLM talking about the next step before getting the rest
                # of the function call completion
                current_step += 1
                async for out in self._prompt_llm(steps[current_step]["prompt"]):
                    yield out
            # Otherwise nothing is emitted until the function call arrives.
            # A quick acknowledgement sound was once played here:
            # yield AudioFrame(sounds["clack-short-quiet.wav"])

        elif isinstance(frame, LLMFunctionCallFrame):
            if frame.function_name and frame.arguments:
                print(f"--> Calling function: {frame.function_name} with arguments:")
                # Parse once; reused for the log line and for the call.
                arguments = json.loads(frame.arguments)
                pretty_json = json.dumps(arguments, indent=2).replace("\n", "\n ")
                print(f"--> {pretty_json}\n")
                if frame.function_name not in self._functions:
                    raise Exception(
                        f"The LLM tried to call a function named {frame.function_name}, which isn't in the list of known functions. Please check your prompt and/or self._functions."
                    )
                fn = getattr(self, frame.function_name)
                result = fn(arguments)

                if not this_step["run_async"]:
                    # Synchronous step: only advance when the function
                    # reported success; otherwise re-prompt with the
                    # step's failure message.
                    if result:
                        current_step += 1
                        next_prompt = steps[current_step]["prompt"]
                    else:
                        next_prompt = this_step["failed"]
                    async for out in self._prompt_llm(next_prompt):
                        yield out
                    print(f"<-- Verify result: {result}\n")

        else:
            yield frame
|
||||
|
||||
|
||||
async def main(room_url: str, token):
    """Set up the intake bot's services and run its interruptible pipeline.

    Args:
        room_url: passed to DailyTransportService as the room to join.
        token: passed to DailyTransportService together with `room_url`.
    """
    async with aiohttp.ClientSession() as session:
        # The transport and services are published as module globals.
        global transport, llm, tts

        transport = DailyTransportService(
            room_url,
            token,
            "Story Cat",
            5,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=False,
            start_transcription=True,
            vad_enabled=True,
        )
        # TODO-CB: Go back to vad_enabled

        # Both lists are shared with ChecklistProcessor; `tools` is also
        # handed to the LLM service.
        messages = []
        tools = []

        # An AzureLLMService (AZURE_CHATGPT_* environment variables) was
        # previously used here instead.
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
            model="gpt-4-1106-preview",
            tools=tools,
        )
        # AzureTTSService and DeepgramTTSService were previously tried
        # here instead.
        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id="XrExE9yKIg1WjnnlVkGX",
        )

        checklist = ChecklistProcessor(messages, llm, tools)
        frame_logger_1 = FrameLogger("FRAME LOGGER 1:")
        frame_logger_2 = FrameLogger("FRAME LOGGER 2:")

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            fl = FrameLogger("first other participant")
            # TODO-CB: Make sure this message gets into the context somehow
            opening = llm.run([LLMMessagesQueueFrame(messages)])
            await tts.run_to_queue(transport.send_queue, opening)

        async def handle_intake():
            stages = [frame_logger_1, llm, frame_logger_2, checklist, tts]
            await transport.run_interruptible_pipeline(
                Pipeline(processors=stages),
                post_processor=LLMResponseAggregator(messages),
                pre_processor=UserResponseAggregator(messages),
            )

        extra_settings = transport.transcription_settings["extra"]
        extra_settings["endpointing"] = True
        extra_settings["punctuate"] = True

        try:
            await asyncio.gather(transport.run(), handle_intake())
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("whoops")
            transport.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # configure() supplies the room URL and token that main() expects.
    url, token = configure()
    asyncio.run(main(url, token))
|
||||