Compare commits

...

24 Commits

Author SHA1 Message Date
Moishe Lettvin
b168c53e44 Adding some more doscstrings, cleanup 2024-03-08 09:39:51 -05:00
chadbailey59
3c5f4800d4 Chad's big patient intake PR (#40)
* at least it runs, kind of

* wip

* wip with user response aggregator

* frame and pipeline docstrings

* Getting started on docstrings

* finish docstrings for aggregators

* patient intake is working!

* cleanup

* cleanup

---------

Co-authored-by: Moishe Lettvin <moishel@gmail.com>
2024-03-07 17:41:32 -06:00
Moishe Lettvin
2bcb4966d3 Merge pull request #39 from daily-co/docstrings
Docstrings
2024-03-07 15:39:50 -05:00
Moishe Lettvin
b14f08a7d5 finish docstrings for aggregators 2024-03-07 15:16:23 -05:00
Moishe Lettvin
8fb92e3fd7 Getting started on docstrings 2024-03-07 12:51:19 -05:00
Moishe Lettvin
337ca7f581 frame and pipeline docstrings 2024-03-07 10:16:27 -05:00
Moishe Lettvin
eb430621f1 Merge pull request #37 from daily-co/fix-interruptible
Fix interruptible pipeline runner and aggregator.
2024-03-07 09:09:41 -05:00
Moishe Lettvin
d5683c4f24 Fix interruptible pipeline runner and aggregator. 2024-03-07 09:05:49 -05:00
chadbailey59
b4505b7eff added audio chunking for better interruption support (#35) 2024-03-06 18:20:04 -06:00
Moishe Lettvin
3e46d28aff Add start frame to interrupt loop 2024-03-06 15:58:19 -05:00
Moishe Lettvin
d3e76c4fd6 Merge pull request #34 from daily-co/rename-frames
Remove Queue in frame names
2024-03-06 14:10:56 -05:00
Moishe Lettvin
62fd371b97 Remove Queue in frame names 2024-03-06 14:09:06 -05:00
Moishe Lettvin
b9556716dd Merge pull request #33 from daily-co/pipeline-instead-of-nest
Pipeline instead of nest
2024-03-05 11:04:20 -05:00
Moishe Lettvin
2708dcf7b5 Remove conversation wrapper 2024-03-04 14:07:49 -05:00
Moishe Lettvin
d3f86dab2e starting on interruptions 2024-03-04 13:41:28 -05:00
Moishe Lettvin
18e7626b9f Getting started on interruptible transport pipeline runner 2024-03-04 07:51:22 -05:00
Moishe Lettvin
763a50f8ec First cut at sample 6 rewrite with pipelines 2024-03-04 07:28:10 -05:00
Moishe Lettvin
3b282cc921 some comments 2024-03-03 20:17:48 -05:00
Moishe Lettvin
434772dc23 Update sample 5! 2024-03-03 19:50:13 -05:00
Moishe Lettvin
15df4a9d58 cleanup, make sample 4 work with new stuff 2024-03-03 19:37:30 -05:00
Moishe Lettvin
643be238f9 getting started 2024-03-03 16:31:31 -05:00
chadbailey59
d90fdb1cae Isolated changes to add VAD (#32)
* added VAD

* added separate 'vad enabled' property
2024-02-28 15:16:44 -06:00
Moishe Lettvin
f710aeae95 Merge pull request #30 from daily-co/unsub-video
cleanup client properties and unsubscribe from camera
2024-02-27 13:16:20 -05:00
Moishe Lettvin
20091d91c9 cleanup client properties and unsubscribe from camera 2024-02-27 13:09:55 -05:00
39 changed files with 1687 additions and 565 deletions

View File

@@ -13,10 +13,14 @@ dependencies = [
"fal",
"faster_whisper",
"google-cloud-texttospeech",
"numpy",
"openai",
"Pillow",
"pyht",
"python-dotenv",
"torch",
"torchaudio",
"pyaudio",
"typing-extensions"
]

View File

@@ -1,77 +0,0 @@
import asyncio
import copy
import functools
from typing import AsyncGenerator, Awaitable, Callable
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame
class InterruptibleConversationWrapper:
    """Run an LLM conversation that restarts whenever new speech arrives.

    Each transcription frame from another participant cancels the pending
    response task (if any), invokes the interrupt callback, extends the
    phrase collected so far and schedules a new, delayed response.
    """

    def __init__(
        self,
        frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]],
        runner: Callable[
            [str, LLMContextAggregator, LLMContextAggregator], Awaitable[None]
        ],
        interrupt: Callable[[], None],
        my_participant_id: str | None,
        llm_messages: list[dict[str, str]],
        llm_context_aggregator_in=LLMUserContextAggregator,
        llm_context_aggregator_out=LLMAssistantContextAggregator,
        delay_before_speech_seconds: float = 1.0,
    ):
        """Store the callbacks, aggregator factories and initial LLM messages."""
        # Callbacks supplied by the caller.
        self._frame_generator = frame_generator
        self._runner = runner
        self._interrupt = interrupt

        # Conversation state.
        self._my_participant_id = my_participant_id
        self._messages = llm_messages
        self._current_phrase = ""

        # Response configuration.
        self._delay_before_speech_seconds = delay_before_speech_seconds
        self._llm_context_aggregator_in = llm_context_aggregator_in
        self._llm_context_aggregator_out = llm_context_aggregator_out

    def update_messages(self, new_messages: list[dict[str, str]], task: asyncio.Task | None):
        """Done-callback: adopt ``new_messages`` unless the task was cancelled."""
        if not task or task.cancelled():
            return
        self._current_phrase = ""
        self._messages = new_messages

    async def speak_after_delay(self, user_speech, messages):
        """Wait for the configured delay, then hand the speech to the runner."""
        await asyncio.sleep(self._delay_before_speech_seconds)
        participant = self._my_participant_id
        user_aggregator = self._llm_context_aggregator_in(
            messages, participant, complete_sentences=False
        )
        assistant_aggregator = self._llm_context_aggregator_out(messages, participant)
        await self._runner(user_speech, user_aggregator, assistant_aggregator)

    async def run_conversation(self):
        """Consume frames until an EndStreamQueueFrame, responding to speech."""
        pending_response = None
        async for frame in self._frame_generator():
            if isinstance(frame, EndStreamQueueFrame):
                break
            # Only transcriptions from other participants are of interest.
            if not isinstance(frame, TranscriptionQueueFrame):
                continue
            if frame.participantId == self._my_participant_id:
                continue

            # New speech pre-empts whatever response was pending.
            if pending_response:
                pending_response.cancel()
                self._interrupt()

            self._current_phrase += " " + frame.text
            # The response works on a private copy of the messages; the copy
            # is adopted by update_messages when the task is done and was not
            # cancelled.
            messages_snapshot = copy.deepcopy(self._messages)
            pending_response = asyncio.create_task(
                self.speak_after_delay(self._current_phrase, messages_snapshot)
            )
            on_done = functools.partial(self.update_messages, messages_snapshot)
            pending_response.add_done_callback(on_done)

View File

