Compare commits

..

5 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
297b9402a8 theoretical sample: basic voice chat 2024-01-03 20:54:51 -08:00
Kwindla Hultman Kramer
36f4001877 three more theoretical samples 2024-01-03 11:55:48 -08:00
Kwindla Hultman Kramer
4ee34ce796 mic doesn't need to be global in 02 2024-01-01 21:54:14 -08:00
Kwindla Hultman Kramer
0db2cf5a80 working on theoretical API examples 2024-01-01 21:46:10 -08:00
Kwindla Hultman Kramer
72aa034c85 start of khk minimal samples 2023-12-31 21:17:11 -08:00
45 changed files with 573 additions and 1642 deletions

4
.gitignore vendored
View File

@@ -22,6 +22,4 @@ share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
.DS_Store
.env
MANIFEST

View File

@@ -5,14 +5,12 @@ This SDK can help you build applications that participate in WebRTC meetings and
## Build/Install
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
```
python3 -m venv env
source env/bin/activate
```
From the root of this repo, run the following:
```
pip install -r requirements.txt
python -m build
@@ -21,11 +19,10 @@ python -m build
This builds the package. To use the package locally (e.g., to run sample files), run
```
pip install --editable .
pip install .
```
If you want to use this package from another directory, you can run:
```
pip install path_to_this_repo
```
@@ -35,7 +32,7 @@ pip install path_to_this_repo
You can run the simple sample like so:
```
python src/samples/theoretical-to-real/01-say-one-thing.py -u <url of your Daily meeting> -k <your Daily API Key>
python src/samples/simple-sample/simple-sample.py -u your_room_url -k your_daily_api_key
```
Note that the sample uses Azure's TTS and LLM services. You'll need to set the following environment variables for the sample to work:
@@ -47,9 +44,3 @@ AZURE_CHATGPT_KEY
AZURE_CHATGPT_ENDPOINT
AZURE_CHATGPT_DEPLOYMENT_ID
```
If you have those environment variables stored in an .env file, you can quickly load them into your terminal's environment by running this:
```bash
export $(grep -v '^#' .env | xargs)
```

View File

@@ -14,9 +14,7 @@ dependencies = [
"google-cloud-texttospeech",
"azure-cognitiveservices-speech",
"pyht",
"opentelemetry-sdk",
"aiohttp",
"fal"
"opentelemetry-sdk"
]
[tool.setuptools.packages.find]

View File

@@ -9,7 +9,7 @@ from queue import Queue, PriorityQueue, Empty
from threading import Event, Semaphore, Thread
from typing import Any, Generator, Iterator, Optional, Type
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
@@ -268,10 +268,10 @@ class LLMResponse(OrchestratorResponse):
if out.strip():
yield out.strip()
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
return [QueueFrame(FrameType.AUDIO, audio_frame)]
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
return [OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame)]
def get_frames_from_chunk(self, chunk) -> Generator[list[QueueFrame], Any, None]:
def get_frames_from_chunk(self, chunk) -> Generator[list[OutputQueueFrame], Any, None]:
for audio_frame in self.services.tts.run_tts(chunk):
yield self.get_frames_from_tts_response(audio_frame)
@@ -317,7 +317,7 @@ class LLMResponse(OrchestratorResponse):
break
if not self.has_sent_first_frame:
self.output_queue.put(QueueFrame(FrameType.START_STREAM, None))
self.output_queue.put(OutputQueueFrame(FrameType.START_STREAM, None))
self.has_sent_first_frame = True
for frame in frames:

View File

@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
OrchestratorResponse,
LLMResponse,
)
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.services.ai_services import AIServiceConfig
from dailyai.message_handler.message_handler import MessageHandler
@@ -197,7 +197,7 @@ class Orchestrator(EventHandler):
self.logger.info("Camera thread stopped")
self.logger.info("Put stop in output queue")
self.output_queue.put(QueueFrame(FrameType.END_STREAM, None))
self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None))
self.frame_consumer_thread.join()
self.logger.info("Orchestrator stopped.")
@@ -209,9 +209,6 @@ class Orchestrator(EventHandler):
def on_intro_finished(self, intro):
self.logger.info(f"Introduction has finished")
waiting = self.conversation_processors.waiting(self.services, self.message_handler, self.output_queue)
waiting.prepare()
waiting.play()
def on_response_played(self, response):
response.finalize()
@@ -367,7 +364,7 @@ class Orchestrator(EventHandler):
all_audio_frames = bytearray()
while True:
try:
frame:QueueFrame = self.output_queue.get()
frame:OutputQueueFrame = self.output_queue.get()
if frame.frame_type == FrameType.END_STREAM:
self.logger.info("Stopping frame consumer thread")
return
@@ -375,7 +372,7 @@ class Orchestrator(EventHandler):
# if interrupted, we just pull frames off the queue and discard them
if not self.is_interrupted.is_set():
if frame:
if frame.frame_type == FrameType.AUDIO:
if frame.frame_type == FrameType.AUDIO_FRAME:
chunk = frame.frame_data
all_audio_frames.extend(chunk)
@@ -385,7 +382,7 @@ class Orchestrator(EventHandler):
if l:
self.mic.write_frames(bytes(b[:l]))
b = b[l:]
elif frame.frame_type == FrameType.IMAGE:
elif frame.frame_type == FrameType.IMAGE_FRAME:
self.set_image(frame.frame_data)
elif len(b):
self.mic.write_frames(bytes(b))

View File

@@ -0,0 +1,14 @@
from enum import Enum
from dataclasses import dataclass
class FrameType(Enum):
    """Tag identifying what an output-queue frame carries."""

    # Payload-bearing frames.
    AUDIO_FRAME = 1
    IMAGE_FRAME = 2

    # Stream-control markers.
    START_STREAM = 3
    END_STREAM = 4
@dataclass(frozen=True)
class OutputQueueFrame:
    """Immutable item placed on the output queue.

    ``frame_type`` says how the consumer should interpret ``frame_data``.
    START_STREAM / END_STREAM frames are constructed with ``None`` as their
    payload, so the annotation allows ``None`` alongside ``bytes``.
    """

    # Kind of frame; selects how frame_data is handled by the consumer.
    frame_type: FrameType
    # Payload bytes, or None for stream-control frames.
    frame_data: bytes | None

View File

@@ -1,19 +0,0 @@
from enum import Enum
from dataclasses import dataclass
class FrameType(Enum):
    """Tag identifying what a ``QueueFrame`` carries."""

    # Stream-control markers.
    START_STREAM = 0
    END_STREAM = 1

    # Media payloads.
    AUDIO = 2
    IMAGE = 3

    # Text payloads.
    SENTENCE = 4
    TEXT_CHUNK = 5

    # Structured / message payloads.
    LLM_MESSAGE = 6
    APP_MESSAGE = 7
    IMAGE_DESCRIPTION = 8
    TRANSCRIPTION = 9
@dataclass(frozen=True)
class QueueFrame:
    """Immutable unit of data passed between services and the transport.

    The meaning of ``frame_data`` depends on ``frame_type``; the annotation
    is a union because different frame types carry different payload shapes.
    """

    # Discriminator telling consumers how to interpret frame_data.
    frame_type: FrameType
    # Payload; None is used for frames that carry no data.
    frame_data: str | dict | bytes | list | None

View File

@@ -1,2 +1,2 @@
Pillow==10.1.0
typing_extensions==4.9.0
typing_extensions==4.9.0

View File

@@ -1,73 +0,0 @@
from typing import AsyncGenerator
from dailyai.queue_frame import FrameType, QueueFrame
from dailyai.services.ai_services import AIService
class SentenceAggregator(AIService):
    """Accumulate TEXT_CHUNK frames and emit one SENTENCE frame per sentence.

    Text is buffered in ``current_sentence`` until the buffer ends with
    ``.``, ``?`` or ``!``. Whatever is still buffered when END_STREAM
    arrives is flushed as a final sentence.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Text received so far that has not yet been emitted as a sentence.
        self.current_sentence = ""

    def allowed_input_frame_types(self) -> set[FrameType]:
        """Return the frame types this service accepts as input."""
        return {FrameType.TEXT_CHUNK, FrameType.SENTENCE}

    def possible_output_frame_types(self) -> set[FrameType]:
        """Return the frame types this service can produce."""
        return {FrameType.SENTENCE}

    async def process_frame(
        self, requested_frame_types: set[FrameType], frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Consume one frame and yield zero or one SENTENCE frames.

        Args:
            requested_frame_types: Output types the caller wants; nothing is
                yielded unless SENTENCE is among them.
            frame: The incoming frame.

        Raises:
            Exception: If a TEXT_CHUNK frame's data is not a string.
        """
        if FrameType.SENTENCE not in requested_frame_types:
            return

        if frame.frame_type == FrameType.TEXT_CHUNK:
            if not isinstance(frame.frame_data, str):
                raise Exception(
                    "Sentence aggregator requires a string for the data field"
                )

            self.current_sentence += frame.frame_data
            if self.current_sentence.endswith((".", "?", "!")):
                sentence = self.current_sentence
                self.current_sentence = ""
                yield QueueFrame(FrameType.SENTENCE, sentence)
        elif frame.frame_type == FrameType.END_STREAM:
            if self.current_sentence:
                # Clear the buffer before yielding so a reused instance does
                # not prepend this stream's leftover text to the next stream.
                sentence = self.current_sentence
                self.current_sentence = ""
                yield QueueFrame(FrameType.SENTENCE, sentence)
        elif frame.frame_type == FrameType.SENTENCE:
            # Already a full sentence: pass it through unchanged.
            yield frame
