Merge pull request #33 from daily-co/pipeline-instead-of-nest

Pipeline instead of nest
This commit is contained in:
Moishe Lettvin
2024-03-05 11:04:20 -05:00
committed by GitHub
30 changed files with 668 additions and 344 deletions

View File

@@ -19,6 +19,7 @@ dependencies = [
"pyht",
"python-dotenv",
"torch",
"torchaudio",
"pyaudio",
"typing-extensions"
]

View File

@@ -1,77 +0,0 @@
import asyncio
import copy
import functools
from typing import AsyncGenerator, Awaitable, Callable
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame
class InterruptibleConversationWrapper:
def __init__(
self,
frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]],
runner: Callable[
[str, LLMContextAggregator, LLMContextAggregator], Awaitable[None]
],
interrupt: Callable[[], None],
my_participant_id: str | None,
llm_messages: list[dict[str, str]],
llm_context_aggregator_in=LLMUserContextAggregator,
llm_context_aggregator_out=LLMAssistantContextAggregator,
delay_before_speech_seconds: float = 1.0,
):
self._frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]] = frame_generator
self._runner: Callable[
[str, LLMContextAggregator, LLMContextAggregator], Awaitable[None]
] = runner
self._interrupt: Callable[[], None] = interrupt
self._my_participant_id = my_participant_id
self._messages: list[dict[str, str]] = llm_messages
self._delay_before_speech_seconds = delay_before_speech_seconds
self._llm_context_aggregator_in = llm_context_aggregator_in
self._llm_context_aggregator_out = llm_context_aggregator_out
self._current_phrase = ""
def update_messages(self, new_messages: list[dict[str, str]], task: asyncio.Task | None):
if task:
if not task.cancelled():
self._current_phrase = ""
self._messages = new_messages
async def speak_after_delay(self, user_speech, messages):
await asyncio.sleep(self._delay_before_speech_seconds)
tma_in = self._llm_context_aggregator_in(
messages, self._my_participant_id, complete_sentences=False
)
tma_out = self._llm_context_aggregator_out(
messages, self._my_participant_id
)
await self._runner(user_speech, tma_in, tma_out)
async def run_conversation(self):
current_response_task = None
async for frame in self._frame_generator():
if isinstance(frame, EndStreamQueueFrame):
break
elif not isinstance(frame, TranscriptionQueueFrame):
continue
if frame.participantId == self._my_participant_id:
continue
if current_response_task:
current_response_task.cancel()
self._interrupt()
self._current_phrase += " " + frame.text
current_llm_messages = copy.deepcopy(self._messages)
current_response_task = asyncio.create_task(
self.speak_after_delay(self._current_phrase, current_llm_messages)
)
current_response_task.add_done_callback(
functools.partial(self.update_messages, current_llm_messages)
)

View File