@@ -0,0 +1,360 @@
import asyncio
import re
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import (
EndFrame,
EndPipeFrame,
Frame,
ImageFrame,
LLMMessagesQueueFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
TextFrame,
TranscriptionQueueFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, Coroutine, List
class ResponseAggregator(FrameProcessor):
    """Collect text between a start frame and an end frame into one LLM message.

    While aggregating (after a ``start_frame`` instance has been seen), the
    ``text`` of every ``accumulator_frame`` instance is appended to the
    running aggregation. When an ``end_frame`` instance arrives, the
    aggregation is appended to ``messages`` under ``role`` and an
    LLMMessagesQueueFrame carrying the message list is yielded.

    Start and end frames are consumed; frames matching none of the three
    configured types are passed through unchanged.
    """

    def __init__(
        self,
        *,
        messages: list[dict],
        role: str,
        start_frame,
        end_frame,
        accumulator_frame,
        pass_through=True,
    ):
        """Configure the aggregator.

        Args:
            messages: Message list that is appended to in place.
            role: Value stored under the "role" key of appended messages.
            start_frame: Frame type that starts aggregation.
            end_frame: Frame type that ends aggregation and emits the result.
            accumulator_frame: Frame type whose ``text`` is aggregated.
            pass_through: Whether aggregated frames are also yielded.
        """
        self.aggregation = ""
        self.aggregating = False
        self.messages = messages
        self._role = role
        self._start_frame = start_frame
        self._end_frame = end_frame
        self._accumulator_frame = accumulator_frame
        self._pass_through = pass_through

    async def process_frame(
        self, frame: Frame
    ) -> AsyncGenerator[Frame, None]:
        """Aggregate, emit or pass through ``frame`` as described on the class."""
        if isinstance(frame, self._start_frame):
            self.aggregating = True
        elif isinstance(frame, self._end_frame):
            self.aggregating = False
            # A start/end pair with nothing in between (e.g. speech detection
            # toggling without any transcription) must not add an
            # empty-content message or trigger downstream processing.
            if self.aggregation:
                self.messages.append({"role": self._role, "content": self.aggregation})
                self.aggregation = ""
                yield LLMMessagesQueueFrame(self.messages)
        elif isinstance(frame, self._accumulator_frame) and self.aggregating:
            self.aggregation += f" {frame.text}"
            if self._pass_through:
                yield frame
        else:
            yield frame
class LLMResponseAggregator(ResponseAggregator):
    """Aggregate the TextFrames of one LLM response into an "assistant" message.

    Aggregation runs from LLMResponseStartFrame to LLMResponseEndFrame.
    """

    def __init__(self, messages: list[dict]):
        super().__init__(
            role="assistant",
            messages=messages,
            accumulator_frame=TextFrame,
            start_frame=LLMResponseStartFrame,
            end_frame=LLMResponseEndFrame,
        )
class UserResponseAggregator(ResponseAggregator):
    """Aggregate transcription frames of one user utterance into a "user" message.

    Aggregation runs from UserStartedSpeakingFrame to
    UserStoppedSpeakingFrame; the aggregated transcription frames are not
    passed through.
    """

    def __init__(self, messages: list[dict]):
        super().__init__(
            role="user",
            messages=messages,
            accumulator_frame=TranscriptionQueueFrame,
            start_frame=UserStartedSpeakingFrame,
            end_frame=UserStoppedSpeakingFrame,
            pass_through=False,
        )
class LLMContextAggregator(AIService):
    """Append incoming text frames to an LLM message list under a fixed role.

    With ``complete_sentences`` enabled, text is buffered until the buffer
    ends with ".", "?" or "!" and is then appended as one message; otherwise
    every text frame becomes its own message. Each append is followed by an
    LLMMessagesQueueFrame carrying the message list.
    """

    def __init__(
        self,
        messages: list[dict],
        role: str,
        bot_participant_id=None,
        complete_sentences=True,
        pass_through=True,
    ):
        """Configure the aggregator.

        Args:
            messages: Message list that is appended to in place.
            role: Value stored under the "role" key of appended messages.
            bot_participant_id: Transcriptions from this participant are dropped.
            complete_sentences: Buffer text until sentence-ending punctuation.
            pass_through: Whether accepted text frames are also yielded.
        """
        super().__init__()
        self.messages = messages
        self.bot_participant_id = bot_participant_id
        self.role = role
        self.sentence = ""
        self.complete_sentences = complete_sentences
        self.pass_through = pass_through

    async def process_frame(
        self, frame: Frame
    ) -> AsyncGenerator[Frame, None]:
        """Fold ``frame`` into the message list, yielding resulting frames."""
        # The stream is ending: flush words that never reached sentence-ending
        # punctuation so they are not silently lost, then forward the frame.
        if isinstance(frame, EndFrame):
            if self.complete_sentences and self.sentence:
                self.messages.append({"role": self.role, "content": self.sentence})
                self.sentence = ""
                yield LLMMessagesQueueFrame(self.messages)
            yield frame
            return

        # We don't do anything with non-text frames, pass it along to next in the pipeline.
        if not isinstance(frame, TextFrame):
            yield frame
            return

        # Ignore transcription frames from the bot
        if isinstance(frame, TranscriptionQueueFrame):
            if frame.participantId == self.bot_participant_id:
                return

        # The common case for "pass through" is receiving frames from the LLM that we'll
        # use to update the "assistant" LLM messages, but also passing the text frames
        # along to a TTS service to be spoken to the user.
        if self.pass_through:
            yield frame

        # TODO: split up transcription by participant
        if self.complete_sentences:
            self.sentence += frame.text
            if self.sentence.endswith((".", "?", "!")):
                self.messages.append({"role": self.role, "content": self.sentence})
                self.sentence = ""
                yield LLMMessagesQueueFrame(self.messages)
        else:
            self.messages.append({"role": self.role, "content": frame.text})
            yield LLMMessagesQueueFrame(self.messages)
class LLMUserContextAggregator(LLMContextAggregator):
    """LLMContextAggregator preset: role "user", text frames not passed through."""

    def __init__(
        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
    ):
        super().__init__(
            messages=messages,
            role="user",
            bot_participant_id=bot_participant_id,
            complete_sentences=complete_sentences,
            pass_through=False,
        )
class LLMAssistantContextAggregator(LLMContextAggregator):
    """LLMContextAggregator preset: role "assistant", text frames passed through."""

    def __init__(
        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
    ):
        super().__init__(
            messages=messages,
            role="assistant",
            bot_participant_id=bot_participant_id,
            complete_sentences=complete_sentences,
            pass_through=True,
        )
class SentenceAggregator(FrameProcessor):
    """This frame processor aggregates text frames into complete sentences.

    Frame input/output:
        TextFrame("Hello,") -> None
        TextFrame(" world.") -> TextFrame("Hello, world.")

    Doctest:
    >>> async def print_frames(aggregator, frame):
    ...     async for frame in aggregator.process_frame(frame):
    ...         print(frame.text)

    >>> aggregator = SentenceAggregator()
    >>> asyncio.run(print_frames(aggregator, TextFrame("Hello,")))
    >>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
    Hello, world.
    """

    def __init__(self):
        # Text received so far that has not yet been emitted.
        self.aggregation = ""

    async def process_frame(
        self, frame: Frame
    ) -> AsyncGenerator[Frame, None]:
        """Buffer text until sentence-ending punctuation, then emit it.

        Text up to and including the last ".", "?" or "!" in the incoming
        frame is emitted together with the buffered text; whatever follows
        is kept for later. On EndFrame any buffered text is flushed before
        the EndFrame itself. All other frames pass through unchanged.
        """
        if isinstance(frame, TextFrame):
            # DOTALL lets "." span newlines; without it, text on the other
            # side of a line break from the match would be dropped.
            m = re.search(r"(.*[?.!])(.*)", frame.text, re.DOTALL)
            if m:
                yield TextFrame(self.aggregation + m.group(1))
                self.aggregation = m.group(2)
            else:
                self.aggregation += frame.text
        elif isinstance(frame, EndFrame):
            if self.aggregation:
                yield TextFrame(self.aggregation)
                # Clear the buffer so the flushed text cannot be emitted
                # again if this processor instance is reused.
                self.aggregation = ""
            yield frame
        else:
            yield frame
class LLMFullResponseAggregator(FrameProcessor):
    """Concatenate TextFrames until an LLMResponseEndFrame arrives.

    Text frames are swallowed and their text collected. When the
    LLMResponseEndFrame is received, one TextFrame holding the whole
    collected text is yielded, followed by the end frame itself.

    Given the following frames:

        TextFrame("Hello,")
        TextFrame(" world.")
        TextFrame(" I am")
        TextFrame(" an LLM.")
        LLMResponseEndFrame()

    this processor yields nothing for the first 4 frames, then

        TextFrame("Hello, world. I am an LLM.")
        LLMResponseEndFrame()

    when passed the last frame.

    >>> async def print_frames(aggregator, frame):
    ...     async for frame in aggregator.process_frame(frame):
    ...         if isinstance(frame, TextFrame):
    ...             print(frame.text)
    ...         else:
    ...             print(frame.__class__.__name__)

    >>> aggregator = LLMFullResponseAggregator()
    >>> asyncio.run(print_frames(aggregator, TextFrame("Hello,")))
    >>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
    >>> asyncio.run(print_frames(aggregator, TextFrame(" I am")))
    >>> asyncio.run(print_frames(aggregator, TextFrame(" an LLM.")))
    >>> asyncio.run(print_frames(aggregator, LLMResponseEndFrame()))
    Hello, world. I am an LLM.
    LLMResponseEndFrame
    """

    def __init__(self):
        # Text collected since the last emitted response.
        self.aggregation = ""

    async def process_frame(
        self, frame: Frame
    ) -> AsyncGenerator[Frame, None]:
        """Collect text, emit it on LLMResponseEndFrame, pass other frames on."""
        if isinstance(frame, TextFrame):
            self.aggregation = self.aggregation + frame.text
            return

        if isinstance(frame, LLMResponseEndFrame):
            yield TextFrame(self.aggregation)
            yield frame
            self.aggregation = ""
            return

        yield frame
class StatelessTextTransformer(FrameProcessor):
    """Apply a function to the text of every text frame.

    The function may return its result directly or return a coroutine, which
    is awaited. Frames that are not text frames pass through unchanged.

    >>> async def print_frames(aggregator, frame):
    ...     async for frame in aggregator.process_frame(frame):
    ...         print(frame.text)

    >>> aggregator = StatelessTextTransformer(lambda x: x.upper())
    >>> asyncio.run(print_frames(aggregator, TextFrame("Hello")))
    HELLO
    """

    def __init__(self, transform_fn):
        self.transform_fn = transform_fn

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Yield a TextFrame with the transformed text, or ``frame`` itself."""
        if not isinstance(frame, TextFrame):
            yield frame
            return

        transformed = self.transform_fn(frame.text)
        if isinstance(transformed, Coroutine):
            transformed = await transformed
        yield TextFrame(transformed)
class ParallelPipeline(FrameProcessor):
    """Run multiple pipelines in parallel.

    Every incoming frame is placed on the source queue of each sub-pipeline,
    followed by an EndPipeFrame that tells the sub-pipeline runner to exit.
    All sub-pipelines write into one shared sink queue. No ordering
    guarantee is made between sub-pipelines: frames sit on the sink in the
    order the sub-pipelines emitted them.

    Since frame handlers pass through unhandled frames by convention, the
    same frame object can reach the sink several times; duplicates are
    removed before the sink contents are yielded.
    """

    def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
        """Create one source queue and one Pipeline per processor list."""
        self.sources = [asyncio.Queue() for _ in pipeline_definitions]
        self.sink: asyncio.Queue[Frame] = asyncio.Queue()
        self.pipelines: list[Pipeline] = [
            Pipeline(definition, source_queue, self.sink)
            for source_queue, definition in zip(self.sources, pipeline_definitions)
        ]

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Feed ``frame`` to every sub-pipeline and yield their de-duplicated output."""
        for source_queue in self.sources:
            await source_queue.put(frame)
            await source_queue.put(EndPipeFrame())

        # Wait for every sub-pipeline to finish before draining the sink.
        await asyncio.gather(*(pipeline.run_pipeline() for pipeline in self.pipelines))

        emitted_ids = set()
        while not self.sink.empty():
            candidate = await self.sink.get()

            # Each sub-pipeline that does not process a frame yields that same
            # object, so the sink can hold up to _n_ copies of it, where _n_
            # is the number of parallel pipes. Only the first is kept.
            identity = id(candidate)
            if identity in emitted_ids:
                continue
            emitted_ids.add(identity)

            # EndPipeFrames are our own flow control and are not passed along.
            if isinstance(candidate, EndPipeFrame):
                continue
            yield candidate
class GatedAggregator(FrameProcessor):
    """Hold frames back while a gate is closed and release them when it opens.

    Two predicates control the gate. While it is closed, frames are
    accumulated. The frame that opens the gate is yielded first, followed by
    the accumulated frames; later frames are yielded directly until a frame
    closes the gate again. The gate-closing frame itself is not yielded.

    >>> async def print_frames(aggregator, frame):
    ...     async for frame in aggregator.process_frame(frame):
    ...         if isinstance(frame, TextFrame):
    ...             print(frame.text)
    ...         else:
    ...             print(frame.__class__.__name__)

    >>> aggregator = GatedAggregator(
    ...     gate_close_fn=lambda x: isinstance(x, LLMResponseStartFrame),
    ...     gate_open_fn=lambda x: isinstance(x, ImageFrame),
    ...     start_open=False)
    >>> asyncio.run(print_frames(aggregator, TextFrame("Hello")))
    >>> asyncio.run(print_frames(aggregator, TextFrame("Hello again.")))
    >>> asyncio.run(print_frames(aggregator, ImageFrame(url='', image=bytes([]))))
    ImageFrame
    Hello
    Hello again.
    >>> asyncio.run(print_frames(aggregator, TextFrame("Goodbye.")))
    Goodbye.
    """

    def __init__(self, gate_open_fn, gate_close_fn, start_open):
        self.gate_open_fn = gate_open_fn
        self.gate_close_fn = gate_close_fn
        self.gate_open = start_open
        # Frames held back while the gate is closed.
        self.accumulator: List[Frame] = []

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Update the gate from ``frame``, then yield or accumulate it."""
        # Only the predicate that can flip the current state is consulted.
        should_toggle = self.gate_close_fn if self.gate_open else self.gate_open_fn
        if should_toggle(frame):
            self.gate_open = not self.gate_open

        if not self.gate_open:
            self.accumulator.append(frame)
            return

        yield frame
        for held_frame in self.accumulator:
            yield held_frame
        self.accumulator = []

View File

@@ -0,0 +1,33 @@
from abc import abstractmethod
from typing import AsyncGenerator
from dailyai.pipeline.frames import ControlFrame, Frame
class FrameProcessor:
    """This is the base class for all frame processors. Frame processors consume a frame
    and yield 0 or more frames. Generally frame processors are used as part of a pipeline
    where frames come from a source queue, are processed by a series of frame processors,
    then placed on a sink queue.

    By convention, FrameProcessors should immediately yield any frames they don't process.

    Stateful FrameProcessors should watch for the EndFrame and finalize their
    output, eg. yielding an unfinished sentence if they're aggregating LLM output to full
    sentences. EndFrame is also a chance to clean up any services that need to
    be closed, del'd, etc.
    """

    # NOTE: this class does not derive from abc.ABC, so the @abstractmethod
    # markers below do not prevent instantiation; subclasses may rely on
    # these default implementations.

    @abstractmethod
    async def process_frame(
        self, frame: Frame
    ) -> AsyncGenerator[Frame, None]:
        """Process a single frame and yield 0 or more frames.

        The default implementation passes every frame through exactly once,
        control frames included.
        """
        yield frame

    @abstractmethod
    async def interrupted(self) -> None:
        """Handle any cleanup if the pipeline was interrupted."""
        pass

View File

@@ -0,0 +1,79 @@
from dataclasses import dataclass
from typing import Any
class Frame:
    """Base class for everything that travels through a pipeline."""
    pass


class ControlFrame(Frame):
    """Base class for frames that signal events rather than carry data."""

    # Control frames should contain no instance data, so
    # equality is based solely on the class.
    def __eq__(self, other):
        return type(other) is type(self)

    # Defining __eq__ alone sets __hash__ to None and makes instances
    # unhashable; hash on the class so equal frames hash equally and control
    # frames can be stored in sets and used as dict keys.
    def __hash__(self):
        return hash(type(self))


class StartFrame(ControlFrame):
    """Control frame marking the start of a stream."""
    pass


class EndFrame(ControlFrame):
    """Control frame marking the end of a stream."""
    pass


class EndPipeFrame(ControlFrame):
    """Control frame telling a pipeline runner to exit."""
    pass


class LLMResponseStartFrame(ControlFrame):
    """Control frame emitted before the text of an LLM response."""
    pass


class LLMResponseEndFrame(ControlFrame):
    """Control frame emitted after the text of an LLM response."""
    pass
@dataclass()
class AudioFrame(Frame):
    """A chunk of audio data."""
    data: bytes


@dataclass()
class ImageFrame(Frame):
    """An image, with the URL it came from when one is known."""
    url: str | None
    image: bytes


@dataclass()
class SpriteFrame(Frame):
    """A list of images."""
    images: list[bytes]


@dataclass()
class TextFrame(Frame):
    """A piece of text."""
    text: str


@dataclass()
class TranscriptionQueueFrame(TextFrame):
    """Transcribed text together with its participant id and timestamp."""
    participantId: str
    timestamp: str


@dataclass()
class LLMMessagesQueueFrame(Frame):
    """A list of LLM messages."""
    messages: list[dict[str, str]]  # TODO: define this more concretely!


# NOTE(review): unlike its neighbours this class is not decorated with
# @dataclass, so the two fields below are bare annotations and no
# field-taking __init__ is generated -- confirm whether that is intended.
class AppMessageQueueFrame(Frame):
    message: Any
    participantId: str


class UserStartedSpeakingFrame(Frame):
    """Marker frame: the user started speaking."""
    pass


class UserStoppedSpeakingFrame(Frame):
    """Marker frame: the user stopped speaking."""
    pass


@dataclass()
class LLMFunctionCallFrame(Frame):
    """A fragment of an LLM function call.

    LLM services construct this frame with only one of the two fields set
    and the other passed as None, so both are optional.
    """
    function_name: str | None
    arguments: str | None

View File

@@ -0,0 +1,89 @@
import asyncio
from typing import AsyncGenerator, List
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame
class Pipeline:
    """
    Manage a chain of FrameProcessors and run them in sequence.

    The "source" and "sink" queues are owned by the caller. The class can be
    used stand-alone for specialized processing, or together with the
    Transport's run_pipeline method, which instantiates and runs a pipeline
    with the Transport's sink and source queues.
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        source: asyncio.Queue | None = None,
        sink: asyncio.Queue[Frame] | None = None,
    ):
        """Create a new pipeline.

        Neither queue is set by default, so pass them here or call
        set_source and set_sink before running the pipeline. Note that the
        transport's run_*_pipeline methods set both queues for you.
        """
        self.processors = processors
        self.source: asyncio.Queue[Frame] | None = source
        self.sink: asyncio.Queue[Frame] | None = sink

    def set_source(self, source: asyncio.Queue[Frame]):
        """Set the queue frames are read from.

        Frames from this queue are processed by each frame_processor in the
        pipeline, in order from first to last.
        """
        self.source = source

    def set_sink(self, sink: asyncio.Queue[Frame]):
        """Set the queue that receives the output of the last frame_processor."""
        self.sink = sink

    async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
        """Yield the next frame from the source queue.

        Wrapping the queue read in an AsyncGenerator lets run_pipeline treat
        the source queue and the frame_processors uniformly.

        Raises:
            ValueError: If the source queue has not been set.
        """
        if self.source is None:
            raise ValueError("Source queue not set")
        yield await self.source.get()

    async def run_pipeline(self):
        """Run the pipeline until an end frame reaches the sink.

        Each frame taken from the source queue is passed to the first
        frame_processor; that processor's output is passed to the next one,
        and so on. Frames produced by the last frame_processor are placed on
        the sink queue.

        The method returns as soon as an EndFrame or EndPipeFrame has been
        placed on the sink queue; nothing is put on the sink after it, even
        if it was not the last frame yielded by the last frame_processor.

        If the run is cancelled, every processor's interrupted() hook is
        awaited and the method returns without re-raising.

        Raises:
            ValueError: If the source or sink queue has not been set.
        """
        if self.source is None or self.sink is None:
            raise ValueError("Source or sink queue not set")

        terminal_frame_types = (EndFrame, EndPipeFrame)
        try:
            while True:
                # Stage zero is the source queue itself.
                stage_outputs = [self.get_next_source_frame()]
                for processor in self.processors:
                    # Draining the previous stage creates (but does not yet
                    # run) one generator per frame for this stage.
                    stage_outputs = [
                        processor.process_frame(frame)
                        for upstream in stage_outputs
                        async for frame in upstream
                    ]

                for final_output in stage_outputs:
                    async for frame in final_output:
                        await self.sink.put(frame)
                        if isinstance(frame, terminal_frame_types):
                            return
        except asyncio.CancelledError:
            # this means there's been an interruption, do any cleanup necessary here.
            for processor in self.processors:
                await processor.interrupted()

