From 4416f36ae967a0bd4532b9815a1fa607bac1ebce Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Sat, 27 Jan 2024 19:07:29 -0500 Subject: [PATCH] some minor cleanup, and coalesce image/images into one thing, and use itertools.cycle --- src/dailyai/queue_frame.py | 3 +- src/dailyai/services/azure_ai_services.py | 2 +- .../services/daily_transport_service.py | 37 +++++++--------- src/dailyai/services/fal_ai_services.py | 2 +- src/samples/foundational/06a-image-sync.py | 3 +- src/samples/foundational/10-wake-word.py | 43 +++++++++++-------- src/samples/server/auth.py | 15 +++++-- src/samples/server/daily-bot-manager.py | 7 +-- 8 files changed, 62 insertions(+), 50 deletions(-) diff --git a/src/dailyai/queue_frame.py b/src/dailyai/queue_frame.py index c00f0827d..75a524813 100644 --- a/src/dailyai/queue_frame.py +++ b/src/dailyai/queue_frame.py @@ -32,7 +32,8 @@ class ImageQueueFrame(QueueFrame): @dataclass() class SpriteQueueFrame(QueueFrame): - images: list[bytes] | None + images: list[bytes] + @dataclass() class TextQueueFrame(QueueFrame): diff --git a/src/dailyai/services/azure_ai_services.py b/src/dailyai/services/azure_ai_services.py index c35584f54..64d542336 100644 --- a/src/dailyai/services/azure_ai_services.py +++ b/src/dailyai/services/azure_ai_services.py @@ -101,7 +101,7 @@ class AzureImageGenServiceREST(ImageGenService): def __init__( self, image_size: str, - aiohttp_session:aiohttp.ClientSession, + aiohttp_session: aiohttp.ClientSession, api_key=None, azure_endpoint=None, api_version=None, diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index e7ab5e8e9..e3dd5abde 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -1,5 +1,6 @@ import asyncio import inspect +import itertools import logging import sys import threading @@ -8,7 +9,7 @@ import types from functools import partial from queue import Queue, Empty -from typing import AsyncGenerator +from typing import AsyncGenerator, Iterable from dailyai.queue_frame import ( AudioQueueFrame, @@ -62,7 +63,8 @@ class DailyTransportService(EventHandler): # This queue is used to marshal frames from the async send queue to the thread that emits audio & video. # We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions # a chance to run while waiting for queue items -- but also to maintain thread safety and have a threaded - # handler to send frames, to ensure that sending isn't subject to pauses in the async thread. + # handler to send frames, to ensure that sending isn't subject to pauses + # in the async thread. self.threadsafe_send_queue = Queue() self._is_interrupted = Event() @@ -168,10 +170,9 @@ class DailyTransportService(EventHandler): ) Daily.select_speaker_device("speaker") - self._image: bytes | None = None - self._images: list[bytes] | None = None + self._images = None - self._camera_thread = Thread(target=self.run_camera, daemon=True) + self._camera_thread = Thread(target=self._run_camera, daemon=True) self._camera_thread.start() self._logger.info("Starting frame consumer thread") @@ -343,24 +344,18 @@ class DailyTransportService(EventHandler): def on_transcription_started(self, status): pass - def set_image(self, image: bytes): - self._image: bytes | None = image - self._images: list[bytes] | None = None - - def set_images(self, images: list[bytes], start_frame=0): - self._images: list[bytes] | None = images - self._image = None - self._current_frame = start_frame - - def run_camera(self): + def _set_image(self, image: bytes): + self._images = itertools.cycle([image]) + + def _set_images(self, images: list[bytes], start_frame=0): + self._images = itertools.cycle(images) + + def _run_camera(self): try: while not self._stop_threads.is_set(): - if self._image: - self.camera.write_frame(self._image) if self._images: - this_frame = self._images[self._current_frame] + this_frame = next(self._images) self.camera.write_frame(this_frame) - self._current_frame = (self._current_frame + 1) % len(self._images) time.sleep(1.0 / 8) # 8 fps except Exception as e: @@ -402,9 +397,9 @@ class DailyTransportService(EventHandler): self.mic.write_frames(bytes(b[:l])) b = b[l:] elif isinstance(frame, ImageQueueFrame): - self.set_image(frame.image) + self._set_image(frame.image) elif isinstance(frame, SpriteQueueFrame): - self.set_images(frame.images) + self._set_images(frame.images) elif len(b): self.mic.write_frames(bytes(b)) b = bytearray() diff --git a/src/dailyai/services/fal_ai_services.py b/src/dailyai/services/fal_ai_services.py index b4b185e4e..ef9f8a801 100644 --- a/src/dailyai/services/fal_ai_services.py +++ b/src/dailyai/services/fal_ai_services.py @@ -11,7 +11,7 @@ from dailyai.services.ai_services import LLMService, TTSService, ImageGenService class FalImageGenService(ImageGenService): - def __init__(self, image_size, aiohttp_session:aiohttp.ClientSession): + def __init__(self, image_size, aiohttp_session: aiohttp.ClientSession): super().__init__(image_size) self._aiohttp_session = aiohttp_session diff --git a/src/samples/foundational/06a-image-sync.py b/src/samples/foundational/06a-image-sync.py index 47bb025de..ede4d7648 100644 --- a/src/samples/foundational/06a-image-sync.py +++ b/src/samples/foundational/06a-image-sync.py @@ -16,7 +16,7 @@ from dailyai.services.fal_ai_services import FalImageGenService class ImageSyncAggregator(AIService): - def __init__(self, speaking_path:str, waiting_path:str): + def __init__(self, speaking_path: str, waiting_path: str): self._speaking_image = Image.open(speaking_path) self._speaking_image_bytes = self._speaking_image.tobytes() @@ -28,6 +28,7 @@ class ImageSyncAggregator(AIService): yield frame yield ImageQueueFrame(None, self._waiting_image_bytes) + async def main(room_url: str, token): global transport global llm diff --git a/src/samples/foundational/10-wake-word.py b/src/samples/foundational/10-wake-word.py index e56d527ea..07d30d35b 100644 --- a/src/samples/foundational/10-wake-word.py +++ b/src/samples/foundational/10-wake-word.py @@ -7,21 +7,22 @@ import requests import time import urllib.parse -from dotenv import load_dotenv from PIL import Image -load_dotenv() - from dailyai.services.daily_transport_service import DailyTransportService -from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService +from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService -from dailyai.services.fal_ai_services import FalImageGenService -from dailyai.services.open_ai_services import OpenAIImageGenService from dailyai.queue_aggregators import LLMContextAggregator -from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, ImageQueueFrame, SpriteQueueFrame +from dailyai.queue_frame import ( + QueueFrame, + TextQueueFrame, + ImageQueueFrame, + SpriteQueueFrame, + TranscriptionQueueFrame, +) from dailyai.services.ai_services import AIService -from typing import AsyncGenerator, List +from typing import AsyncGenerator sprites = {} image_files = [ @@ -53,23 +54,30 @@ talking = [random.choice(talking_list) for x in range(30)] talking_frame = SpriteQueueFrame(images=talking) # TODO: Support "thinking" as soon as we get a valid transcript, while LLM is processing -thinking_list = [sprites['sc-think-1.png'], sprites['sc-think-2.png'], sprites['sc-think-3.png'], sprites['sc-think-4.png']] +thinking_list = [ + sprites['sc-think-1.png'], + sprites['sc-think-2.png'], + sprites['sc-think-3.png'], + sprites['sc-think-4.png']] thinking_frame = SpriteQueueFrame(images=thinking_list) + class TranscriptFilter(AIService): def __init__(self, bot_participant_id=None): self.bot_participant_id = bot_participant_id - async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: - if frame.participantId != self.bot_participant_id: - yield frame + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: + if isinstance(frame, TranscriptionQueueFrame): + if frame.participantId != self.bot_participant_id: + yield frame + class NameCheckFilter(AIService): def __init__(self, names=None): self.names = names self.sentence = "" - async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]: + async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]: content: str = "" # TODO: split up transcription by participant @@ -86,6 +94,7 @@ class NameCheckFilter(AIService): out = self.sentence self.sentence = "" + class ImageSyncAggregator(AIService): def __init__(self): pass @@ -95,7 +104,8 @@ class ImageSyncAggregator(AIService): yield frame yield quiet_frame -async def main(room_url:str, token): + +async def main(room_url: str, token): async with aiohttp.ClientSession() as session: global transport global llm @@ -153,14 +163,11 @@ async def main(room_url:str, token): async def starting_image(): await transport.send_queue.put(quiet_frame) - + transport.transcription_settings["extra"]["punctuate"] = True await asyncio.gather(transport.run(), handle_transcriptions(), starting_image()) - - - if __name__ == "__main__": parser = argparse.ArgumentParser(description="Simple Daily Bot Sample") parser.add_argument( diff --git a/src/samples/server/auth.py b/src/samples/server/auth.py index 5fab362cd..6eba905e3 100644 --- a/src/samples/server/auth.py +++ b/src/samples/server/auth.py @@ -8,14 +8,21 @@ import os load_dotenv() + def get_meeting_token(room_name, daily_api_key, token_expiry): api_path = os.getenv('DAILY_API_PATH') or 'https://api.daily.co/v1' if not token_expiry: token_expiry = time.time() + 600 - res = requests.post(f'{api_path}/meeting-tokens', - headers={'Authorization': f'Bearer {daily_api_key}'}, - json={'properties': {'room_name': room_name, 'is_owner': True, 'exp': token_expiry}}) + res = requests.post( + f'{api_path}/meeting-tokens', + headers={ + 'Authorization': f'Bearer {daily_api_key}'}, + json={ + 'properties': { + 'room_name': room_name, + 'is_owner': True, + 'exp': token_expiry}}) if res.status_code != 200: return jsonify({'error': 'Unable to create meeting token', 'detail': res.text}), 500 meeting_token = res.json()['token'] @@ -23,4 +30,4 @@ def get_meeting_token(room_name, daily_api_key, token_expiry): def get_room_name(room_url): - return urllib.parse.urlparse(room_url).path[1:] \ No newline at end of file + return urllib.parse.urlparse(room_url).path[1:] diff --git a/src/samples/server/daily-bot-manager.py b/src/samples/server/daily-bot-manager.py index 877861866..9d001cb87 100644 --- a/src/samples/server/daily-bot-manager.py +++ b/src/samples/server/daily-bot-manager.py @@ -16,6 +16,7 @@ CORS(app) print(f"I loaded an environment, and my FAL_KEY_ID is {os.getenv('FAL_KEY_ID')}") + def start_bot(bot_path, args=None): daily_api_key = os.getenv("DAILY_API_KEY") api_path = os.getenv("DAILY_API_PATH") or "https://api.daily.co/v1" @@ -77,7 +78,7 @@ def start_bot(bot_path, args=None): if res.status_code == 200: break print(f"Took {attempts} attempts to join room {room_name}") - + # Additional client config config = {} if os.getenv("CLIENT_VAD_TIMEOUT_SEC"): @@ -85,7 +86,7 @@ def start_bot(bot_path, args=None): else: config['vad_timeout_sec'] = 1.5 - #return jsonify({"room_url": room_url, "token": meeting_token, "config": config}), 200 + # return jsonify({"room_url": room_url, "token": meeting_token, "config": config}), 200 return redirect(room_url, code=301) @@ -96,4 +97,4 @@ def spin_up_kitty(): @app.route("/healthz") def health_check(): - return "ok", 200 \ No newline at end of file + return "ok", 200