@@ -0,0 +1,214 @@
import asyncio
import re
from tblib import Frame
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import (
ControlQueueFrame,
EndParallelPipeQueueFrame,
EndStreamQueueFrame,
LLMMessagesQueueFrame,
LLMResponseEndQueueFrame,
QueueFrame,
TextQueueFrame,
TranscriptionQueueFrame,
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, Coroutine, List, Text
class LLMContextAggregator(AIService):
def __init__(
self,
messages: list[dict],
role: str,
bot_participant_id=None,
complete_sentences=True,
pass_through=True,
):
super().__init__()
self.messages = messages
self.bot_participant_id = bot_participant_id
self.role = role
self.sentence = ""
self.complete_sentences = complete_sentences
self.pass_through = pass_through
async def process_frame(
self, frame: QueueFrame
) -> AsyncGenerator[QueueFrame, None]:
# We don't do anything with non-text frames, pass it along to next in the pipeline.
if not isinstance(frame, TextQueueFrame):
yield frame
return
# Ignore transcription frames from the bot
if isinstance(frame, TranscriptionQueueFrame):
if frame.participantId == self.bot_participant_id:
return
# The common case for "pass through" is receiving frames from the LLM that we'll
# use to update the "assistant" LLM messages, but also passing the text frames
# along to a TTS service to be spoken to the user.
if self.pass_through:
yield frame
# TODO: split up transcription by participant
if self.complete_sentences:
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
# though we check it above
self.sentence += frame.text
if self.sentence.endswith((".", "?", "!")):
self.messages.append({"role": self.role, "content": self.sentence})
self.sentence = ""
yield LLMMessagesQueueFrame(self.messages)
else:
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
# though we check it above
self.messages.append({"role": self.role, "content": frame.text})
yield LLMMessagesQueueFrame(self.messages)
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# Send any dangling words that weren't finished with punctuation.
if self.complete_sentences and self.sentence:
self.messages.append({"role": self.role, "content": self.sentence})
yield LLMMessagesQueueFrame(self.messages)
class LLMUserContextAggregator(LLMContextAggregator):
def __init__(
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
):
super().__init__(
messages, "user", bot_participant_id, complete_sentences, pass_through=False
)
class LLMAssistantContextAggregator(LLMContextAggregator):
def __init__(
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
):
super().__init__(
messages,
"assistant",
bot_participant_id,
complete_sentences,
pass_through=True,
)
class SentenceAggregator(FrameProcessor):
def __init__(self):
self.aggregation = ""
async def process_frame(
self, frame: QueueFrame
) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, TextQueueFrame):
m = re.search("(.*[?.!])(.*)", frame.text)
if m:
yield TextQueueFrame(self.aggregation + m.group(1))
self.aggregation = m.group(2)
else:
self.aggregation += frame.text
elif isinstance(frame, EndStreamQueueFrame):
if self.aggregation:
yield TextQueueFrame(self.aggregation)
yield frame
else:
yield frame
class LLMFullResponseAggregator(FrameProcessor):
def __init__(self):
self.aggregation = ""
async def process_frame(
self, frame: QueueFrame
) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, TextQueueFrame):
self.aggregation += frame.text
elif isinstance(frame, LLMResponseEndQueueFrame):
yield TextQueueFrame(self.aggregation)
self.aggregation = ""
else:
yield frame
class StatelessTextTransformer(FrameProcessor):
def __init__(self, transform_fn):
self.transform_fn = transform_fn
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, TextQueueFrame):
result = self.transform_fn(frame.text)
if isinstance(result, Coroutine):
result = await result
yield TextQueueFrame(result)
else:
yield frame
class ParallelPipeline(FrameProcessor):
def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
self.sources = [asyncio.Queue() for _ in pipeline_definitions]
self.sink: asyncio.Queue[QueueFrame] = asyncio.Queue()
self.pipelines: list[Pipeline] = [
Pipeline(
pipeline_definition,
source,
self.sink,
)
for source, pipeline_definition in zip(self.sources, pipeline_definitions)
]
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
for source in self.sources:
await source.put(frame)
await source.put(EndParallelPipeQueueFrame())
await asyncio.gather(*[pipeline.run_pipeline() for pipeline in self.pipelines])
seen_ids = set()
while not self.sink.empty():
frame = await self.sink.get()
# de-dup frames. Because the convention is to yield a frame that isn't processed,
# each pipeline will likely yield the same frame, so we will end up with _n_ copies
# of unprocessed frames where _n_ is the number of parallel pipes that don't
# process that frame.
if id(frame) in seen_ids:
continue
seen_ids.add(id(frame))
# Skip passing along EndParallelPipeQueueFrame, because we use them for our own flow control.
if not isinstance(frame, EndParallelPipeQueueFrame):
yield frame
class GatedAggregator(FrameProcessor):
def __init__(self, gate_open_fn, gate_close_fn, start_open):
self.gate_open_fn = gate_open_fn
self.gate_close_fn = gate_close_fn
self.gate_open = start_open
self.accumulator: List[QueueFrame] = []
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if self.gate_open:
if self.gate_close_fn(frame):
self.gate_open = False
else:
if self.gate_open_fn(frame):
self.gate_open = True
if self.gate_open:
yield frame
if self.accumulator:
for frame in self.accumulator:
yield frame
self.accumulator = []
else:
self.accumulator.append(frame)

View File

@@ -0,0 +1,37 @@
from abc import abstractmethod
from typing import AsyncGenerator
from dailyai.pipeline.frames import ControlQueueFrame, QueueFrame
"""
This is the base class for all frame processors. Frame processors consume a frame
and yield 0 or more frames. Generally frame processors are used as part of a pipeline,
where frames come from a source queue, are processed by a series of frame processors,
then placed on a sink queue.
By convention, FrameProcessors should immediately yield any frames they don't process.
Stateful FrameProcessors should watch for the EndStreamQueueFrame and finalize their
output, eg. yielding an unfinished sentence if they're aggregating LLM output to full
sentences. EndStreamQueueFrame is also a chance to clean up any services that need to
be closed, del'd, etc.
"""
class FrameProcessor:
@abstractmethod
async def process_frame(
self, frame: QueueFrame
) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
yield frame
@abstractmethod
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
@abstractmethod
async def interrupted(self) -> None:
pass