View File

@@ -1,98 +0,0 @@
import asyncio
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, List
class QueueTee:
    """Copy frames from an async generator onto one or more queues."""

    async def run_to_queue_and_generate(
        self,
        output_queue: asyncio.Queue,
        generator: AsyncGenerator[QueueFrame, None]
    ) -> AsyncGenerator[QueueFrame, None]:
        """Put each frame on ``output_queue`` and then yield it to the caller."""
        async for item in generator:
            await output_queue.put(item)
            yield item

    async def run_to_queues(
        self,
        output_queues: List[asyncio.Queue],
        generator: AsyncGenerator[QueueFrame, None]
    ):
        """Put each frame on every queue in ``output_queues``, in list order."""
        async for item in generator:
            for target in output_queues:
                await target.put(item)
class LLMContextAggregator(AIService):
    """Append incoming text frames to an LLM message list under a fixed role.

    With ``complete_sentences`` enabled, text is buffered until the buffer
    ends with ".", "?" or "!"; otherwise every text frame becomes its own
    message. Each append is followed by an LLMMessagesQueueFrame.
    """

    def __init__(
            self,
            messages: list[dict],
            role: str,
            bot_participant_id=None,
            complete_sentences=True,
            pass_through=True):
        super().__init__()
        self.role = role
        self.messages = messages
        self.bot_participant_id = bot_participant_id
        self.complete_sentences = complete_sentences
        self.pass_through = pass_through
        # Text buffered until it ends with sentence punctuation.
        self.sentence = ""

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Fold ``frame`` into the message list, yielding resulting frames."""
        # Non-text frames are not ours to handle; hand them to the next stage.
        if not isinstance(frame, TextQueueFrame):
            yield frame
            return

        # Transcriptions of the bot's own speech are dropped.
        spoken_by_bot = (
            isinstance(frame, TranscriptionQueueFrame)
            and frame.participantId == self.bot_participant_id
        )
        if spoken_by_bot:
            return

        # Typical pass-through use: LLM text updates the "assistant" messages
        # and is also forwarded to a TTS service to be spoken to the user.
        if self.pass_through:
            yield frame

        # TODO: split up transcription by participant
        if not self.complete_sentences:
            self.messages.append({"role": self.role, "content": frame.text})
            yield LLMMessagesQueueFrame(self.messages)
            return

        self.sentence += frame.text
        if self.sentence.endswith((".", "?", "!")):
            self.messages.append({"role": self.role, "content": self.sentence})
            self.sentence = ""
            yield LLMMessagesQueueFrame(self.messages)

    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Emit any buffered words that never reached closing punctuation."""
        has_dangling_text = self.complete_sentences and self.sentence
        if has_dangling_text:
            self.messages.append({"role": self.role, "content": self.sentence})
            yield LLMMessagesQueueFrame(self.messages)
class LLMUserContextAggregator(LLMContextAggregator):
    """LLMContextAggregator preset: role "user", text frames not passed through."""

    def __init__(self,
                 messages: list[dict],
                 bot_participant_id=None,
                 complete_sentences=True):
        super().__init__(
            messages=messages,
            role="user",
            bot_participant_id=bot_participant_id,
            complete_sentences=complete_sentences,
            pass_through=False,
        )
class LLMAssistantContextAggregator(LLMContextAggregator):
    """LLMContextAggregator preset: role "assistant", text frames passed through."""

    def __init__(
        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
    ):
        super().__init__(
            messages=messages,
            role="assistant",
            bot_participant_id=bot_participant_id,
            complete_sentences=complete_sentences,
            pass_through=True,
        )

View File

@@ -1,60 +0,0 @@
from enum import Enum
from dataclasses import dataclass
from typing import Any
class QueueFrame:
    """Base class for everything placed on a frame queue."""


class ControlQueueFrame(QueueFrame):
    """Base class for frames that signal events rather than carry data."""


class StartStreamQueueFrame(ControlQueueFrame):
    """Control frame marking the start of a stream."""


class EndStreamQueueFrame(ControlQueueFrame):
    """Control frame marking the end of a stream."""


class LLMResponseEndQueueFrame(QueueFrame):
    """Marker frame emitted after the text of an LLM response."""


@dataclass
class AudioQueueFrame(QueueFrame):
    """A chunk of audio data."""
    data: bytes


@dataclass
class ImageQueueFrame(QueueFrame):
    """An image, with the URL it came from when one is known."""
    url: str | None
    image: bytes


@dataclass
class SpriteQueueFrame(QueueFrame):
    """A list of images."""
    images: list[bytes]


@dataclass
class TextQueueFrame(QueueFrame):
    """A piece of text."""
    text: str


@dataclass
class TranscriptionQueueFrame(TextQueueFrame):
    """Transcribed text together with its participant id and timestamp."""
    participantId: str
    timestamp: str


@dataclass
class LLMMessagesQueueFrame(QueueFrame):
    """A list of LLM messages."""
    messages: list[dict[str, str]]  # TODO: define this more concretely!


# Not a dataclass: the two fields below are bare annotations only.
class AppMessageQueueFrame(QueueFrame):
    message: Any
    participantId: str

View File

