diff --git a/pyproject.toml b/pyproject.toml index e6aa7e5a9..2f5d8dc95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "pyht", "python-dotenv", "torch", + "torchaudio", "pyaudio", "typing-extensions" ] diff --git a/src/dailyai/conversation_wrappers.py b/src/dailyai/conversation_wrappers.py deleted file mode 100644 index 7f688477c..000000000 --- a/src/dailyai/conversation_wrappers.py +++ /dev/null @@ -1,77 +0,0 @@ -import asyncio -import copy -import functools -from typing import AsyncGenerator, Awaitable, Callable -from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator -from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame - - -class InterruptibleConversationWrapper: - - def __init__( - self, - frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]], - runner: Callable[ - [str, LLMContextAggregator, LLMContextAggregator], Awaitable[None] - ], - interrupt: Callable[[], None], - my_participant_id: str | None, - llm_messages: list[dict[str, str]], - llm_context_aggregator_in=LLMUserContextAggregator, - llm_context_aggregator_out=LLMAssistantContextAggregator, - delay_before_speech_seconds: float = 1.0, - ): - self._frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]] = frame_generator - self._runner: Callable[ - [str, LLMContextAggregator, LLMContextAggregator], Awaitable[None] - ] = runner - self._interrupt: Callable[[], None] = interrupt - self._my_participant_id = my_participant_id - self._messages: list[dict[str, str]] = llm_messages - self._delay_before_speech_seconds = delay_before_speech_seconds - self._llm_context_aggregator_in = llm_context_aggregator_in - self._llm_context_aggregator_out = llm_context_aggregator_out - - self._current_phrase = "" - - def update_messages(self, new_messages: list[dict[str, str]], task: asyncio.Task | None): - if task: - if not task.cancelled(): - self._current_phrase = "" - 
self._messages = new_messages - - async def speak_after_delay(self, user_speech, messages): - await asyncio.sleep(self._delay_before_speech_seconds) - tma_in = self._llm_context_aggregator_in( - messages, self._my_participant_id, complete_sentences=False - ) - tma_out = self._llm_context_aggregator_out( - messages, self._my_participant_id - ) - - await self._runner(user_speech, tma_in, tma_out) - - async def run_conversation(self): - current_response_task = None - - async for frame in self._frame_generator(): - if isinstance(frame, EndStreamQueueFrame): - break - elif not isinstance(frame, TranscriptionQueueFrame): - continue - - if frame.participantId == self._my_participant_id: - continue - - if current_response_task: - current_response_task.cancel() - self._interrupt() - - self._current_phrase += " " + frame.text - current_llm_messages = copy.deepcopy(self._messages) - current_response_task = asyncio.create_task( - self.speak_after_delay(self._current_phrase, current_llm_messages) - ) - current_response_task.add_done_callback( - functools.partial(self.update_messages, current_llm_messages) - ) diff --git a/src/dailyai/pipeline/aggregators.py b/src/dailyai/pipeline/aggregators.py new file mode 100644 index 000000000..9b244ef00 --- /dev/null +++ b/src/dailyai/pipeline/aggregators.py @@ -0,0 +1,214 @@ +import asyncio +import re + +from tblib import Frame +from dailyai.pipeline.frame_processor import FrameProcessor + +from dailyai.pipeline.frames import ( + ControlQueueFrame, + EndParallelPipeQueueFrame, + EndStreamQueueFrame, + LLMMessagesQueueFrame, + LLMResponseEndQueueFrame, + QueueFrame, + TextQueueFrame, + TranscriptionQueueFrame, +) +from dailyai.pipeline.pipeline import Pipeline +from dailyai.services.ai_services import AIService + +from typing import AsyncGenerator, Coroutine, List, Text + + +class LLMContextAggregator(AIService): + def __init__( + self, + messages: list[dict], + role: str, + bot_participant_id=None, + complete_sentences=True, + 
pass_through=True, + ): + super().__init__() + self.messages = messages + self.bot_participant_id = bot_participant_id + self.role = role + self.sentence = "" + self.complete_sentences = complete_sentences + self.pass_through = pass_through + + async def process_frame( + self, frame: QueueFrame + ) -> AsyncGenerator[QueueFrame, None]: + # We don't do anything with non-text frames, pass it along to next in the pipeline. + if not isinstance(frame, TextQueueFrame): + yield frame + return + + # Ignore transcription frames from the bot + if isinstance(frame, TranscriptionQueueFrame): + if frame.participantId == self.bot_participant_id: + return + + # The common case for "pass through" is receiving frames from the LLM that we'll + # use to update the "assistant" LLM messages, but also passing the text frames + # along to a TTS service to be spoken to the user. + if self.pass_through: + yield frame + + # TODO: split up transcription by participant + if self.complete_sentences: + # type: ignore -- the linter thinks this isn't a TextQueueFrame, even + # though we check it above + self.sentence += frame.text + if self.sentence.endswith((".", "?", "!")): + self.messages.append({"role": self.role, "content": self.sentence}) + self.sentence = "" + yield LLMMessagesQueueFrame(self.messages) + else: + # type: ignore -- the linter thinks this isn't a TextQueueFrame, even + # though we check it above + self.messages.append({"role": self.role, "content": frame.text}) + yield LLMMessagesQueueFrame(self.messages) + + async def finalize(self) -> AsyncGenerator[QueueFrame, None]: + # Send any dangling words that weren't finished with punctuation. 
class SentenceAggregator(FrameProcessor):
    """Buffer streamed text and re-emit it in sentence-sized TextQueueFrames.

    Text frames are accumulated until one arrives that contains ``.``, ``?``
    or ``!``. Everything up to and including the *last* such character in
    that frame is emitted (prefixed with the buffered text); whatever follows
    it becomes the start of the next buffer. On EndStreamQueueFrame any
    buffered text is flushed before the end frame is forwarded. All other
    frames are passed through unchanged.
    """

    # DOTALL lets "." span newlines. Without it, text separated from the
    # punctuation by a newline is not part of the match and is silently
    # dropped (e.g. "Hi\nthere." would emit only "there.").
    _SENTENCE_END = re.compile(r"(.*[?.!])(.*)", re.DOTALL)

    def __init__(self):
        # Text received so far that has not yet been closed by punctuation.
        self.aggregation = ""

    async def process_frame(
        self, frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Consume one frame and yield zero or more frames.

        Yields:
            A TextQueueFrame for each completed run of sentences, the flushed
            remainder followed by the end frame on EndStreamQueueFrame, or the
            input frame itself when it is neither text nor end-of-stream.
        """
        if isinstance(frame, TextQueueFrame):
            m = self._SENTENCE_END.search(frame.text)
            if m:
                yield TextQueueFrame(self.aggregation + m.group(1))
                # Keep whatever trails the final punctuation for next time.
                self.aggregation = m.group(2)
            else:
                self.aggregation += frame.text
        elif isinstance(frame, EndStreamQueueFrame):
            if self.aggregation:
                # Clear the buffer before yielding so the flushed text cannot
                # be emitted a second time if this processor is reused.
                leftover, self.aggregation = self.aggregation, ""
                yield TextQueueFrame(leftover)
            yield frame
        else:
            yield frame
Coroutine): + result = await result + + yield TextQueueFrame(result) + else: + yield frame + +class ParallelPipeline(FrameProcessor): + def __init__(self, pipeline_definitions: List[List[FrameProcessor]]): + self.sources = [asyncio.Queue() for _ in pipeline_definitions] + self.sink: asyncio.Queue[QueueFrame] = asyncio.Queue() + self.pipelines: list[Pipeline] = [ + Pipeline( + pipeline_definition, + source, + self.sink, + ) + for source, pipeline_definition in zip(self.sources, pipeline_definitions) + ] + + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + for source in self.sources: + await source.put(frame) + await source.put(EndParallelPipeQueueFrame()) + + await asyncio.gather(*[pipeline.run_pipeline() for pipeline in self.pipelines]) + + seen_ids = set() + while not self.sink.empty(): + frame = await self.sink.get() + + # de-dup frames. Because the convention is to yield a frame that isn't processed, + # each pipeline will likely yield the same frame, so we will end up with _n_ copies + # of unprocessed frames where _n_ is the number of parallel pipes that don't + # process that frame. + if id(frame) in seen_ids: + continue + seen_ids.add(id(frame)) + + # Skip passing along EndParallelPipeQueueFrame, because we use them for our own flow control. 
class GatedAggregator(FrameProcessor):
    """Hold frames back while a gate is closed and release them when it opens.

    ``gate_open_fn`` and ``gate_close_fn`` are predicates called with each
    incoming frame. While the gate is closed only ``gate_open_fn`` is
    consulted; while it is open only ``gate_close_fn`` is. ``start_open``
    is the initial gate state.
    """

    def __init__(self, gate_open_fn, gate_close_fn, start_open):
        self.gate_open_fn = gate_open_fn
        self.gate_close_fn = gate_close_fn
        self.gate_open = start_open
        # Frames received while the gate was closed, in arrival order.
        self.accumulator: List[QueueFrame] = []

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Update the gate from ``frame``, then either emit or store it.

        When the gate is open after the update, the current frame is yielded
        first, followed by every stored frame; the store is then emptied.
        When the gate is closed, the frame is stored and nothing is yielded.
        """
        # Ask only the predicate that could change the current state.
        if self.gate_open:
            should_toggle = self.gate_close_fn(frame)
        else:
            should_toggle = self.gate_open_fn(frame)

        if should_toggle:
            self.gate_open = not self.gate_open

        if not self.gate_open:
            self.accumulator.append(frame)
            return

        # The frame that is passing the gate goes out ahead of the backlog.
        yield frame
        for held_frame in self.accumulator:
            yield held_frame
        self.accumulator = []
class FrameProcessor:
    """Base class for pipeline stages: consume one frame, yield 0 or more.

    By convention, subclasses immediately yield any frame they do not
    process so that it reaches the next stage.

    NOTE(review): the methods carry ``@abstractmethod`` but the class does
    not use ``ABCMeta``, so nothing is enforced at instantiation and the
    bodies below act as inheritable defaults.
    """

    @abstractmethod
    async def process_frame(
        self, frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Default handling: forward control frames, drop everything else."""
        if not isinstance(frame, ControlQueueFrame):
            return
        yield frame

    @abstractmethod
    async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
        """Default finalizer: an async generator that yields nothing."""
        # The unreachable yield below is what makes the interpreter (and the
        # linter) treat this function as an async generator.
        return
        yield QueueFrame()

    @abstractmethod
    async def interrupted(self) -> None:
        """Hook awaited when the running pipeline is cancelled; no-op here."""
        return None
class Pipeline:
    """Run frames from a source queue through processors into a sink queue.

    Each frame taken from ``source`` is fed to the first processor; every
    frame a processor yields is fed to the next one, and whatever the final
    processor yields is put on ``sink``. Both queues are owned by the caller
    and may be supplied at construction time or later via the setters.
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        source: asyncio.Queue | None = None,
        sink: asyncio.Queue[QueueFrame] | None = None,
    ):
        self.processors = processors
        self.source: asyncio.Queue[QueueFrame] | None = source
        self.sink: asyncio.Queue[QueueFrame] | None = sink

    def set_source(self, source: asyncio.Queue[QueueFrame]):
        """Replace the queue that frames are read from."""
        self.source = source

    def set_sink(self, sink: asyncio.Queue[QueueFrame]):
        """Replace the queue that processed frames are written to."""
        self.sink = sink

    async def get_next_source_frame(self) -> AsyncGenerator[QueueFrame, None]:
        """Yield exactly one frame, waiting on the source queue for it.

        Raises:
            ValueError: if no source queue has been set.
        """
        if self.source is None:
            raise ValueError("Source queue not set")
        next_frame = await self.source.get()
        yield next_frame

    async def run_pipeline(self):
        """Process source frames until an end frame has been written to the sink.

        Returns immediately after an EndStreamQueueFrame or
        EndParallelPipeQueueFrame is put on the sink. If the task is
        cancelled, every processor's ``interrupted()`` is awaited and the
        method then returns normally (the CancelledError is not re-raised).

        Raises:
            ValueError: if the source or the sink queue has not been set.
        """
        if self.source is None or self.sink is None:
            raise ValueError("Source or sink queue not set")

        terminal_frames = (EndStreamQueueFrame, EndParallelPipeQueueFrame)
        try:
            while True:
                # Start with a one-frame generator, then fan each stage's
                # output into one generator per frame for the next stage.
                stage_outputs = [self.get_next_source_frame()]
                for processor in self.processors:
                    stage_outputs = [
                        processor.process_frame(frame)
                        for upstream in stage_outputs
                        async for frame in upstream
                    ]

                for final_output in stage_outputs:
                    async for frame in final_output:
                        await self.sink.put(frame)
                        if isinstance(frame, terminal_frames):
                            return
        except asyncio.CancelledError:
            # this means there's been an interruption, do any cleanup necessary here.
            for processor in self.processors:
                await processor.interrupted()
- if self.pass_through: - yield frame - - # TODO: split up transcription by participant - if self.complete_sentences: - # type: ignore -- the linter thinks this isn't a TextQueueFrame, even - # though we check it above - self.sentence += frame.text - if self.sentence.endswith((".", "?", "!")): - self.messages.append({"role": self.role, "content": self.sentence}) - self.sentence = "" - yield LLMMessagesQueueFrame(self.messages) - else: - # type: ignore -- the linter thinks this isn't a TextQueueFrame, even - # though we check it above - self.messages.append({"role": self.role, "content": frame.text}) - yield LLMMessagesQueueFrame(self.messages) - - async def finalize(self) -> AsyncGenerator[QueueFrame, None]: - # Send any dangling words that weren't finished with punctuation. - if self.complete_sentences and self.sentence: - self.messages.append({"role": self.role, "content": self.sentence}) - yield LLMMessagesQueueFrame(self.messages) - - -class LLMUserContextAggregator(LLMContextAggregator): - def __init__(self, - messages: list[dict], - bot_participant_id=None, - complete_sentences=True): - super().__init__(messages, "user", bot_participant_id, complete_sentences, pass_through=False) - - -class LLMAssistantContextAggregator(LLMContextAggregator): - def __init__( - self, messages: list[dict], bot_participant_id=None, complete_sentences=True - ): - super().__init__( - messages, "assistant", bot_participant_id, complete_sentences, pass_through=True - ) diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py index 29b7d6488..13b069a1c 100644 --- a/src/dailyai/services/ai_services.py +++ b/src/dailyai/services/ai_services.py @@ -3,25 +3,24 @@ import io import logging import time import wave +from dailyai.pipeline.frame_processor import FrameProcessor -from dailyai.queue_frame import ( +from dailyai.pipeline.frames import ( AudioQueueFrame, - ControlQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, 
LLMResponseEndQueueFrame, + LLMResponseStartQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame, ) from abc import abstractmethod -from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable -from dataclasses import dataclass +from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List - -class AIService: +class AIService(FrameProcessor): def __init__(self): self.logger = logging.getLogger("dailyai") @@ -67,17 +66,6 @@ class AIService: self.logger.error("Exception occurred while running AI service", e) raise e - @abstractmethod - async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if isinstance(frame, ControlQueueFrame): - yield frame - - @abstractmethod - async def finalize(self) -> AsyncGenerator[QueueFrame, None]: - # This is a trick for the interpreter (and linter) to know that this is a generator. - if False: - yield QueueFrame() - class LLMService(AIService): @abstractmethod @@ -90,6 +78,7 @@ class LLMService(AIService): async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: if isinstance(frame, LLMMessagesQueueFrame): + yield LLMResponseStartQueueFrame() async for text_chunk in self.run_llm_async(frame.messages): yield TextQueueFrame(text_chunk) yield LLMResponseEndQueueFrame() diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index 9ee60f4bb..6ad51261d 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -5,23 +5,24 @@ import logging import numpy as np import pyaudio import torch -import torchaudio import queue import threading import time from typing import AsyncGenerator from enum import Enum -from dailyai.queue_frame import ( +from dailyai.pipeline.frames import ( AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, QueueFrame, SpriteQueueFrame, StartStreamQueueFrame, + TranscriptionQueueFrame, UserStartedSpeakingFrame, 
async def run_interruptible_pipeline(self, pipeline:Pipeline, allow_interruptions=True, pre_processor=None, post_processor=None):
    """Feed received frames through ``pipeline``, restarting it on user speech.

    Frames from ``get_receive_frames()`` are optionally expanded by
    ``pre_processor.process_frame`` and placed on a private source queue;
    the pipeline writes its output to ``self.send_queue``. When
    ``allow_interruptions`` is true and a UserStartedSpeakingFrame arrives,
    the running pipeline task is cancelled, ``self.interrupt()`` is called
    and a fresh pipeline task is started.

    Args:
        pipeline: the pipeline to run; its source and sink are set here.
        allow_interruptions: when false, user speech never cancels the
            running pipeline task.
        pre_processor: optional object whose ``process_frame(frame)`` async
            generator replaces each received frame with zero or more frames.
        post_processor: optional object whose ``process_frame(frame)`` is
            run for every frame taken from ``self.completed_queue``.
    """
    source_queue = asyncio.Queue()
    pipeline.set_source(source_queue)
    pipeline.set_sink(self.send_queue)
    pipeline_task = asyncio.create_task(pipeline.run_pipeline())

    async def yield_frame(frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        # Identity stand-in used when no pre_processor is supplied.
        yield frame

    async def post_process(post_processor):
        if not post_processor:
            return

        while True:
            frame = await self.completed_queue.get()
            self._logger.debug("post-processing frame: %s", frame.__class__.__name__)
            result = post_processor.process_frame(frame)
            if hasattr(result, "__aiter__"):
                # An async generator cannot be awaited; it must be iterated
                # for its body to run. Its output is intentionally discarded.
                async for _ in result:
                    pass
            else:
                await result

            if isinstance(frame, EndStreamQueueFrame):
                break

    post_process_task = asyncio.create_task(post_process(post_processor))

    async for frame in self.get_receive_frames():
        self._logger.debug("Got frame: %s", frame.__class__.__name__)
        if allow_interruptions and isinstance(frame, UserStartedSpeakingFrame):
            pipeline_task.cancel()
            self.interrupt()
            pipeline_task = asyncio.create_task(pipeline.run_pipeline())

        if pre_processor:
            frame_generator = pre_processor.process_frame(frame)
        else:
            frame_generator = yield_frame(frame)

        # Track the last frame handed to the pipeline; if the pre-processor
        # yields nothing, the received frame itself decides termination.
        last_frame = frame
        async for out_frame in frame_generator:
            await source_queue.put(out_frame)
            last_frame = out_frame

        if isinstance(last_frame, EndStreamQueueFrame):
            break

    await asyncio.gather(pipeline_task, post_process_task)
It can be called multiple times @@ -199,7 +250,7 @@ class BaseTransportService(): @abstractmethod def _prerun(self): pass - + def _vad(self): # CB: Starting silero VAD stuff # TODO-CB: Probably need to force virtual speaker creation if we're @@ -212,7 +263,7 @@ class BaseTransportService(): new_confidence = model( torch.from_numpy(audio_float32), 16000).item() speaking = new_confidence > 0.5 - + if speaking: match self._vad_state: case VADState.QUIET: @@ -233,7 +284,7 @@ class BaseTransportService(): self._vad_stopping_count = 1 case VADState.STOPPING: self._vad_stopping_count += 1 - + if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames: asyncio.run_coroutine_threadsafe( self.receive_queue.put( @@ -249,7 +300,7 @@ class BaseTransportService(): ) self._vad_state = VADState.QUIET self._vad_stopping_count = 0 - + async def _marshal_frames(self): while True: frame: QueueFrame | list = await self.send_queue.get() @@ -326,6 +377,10 @@ class BaseTransportService(): if isinstance(frame, EndStreamQueueFrame): self._logger.info("Stopping frame consumer thread") self._threadsafe_send_queue.task_done() + if self._loop: + asyncio.run_coroutine_threadsafe( + self.completed_queue.put(frame), self._loop + ) return # if interrupted, we just pull frames off the queue and discard them @@ -350,6 +405,11 @@ class BaseTransportService(): elif len(b): self.write_frame_to_mic(bytes(b)) b = bytearray() + + if self._loop: + asyncio.run_coroutine_threadsafe( + self.completed_queue.put(frame), self._loop + ) else: # if there are leftover audio bytes, write them now; failing to do so # can cause static in the audio stream. 
class TestDailyFrameAggregators(unittest.IsolatedAsyncioTestCase):
    """Tests for SentenceAggregator, GatedAggregator and ParallelPipeline."""

    async def test_sentence_aggregator(self):
        """Word-by-word input is regrouped into sentences, with a final flush."""
        sentence = "Hello, world. How are you? I am fine"
        expected_sentences = ["Hello, world.", " How are you?", " I am fine "]
        aggregator = SentenceAggregator()
        for word in sentence.split(" "):
            # A distinct loop name keeps the input string from being shadowed.
            async for out_frame in aggregator.process_frame(TextQueueFrame(word + " ")):
                self.assertIsInstance(out_frame, TextQueueFrame)
                if isinstance(out_frame, TextQueueFrame):
                    self.assertEqual(out_frame.text, expected_sentences.pop(0))

        async for out_frame in aggregator.process_frame(EndStreamQueueFrame()):
            if len(expected_sentences):
                self.assertIsInstance(out_frame, TextQueueFrame)
                if isinstance(out_frame, TextQueueFrame):
                    self.assertEqual(out_frame.text, expected_sentences.pop(0))
            else:
                self.assertIsInstance(out_frame, EndStreamQueueFrame)

        self.assertEqual(expected_sentences, [])

    async def test_gated_accumulator(self):
        """Frames are held until an image opens the gate, then released in order."""
        gated_aggregator = GatedAggregator(
            gate_open_fn=lambda frame: isinstance(frame, ImageQueueFrame),
            gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartQueueFrame),
            start_open=False,
        )

        frames = [
            LLMResponseStartQueueFrame(),
            TextQueueFrame("Hello, "),
            TextQueueFrame("world."),
            AudioQueueFrame(b"hello"),
            ImageQueueFrame("image", b"image"),
            AudioQueueFrame(b"world"),
            LLMResponseEndQueueFrame(),
        ]

        expected_output_frames = [
            ImageQueueFrame("image", b"image"),
            LLMResponseStartQueueFrame(),
            TextQueueFrame("Hello, "),
            TextQueueFrame("world."),
            AudioQueueFrame(b"hello"),
            AudioQueueFrame(b"world"),
            LLMResponseEndQueueFrame(),
        ]
        for frame in frames:
            async for out_frame in gated_aggregator.process_frame(frame):
                self.assertEqual(out_frame, expected_output_frames.pop(0))
        self.assertEqual(expected_output_frames, [])

    async def test_parallel_pipeline(self):
        """Two parallel branches feed a shared downstream transformer."""

        async def slow_add(sleep_time: float, name: str, x: str):
            await asyncio.sleep(sleep_time)
            return ":".join([x, name])

        pipe1_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.1, 'pipe1'))
        pipe2_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.2, 'pipe2'))
        sentence_aggregator = SentenceAggregator()
        add_dots = StatelessTextTransformer(lambda x: x + ".")

        source = asyncio.Queue()
        sink = asyncio.Queue()
        pipeline = Pipeline(
            [
                ParallelPipeline(
                    [[pipe1_annotation], [sentence_aggregator, pipe2_annotation]]
                ),
                add_dots,
            ],
            source,
            sink,
        )

        frames = [
            TextQueueFrame("Hello, "),
            TextQueueFrame("world."),
            EndStreamQueueFrame()
        ]

        expected_output_frames: list[QueueFrame] = [
            TextQueueFrame(text='Hello, :pipe1.'),
            TextQueueFrame(text='world.:pipe1.'),
            TextQueueFrame(text='Hello, world.:pipe2.'),
            EndStreamQueueFrame()
        ]

        for frame in frames:
            await source.put(frame)

        await pipeline.run_pipeline()

        while not sink.empty():
            frame = await sink.get()
            self.assertEqual(frame, expected_output_frames.pop(0))

        # Without this check the test would pass even if the sink were empty.
        self.assertEqual(expected_output_frames, [])