View File

@@ -1,10 +1,10 @@
from enum import Enum
from dataclasses import dataclass
from typing import Any
class QueueFrame:
pass
def __eq__(self, other):
return isinstance(other, self.__class__)
class ControlQueueFrame(QueueFrame):
@@ -18,6 +18,13 @@ class StartStreamQueueFrame(ControlQueueFrame):
class EndStreamQueueFrame(ControlQueueFrame):
pass
class EndParallelPipeQueueFrame(ControlQueueFrame):
pass
class LLMResponseStartQueueFrame(QueueFrame):
pass
class LLMResponseEndQueueFrame(QueueFrame):
pass
@@ -61,6 +68,6 @@ class AppMessageQueueFrame(QueueFrame):
class UserStartedSpeakingFrame(QueueFrame):
pass
class UserStoppedSpeakingFrame(QueueFrame):
pass
pass

View File

@@ -0,0 +1,64 @@
import asyncio
from typing import AsyncGenerator, List
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import EndParallelPipeQueueFrame, EndStreamQueueFrame, QueueFrame
"""
This class manages a pipe of FrameProcessors, and runs them in sequence. The "source"
and "sink" queues are managed by the caller. You can use this class stand-alone to
perform specialized processing, or you can use the Transport's run_pipeline method to
instantiate and run a pipeline with the Transport's sink and source queues.
"""
class Pipeline:
def __init__(
self,
processors: List[FrameProcessor],
source: asyncio.Queue | None = None,
sink: asyncio.Queue[QueueFrame] | None = None,
):
self.processors = processors
self.source: asyncio.Queue[QueueFrame] | None = source
self.sink: asyncio.Queue[QueueFrame] | None = sink
def set_source(self, source: asyncio.Queue[QueueFrame]):
self.source = source
def set_sink(self, sink: asyncio.Queue[QueueFrame]):
self.sink = sink
async def get_next_source_frame(self) -> AsyncGenerator[QueueFrame, None]:
if self.source is None:
raise ValueError("Source queue not set")
yield await self.source.get()
async def run_pipeline(self):
if self.source is None or self.sink is None:
raise ValueError("Source or sink queue not set")
try:
while True:
frame_generators = [self.get_next_source_frame()]
for processor in self.processors:
next_frame_generators = []
for frame_generator in frame_generators:
async for frame in frame_generator:
next_frame_generators.append(processor.process_frame(frame))
frame_generators = next_frame_generators
for frame_generator in frame_generators:
async for frame in frame_generator:
await self.sink.put(frame)
if isinstance(
frame, EndStreamQueueFrame
) or isinstance(
frame, EndParallelPipeQueueFrame
):
return
except asyncio.CancelledError:
# this means there's been an interruption, do any cleanup necessary here.
for processor in self.processors:
await processor.interrupted()
pass

View File

