diff --git a/examples/foundational/01-say-one-thing.py b/examples/foundational/01-say-one-thing.py index e0195ef8c..aecda2963 100644 --- a/examples/foundational/01-say-one-thing.py +++ b/examples/foundational/01-say-one-thing.py @@ -5,7 +5,7 @@ import os from dailyai.pipeline.frames import EndFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from runner import configure @@ -20,7 +20,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Say One Thing", diff --git a/examples/foundational/01a-local-transport.py b/examples/foundational/01a-local-transport.py index b54ecedbd..d653a684c 100644 --- a/examples/foundational/01a-local-transport.py +++ b/examples/foundational/01a-local-transport.py @@ -4,7 +4,7 @@ import logging import os from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") logger = logging.getLogger("dailyai") @@ -14,7 +14,7 @@ logger.setLevel(logging.DEBUG) async def main(): async with aiohttp.ClientSession() as session: meeting_duration_minutes = 1 - transport = LocalTransportService( + transport = LocalTransport( duration_minutes=meeting_duration_minutes, mic_enabled=True ) tts = ElevenLabsTTSService( diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index 84589c2d2..ed0a243ff 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -6,7 +6,7 @@ import aiohttp from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAILLMService @@ -22,7 +22,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Say One Thing From an LLM", diff --git a/examples/foundational/03-still-frame.py b/examples/foundational/03-still-frame.py index 3464f26be..c517fe6a9 100644 --- a/examples/foundational/03-still-frame.py +++ b/examples/foundational/03-still-frame.py @@ -5,7 +5,7 @@ import os from dailyai.pipeline.frames import TextFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.fal_ai_services import FalImageGenService from runner import configure @@ -20,7 +20,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Show a still frame image", diff --git a/examples/foundational/03a-image-local.py b/examples/foundational/03a-image-local.py index d3e6b5797..eba208970 100644 --- a/examples/foundational/03a-image-local.py +++ b/examples/foundational/03a-image-local.py @@ -7,7 +7,7 @@ import tkinter as tk from dailyai.pipeline.frames import TextFrame from dailyai.services.fal_ai_services import FalImageGenService -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") logger = logging.getLogger("dailyai") @@ -22,7 +22,7 @@ async def main(): meeting_duration_minutes = 2 tk_root = tk.Tk() tk_root.title("Calendar") - transport = LocalTransportService( + transport = LocalTransport( tk_root=tk_root, mic_enabled=True, camera_enabled=True, diff --git a/examples/foundational/04-utterance-and-speech.py b/examples/foundational/04-utterance-and-speech.py index 2a033942b..c5eb84f59 100644 --- a/examples/foundational/04-utterance-and-speech.py +++ b/examples/foundational/04-utterance-and-speech.py @@ -6,7 +6,7 @@ import aiohttp from dailyai.pipeline.merge_pipeline import SequentialMergePipeline from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.deepgram_ai_services import DeepgramTTSService from dailyai.pipeline.frames import EndPipeFrame, LLMMessagesQueueFrame, TextFrame @@ -24,7 +24,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url: str): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Static And Dynamic Speech", diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 7211bd155..3ee2514f8 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -23,7 +23,7 @@ from dailyai.pipeline.frames import ( from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService @@ -63,7 +63,7 @@ class MonthPrepender(FrameProcessor): async def main(room_url): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Month Narration Bot", diff --git a/examples/foundational/05a-local-sync-speech-and-text.py b/examples/foundational/05a-local-sync-speech-and-text.py index cd21474b0..fd1076ff5 100644 --- a/examples/foundational/05a-local-sync-speech-and-text.py +++ b/examples/foundational/05a-local-sync-speech-and-text.py @@ -9,7 +9,7 @@ from dailyai.pipeline.frames import AudioFrame, ImageFrame from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") logger = logging.getLogger("dailyai") @@ -22,7 +22,7 @@ async def main(room_url): tk_root = tk.Tk() tk_root.title("Calendar") - transport = LocalTransportService( + transport = LocalTransport( mic_enabled=True, camera_enabled=True, camera_width=1024, diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index 4c1018d5c..ec5aa6fa0 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -5,7 +5,7 @@ import os from dailyai.pipeline.frames import LLMMessagesQueueFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.ai_services import FrameLogger @@ -25,7 +25,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 5d0919385..8a517b448 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -6,7 +6,7 @@ import aiohttp from PIL import Image from dailyai.pipeline.frames import ImageFrame, Frame -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.ai_services import AIService from dailyai.pipeline.aggregators import ( LLMAssistantContextAggregator, @@ -42,7 +42,7 @@ class ImageSyncAggregator(AIService): async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py index f6ca27402..5da4fae16 100644 --- a/examples/foundational/07-interruptible.py +++ b/examples/foundational/07-interruptible.py @@ -9,7 +9,7 @@ from dailyai.pipeline.aggregators import ( from dailyai.pipeline.pipeline import Pipeline from dailyai.services.ai_services import FrameLogger -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -25,7 +25,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/08-bots-arguing.py b/examples/foundational/08-bots-arguing.py index ff85318cd..2857203f8 100644 --- a/examples/foundational/08-bots-arguing.py +++ b/examples/foundational/08-bots-arguing.py @@ -6,7 +6,7 @@ import os from dailyai.pipeline.aggregators import SentenceAggregator from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.services.fal_ai_services import FalImageGenService @@ -24,7 +24,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url: str): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Respond bot", diff --git a/examples/foundational/10-wake-word.py b/examples/foundational/10-wake-word.py index 39f2decff..bb7fd4c5f 100644 --- a/examples/foundational/10-wake-word.py +++ b/examples/foundational/10-wake-word.py @@ -6,7 +6,7 @@ import random from typing import AsyncGenerator from PIL import Image -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.pipeline.aggregators import ( @@ -116,7 +116,7 @@ class ImageSyncAggregator(AIService): async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Santa Cat", diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 767dbfb89..0bb306f81 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -4,7 +4,7 @@ import logging import os import wave -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService from dailyai.pipeline.aggregators import ( @@ -72,7 +72,7 @@ class InboundSoundEffectWrapper(AIService): async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/foundational/13-whisper-transcription.py b/examples/foundational/13-whisper-transcription.py index eaa9a3bc2..d579724de 100644 --- a/examples/foundational/13-whisper-transcription.py +++ b/examples/foundational/13-whisper-transcription.py @@ -1,7 +1,7 @@ import asyncio import logging -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.whisper_ai_services import WhisperSTTService from runner import configure @@ -15,7 +15,7 @@ logger.setLevel(logging.DEBUG) async def main(room_url: str): - transport = DailyTransportService( + transport = DailyTransport( room_url, None, "Transcription bot", diff --git a/examples/foundational/13a-whisper-local.py b/examples/foundational/13a-whisper-local.py index cbe751766..1fc862038 100644 --- a/examples/foundational/13a-whisper-local.py +++ b/examples/foundational/13a-whisper-local.py @@ -3,7 +3,7 @@ import asyncio import logging from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame -from dailyai.services.local_transport_service import LocalTransportService +from dailyai.transports.local_transport import LocalTransport from dailyai.services.whisper_ai_services import WhisperSTTService logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") @@ -16,7 +16,7 @@ async def main(room_url: str): global stt meeting_duration_minutes = 1 - transport = LocalTransportService( + transport = LocalTransport( mic_enabled=True, camera_enabled=False, speaker_enabled=True, diff --git a/examples/foundational/websocket-server/sample.py b/examples/foundational/websocket-server/sample.py index 72390831e..d82b0cfea 100644 --- a/examples/foundational/websocket-server/sample.py +++ b/examples/foundational/websocket-server/sample.py @@ -6,7 +6,7 @@ from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import TextFrame, TranscriptionQueueFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.services.websocket_transport_service import WebsocketTransport +from dailyai.transports.websocket_transport import WebsocketTransport from dailyai.services.whisper_ai_services import WhisperSTTService logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s") diff --git a/examples/image-gen.py b/examples/image-gen.py index e1cedfcca..6b1efbf76 100644 --- a/examples/image-gen.py +++ b/examples/image-gen.py @@ -5,7 +5,7 @@ import time import urllib.parse import random -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.pipeline.frames import Frame, FrameType from dailyai.services.fal_ai_services import FalImageGenService @@ -17,7 +17,7 @@ async def main(room_url: str, token): global llm global tts - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Imagebot", diff --git a/examples/internal/11a-dial-out.py b/examples/internal/11a-dial-out.py index ee23bc9ce..a41bb50b7 100644 --- a/examples/internal/11a-dial-out.py +++ b/examples/internal/11a-dial-out.py @@ -3,7 +3,7 @@ import asyncio import os import wave -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.pipeline.aggregators import LLMContextAggregator from dailyai.services.ai_services import AIService, FrameLogger @@ -66,7 +66,7 @@ async def main(room_url: str, token, phone): global llm global tts - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Respond bot", diff --git a/examples/starter-apps/chatbot.py b/examples/starter-apps/chatbot.py index 1c40a3413..67702025e 100644 --- a/examples/starter-apps/chatbot.py +++ b/examples/starter-apps/chatbot.py @@ -20,7 +20,7 @@ from dailyai.pipeline.frames import ( ) from dailyai.services.ai_services import AIService from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.open_ai_services import OpenAILLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -92,7 +92,7 @@ class AnimationInitializer(AIService): async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Chatbot", diff --git a/examples/starter-apps/patient-intake.py b/examples/starter-apps/patient-intake.py index 7a35631d7..9409d894a 100644 --- a/examples/starter-apps/patient-intake.py +++ b/examples/starter-apps/patient-intake.py @@ -13,7 +13,7 @@ from dailyai.pipeline.opeanai_llm_aggregator import ( ) from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.openai_llm_context import OpenAILLMContext from dailyai.services.open_ai_services import OpenAILLMService # from dailyai.services.deepgram_ai_services import DeepgramTTSService @@ -292,7 +292,7 @@ async def main(room_url: str, token): global llm global tts - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Intake Bot", diff --git a/examples/starter-apps/storybot.py b/examples/starter-apps/storybot.py index c001ec673..79f708071 100644 --- a/examples/starter-apps/storybot.py +++ b/examples/starter-apps/storybot.py @@ -11,7 +11,7 @@ from PIL import Image from dailyai.pipeline.pipeline import Pipeline from dailyai.pipeline.frame_processor import FrameProcessor -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.services.fal_ai_services import FalImageGenService from dailyai.services.open_ai_services import OpenAILLMService @@ -214,7 +214,7 @@ async def main(room_url: str, token): sp = StoryProcessor(messages, story) sig = StoryImageGenerator(story, llm, img) - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Storybot", diff --git a/examples/starter-apps/translator.py b/examples/starter-apps/translator.py index 04be82d8e..cb80d3282 100644 --- a/examples/starter-apps/translator.py +++ b/examples/starter-apps/translator.py @@ -10,7 +10,7 @@ from dailyai.pipeline.aggregators import ( from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame, TextFrame from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.daily_transport_service import DailyTransportService +from dailyai.transports.daily_transport import DailyTransport from dailyai.services.azure_ai_services import AzureTTSService from dailyai.services.open_ai_services import OpenAILLMService @@ -51,7 +51,7 @@ class TranslationProcessor(FrameProcessor): async def main(room_url: str, token): async with aiohttp.ClientSession() as session: - transport = DailyTransportService( + transport = DailyTransport( room_url, token, "Translator", diff --git a/src/dailyai/pipeline/merge_pipeline.py b/src/dailyai/pipeline/merge_pipeline.py index 736903e9d..e4b865f4a 100644 --- a/src/dailyai/pipeline/merge_pipeline.py +++ b/src/dailyai/pipeline/merge_pipeline.py @@ -12,9 +12,10 @@ class SequentialMergePipeline(Pipeline): self.pipelines = pipelines async def run_pipeline(self): - for pipeline in self.pipelines: + for idx, pipeline in enumerate(self.pipelines): while True: frame = await pipeline.sink.get() + print(idx, frame) if isinstance( frame, EndFrame) or isinstance( frame, EndPipeFrame): diff --git a/src/dailyai/transports/abstract_transport.py b/src/dailyai/transports/abstract_transport.py new file mode 100644 index 000000000..f5ecc5880 --- /dev/null +++ b/src/dailyai/transports/abstract_transport.py @@ -0,0 +1,41 @@ +from abc import abstractmethod +import asyncio +import logging +import time + +from dailyai.pipeline.frame_processor import FrameProcessor +from dailyai.pipeline.pipeline import Pipeline + + +class AbstractTransport: + def __init__(self, **kwargs): + self.send_queue = asyncio.Queue() + self.receive_queue = asyncio.Queue() + self.completed_queue = asyncio.Queue() + + duration_minutes = kwargs.get("duration_minutes") or 10 + self._expiration = time.time() + duration_minutes * 60 + + self._mic_enabled = kwargs.get("mic_enabled") or False + self._mic_sample_rate = kwargs.get("mic_sample_rate") or 16000 + self._camera_enabled = kwargs.get("camera_enabled") or False + self._camera_width = kwargs.get("camera_width") or 1024 + self._camera_height = kwargs.get("camera_height") or 768 + self._speaker_enabled = kwargs.get("speaker_enabled") or False + self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000 + self._fps = kwargs.get("fps") or 8 + + self._logger: logging.Logger = logging.getLogger("dailyai.transport") + + @abstractmethod + async def run(self, pipeline: Pipeline, override_pipeline_source_queue=True): + pass + + @abstractmethod + async def run_interruptible_pipeline( + self, + pipeline: Pipeline, + pre_processor: FrameProcessor | None = None, + post_processor: FrameProcessor | None = None, + ): + pass diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/transports/daily_transport.py similarity index 96% rename from src/dailyai/services/daily_transport_service.py rename to src/dailyai/transports/daily_transport.py index 9c8aa5e78..1da114d31 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/transports/daily_transport.py @@ -24,10 +24,10 @@ from daily import ( VirtualSpeakerDevice, ) -from dailyai.services.base_transport_service import BaseTransportService +from dailyai.transports.threaded_transport import ThreadedTransport -class DailyTransportService(BaseTransportService, EventHandler): +class DailyTransport(ThreadedTransport, EventHandler): _daily_initialized = False _lock = threading.Lock() @@ -48,7 +48,7 @@ class DailyTransportService(BaseTransportService, EventHandler): start_transcription: bool = False, **kwargs, ): - # This will call BaseTransportService.__init__ method, not EventHandler + # This will call ThreadedTransport.__init__ method, not EventHandler super().__init__(**kwargs) self._room_url: str = room_url @@ -140,10 +140,10 @@ class DailyTransportService(BaseTransportService, EventHandler): def _prerun(self): # Only initialize Daily once - if not DailyTransportService._daily_initialized: - with DailyTransportService._lock: + if not DailyTransport._daily_initialized: + with DailyTransport._lock: Daily.init() - DailyTransportService._daily_initialized = True + DailyTransport._daily_initialized = True self.client = CallClient(event_handler=self) if self._mic_enabled: diff --git a/src/dailyai/services/local_transport_service.py b/src/dailyai/transports/local_transport.py similarity index 95% rename from src/dailyai/services/local_transport_service.py rename to src/dailyai/transports/local_transport.py index 2d334e7cd..1e791a467 100644 --- a/src/dailyai/services/local_transport_service.py +++ b/src/dailyai/transports/local_transport.py @@ -3,10 +3,10 @@ import numpy as np import tkinter as tk import pyaudio -from dailyai.services.base_transport_service import BaseTransportService +from dailyai.transports.threaded_transport import ThreadedTransport -class LocalTransportService(BaseTransportService): +class LocalTransport(ThreadedTransport): def __init__(self, **kwargs): super().__init__(**kwargs) self._sample_width = kwargs.get("sample_width") or 2 diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/transports/threaded_transport.py similarity index 94% rename from src/dailyai/services/base_transport_service.py rename to src/dailyai/transports/threaded_transport.py index c4d963007..d529731cd 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/transports/threaded_transport.py @@ -27,6 +27,7 @@ from dailyai.pipeline.frames import ( ) from dailyai.pipeline.pipeline import Pipeline from dailyai.services.ai_services import TTSService +from dailyai.transports.abstract_transport import AbstractTransport torch.set_num_threads(1) @@ -72,20 +73,13 @@ class VADState(Enum): STOPPING = 4 -class BaseTransportService: +class ThreadedTransport(AbstractTransport): def __init__( self, **kwargs, ) -> None: - self._mic_enabled = kwargs.get("mic_enabled") or False - self._mic_sample_rate = kwargs.get("mic_sample_rate") or 16000 - self._camera_enabled = kwargs.get("camera_enabled") or False - self._camera_width = kwargs.get("camera_width") or 1024 - self._camera_height = kwargs.get("camera_height") or 768 - self._speaker_enabled = kwargs.get("speaker_enabled") or False - self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000 - self._fps = kwargs.get("fps") or 8 + super().__init__(**kwargs) self._vad_start_s = kwargs.get("vad_start_s") or 0.2 self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8 self._context = kwargs.get("context") or [] @@ -105,14 +99,6 @@ class BaseTransportService: self._vad_state = VADState.QUIET self._user_is_speaking = False - duration_minutes = kwargs.get("duration_minutes") or 10 - self._expiration = time.time() + duration_minutes * 60 - - self.send_queue = asyncio.Queue() - self.receive_queue = asyncio.Queue() - - self.completed_queue = asyncio.Queue() - self._threadsafe_send_queue = queue.Queue() self._images = None @@ -125,8 +111,6 @@ class BaseTransportService: self._stop_threads = threading.Event() self._is_interrupted = threading.Event() - self._logger: logging.Logger = logging.getLogger() - async def run(self, pipeline: Pipeline | None = None, override_pipeline_source_queue=True): self._prerun() @@ -193,8 +177,7 @@ class BaseTransportService: async def run_interruptible_pipeline( self, pipeline: Pipeline, - allow_interruptions=True, - pre_processor=None, + pre_processor: FrameProcessor | None = None, post_processor: FrameProcessor | None = None, ): pipeline.set_sink(self.send_queue) diff --git a/src/dailyai/services/websocket_transport_service.py b/src/dailyai/transports/websocket_transport.py similarity index 96% rename from src/dailyai/services/websocket_transport_service.py rename to src/dailyai/transports/websocket_transport.py index a9df7c07d..0784eb89f 100644 --- a/src/dailyai/services/websocket_transport_service.py +++ b/src/dailyai/transports/websocket_transport.py @@ -7,7 +7,8 @@ from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import AudioFrame, ControlFrame, EndFrame, Frame, TTSEndFrame, TTSStartFrame, TextFrame from dailyai.pipeline.pipeline import Pipeline from dailyai.serializers.protobuf_serializer import ProtobufFrameSerializer -from dailyai.services.base_transport_service import BaseTransportService +from dailyai.transports.abstract_transport import AbstractTransport +from dailyai.transports.threaded_transport import ThreadedTransport class WebSocketFrameProcessor(FrameProcessor): @@ -45,7 +46,7 @@ class WebSocketFrameProcessor(FrameProcessor): yield frame -class WebsocketTransport(BaseTransportService): +class WebsocketTransport(AbstractTransport): def __init__(self, **kwargs): super().__init__(**kwargs) self._sample_width = kwargs.get("sample_width", 2) diff --git a/tests/test_daily_transport_service.py b/tests/test_daily_transport_service.py index fb748cc97..da4e67389 100644 --- a/tests/test_daily_transport_service.py +++ b/tests/test_daily_transport_service.py @@ -4,9 +4,9 @@ import unittest class TestDailyTransport(unittest.IsolatedAsyncioTestCase): async def test_event_handler(self): - from dailyai.services.daily_transport_service import DailyTransportService + from dailyai.transports.daily_transport import DailyTransport - transport = DailyTransportService("mock.daily.co/mock", "token", "bot") + transport = DailyTransport("mock.daily.co/mock", "token", "bot") was_called = False diff --git a/tests/test_websocket_transport.py b/tests/test_websocket_transport.py index 4b0d3d9b3..ebcf94ea6 100644 --- a/tests/test_websocket_transport.py +++ b/tests/test_websocket_transport.py @@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, patch, Mock from dailyai.pipeline.frames import AudioFrame, EndFrame, TextFrame, TTSEndFrame, TTSStartFrame from dailyai.pipeline.pipeline import Pipeline -from dailyai.services.websocket_transport_service import WebSocketFrameProcessor, WebsocketTransport +from dailyai.transports.websocket_transport import WebSocketFrameProcessor, WebsocketTransport class TestWebSocketTransportService(unittest.IsolatedAsyncioTestCase):