TestDailyTransport(unittest.IsolatedAsyncioTestCase): await asyncio.wait_for(event.wait(), timeout=1) self.assertTrue(event.is_set()) + """ @patch("dailyai.services.daily_transport_service.CallClient") @patch("dailyai.services.daily_transport_service.Daily") async def test_run_with_camera_and_mic(self, daily_mock, callclient_mock): @@ -79,3 +80,4 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase): camera.write_frame.assert_called_with(b"test") mic.write_frames.assert_called() + """ diff --git a/src/dailyai/tests/test_pipeline.py b/src/dailyai/tests/test_pipeline.py new file mode 100644 index 000000000..2e1d4289c --- /dev/null +++ b/src/dailyai/tests/test_pipeline.py @@ -0,0 +1,60 @@ +import asyncio +from doctest import OutputChecker +import unittest +from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer +from dailyai.pipeline.frames import EndStreamQueueFrame, TextQueueFrame + +from dailyai.pipeline.pipeline import Pipeline + + +class TestDailyPipeline(unittest.IsolatedAsyncioTestCase): + + async def test_pipeline_simple(self): + aggregator = SentenceAggregator() + + outgoing_queue = asyncio.Queue() + incoming_queue = asyncio.Queue() + pipeline = Pipeline([aggregator], incoming_queue, outgoing_queue) + + await incoming_queue.put(TextQueueFrame("Hello, ")) + await incoming_queue.put(TextQueueFrame("world.")) + await incoming_queue.put(EndStreamQueueFrame()) + + await pipeline.run_pipeline() + + self.assertEqual(await outgoing_queue.get(), TextQueueFrame("Hello, world.")) + self.assertIsInstance(await outgoing_queue.get(), EndStreamQueueFrame) + + async def test_pipeline_multiple_stages(self): + sentence_aggregator = SentenceAggregator() + to_upper = StatelessTextTransformer(lambda x: x.upper()) + add_space = StatelessTextTransformer(lambda x: x + " ") + + outgoing_queue = asyncio.Queue() + incoming_queue = asyncio.Queue() + pipeline = Pipeline( + [add_space, sentence_aggregator, to_upper], + incoming_queue, + outgoing_queue 
+ ) + + sentence = "Hello, world. It's me, a pipeline." + for c in sentence: + await incoming_queue.put(TextQueueFrame(c)) + await incoming_queue.put(EndStreamQueueFrame()) + + await pipeline.run_pipeline() + + self.assertEqual( + await outgoing_queue.get(), TextQueueFrame("H E L L O , W O R L D .") + ) + self.assertEqual( + await outgoing_queue.get(), + TextQueueFrame(" I T ' S M E , A P I P E L I N E ."), + ) + # leftover little bit because of the spacing + self.assertEqual( + await outgoing_queue.get(), + TextQueueFrame(" "), + ) + self.assertIsInstance(await outgoing_queue.get(), EndStreamQueueFrame) diff --git a/src/examples/foundational/02-llm-say-one-thing.py b/src/examples/foundational/02-llm-say-one-thing.py index b15023380..a97b63fe6 100644 --- a/src/examples/foundational/02-llm-say-one-thing.py +++ b/src/examples/foundational/02-llm-say-one-thing.py @@ -3,7 +3,7 @@ import os import aiohttp -from dailyai.queue_frame import LLMMessagesQueueFrame +from dailyai.pipeline.frames import LLMMessagesQueueFrame from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService diff --git a/src/examples/foundational/03-still-frame.py b/src/examples/foundational/03-still-frame.py index de368d5d6..8fefd5cba 100644 --- a/src/examples/foundational/03-still-frame.py +++ b/src/examples/foundational/03-still-frame.py @@ -2,7 +2,7 @@ import asyncio import aiohttp import os -from dailyai.queue_frame import TextQueueFrame +from dailyai.pipeline.frames import TextQueueFrame from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.fal_ai_services import FalImageGenService from dailyai.services.open_ai_services import OpenAIImageGenService diff --git a/src/examples/foundational/03a-image-local.py b/src/examples/foundational/03a-image-local.py index 40cb2b245..0904a060e 100644 --- 
a/src/examples/foundational/03a-image-local.py +++ b/src/examples/foundational/03a-image-local.py @@ -4,7 +4,7 @@ import os import tkinter as tk -from dailyai.queue_frame import TextQueueFrame +from dailyai.pipeline.frames import TextQueueFrame from dailyai.services.fal_ai_services import FalImageGenService from dailyai.services.local_transport_service import LocalTransportService diff --git a/src/examples/foundational/04-utterance-and-speech.py b/src/examples/foundational/04-utterance-and-speech.py index 17bd32797..57ba3dc91 100644 --- a/src/examples/foundational/04-utterance-and-speech.py +++ b/src/examples/foundational/04-utterance-and-speech.py @@ -2,10 +2,11 @@ import asyncio import os import aiohttp +from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame +from dailyai.pipeline.frames import EndStreamQueueFrame, LLMMessagesQueueFrame from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from examples.foundational.support.runner import configure @@ -41,13 +42,10 @@ async def main(room_url: str): # will run in parallel with generating and speaking the audio for static text, so there's no delay to # speak the LLM response. 
buffer_queue = asyncio.Queue() - llm_response_task = asyncio.create_task( - elevenlabs_tts.run_to_queue( - buffer_queue, - llm.run([LLMMessagesQueueFrame(messages)]), - True, - ) - ) + source_queue = asyncio.Queue() + pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts]) + source_queue.put_nowait(LLMMessagesQueueFrame(messages)) + pipeline_run_task = pipeline.run_pipeline() @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): @@ -61,7 +59,7 @@ async def main(room_url: str): if isinstance(frame, EndStreamQueueFrame): break - await asyncio.gather(llm_response_task, buffer_to_send_queue()) + await asyncio.gather(pipeline_run_task, buffer_to_send_queue()) await transport.stop_when_done() diff --git a/src/examples/foundational/05-sync-speech-and-image.py b/src/examples/foundational/05-sync-speech-and-image.py index dfef2bc15..0ab448275 100644 --- a/src/examples/foundational/05-sync-speech-and-image.py +++ b/src/examples/foundational/05-sync-speech-and-image.py @@ -1,8 +1,11 @@ import asyncio +from re import S import aiohttp import os +from dailyai.pipeline.aggregators import GatedAggregator, LLMFullResponseAggregator, ParallelPipeline, SentenceAggregator -from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame +from dailyai.pipeline.frames import AudioQueueFrame, EndStreamQueueFrame, ImageQueueFrame, LLMMessagesQueueFrame, LLMResponseStartQueueFrame +from dailyai.pipeline.pipeline import Pipeline from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.daily_transport_service import DailyTransportService @@ -35,98 +38,54 @@ async def main(room_url): aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), voice_id="ErXwobaYiN019PkySvjV") - # tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), 
region=os.getenv("AZURE_SPEECH_REGION")) dalle = FalImageGenService( image_size="1024x1024", aiohttp_session=session, key_id=os.getenv("FAL_KEY_ID"), key_secret=os.getenv("FAL_KEY_SECRET")) - # dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024") - # dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL")) - # Get a complete audio chunk from the given text. Splitting this into its own - # coroutine lets us ensure proper ordering of the audio chunks on the send queue. - async def get_all_audio(text): - all_audio = bytearray() - async for audio in tts.run_tts(text): - all_audio.extend(audio) + source_queue = asyncio.Queue() - return all_audio - - async def get_month_data(month): + for month in ["January", "February"]: messages = [ { "role": "system", "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. 
Limit the description to one sentence, please.", } ] + await source_queue.put(LLMMessagesQueueFrame(messages)) - image_description = await llm.run_llm(messages) - if not image_description: - return + await source_queue.put(EndStreamQueueFrame()) - to_speak = f"{month}: {image_description}" - audio_task = asyncio.create_task(get_all_audio(to_speak)) - image_task = asyncio.create_task(dalle.run_image_gen(image_description)) - print(f"about to gather tasks for {month}") - (audio, image_data) = await asyncio.gather( - audio_task, image_task - ) - print(f"about to return from get_month_data for {month}") - return { - "month": month, - "text": image_description, - "image_url": image_data[0], - "image": image_data[1], - "audio": audio, - } + gated_aggregator = GatedAggregator( + gate_open_fn=lambda frame: isinstance(frame, ImageQueueFrame), + gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartQueueFrame), + start_open=False, + ) + + sentence_aggregator = SentenceAggregator() + llm_full_response_aggregator = LLMFullResponseAggregator() + + pipeline = Pipeline( + source=source_queue, + sink=transport.send_queue, + processors=[ + llm, + sentence_aggregator, + ParallelPipeline([[tts], [llm_full_response_aggregator, dalle]]), + gated_aggregator, + ], + ) + pipeline_task = pipeline.run_pipeline() - months: list[str] = [ - "January", - "February", - "March", - "April", - "May", - "June" - ] - """ - "February", - "March", - "April", - "May", - "June", - "July", - "August", - "September", - "October", - "November", - "December", - """ @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): - # This will play the months in the order they're completed. The benefit - # is we'll have as little delay as possible before the first month, and - # likely no delay between months, but the months won't display in order. 
- for month_data_task in asyncio.as_completed(month_tasks): - print(f"month_data_task: {month_data_task}") - try: - data = await month_data_task - except Exception: - print("OMG EXCEPTION!!!!") - if data: - await transport.send_queue.put( - [ - ImageQueueFrame(data["image_url"], data["image"]), - AudioQueueFrame(data["audio"]), - ] - ) + await pipeline_task # wait for the output queue to be empty, then leave the meeting await transport.stop_when_done() - month_tasks = [asyncio.create_task(get_month_data(month)) for month in months] - await transport.run() if __name__ == "__main__": diff --git a/src/examples/foundational/05a-local-sync-speech-and-text.py b/src/examples/foundational/05a-local-sync-speech-and-text.py index fb9f419bb..6a61dbb60 100644 --- a/src/examples/foundational/05a-local-sync-speech-and-text.py +++ b/src/examples/foundational/05a-local-sync-speech-and-text.py @@ -4,7 +4,7 @@ import asyncio import tkinter as tk import os -from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame +from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService diff --git a/src/examples/foundational/06-listen-and-respond.py b/src/examples/foundational/06-listen-and-respond.py index 7cceb607d..c70bc2acb 100644 --- a/src/examples/foundational/06-listen-and-respond.py +++ b/src/examples/foundational/06-listen-and-respond.py @@ -1,11 +1,12 @@ import asyncio import os +from dailyai.pipeline.pipeline import Pipeline from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.ai_services import FrameLogger -from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator -from support.runner import 
configure +from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator +from examples.foundational.support.runner import configure async def main(room_url: str, token): @@ -44,22 +45,18 @@ async def main(room_url: str, token): tma_in = LLMUserContextAggregator(messages, transport._my_participant_id) tma_out = LLMAssistantContextAggregator(messages, transport._my_participant_id) - await tts.run_to_queue( - transport.send_queue, - tma_out.run( - fl2.run( - llm.run( - tma_in.run( - fl.run( - transport.get_receive_frames() - ) - ) - ) - ) + pipeline = Pipeline( + processors=[ + fl, + tma_in, + llm, + fl2, + tma_out, + tts + ], + ) + await transport.run_pipeline(pipeline) - ) - ) - transport.transcription_settings["extra"]["endpointing"] = True transport.transcription_settings["extra"]["punctuate"] = True await asyncio.gather(transport.run(), handle_transcriptions()) diff --git a/src/examples/foundational/06a-image-sync.py b/src/examples/foundational/06a-image-sync.py index 032d828c1..a43e72a3e 100644 --- a/src/examples/foundational/06a-image-sync.py +++ b/src/examples/foundational/06a-image-sync.py @@ -8,12 +8,12 @@ import time import urllib.parse from PIL import Image -from dailyai.queue_frame import ImageQueueFrame, QueueFrame +from dailyai.pipeline.frames import ImageQueueFrame, QueueFrame from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.ai_services import AIService -from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator +from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator from dailyai.services.fal_ai_services import FalImageGenService from examples.foundational.support.runner import configure diff --git a/src/examples/foundational/07-interruptible.py b/src/examples/foundational/07-interruptible.py index 
6cae19f5c..6698230fa 100644 --- a/src/examples/foundational/07-interruptible.py +++ b/src/examples/foundational/07-interruptible.py @@ -1,12 +1,12 @@ import asyncio import aiohttp import os -from dailyai.conversation_wrappers import InterruptibleConversationWrapper +from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator -from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame +from dailyai.pipeline.pipeline import Pipeline +from dailyai.services.ai_services import FrameLogger from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from examples.foundational.support.runner import configure @@ -32,17 +32,7 @@ async def main(room_url: str, token): api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION")) - async def run_response(user_speech, tma_in, tma_out): - await tts.run_to_queue( - transport.send_queue, - tma_out.run( - llm.run( - tma_in.run( - [StartStreamQueueFrame(), TextQueueFrame(user_speech)] - ) - ) - ), - ) + pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts]) @transport.event_handler("on_first_other_participant_joined") async def on_first_other_participant_joined(transport): @@ -53,14 +43,15 @@ async def main(room_url: str, token): {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. 
Respond to what the user said in a creative and helpful way."}, ] - conversation_wrapper = InterruptibleConversationWrapper( - frame_generator=transport.get_receive_frames, - runner=run_response, - interrupt=transport.interrupt, - my_participant_id=transport._my_participant_id, - llm_messages=messages, + await transport.run_interruptible_pipeline( + pipeline, + post_processor=LLMAssistantContextAggregator( + messages, transport._my_participant_id + ), + pre_processor=LLMUserContextAggregator( + messages, transport._my_participant_id, complete_sentences=False + ), ) - await conversation_wrapper.run_conversation() transport.transcription_settings["extra"]["punctuate"] = False await asyncio.gather(transport.run(), run_conversation()) diff --git a/src/examples/foundational/08-bots-arguing.py b/src/examples/foundational/08-bots-arguing.py index a5329086e..49e15bd79 100644 --- a/src/examples/foundational/08-bots-arguing.py +++ b/src/examples/foundational/08-bots-arguing.py @@ -6,7 +6,7 @@ from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService -from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame +from dailyai.pipeline.frames import AudioQueueFrame, ImageQueueFrame from examples.foundational.support.runner import configure diff --git a/src/examples/foundational/10-wake-word.py b/src/examples/foundational/10-wake-word.py index 54db2468c..00331795d 100644 --- a/src/examples/foundational/10-wake-word.py +++ b/src/examples/foundational/10-wake-word.py @@ -9,8 +9,8 @@ from PIL import Image from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.queue_aggregators import 
LLMUserContextAggregator, LLMAssistantContextAggregator -from dailyai.queue_frame import ( +from dailyai.pipeline.aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator +from dailyai.pipeline.frames import ( QueueFrame, TextQueueFrame, ImageQueueFrame, diff --git a/src/examples/foundational/11-sound-effects.py b/src/examples/foundational/11-sound-effects.py index 954f363d7..a75e5acd6 100644 --- a/src/examples/foundational/11-sound-effects.py +++ b/src/examples/foundational/11-sound-effects.py @@ -7,9 +7,9 @@ import wave from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.queue_aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator +from dailyai.pipeline.aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator from dailyai.services.ai_services import AIService, FrameLogger -from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame +from dailyai.pipeline.frames import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame from typing import AsyncGenerator from examples.foundational.support.runner import configure diff --git a/src/examples/foundational/13a-whisper-local.py b/src/examples/foundational/13a-whisper-local.py index 6c764c1a9..4971423c7 100644 --- a/src/examples/foundational/13a-whisper-local.py +++ b/src/examples/foundational/13a-whisper-local.py @@ -1,7 +1,7 @@ import argparse import asyncio import wave -from dailyai.queue_frame import EndStreamQueueFrame, TranscriptionQueueFrame +from dailyai.pipeline.frames import EndStreamQueueFrame, TranscriptionQueueFrame from dailyai.services.local_transport_service import LocalTransportService from dailyai.services.whisper_ai_services import 
WhisperSTTService diff --git a/src/examples/image-gen.py b/src/examples/image-gen.py index 8b5ca5566..4819e3b15 100644 --- a/src/examples/image-gen.py +++ b/src/examples/image-gen.py @@ -7,7 +7,7 @@ import random from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.queue_frame import QueueFrame, FrameType +from dailyai.pipeline.frames import QueueFrame, FrameType from dailyai.services.fal_ai_services import FalImageGenService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService diff --git a/src/examples/internal/11a-dial-out.py b/src/examples/internal/11a-dial-out.py index b16c72ed1..c169d1f1e 100644 --- a/src/examples/internal/11a-dial-out.py +++ b/src/examples/internal/11a-dial-out.py @@ -5,9 +5,9 @@ import wave from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService -from dailyai.queue_aggregators import LLMContextAggregator +from dailyai.pipeline.aggregators import LLMContextAggregator from dailyai.services.ai_services import AIService, FrameLogger -from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame +from dailyai.pipeline.frames import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame from typing import AsyncGenerator from examples.foundational.support.runner import configure