@@ -1,98 +0,0 @@
import asyncio
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, List
class QueueTee:
async def run_to_queue_and_generate(
self,
output_queue: asyncio.Queue,
generator: AsyncGenerator[QueueFrame, None]
) -> AsyncGenerator[QueueFrame, None]:
async for frame in generator:
await output_queue.put(frame)
yield frame
async def run_to_queues(
self,
output_queues: List[asyncio.Queue],
generator: AsyncGenerator[QueueFrame, None]
):
async for frame in generator:
for queue in output_queues:
await queue.put(frame)
class LLMContextAggregator(AIService):
def __init__(
self,
messages: list[dict],
role: str,
bot_participant_id=None,
complete_sentences=True,
pass_through=True):
super().__init__()
self.messages = messages
self.bot_participant_id = bot_participant_id
self.role = role
self.sentence = ""
self.complete_sentences = complete_sentences
self.pass_through = pass_through
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
# We don't do anything with non-text frames, pass it along to next in the pipeline.
if not isinstance(frame, TextQueueFrame):
yield frame
return
# Ignore transcription frames from the bot
if isinstance(frame, TranscriptionQueueFrame):
if frame.participantId == self.bot_participant_id:
return
# The common case for "pass through" is receiving frames from the LLM that we'll
# use to update the "assistant" LLM messages, but also passing the text frames
# along to a TTS service to be spoken to the user.
if self.pass_through:
yield frame
# TODO: split up transcription by participant
if self.complete_sentences:
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
# though we check it above
self.sentence += frame.text
if self.sentence.endswith((".", "?", "!")):
self.messages.append({"role": self.role, "content": self.sentence})
self.sentence = ""
yield LLMMessagesQueueFrame(self.messages)
else:
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
# though we check it above
self.messages.append({"role": self.role, "content": frame.text})
yield LLMMessagesQueueFrame(self.messages)
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# Send any dangling words that weren't finished with punctuation.
if self.complete_sentences and self.sentence:
self.messages.append({"role": self.role, "content": self.sentence})
yield LLMMessagesQueueFrame(self.messages)
class LLMUserContextAggregator(LLMContextAggregator):
def __init__(self,
messages: list[dict],
bot_participant_id=None,
complete_sentences=True):
super().__init__(messages, "user", bot_participant_id, complete_sentences, pass_through=False)
class LLMAssistantContextAggregator(LLMContextAggregator):
def __init__(
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
):
super().__init__(
messages, "assistant", bot_participant_id, complete_sentences, pass_through=True
)

View File