class TranscriptionSentenceAggregator(AIService):
    """Accumulate TEXT_CHUNK frames and emit one SENTENCE frame per sentence.

    Text is buffered in ``current_sentence`` until the buffer ends with
    ``.``, ``?`` or ``!``. Whatever is still buffered when END_STREAM
    arrives is flushed as a final sentence.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Text received so far that has not yet been emitted as a sentence.
        self.current_sentence = ""

    def allowed_input_frame_types(self) -> set[FrameType]:
        """Return the frame types this service accepts as input."""
        return {FrameType.TEXT_CHUNK, FrameType.SENTENCE}

    def possible_output_frame_types(self) -> set[FrameType]:
        """Return the frame types this service can produce."""
        return {FrameType.SENTENCE}

    async def process_frame(
        self, requested_frame_types: set[FrameType], frame: QueueFrame
    ) -> AsyncGenerator[QueueFrame, None]:
        """Consume one frame and yield zero or one SENTENCE frames.

        Args:
            requested_frame_types: Output types the caller wants; nothing is
                yielded unless SENTENCE is among them.
            frame: The incoming frame.

        Raises:
            Exception: If a TEXT_CHUNK frame's data is not a string.
        """
        if FrameType.SENTENCE not in requested_frame_types:
            return

        if frame.frame_type == FrameType.TEXT_CHUNK:
            if not isinstance(frame.frame_data, str):
                raise Exception(
                    "Sentence aggregator requires a string for the data field"
                )

            self.current_sentence += frame.frame_data
            if self.current_sentence.endswith((".", "?", "!")):
                sentence = self.current_sentence
                self.current_sentence = ""
                yield QueueFrame(FrameType.SENTENCE, sentence)
        elif frame.frame_type == FrameType.END_STREAM:
            if self.current_sentence:
                # Clear the buffer before yielding so a reused instance does
                # not prepend this stream's leftover text to the next stream.
                sentence = self.current_sentence
                self.current_sentence = ""
                yield QueueFrame(FrameType.SENTENCE, sentence)
        elif frame.frame_type == FrameType.SENTENCE:
            # Already a full sentence: pass it through unchanged.
            yield frame

View File

@@ -1,136 +1,33 @@
import asyncio
import logging
import re
from httpx import request
from dailyai.queue_frame import QueueFrame, FrameType
from abc import abstractmethod
from typing import AsyncGenerator, Iterable
from dataclasses import dataclass
from typing import AsyncGenerator
from typing import Generator
from PIL import Image
from collections.abc import Iterable, AsyncIterable
class AIService:
def __init__(self):
self.logger = logging.getLogger("dailyai")
def stop(self):
def close(self):
pass
def allowed_input_frame_types(self) -> set[FrameType]:
return set()
def possible_output_frame_types(self) -> set[FrameType]:
return set()
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
async for frame in self.run(frames):
await queue.put(frame)
if add_end_of_stream:
await queue.put(QueueFrame(FrameType.END_STREAM, None))
async def run(
self,
frames: Iterable[QueueFrame]
| AsyncIterable[QueueFrame]
| asyncio.Queue[QueueFrame],
requested_frame_types: set[FrameType] | None=None,
) -> AsyncGenerator[QueueFrame, None]:
if requested_frame_types and self.possible_output_frame_types().intersection(requested_frame_types) == set():
raise Exception(f"Requested frame types {requested_frame_types} are not supported by this service.")
if not requested_frame_types:
requested_frame_types = self.possible_output_frame_types()
if isinstance(frames, AsyncIterable):
async for frame in frames:
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
elif isinstance(frames, Iterable):
for frame in frames:
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
elif isinstance(frames, asyncio.Queue):
while True:
frame = await frames.get()
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
if frame.frame_type == FrameType.END_STREAM:
break
else:
raise Exception("Frames must be an iterable or async iterable")
@abstractmethod
async def process_frame(self, requested_frame_types:set[FrameType], frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
# Yield something so the linter can deduce what should happen here.
yield QueueFrame(FrameType.END_STREAM, None)
class SentenceAggregator(AIService):
    """Accumulate TEXT_CHUNK frames and emit one SENTENCE frame per sentence.

    Text is buffered in ``current_sentence`` until the buffer ends with
    ``.``, ``?`` or ``!``. Whatever is still buffered when END_STREAM
    arrives is flushed as a final sentence.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Text received so far that has not yet been emitted as a sentence.
        self.current_sentence = ""

    def allowed_input_frame_types(self) -> set[FrameType]:
        """Return the frame types this service accepts as input."""
        return {FrameType.TEXT_CHUNK, FrameType.SENTENCE}

    def possible_output_frame_types(self) -> set[FrameType]:
        """Return the frame types this service can produce."""
        return {FrameType.SENTENCE}

    async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Consume one frame and yield zero or one SENTENCE frames.

        Args:
            requested_frame_types: Output types the caller wants; nothing is
                yielded unless SENTENCE is among them.
            frame: The incoming frame.

        Raises:
            Exception: If a TEXT_CHUNK frame's data is not a string.
        """
        if FrameType.SENTENCE not in requested_frame_types:
            return

        if frame.frame_type == FrameType.TEXT_CHUNK:
            if not isinstance(frame.frame_data, str):
                raise Exception(
                    "Sentence aggregator requires a string for the data field"
                )

            self.current_sentence += frame.frame_data
            if self.current_sentence.endswith((".", "?", "!")):
                sentence = self.current_sentence
                self.current_sentence = ""
                yield QueueFrame(FrameType.SENTENCE, sentence)
        elif frame.frame_type == FrameType.END_STREAM:
            if self.current_sentence:
                # Clear the buffer before yielding so a reused instance does
                # not prepend this stream's leftover text to the next stream.
                sentence = self.current_sentence
                self.current_sentence = ""
                yield QueueFrame(FrameType.SENTENCE, sentence)
        elif frame.frame_type == FrameType.SENTENCE:
            # Already a full sentence: pass it through unchanged.
            yield frame
class LLMService(AIService):
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.LLM_MESSAGE, FrameType.SENTENCE, FrameType.TRANSCRIPTION])
def allowed_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TEXT_CHUNK])
# Generate a set of responses to a prompt. Yields a list of responses.
@abstractmethod
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
yield ""
@abstractmethod
async def run_llm(self, messages) -> str:
def run_llm_async(
self, messages
) -> Generator[str, None, None]:
pass
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if frame.frame_type == FrameType.LLM_MESSAGE:
if type(frame.frame_data) != list:
raise Exception("LLM service requires a dict for the data field")
messages: list[dict[str, str]] = frame.frame_data
if FrameType.SENTENCE in requested_frame_types:
yield QueueFrame(FrameType.SENTENCE, await self.run_llm(messages))
else:
async for text_chunk in self.run_llm_async(messages):
yield QueueFrame(FrameType.TEXT_CHUNK, text_chunk)
# TODO: handle other frame types! Need to aggregate into messages
# Generate a responses to a prompt. Returns the response
@abstractmethod
def run_llm(
self, messages
) -> str or None:
pass
class TTSService(AIService):
@@ -138,60 +35,19 @@ class TTSService(AIService):
def get_mic_sample_rate(self):
return 16000
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.AUDIO])
# Converts the sentence to audio. Yields a list of audio frames that can
# be sent to the microphone device
@abstractmethod
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
# yield empty bytes here, so linting can infer what this method does
yield bytes()
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.AUDIO in requested_frame_types:
return
if type(frame.frame_data) != str:
raise Exception("TTS service requires a string for the data field")
async for audio_chunk in self.run_tts(frame.frame_data):
yield QueueFrame(FrameType.AUDIO, audio_chunk)
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
await self.run_to_queue(queue, [QueueFrame(FrameType.SENTENCE, sentence)])
def run_tts(self, sentence) -> Generator[bytes, None, None]:
pass
class ImageGenService(AIService):
def __init__(self, image_size, **kwargs):
super().__init__(**kwargs)
self.image_size = image_size
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK, FrameType.IMAGE_DESCRIPTION])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.IMAGE])
# Renders the image. Returns an Image object.
@abstractmethod
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
pass
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.IMAGE in requested_frame_types:
return
if type(frame.frame_data) != str:
raise Exception("Image service requires a string for the data field")
(_, image_data) = await self.run_image_gen(frame.frame_data)
yield QueueFrame(FrameType.IMAGE, image_data)
@dataclass
class AIServiceConfig:

View File

@@ -1,13 +1,11 @@
import aiohttp
import asyncio
import io
import json
from openai import AsyncAzureOpenAI
import io
from openai import AzureOpenAI
import os
import requests
from collections.abc import AsyncGenerator
from typing import Generator
from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
from PIL import Image
@@ -25,7 +23,7 @@ class AzureTTSService(TTSService):
self.speech_config = SpeechConfig(subscription=speech_key, region=speech_region)
self.speech_synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=None)
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
def run_tts(self, sentence) -> Generator[bytes, None, None]:
self.logger.info("Running azure tts")
ssml = "<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' " \
"xmlns:mstts='http://www.w3.org/2001/mstts'>" \
@@ -35,10 +33,7 @@ class AzureTTSService(TTSService):
"<prosody rate='1.05'>" \
f"{sentence}" \
"</prosody></mstts:express-as></voice></speak> "
try:
result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))
except Exception as e:
self.logger.error("Error in azure tts", e)
result = self.speech_synthesizer.speak_ssml(ssml)
self.logger.info("Got azure tts result")
if result.reason == ResultReason.SynthesizingAudioCompleted:
self.logger.info("Returning result")
@@ -54,91 +49,45 @@ class AzureLLMService(LLMService):
def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None):
super().__init__()
api_key = api_key or os.getenv("AZURE_CHATGPT_KEY")
azure_endpoint = azure_endpoint or os.getenv("AZURE_CHATGPT_ENDPOINT")
if not azure_endpoint:
raise Exception("No azure endpoint specified for Azure LLM, please set AZURE_CHATGPT_ENDPOINT in the environment or pass it to the AzureLLMService constructor")
model: str | None = model or os.getenv("AZURE_CHATGPT_DEPLOYMENT_ID")
if not model:
raise Exception("No model specified for Azure LLM, please set AZURE_CHATGPT_DEPLOYMENT_ID in the environment or pass it to the AzureLLMService constructor")
self.model: str = model
api_version = api_version or "2023-12-01-preview"
self.client = AsyncAzureOpenAI(
self.client = AzureOpenAI(
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
self.model = model or os.getenv("AZURE_CHATGPT_DEPLOYMENT_ID")
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
def get_response(self, messages, stream):
return self.client.chat.completions.create(
stream=stream,
messages=messages,
model=self.model,
)
def run_llm_async(self, messages) -> Generator[str, None, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
chunks = await self.client.chat.completions.create(model=self.model, stream=True, messages=messages)
async for chunk in chunks:
response = self.get_response(messages, stream=True)
for chunk in response:
if len(chunk.choices) == 0:
continue
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
async def run_llm(self, messages) -> str | None:
def run_llm(self, messages) -> str | None:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
response = await self.client.chat.completions.create(model=self.model, stream=False, messages=messages)
response = self.get_response(messages, stream=False)
if response and len(response.choices) > 0:
return response.choices[0].message.content
else:
return None
class AzureImageGenServiceREST(ImageGenService):
    """Image generation via the Azure OpenAI REST submit-and-poll endpoint.

    Credentials and deployment fall back to the ``AZURE_DALLE_KEY``,
    ``AZURE_DALLE_ENDPOINT`` and ``AZURE_DALLE_DEPLOYMENT_ID`` environment
    variables when not passed explicitly.
    """

    def __init__(self, image_size: str, api_key=None, azure_endpoint=None, api_version=None, model=None):
        super().__init__(image_size=image_size)

        self.api_key = api_key or os.getenv("AZURE_DALLE_KEY")
        self.azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
        self.api_version = api_version or "2023-06-01-preview"
        self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")

    async def run_image_gen(self, sentence) -> tuple[str, bytes]:
        """Generate an image for ``sentence``.

        Submits the prompt, polls the returned operation URL once per second
        until it reports ``succeeded``, then downloads the image.

        Returns:
            ``(image_url, raw_bytes)`` where ``raw_bytes`` is the result of
            ``PIL.Image.Image.tobytes()`` on the downloaded image.

        Raises:
            Exception: If the submission is rejected, the operation ends in a
                non-success terminal state, polling runs out of attempts, or
                no image URL is returned.
        """
        url = f"{self.azure_endpoint}openai/images/generations:submit?api-version={self.api_version}"
        headers = {"api-key": self.api_key, "Content-Type": "application/json"}
        body = {
            # Enter your prompt text here
            "prompt": sentence,
            "size": self.image_size,
            "n": 1,
        }

        # TODO hoist the session to app-level
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, json=body) as submission:
                operation_location = submission.headers.get("operation-location")
                if not operation_location:
                    # Surface a rejected submission (bad key, bad endpoint, ...)
                    # with its status instead of an opaque KeyError.
                    raise Exception(
                        f"Image generation submission failed with status {submission.status}"
                    )

            status = ""
            attempts_left = 120
            json_response = None
            while status != "succeeded":
                attempts_left -= 1
                if attempts_left == 0:
                    raise Exception("Image generation timed out")

                await asyncio.sleep(1)
                # Scope each poll response so its connection is always released.
                async with session.get(operation_location, headers=headers) as response:
                    json_response = await response.json()
                status = json_response["status"]

                # NOTE(review): terminal failure statuses as listed for the Azure
                # OpenAI image operation API -- confirm for the API version in use.
                if status in ("failed", "canceled", "deleted"):
                    raise Exception("Image generation failed")

            image_url = json_response["result"]["data"][0]["url"] if json_response else None
            if not image_url:
                raise Exception("Image generation failed")

            # Load the image from the url
            async with session.get(image_url) as response:
                image_stream = io.BytesIO(await response.content.read())
                image = Image.open(image_stream)
                return (image_url, image.tobytes())
class AzureImageGenService(ImageGenService):
@@ -147,7 +96,7 @@ class AzureImageGenService(ImageGenService):
api_key = api_key or os.getenv("AZURE_DALLE_KEY")
azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
api_version = api_version or "2023-06-01-preview"
api_version = api_version or "2023-12-01-preview"
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
self.client = AzureOpenAI(
@@ -156,20 +105,20 @@ class AzureImageGenService(ImageGenService):
api_version=api_version,
)
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
self.logger.info("Generating azure image", sentence)
image = self.client.images.generate(
model=self.model,
prompt=sentence,
n=1,
size=self.image_size,
size=f"1024x1024",
)
url = image["data"][0]["url"]
response = requests.get(url)
dalle_stream = io.BytesIO(response.content)
dalle_im = Image.open(dalle_stream.tobytes())
dalle_im = Image.open(dalle_stream)
return (url, dalle_im)

View File

@@ -1,357 +0,0 @@
import asyncio
import inspect
import logging
import time
import types
from functools import partial
from queue import Queue, Empty
from dailyai.queue_frame import QueueFrame, FrameType
from threading import Thread, Event, Timer
from daily import (
EventHandler,
CallClient,
Daily,
VirtualCameraDevice,
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
)
class DailyTransportService(EventHandler):
    """Connects asyncio frame queues to a Daily call.

    Outgoing frames are put on ``send_queue`` (an ``asyncio.Queue``), copied
    onto a thread-safe queue by ``marshal_frames`` and written to the virtual
    microphone / camera by the ``frame_consumer`` thread. Incoming
    transcription messages are published on ``receive_queue``.

    Set ``mic_enabled`` / ``camera_enabled`` (and the related size / rate
    attributes) before calling ``run()``; the virtual devices are only
    created when the corresponding flag is true.
    """

    def __init__(
        self,
        room_url: str,
        token: str | None,
        bot_name: str,
        duration: float = 10,
    ):
        """Store call settings and create the queues and control flags.

        Args:
            room_url: URL of the Daily room to join.
            token: Meeting token, or None. Transcription is only started
                when a token is provided.
            bot_name: User name shown for the bot in the call.
            duration: Maximum session length in minutes.
        """
        super().__init__()
        self.bot_name: str = bot_name
        self.room_url: str = room_url
        self.token: str | None = token
        self.duration: float = duration
        self.expiration = time.time() + duration * 60

        # This queue is used to marshal frames from the async send queue to the thread that emits audio & video.
        # We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions
        # a chance to run while waiting for queue items -- but also to maintain thread safety and have a threaded
        # handler to send frames, to ensure that sending isn't subject to pauses in the async thread.
        self.threadsafe_send_queue = Queue()

        self.is_interrupted = Event()
        self.stop_threads = Event()
        self.story_started = False
        self.mic_enabled = False
        self.mic_sample_rate = 16000
        self.camera_width = 1024
        self.camera_height = 768
        self.camera_enabled = False

        self.send_queue = asyncio.Queue()
        self.receive_queue = asyncio.Queue()

        self.other_participant_has_joined = False

        # These are read by callbacks / the consumer thread, which can run
        # before the code that normally assigns them; give them defaults so
        # those reads cannot raise AttributeError.
        self.participant_left = False
        self.my_participant_id: str | None = None
        self.interrupt_time: float | None = None

        self.camera_thread = None
        self.frame_consumer_thread = None

        self.transcription_settings = {
            "language": "en",
            "tier": "nova",
            "model": "2-conversationalai",
            "profanity_filter": True,
            "redact": False,
            "extra": {
                "endpointing": True,
                "punctuate": False,
            },
        }

        self.logger: logging.Logger = logging.getLogger("dailyai")

        # Maps an event name ("on_...") to the list of handlers to call.
        self.event_handlers = {}

        # Remember the loop we were created on so callbacks arriving on other
        # threads can schedule coroutines onto it. None when constructed
        # outside a running loop.
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = None

    def patch_method(self, event_name, *args, **kwargs):
        """Dispatch ``event_name`` to every handler registered for it.

        Coroutine handlers are scheduled on ``self.loop``; plain callables
        are invoked directly. Exceptions raised while dispatching are logged
        rather than propagated.
        """
        try:
            for handler in self.event_handlers[event_name]:
                if inspect.iscoroutinefunction(handler):
                    if self.loop:
                        asyncio.run_coroutine_threadsafe(handler(*args, **kwargs), self.loop)
                    else:
                        raise Exception("No event loop to run coroutine. In order to use async event handlers, you must run the DailyTransportService in an asyncio event loop.")
                else:
                    handler(*args, **kwargs)
        except Exception as e:
            self.logger.error(f"Exception in event handler {event_name}: {e}")

    def add_event_handler(self, event_name: str, handler):
        """Register ``handler`` to run in addition to the built-in method.

        Args:
            event_name: Name of an existing ``on_*`` method of this object.
            handler: Function taking this transport as its first argument; it
                is bound to ``self`` before being stored.

        Raises:
            Exception: If ``event_name`` does not start with ``on_`` or is not
                a method of this object.
        """
        if not event_name.startswith("on_"):
            raise Exception(f"Event handler {event_name} must start with 'on_'")

        methods = inspect.getmembers(self, predicate=inspect.ismethod)
        if event_name not in [method[0] for method in methods]:
            raise Exception(f"Event handler {event_name} not found")

        if event_name not in self.event_handlers:
            # First registration: keep the original method as the first
            # handler, then replace the attribute with a dispatcher.
            self.event_handlers[event_name] = [getattr(self, event_name), types.MethodType(handler, self)]
            setattr(self, event_name, partial(self.patch_method, event_name))
        else:
            self.event_handlers[event_name].append(types.MethodType(handler, self))

    def event_handler(self, event_name: str):
        """Decorator form of ``add_event_handler``; returns the handler unchanged."""
        def decorator(handler):
            self.add_event_handler(event_name, handler)
            return handler

        return decorator

    def configure_daily(self):
        """Initialise Daily, create devices, start worker threads and join the room."""
        Daily.init()
        self.client = CallClient(event_handler=self)

        if self.mic_enabled:
            self.mic: VirtualMicrophoneDevice = Daily.create_microphone_device(
                "mic", sample_rate=self.mic_sample_rate, channels=1
            )

        if self.camera_enabled:
            self.camera: VirtualCameraDevice = Daily.create_camera_device(
                "camera", width=self.camera_width, height=self.camera_height, color_format="RGB"
            )

        self.speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
            "speaker", sample_rate=16000, channels=1
        )

        # Most recent image to show on the camera; None until one is set.
        self.image: bytes | None = None
        self.camera_thread = Thread(target=self.run_camera, daemon=True)
        self.camera_thread.start()

        self.logger.info("Starting frame consumer thread")
        self.frame_consumer_thread = Thread(target=self.frame_consumer, daemon=True)
        self.frame_consumer_thread.start()

        Daily.select_speaker_device("speaker")

        self.client.set_user_name(self.bot_name)
        self.client.join(self.room_url, self.token, completion=self.call_joined)

        self.client.update_inputs(
            {
                "camera": {
                    "isEnabled": True,
                    "settings": {
                        "deviceId": "camera",
                    },
                },
                "microphone": {
                    "isEnabled": True,
                    "settings": {
                        "deviceId": "mic",
                        "customConstraints": {
                            "autoGainControl": {"exact": False},
                            "echoCancellation": {"exact": False},
                            "noiseSuppression": {"exact": False},
                        },
                    },
                },
            }
        )

        self.client.update_publishing(
            {
                "camera": {
                    "sendSettings": {
                        "maxQuality": "low",
                        "encodings": {
                            "low": {
                                "maxBitrate": 250000,
                                "scaleResolutionDownBy": 1.333,
                                "maxFramerate": 8,
                            }
                        },
                    }
                }
            }
        )

        if self.token:
            self.client.start_transcription(self.transcription_settings)

        self.my_participant_id = self.client.participants()["local"]["id"]

    async def get_receive_frames(self):
        """Yield frames from ``receive_queue`` up to and including END_STREAM."""
        while True:
            frame = await self.receive_queue.get()
            yield frame
            if frame.frame_type == FrameType.END_STREAM:
                break

    def get_async_send_queue(self):
        """Return the asyncio queue callers should put outgoing frames on."""
        return self.send_queue

    async def marshal_frames(self):
        """Copy items from ``send_queue`` to the thread-safe queue until END_STREAM."""
        while True:
            frame: QueueFrame | list = await self.send_queue.get()
            self.threadsafe_send_queue.put(frame)
            self.send_queue.task_done()
            if isinstance(frame, QueueFrame) and frame.frame_type == FrameType.END_STREAM:
                break

    async def wait_for_send_queue_to_empty(self):
        """Wait until every queued frame has been handled by the consumer thread."""
        await self.send_queue.join()
        # Queue.join() is a blocking call; run it in a worker thread so the
        # event loop keeps running while the consumer drains the queue.
        await asyncio.to_thread(self.threadsafe_send_queue.join)

    async def stop_when_done(self):
        """Stop the transport once all queued frames have been sent."""
        await self.wait_for_send_queue_to_empty()
        self.stop()

    async def run(self) -> None:
        """Join the call and stay in it until expiry, stop, or the room empties.

        On exit the client leaves the call, END_STREAM is pushed to both
        queues, and the worker threads are joined.
        """
        self.configure_daily()

        self.participant_left = False

        async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames())

        try:
            participant_count: int = len(self.client.participants())
            self.logger.info(f"{participant_count} participants in room")
            while time.time() < self.expiration and not self.participant_left and not self.stop_threads.is_set():
                await asyncio.sleep(1)
        except Exception as e:
            self.logger.error(f"Exception {e}")
        finally:
            self.client.leave()

        self.stop_threads.set()

        await self.receive_queue.put(QueueFrame(FrameType.END_STREAM, None))
        await self.send_queue.put(QueueFrame(FrameType.END_STREAM, None))
        await async_output_queue_marshal_task

        if self.camera_thread and self.camera_thread.is_alive():
            self.camera_thread.join()
        if self.frame_consumer_thread and self.frame_consumer_thread.is_alive():
            self.frame_consumer_thread.join()

    def stop(self):
        """Signal ``run()`` and the camera thread to finish."""
        self.stop_threads.set()

    def on_first_other_participant_joined(self):
        """Hook called once, when the first participant other than the bot joins."""
        pass

    def call_joined(self, join_data, client_error):
        """Completion callback for ``client.join``; logs the result."""
        self.logger.info(f"Call_joined: {join_data}, {client_error}")

    def on_error(self, error):
        self.logger.error(f"on_error: {error}")

    def on_call_state_updated(self, state):
        pass

    def on_participant_joined(self, participant):
        """Fire ``on_first_other_participant_joined`` the first time a non-local participant joins."""
        if not self.other_participant_has_joined and participant["id"] != self.my_participant_id:
            self.other_participant_has_joined = True
            self.on_first_other_participant_joined()

    def on_participant_left(self, participant, reason):
        """Flag the session as finished once fewer than two participants remain."""
        if len(self.client.participants()) < 2:
            self.participant_left = True

    def on_app_message(self, message, sender):
        pass

    def on_transcription_message(self, message: dict):
        """Publish a transcription message on ``receive_queue`` (needs a loop)."""
        if self.loop:
            frame = QueueFrame(FrameType.TRANSCRIPTION, message)
            asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop)

    def on_transcription_stopped(self, stopped_by, stopped_by_error):
        pass

    def on_transcription_error(self, message):
        pass

    def on_transcription_started(self, status):
        pass

    def set_image(self, image: bytes):
        """Set the image the camera thread keeps writing."""
        self.image: bytes | None = image

    def run_camera(self):
        """Camera thread body: rewrite the current image at 8 fps until stopped."""
        try:
            while not self.stop_threads.is_set():
                if self.image:
                    self.camera.write_frame(self.image)

                time.sleep(1.0 / 8)  # 8 fps
        except Exception as e:
            self.logger.error(f"Exception {e} in camera thread.")

    def frame_consumer(self):
        """Consumer thread body: write queued frames to the mic / camera.

        Each queue item is a single ``QueueFrame`` or a list of them. Audio
        is buffered and written in multiples of ``smallest_write_size``
        bytes. While ``is_interrupted`` is set, frames are discarded until a
        START_STREAM frame clears the flag. Returns on END_STREAM.
        """
        self.logger.info("🎬 Starting frame consumer thread")
        pending_audio = bytearray()
        # NOTE(review): presumably 100 ms of 16 kHz 16-bit mono audio -- confirm.
        smallest_write_size = 3200
        while True:
            try:
                frames_or_frame: QueueFrame | list[QueueFrame] = self.threadsafe_send_queue.get()
                if isinstance(frames_or_frame, QueueFrame):
                    frames: list[QueueFrame] = [frames_or_frame]
                elif isinstance(frames_or_frame, list):
                    frames: list[QueueFrame] = frames_or_frame
                else:
                    raise Exception("Unknown type in output queue")

                for frame in frames:
                    if frame.frame_type == FrameType.END_STREAM:
                        self.logger.info("Stopping frame consumer thread")
                        self.threadsafe_send_queue.task_done()
                        return

                    # if interrupted, we just pull frames off the queue and discard them
                    if not self.is_interrupted.is_set():
                        if frame:
                            if frame.frame_type == FrameType.AUDIO:
                                pending_audio.extend(frame.frame_data)

                                # Write only whole multiples of the minimum
                                # size; keep the remainder for the next frame.
                                writable = len(pending_audio) - (len(pending_audio) % smallest_write_size)
                                if writable:
                                    self.mic.write_frames(bytes(pending_audio[:writable]))
                                    pending_audio = pending_audio[writable:]
                            elif frame.frame_type == FrameType.IMAGE:
                                self.set_image(frame.frame_data)
                        elif len(pending_audio):
                            self.mic.write_frames(bytes(pending_audio))
                            pending_audio = bytearray()
                    else:
                        if self.interrupt_time:
                            self.logger.info(
                                f"Lag to stop stream after interruption {time.perf_counter() - self.interrupt_time}"
                            )
                            self.interrupt_time = None

                        if frame.frame_type == FrameType.START_STREAM:
                            self.is_interrupted.clear()

                # One task_done() per get(), regardless of how many frames
                # the item contained.
                self.threadsafe_send_queue.task_done()
            except Empty:
                # get() above blocks without a timeout, so Empty is not raised
                # today; this flush only matters if a timeout is added.
                try:
                    if len(pending_audio):
                        self.mic.write_frames(bytes(pending_audio))
                except Exception as e:
                    self.logger.error(f"Exception in frame_consumer: {e}, {len(pending_audio)}")

                pending_audio = bytearray()

View File

@@ -1,29 +0,0 @@
import aiohttp
import asyncio
import os
import requests
from collections.abc import AsyncGenerator
from dailyai.services.ai_services import TTSService
class DeepgramTTSService(TTSService):
    """Text-to-speech via Deepgram's ``/v1/speak`` HTTP endpoint.

    The voice falls back to ``DEEPGRAM_VOICE`` and then to
    ``alpha-asteria-en-v2``; the API key falls back to ``DEEPGRAM_API_KEY``.
    """

    def __init__(self, speech_key=None, voice=None):
        super().__init__()

        self.voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2"
        self.speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY")

    def get_mic_sample_rate(self):
        # NOTE(review): run_tts requests sample_rate=16000 from Deepgram while
        # this reports 24000 -- confirm which value is intended.
        return 24000

    async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
        """Stream audio for ``sentence`` as it arrives from Deepgram.

        Requests headerless linear16 audio. On a non-200 response the error
        is logged and nothing is yielded, so an error body is never passed
        on as audio.
        """
        self.logger.info(f"Running deepgram tts for {sentence}")
        base_url = "https://api.beta.deepgram.com/v1/speak"
        request_url = f"{base_url}?model={self.voice}&encoding=linear16&container=none&sample_rate=16000"
        headers = {"authorization": f"token {self.speech_key}"}
        body = {"text": sentence}

        async with aiohttp.ClientSession() as session:
            async with session.post(request_url, headers=headers, json=body) as r:
                if r.status != 200:
                    error_text = await r.text()
                    self.logger.error(
                        f"audio fetch status code: {r.status}, error: {error_text}"
                    )
                    return

                async for data in r.content:
                    yield data

View File

@@ -1,36 +1,38 @@
import aiohttp
import os
import requests
import time
from typing import AsyncGenerator
from typing import Generator
from dailyai.services.ai_services import TTSService
from ..services.ai_services import TTSService
class ElevenLabsTTSService(TTSService):
def __init__(self, api_key=None, voice_id=None):
def __init__(self):
super().__init__()
self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID")
self.api_key = os.getenv("ELEVENLABS_API_KEY")
self.voice_id = os.getenv("ELEVENLABS_VOICE_ID")
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
async with aiohttp.ClientSession() as session:
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
headers = {
"xi-api-key": self.api_key,
"Content-Type": "application/json",
}
async with session.post(url, json=payload, headers=headers, params=querystring) as r:
if r.status != 200:
self.logger.error(
f"audio fetch status code: {r.status}, error: {r.text}"
)
return
def run_tts(self, sentence) -> Generator[bytes, None, None]:
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
headers = {
"xi-api-key": self.api_key,
"Content-Type": "application/json",
}
async for chunk in r.content:
if chunk:
yield chunk
r = requests.request(
"POST", url, json=payload, headers=headers, params=querystring, stream=True
)
if r.status_code != 200:
self.logger.error(
f"audio fetch status code: {r.status_code}, error: {r.text}"
)
return
for chunk in r.iter_content(chunk_size=3200):
if chunk:
yield chunk

View File

@@ -1,49 +0,0 @@
import fal
import aiohttp
import asyncio
import io
import json
from PIL import Image
from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
# Fal expects FAL_KEY_ID and FAL_KEY_SECRET to be set in the env
class FalImageGenService(ImageGenService):
    """Image generation service that submits prompts to a fal-hosted SDXL app."""

    def __init__(self, image_size):
        super().__init__(image_size)

    async def run_image_gen(self, sentence) -> tuple[str, bytes]:
        """Generate an image for ``sentence``.

        Returns:
            A ``(image_url, raw_image_bytes)`` tuple, where the bytes come
            from ``PIL.Image.tobytes()`` on the downloaded image.

        Raises:
            Exception: if fal returns no image URL.
        """

        def request_image_url(prompt, _size):
            # The fal client calls here are synchronous, so this helper is
            # run on a worker thread via asyncio.to_thread below.
            print("starting fal submit...")
            job = fal.apps.submit(
                "110602490-fast-sdxl",
                arguments={"prompt": prompt},
            )
            print("past fal handler init, about to wait for iter_events...")
            for update in job.iter_events():
                if isinstance(update, fal.apps.InProgress):
                    print('Request in progress')
                    print(update.logs)

            outcome = job.get()
            url = outcome["images"][0]["url"] if outcome else None
            if not url:
                raise Exception("Image generation failed")
            return url

        print("fetching image url...")
        image_url = await asyncio.to_thread(
            request_image_url, sentence, self.image_size
        )
        print("got image url, downloading image...")

        # Download the generated image and decode it with PIL.
        async with aiohttp.ClientSession() as http:
            async with http.get(image_url) as reply:
                print("got image response")
                raw = await reply.content.read()
                buffer = io.BytesIO(raw)
                print("read image stream")
                decoded = Image.open(buffer)
                return (image_url, decoded.tobytes())

View File

@@ -0,0 +1,67 @@
from dailyai.services.ai_services import AIService, TTSService, LLMService, ImageGenService
from typing import Generator
import requests
from PIL import Image
import io
from openai import OpenAI
import os
import json
class OpenAILLMService(LLMService):
    """Chat-completion service backed by the synchronous OpenAI client."""

    def __init__(self, api_key=None, model=None):
        """Create the client.

        Args:
            api_key: OpenAI API key; defaults to the ``OPEN_AI_KEY`` env var.
            model: chat model name; defaults to the ``OPEN_AI_MODEL`` env var.
        """
        super().__init__()
        api_key = api_key or os.getenv("OPEN_AI_KEY")
        self.model = model or os.getenv("OPEN_AI_MODEL")
        self.client = OpenAI(api_key=api_key)

    def get_response(self, messages, stream):
        """Issue a chat-completions request and return the client's response."""
        return self.client.chat.completions.create(
            stream=stream,
            messages=messages,
            model=self.model
        )

    def run_llm_async(self, messages) -> Generator[str, None, None]:
        """Yield the non-empty content deltas of a streamed completion."""
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        response = self.get_response(messages, stream=True)
        for chunk in response:
            # Chunks without choices carry no text and are skipped.
            if len(chunk.choices) == 0:
                continue

            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    def run_llm(self, messages) -> str | None:
        """Return the first choice's message content, or None if there is none."""
        messages_for_log = json.dumps(messages)
        # Fixed: this previously logged "via azure" from the OpenAI service.
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        response = self.get_response(messages, stream=False)
        if response and len(response.choices) > 0:
            return response.choices[0].message.content
        else:
            return None