@@ -3,25 +3,33 @@ import io
import logging
import time
import wave
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.queue_frame import (
AudioQueueFrame,
ControlQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
from dailyai.pipeline.frames import (
AudioFrame,
EndFrame,
ImageFrame,
LLMMessagesQueueFrame,
LLMResponseEndQueueFrame,
QueueFrame,
TextQueueFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
LLMFunctionCallFrame,
Frame,
TextFrame,
TranscriptionQueueFrame,
)
from abc import abstractmethod
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
from dataclasses import dataclass
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List
class AIService(FrameProcessor):
""" This is the base class for various AI services (LLM, TTS and Image)
class AIService:
This class adds some convenience functions to run, effectively, a one-stage
pipeline where the incoming frames can come from an iterable or queue
and the processed frames go to a queue. Child classes extend those convenience
functions, eg. TTS's `say` method runs the TTS and emits the AudioFrames to a
queue.
"""
def __init__(self):
self.logger = logging.getLogger("dailyai")
@@ -29,19 +37,34 @@ class AIService:
def stop(self):
pass
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
async def run_to_queue(
self,
queue: asyncio.Queue,
frames: Iterable[Frame] | AsyncIterable[Frame] | asyncio.Queue[Frame]
) -> None:
""" Process the given frames (from an iterable or queue) and send them to
the given queue.
"""
async for frame in self.run(frames):
await queue.put(frame)
if add_end_of_stream:
await queue.put(EndStreamQueueFrame())
async def run(
self,
frames: Iterable[QueueFrame]
| AsyncIterable[QueueFrame]
| asyncio.Queue[QueueFrame],
) -> AsyncGenerator[QueueFrame, None]:
frames: Iterable[Frame]
| AsyncIterable[Frame]
| asyncio.Queue[Frame],
) -> AsyncGenerator[Frame, None]:
""" Generates 0 or more frames from the given iterable or queue.
This is a convenience function to take a collection of frames, process
them, and yield processed frames.
The preferred way to use FrameProcessors is with a pipeline, but if you
have a very simple case (eg. a list of static text blocks you want to speak,
or a list of static image description you want to render) this function
will be helpful.
"""
try:
if isinstance(frames, AsyncIterable):
async for frame in frames:
@@ -56,43 +79,50 @@ class AIService:
frame = await frames.get()
async for output_frame in self.process_frame(frame):
yield output_frame
if isinstance(frame, EndStreamQueueFrame):
if isinstance(frame, EndFrame):
break
else:
raise Exception("Frames must be an iterable or async iterable")
async for output_frame in self.finalize():
yield output_frame
except Exception as e:
self.logger.error("Exception occurred while running AI service", e)
raise e
@abstractmethod
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
yield frame
@abstractmethod
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
class LLMService(AIService):
def __init__(self, messages=None, tools=None):
super().__init__()
self._tools = tools
self._messages = messages
@abstractmethod
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
yield ""
@abstractmethod
async def run_llm(self, messages) -> str:
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame, tool_choice: str | None = None) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
async for text_chunk in self.run_llm_async(frame.messages):
yield TextQueueFrame(text_chunk)
yield LLMResponseEndQueueFrame()
function_name = ""
arguments = ""
if isinstance(frame, LLMMessagesQueueFrame):
yield LLMResponseStartFrame()
async for text_chunk in self.run_llm_async(frame.messages, tool_choice):
if isinstance(text_chunk, str):
yield TextFrame(text_chunk)
elif text_chunk.function:
if text_chunk.function.name:
# function_name += text_chunk.function.name
yield LLMFunctionCallFrame(function_name=text_chunk.function.name, arguments=None)
if text_chunk.function.arguments:
# arguments += text_chunk.function.arguments
yield LLMFunctionCallFrame(function_name=None, arguments=text_chunk.function.arguments)
if (function_name and arguments):
function_name = ""
arguments = ""
yield LLMResponseEndFrame()
else:
yield frame
@@ -114,8 +144,14 @@ class TTSService(AIService):
# yield empty bytes here, so linting can infer what this method does
yield bytes()
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not isinstance(frame, TextQueueFrame):
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, EndFrame):
if self.current_sentence:
async for audio_chunk in self.run_tts(self.current_sentence):
yield AudioFrame(audio_chunk)
yield TextFrame(self.current_sentence)
if not isinstance(frame, TextFrame):
yield frame
return
@@ -130,16 +166,14 @@ class TTSService(AIService):
if text:
async for audio_chunk in self.run_tts(text):
yield AudioQueueFrame(audio_chunk)
yield AudioFrame(audio_chunk)
async def finalize(self):
if self.current_sentence:
async for audio_chunk in self.run_tts(self.current_sentence):
yield AudioQueueFrame(audio_chunk)
# note we pass along the text frame *after* the audio, so the text frame is completed after the audio is processed.
yield TextFrame(text)
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
await self.run_to_queue(queue, [TextQueueFrame(sentence)])
await self.run_to_queue(queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()])
class ImageGenService(AIService):
@@ -152,13 +186,13 @@ class ImageGenService(AIService):
async def run_image_gen(self, sentence: str) -> tuple[str, bytes]:
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not isinstance(frame, TextQueueFrame):
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if not isinstance(frame, TextFrame):
yield frame
return
(url, image_data) = await self.run_image_gen(frame.text)
yield ImageQueueFrame(url, image_data)
yield ImageFrame(url, image_data)
class STTService(AIService):
@@ -175,9 +209,9 @@ class STTService(AIService):
"""Returns transcript as a string"""
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
"""Processes a frame of audio data, either buffering or transcribing it."""
if not isinstance(frame, AudioQueueFrame):
if not isinstance(frame, AudioFrame):
return
data = frame.data
@@ -198,8 +232,8 @@ class FrameLogger(AIService):
super().__init__(**kwargs)
self.prefix = prefix
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, (AudioQueueFrame, ImageQueueFrame)):
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, (AudioFrame, ImageFrame)):
self.logger.info(f"{self.prefix}: {type(frame)}")
else:
print(f"{self.prefix}: {frame}")

View File

@@ -2,6 +2,7 @@ import aiohttp
import asyncio
import io
import json
import time
from openai import AsyncAzureOpenAI
import os
@@ -48,8 +49,8 @@ class AzureTTSService(TTSService):
class AzureLLMService(LLMService):
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
super().__init__()
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model, tools=None, messages=None):
super().__init__(tools=tools, messages=messages)
self._model: str = model
self._client = AsyncAzureOpenAI(
@@ -58,16 +59,22 @@ class AzureLLMService(LLMService):
api_version=api_version,
)
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
if self._tools:
tools = self._tools
else:
tools = None
start_time = time.time()
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages, tools=tools, tool_choice=tool_choice)
self.logger.info(f"=== Azure OpenAI LLM TTFB: {time.time() - start_time}")
async for chunk in chunks:
if len(chunk.choices) == 0:
continue
if chunk.choices[0].delta.content:
if chunk.choices[0].delta.tool_calls:
yield chunk.choices[0].delta.tool_calls[0]
elif chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
async def run_llm(self, messages) -> str | None:
@@ -112,11 +119,9 @@ class AzureImageGenServiceREST(ImageGenService):
async with self._aiohttp_session.post(
url, headers=headers, json=body
) as submission:
print(f"submission: {submission}")
# We never get past this line, because this header isn't
# defined on a 429 response, but something is eating our exceptions!
operation_location = submission.headers['operation-location']
print(f"submission status: {submission.status}")
status = ""
attempts_left = 120
json_response = None
@@ -139,5 +144,4 @@ class AzureImageGenServiceREST(ImageGenService):
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
print("i got an image file!")
return (image_url, image.tobytes())

View File

@@ -2,19 +2,75 @@ from abc import abstractmethod
import asyncio
import itertools
import logging
import numpy as np
import pyaudio
import torch
import queue
import threading
import time
from typing import AsyncGenerator
from enum import Enum
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.queue_frame import (
AudioQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
QueueFrame,
SpriteQueueFrame,
StartStreamQueueFrame,
from dailyai.pipeline.frames import (
AudioFrame,
EndFrame,
ImageFrame,
Frame,
SpriteFrame,
StartFrame,
TranscriptionQueueFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
)
from dailyai.pipeline.pipeline import Pipeline
torch.set_num_threads(1)
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=False)
(get_speech_timestamps,
save_audio,
read_audio,
VADIterator,
collect_chunks) = utils
# Taken from utils_vad.py
# Run `model` on `inputs` inside a torch.no_grad() block and return whatever
# the model returns.
def validate(model,
inputs: torch.Tensor):
# No gradients are recorded for this forward pass.
with torch.no_grad():
outs = model(inputs)
return outs
# Provided by Alexander Veysov
def int2float(sound):
    """Convert 16-bit PCM samples to float32 scaled by 1/32768.

    Args:
        sound: NumPy array of int16 audio samples.

    Returns:
        A float32 array with singleton dimensions squeezed out, with every
        sample divided by 32768 (so int16 full scale maps to [-1.0, 1.0)).
    """
    # Scale unconditionally. The previous `np.abs(sound).max() > 0` guard was
    # only meant to skip all-zero buffers (where scaling is a no-op anyway),
    # but it raised on empty buffers and, because np.abs(-32768) wraps back to
    # -32768 in int16, skipped scaling for buffers holding only -32768/0.
    sound = sound.astype('float32')
    sound *= 1 / 32768
    sound = sound.squeeze()  # depends on the use case
    return sound
FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 16000
CHUNK = int(SAMPLE_RATE / 10)
audio = pyaudio.PyAudio()
# Phases of the voice-activity state machine driven by the transport's _vad loop.
class VADState(Enum):
# No speech is being detected.
QUIET = 1
# Speech chunks have been seen, but not yet enough consecutive ones to report
# that the user started speaking.
STARTING = 2
# The user has been reported as speaking.
SPEAKING = 3
# Non-speech chunks have been seen while speaking, but not yet enough
# consecutive ones to report that the user stopped speaking.
STOPPING = 4
class BaseTransportService():
@@ -31,6 +87,23 @@ class BaseTransportService():
self._speaker_enabled = kwargs.get("speaker_enabled") or False
self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000
self._fps = kwargs.get("fps") or 8
self._vad_start_s = kwargs.get("vad_start_s") or 0.2
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
self._context = kwargs.get("context") or []
self._vad_enabled = kwargs.get("vad_enabled") or False
if self._vad_enabled and self._speaker_enabled:
raise Exception(
"Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False.")
self._vad_samples = 1536
vad_frame_s = self._vad_samples / SAMPLE_RATE
self._vad_start_frames = round(self._vad_start_s / vad_frame_s)
self._vad_stop_frames = round(self._vad_stop_s / vad_frame_s)
self._vad_starting_count = 0
self._vad_stopping_count = 0
self._vad_state = VADState.QUIET
self._user_is_speaking = False
duration_minutes = kwargs.get("duration_minutes") or 10
self._expiration = time.time() + duration_minutes * 60
@@ -38,6 +111,8 @@ class BaseTransportService():
self.send_queue = asyncio.Queue()
self.receive_queue = asyncio.Queue()
self.completed_queue = asyncio.Queue()
self._threadsafe_send_queue = queue.Queue()
self._images = None
@@ -55,18 +130,26 @@ class BaseTransportService():
async def run(self):
self._prerun()
async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames())
async_output_queue_marshal_task = asyncio.create_task(
self._marshal_frames())
self._camera_thread = threading.Thread(target=self._run_camera, daemon=True)
self._camera_thread = threading.Thread(
target=self._run_camera, daemon=True)
self._camera_thread.start()
self._frame_consumer_thread = threading.Thread(target=self._frame_consumer, daemon=True)
self._frame_consumer_thread = threading.Thread(
target=self._frame_consumer, daemon=True)
self._frame_consumer_thread.start()
if self._speaker_enabled:
self._receive_audio_thread = threading.Thread(target=self._receive_audio, daemon=True)
self._receive_audio_thread = threading.Thread(
target=self._receive_audio, daemon=True)
self._receive_audio_thread.start()
if self._vad_enabled:
self._vad_thread = threading.Thread(target=self._vad, daemon=True)
self._vad_thread.start()
try:
while (
time.time() < self._expiration
@@ -82,7 +165,7 @@ class BaseTransportService():
self._stop_threads.set()
await self.send_queue.put(EndStreamQueueFrame())
await self.send_queue.put(EndFrame())
await async_output_queue_marshal_task
await self.send_queue.join()
self._frame_consumer_thread.join()
@@ -90,6 +173,70 @@ class BaseTransportService():
if self._speaker_enabled:
self._receive_audio_thread.join()
if self._vad_enabled:
self._vad_thread.join()
# Attach this transport's queues to `pipeline` and await its run: the pipeline
# reads from self.receive_queue and writes to self.send_queue.
async def run_uninterruptible_pipeline(self, pipeline: Pipeline):
pipeline.set_sink(self.send_queue)
pipeline.set_source(self.receive_queue)
await pipeline.run_pipeline()
async def run_interruptible_pipeline(
    self,
    pipeline: Pipeline,
    allow_interruptions=True,
    pre_processor=None,
    post_processor: FrameProcessor | None = None,
):
    """Run `pipeline` against this transport, restarting it on user speech.

    Frames received by the transport are (optionally) passed through
    `pre_processor` and fed to the pipeline through a private source queue;
    the pipeline writes to `self.send_queue`. When a
    UserStartedSpeakingFrame arrives, the running pipeline task is
    cancelled, `self.interrupt()` is called and a fresh pipeline task is
    started.

    Args:
        pipeline: The pipeline to run; its source and sink are set here.
        allow_interruptions: Accepted for interface compatibility.
            NOTE(review): this flag is not consulted anywhere in the body,
            so interruptions always happen -- confirm the intended behavior.
        pre_processor: Optional processor whose `process_frame` output is
            what gets queued for the pipeline instead of the raw frame.
        post_processor: Optional processor that is fed every frame taken
            from `self.completed_queue`; its output is discarded.
    """
    source_queue = asyncio.Queue()
    pipeline.set_source(source_queue)
    pipeline.set_sink(self.send_queue)
    pipeline_task = asyncio.create_task(pipeline.run_pipeline())

    async def yield_frame(frame: Frame) -> AsyncGenerator[Frame, None]:
        yield frame

    async def post_process(post_processor: FrameProcessor):
        while True:
            completed_frame = await self.completed_queue.get()

            # We ignore the output of the post_processor's process frame;
            # this is called to update the post-processor's state. A separate
            # loop variable keeps `completed_frame` intact so the EndFrame
            # check below looks at the frame that actually completed.
            async for _ in post_processor.process_frame(completed_frame):
                pass

            if isinstance(completed_frame, EndFrame):
                break

    # Only present when a post-processor was supplied; previously the name
    # was left unbound in that case and the final gather raised.
    post_process_task = None
    if post_processor:
        post_process_task = asyncio.create_task(post_process(post_processor))

    started = False

    async for frame in self.get_receive_frames():
        if isinstance(frame, UserStartedSpeakingFrame):
            pipeline_task.cancel()
            self.interrupt()
            pipeline_task = asyncio.create_task(pipeline.run_pipeline())
            started = False

        # NOTE(review): `started` is never set to True, so a StartFrame is
        # queued before every received frame -- confirm this is intended.
        if not started:
            await self.send_queue.put(StartFrame())

        if pre_processor:
            frame_generator = pre_processor.process_frame(frame)
        else:
            frame_generator = yield_frame(frame)

        # `frame` is deliberately rebound here: the EndFrame check below then
        # applies to the last frame handed to the pipeline (or to the received
        # frame when the generator yields nothing).
        async for frame in frame_generator:
            await source_queue.put(frame)

        if isinstance(frame, EndFrame):
            break

    pending_tasks = [pipeline_task]
    if post_process_task is not None:
        pending_tasks.append(post_process_task)
    await asyncio.gather(*pending_tasks)
def _post_run(self):
# Note that this function must be idempotent! It can be called multiple times
# if, for example, a keyboard interrupt occurs.
@@ -122,22 +269,73 @@ class BaseTransportService():
def _prerun(self):
pass
# Thread body for voice-activity detection: repeatedly reads audio, scores it
# with the module-level silero `model`, advances self._vad_state, and posts
# UserStartedSpeakingFrame / UserStoppedSpeakingFrame onto self.receive_queue.
# Loops until self._stop_threads is set.
def _vad(self):
# CB: Starting silero VAD stuff
# TODO-CB: Probably need to force virtual speaker creation if we're
# going to build this in?
# TODO-CB: pyaudio installation
while not self._stop_threads.is_set():
# Read self._vad_samples frames and reinterpret the bytes as int16 samples.
audio_chunk = self.read_audio_frames(self._vad_samples)
audio_int16 = np.frombuffer(audio_chunk, np.int16)
audio_float32 = int2float(audio_int16)
# NOTE(review): the literal 16000 presumably should track SAMPLE_RATE -- confirm.
new_confidence = model(
torch.from_numpy(audio_float32), 16000).item()
# A chunk counts as speech when the model's confidence exceeds 0.5.
speaking = new_confidence > 0.5
if speaking:
match self._vad_state:
# First speech chunk after quiet: begin counting towards "started".
case VADState.QUIET:
self._vad_state = VADState.STARTING
self._vad_starting_count = 1
case VADState.STARTING:
self._vad_starting_count += 1
# Speech resumed before the stop threshold was reached.
case VADState.STOPPING:
self._vad_state = VADState.SPEAKING
self._vad_stopping_count = 0
else:
match self._vad_state:
# A non-speech chunk while still starting resets the start count.
case VADState.STARTING:
self._vad_state = VADState.QUIET
self._vad_starting_count = 0
# First non-speech chunk while speaking: begin counting towards "stopped".
case VADState.SPEAKING:
self._vad_state = VADState.STOPPING
self._vad_stopping_count = 1
case VADState.STOPPING:
self._vad_stopping_count += 1
# Enough speech chunks counted: report the start on the event loop's queue.
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(
UserStartedSpeakingFrame()), self._loop
)
# self.interrupt()
self._vad_state = VADState.SPEAKING
self._vad_starting_count = 0
# Enough non-speech chunks counted: report the stop and return to QUIET.
if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(
UserStoppedSpeakingFrame()), self._loop
)
self._vad_state = VADState.QUIET
self._vad_stopping_count = 0
async def _marshal_frames(self):
while True:
frame: QueueFrame | list = await self.send_queue.get()
frame: Frame | list = await self.send_queue.get()
self._threadsafe_send_queue.put(frame)
self.send_queue.task_done()
if isinstance(frame, EndStreamQueueFrame):
if isinstance(frame, EndFrame):
break
def interrupt(self):
self._logger.debug("!!! Interrupting")
self._is_interrupted.set()
async def get_receive_frames(self) -> AsyncGenerator[QueueFrame, None]:
async def get_receive_frames(self) -> AsyncGenerator[Frame, None]:
while True:
frame = await self.receive_queue.get()
yield frame
if isinstance(frame, EndStreamQueueFrame):
if isinstance(frame, EndFrame):
break
def _receive_audio(self):
@@ -150,13 +348,13 @@ class BaseTransportService():
while not self._stop_threads.is_set():
buffer = self.read_audio_frames(desired_frame_count)
if len(buffer) > 0:
frame = AudioQueueFrame(buffer)
frame = AudioFrame(buffer)
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(frame), self._loop
)
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(EndStreamQueueFrame()), self._loop
self.receive_queue.put(EndFrame()), self._loop
)
def _set_image(self, image: bytes):
@@ -181,31 +379,41 @@ class BaseTransportService():
self._logger.info("🎬 Starting frame consumer thread")
b = bytearray()
smallest_write_size = 3200
largest_write_size = 8000
all_audio_frames = bytearray()
while True:
try:
frames_or_frame: QueueFrame | list[QueueFrame] = (
frames_or_frame: Frame | list[Frame] = (
self._threadsafe_send_queue.get()
)
if isinstance(frames_or_frame, QueueFrame):
frames: list[QueueFrame] = [frames_or_frame]
if isinstance(frames_or_frame, AudioFrame) and len(frames_or_frame.data) > largest_write_size:
# subdivide large audio frames to enable interruption
frames = []
for i in range(0, len(frames_or_frame.data), largest_write_size):
frames.append(AudioFrame(
frames_or_frame.data[i: i+largest_write_size]))
elif isinstance(frames_or_frame, Frame):
frames: list[Frame] = [frames_or_frame]
elif isinstance(frames_or_frame, list):
frames: list[QueueFrame] = frames_or_frame
frames: list[Frame] = frames_or_frame
else:
raise Exception("Unknown type in output queue")
for frame in frames:
if isinstance(frame, EndStreamQueueFrame):
if isinstance(frame, EndFrame):
self._logger.info("Stopping frame consumer thread")
self._threadsafe_send_queue.task_done()
if self._loop:
asyncio.run_coroutine_threadsafe(
self.completed_queue.put(frame), self._loop
)
return
# if interrupted, we just pull frames off the queue and discard them
if not self._is_interrupted.is_set():
if frame:
if isinstance(frame, AudioQueueFrame):
if isinstance(frame, AudioFrame):
chunk = frame.data
all_audio_frames.extend(chunk)
b.extend(chunk)
@@ -213,11 +421,12 @@ class BaseTransportService():
len(b) % smallest_write_size
)
if truncated_length:
self.write_frame_to_mic(bytes(b[:truncated_length]))
self.write_frame_to_mic(
bytes(b[:truncated_length]))
b = b[truncated_length:]
elif isinstance(frame, ImageQueueFrame):
elif isinstance(frame, ImageFrame):
self._set_image(frame.image)
elif isinstance(frame, SpriteQueueFrame):
elif isinstance(frame, SpriteFrame):
self._set_images(frame.images)
elif len(b):
self.write_frame_to_mic(bytes(b))
@@ -227,12 +436,18 @@ class BaseTransportService():
# can cause static in the audio stream.
if len(b):
truncated_length = len(b) - (len(b) % 160)
self.write_frame_to_mic(bytes(b[:truncated_length]))
self.write_frame_to_mic(
bytes(b[:truncated_length]))
b = bytearray()
if isinstance(frame, StartStreamQueueFrame):
if isinstance(frame, StartFrame):
self._is_interrupted.clear()
if self._loop:
asyncio.run_coroutine_threadsafe(
self.completed_queue.put(frame), self._loop
)
self._threadsafe_send_queue.task_done()
except queue.Empty:
if len(b):
@@ -240,5 +455,6 @@ class BaseTransportService():
b = bytearray()
except Exception as e:
self._logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
self._logger.error(
f"Exception in frame_consumer: {e}, {len(b)}")
raise e

View File

@@ -7,7 +7,7 @@ import types
from functools import partial
from dailyai.queue_frame import (
from dailyai.pipeline.frames import (
TranscriptionQueueFrame,
)
@@ -31,6 +31,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
_speaker_enabled: bool
_speaker_sample_rate: int
_vad_enabled: bool
# This is necessary to override EventHandler's __new__ method.
def __new__(cls, *args, **kwargs):
@@ -142,54 +143,60 @@ class DailyTransportService(BaseTransportService, EventHandler):
"camera", width=self._camera_width, height=self._camera_height, color_format="RGB"
)
if self._speaker_enabled:
if self._speaker_enabled or self._vad_enabled:
self._speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
"speaker", sample_rate=self._speaker_sample_rate, channels=1
)
Daily.select_speaker_device("speaker")
self.client.set_user_name(self._bot_name)
self.client.join(self._room_url, self._token, completion=self.call_joined)
self.client.join(
self._room_url,
self._token,
completion=self.call_joined,
client_settings={
"inputs": {
"camera": {
"isEnabled": True,
"settings": {
"deviceId": "camera",
},
},
"microphone": {
"isEnabled": True,
"settings": {
"deviceId": "mic",
"customConstraints": {
"autoGainControl": {"exact": False},
"echoCancellation": {"exact": False},
"noiseSuppression": {"exact": False},
},
},
},
},
"publishing": {
"camera": {
"sendSettings": {
"maxQuality": "low",
"encodings": {
"low": {
"maxBitrate": 250000,
"scaleResolutionDownBy": 1.333,
"maxFramerate": 8,
}
},
}
}
},
},
)
self._my_participant_id = self.client.participants()["local"]["id"]
self.client.update_inputs(
{
"camera": {
"isEnabled": True,
"settings": {
"deviceId": "camera",
},
},
"microphone": {
"isEnabled": True,
"settings": {
"deviceId": "mic",
"customConstraints": {
"autoGainControl": {"exact": False},
"echoCancellation": {"exact": False},
"noiseSuppression": {"exact": False},
},
},
},
self.client.update_subscription_profiles({
"base": {
"camera": "unsubscribed",
}
)
self.client.update_publishing(
{
"camera": {
"sendSettings": {
"maxQuality": "low",
"encodings": {
"low": {
"maxBitrate": 250000,
"scaleResolutionDownBy": 1.333,
"maxFramerate": 8,
}
},
}
}
}
)
})
if self._token and self._start_transcription:
self.client.start_transcription(self.transcription_settings)
@@ -242,8 +249,9 @@ class DailyTransportService(BaseTransportService, EventHandler):
participantId = message["participantId"]
elif "session_id" in message:
participantId = message["session_id"]
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self._loop)
if self._my_participant_id and participantId != self._my_participant_id:
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self._loop)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
pass

View File

@@ -4,7 +4,7 @@ import math
import time
from typing import AsyncGenerator
import wave
from dailyai.queue_frame import AudioQueueFrame, QueueFrame, TranscriptionQueueFrame
from dailyai.pipeline.frames import AudioFrame, Frame, TranscriptionQueueFrame
from dailyai.services.ai_services import STTService
@@ -39,9 +39,9 @@ class LocalSTTService(STTService):
ww.setframerate(self._frame_rate)
self._wave = ww
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
"""Processes a frame of audio data, either buffering or transcribing it."""
if not isinstance(frame, AudioQueueFrame):
if not isinstance(frame, AudioFrame):
return
data = frame.data

View File

@@ -19,11 +19,13 @@ class OLLamaLLMService(LLMService):
model=self._model
)
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
chunks = await self._client.chat.completions.create(
model=self._model, stream=True, messages=messages
)
async for chunk in chunks:
if len(chunk.choices) == 0:
continue
@@ -33,7 +35,7 @@ class OLLamaLLMService(LLMService):
async def run_llm(self, messages) -> str | None:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
self.logger.debug(f"Generating chat via ollama: {messages_for_log}")
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
if response and len(response.choices) > 0:

View File