@@ -3,25 +3,24 @@ import io
import logging
import time
import wave
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.queue_frame import (
from dailyai.pipeline.frames import (
AudioQueueFrame,
ControlQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
LLMMessagesQueueFrame,
LLMResponseEndQueueFrame,
LLMResponseStartQueueFrame,
QueueFrame,
TextQueueFrame,
TranscriptionQueueFrame,
)
from abc import abstractmethod
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
from dataclasses import dataclass
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List
class AIService:
class AIService(FrameProcessor):
def __init__(self):
self.logger = logging.getLogger("dailyai")
@@ -67,17 +66,6 @@ class AIService:
self.logger.error("Exception occurred while running AI service", e)
raise e
@abstractmethod
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
yield frame
@abstractmethod
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
# This is a trick for the interpreter (and linter) to know that this is a generator.
if False:
yield QueueFrame()
class LLMService(AIService):
@abstractmethod
@@ -90,6 +78,7 @@ class LLMService(AIService):
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
yield LLMResponseStartQueueFrame()
async for text_chunk in self.run_llm_async(frame.messages):
yield TextQueueFrame(text_chunk)
yield LLMResponseEndQueueFrame()

View File

@@ -5,23 +5,24 @@ import logging
import numpy as np
import pyaudio
import torch
import torchaudio
import queue
import threading
import time
from typing import AsyncGenerator
from enum import Enum
from dailyai.queue_frame import (
from dailyai.pipeline.frames import (
AudioQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
QueueFrame,
SpriteQueueFrame,
StartStreamQueueFrame,
TranscriptionQueueFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
)
from dailyai.pipeline.pipeline import Pipeline
torch.set_num_threads(1)
@@ -89,10 +90,10 @@ class BaseTransportService():
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
self._context = kwargs.get("context") or []
self._vad_enabled = kwargs.get("vad_enabled") or False
if self._vad_enabled and self._speaker_enabled:
raise Exception("Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False.")
self._vad_samples = 1536
vad_frame_s = self._vad_samples / SAMPLE_RATE
self._vad_start_frames = round(self._vad_start_s / vad_frame_s)
@@ -101,13 +102,15 @@ class BaseTransportService():
self._vad_stopping_count = 0
self._vad_state = VADState.QUIET
self._user_is_speaking = False
duration_minutes = kwargs.get("duration_minutes") or 10
self._expiration = time.time() + duration_minutes * 60
self.send_queue = asyncio.Queue()
self.receive_queue = asyncio.Queue()
self.completed_queue = asyncio.Queue()
self._threadsafe_send_queue = queue.Queue()
self._images = None
@@ -136,7 +139,7 @@ class BaseTransportService():
if self._speaker_enabled:
self._receive_audio_thread = threading.Thread(target=self._receive_audio, daemon=True)
self._receive_audio_thread.start()
if self._vad_enabled:
self._vad_thread = threading.Thread(target=self._vad, daemon=True)
self._vad_thread.start()
@@ -163,10 +166,58 @@ class BaseTransportService():
if self._speaker_enabled:
self._receive_audio_thread.join()
if self._vad_enabled:
self._vad_thread.join()
async def run_uninterruptible_pipeline(self, pipeline:Pipeline):
pipeline.set_sink(self.send_queue)
pipeline.set_source(self.receive_queue)
await pipeline.run_pipeline()
async def run_interruptible_pipeline(self, pipeline:Pipeline, allow_interruptions=True, pre_processor=None, post_processor=None):
pipeline.set_sink(self.send_queue)
source_queue = asyncio.Queue()
pipeline.set_source(source_queue)
pipeline.set_sink(self.send_queue)
pipeline_task = asyncio.create_task(pipeline.run_pipeline())
async def yield_frame(frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
yield frame
async def post_process(post_processor):
if not post_processor:
return
while True:
frame = await self.completed_queue.get()
print("post-processing frame: ", frame.__class__.__name__)
await post_processor.process_frame(frame)
if isinstance(frame, EndStreamQueueFrame):
break
post_process_task = asyncio.create_task(post_process(post_processor))
async for frame in self.get_receive_frames():
print("Got frame: ", frame.__class__.__name__)
if isinstance(frame, UserStartedSpeakingFrame):
pipeline_task.cancel()
self.interrupt()
pipeline_task = asyncio.create_task(pipeline.run_pipeline())
if pre_processor:
frame_generator = pre_processor.process_frame(frame)
else:
frame_generator = yield_frame(frame)
async for frame in frame_generator:
await source_queue.put(frame)
if isinstance(frame, EndStreamQueueFrame):
break
await asyncio.gather(pipeline_task, post_process_task)
def _post_run(self):
# Note that this function must be idempotent! It can be called multiple times
@@ -199,7 +250,7 @@ class BaseTransportService():
@abstractmethod
def _prerun(self):
pass
def _vad(self):
# CB: Starting silero VAD stuff
# TODO-CB: Probably need to force virtual speaker creation if we're
@@ -212,7 +263,7 @@ class BaseTransportService():
new_confidence = model(
torch.from_numpy(audio_float32), 16000).item()
speaking = new_confidence > 0.5
if speaking:
match self._vad_state:
case VADState.QUIET:
@@ -233,7 +284,7 @@ class BaseTransportService():
self._vad_stopping_count = 1
case VADState.STOPPING:
self._vad_stopping_count += 1
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(
@@ -249,7 +300,7 @@ class BaseTransportService():
)
self._vad_state = VADState.QUIET
self._vad_stopping_count = 0
async def _marshal_frames(self):
while True:
frame: QueueFrame | list = await self.send_queue.get()
@@ -326,6 +377,10 @@ class BaseTransportService():
if isinstance(frame, EndStreamQueueFrame):
self._logger.info("Stopping frame consumer thread")
self._threadsafe_send_queue.task_done()
if self._loop:
asyncio.run_coroutine_threadsafe(
self.completed_queue.put(frame), self._loop
)
return
# if interrupted, we just pull frames off the queue and discard them
@@ -350,6 +405,11 @@ class BaseTransportService():
elif len(b):
self.write_frame_to_mic(bytes(b))
b = bytearray()
if self._loop:
asyncio.run_coroutine_threadsafe(
self.completed_queue.put(frame), self._loop
)
else:
# if there are leftover audio bytes, write them now; failing to do so
# can cause static in the audio stream.

View File

@@ -7,7 +7,7 @@ import types
from functools import partial
from dailyai.queue_frame import (
from dailyai.pipeline.frames import (
TranscriptionQueueFrame,
)

View File

@@ -4,7 +4,7 @@ import math
import time
from typing import AsyncGenerator
import wave
from dailyai.queue_frame import AudioQueueFrame, QueueFrame, TranscriptionQueueFrame
from dailyai.pipeline.frames import AudioQueueFrame, QueueFrame, TranscriptionQueueFrame
from dailyai.services.ai_services import STTService

View File

@@ -0,0 +1,120 @@
import asyncio
import functools
import unittest
from dailyai.pipeline.aggregators import (
GatedAggregator,
ParallelPipeline,
SentenceAggregator,
StatelessTextTransformer,
)
from dailyai.pipeline.frames import (
AudioQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
LLMResponseEndQueueFrame,
LLMResponseStartQueueFrame,
QueueFrame,
TextQueueFrame,
)
from dailyai.pipeline.pipeline import Pipeline
class TestDailyFrameAggregators(unittest.IsolatedAsyncioTestCase):
async def test_sentence_aggregator(self):
sentence = "Hello, world. How are you? I am fine"
expected_sentences = ["Hello, world.", " How are you?", " I am fine "]
aggregator = SentenceAggregator()
for word in sentence.split(" "):
async for sentence in aggregator.process_frame(TextQueueFrame(word + " ")):
self.assertIsInstance(sentence, TextQueueFrame)
if isinstance(sentence, TextQueueFrame):
self.assertEqual(sentence.text, expected_sentences.pop(0))
async for sentence in aggregator.process_frame(EndStreamQueueFrame()):
if len(expected_sentences):
self.assertIsInstance(sentence, TextQueueFrame)
if isinstance(sentence, TextQueueFrame):
self.assertEqual(sentence.text, expected_sentences.pop(0))
else:
self.assertIsInstance(sentence, EndStreamQueueFrame)
self.assertEqual(expected_sentences, [])
async def test_gated_accumulator(self):
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageQueueFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartQueueFrame),
start_open=False,
)
frames = [
LLMResponseStartQueueFrame(),
TextQueueFrame("Hello, "),
TextQueueFrame("world."),
AudioQueueFrame(b"hello"),
ImageQueueFrame("image", b"image"),
AudioQueueFrame(b"world"),
LLMResponseEndQueueFrame(),
]
expected_output_frames = [
ImageQueueFrame("image", b"image"),
LLMResponseStartQueueFrame(),
TextQueueFrame("Hello, "),
TextQueueFrame("world."),
AudioQueueFrame(b"hello"),
AudioQueueFrame(b"world"),
LLMResponseEndQueueFrame(),
]
for frame in frames:
async for out_frame in gated_aggregator.process_frame(frame):
self.assertEqual(out_frame, expected_output_frames.pop(0))
self.assertEqual(expected_output_frames, [])
async def test_parallel_pipeline(self):
async def slow_add(sleep_time:float, name:str, x: str):
await asyncio.sleep(sleep_time)
return ":".join([x, name])
pipe1_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.1, 'pipe1'))
pipe2_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.2, 'pipe2'))
sentence_aggregator = SentenceAggregator()
add_dots = StatelessTextTransformer(lambda x: x + ".")
source = asyncio.Queue()
sink = asyncio.Queue()
pipeline = Pipeline(
[
ParallelPipeline(
[[pipe1_annotation], [sentence_aggregator, pipe2_annotation]]
),
add_dots,
],
source,
sink,
)
frames = [
TextQueueFrame("Hello, "),
TextQueueFrame("world."),
EndStreamQueueFrame()
]
expected_output_frames: list[QueueFrame] = [
TextQueueFrame(text='Hello, :pipe1.'),
TextQueueFrame(text='world.:pipe1.'),
TextQueueFrame(text='Hello, world.:pipe2.'),
EndStreamQueueFrame()
]
for frame in frames:
await source.put(frame)
await pipeline.run_pipeline()
while not sink.empty():
frame = await sink.get()
self.assertEqual(frame, expected_output_frames.pop(0))