class OpenAIImageGenService(ImageGenService):
    """Image generation service backed by the synchronous OpenAI client."""

    def __init__(self, api_key=None, model=None):
        """Create the client.

        Args:
            api_key: OpenAI API key; defaults to the ``OPEN_AI_KEY`` env var.
            model: model name; defaults to the ``OPEN_AI_MODEL`` env var.
        """
        super().__init__()
        api_key = api_key or os.getenv("OPEN_AI_KEY")
        # NOTE(review): self.model is stored but never passed to
        # images.generate() below -- confirm whether that is intended.
        self.model = model or os.getenv("OPEN_AI_MODEL")
        self.client = OpenAI(api_key=api_key)

    def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
        """Generate one 1024x1024 image for ``sentence`` and download it.

        Returns:
            A ``(image_url, PIL image)`` tuple.

        Raises:
            Exception: if the response carries no image URL.
            requests.HTTPError: if downloading the image fails.
        """
        image = self.client.images.generate(
            prompt=sentence,
            n=1,
            size="1024x1024"
        )
        image_url = image.data[0].url
        if not image_url:
            raise Exception("No image provided in response", image)

        response = requests.get(image_url)
        # Fail with a clear HTTP error instead of letting PIL choke on an
        # error page.
        response.raise_for_status()

        dalle_stream = io.BytesIO(response.content)
        dalle_im = Image.open(dalle_stream)

        return (image_url, dalle_im)

