Merge pull request #86 from daily-co/transport-refactor

Starting refactor of transports into their own directory
This commit is contained in:
Moishe Lettvin
2024-03-28 11:17:32 -04:00
committed by GitHub
31 changed files with 106 additions and 80 deletions

View File

@@ -5,7 +5,7 @@ import os
from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from runner import configure
@@ -20,7 +20,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Say One Thing",

View File

@@ -4,7 +4,7 @@ import logging
import os
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.local_transport_service import LocalTransportService
from dailyai.transports.local_transport import LocalTransport
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
@@ -14,7 +14,7 @@ logger.setLevel(logging.DEBUG)
async def main():
async with aiohttp.ClientSession() as session:
meeting_duration_minutes = 1
transport = LocalTransportService(
transport = LocalTransport(
duration_minutes=meeting_duration_minutes, mic_enabled=True
)
tts = ElevenLabsTTSService(

View File

@@ -6,7 +6,7 @@ import aiohttp
from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAILLMService
@@ -22,7 +22,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Say One Thing From an LLM",

View File

@@ -5,7 +5,7 @@ import os
from dailyai.pipeline.frames import TextFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.fal_ai_services import FalImageGenService
from runner import configure
@@ -20,7 +20,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Show a still frame image",

View File

@@ -7,7 +7,7 @@ import tkinter as tk
from dailyai.pipeline.frames import TextFrame
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.local_transport_service import LocalTransportService
from dailyai.transports.local_transport import LocalTransport
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
@@ -22,7 +22,7 @@ async def main():
meeting_duration_minutes = 2
tk_root = tk.Tk()
tk_root.title("Calendar")
transport = LocalTransportService(
transport = LocalTransport(
tk_root=tk_root,
mic_enabled=True,
camera_enabled=True,

View File

@@ -6,7 +6,7 @@ import aiohttp
from dailyai.pipeline.merge_pipeline import SequentialMergePipeline
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.pipeline.frames import EndPipeFrame, LLMMessagesQueueFrame, TextFrame
@@ -24,7 +24,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Static And Dynamic Speech",

View File

@@ -23,7 +23,7 @@ from dailyai.pipeline.frames import (
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
@@ -63,7 +63,7 @@ class MonthPrepender(FrameProcessor):
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Month Narration Bot",

View File

@@ -9,7 +9,7 @@ from dailyai.pipeline.frames import AudioFrame, ImageFrame
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.local_transport_service import LocalTransportService
from dailyai.transports.local_transport import LocalTransport
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
@@ -22,7 +22,7 @@ async def main(room_url):
tk_root = tk.Tk()
tk_root.title("Calendar")
transport = LocalTransportService(
transport = LocalTransport(
mic_enabled=True,
camera_enabled=True,
camera_width=1024,

View File

@@ -5,7 +5,7 @@ import os
from dailyai.pipeline.frames import LLMMessagesQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.ai_services import FrameLogger
@@ -25,7 +25,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Respond bot",

View File

@@ -6,7 +6,7 @@ import aiohttp
from PIL import Image
from dailyai.pipeline.frames import ImageFrame, Frame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.ai_services import AIService
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
@@ -42,7 +42,7 @@ class ImageSyncAggregator(AIService):
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Respond bot",

View File

@@ -9,7 +9,7 @@ from dailyai.pipeline.aggregators import (
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import FrameLogger
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -25,7 +25,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Respond bot",

View File

@@ -6,7 +6,7 @@ import os
from dailyai.pipeline.aggregators import SentenceAggregator
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
@@ -24,7 +24,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Respond bot",

View File

@@ -6,7 +6,7 @@ import random
from typing import AsyncGenerator
from PIL import Image
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.pipeline.aggregators import (
@@ -116,7 +116,7 @@ class ImageSyncAggregator(AIService):
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Santa Cat",

View File

@@ -4,7 +4,7 @@ import logging
import os
import wave
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.pipeline.aggregators import (
@@ -72,7 +72,7 @@ class InboundSoundEffectWrapper(AIService):
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Respond bot",

View File

@@ -1,7 +1,7 @@
import asyncio
import logging
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.whisper_ai_services import WhisperSTTService
from runner import configure
@@ -15,7 +15,7 @@ logger.setLevel(logging.DEBUG)
async def main(room_url: str):
transport = DailyTransportService(
transport = DailyTransport(
room_url,
None,
"Transcription bot",

View File

@@ -3,7 +3,7 @@ import asyncio
import logging
from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame
from dailyai.services.local_transport_service import LocalTransportService
from dailyai.transports.local_transport import LocalTransport
from dailyai.services.whisper_ai_services import WhisperSTTService
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
@@ -16,7 +16,7 @@ async def main(room_url: str):
global stt
meeting_duration_minutes = 1
transport = LocalTransportService(
transport = LocalTransport(
mic_enabled=True,
camera_enabled=False,
speaker_enabled=True,

View File

@@ -6,7 +6,7 @@ from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import TextFrame, TranscriptionQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.websocket_transport_service import WebsocketTransport
from dailyai.transports.websocket_transport import WebsocketTransport
from dailyai.services.whisper_ai_services import WhisperSTTService
logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")

View File

@@ -5,7 +5,7 @@ import time
import urllib.parse
import random
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.pipeline.frames import Frame, FrameType
from dailyai.services.fal_ai_services import FalImageGenService
@@ -17,7 +17,7 @@ async def main(room_url: str, token):
global llm
global tts
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Imagebot",

View File

@@ -3,7 +3,7 @@ import asyncio
import os
import wave
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.pipeline.aggregators import LLMContextAggregator
from dailyai.services.ai_services import AIService, FrameLogger
@@ -66,7 +66,7 @@ async def main(room_url: str, token, phone):
global llm
global tts
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Respond bot",

View File

@@ -20,7 +20,7 @@ from dailyai.pipeline.frames import (
)
from dailyai.services.ai_services import AIService
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -92,7 +92,7 @@ class AnimationInitializer(AIService):
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Chatbot",

View File

@@ -13,7 +13,7 @@ from dailyai.pipeline.opeanai_llm_aggregator import (
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.openai_llm_context import OpenAILLMContext
from dailyai.services.open_ai_services import OpenAILLMService
# from dailyai.services.deepgram_ai_services import DeepgramTTSService
@@ -292,7 +292,7 @@ async def main(room_url: str, token):
global llm
global tts
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Intake Bot",

View File

@@ -11,7 +11,7 @@ from PIL import Image
from dailyai.pipeline.pipeline import Pipeline
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAILLMService
@@ -214,7 +214,7 @@ async def main(room_url: str, token):
sp = StoryProcessor(messages, story)
sig = StoryImageGenerator(story, llm, img)
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Storybot",

View File

@@ -10,7 +10,7 @@ from dailyai.pipeline.aggregators import (
from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame, TextFrame
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.open_ai_services import OpenAILLMService
@@ -51,7 +51,7 @@ class TranslationProcessor(FrameProcessor):
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
transport = DailyTransport(
room_url,
token,
"Translator",

View File

@@ -12,9 +12,10 @@ class SequentialMergePipeline(Pipeline):
self.pipelines = pipelines
async def run_pipeline(self):
for pipeline in self.pipelines:
for idx, pipeline in enumerate(self.pipelines):
while True:
frame = await pipeline.sink.get()
print(idx, frame)
if isinstance(
frame, EndFrame) or isinstance(
frame, EndPipeFrame):

View File

@@ -0,0 +1,41 @@
from abc import abstractmethod
import asyncio
import logging
import time
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.pipeline import Pipeline
class AbstractTransport:
    """Base class holding the queues and media settings shared by transports.

    All configuration arrives as keyword arguments. Every option is resolved
    as ``kwargs.get(name) or default``, so a missing key and any falsy value
    (``None``, ``0``, ``False``) both resolve to the default.

    NOTE(review): this class does not use ``ABCMeta`` itself, so the
    ``@abstractmethod`` markers below express intent; confirm whether
    subclasses are meant to be prevented from instantiating without
    overriding them.
    """

    def __init__(self, **kwargs):
        """Create the frame queues and read the transport options.

        Recognized keyword arguments (with defaults): ``duration_minutes``
        (10), ``mic_enabled`` (False), ``mic_sample_rate`` (16000),
        ``camera_enabled`` (False), ``camera_width`` (1024),
        ``camera_height`` (768), ``speaker_enabled`` (False),
        ``speaker_sample_rate`` (16000) and ``fps`` (8). Other keys are
        ignored here.
        """

        def setting(key, fallback):
            # A falsy or absent value selects the fallback.
            return kwargs.get(key) or fallback

        # Three asyncio queues exposed as public attributes.
        self.send_queue = asyncio.Queue()
        self.receive_queue = asyncio.Queue()
        self.completed_queue = asyncio.Queue()

        # Wall-clock timestamp (seconds) computed from the requested duration.
        lifetime_seconds = setting("duration_minutes", 10) * 60
        self._expiration = time.time() + lifetime_seconds

        # Microphone options.
        self._mic_enabled = setting("mic_enabled", False)
        self._mic_sample_rate = setting("mic_sample_rate", 16000)

        # Camera options.
        self._camera_enabled = setting("camera_enabled", False)
        self._camera_width = setting("camera_width", 1024)
        self._camera_height = setting("camera_height", 768)

        # Speaker options.
        self._speaker_enabled = setting("speaker_enabled", False)
        self._speaker_sample_rate = setting("speaker_sample_rate", 16000)

        self._fps = setting("fps", 8)

        self._logger: logging.Logger = logging.getLogger("dailyai.transport")

    @abstractmethod
    async def run(self, pipeline: Pipeline, override_pipeline_source_queue=True):
        """Run the transport with ``pipeline``; the base version does nothing."""

    @abstractmethod
    async def run_interruptible_pipeline(
        self,
        pipeline: Pipeline,
        pre_processor: FrameProcessor | None = None,
        post_processor: FrameProcessor | None = None,
    ):
        """Run ``pipeline`` interruptibly; the base version does nothing.

        ``pre_processor`` and ``post_processor`` are optional frame
        processors, both defaulting to ``None``.
        """

View File

@@ -24,10 +24,10 @@ from daily import (
VirtualSpeakerDevice,
)
from dailyai.services.base_transport_service import BaseTransportService
from dailyai.transports.threaded_transport import ThreadedTransport
class DailyTransportService(BaseTransportService, EventHandler):
class DailyTransport(ThreadedTransport, EventHandler):
_daily_initialized = False
_lock = threading.Lock()
@@ -48,7 +48,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
start_transcription: bool = False,
**kwargs,
):
# This will call BaseTransportService.__init__ method, not EventHandler
# This will call ThreadedTransport.__init__ method, not EventHandler
super().__init__(**kwargs)
self._room_url: str = room_url
@@ -140,10 +140,10 @@ class DailyTransportService(BaseTransportService, EventHandler):
def _prerun(self):
# Only initialize Daily once
if not DailyTransportService._daily_initialized:
with DailyTransportService._lock:
if not DailyTransport._daily_initialized:
with DailyTransport._lock:
Daily.init()
DailyTransportService._daily_initialized = True
DailyTransport._daily_initialized = True
self.client = CallClient(event_handler=self)
if self._mic_enabled:

View File

@@ -3,10 +3,10 @@ import numpy as np
import tkinter as tk
import pyaudio
from dailyai.services.base_transport_service import BaseTransportService
from dailyai.transports.threaded_transport import ThreadedTransport
class LocalTransportService(BaseTransportService):
class LocalTransport(ThreadedTransport):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._sample_width = kwargs.get("sample_width") or 2

View File

@@ -27,6 +27,7 @@ from dailyai.pipeline.frames import (
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import TTSService
from dailyai.transports.abstract_transport import AbstractTransport
torch.set_num_threads(1)
@@ -72,20 +73,13 @@ class VADState(Enum):
STOPPING = 4
class BaseTransportService:
class ThreadedTransport(AbstractTransport):
def __init__(
self,
**kwargs,
) -> None:
self._mic_enabled = kwargs.get("mic_enabled") or False
self._mic_sample_rate = kwargs.get("mic_sample_rate") or 16000
self._camera_enabled = kwargs.get("camera_enabled") or False
self._camera_width = kwargs.get("camera_width") or 1024
self._camera_height = kwargs.get("camera_height") or 768
self._speaker_enabled = kwargs.get("speaker_enabled") or False
self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000
self._fps = kwargs.get("fps") or 8
super().__init__(**kwargs)
self._vad_start_s = kwargs.get("vad_start_s") or 0.2
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
self._context = kwargs.get("context") or []
@@ -105,14 +99,6 @@ class BaseTransportService:
self._vad_state = VADState.QUIET
self._user_is_speaking = False
duration_minutes = kwargs.get("duration_minutes") or 10
self._expiration = time.time() + duration_minutes * 60
self.send_queue = asyncio.Queue()
self.receive_queue = asyncio.Queue()
self.completed_queue = asyncio.Queue()
self._threadsafe_send_queue = queue.Queue()
self._images = None
@@ -125,8 +111,6 @@ class BaseTransportService:
self._stop_threads = threading.Event()
self._is_interrupted = threading.Event()
self._logger: logging.Logger = logging.getLogger()
async def run(self, pipeline: Pipeline | None = None, override_pipeline_source_queue=True):
self._prerun()
@@ -193,8 +177,7 @@ class BaseTransportService:
async def run_interruptible_pipeline(
self,
pipeline: Pipeline,
allow_interruptions=True,
pre_processor=None,
pre_processor: FrameProcessor | None = None,
post_processor: FrameProcessor | None = None,
):
pipeline.set_sink(self.send_queue)

View File

@@ -7,7 +7,8 @@ from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import AudioFrame, ControlFrame, EndFrame, Frame, TTSEndFrame, TTSStartFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.serializers.protobuf_serializer import ProtobufFrameSerializer
from dailyai.services.base_transport_service import BaseTransportService
from dailyai.transports.abstract_transport import AbstractTransport
from dailyai.transports.threaded_transport import ThreadedTransport
class WebSocketFrameProcessor(FrameProcessor):
@@ -45,7 +46,7 @@ class WebSocketFrameProcessor(FrameProcessor):
yield frame
class WebsocketTransport(BaseTransportService):
class WebsocketTransport(AbstractTransport):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._sample_width = kwargs.get("sample_width", 2)

View File

@@ -4,9 +4,9 @@ import unittest
class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
async def test_event_handler(self):
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.transports.daily_transport import DailyTransport
transport = DailyTransportService("mock.daily.co/mock", "token", "bot")
transport = DailyTransport("mock.daily.co/mock", "token", "bot")
was_called = False

View File

@@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, patch, Mock
from dailyai.pipeline.frames import AudioFrame, EndFrame, TextFrame, TTSEndFrame, TTSStartFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.websocket_transport_service import WebSocketFrameProcessor, WebsocketTransport
from dailyai.transports.websocket_transport import WebSocketFrameProcessor, WebsocketTransport
class TestWebSocketTransportService(unittest.IsolatedAsyncioTestCase):