View File

@@ -3,7 +3,7 @@ import unittest
from typing import AsyncGenerator, Generator
from dailyai.services.ai_services import AIService
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TextQueueFrame
from dailyai.pipeline.frames import EndStreamQueueFrame, QueueFrame, TextQueueFrame
class SimpleAIService(AIService):

View File

@@ -3,7 +3,7 @@ import unittest
from unittest.mock import MagicMock, patch
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame
class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
@@ -42,6 +42,7 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
await asyncio.wait_for(event.wait(), timeout=1)
self.assertTrue(event.is_set())
"""
@patch("dailyai.services.daily_transport_service.CallClient")
@patch("dailyai.services.daily_transport_service.Daily")
async def test_run_with_camera_and_mic(self, daily_mock, callclient_mock):
@@ -79,3 +80,4 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
camera.write_frame.assert_called_with(b"test")
mic.write_frames.assert_called()
"""

View File

@@ -0,0 +1,60 @@
import asyncio
from doctest import OutputChecker
import unittest
from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from dailyai.pipeline.frames import EndStreamQueueFrame, TextQueueFrame
from dailyai.pipeline.pipeline import Pipeline
class TestDailyPipeline(unittest.IsolatedAsyncioTestCase):
async def test_pipeline_simple(self):
aggregator = SentenceAggregator()
outgoing_queue = asyncio.Queue()
incoming_queue = asyncio.Queue()
pipeline = Pipeline([aggregator], incoming_queue, outgoing_queue)
await incoming_queue.put(TextQueueFrame("Hello, "))
await incoming_queue.put(TextQueueFrame("world."))
await incoming_queue.put(EndStreamQueueFrame())
await pipeline.run_pipeline()
self.assertEqual(await outgoing_queue.get(), TextQueueFrame("Hello, world."))
self.assertIsInstance(await outgoing_queue.get(), EndStreamQueueFrame)
async def test_pipeline_multiple_stages(self):
sentence_aggregator = SentenceAggregator()
to_upper = StatelessTextTransformer(lambda x: x.upper())
add_space = StatelessTextTransformer(lambda x: x + " ")
outgoing_queue = asyncio.Queue()
incoming_queue = asyncio.Queue()
pipeline = Pipeline(
[add_space, sentence_aggregator, to_upper],
incoming_queue,
outgoing_queue
)
sentence = "Hello, world. It's me, a pipeline."
for c in sentence:
await incoming_queue.put(TextQueueFrame(c))
await incoming_queue.put(EndStreamQueueFrame())
await pipeline.run_pipeline()
self.assertEqual(
await outgoing_queue.get(), TextQueueFrame("H E L L O , W O R L D .")
)
self.assertEqual(
await outgoing_queue.get(),
TextQueueFrame(" I T ' S M E , A P I P E L I N E ."),
)
# leftover little bit because of the spacing
self.assertEqual(
await outgoing_queue.get(),
TextQueueFrame(" "),
)
self.assertIsInstance(await outgoing_queue.get(), EndStreamQueueFrame)