View File

@@ -1,79 +0,0 @@
import requests
import aiohttp
import asyncio
from PIL import Image
import io
from openai import AsyncOpenAI
import os
import json
from collections.abc import AsyncGenerator
from dailyai.services.ai_services import AIService, TTSService, LLMService, ImageGenService
class OpenAILLMService(LLMService):
    """Chat-completion service backed by the asynchronous OpenAI client."""

    def __init__(self, api_key=None, model=None):
        """Create the client.

        Args:
            api_key: OpenAI API key; defaults to the ``OPEN_AI_KEY`` env var.
            model: chat model; defaults to ``OPEN_AI_LLM_MODEL`` and then
                to ``"gpt-4"``.
        """
        super().__init__()
        api_key = api_key or os.getenv("OPEN_AI_KEY")
        self.model = model or os.getenv("OPEN_AI_LLM_MODEL") or "gpt-4"
        self.client = AsyncOpenAI(api_key=api_key)

    async def get_response(self, messages, stream):
        """Await a chat-completions request and return the client's response."""
        return await self.client.chat.completions.create(
            stream=stream,
            messages=messages,
            model=self.model
        )

    async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
        """Yield the non-empty content deltas of a streamed completion."""
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        response = await self.get_response(messages, stream=True)
        # The async client returns an asynchronous stream, so it must be
        # consumed with `async for`; a plain `for` fails at runtime.
        async for chunk in response:
            if len(chunk.choices) == 0:
                continue

            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def run_llm(self, messages) -> str | None:
        """Return the first choice's message content, or None if there is none."""
        messages_for_log = json.dumps(messages)
        self.logger.debug(f"Generating chat via openai: {messages_for_log}")

        response = await self.get_response(messages, stream=False)
        if response and len(response.choices) > 0:
            return response.choices[0].message.content
        else:
            return None