@@ -1,6 +1,7 @@
import aiohttp
from PIL import Image
import io
import time
from openai import AsyncOpenAI
import json
@@ -10,8 +11,8 @@ from dailyai.services.ai_services import LLMService, ImageGenService
class OpenAILLMService(LLMService):
def __init__(self, *, api_key, model="gpt-4"):
super().__init__()
def __init__(self, *, api_key, model="gpt-4", tools=None, messages=None):
super().__init__(tools=tools, messages=messages)
self._model = model
self._client = AsyncOpenAI(api_key=api_key)
@@ -19,19 +20,26 @@ class OpenAILLMService(LLMService):
return await self._client.chat.completions.create(
stream=stream,
messages=messages,
model=self._model
model=self._model,
tools=self._tools
)
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
if self._tools:
tools = self._tools
else:
tools = None
start_time = time.time()
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages, tools=tools, tool_choice=tool_choice)
self.logger.info(f"=== OpenAI LLM TTFB: {time.time() - start_time}")
async for chunk in chunks:
if len(chunk.choices) == 0:
continue
if chunk.choices[0].delta.content:
if chunk.choices[0].delta.tool_calls:
yield chunk.choices[0].delta.tool_calls[0]
elif chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
async def run_llm(self, messages) -> str | None:
@@ -57,7 +65,6 @@ class OpenAIImageGenService(ImageGenService):
):
super().__init__(image_size=image_size)
self._model = model
print(f"api key: {api_key}")
self._client = AsyncOpenAI(api_key=api_key)
self._aiohttp_session = aiohttp_session

View File

@@ -0,0 +1,128 @@
import asyncio
import doctest
import functools
import unittest
from dailyai.pipeline.aggregators import (
GatedAggregator,
ParallelPipeline,
SentenceAggregator,
StatelessTextTransformer,
)
from dailyai.pipeline.frames import (
AudioFrame,
EndFrame,
ImageFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
Frame,
TextFrame,
)
from dailyai.pipeline.pipeline import Pipeline
# Async unit tests for SentenceAggregator, GatedAggregator and ParallelPipeline.
class TestDailyFrameAggregators(unittest.IsolatedAsyncioTestCase):
# Feeds a sentence word by word (each word followed by a space) and checks the
# TextFrames the aggregator emits against expected_sentences, in order.
async def test_sentence_aggregator(self):
sentence = "Hello, world. How are you? I am fine"
expected_sentences = ["Hello, world.", " How are you?", " I am fine "]
aggregator = SentenceAggregator()
# NOTE(review): the loop variable below reuses the name `sentence`; the split()
# iterable is already built by then, but a distinct name would be clearer.
for word in sentence.split(" "):
async for sentence in aggregator.process_frame(TextFrame(word + " ")):
self.assertIsInstance(sentence, TextFrame)
if isinstance(sentence, TextFrame):
self.assertEqual(sentence.text, expected_sentences.pop(0))
# Sending EndFrame should emit any remaining expected text, then the EndFrame.
async for sentence in aggregator.process_frame(EndFrame()):
if len(expected_sentences):
self.assertIsInstance(sentence, TextFrame)
if isinstance(sentence, TextFrame):
self.assertEqual(sentence.text, expected_sentences.pop(0))
else:
self.assertIsInstance(sentence, EndFrame)
self.assertEqual(expected_sentences, [])
# Gate starts closed, opens on an ImageFrame and closes on LLMResponseStartFrame.
# The expected output has the ImageFrame first, followed by the other frames
# in their input order.
async def test_gated_accumulator(self):
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame),
start_open=False,
)
frames = [
LLMResponseStartFrame(),
TextFrame("Hello, "),
TextFrame("world."),
AudioFrame(b"hello"),
ImageFrame("image", b"image"),
AudioFrame(b"world"),
LLMResponseEndFrame(),
]
expected_output_frames = [
ImageFrame("image", b"image"),
LLMResponseStartFrame(),
TextFrame("Hello, "),
TextFrame("world."),
AudioFrame(b"hello"),
AudioFrame(b"world"),
LLMResponseEndFrame(),
]
for frame in frames:
async for out_frame in gated_aggregator.process_frame(frame):
self.assertEqual(out_frame, expected_output_frames.pop(0))
self.assertEqual(expected_output_frames, [])
# Runs two branches in a ParallelPipeline (one tagging text with "pipe1", one
# aggregating sentences and tagging with "pipe2"), then appends "." to each
# text, and compares what lands in the sink with expected_output_frames.
async def test_parallel_pipeline(self):
# Async transformer: sleeps, then returns "<x>:<name>".
async def slow_add(sleep_time:float, name:str, x: str):
await asyncio.sleep(sleep_time)
return ":".join([x, name])
pipe1_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.1, 'pipe1'))
pipe2_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.2, 'pipe2'))
sentence_aggregator = SentenceAggregator()
add_dots = StatelessTextTransformer(lambda x: x + ".")
source = asyncio.Queue()
sink = asyncio.Queue()
pipeline = Pipeline(
[
ParallelPipeline(
[[pipe1_annotation], [sentence_aggregator, pipe2_annotation]]
),
add_dots,
],
source,
sink,
)
frames = [
TextFrame("Hello, "),
TextFrame("world."),
EndFrame()
]
expected_output_frames: list[Frame] = [
TextFrame(text='Hello, :pipe1.'),
TextFrame(text='world.:pipe1.'),
TextFrame(text='Hello, world.:pipe2.'),
EndFrame()
]
for frame in frames:
await source.put(frame)
await pipeline.run_pipeline()
# NOTE(review): nothing asserts that expected_output_frames is empty afterwards,
# so this loop passes trivially if the sink received fewer frames than expected.
while not sink.empty():
frame = await sink.get()
self.assertEqual(frame, expected_output_frames.pop(0))
# unittest load_tests hook: adds the doctests from dailyai.pipeline.aggregators
# to the suite passed in as `tests` and returns it. `loader` and `ignore` are
# not used.
def load_tests(loader, tests, ignore):
""" Run doctests on the aggregators module. """
# Imported here rather than at module top; the module is only needed for the
# doctest suite.
from dailyai.pipeline import aggregators
tests.addTests(doctest.DocTestSuite(aggregators))
return tests

View File

@@ -3,11 +3,11 @@ import unittest
from typing import AsyncGenerator, Generator
from dailyai.services.ai_services import AIService
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TextQueueFrame
from dailyai.pipeline.frames import EndFrame, Frame, TextFrame
class SimpleAIService(AIService):
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
yield frame
@@ -16,11 +16,11 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
service = SimpleAIService()
input_frames = [
TextQueueFrame("hello"),
EndStreamQueueFrame()
TextFrame("hello"),
EndFrame()
]
async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
async def iterate_frames() -> AsyncGenerator[Frame, None]:
for frame in input_frames:
yield frame
@@ -33,9 +33,9 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
async def test_nonasync_input(self):
service = SimpleAIService()
input_frames = [TextQueueFrame("hello"), EndStreamQueueFrame()]
input_frames = [TextFrame("hello"), EndFrame()]
def iterate_frames() -> Generator[QueueFrame, None, None]:
def iterate_frames() -> Generator[Frame, None, None]:
for frame in input_frames:
yield frame

View File

@@ -3,7 +3,7 @@ import unittest
from unittest.mock import MagicMock, patch
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioFrame, ImageFrame
class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
@@ -42,6 +42,7 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
await asyncio.wait_for(event.wait(), timeout=1)
self.assertTrue(event.is_set())
"""
@patch("dailyai.services.daily_transport_service.CallClient")
@patch("dailyai.services.daily_transport_service.Daily")
async def test_run_with_camera_and_mic(self, daily_mock, callclient_mock):
@@ -79,3 +80,4 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
camera.write_frame.assert_called_with(b"test")
mic.write_frames.assert_called()
"""

View File

@@ -0,0 +1,60 @@
import asyncio
from doctest import OutputChecker
import unittest
from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline
# Async unit tests that push frames through a Pipeline via asyncio queues.
class TestDailyPipeline(unittest.IsolatedAsyncioTestCase):
# Single-stage pipeline: two text fragments should come out as one sentence
# frame, followed by the EndFrame.
async def test_pipeline_simple(self):
aggregator = SentenceAggregator()
outgoing_queue = asyncio.Queue()
incoming_queue = asyncio.Queue()
pipeline = Pipeline([aggregator], incoming_queue, outgoing_queue)
await incoming_queue.put(TextFrame("Hello, "))
await incoming_queue.put(TextFrame("world."))
await incoming_queue.put(EndFrame())
await pipeline.run_pipeline()
self.assertEqual(await outgoing_queue.get(), TextFrame("Hello, world."))
self.assertIsInstance(await outgoing_queue.get(), EndFrame)
# Three stages in order: append a space to each character frame, aggregate
# into sentences, then upper-case each sentence.
async def test_pipeline_multiple_stages(self):
sentence_aggregator = SentenceAggregator()
to_upper = StatelessTextTransformer(lambda x: x.upper())
add_space = StatelessTextTransformer(lambda x: x + " ")
outgoing_queue = asyncio.Queue()
incoming_queue = asyncio.Queue()
pipeline = Pipeline(
[add_space, sentence_aggregator, to_upper],
incoming_queue,
outgoing_queue
)
sentence = "Hello, world. It's me, a pipeline."
# One TextFrame per character of the input.
for c in sentence:
await incoming_queue.put(TextFrame(c))
await incoming_queue.put(EndFrame())
await pipeline.run_pipeline()
self.assertEqual(
await outgoing_queue.get(), TextFrame("H E L L O , W O R L D .")
)
self.assertEqual(
await outgoing_queue.get(),
TextFrame(" I T ' S M E , A P I P E L I N E ."),
)
# leftover little bit because of the spacing
self.assertEqual(
await outgoing_queue.get(),
TextFrame(" "),
)
self.assertIsInstance(await outgoing_queue.get(), EndFrame)

View File

@@ -28,7 +28,6 @@ async def main(room_url):
mic_enabled=True
)
"""
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
@@ -39,6 +38,7 @@ async def main(room_url):
user_id=os.getenv("PLAY_HT_USER_ID"),
voice_url=os.getenv("PLAY_HT_VOICE_URL"),
)
"""
# Register an event handler so we can play the audio when the participant joins.
@transport.event_handler("on_participant_joined")

View File

@@ -3,7 +3,7 @@ import os
import aiohttp
from dailyai.queue_frame import LLMMessagesQueueFrame
from dailyai.pipeline.frames import LLMMessagesQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService

View File

@@ -2,7 +2,7 @@ import asyncio
import aiohttp
import os
from dailyai.queue_frame import TextQueueFrame
from dailyai.pipeline.frames import TextFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAIImageGenService
@@ -39,7 +39,7 @@ async def main(room_url):
image_task = asyncio.create_task(
imagegen.run_to_queue(
transport.send_queue, [
TextQueueFrame("a cat in the style of picasso")]))
TextFrame("a cat in the style of picasso")]))
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):

View File

@@ -4,7 +4,7 @@ import os
import tkinter as tk
from dailyai.queue_frame import TextQueueFrame
from dailyai.pipeline.frames import TextFrame
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.local_transport_service import LocalTransportService
@@ -34,7 +34,7 @@ async def main():
)
image_task = asyncio.create_task(
imagegen.run_to_queue(
transport.send_queue, [TextQueueFrame("a cat in the style of picasso")]
transport.send_queue, [TextFrame("a cat in the style of picasso")]
)
)

View File

@@ -2,10 +2,11 @@ import asyncio
import os
import aiohttp
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from examples.foundational.support.runner import configure
@@ -41,13 +42,10 @@ async def main(room_url: str):
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
# speak the LLM response.
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
elevenlabs_tts.run_to_queue(
buffer_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
True,
)
)
source_queue = asyncio.Queue()
pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts])
source_queue.put_nowait(LLMMessagesQueueFrame(messages))
pipeline_run_task = pipeline.run_pipeline()
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
@@ -58,10 +56,10 @@ async def main(room_url: str):
frame = await buffer_queue.get()
await transport.send_queue.put(frame)
buffer_queue.task_done()
if isinstance(frame, EndStreamQueueFrame):
if isinstance(frame, EndFrame):
break
await asyncio.gather(llm_response_task, buffer_to_send_queue())
await asyncio.gather(pipeline_run_task, buffer_to_send_queue())
await transport.stop_when_done()