View File

@@ -3,7 +3,7 @@ import os
import aiohttp
from dailyai.queue_frame import LLMMessagesQueueFrame
from dailyai.pipeline.frames import LLMMessagesQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService

View File

@@ -2,7 +2,7 @@ import asyncio
import aiohttp
import os
from dailyai.queue_frame import TextQueueFrame
from dailyai.pipeline.frames import TextQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAIImageGenService

View File

@@ -4,7 +4,7 @@ import os
import tkinter as tk
from dailyai.queue_frame import TextQueueFrame
from dailyai.pipeline.frames import TextQueueFrame
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.local_transport_service import LocalTransportService

View File

@@ -2,10 +2,11 @@ import asyncio
import os
import aiohttp
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
from dailyai.pipeline.frames import EndStreamQueueFrame, LLMMessagesQueueFrame
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from examples.foundational.support.runner import configure
@@ -41,13 +42,10 @@ async def main(room_url: str):
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
# speak the LLM response.
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
elevenlabs_tts.run_to_queue(
buffer_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
True,
)
)
source_queue = asyncio.Queue()
pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts])
source_queue.put_nowait(LLMMessagesQueueFrame(messages))
pipeline_run_task = pipeline.run_pipeline()
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
@@ -61,7 +59,7 @@ async def main(room_url: str):
if isinstance(frame, EndStreamQueueFrame):
break
await asyncio.gather(llm_response_task, buffer_to_send_queue())
await asyncio.gather(pipeline_run_task, buffer_to_send_queue())
await transport.stop_when_done()

View File

@@ -1,8 +1,11 @@
import asyncio
from re import S
import aiohttp
import os
from dailyai.pipeline.aggregators import GatedAggregator, LLMFullResponseAggregator, ParallelPipeline, SentenceAggregator
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseStartQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.daily_transport_service import DailyTransportService
@@ -35,98 +38,54 @@ async def main(room_url):
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV")
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
dalle = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"))
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
source_queue = asyncio.Queue()
return all_audio
async def get_month_data(month):
for month in ["January", "February"]:
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
await source_queue.put(LLMMessagesQueueFrame(messages))
image_description = await llm.run_llm(messages)
if not image_description:
return
await source_queue.put(EndStreamQueueFrame())
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
print(f"about to gather tasks for {month}")
(audio, image_data) = await asyncio.gather(
audio_task, image_task
)
print(f"about to return from get_month_data for {month}")
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageQueueFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartQueueFrame),
start_open=False,
)
sentence_aggregator = SentenceAggregator()
llm_full_response_aggregator = LLMFullResponseAggregator()
pipeline = Pipeline(
source=source_queue,
sink=transport.send_queue,
processors=[
llm,
sentence_aggregator,
ParallelPipeline([[tts], [llm_full_response_aggregator, dalle]]),
gated_aggregator,
],
)
pipeline_task = pipeline.run_pipeline()
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June"
]
"""
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
"""
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
print(f"month_data_task: {month_data_task}")
try:
data = await month_data_task
except Exception:
print("OMG EXCEPTION!!!!")
if data:
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
)
await pipeline_task
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
await transport.run()
if __name__ == "__main__":