class OpenAIImageGenService(ImageGenService):
    """Image generation service backed by the asynchronous OpenAI client."""

    def __init__(self, image_size: str, api_key=None, model=None):
        """Create the client.

        Args:
            image_size: size string forwarded to the base class and later
                sent as the ``size`` of each generation request.
            api_key: OpenAI API key; defaults to the ``OPEN_AI_KEY`` env var.
            model: image model; defaults to ``OPEN_AI_IMAGE_MODEL`` and then
                to ``"dall-e-3"``.
        """
        super().__init__(image_size=image_size)
        api_key = api_key or os.getenv("OPEN_AI_KEY")
        self.model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3"
        self.client = AsyncOpenAI(api_key=api_key)

    async def run_image_gen(self, sentence) -> tuple[str, bytes]:
        """Generate one image for ``sentence`` and download it.

        Returns:
            A ``(image_url, raw_image_bytes)`` tuple, where the bytes come
            from ``PIL.Image.tobytes()`` on the downloaded image.

        Raises:
            Exception: if the response carries no image URL.
        """
        # Fixed: the sentence used to be passed as a logging argument with
        # no placeholder in the message, which makes logging report a
        # formatting error instead of the prompt.
        self.logger.info(f"Generating OpenAI image: {sentence}")

        generation = await self.client.images.generate(
            prompt=sentence,
            model=self.model,
            n=1,
            size=self.image_size
        )
        image_url = generation.data[0].url
        if not image_url:
            raise Exception("No image provided in response", generation)

        # Load the image from the url
        async with aiohttp.ClientSession() as session:
            async with session.get(image_url) as response:
                image_stream = io.BytesIO(await response.content.read())
                image = Image.open(image_stream)
                # The second, unreachable return that referenced the
                # undefined name `dalle_im` has been removed.
                return (image_url, image.tobytes())

View File

@@ -20,6 +20,7 @@ class GoogleAIService(AIService):
)
def run_tts(self, sentence):
    """Synthesize ``sentence`` (whitespace-stripped) and return the client's result."""
    print("running google tts")
    request_text = texttospeech.SynthesisInput(text=sentence.strip())
    return self.client.synthesize_speech(
        input=request_text,
        voice=self.voice,
        audio_config=self.audio_config,
    )

View File

@@ -13,6 +13,7 @@ class HuggingFaceAIService(AIService):
# available models at https://huggingface.co/Helsinki-NLP (**not all models use 2-character language codes**)
def run_text_translation(self, sentence, source_language, target_language):
    """Translate ``sentence`` with the Helsinki-NLP opus-mt model for the pair.

    Args:
        sentence: text to translate.
        source_language: language code used in the model name.
        target_language: language code used in the model name.

    Returns:
        The ``translation_text`` of the first pipeline result.
    """
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-{source_language}-{target_language}")
    # Run the model once and reuse the result; previously it was invoked
    # twice, once for the debug print and again for the return value.
    result = translator(sentence)
    print(result)
    return result[0]["translation_text"]

View File

@@ -1,129 +0,0 @@
from re import A
import unittest
from typing import AsyncGenerator, Generator
from dailyai.services.ai_services import AIService, SentenceAggregator
from dailyai.queue_frame import QueueFrame, FrameType
class SimpleAIService(AIService):
    """Pass-through service used by the tests: text chunks in, same frames out."""

    def allowed_input_frame_types(self) -> set[FrameType]:
        return {FrameType.TEXT_CHUNK}

    def possible_output_frame_types(self) -> set[FrameType]:
        return {FrameType.TEXT_CHUNK}

    async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> QueueFrame | None:
        # Identity transform: hand back the frame that was received.
        return frame
class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
    """Checks that AIService.run accepts both async and plain generators."""

    async def test_async_input(self):
        service = SimpleAIService()
        expected = [
            QueueFrame(FrameType.TEXT_CHUNK, "hello"),
            QueueFrame(FrameType.END_STREAM, None),
        ]

        async def frame_source() -> AsyncGenerator[QueueFrame, None]:
            for item in expected:
                yield item

        collected = [
            frame
            async for frame in service.run({FrameType.TEXT_CHUNK}, frame_source())
        ]

        self.assertEqual(expected, collected)

    async def test_nonasync_input(self):
        service = SimpleAIService()
        expected = [
            QueueFrame(FrameType.TEXT_CHUNK, "hello"),
            QueueFrame(FrameType.END_STREAM, None),
        ]

        def frame_source() -> Generator[QueueFrame, None, None]:
            yield from expected

        collected = [
            frame
            async for frame in service.run({FrameType.TEXT_CHUNK}, frame_source())
        ]

        self.assertEqual(expected, collected)
class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase):
    """Checks how SentenceAggregator groups text chunks into sentence frames."""

    async def _aggregate(self, *chunks):
        # Build TEXT_CHUNK frames followed by END_STREAM, run them through a
        # fresh aggregator, and collect every frame it emits.
        frames = [QueueFrame(FrameType.TEXT_CHUNK, text) for text in chunks]
        frames.append(QueueFrame(FrameType.END_STREAM, None))
        aggregator = SentenceAggregator()
        return [
            frame async for frame in aggregator.run({FrameType.SENTENCE}, frames)
        ]

    async def test_clause(self) -> None:
        produced = await self._aggregate("hello")

        self.assertEqual([QueueFrame(FrameType.SENTENCE, "hello")], produced)

    async def test_sentence(self) -> None:
        produced = await self._aggregate("hello, ", "world.")

        self.assertEqual(
            [QueueFrame(FrameType.SENTENCE, "hello, world.")], produced
        )

    async def test_sentence_and_clause(self) -> None:
        produced = await self._aggregate("hello, ", "world.", " How are")

        self.assertEqual(
            [
                QueueFrame(FrameType.SENTENCE, "hello, world."),
                QueueFrame(FrameType.SENTENCE, " How are"),
            ],
            produced,
        )

    async def test_two_sentences(self) -> None:
        produced = await self._aggregate(
            "hello, ", "world.", " How are", " you doing?"
        )

        self.assertEqual(
            [
                QueueFrame(FrameType.SENTENCE, "hello, world."),
                QueueFrame(FrameType.SENTENCE, " How are you doing?"),
            ],
            produced,
        )
# Run the test cases when this module is executed directly as a script.
if __name__ == "__main__":
unittest.main()

View File

@@ -11,14 +11,14 @@ from dailyai.async_processor.async_processor import (
LLMResponse,
)
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.services.ai_services import (
AIServiceConfig,
ImageGenService,
LLMService,
TTSService,
)
"""
class MockTTSService(TTSService):
def run_tts(self, sentence):
for word in sentence.split(' '):
@@ -71,7 +71,7 @@ class TestResponse(unittest.TestCase):
output_queue.task_done()
while expected_words:
actual_word:QueueFrame = output_queue.get()
actual_word:OutputQueueFrame = output_queue.get()
word = expected_words.pop(0)
self.assertEqual(actual_word.frame_type, FrameType.AUDIO_FRAME)
self.assertEqual(actual_word.frame_data, bytes(word, "utf-8"))
@@ -127,7 +127,7 @@ class TestResponse(unittest.TestCase):
expected_words = ["Hello", "there.", "How", "are", "you?", "I", "hope", "you", "are", "well."]
while expected_words and not stop_processing_output_queue.is_set():
try:
actual_word:QueueFrame = output_queue.get_nowait()
actual_word:OutputQueueFrame = output_queue.get_nowait()
if actual_word.frame_type == FrameType.AUDIO_FRAME:
time.sleep(0.1)
word = expected_words.pop(0)
@@ -177,4 +177,3 @@ class TestResponse(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
"""

View File

@@ -0,0 +1,55 @@
import argparse
import time
from dailyai.orchestrator import OrchestratorConfig, Orchestrator
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
from dailyai.services.azure_ai_services import AzureTTSService, AzureLLMService
# For now, use Azure service for the TTS. Todo: make tts service
# and tts args (like which voice to use) configurable via command
# line arguments.
# Need the following environment variables:
# - AZURE_SPEECH_SERVICE_KEY
# - AZURE_SPEECH_SERVICE_REGION
def add_bot_to_room(room_url, text) -> None:
    """Build an orchestrator for ``room_url`` prompted to speak ``text``, and start it."""
    handler = MessageHandler("Respond with only the following text: " + text)

    ai_services = AIServiceConfig(
        tts=AzureTTSService(),
        image=None,
        llm=AzureLLMService(),
    )

    # todo: token and expiration should both be optional
    config = OrchestratorConfig(
        room_url=room_url,
        token=None,
        bot_name="Minimal Speaking Bot",
        expiration=time.time() + 10,
    )

    Orchestrator(config, ai_services, handler).start()
if __name__ == "__main__":
    # Command-line entry point: both the room URL and the text are required.
    cli = argparse.ArgumentParser(description="Say one phrase and exit")
    cli.add_argument(
        "-u", "--url", type=str, required=True, help="URL of the Daily room"
    )
    cli.add_argument(
        "-t",
        "--text",
        type=str,
        required=True,
        help="text to send into the session as speech",
    )

    options: argparse.Namespace = cli.parse_args()
    add_bot_to_room(options.url, options.text)

View File

@@ -0,0 +1,48 @@
from dailyai.services.transport.DailyTransport import DailyTransportService
from dailyai.services.tts.AzureTTSService import AzureTTSService
transport = None
mic = None
tts = None
def main():
    """Wire a TTS service to a transport audio queue and start the transport."""
    global transport, mic, tts

    # Services are created with no arguments: each one reads its API key,
    # room url and other settings from environment variables that it
    # defines and documents, or from an optional config object instead.
    #
    # The abstract transport API presumably maps closely onto the
    # daily-python basic API.
    transport = DailyTransportService()
    tts = AzureTTSService()

    # The transport hands back a local audio "device"/queue whose chunks are
    # played sequentially. "mic" is the handle for inspecting and controlling
    # that queue; here the TTS service simply pipes its output into it.
    mic = transport.create_audio_queue()
    tts.set_output(mic)

    def log_error(e):
        print(e)

    transport.on("error", log_error)
    transport.on("joined-meeting", say_one_thing)
    transport.start()