View File

@@ -1,8 +1,11 @@
import asyncio
from re import S
import aiohttp
import os
from dailyai.pipeline.aggregators import GatedAggregator, LLMFullResponseAggregator, ParallelPipeline, SentenceAggregator
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesQueueFrame, LLMResponseStartFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.daily_transport_service import DailyTransportService
@@ -35,98 +38,54 @@ async def main(room_url):
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV")
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
dalle = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"))
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
source_queue = asyncio.Queue()
return all_audio
async def get_month_data(month):
for month in ["January", "February"]:
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
await source_queue.put(LLMMessagesQueueFrame(messages))
image_description = await llm.run_llm(messages)
if not image_description:
return
await source_queue.put(EndFrame())
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
print(f"about to gather tasks for {month}")
(audio, image_data) = await asyncio.gather(
audio_task, image_task
)
print(f"about to return from get_month_data for {month}")
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame),
start_open=False,
)
sentence_aggregator = SentenceAggregator()
llm_full_response_aggregator = LLMFullResponseAggregator()
pipeline = Pipeline(
source=source_queue,
sink=transport.send_queue,
processors=[
llm,
sentence_aggregator,
ParallelPipeline([[tts], [llm_full_response_aggregator, dalle]]),
gated_aggregator,
],
)
pipeline_task = pipeline.run_pipeline()
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June"
]
"""
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
"""
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
print(f"month_data_task: {month_data_task}")
try:
data = await month_data_task
except Exception:
print("OMG EXCEPTION!!!!")
if data:
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
)
await pipeline_task
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
await transport.run()
if __name__ == "__main__":

View File

@@ -4,7 +4,7 @@ import asyncio
import tkinter as tk
import os
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioFrame, ImageFrame
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
@@ -103,8 +103,8 @@ async def main(room_url):
if data:
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
ImageFrame(data["image_url"], data["image"]),
AudioFrame(data["audio"]),
]
)

View File

@@ -1,9 +1,11 @@
import asyncio
import os
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
from dailyai.services.ai_services import FrameLogger
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from examples.foundational.support.runner import configure
@@ -16,7 +18,8 @@ async def main(room_url: str, token):
start_transcription=True,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=False
camera_enabled=False,
vad_enabled=True
)
llm = AzureLLMService(
@@ -26,7 +29,8 @@ async def main(room_url: str, token):
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"))
fl = FrameLogger("Inner")
fl2 = FrameLogger("Outer")
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
@@ -41,17 +45,19 @@ async def main(room_url: str, token):
tma_in = LLMUserContextAggregator(messages, transport._my_participant_id)
tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id)
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
transport.get_receive_frames()
)
)
)
pipeline = Pipeline(
processors=[
fl,
tma_in,
llm,
fl2,
tma_out,
tts
],
)
await transport.run_uninterruptible_pipeline(pipeline)
transport.transcription_settings["extra"]["endpointing"] = True
transport.transcription_settings["extra"]["punctuate"] = True
await asyncio.gather(transport.run(), handle_transcriptions())

View File

@@ -8,12 +8,12 @@ import time
import urllib.parse
from PIL import Image
from dailyai.queue_frame import ImageQueueFrame, QueueFrame
from dailyai.pipeline.frames import ImageFrame, Frame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.ai_services import AIService
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.services.fal_ai_services import FalImageGenService
from examples.foundational.support.runner import configure
@@ -27,10 +27,10 @@ class ImageSyncAggregator(AIService):
self._waiting_image = Image.open(waiting_path)
self._waiting_image_bytes = self._waiting_image.tobytes()
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
yield ImageQueueFrame(None, self._speaking_image_bytes)
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
yield ImageFrame(None, self._speaking_image_bytes)
yield frame
yield ImageQueueFrame(None, self._waiting_image_bytes)
yield ImageFrame(None, self._waiting_image_bytes)
async def main(room_url: str, token):

View File

@@ -1,14 +1,14 @@
import asyncio
import aiohttp
import os
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMResponseAggregator, LLMUserContextAggregator, UserResponseAggregator
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import FrameLogger
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from examples.foundational.support.runner import configure
from support.runner import configure
async def main(room_url: str, token):
@@ -22,6 +22,7 @@ async def main(room_url: str, token):
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=False,
vad_enabled=True,
)
llm = AzureLLMService(
@@ -32,17 +33,7 @@ async def main(room_url: str, token):
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"))
async def run_response(user_speech, tma_in, tma_out):
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
[StartStreamQueueFrame(), TextQueueFrame(user_speech)]
)
)
),
)
pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts])
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
@@ -53,14 +44,15 @@ async def main(room_url: str, token):
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
]
conversation_wrapper = InterruptibleConversationWrapper(
frame_generator=transport.get_receive_frames,
runner=run_response,
interrupt=transport.interrupt,
my_participant_id=transport._my_participant_id,
llm_messages=messages,
await transport.run_interruptible_pipeline(
pipeline,
post_processor=LLMResponseAggregator(
messages
),
pre_processor=UserResponseAggregator(
messages
),
)
await conversation_wrapper.run_conversation()
transport.transcription_settings["extra"]["punctuate"] = False
await asyncio.gather(transport.run(), run_conversation())

View File

@@ -6,7 +6,7 @@ from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioFrame, ImageFrame
from examples.foundational.support.runner import configure
@@ -90,8 +90,8 @@ async def main(room_url: str):
)
await transport.send_queue.put(
[
ImageQueueFrame(None, image_data1[1]),
AudioQueueFrame(audio1),
ImageFrame(None, image_data1[1]),
AudioFrame(audio1),
]
)
@@ -102,8 +102,8 @@ async def main(room_url: str):
)
await transport.send_queue.put(
[
ImageQueueFrame(None, image_data2[1]),
AudioQueueFrame(audio2),
ImageFrame(None, image_data2[1]),
AudioFrame(audio2),
]
)

View File