View File

@@ -4,7 +4,7 @@ import asyncio
import tkinter as tk
import os
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService

View File

@@ -1,11 +1,12 @@
import asyncio
import os
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.ai_services import FrameLogger
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
from support.runner import configure
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from examples.foundational.support.runner import configure
async def main(room_url: str, token):
@@ -44,22 +45,18 @@ async def main(room_url: str, token):
tma_in = LLMUserContextAggregator(messages, transport._my_participant_id)
tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id)
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
fl2.run(
llm.run(
tma_in.run(
fl.run(
transport.get_receive_frames()
)
)
)
)
pipeline = Pipeline(
processors=[
fl,
tma_in,
llm,
fl2,
tma_out,
tts
],
)
await transport.run_pipeline(pipeline)
)
)
transport.transcription_settings["extra"]["endpointing"] = True
transport.transcription_settings["extra"]["punctuate"] = True
await asyncio.gather(transport.run(), handle_transcriptions())

View File

@@ -8,12 +8,12 @@ import time
import urllib.parse
from PIL import Image
from dailyai.queue_frame import ImageQueueFrame, QueueFrame
from dailyai.pipeline.frames import ImageQueueFrame, QueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.ai_services import AIService
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.services.fal_ai_services import FalImageGenService
from examples.foundational.support.runner import configure

View File

@@ -1,12 +1,12 @@
import asyncio
import aiohttp
import os
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import FrameLogger
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from examples.foundational.support.runner import configure
@@ -32,17 +32,7 @@ async def main(room_url: str, token):
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"))
async def run_response(user_speech, tma_in, tma_out):
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
[StartStreamQueueFrame(), TextQueueFrame(user_speech)]
)
)
),
)
pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts])
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
@@ -53,14 +43,15 @@ async def main(room_url: str, token):
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
]
conversation_wrapper = InterruptibleConversationWrapper(
frame_generator=transport.get_receive_frames,
runner=run_response,
interrupt=transport.interrupt,
my_participant_id=transport._my_participant_id,
llm_messages=messages,
await transport.run_interruptible_pipeline(
pipeline,
post_processor=LLMAssistantContextAggregator(
messages, transport._my_participant_id
),
pre_processor=LLMUserContextAggregator(
messages, transport._my_participant_id, complete_sentences=False
),
)
await conversation_wrapper.run_conversation()
transport.transcription_settings["extra"]["punctuate"] = False
await asyncio.gather(transport.run(), run_conversation())

View File

@@ -6,7 +6,7 @@ from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame
from examples.foundational.support.runner import configure

View File

@@ -9,8 +9,8 @@ from PIL import Image
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.queue_aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.queue_frame import (
from dailyai.pipeline.aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.pipeline.frames import (
QueueFrame,
TextQueueFrame,
ImageQueueFrame,

View File

@@ -7,9 +7,9 @@ import wave
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.queue_aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.pipeline.aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator
from dailyai.services.ai_services import AIService, FrameLogger
from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
from dailyai.pipeline.frames import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
from typing import AsyncGenerator
from examples.foundational.support.runner import configure

View File

@@ -1,7 +1,7 @@
import argparse
import asyncio
import wave
from dailyai.queue_frame import EndStreamQueueFrame, TranscriptionQueueFrame
from dailyai.pipeline.frames import EndStreamQueueFrame, TranscriptionQueueFrame
from dailyai.services.local_transport_service import LocalTransportService
from dailyai.services.whisper_ai_services import WhisperSTTService

View File

@@ -7,7 +7,7 @@ import random
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.pipeline.frames import QueueFrame, FrameType
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService

View File

@@ -5,9 +5,9 @@ import wave
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_aggregators import LLMContextAggregator
from dailyai.pipeline.aggregators import LLMContextAggregator
from dailyai.services.ai_services import AIService, FrameLogger
from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
from dailyai.pipeline.frames import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
from typing import AsyncGenerator
from examples.foundational.support.runner import configure