def say_one_thing():
    """Speak a single phrase, then arrange to shut down once the audio queue is empty."""
    tts.run_tts("hello world")
    # Leave as soon as the queued audio has been played out.
    mic.on("audio-queue-empty", shutdown)
def shutdown():
    """Stop the transport, then close the TTS service."""
    transport.stop()
    tts.close()

View File

@@ -0,0 +1,35 @@
from dailyai.services.transport.DailyTransport import DailyTransportService
from dailyai.services.llm.AzureLLMService import AzureLLMService
from dailyai.services.tts.AzureTTSService import AzureTTSService
transport = None
llm = None
tts = None
def main():
    """Chain llm -> tts -> audio queue and start the transport."""
    global transport, llm, tts

    transport = DailyTransportService()
    llm = AzureLLMService()
    tts = AzureTTSService()

    # Pipeline: LLM output feeds the TTS service, whose audio feeds the queue.
    audio_queue = transport.create_audio_queue()
    tts.set_output(audio_queue)
    llm.set_output(tts)

    def log_error(e):
        print(e)

    transport.on("error", log_error)
    transport.on("joined-meeting", make_one_inference_call)
    transport.start()
def make_one_inference_call():
    """Send one prompt to the LLM, then arrange to shut down when the audio queue empties."""
    llm.run_llm("tell me a joke about llamas")
    # Leave once everything that was queued has been played.
    transport.on("audio-queue-empty", shutdown)
def shutdown():
    """Stop the transport, then close the TTS service."""
    transport.stop()
    tts.close()

View File

@@ -0,0 +1,27 @@
from dailyai.services.transport.DailyTransport import DailyTransportService
from dailyai.services.genimage.AzureDalleService import AzureDalleService
dalle = None
def main():
    """Connect an image generation service to a transport video queue and start."""
    global dalle

    transport = DailyTransportService()
    dalle = AzureDalleService()

    # create_video_queue() could presumably accept parameters matching
    # Daily video settings (resolution, framerate, target bitrate, etc.)
    video_queue = transport.create_video_queue()
    dalle.set_output(video_queue)

    def log_error(e):
        print(e)

    transport.on("error", log_error)
    transport.on("joined-meeting", say_one_thing)
    transport.start()
def say_one_thing():
    """Request a single generated image and then stay in the session."""
    # For simplicity there is no graceful shutdown in this sample :-)
    dalle.generate_image("an astronaut riding a skateboard")

View File

@@ -0,0 +1,37 @@
from dailyai.services.transport.DailyTransport import DailyTransportService
from dailyai.services.llm.AzureLLMService import AzureLLMService
from dailyai.services.tts.AzureTTSService import AzureTTSService
transport = None
llm = None
tts = None
def main():
    """Chain llm -> tts -> audio queue and start the transport."""
    global transport, llm, tts

    transport = DailyTransportService()
    llm = AzureLLMService()
    tts = AzureTTSService()

    # Pipeline: LLM output feeds the TTS service, whose audio feeds the queue.
    audio_queue = transport.create_audio_queue()
    tts.set_output(audio_queue)
    llm.set_output(tts)

    def log_error(e):
        print(e)

    transport.on("error", log_error)
    transport.on("joined-meeting", say_two_things)
    transport.start()
def say_two_things():
    """Queue a literal introduction followed by an LLM-generated joke, then leave."""
    # First utterance is a fixed string; the second comes from the LLM.
    tts.run_tts("My friend the LLM is now going to tell a joke about llamas.")
    llm.run_llm("tell me a joke about llamas")
    transport.on("audio-queue-empty", shutdown)
def shutdown():
    """Stop the transport, then close the TTS service."""
    transport.stop()
    tts.close()

View File

@@ -0,0 +1,101 @@
from dailyai.services.transport.DailyTransport import DailyTransportService
from dailyai.services.llm.AzureLLMService import AzureLLMService
from dailyai.services.tts.AzureTTSService import AzureTTSService
from dailyai.services.genimage.AzureDalleService import AzureDalleService
from dailyai.services.utils.AudioImageSynchronizedPair import AudioImageSynchronizedPair
transport = None
llm = None
tts = None
dalle = None
mic = None
cam = None
def main():
    """Create the services and output queues, then start the transport."""
    # mic and cam must be declared global as well: narrate_calendar_images()
    # reads the module-level names, which would otherwise stay None because
    # the assignments below would only create locals.
    global transport
    global llm
    global tts
    global dalle
    global mic
    global cam

    transport = DailyTransportService()
    llm = AzureLLMService()
    tts = AzureTTSService()
    dalle = AzureDalleService()

    # set up mic and cam. but don't wire up automatic output to the mic
    # and cam from our AI services because we need to manage synchronization
    # of image/speech pairings
    mic = transport.create_audio_queue()
    cam = transport.create_video_queue()

    transport.on("error", lambda e: print(e))
    transport.on("joined-meeting", narrate_calendar_images)
    transport.start()
def narrate_calendar_images():
    """Ask the LLM for one calendar-photo description per month and narrate it.

    Each description is turned into speech (sent as audio) and into an image
    (sent as video). A synchronizer object per month pairs the two results.
    """
    # the first `run_llm()` call defines a lambda to process its output.
    #
    # the design idea here is that output can be piped into a function that
    # takes inference completion text as its argument. *or* output can be
    # piped into an object that has more options (maybe a callback for streaming
    # results, or a callback for inference completion, or both).
    #
    # note that we might queue up the month outputs out of order, but that's
    # okay for this demo
    for month in ["January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"]:
        synchronizer = AudioImageSynchronizedPair(
            audio_output=mic, video_output=cam)
        # Fixed: the prompt literal used to open with four quote characters,
        # which put a stray '"' at the start of the text sent to the LLM.
        llm.run_llm(
            f"""
            Describe a nature photograph suitable for use in a calendar,
            for the month of {month}. Include only the image description
            with no preamble.
            """,
            # Bind this iteration's synchronizer as a default argument.
            # A plain closure would look the name up when the callback
            # finally runs, so a callback invoked after the loop has
            # advanced would use a later month's synchronizer.
            output=lambda inference_text, synchronizer=synchronizer: (
                dalle.generate_image(inference_text, output=synchronizer),
                tts.run_tts(inference_text, output=synchronizer)
            ),
        )
# the AudioImageSynchronizedPair class seems useful enough that I've listed
# it above as a standard utility we can import. but here's a theoretical
# implementation
class TheoreticalAudioImageSynchronizedPair:
    """Holds one image and one audio clip and emits them once both exist."""

    def __init__(self, audio_output, video_output):
        """Remember the two output queues; nothing has been received yet."""
        self.audio_output = audio_output
        self.video_output = video_output
        self.image = None
        self.audio = None

    def image_generation_complete(self, image):
        """Record the generated image and send the pair if the audio is here too."""
        self.image = image
        self._maybe_send()

    def tts_complete(self, audio):
        """Record the synthesized audio and send the pair if the image is here too."""
        self.audio = audio
        self._maybe_send()

    def _maybe_send(self):
        # Hold everything back until both halves of the pair are available.
        if self.image is None or self.audio is None:
            return
        self.video_output.queue_frame(self.image)
        self.audio_output.queue_audio(self.audio)
def shutdown():
    """Stop the transport, then close the TTS service."""
    transport.stop()
    tts.close()

View File

@@ -0,0 +1,72 @@
from dailyai.services.transport.DailyTransport import DailyTransportService
from dailyai.services.llm.AzureLLMService import AzureLLMService
from dailyai.services.tts.AzureTTSService import AzureTTSService
from dailyai.services.utils import Tee
from dailyai.services.utils import ReadySoundWav
initial_prompt = "You are a helpful assistant. Introduce yourself and ask how you can be helpful."
llm_messages = [{
"role": "system",
"content": initial_prompt
}]
transport = None
llm = None
tts = None
mic = None
transcription = None
def main():
    """Set up the voice-chat pipeline and start the transport."""
    global transport, llm, tts, mic, transcription

    transport = DailyTransportService()
    llm = AzureLLMService()
    tts = AzureTTSService()

    # Uses Moishe's combined output queue (audio only here) rather than an
    # audio-only queue.
    mic = transport.create_output_queue(audio=True, video=False)

    # LLM output goes to both the TTS service and the history accumulator.
    fan_out = Tee(tts, accumulate_assistant_messages)
    llm.set_output(fan_out)
    tts.set_output(mic)

    # DailyTransport implements transcription internally. Grab a handle to
    # that service and configure silence-based endpointing with a pause of
    # 1.5 seconds.
    transcription = transport.transcription_service()
    transcription.configure(endpointing_pause=1.5)

    def log_error(e):
        print(e)

    transport.on("error", log_error)
    transport.on("joined-meeting", llm_prompt)
    transport.start()
def llm_prompt():
    """Run the LLM on the conversation so far, then listen once playback ends."""
    # Send the accumulated history (system prompt plus user and assistant
    # turns) rather than a fixed introduction string. The fixed string made
    # every turn re-introduce the assistant and left llm_messages unused.
    # NOTE(review): assumes run_llm accepts a list of role/content messages
    # -- confirm against the final LLM service API.
    llm.run_llm(llm_messages)
    mic.once("audio-queue-empty", listen)
def listen():
    """Play the ready sound, record the user's next utterance, and prompt the LLM."""
    mic.queue(ReadySoundWav)
    # ignore any transcription results that come in before we're ready
    _ = transcription.read()
    user_text_input = transcription.read_until_silence()
    # Fixed: Python lists have no push(); append() adds the new turn.
    llm_messages.append({
        "role": "user",
        "content": user_text_input
    })
    llm_prompt()
def accumulate_assistant_messages(completed_inference_text):
    """Append a completed assistant response to the shared message history."""
    # Fixed: Python lists have no push(); append() adds the new turn.
    llm_messages.append({
        "role": "assistant",
        "content": completed_inference_text
    })

View File

@@ -0,0 +1,15 @@
-01 just say one thing and exit
-02 llm say one thing and exit
-03 send "still frame" of video
-04 manual intro utterance and then llm say one thing and exit
-05 generate images for the months of the year, synchronized with their spoken descriptions
-06 chat: llm speak and respond (ignoring transcription input while speaking)
-07 chat: llm speak and respond (interruptible)
-08 two llms arguing about a topic (in the same process)
-09 two llms arguing about a topic (two separate bots)
-10 listen for wake word before sending commands to llm
-11 06 plus sound effects queued from sound file
-12 06 plus background music played through a second "mic" device

View File

@@ -1 +0,0 @@
These samples need to be updated! Don't rely on them.

View File