@@ -9,12 +9,12 @@ from PIL import Image
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.queue_aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.queue_frame import (
QueueFrame,
TextQueueFrame,
ImageQueueFrame,
SpriteQueueFrame,
from dailyai.pipeline.aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.pipeline.frames import (
Frame,
TextFrame,
ImageFrame,
SpriteFrame,
TranscriptionQueueFrame,
)
from dailyai.services.ai_services import AIService
@@ -45,11 +45,11 @@ for file in image_files:
sprites[file] = img.tobytes()
# When the bot isn't talking, show a static image of the cat listening
quiet_frame = ImageQueueFrame("", sprites["sc-listen-1.png"])
quiet_frame = ImageFrame("", sprites["sc-listen-1.png"])
# When the bot is talking, build an animation from two sprites
talking_list = [sprites['sc-default.png'], sprites['sc-talk.png']]
talking = [random.choice(talking_list) for x in range(30)]
talking_frame = SpriteQueueFrame(images=talking)
talking_frame = SpriteFrame(images=talking)
# TODO: Support "thinking" as soon as we get a valid transcript, while LLM is processing
thinking_list = [
@@ -57,14 +57,14 @@ thinking_list = [
sprites['sc-think-2.png'],
sprites['sc-think-3.png'],
sprites['sc-think-4.png']]
thinking_frame = SpriteQueueFrame(images=thinking_list)
thinking_frame = SpriteFrame(images=thinking_list)
class TranscriptFilter(AIService):
def __init__(self, bot_participant_id=None):
self.bot_participant_id = bot_participant_id
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, TranscriptionQueueFrame):
if frame.participantId != self.bot_participant_id:
yield frame
@@ -75,11 +75,11 @@ class NameCheckFilter(AIService):
self.names = names
self.sentence = ""
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
content: str = ""
# TODO: split up transcription by participant
if isinstance(frame, TextQueueFrame):
if isinstance(frame, TextFrame):
content = frame.text
self.sentence += content
@@ -87,7 +87,7 @@ class NameCheckFilter(AIService):
if any(name in self.sentence for name in self.names):
out = self.sentence
self.sentence = ""
yield TextQueueFrame(out)
yield TextFrame(out)
else:
out = self.sentence
self.sentence = ""
@@ -97,7 +97,7 @@ class ImageSyncAggregator(AIService):
def __init__(self):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
yield talking_frame
yield frame
yield quiet_frame

View File

@@ -7,9 +7,9 @@ import wave
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.queue_aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.pipeline.aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.services.ai_services import AIService, FrameLogger
from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
from typing import AsyncGenerator
from examples.foundational.support.runner import configure
@@ -40,9 +40,9 @@ class OutboundSoundEffectWrapper(AIService):
def __init__(self):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, LLMResponseEndQueueFrame):
yield AudioQueueFrame(sounds["ding1.wav"])
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMResponseEndFrame):
yield AudioFrame(sounds["ding1.wav"])
# In case anything else up the stack needs it
yield frame
else:
@@ -53,9 +53,9 @@ class InboundSoundEffectWrapper(AIService):
def __init__(self):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
yield AudioQueueFrame(sounds["ding2.wav"])
yield AudioFrame(sounds["ding2.wav"])
# In case anything else up the stack needs it
yield frame
else:
@@ -86,7 +86,7 @@ async def main(room_url: str, token):
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
await transport.send_queue.put(AudioQueueFrame(sounds["ding1.wav"]))
await transport.send_queue.put(AudioFrame(sounds["ding1.wav"]))
async def handle_transcriptions():
messages = [

View File

@@ -1,7 +1,7 @@
import argparse
import asyncio
import wave
from dailyai.queue_frame import EndStreamQueueFrame, TranscriptionQueueFrame
from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame
from dailyai.services.local_transport_service import LocalTransportService
from dailyai.services.whisper_ai_services import WhisperSTTService
@@ -30,7 +30,7 @@ async def main(room_url: str):
print("got item from queue", item)
if isinstance(item, TranscriptionQueueFrame):
print(item.text)
elif isinstance(item, EndStreamQueueFrame):
elif isinstance(item, EndFrame):
break
print("handle_transcription done")
@@ -38,7 +38,7 @@ async def main(room_url: str):
await stt.run_to_queue(
transcription_output_queue, transport.get_receive_frames()
)
await transcription_output_queue.put(EndStreamQueueFrame())
await transcription_output_queue.put(EndFrame())
print("handle speaker done.")
async def run_until_done():

View File

@@ -0,0 +1,376 @@
import aiohttp
import asyncio
import json
import random
import os
import re
import wave
from typing import AsyncGenerator
from PIL import Image
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator, UserResponseAggregator, LLMResponseAggregator
from support.runner import configure
from dailyai.pipeline.frames import LLMMessagesQueueFrame, TranscriptionQueueFrame, Frame, TextFrame, LLMFunctionCallFrame, LLMResponseEndFrame, StartFrame, AudioFrame, SpriteFrame, ImageFrame
from dailyai.services.ai_services import FrameLogger, AIService
import logging
logging.basicConfig(level=logging.DEBUG)
# Sound effects available to the bot, keyed by file name (e.g. "clack.wav").
# Each value is the raw frame data read from the corresponding WAV file.
sounds = {}
sound_files = [
    'clack-short.wav',
    'clack.wav',
    'clack-short-quiet.wav'
]
script_dir = os.path.dirname(__file__)
for file in sound_files:
    # Build the full path to the sound file in the "assets" directory that
    # sits next to this script
    full_path = os.path.join(script_dir, "assets", file)
    # Open the WAV file and read its audio frames into memory as bytes
    with wave.open(full_path) as audio_file:
        sounds[file] = audio_file.readframes(-1)
# Ordered script for the intake conversation. ChecklistProcessor walks this
# list one entry at a time. Each step is a dict with these keys:
#   "prompt"    - system message appended to the LLM context when the step starts
#   "run_async" - True: advance to the next step as soon as the function call's
#                 name is seen; False: wait for the function's result first
#   "tools"     - function definitions offered to the LLM while the step is active
#   "failed"    - (first step only) system message used when the step's function
#                 returns a falsy result
# The final entry has an empty prompt and no tools.
steps = [
{
"prompt": "Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
"run_async": False,
"failed": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.", "tools": [{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function."
}
}
}
}
}]},
{
"prompt": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
"run_async": True,
"tools": [{
"type": "function",
"function": {
"name": "list_prescriptions",
"description": "Once the user has provided a list of their prescription medications, call this function.",
"parameters": {
"type": "object",
"properties": {
"prescriptions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"medication": {
"type": "string",
"description": "The medication's name"
},
"dosage": {
"type": "string",
"description": "The prescription's dosage"
}
}
}
}
}
}
}
}]
},
{
"prompt": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_allergies",
"description": "Once the user has provided a list of their allergies, call this function.",
"parameters": {
"type": "object",
"properties": {
"allergies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "What the user is allergic to"
}
}
}
}
}
}
}
}
]
},
{
"prompt": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_conditions",
"description": "Once the user has provided a list of their medical conditions, call this function.",
"parameters": {
"type": "object",
"properties": {
"conditions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's medical condition"
}
}
}
}
}
}
}
},
],
},
{
"prompt": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_visit_reasons",
"description": "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.",
"parameters": {
"type": "object",
"properties": {
"visit_reasons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's reason for visiting the doctor"
}
}
}
}
}
}
}
}
]
},
{"prompt": "Now, thank the user and end the conversation.",
"run_async": True, "tools": []},
{"prompt": "", "run_async": True, "tools": []}
]
# Index into `steps` of the step currently in progress; incremented by
# ChecklistProcessor.process_frame as the conversation advances.
current_step = 0
class TranscriptFilter(AIService):
    """Pass through only transcription frames that did not come from the bot.

    Every other frame, and any transcription whose ``participantId`` equals
    ``bot_participant_id``, is dropped.
    """

    def __init__(self, bot_participant_id=None):
        super().__init__()
        self.bot_participant_id = bot_participant_id

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Yield ``frame`` if it is a transcription from another participant."""
        # Guard clauses: anything that is not a transcription is discarded...
        if not isinstance(frame, TranscriptionQueueFrame):
            return
        # ...and so is anything the bot itself said.
        if frame.participantId == self.bot_participant_id:
            return
        yield frame
class ChecklistProcessor(AIService):
    """Walk the intake checklist in ``steps`` by reacting to LLM function calls.

    The processor watches the frames coming out of the LLM:

    * an ``LLMFunctionCallFrame`` carrying a function name records the name
      and, for ``run_async`` steps, immediately moves on to the next step;
    * an ``LLMFunctionCallFrame`` carrying arguments accumulates the
      (streamed) JSON argument text;
    * an ``LLMResponseEndFrame`` triggers the recorded function, if both a
      name and arguments were collected, and is not forwarded;
    * every other frame is passed through unchanged.

    Args:
        messages: LLM context (list of message dicts). It is mutated in
            place; the first step's system prompt is appended on construction.
        llm: LLM service used to generate the follow-up for each new step.
        tools: List of tool definitions, rewritten in place on every frame to
            hold the current step's tools.
    """

    def __init__(self, messages, llm, tools, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._messages = messages
        self._llm = llm
        self._tools = tools
        # Name and JSON argument text of the function call being streamed.
        self._function_name = ""
        self._arguments = ""
        self._id = "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous."
        self._acks = ["One sec.", "Let me confirm that.", "Thanks.", "OK."]
        messages.append(
            {"role": "system", "content": f"{self._id} {steps[0]['prompt']}"})

    def verify_birthday(self, args):
        """Return True when ``args['birthday']`` is the expected birth date.

        A missing ``birthday`` key counts as a failed verification rather
        than raising, since the tool schema does not mark it as required.
        """
        return args.get('birthday') == "1983-01-01"

    def list_prescriptions(self, args):
        """Receive the user's prescriptions (``args['prescriptions']``); currently a no-op."""

    def list_allergies(self, args):
        """Receive the user's allergies (``args['allergies']``); currently a no-op."""

    def list_conditions(self, args):
        """Receive the user's medical conditions (``args['conditions']``); currently a no-op."""

    def list_visit_reasons(self, args):
        """Receive the user's visit reasons (``args['visit_reasons']``); currently a no-op."""

    async def _prompt_llm(self, prompt) -> AsyncGenerator[Frame, None]:
        """Append ``prompt`` as a system message and stream the LLM's reply.

        Yields the updated context as an ``LLMMessagesQueueFrame`` followed by
        every frame the LLM produces for it. Tool use is disabled for this
        completion (``tool_choice="none"``).
        """
        self._messages.append({"role": "system", "content": prompt})
        yield LLMMessagesQueueFrame(self._messages)
        # Use the injected service instead of relying on a module-level `llm`.
        async for llm_frame in self._llm.process_frame(
                LLMMessagesQueueFrame(self._messages), tool_choice="none"):
            yield llm_frame

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        """Advance the checklist based on ``frame``; see the class docstring."""
        global current_step
        this_step = steps[current_step]
        # TODO-CB: forcing a global here :/
        # The tools list is rewritten in place so that any other holder of the
        # same list object sees the current step's tools.
        self._tools.clear()
        self._tools.extend(this_step['tools'])

        if isinstance(frame, LLMFunctionCallFrame) and frame.function_name:
            print(f"... Preparing function call: {frame.function_name}")
            self._function_name = frame.function_name
            if this_step['run_async']:
                # Get the LLM talking about the next step before getting the
                # rest of the function call completion. Non-async steps wait
                # for the function result instead (handled on response end).
                current_step += 1
                async for out_frame in self._prompt_llm(steps[current_step]['prompt']):
                    yield out_frame
        elif isinstance(frame, LLMFunctionCallFrame) and frame.arguments:
            self._arguments += frame.arguments
        elif isinstance(frame, LLMResponseEndFrame):
            if self._function_name and self._arguments:
                print(
                    f"--> Calling function: {self._function_name} with arguments:")
                # Parse once; the same object is pretty-printed and passed on.
                arguments = json.loads(self._arguments)
                pretty_json = re.sub("\n", "\n ", json.dumps(arguments, indent=2))
                print(f"--> {pretty_json}\n")
                fn = getattr(self, self._function_name)
                result = fn(arguments)
                self._function_name = ""
                self._arguments = ""
                if not this_step['run_async']:
                    if result:
                        current_step += 1
                        next_prompt = steps[current_step]['prompt']
                    else:
                        next_prompt = this_step['failed']
                    async for out_frame in self._prompt_llm(next_prompt):
                        yield out_frame
                    print(f"<-- Verify result: {result}\n")
        else:
            yield frame
async def main(room_url: str, token):
    """Run the patient-intake bot in the given room until the transport stops.

    Builds the transport, LLM and TTS services, wires them into an
    interruptible pipeline together with a ChecklistProcessor, and runs the
    transport and the pipeline concurrently.

    Args:
        room_url: URL of the room to join.
        token: Token passed to the transport along with the room URL.
    """
    async with aiohttp.ClientSession() as session:
        # These three are bound as module globals rather than locals.
        global transport
        global llm
        global tts

        transport = DailyTransportService(
            room_url,
            token,
            "Intake Bot",
            5,
            mic_enabled=True,
            mic_sample_rate=16000,
            camera_enabled=False,
            start_transcription=True,
            vad_enabled=True
        )
        # TODO-CB: Go back to vad_enabled

        # `messages` is the shared LLM context; `tools` is the same list
        # object given to both the LLM service and the checklist processor.
        messages = []
        tools = []

        # llm = AzureLLMService(api_key=os.getenv("AZURE_CHATGPT_API_KEY"), endpoint=os.getenv(
        #     "AZURE_CHATGPT_ENDPOINT"), model=os.getenv("AZURE_CHATGPT_MODEL"))
        llm = OpenAILLMService(api_key=os.getenv(
            "OPENAI_CHATGPT_API_KEY"), model="gpt-4-1106-preview", tools=tools)  # gpt-4-1106-preview
        # tts = AzureTTSService(api_key=os.getenv(
        #     "AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
        tts = ElevenLabsTTSService(aiohttp_session=session, api_key=os.getenv(
            "ELEVENLABS_API_KEY"), voice_id="XrExE9yKIg1WjnnlVkGX")  # matilda
        # tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv(
        #     "DEEPGRAM_API_KEY"), voice="aura-asteria-en")

        checklist = ChecklistProcessor(messages, llm, tools)
        fl = FrameLogger("FRAME LOGGER 1:")
        fl2 = FrameLogger("FRAME LOGGER 2:")

        @transport.event_handler("on_first_other_participant_joined")
        async def on_first_other_participant_joined(transport):
            # Kick off the conversation: run the LLM on the current context
            # and send the resulting speech to the transport.
            # TODO-CB: Make sure this message gets into the context somehow
            await tts.run_to_queue(
                transport.send_queue,
                llm.run([LLMMessagesQueueFrame(messages)]),
            )

        async def handle_intake():
            pipeline = Pipeline(
                processors=[
                    fl,
                    llm,
                    fl2,
                    checklist,
                    tts
                ]
            )
            await transport.run_interruptible_pipeline(
                pipeline,
                post_processor=LLMResponseAggregator(messages),
                pre_processor=UserResponseAggregator(messages),
            )

        transport.transcription_settings["extra"]["endpointing"] = True
        transport.transcription_settings["extra"]["punctuate"] = True

        try:
            await asyncio.gather(transport.run(), handle_intake())
        except (asyncio.CancelledError, KeyboardInterrupt):
            print('whoops')
            transport.stop()
if __name__ == "__main__":
    # configure() supplies the two values main() takes: room URL and token.
    url, token = configure()
    asyncio.run(main(url, token))

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -7,7 +7,7 @@ import random
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.pipeline.frames import Frame, FrameType
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -45,7 +45,7 @@ async def main(room_url: str, token):
print(f"finder: {finder}")
if finder >= 0:
async for audio in tts.run_tts(f"Resetting."):
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
sentence = ""
continue
# todo: we could differentiate between transcriptions from different participants
@@ -54,12 +54,12 @@ async def main(room_url: str, token):
# TODO: Cache this audio
phrase = random.choice(["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
async for audio in tts.run_tts(phrase):
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
img_result = img.run_image_gen(sentence, "1024x1024")
awaited_img = await asyncio.gather(img_result)
transport.output_queue.put(
[
QueueFrame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
Frame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
]
)
@@ -72,7 +72,7 @@ async def main(room_url: str, token):
audio_generator = tts.run_tts(
f"Hello, {participant['info']['userName']}! Describe an image and I'll create it. To start over, just say 'start over'.")
async for audio in audio_generator:
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
transport.transcription_settings["extra"]["punctuate"] = False
transport.transcription_settings["extra"]["endpointing"] = False

View File

@@ -5,9 +5,9 @@ import wave
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_aggregators import LLMContextAggregator
from dailyai.pipeline.aggregators import LLMContextAggregator
from dailyai.services.ai_services import AIService, FrameLogger
from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
from typing import AsyncGenerator
from examples.foundational.support.runner import configure
@@ -34,9 +34,9 @@ class OutboundSoundEffectWrapper(AIService):
def __init__(self):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, LLMResponseEndQueueFrame):
yield AudioQueueFrame(sounds["ding1.wav"])
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMResponseEndFrame):
yield AudioFrame(sounds["ding1.wav"])
# In case anything else up the stack needs it
yield frame
else:
@@ -47,9 +47,9 @@ class InboundSoundEffectWrapper(AIService):
def __init__(self):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
yield AudioQueueFrame(sounds["ding2.wav"])
yield AudioFrame(sounds["ding2.wav"])
# In case anything else up the stack needs it
yield frame
else:
@@ -79,7 +79,7 @@ async def main(room_url: str, token, phone):
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
await transport.send_queue.put(AudioQueueFrame(sounds["ding1.wav"]))
await transport.send_queue.put(AudioFrame(sounds["ding1.wav"]))
async def handle_transcriptions():
messages = [