@@ -1,113 +0,0 @@
import argparse
import asyncio
import requests
import time
import urllib.parse
import random
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
# Join the room as "Imagebot", greet each remote participant, and turn the
# running transcription into generated images shown on the bot's camera.
# NOTE(review): indentation is not visible in this view, so the exact nesting
# of the statements below should be confirmed against the original file.
async def main(room_url:str, token):
global transport
global llm
global tts
transport = DailyTransportService(
room_url,
token,
"Imagebot",
1,
)
transport.mic_enabled = True
transport.camera_enabled = True
transport.mic_sample_rate = 16000
transport.camera_width = 1024
transport.camera_height = 1024
llm = AzureLLMService()
tts = AzureTTSService()
# NOTE(review): FalImageGenService.__init__ as shown for fal_ai_services
# requires an image_size argument; none is passed here -- confirm.
img = FalImageGenService()
# Consume transcription messages, accumulate them into one prompt, and
# generate a new image after each message.
async def handle_transcriptions():
print("handle_transcriptions got called")
sentence = ""
async for message in transport.get_transcriptions():
print(f"transcription message: {message}")
# Skip transcriptions of the bot's own speech.
if message["session_id"] == transport.my_participant_id:
continue
# Saying "start over" clears the accumulated prompt.
finder = message["text"].find("start over")
print(f"finder: {finder}")
if finder >= 0:
async for audio in tts.run_tts(f"Resetting."):
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
sentence = ""
continue
# todo: we could differentiate between transcriptions from different participants
sentence += f" {message['text']}"
print(f"sentence is now: {sentence}")
# TODO: Cache this audio
phrase = random.choice(["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
async for audio in tts.run_tts(phrase):
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
# NOTE(review): run_image_gen as shown for FalImageGenService takes only
# the sentence; the extra "1024x1024" argument looks stale -- confirm.
img_result = img.run_image_gen(sentence, "1024x1024")
awaited_img = await asyncio.gather(img_result)
# NOTE(review): a list is put on the queue here, while the audio paths put
# single QueueFrame objects -- confirm the queue accepts both.
transport.output_queue.put(
[
QueueFrame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
]
)
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
print(f"participant joined: {participant['info']['userName']}")
if participant["info"]["isLocal"]:
return
# NOTE(review): the next loop header is followed directly by a second TTS
# request and loop; this looks like a leftover line that would nest the
# greeting inside the first loop -- confirm.
async for audio in tts.run_tts("Describe an image, and I'll create it."):
audio_generator = tts.run_tts(f"Hello, {participant['info']['userName']}! Describe an image and I'll create it. To start over, just say 'start over'.")
async for audio in audio_generator:
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
transport.transcription_settings["extra"]["punctuate"] = False
transport.transcription_settings["extra"]["endpointing"] = False
# Run the transport and the transcription consumer concurrently.
await asyncio.gather(transport.run(), handle_transcriptions())
if __name__ == "__main__":
    # Command-line entry point: create a one-hour owner token for the room
    # through the Daily REST API, then run the bot with it.
    cli = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    cli.add_argument(
        "-u", "--url", type=str, required=True, help="URL of the Daily room to join"
    )
    cli.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=True,
        help="Daily API Key (needed to create token)",
    )
    args, unknown = cli.parse_known_args()

    # The room name is the URL path without its leading slash.
    room_name: str = urllib.parse.urlparse(args.url).path[1:]
    expiration: float = time.time() + 60 * 60

    token_properties = {
        "room_name": room_name,
        "is_owner": True,
        "exp": expiration,
    }
    res: requests.Response = requests.post(
        "https://api.daily.co/v1/meeting-tokens",
        headers={"Authorization": f"Bearer {args.apikey}"},
        json={"properties": token_properties},
    )

    if res.status_code != 200:
        raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
    token: str = res.json()["token"]

    asyncio.run(main(args.url, token))

View File

@@ -13,18 +13,16 @@ from dailyai.orchestrator import OrchestratorConfig, Orchestrator
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
def add_bot_to_room(room_url, token, expiration) -> None:
# A simple prompt for a simple sample.
message_handler = MessageHandler(
"""
You are a sample bot in a WebRTC session. You'll receive input as transcriptions of user's
speech, and your responses will be converted to audio via a TTS service.
You are a sample bot, meant to demonstrate how to use an LLM with transcription at TTS.
Answer user's questions and be friendly, and if you can, give some ideas about how someone
could use a bot like you in a more in-depth way. Because your responses will be spoken,
try to keep them short.
try to keep them short and sweet.
"""
)
@@ -35,6 +33,12 @@ def add_bot_to_room(room_url, token, expiration) -> None:
# - AZURE_CHATGPT_KEY
# - AZURE_CHATGPT_ENDPOINT
# - AZURE_CHATGPT_DEPLOYMENT_ID
#
# This demo doesn't use image generation, but if you extend it to do so,
# you'll also need to set:
# - AZURE_DALLE_KEY
# - AZURE_DALLE_ENDPOINT
# - AZURE_DALLE_DEPLOYMENT_ID
services = AIServiceConfig(
tts=AzureTTSService(), image=None, llm=AzureLLMService()

BIN
src/samples/static-sprite/.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
OrchestratorResponse
)
from dailyai.orchestrator import OrchestratorConfig, Orchestrator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
@@ -40,7 +40,7 @@ class StaticSpriteResponse(OrchestratorResponse):
self.image_bytes = img.tobytes()
def do_play(self) -> None:
self.output_queue.put(QueueFrame(FrameType.IMAGE, self.image_bytes))
self.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, self.image_bytes))
class IntroSpriteResponse(StaticSpriteResponse):
@@ -71,10 +71,10 @@ class AnimatedSpriteLLMResponse(LLMResponse):
with Image.open(full_path) as img:
self.image_bytes.append(img.tobytes())
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
return [
QueueFrame(FrameType.AUDIO, audio_frame),
QueueFrame(FrameType.IMAGE, random.choice(self.image_bytes))
OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame),
OutputQueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes))
]
@@ -83,11 +83,10 @@ def add_bot_to_room(room_url, token, expiration) -> None:
# A simple prompt for a simple sample.
message_handler = MessageHandler(
"""
You are a sample bot in a WebRTC session. You'll receive input as transcriptions of user's
speech, and your responses will be converted to audio via a TTS service.
You are a sample bot, meant to demonstrate how to use an LLM with transcription at TTS.
Answer user's questions and be friendly, and if you can, give some ideas about how someone
could use a bot like you in a more in-depth way. Because your responses will be spoken,
try to keep them short.
try to keep them short and sweet.
"""
)

View File

Before

Width:  |  Height:  |  Size: 871 KiB

After

Width:  |  Height:  |  Size: 871 KiB

View File

Before

Width:  |  Height:  |  Size: 870 KiB

After

Width:  |  Height:  |  Size: 870 KiB

View File

Before

Width:  |  Height:  |  Size: 871 KiB

After

Width:  |  Height:  |  Size: 871 KiB

View File

Before

Width:  |  Height:  |  Size: 868 KiB

After

Width:  |  Height:  |  Size: 868 KiB

View File

@@ -1,54 +0,0 @@
import argparse
import asyncio
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):
    """Join a Daily room and greet each remote participant by name.

    When a non-local participant joins, one greeting is spoken through the
    ElevenLabs TTS service onto the transport's send queue; the transport is
    then asked to stop once it is done.

    Args:
        room_url: URL of the Daily room to join.
    """
    # Positional arguments: room URL, meeting token (none here), a label that
    # is presumably the bot's display name, and the meeting duration in minutes.
    session_minutes = 1
    transport = DailyTransportService(
        room_url, None, "Say One Thing", session_minutes
    )
    transport.mic_enabled = True

    tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")

    # The handler fires for every join event, including the bot's own.
    @transport.event_handler("on_participant_joined")
    async def on_participant_joined(transport, participant):
        info = participant["info"]
        if not info["isLocal"]:
            greeting = "Hello there, " + info["userName"] + "!"
            await tts.say(greeting, transport.send_queue)

            # Wait for the output queue to be empty, then leave the meeting.
            await transport.stop_when_done()

    await transport.run()
if __name__ == "__main__":
    # Script entry point: read the room URL from the command line and run the bot.
    cli = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    cli.add_argument(
        "-u",
        "--url",
        type=str,
        required=True,
        help="URL of the Daily room to join",
    )

    # parse_known_args() returns unrecognised arguments separately instead of
    # exiting with an error; they are ignored here.
    known_args, _ignored = cli.parse_known_args()

    asyncio.run(main(known_args.url))

View File

@@ -1,55 +0,0 @@
import asyncio
import time
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
async def main(room_url):
    """Join a Daily room and speak a greeting to each remote participant.

    Audio for the greeting is produced by the Deepgram TTS service and each
    chunk is placed on the transport's output queue as an AUDIO frame. Call
    state changes are only logged.

    Args:
        room_url: URL of the Daily room to join.
    """
    # Positional arguments: room URL, meeting token (none here), a label that
    # is presumably the bot's display name, and the meeting duration in minutes.
    session_minutes = 1
    transport = DailyTransportService(room_url, None, "Greeter", session_minutes)
    transport.mic_enabled = True

    tts = DeepgramTTSService()

    print("settting up handler")

    @transport.event_handler("on_participant_joined")
    async def on_participant_joined(transport, participant):
        # Logged for every join event, including the bot's own.
        print(f"participant joined: {participant['info']['userName']}")

        if not participant["info"]["isLocal"]:
            # run_tts yields audio chunks; forward each one as it arrives.
            chunks: AsyncGenerator[bytes, None] = tts.run_tts(
                f"Hello there, {participant['info']['userName']}!"
            )
            async for chunk in chunks:
                transport.output_queue.put(QueueFrame(FrameType.AUDIO, chunk))

    print("setting up call state handler")

    @transport.event_handler("on_call_state_updated")
    async def on_call_joined(transport, state):
        print(f"call state callback: {state}")

    await transport.run()
if __name__ == "__main__":
    # Script entry point. The room URL used to be hard-coded; it is now an
    # optional -u/--url argument whose default is the previous value, so
    # running the script with no arguments behaves exactly as before.
    #
    # argparse is imported locally because this sample's import block does
    # not include it.
    import argparse

    parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    parser.add_argument(
        "-u",
        "--url",
        type=str,
        default="https://chad-hq.daily.co/howdy",
        help="URL of the Daily room to join",
    )
    # Unrecognised arguments are returned separately and ignored.
    args, unknown = parser.parse_known_args()

    asyncio.run(main(args.url))

View File

@@ -1,52 +0,0 @@
import argparse
import asyncio
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):
    """Join a Daily room and speak an LLM-generated hello.

    The LLM -> sentence aggregation -> TTS pipeline is started as a background
    task immediately, before anyone has joined. When the first other
    participant joins, the handler waits for that task to finish and then asks
    the transport to stop when done.

    Args:
        room_url: URL of the Daily room to join.
    """
    # Positional arguments: room URL, meeting token (none here), a label that
    # is presumably the bot's display name, and the meeting duration in minutes.
    session_minutes = 1
    transport = DailyTransportService(
        room_url, None, "Say One Thing From an LLM", session_minutes
    )
    transport.mic_enabled = True

    tts = ElevenLabsTTSService(voice_id="29vD33N1CtxCmqQRPOHJ")
    llm = AzureLLMService()

    prompt = [
        {
            "role": "system",
            "content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
        }
    ]

    # Build the pipeline stage by stage: LLM output is grouped by the
    # SentenceAggregator, and the result is fed to TTS, which writes onto the
    # transport's send queue.
    llm_output = llm.run([QueueFrame(FrameType.LLM_MESSAGE, prompt)])
    sentences = SentenceAggregator().run(llm_output)
    tts_task = asyncio.create_task(
        tts.run_to_queue(transport.send_queue, sentences)
    )

    @transport.event_handler("on_first_other_participant_joined")
    async def on_first_other_participant_joined(transport):
        await tts_task
        await transport.stop_when_done()

    await transport.run()
if __name__ == "__main__":
    # Script entry point: the room URL is the only required argument.
    arg_parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    arg_parser.add_argument(
        "-u",
        "--url",
        type=str,
        required=True,
        help="URL of the Daily room to join",
    )

    # Unrecognised arguments are returned in the second element and ignored.
    parsed, _rest = arg_parser.parse_known_args()

    asyncio.run(main(parsed.url))

View File

@@ -1,44 +0,0 @@
import argparse
import asyncio
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.open_ai_services import OpenAIImageGenService
# Module-level flags, both initialised to False. Nothing in the code shown for
# this sample reads or updates them afterwards.
# NOTE(review): they look like leftovers and may be removable — confirm.
local_joined = False
participant_joined = False
async def main(room_url):
    """Join a Daily room with the camera on and show one generated image.

    Image generation is started as a background task right away, writing to
    the transport's send queue. The participant-joined handler simply awaits
    that task.

    Args:
        room_url: URL of the Daily room to join.
    """
    # Positional arguments: room URL, meeting token (none here), a label that
    # is presumably the bot's display name, and the meeting duration in minutes.
    session_minutes = 1
    transport = DailyTransportService(
        room_url, None, "Show a still frame image", session_minutes
    )

    # Video only: microphone off, 1024x1024 camera to match the image size
    # requested from the image generation service below.
    transport.mic_enabled = False
    transport.camera_enabled = True
    transport.camera_width = 1024
    transport.camera_height = 1024

    imagegen = OpenAIImageGenService(image_size="1024x1024")

    description_frames = [
        QueueFrame(FrameType.IMAGE_DESCRIPTION, "a cat in the style of picasso")
    ]
    image_task = asyncio.create_task(
        imagegen.run_to_queue(transport.send_queue, description_frames)
    )

    @transport.event_handler("on_participant_joined")
    async def on_participant_joined(transport, participant):
        await image_task

    await transport.run()
if __name__ == "__main__":
    # Script entry point: parse the required room URL, then run the bot.
    option_parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    option_parser.add_argument(
        "-u",
        "--url",
        type=str,
        required=True,
        help="URL of the Daily room to join",
    )

    # Extra, unrecognised arguments are tolerated and discarded.
    options, _unused = option_parser.parse_known_args()

    asyncio.run(main(options.url))

View File

@@ -1,79 +0,0 @@
import argparse
import asyncio
import re
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url: str):
    """Join a Daily room and say two things: a fixed intro, then an LLM joke.

    The LLM joke is generated and converted to audio in a background task that
    fills a private buffer queue, so it is ready (or nearly so) by the time the
    fixed introduction has been spoken. When a remote participant joins, the
    intro is sent first and the buffered joke audio is then forwarded to the
    transport's send queue.

    Args:
        room_url: URL of the Daily room to join.
    """
    # These names are published as module globals. `tts` is declared but never
    # assigned in this function.
    global transport, llm, tts

    # Positional arguments: room URL, meeting token (none here), a label that
    # is presumably the bot's display name, and the meeting duration in minutes.
    transport = DailyTransportService(room_url, None, "Say Two Things Bot", 1)
    transport.mic_enabled = True
    transport.mic_sample_rate = 16000
    transport.camera_enabled = False

    llm = AzureLLMService()
    azure_tts = AzureTTSService()
    elevenlabs_tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")

    joke_prompt = [{"role": "system", "content": "tell the user a joke about llamas"}]

    # Start the LLM -> sentences -> TTS pipeline now, writing into a buffer
    # rather than the send queue, so its audio cannot jump ahead of the intro.
    buffer_queue = asyncio.Queue()
    joke_sentences = SentenceAggregator().run(
        llm.run([QueueFrame(FrameType.LLM_MESSAGE, joke_prompt)])
    )
    # NOTE(review): the third positional argument presumably asks run_to_queue
    # to finish with an END_STREAM frame, which the forwarding loop below
    # relies on to terminate — confirm against the service API.
    llm_response_task = asyncio.create_task(
        elevenlabs_tts.run_to_queue(buffer_queue, joke_sentences, True)
    )

    @transport.event_handler("on_participant_joined")
    async def on_joined(transport, participant):
        # Ignore the bot's own join event.
        if participant["id"] != transport.my_participant_id:
            intro = QueueFrame(
                FrameType.SENTENCE,
                "My friend the LLM is now going to tell a joke about llamas.",
            )
            await azure_tts.run_to_queue(transport.send_queue, [intro])

            async def buffer_to_send_queue():
                # Move frames from the buffer to the send queue until the
                # END_STREAM frame has been forwarded.
                stream_finished = False
                while not stream_finished:
                    pending = await buffer_queue.get()
                    await transport.send_queue.put(pending)
                    buffer_queue.task_done()
                    stream_finished = pending.frame_type == FrameType.END_STREAM

            await asyncio.gather(llm_response_task, buffer_to_send_queue())
            await transport.stop_when_done()

    await transport.run()
if __name__ == "__main__":
    # Script entry point: one required option, the Daily room URL.
    cmdline = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    cmdline.add_argument(
        "-u",
        "--url",
        type=str,
        required=True,
        help="URL of the Daily room to join",
    )

    # Arguments the parser does not recognise are collected and ignored.
    namespace, _leftover = cmdline.parse_known_args()

    asyncio.run(main(namespace.url))

View File

@@ -1,108 +0,0 @@
import argparse
import asyncio
from asyncio.queues import Queue
import re
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAIImageGenService
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fal_ai_services import FalImageGenService
async def main(room_url):
    """Join a Daily room and narrate a generated calendar image per month.

    For each month a background task asks the LLM for a one-sentence image
    description, then generates the image and the spoken narration
    concurrently. When the first other participant joins, results are sent in
    completion order as an image frame paired with an audio frame.

    Args:
        room_url: URL of the Daily room to join.
    """
    # Positional arguments: room URL, meeting token (none here), a label that
    # is presumably the bot's display name, and the meeting duration in minutes.
    session_minutes = 5
    transport = DailyTransportService(
        room_url, None, "Month Narration Bot", session_minutes
    )
    transport.mic_enabled = True
    transport.camera_enabled = True
    transport.mic_sample_rate = 16000
    transport.camera_width = 1024
    transport.camera_height = 1024

    llm = AzureLLMService()
    # OpenAIImageGenService(image_size="1024x1024") was the alternative left
    # commented out in the original sample.
    image_gen = FalImageGenService(image_size="1024x1024")
    tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")

    async def get_all_audio(text):
        # Collect the whole utterance into one buffer. Keeping this in its own
        # coroutine ensures proper ordering of audio chunks on the send queue.
        utterance = bytearray()
        async for chunk in tts.run_tts(text):
            utterance.extend(chunk)
        return utterance

    async def get_month_data(month):
        # Produce the description first; the image and the audio both depend
        # on it and are then generated concurrently.
        prompt = [
            {
                "role": "system",
                "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
            }
        ]
        image_description = await llm.run_llm(prompt)
        to_speak = f"{month}: {image_description}"

        narration, image_result = await asyncio.gather(
            get_all_audio(to_speak),
            image_gen.run_image_gen(image_description),
        )

        # Only element 1 of the image generation result is used as the image
        # payload.
        return {
            "month": month,
            "text": image_description,
            "image": image_result[1],
            "audio": narration,
        }

    months: list[str] = [
        "January", "February", "March", "April", "May", "June",
        "July", "August", "September", "October", "November", "December",
    ]

    @transport.event_handler("on_first_other_participant_joined")
    async def on_first_other_participant_joined(transport):
        # Play months in the order they complete: minimal delay before the
        # first one and likely none between them, at the cost of the months
        # not appearing in calendar order.
        for finished in asyncio.as_completed(month_tasks):
            result = await finished
            frame_pair = [
                QueueFrame(FrameType.IMAGE, result["image"]),
                QueueFrame(FrameType.AUDIO, result["audio"]),
            ]
            await transport.send_queue.put(frame_pair)

        # Wait for the output queue to be empty, then leave the meeting.
        await transport.stop_when_done()

    # Kick off all twelve month tasks before joining; the handler above reads
    # this name only when it is eventually invoked.
    month_tasks = [asyncio.create_task(get_month_data(name)) for name in months]

    await transport.run()
if __name__ == "__main__":
    # Script entry point: take the room URL from the command line.
    argument_parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    argument_parser.add_argument(
        "-u",
        "--url",
        type=str,
        required=True,
        help="URL of the Daily room to join",
    )

    # Unknown arguments are returned separately and not used.
    cli_args, _others = argument_parser.parse_known_args()

    asyncio.run(main(cli_args.url))

View File

@@ -1,94 +0,0 @@
import argparse
import asyncio
import requests
import time
import urllib.parse
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
async def main(room_url: str, token):
    """Join a Daily room and answer what participants say, out loud.

    Transcription frames from other participants are accumulated until the
    text ends with sentence punctuation. The sentence is then added to the
    conversation, the LLM response is streamed sentence by sentence, and each
    response sentence is converted to audio and queued for sending.

    Args:
        room_url: URL of the Daily room to join.
        token: Meeting token passed to the transport.
    """
    # These names are published as module globals.
    global transport, llm, tts

    # Positional arguments: room URL, meeting token, a label that is presumably
    # the bot's display name, and the meeting duration in minutes.
    transport = DailyTransportService(room_url, token, "Respond bot", 1)
    transport.mic_enabled = True
    transport.mic_sample_rate = 16000
    transport.camera_enabled = False

    llm = AzureLLMService()
    tts = AzureTTSService()

    async def handle_transcriptions():
        # Conversation history, seeded with the system prompt.
        messages = [
            {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
        ]

        pending_text = ""
        async for frame in transport.get_receive_frames():
            if frame.frame_type != FrameType.TRANSCRIPTION:
                continue

            transcript = frame.frame_data
            # Skip transcriptions of the bot's own speech.
            if transcript["session_id"] == transport.my_participant_id:
                continue

            # todo: we could differentiate between transcriptions from different participants
            pending_text += transcript["text"]
            if not pending_text.endswith((".", "?", "!")):
                # Not a complete sentence yet; keep accumulating.
                continue

            messages.append({"role": "user", "content": pending_text})
            pending_text = ''

            # Speak each response sentence as soon as it is available, and
            # keep the pieces so the full reply can be recorded in the history.
            reply_parts = []
            async for reply_sentence in llm.run_llm_async_sentences(messages):
                reply_parts.append(reply_sentence)
                async for audio in tts.run_tts(reply_sentence):
                    await transport.send_queue.put(QueueFrame(FrameType.AUDIO, audio))

            messages.append({"role": "assistant", "content": "".join(reply_parts)})

    # Punctuated transcripts are required for the sentence-end check above.
    transport.transcription_settings["extra"]["punctuate"] = True

    await asyncio.gather(transport.run(), handle_transcriptions())
if __name__ == "__main__":
    # Script entry point: parse the room URL and API key, create a meeting
    # token through the Daily REST API, then run the bot with that token.
    parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=True, help="URL of the Daily room to join"
    )
    parser.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=True,
        help="Daily API Key (needed to create token)",
    )
    # Unrecognised arguments are returned separately and ignored.
    args, unknown = parser.parse_known_args()

    # Create a meeting token for the given room with an expiration 1 hour in the future.
    # The room name is the URL path without its surrounding slashes; stripping
    # both ends keeps a URL written with a trailing "/" from producing a room
    # name that ends in "/".
    room_name: str = urllib.parse.urlparse(args.url).path.strip("/")
    expiration: float = time.time() + 60 * 60

    # An explicit timeout prevents the script from hanging forever if the API
    # is unreachable; requests applies no timeout by default.
    res: requests.Response = requests.post(
        "https://api.daily.co/v1/meeting-tokens",
        headers={"Authorization": f"Bearer {args.apikey}"},
        json={
            "properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
        },
        timeout=30,
    )

    if res.status_code != 200:
        raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")

    token: str = res.json()["token"]

    asyncio.run(main(args.url, token))