Compare commits

...

47 Commits

Author SHA1 Message Date
Moishe Lettvin
9e35e21729 undone 2024-01-18 16:09:23 -05:00
Moishe Lettvin
ceeb93dd46 Clean up example 5 2024-01-18 11:58:08 -05:00
Moishe Lettvin
f55333844e Merge pull request #4 from daily-co/service-refactor
Service refactor: Generators over queues
2024-01-18 11:30:59 -05:00
Moishe Lettvin
0245f98eb5 update fal image_gen to use size in constructor 2024-01-18 11:29:13 -05:00
Moishe Lettvin
f9f2e2d7ea stop_when_done 2024-01-18 11:22:06 -05:00
Moishe Lettvin
0d21768d00 Fix example 5 2024-01-18 11:22:02 -05:00
Moishe Lettvin
13f2f792af refactor party tonight 2024-01-18 11:21:38 -05:00
Moishe Lettvin
a3ac0d84e8 working on making services more consistent/terse/easy 2024-01-18 11:20:56 -05:00
chadbailey59
6e8ebbd34c fal.ai integration (#3)
* fal.ai image gen

* some sample and readme updates

* holy cow this is fast

* basic image-gen working

* starting audio prompt and reset

* short confirmation words

* moved fal module to pyproject.toml

---------

Co-authored-by: Moishe Lettvin <moishel@gmail.com>
2024-01-17 12:08:00 -06:00
Chad Bailey
b1e6a89acd README tweaks 2024-01-17 16:56:17 +00:00
Moishe Lettvin
115b744861 input/output -> receive/send queues, for transport service 2024-01-16 21:11:38 -05:00
Moishe Lettvin
755059c358 a little cleanup 2024-01-16 19:58:11 -05:00
Moishe Lettvin
cfaccefe9c some sample and readme updates 2024-01-12 16:22:03 -05:00
Moishe Lettvin
5b49597854 Pass entire transcription message on queue; don't respond to yourself in 06- 2024-01-12 13:54:55 -05:00
Moishe Lettvin
869d557ded remove mistakenly added import: 2024-01-12 13:49:34 -05:00
Moishe Lettvin
a42f6bc531 Merge pull request #2 from daily-co/queueing
Adding queue transportation to services
2024-01-12 13:40:51 -05:00
Moishe Lettvin
9bbfc8ad05 Updating 06- sample 2024-01-12 11:40:49 -05:00
Moishe Lettvin
ad427bea3a Update 05- sample, sticking with generators here because they give us a lot more control over the order that things get queued. 2024-01-12 10:47:03 -05:00
Moishe Lettvin
ec1f2362c5 add on_first_other_participant_joined event 2024-01-12 08:39:56 -05:00
Moishe Lettvin
37207745c9 Remove some unused variables 2024-01-11 19:40:19 -05:00
Moishe Lettvin
b9b82695c6 Adding queue transportation to services 2024-01-11 19:14:19 -05:00
Moishe Lettvin
7ca7764be3 a little cleanup 2024-01-10 21:14:55 -05:00
Moishe Lettvin
df1bbef653 something is up with transcription, trying to figure it out 2024-01-10 17:12:40 -05:00
Moishe Lettvin
7229fc806e getting started on 06- 2024-01-10 13:00:36 -05:00
Moishe Lettvin
cd204ebd21 Add sample 04- 2024-01-09 14:19:27 -05:00
Moishe Lettvin
cb63307ddf Improve pipeline of data gathering for 05- sample (I think it can be better, though) 2024-01-09 12:56:56 -05:00
Moishe Lettvin
37f48d9f04 different take on 05 maybe 2024-01-09 11:59:21 -05:00
Moishe Lettvin
a29cb1a5ad Update 05 sample to use decorators and exit correctly 2024-01-09 06:40:48 -05:00
Moishe Lettvin
20a9c6a938 simplify the join, no need for END_STREAM 2024-01-09 06:12:24 -05:00
Moishe Lettvin
534a4e2fa1 Exit meeting after saying something for samples 1 and 2 2024-01-09 06:05:24 -05:00
Moishe Lettvin
11a553240f Sample 3 2024-01-09 05:45:49 -05:00
Moishe Lettvin
4efeae46bc stop using my random room in the demos 2024-01-09 05:32:42 -05:00
Moishe Lettvin
a59cec526f elevenlabs test in 02- 2024-01-08 20:28:53 -05:00
Moishe Lettvin
ac0e4b0c27 remove leftover prints 2024-01-08 17:56:21 -05:00
Moishe Lettvin
6cace129fd shut down correctly when a participant leaves 2024-01-08 17:55:19 -05:00
chadbailey59
290c1e7efa Added async OpenAI services (#1)
* added async openai services

* added async openai services

* added Deepgram service with 05 example

* modernized the 'say one thing' example

* async all the things

* cleanup and user greeting

* more cleanup
2024-01-08 16:07:21 -06:00
Moishe Lettvin
d95bca479d update 01 sample 2024-01-08 15:58:11 -05:00
Moishe Lettvin
0451ae498f Make transportservice.run async, and add 02- sample 2024-01-08 15:38:32 -05:00
Moishe Lettvin
712ab97f88 Support coroutine callbacks 2024-01-08 11:27:58 -05:00
Moishe Lettvin
b5c7e30efa Use a decorator for callbacks 2024-01-06 14:09:28 -05:00
Moishe Lettvin
7f51c0c9b2 A little cleanup 2024-01-05 20:41:52 -05:00
Moishe Lettvin
b48a377b17 a little cleanup, uglier-than-I'd-like 01-say-one-thing sample added 2024-01-05 14:19:18 -05:00
Moishe Lettvin
5b4c085cd2 even more demo cleanup, making run_llm play well with asyncio 2024-01-04 18:14:28 -05:00
Moishe Lettvin
cd2c9700ad more demo cleanup, allow bundled frames in output_queue 2024-01-04 17:54:13 -05:00
Moishe Lettvin
fcd9a248d9 use inference text in demo, clean up image generation 2024-01-04 17:26:13 -05:00
Moishe Lettvin
c68703749b remove unnecessary file 2024-01-04 15:49:12 -05:00
Moishe Lettvin
aab06f4441 a few tweaks to the prompt 2024-01-02 10:46:48 -05:00
37 changed files with 1642 additions and 183 deletions

4
.gitignore vendored
View File

@@ -22,4 +22,6 @@ share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
MANIFEST
.DS_Store
.env

View File

@@ -5,12 +5,14 @@ This SDK can help you build applications that participate in WebRTC meetings and
## Build/Install
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
```
python3 -m venv env
source env/bin/activate
```
From the root of this repo, run the following:
```
pip install -r requirements.txt
python -m build
@@ -19,10 +21,11 @@ python -m build
This builds the package. To use the package locally (eg to run sample files), run
```
pip install .
pip install --editable .
```
If you want to use this package from another directory, you can run:
```
pip install path_to_this_repo
```
@@ -32,7 +35,7 @@ pip install path_to_this_repo
Tou can run the simple sample like so:
```
python src/samples/simple-sample/simple-sample.py -u your_room_url -k your_daily_api_key
python src/samples/theoretical-to-real/01-say-one-thing.py -u <url of your Daily meeting> -k <your Daily API Key>
```
Note that the sample uses Azure's TTS and LLM services. You'll need to set the following environment variables for the sample to work:
@@ -44,3 +47,9 @@ AZURE_CHATGPT_KEY
AZURE_CHATGPT_ENDPOINT
AZURE_CHATGPT_DEPLOYMENT_ID
```
If you have those environment variables stored in an .env file, you can quickly load them into your terminal's environment by running this:
```bash
export $(grep -v '^#' .env | xargs)
```

View File

@@ -14,7 +14,9 @@ dependencies = [
"google-cloud-texttospeech",
"azure-cognitiveservices-speech",
"pyht",
"opentelemetry-sdk"
"opentelemetry-sdk",
"aiohttp",
"fal"
]
[tool.setuptools.packages.find]

View File

@@ -9,7 +9,7 @@ from queue import Queue, PriorityQueue, Empty
from threading import Event, Semaphore, Thread
from typing import Any, Generator, Iterator, Optional, Type
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
@@ -268,10 +268,10 @@ class LLMResponse(OrchestratorResponse):
if out.strip():
yield out.strip()
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
return [OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame)]
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
return [QueueFrame(FrameType.AUDIO, audio_frame)]
def get_frames_from_chunk(self, chunk) -> Generator[list[OutputQueueFrame], Any, None]:
def get_frames_from_chunk(self, chunk) -> Generator[list[QueueFrame], Any, None]:
for audio_frame in self.services.tts.run_tts(chunk):
yield self.get_frames_from_tts_response(audio_frame)
@@ -317,7 +317,7 @@ class LLMResponse(OrchestratorResponse):
break
if not self.has_sent_first_frame:
self.output_queue.put(OutputQueueFrame(FrameType.START_STREAM, None))
self.output_queue.put(QueueFrame(FrameType.START_STREAM, None))
self.has_sent_first_frame = True
for frame in frames:

View File

@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
OrchestratorResponse,
LLMResponse,
)
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.ai_services import AIServiceConfig
from dailyai.message_handler.message_handler import MessageHandler
@@ -197,7 +197,7 @@ class Orchestrator(EventHandler):
self.logger.info("Camera thread stopped")
self.logger.info("Put stop in output queue")
self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None))
self.output_queue.put(QueueFrame(FrameType.END_STREAM, None))
self.frame_consumer_thread.join()
self.logger.info("Orchestrator stopped.")
@@ -209,6 +209,9 @@ class Orchestrator(EventHandler):
def on_intro_finished(self, intro):
self.logger.info(f"Introduction has finished")
waiting = self.conversation_processors.waiting(self.services, self.message_handler, self.output_queue)
waiting.prepare()
waiting.play()
def on_response_played(self, response):
response.finalize()
@@ -364,7 +367,7 @@ class Orchestrator(EventHandler):
all_audio_frames = bytearray()
while True:
try:
frame:OutputQueueFrame = self.output_queue.get()
frame:QueueFrame = self.output_queue.get()
if frame.frame_type == FrameType.END_STREAM:
self.logger.info("Stopping frame consumer thread")
return
@@ -372,7 +375,7 @@ class Orchestrator(EventHandler):
# if interrupted, we just pull frames off the queue and discard them
if not self.is_interrupted.is_set():
if frame:
if frame.frame_type == FrameType.AUDIO_FRAME:
if frame.frame_type == FrameType.AUDIO:
chunk = frame.frame_data
all_audio_frames.extend(chunk)
@@ -382,7 +385,7 @@ class Orchestrator(EventHandler):
if l:
self.mic.write_frames(bytes(b[:l]))
b = b[l:]
elif frame.frame_type == FrameType.IMAGE_FRAME:
elif frame.frame_type == FrameType.IMAGE:
self.set_image(frame.frame_data)
elif len(b):
self.mic.write_frames(bytes(b))

View File

@@ -1,14 +0,0 @@
from enum import Enum
from dataclasses import dataclass
class FrameType(Enum):
AUDIO_FRAME = 1
IMAGE_FRAME = 2
START_STREAM = 3
END_STREAM = 4
@dataclass(frozen=True)
class OutputQueueFrame:
frame_type: FrameType
frame_data: bytes

View File

@@ -0,0 +1,19 @@
from enum import Enum
from dataclasses import dataclass
class FrameType(Enum):
START_STREAM = 0
END_STREAM = 1
AUDIO = 2
IMAGE = 3
SENTENCE = 4
TEXT_CHUNK = 5
LLM_MESSAGE = 6
APP_MESSAGE = 7
IMAGE_DESCRIPTION = 8
TRANSCRIPTION = 9
@dataclass(frozen=True)
class QueueFrame:
frame_type: FrameType
frame_data: str | dict | bytes | list | None

View File

@@ -1,2 +1,2 @@
Pillow==10.1.0
typing_extensions==4.9.0
typing_extensions==4.9.0

View File

@@ -0,0 +1,73 @@
from typing import AsyncGenerator
from dailyai.queue_frame import FrameType, QueueFrame
from dailyai.services.ai_services import AIService
class SentenceAggregator(AIService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.current_sentence = ""
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK, FrameType.SENTENCE])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE])
async def process_frame(
self, requested_frame_types: set[FrameType], frame: QueueFrame
) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.SENTENCE in requested_frame_types:
return
if frame.frame_type == FrameType.TEXT_CHUNK:
if type(frame.frame_data) != str:
raise Exception(
"Sentence aggregator requires a string for the data field"
)
self.current_sentence += frame.frame_data
if self.current_sentence.endswith((".", "?", "!")):
sentence = self.current_sentence
self.current_sentence = ""
yield QueueFrame(FrameType.SENTENCE, sentence)
elif frame.frame_type == FrameType.END_STREAM:
if self.current_sentence:
yield QueueFrame(FrameType.SENTENCE, self.current_sentence)
elif frame.frame_type == FrameType.SENTENCE:
yield frame
class TranscriptionSentenceAggregator(AIService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.current_sentence = ""
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK, FrameType.SENTENCE])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE])
async def process_frame(
self, requested_frame_types: set[FrameType], frame: QueueFrame
) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.SENTENCE in requested_frame_types:
return
if frame.frame_type == FrameType.TEXT_CHUNK:
if type(frame.frame_data) != str:
raise Exception(
"Sentence aggregator requires a string for the data field"
)
self.current_sentence += frame.frame_data
if self.current_sentence.endswith((".", "?", "!")):
sentence = self.current_sentence
self.current_sentence = ""
yield QueueFrame(FrameType.SENTENCE, sentence)
elif frame.frame_type == FrameType.END_STREAM:
if self.current_sentence:
yield QueueFrame(FrameType.SENTENCE, self.current_sentence)
elif frame.frame_type == FrameType.SENTENCE:
yield frame

View File

@@ -1,33 +1,136 @@
import asyncio
import logging
import re
from httpx import request
from dailyai.queue_frame import QueueFrame, FrameType
from abc import abstractmethod
from typing import AsyncGenerator, Iterable
from dataclasses import dataclass
from typing import Generator
from PIL import Image
from typing import AsyncGenerator
from collections.abc import Iterable, AsyncIterable
class AIService:
def __init__(self):
self.logger = logging.getLogger("dailyai")
def close(self):
def stop(self):
pass
def allowed_input_frame_types(self) -> set[FrameType]:
return set()
def possible_output_frame_types(self) -> set[FrameType]:
return set()
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
async for frame in self.run(frames):
await queue.put(frame)
if add_end_of_stream:
await queue.put(QueueFrame(FrameType.END_STREAM, None))
async def run(
self,
frames: Iterable[QueueFrame]
| AsyncIterable[QueueFrame]
| asyncio.Queue[QueueFrame],
requested_frame_types: set[FrameType] | None=None,
) -> AsyncGenerator[QueueFrame, None]:
if requested_frame_types and self.possible_output_frame_types().intersection(requested_frame_types) == set():
raise Exception(f"Requested frame types {requested_frame_types} are not supported by this service.")
if not requested_frame_types:
requested_frame_types = self.possible_output_frame_types()
if isinstance(frames, AsyncIterable):
async for frame in frames:
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
elif isinstance(frames, Iterable):
for frame in frames:
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
elif isinstance(frames, asyncio.Queue):
while True:
frame = await frames.get()
async for output_frame in self.process_frame(requested_frame_types, frame):
yield output_frame
if frame.frame_type == FrameType.END_STREAM:
break
else:
raise Exception("Frames must be an iterable or async iterable")
@abstractmethod
async def process_frame(self, requested_frame_types:set[FrameType], frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
# Yield something so the linter can deduce what should happen here.
yield QueueFrame(FrameType.END_STREAM, None)
class SentenceAggregator(AIService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.current_sentence = ""
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK, FrameType.SENTENCE])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE])
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.SENTENCE in requested_frame_types:
return
if frame.frame_type == FrameType.TEXT_CHUNK:
if type(frame.frame_data) != str:
raise Exception(
"Sentence aggregator requires a string for the data field"
)
self.current_sentence += frame.frame_data
if self.current_sentence.endswith((".", "?", "!")):
sentence = self.current_sentence
self.current_sentence = ""
yield QueueFrame(FrameType.SENTENCE, sentence)
elif frame.frame_type == FrameType.END_STREAM:
if self.current_sentence:
yield QueueFrame(FrameType.SENTENCE, self.current_sentence)
elif frame.frame_type == FrameType.SENTENCE:
yield frame
class LLMService(AIService):
# Generate a set of responses to a prompt. Yields a list of responses.
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.LLM_MESSAGE, FrameType.SENTENCE, FrameType.TRANSCRIPTION])
def allowed_output_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TEXT_CHUNK])
@abstractmethod
def run_llm_async(
self, messages
) -> Generator[str, None, None]:
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
yield ""
@abstractmethod
async def run_llm(self, messages) -> str:
pass
# Generate a responses to a prompt. Returns the response
@abstractmethod
def run_llm(
self, messages
) -> str or None:
pass
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if frame.frame_type == FrameType.LLM_MESSAGE:
if type(frame.frame_data) != list:
raise Exception("LLM service requires a dict for the data field")
messages: list[dict[str, str]] = frame.frame_data
if FrameType.SENTENCE in requested_frame_types:
yield QueueFrame(FrameType.SENTENCE, await self.run_llm(messages))
else:
async for text_chunk in self.run_llm_async(messages):
yield QueueFrame(FrameType.TEXT_CHUNK, text_chunk)
# TODO: handle other frame types! Need to aggregate into messages
class TTSService(AIService):
@@ -35,19 +138,60 @@ class TTSService(AIService):
def get_mic_sample_rate(self):
return 16000
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.AUDIO])
# Converts the sentence to audio. Yields a list of audio frames that can
# be sent to the microphone device
@abstractmethod
def run_tts(self, sentence) -> Generator[bytes, None, None]:
pass
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
# yield empty bytes here, so linting can infer what this method does
yield bytes()
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.AUDIO in requested_frame_types:
return
if type(frame.frame_data) != str:
raise Exception("TTS service requires a string for the data field")
async for audio_chunk in self.run_tts(frame.frame_data):
yield QueueFrame(FrameType.AUDIO, audio_chunk)
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
await self.run_to_queue(queue, [QueueFrame(FrameType.SENTENCE, sentence)])
class ImageGenService(AIService):
def __init__(self, image_size, **kwargs):
super().__init__(**kwargs)
self.image_size = image_size
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.SENTENCE, FrameType.TRANSCRIPTION, FrameType.TEXT_CHUNK, FrameType.IMAGE_DESCRIPTION])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.IMAGE])
# Renders the image. Returns an Image object.
@abstractmethod
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
pass
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if not FrameType.IMAGE in requested_frame_types:
return
if type(frame.frame_data) != str:
raise Exception("Image service requires a string for the data field")
(_, image_data) = await self.run_image_gen(frame.frame_data)
yield QueueFrame(FrameType.IMAGE, image_data)
@dataclass
class AIServiceConfig:

View File

@@ -1,11 +1,13 @@
import json
import aiohttp
import asyncio
import io
from openai import AzureOpenAI
import json
from openai import AsyncAzureOpenAI
import os
import requests
from typing import Generator
from collections.abc import AsyncGenerator
from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
from PIL import Image
@@ -23,7 +25,7 @@ class AzureTTSService(TTSService):
self.speech_config = SpeechConfig(subscription=speech_key, region=speech_region)
self.speech_synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=None)
def run_tts(self, sentence) -> Generator[bytes, None, None]:
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
self.logger.info("Running azure tts")
ssml = "<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' " \
"xmlns:mstts='http://www.w3.org/2001/mstts'>" \
@@ -33,7 +35,10 @@ class AzureTTSService(TTSService):
"<prosody rate='1.05'>" \
f"{sentence}" \
"</prosody></mstts:express-as></voice></speak> "
result = self.speech_synthesizer.speak_ssml(ssml)
try:
result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))
except Exception as e:
self.logger.error("Error in azure tts", e)
self.logger.info("Got azure tts result")
if result.reason == ResultReason.SynthesizingAudioCompleted:
self.logger.info("Returning result")
@@ -49,45 +54,91 @@ class AzureLLMService(LLMService):
def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None):
super().__init__()
api_key = api_key or os.getenv("AZURE_CHATGPT_KEY")
azure_endpoint = azure_endpoint or os.getenv("AZURE_CHATGPT_ENDPOINT")
if not azure_endpoint:
raise Exception("No azure endpoint specified for Azure LLM, please set AZURE_CHATGPT_ENDPOINT in the environment or pass it to the AzureLLMService constructor")
model: str | None = model or os.getenv("AZURE_CHATGPT_DEPLOYMENT_ID")
if not model:
raise Exception("No model specified for Azure LLM, please set AZURE_CHATGPT_DEPLOYMENT_ID in the environment or pass it to the AzureLLMService constructor")
self.model: str = model
api_version = api_version or "2023-12-01-preview"
self.client = AzureOpenAI(
self.client = AsyncAzureOpenAI(
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
self.model = model or os.getenv("AZURE_CHATGPT_DEPLOYMENT_ID")
def get_response(self, messages, stream):
return self.client.chat.completions.create(
stream=stream,
messages=messages,
model=self.model,
)
def run_llm_async(self, messages) -> Generator[str, None, None]:
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
response = self.get_response(messages, stream=True)
for chunk in response:
chunks = await self.client.chat.completions.create(model=self.model, stream=True, messages=messages)
async for chunk in chunks:
if len(chunk.choices) == 0:
continue
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def run_llm(self, messages) -> str | None:
async def run_llm(self, messages) -> str | None:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
response = self.get_response(messages, stream=False)
response = await self.client.chat.completions.create(model=self.model, stream=False, messages=messages)
if response and len(response.choices) > 0:
return response.choices[0].message.content
else:
return None
class AzureImageGenServiceREST(ImageGenService):
def __init__(self, image_size:str, api_key=None, azure_endpoint=None, api_version=None, model=None):
super().__init__(image_size=image_size)
self.api_key = api_key or os.getenv("AZURE_DALLE_KEY")
self.azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
self.api_version = api_version or "2023-06-01-preview"
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
# TODO hoist the session to app-level
async with aiohttp.ClientSession() as session:
url = f"{self.azure_endpoint}openai/images/generations:submit?api-version={self.api_version}"
headers= { "api-key": self.api_key, "Content-Type": "application/json" }
body = {
# Enter your prompt text here
"prompt": sentence,
"size": self.image_size,
"n": 1,
}
async with session.post(url, headers=headers, json=body) as submission:
operation_location = submission.headers['operation-location']
status = ""
attempts_left = 120
json_response = None
while status != "succeeded":
attempts_left -= 1
if attempts_left == 0:
raise Exception("Image generation timed out")
await asyncio.sleep(1)
response = await session.get(operation_location, headers=headers)
json_response = await response.json()
status = json_response["status"]
image_url = json_response["result"]["data"][0]["url"] if json_response else None
if not image_url:
raise Exception("Image generation failed")
# Load the image from the url
async with session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())
class AzureImageGenService(ImageGenService):
@@ -96,7 +147,7 @@ class AzureImageGenService(ImageGenService):
api_key = api_key or os.getenv("AZURE_DALLE_KEY")
azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
api_version = api_version or "2023-12-01-preview"
api_version = api_version or "2023-06-01-preview"
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
self.client = AzureOpenAI(
@@ -105,20 +156,20 @@ class AzureImageGenService(ImageGenService):
api_version=api_version,
)
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
self.logger.info("Generating azure image", sentence)
image = self.client.images.generate(
model=self.model,
prompt=sentence,
n=1,
size=f"1024x1024",
size=self.image_size,
)
url = image["data"][0]["url"]
response = requests.get(url)
dalle_stream = io.BytesIO(response.content)
dalle_im = Image.open(dalle_stream)
dalle_im = Image.open(dalle_stream.tobytes())
return (url, dalle_im)

View File

@@ -0,0 +1,357 @@
import asyncio
import inspect
import logging
import time
import types
from functools import partial
from queue import Queue, Empty
from dailyai.queue_frame import QueueFrame, FrameType
from threading import Thread, Event, Timer
from daily import (
EventHandler,
CallClient,
Daily,
VirtualCameraDevice,
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
)
class DailyTransportService(EventHandler):
def __init__(
self,
room_url: str,
token: str | None,
bot_name: str,
duration: float = 10,
):
super().__init__()
self.bot_name: str = bot_name
self.room_url: str = room_url
self.token: str | None = token
self.duration: float = duration
self.expiration = time.time() + duration * 60
# This queue is used to marshal frames from the async send queue to the thread that emits audio & video.
# We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions
# a chance to run while waiting for queue items -- but also to maintain thread safety and have a threaded
# handler to send frames, to ensure that sending isn't subject to pauses in the async thread.
self.threadsafe_send_queue = Queue()
self.is_interrupted = Event()
self.stop_threads = Event()
self.story_started = False
self.mic_enabled = False
self.mic_sample_rate = 16000
self.camera_width = 1024
self.camera_height = 768
self.camera_enabled = False
self.send_queue = asyncio.Queue()
self.receive_queue = asyncio.Queue()
self.other_participant_has_joined = False
self.camera_thread = None
self.frame_consumer_thread = None
self.transcription_settings = {
"language": "en",
"tier": "nova",
"model": "2-conversationalai",
"profanity_filter": True,
"redact": False,
"extra": {
"endpointing": True,
"punctuate": False,
},
}
self.logger: logging.Logger = logging.getLogger("dailyai")
self.event_handlers = {}
try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
self.loop = None
def patch_method(self, event_name, *args, **kwargs):
try:
for handler in self.event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
if self.loop:
asyncio.run_coroutine_threadsafe(handler(*args, **kwargs), self.loop)
else:
raise Exception("No event loop to run coroutine. In order to use async event handlers, you must run the DailyTransportService in an asyncio event loop.")
else:
handler(*args, **kwargs)
except Exception as e:
self.logger.error(f"Exception in event handler {event_name}: {e}")
def add_event_handler(self, event_name: str, handler):
if not event_name.startswith("on_"):
raise Exception(f"Event handler {event_name} must start with 'on_'")
methods = inspect.getmembers(self, predicate=inspect.ismethod)
if event_name not in [method[0] for method in methods]:
raise Exception(f"Event handler {event_name} not found")
if not event_name in self.event_handlers:
self.event_handlers[event_name] = [getattr(self, event_name), types.MethodType(handler, self)]
setattr(self, event_name, partial(self.patch_method, event_name))
else:
self.event_handlers[event_name].append(types.MethodType(handler, self))
def event_handler(self, event_name: str):
def decorator(handler):
self.add_event_handler(event_name, handler)
return handler
return decorator
def configure_daily(self):
Daily.init()
self.client = CallClient(event_handler=self)
if self.mic_enabled:
self.mic: VirtualMicrophoneDevice = Daily.create_microphone_device(
"mic", sample_rate=self.mic_sample_rate, channels=1
)
if self.camera_enabled:
self.camera: VirtualCameraDevice = Daily.create_camera_device(
"camera", width=self.camera_width, height=self.camera_height, color_format="RGB"
)
self.speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
"speaker", sample_rate=16000, channels=1
)
self.image: bytes | None = None
self.camera_thread = Thread(target=self.run_camera, daemon=True)
self.camera_thread.start()
self.logger.info("Starting frame consumer thread")
self.frame_consumer_thread = Thread(target=self.frame_consumer, daemon=True)
self.frame_consumer_thread.start()
Daily.select_speaker_device("speaker")
self.client.set_user_name(self.bot_name)
self.client.join(self.room_url, self.token, completion=self.call_joined)
self.client.update_inputs(
{
"camera": {
"isEnabled": True,
"settings": {
"deviceId": "camera",
},
},
"microphone": {
"isEnabled": True,
"settings": {
"deviceId": "mic",
"customConstraints": {
"autoGainControl": {"exact": False},
"echoCancellation": {"exact": False},
"noiseSuppression": {"exact": False},
},
},
},
}
)
self.client.update_publishing(
{
"camera": {
"sendSettings": {
"maxQuality": "low",
"encodings": {
"low": {
"maxBitrate": 250000,
"scaleResolutionDownBy": 1.333,
"maxFramerate": 8,
}
},
}
}
}
)
if self.token:
self.client.start_transcription(self.transcription_settings)
self.my_participant_id = self.client.participants()["local"]["id"]
async def get_receive_frames(self):
while True:
frame = await self.receive_queue.get()
yield frame
if frame.frame_type == FrameType.END_STREAM:
break
def get_async_send_queue(self):
return self.send_queue
async def marshal_frames(self):
while True:
frame: QueueFrame | list = await self.send_queue.get()
self.threadsafe_send_queue.put(frame)
self.send_queue.task_done()
if type(frame) == QueueFrame and frame.frame_type == FrameType.END_STREAM:
break
async def wait_for_send_queue_to_empty(self):
await self.send_queue.join()
self.threadsafe_send_queue.join()
async def stop_when_done(self):
await self.wait_for_send_queue_to_empty()
self.stop()
async def run(self) -> None:
self.configure_daily()
self.participant_left = False
async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames())
try:
participant_count: int = len(self.client.participants())
self.logger.info(f"{participant_count} participants in room")
while time.time() < self.expiration and not self.participant_left and not self.stop_threads.is_set():
await asyncio.sleep(1)
except Exception as e:
self.logger.error(f"Exception {e}")
finally:
self.client.leave()
self.stop_threads.set()
await self.receive_queue.put(QueueFrame(FrameType.END_STREAM, None))
await self.send_queue.put(QueueFrame(FrameType.END_STREAM, None))
await async_output_queue_marshal_task
if self.camera_thread and self.camera_thread.is_alive():
self.camera_thread.join()
if self.frame_consumer_thread and self.frame_consumer_thread.is_alive():
self.frame_consumer_thread.join()
def stop(self):
self.stop_threads.set()
def on_first_other_participant_joined(self):
pass
def call_joined(self, join_data, client_error):
self.logger.info(f"Call_joined: {join_data}, {client_error}")
def on_error(self, error):
self.logger.error(f"on_error: {error}")
def on_call_state_updated(self, state):
pass
def on_participant_joined(self, participant):
if not self.other_participant_has_joined and participant["id"] != self.my_participant_id:
self.other_participant_has_joined = True
self.on_first_other_participant_joined()
def on_participant_left(self, participant, reason):
if len(self.client.participants()) < 2:
self.participant_left = True
pass
def on_app_message(self, message, sender):
pass
def on_transcription_message(self, message:dict):
if self.loop:
frame = QueueFrame(FrameType.TRANSCRIPTION, message)
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
pass
def on_transcription_error(self, message):
pass
def on_transcription_started(self, status):
pass
def set_image(self, image: bytes):
self.image: bytes | None = image
def run_camera(self):
try:
while not self.stop_threads.is_set():
if self.image:
self.camera.write_frame(self.image)
time.sleep(1.0 / 8) # 8 fps
except Exception as e:
self.logger.error(f"Exception {e} in camera thread.")
def frame_consumer(self):
self.logger.info("🎬 Starting frame consumer thread")
b = bytearray()
smallest_write_size = 3200
all_audio_frames = bytearray()
while True:
try:
frames_or_frame: QueueFrame | list[QueueFrame] = self.threadsafe_send_queue.get()
if type(frames_or_frame) == QueueFrame:
frames: list[QueueFrame] = [frames_or_frame]
elif type(frames_or_frame) == list:
frames: list[QueueFrame] = frames_or_frame
else:
raise Exception("Unknown type in output queue")
for frame in frames:
if frame.frame_type == FrameType.END_STREAM:
self.logger.info("Stopping frame consumer thread")
self.threadsafe_send_queue.task_done()
return
# if interrupted, we just pull frames off the queue and discard them
if not self.is_interrupted.is_set():
if frame:
if frame.frame_type == FrameType.AUDIO:
chunk = frame.frame_data
all_audio_frames.extend(chunk)
b.extend(chunk)
l = len(b) - (len(b) % smallest_write_size)
if l:
self.mic.write_frames(bytes(b[:l]))
b = b[l:]
elif frame.frame_type == FrameType.IMAGE:
self.set_image(frame.frame_data)
elif len(b):
self.mic.write_frames(bytes(b))
b = bytearray()
else:
if self.interrupt_time:
self.logger.info(
f"Lag to stop stream after interruption {time.perf_counter() - self.interrupt_time}"
)
self.interrupt_time = None
if frame.frame_type == FrameType.START_STREAM:
self.is_interrupted.clear()
self.threadsafe_send_queue.task_done()
except Empty:
try:
if len(b):
self.mic.write_frames(bytes(b))
except Exception as e:
self.logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
b = bytearray()

View File

@@ -0,0 +1,29 @@
import aiohttp
import asyncio
import os
import requests
from collections.abc import AsyncGenerator
from dailyai.services.ai_services import TTSService
class DeepgramTTSService(TTSService):
def __init__(self, speech_key=None, voice=None):
super().__init__()
self.voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2"
self.speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY")
def get_mic_sample_rate(self):
return 24000
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
self.logger.info(f"Running deepgram tts for {sentence}")
base_url = "https://api.beta.deepgram.com/v1/speak"
request_url = f"{base_url}?model={self.voice}&encoding=linear16&container=none&sample_rate=16000"
headers = {"authorization": f"token {self.speech_key}"}
body = { "text": sentence }
async with aiohttp.ClientSession() as session:
async with session.post(request_url, headers=headers, json=body) as r:
async for data in r.content:
yield data

View File

@@ -1,38 +1,36 @@
import aiohttp
import os
import requests
import time
from typing import Generator
from typing import AsyncGenerator
from ..services.ai_services import TTSService
from dailyai.services.ai_services import TTSService
class ElevenLabsTTSService(TTSService):
def __init__(self):
def __init__(self, api_key=None, voice_id=None):
super().__init__()
self.api_key = os.getenv("ELEVENLABS_API_KEY")
self.voice_id = os.getenv("ELEVENLABS_VOICE_ID")
self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID")
def run_tts(self, sentence) -> Generator[bytes, None, None]:
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
headers = {
"xi-api-key": self.api_key,
"Content-Type": "application/json",
}
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
async with aiohttp.ClientSession() as session:
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
headers = {
"xi-api-key": self.api_key,
"Content-Type": "application/json",
}
async with session.post(url, json=payload, headers=headers, params=querystring) as r:
if r.status != 200:
self.logger.error(
f"audio fetch status code: {r.status}, error: {r.text}"
)
return
r = requests.request(
"POST", url, json=payload, headers=headers, params=querystring, stream=True
)
if r.status_code != 200:
self.logger.error(
f"audio fetch status code: {r.status_code}, error: {r.text}"
)
return
for chunk in r.iter_content(chunk_size=3200):
if chunk:
yield chunk
async for chunk in r.content:
if chunk:
yield chunk

View File

@@ -0,0 +1,49 @@
import fal
import aiohttp
import asyncio
import io
import json
from PIL import Image
from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
# Fal expects FAL_KEY_ID and FAL_KEY_SECRET to be set in the env
class FalImageGenService(ImageGenService):
def __init__(self, image_size):
super().__init__(image_size)
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
def get_image_url(sentence, size):
print("starting fal submit...")
handler = fal.apps.submit(
"110602490-fast-sdxl",
arguments={
"prompt": sentence
},
)
print("past fal handler init, about to wait for iter_events...")
for event in handler.iter_events():
if isinstance(event, fal.apps.InProgress):
print('Request in progress')
print(event.logs)
result = handler.get()
image_url = result["images"][0]["url"] if result else None
if not image_url:
raise Exception("Image generation failed")
return image_url
print(f"fetching image url...")
image_url = await asyncio.to_thread(get_image_url, sentence, self.image_size)
print(f"got image url, downloading image...")
# Load the image from the url
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
print("got image response")
image_stream = io.BytesIO(await response.content.read())
print("read image stream")
image = Image.open(image_stream)
return (image_url, image.tobytes())
# return (image_url, dalle_im.tobytes())

View File

@@ -1,67 +0,0 @@
from dailyai.services.ai_services import AIService, TTSService, LLMService, ImageGenService
from typing import Generator
import requests
from PIL import Image
import io
from openai import OpenAI
import os
import json
class OpenAILLMService(LLMService):
def __init__(self, api_key=None, model=None):
super().__init__()
api_key = api_key or os.getenv("OPEN_AI_KEY")
self.model = model or os.getenv("OPEN_AI_MODEL")
self.client = OpenAI(api_key=api_key)
def get_response(self, messages, stream):
return self.client.chat.completions.create(
stream=stream,
messages=messages,
model=self.model
)
def run_llm_async(self, messages) -> Generator[str, None, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
response = self.get_response(messages, stream=True)
for chunk in response:
if len(chunk.choices) == 0:
continue
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def run_llm(self, messages) -> str | None:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
response = self.get_response(messages, stream=False)
if response and len(response.choices) > 0:
return response.choices[0].message.content
else:
return None
class OpenAIImageGenService(ImageGenService):
def __init__(self, api_key=None, model=None):
super().__init__()
api_key = api_key or os.getenv("OPEN_AI_KEY")
self.model = model or os.getenv("OPEN_AI_MODEL")
self.client = OpenAI(api_key=api_key)
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
image = self.client.images.generate(
prompt=sentence,
n=1,
size=f"1024x1024"
)
image_url = image.data[0].url
response = requests.get(image_url)
dalle_stream = io.BytesIO(response.content)
dalle_im = Image.open(dalle_stream)
return (image_url, dalle_im)

View File

@@ -0,0 +1,79 @@
import requests
import aiohttp
import asyncio
from PIL import Image
import io
from openai import AsyncOpenAI
import os
import json
from collections.abc import AsyncGenerator
from dailyai.services.ai_services import AIService, TTSService, LLMService, ImageGenService
class OpenAILLMService(LLMService):
def __init__(self, api_key=None, model=None):
super().__init__()
api_key = api_key or os.getenv("OPEN_AI_KEY")
self.model = model or os.getenv("OPEN_AI_LLM_MODEL") or "gpt-4"
self.client = AsyncOpenAI(api_key=api_key)
async def get_response(self, messages, stream):
return await self.client.chat.completions.create(
stream=stream,
messages=messages,
model=self.model
)
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
response = await self.get_response(messages, stream=True)
for chunk in response:
if len(chunk.choices) == 0:
continue
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
async def run_llm(self, messages) -> str | None:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
response = await self.get_response(messages, stream=False)
if response and len(response.choices) > 0:
return response.choices[0].message.content
else:
return None
class OpenAIImageGenService(ImageGenService):
def __init__(self, image_size:str, api_key=None, model=None):
super().__init__(image_size=image_size)
api_key = api_key or os.getenv("OPEN_AI_KEY")
self.model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3"
self.client = AsyncOpenAI(api_key=api_key)
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
self.logger.info("Generating OpenAI image", sentence)
image = await self.client.images.generate(
prompt=sentence,
model=self.model,
n=1,
size=self.image_size
)
image_url = image.data[0].url
if not image_url:
raise Exception("No image provided in response", image)
# Load the image from the url
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())
return (image_url, dalle_im.tobytes())

View File

@@ -20,7 +20,6 @@ class GoogleAIService(AIService):
)
def run_tts(self, sentence):
print("running google tts")
synthesis_input = texttospeech.SynthesisInput(text = sentence.strip())
result = self.client.synthesize_speech(input=synthesis_input, voice=self.voice, audio_config=self.audio_config)
return result

View File

@@ -13,7 +13,6 @@ class HuggingFaceAIService(AIService):
# available models at https://huggingface.co/Helsinki-NLP (**not all models use 2-character language codes**)
def run_text_translation(self, sentence, source_language, target_language):
translator = pipeline(f"translation", model=f"Helsinki-NLP/opus-mt-{source_language}-{target_language}")
print(translator(sentence))
return translator(sentence)[0]["translation_text"]

View File

@@ -0,0 +1,129 @@
from re import A
import unittest
from typing import AsyncGenerator, Generator
from dailyai.services.ai_services import AIService, SentenceAggregator
from dailyai.queue_frame import QueueFrame, FrameType
class SimpleAIService(AIService):
def allowed_input_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK])
def possible_output_frame_types(self) -> set[FrameType]:
return set([FrameType.TEXT_CHUNK])
async def process_frame(self, requested_frame_types: set[FrameType], frame: QueueFrame) -> QueueFrame | None:
return frame
class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
async def test_async_input(self):
service = SimpleAIService()
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello"),
QueueFrame(FrameType.END_STREAM, None),
]
async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
for frame in input_frames:
yield frame
output_frames = []
async for frame in service.run(set([FrameType.TEXT_CHUNK]), iterate_frames()):
output_frames.append(frame)
self.assertEqual(input_frames, output_frames)
async def test_nonasync_input(self):
service = SimpleAIService()
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello"),
QueueFrame(FrameType.END_STREAM, None),
]
def iterate_frames() -> Generator[QueueFrame, None, None]:
for frame in input_frames:
yield frame
output_frames = []
async for frame in service.run(set([FrameType.TEXT_CHUNK]), iterate_frames()):
output_frames.append(frame)
self.assertEqual(input_frames, output_frames)
class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase):
async def test_clause(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello"),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(1, len(output_frames))
self.assertEqual(QueueFrame(FrameType.SENTENCE, "hello"), output_frames[0])
async def test_sentence(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello, "),
QueueFrame(FrameType.TEXT_CHUNK, "world."),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(1, len(output_frames))
self.assertEqual(QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0])
async def test_sentence_and_clause(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello, "),
QueueFrame(FrameType.TEXT_CHUNK, "world."),
QueueFrame(FrameType.TEXT_CHUNK, " How are"),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(2, len(output_frames))
self.assertEqual(
QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0]
)
self.assertEqual(
QueueFrame(FrameType.SENTENCE, " How are"), output_frames[1]
)
async def test_two_sentences(self) -> None:
input_frames = [
QueueFrame(FrameType.TEXT_CHUNK, "hello, "),
QueueFrame(FrameType.TEXT_CHUNK, "world."),
QueueFrame(FrameType.TEXT_CHUNK, " How are"),
QueueFrame(FrameType.TEXT_CHUNK, " you doing?"),
QueueFrame(FrameType.END_STREAM, None),
]
service = SentenceAggregator()
output_frames = []
async for frame in service.run(set([FrameType.SENTENCE]), input_frames):
output_frames.append(frame)
self.assertEqual(2, len(output_frames))
self.assertEqual(
QueueFrame(FrameType.SENTENCE, "hello, world."), output_frames[0]
)
self.assertEqual(QueueFrame(FrameType.SENTENCE, " How are you doing?"), output_frames[1])
if __name__ == "__main__":
unittest.main()

View File

@@ -11,14 +11,14 @@ from dailyai.async_processor.async_processor import (
LLMResponse,
)
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.ai_services import (
AIServiceConfig,
ImageGenService,
LLMService,
TTSService,
)
"""
class MockTTSService(TTSService):
def run_tts(self, sentence):
for word in sentence.split(' '):
@@ -71,7 +71,7 @@ class TestResponse(unittest.TestCase):
output_queue.task_done()
while expected_words:
actual_word:OutputQueueFrame = output_queue.get()
actual_word:QueueFrame = output_queue.get()
word = expected_words.pop(0)
self.assertEqual(actual_word.frame_type, FrameType.AUDIO_FRAME)
self.assertEqual(actual_word.frame_data, bytes(word, "utf-8"))
@@ -127,7 +127,7 @@ class TestResponse(unittest.TestCase):
expected_words = ["Hello", "there.", "How", "are", "you?", "I", "hope", "you", "are", "well."]
while expected_words and not stop_processing_output_queue.is_set():
try:
actual_word:OutputQueueFrame = output_queue.get_nowait()
actual_word:QueueFrame = output_queue.get_nowait()
if actual_word.frame_type == FrameType.AUDIO_FRAME:
time.sleep(0.1)
word = expected_words.pop(0)
@@ -177,3 +177,4 @@ class TestResponse(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
"""

View File

@@ -0,0 +1 @@
These samples need to be updated! Don't rely on them.

View File

@@ -13,16 +13,18 @@ from dailyai.orchestrator import OrchestratorConfig, Orchestrator
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
def add_bot_to_room(room_url, token, expiration) -> None:
# A simple prompt for a simple sample.
message_handler = MessageHandler(
"""
You are a sample bot, meant to demonstrate how to use an LLM with transcription at TTS.
You are a sample bot in a WebRTC session. You'll receive input as transcriptions of user's
speech, and your responses will be converted to audio via a TTS service.
Answer user's questions and be friendly, and if you can, give some ideas about how someone
could use a bot like you in a more in-depth way. Because your responses will be spoken,
try to keep them short and sweet.
try to keep them short.
"""
)
@@ -33,12 +35,6 @@ def add_bot_to_room(room_url, token, expiration) -> None:
# - AZURE_CHATGPT_KEY
# - AZURE_CHATGPT_ENDPOINT
# - AZURE_CHATGPT_DEPLOYMENT_ID
#
# This demo doesn't use image generation, but if you extend it to do so,
# you'll also need to set:
# - AZURE_DALLE_KEY
# - AZURE_DALLE_ENDPOINT
# - AZURE_DALLE_DEPLOYMENT_ID
services = AIServiceConfig(
tts=AzureTTSService(), image=None, llm=AzureLLMService()

View File

@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
OrchestratorResponse
)
from dailyai.orchestrator import OrchestratorConfig, Orchestrator
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.message_handler.message_handler import MessageHandler
from dailyai.services.ai_services import AIServiceConfig
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
@@ -40,7 +40,7 @@ class StaticSpriteResponse(OrchestratorResponse):
self.image_bytes = img.tobytes()
def do_play(self) -> None:
self.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, self.image_bytes))
self.output_queue.put(QueueFrame(FrameType.IMAGE, self.image_bytes))
class IntroSpriteResponse(StaticSpriteResponse):
@@ -71,10 +71,10 @@ class AnimatedSpriteLLMResponse(LLMResponse):
with Image.open(full_path) as img:
self.image_bytes.append(img.tobytes())
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
return [
OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame),
OutputQueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes))
QueueFrame(FrameType.AUDIO, audio_frame),
QueueFrame(FrameType.IMAGE, random.choice(self.image_bytes))
]
@@ -83,10 +83,11 @@ def add_bot_to_room(room_url, token, expiration) -> None:
# A simple prompt for a simple sample.
message_handler = MessageHandler(
"""
You are a sample bot, meant to demonstrate how to use an LLM with transcription at TTS.
You are a sample bot in a WebRTC session. You'll receive input as transcriptions of user's
speech, and your responses will be converted to audio via a TTS service.
Answer user's questions and be friendly, and if you can, give some ideas about how someone
could use a bot like you in a more in-depth way. Because your responses will be spoken,
try to keep them short and sweet.
try to keep them short.
"""
)

View File

Before

Width:  |  Height:  |  Size: 871 KiB

After

Width:  |  Height:  |  Size: 871 KiB

View File

Before

Width:  |  Height:  |  Size: 870 KiB

After

Width:  |  Height:  |  Size: 870 KiB

View File

Before

Width:  |  Height:  |  Size: 871 KiB

After

Width:  |  Height:  |  Size: 871 KiB

View File

Before

Width:  |  Height:  |  Size: 868 KiB

After

Width:  |  Height:  |  Size: 868 KiB

113
src/samples/image-gen.py Normal file
View File

@@ -0,0 +1,113 @@
import argparse
import asyncio
import requests
import time
import urllib.parse
import random
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url:str, token):
global transport
global llm
global tts
transport = DailyTransportService(
room_url,
token,
"Imagebot",
1,
)
transport.mic_enabled = True
transport.camera_enabled = True
transport.mic_sample_rate = 16000
transport.camera_width = 1024
transport.camera_height = 1024
llm = AzureLLMService()
tts = AzureTTSService()
img = FalImageGenService()
async def handle_transcriptions():
print("handle_transcriptions got called")
sentence = ""
async for message in transport.get_transcriptions():
print(f"transcription message: {message}")
if message["session_id"] == transport.my_participant_id:
continue
finder = message["text"].find("start over")
print(f"finder: {finder}")
if finder >= 0:
async for audio in tts.run_tts(f"Resetting."):
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
sentence = ""
continue
# todo: we could differentiate between transcriptions from different participants
sentence += f" {message['text']}"
print(f"sentence is now: {sentence}")
# TODO: Cache this audio
phrase = random.choice(["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
async for audio in tts.run_tts(phrase):
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
img_result = img.run_image_gen(sentence, "1024x1024")
awaited_img = await asyncio.gather(img_result)
transport.output_queue.put(
[
QueueFrame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
]
)
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
print(f"participant joined: {participant['info']['userName']}")
if participant["info"]["isLocal"]:
return
async for audio in tts.run_tts("Describe an image, and I'll create it."):
audio_generator = tts.run_tts(f"Hello, {participant['info']['userName']}! Describe an image and I'll create it. To start over, just say 'start over'.")
async for audio in audio_generator:
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
transport.transcription_settings["extra"]["punctuate"] = False
transport.transcription_settings["extra"]["endpointing"] = False
await asyncio.gather(transport.run(), handle_transcriptions())
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=True,
help="Daily API Key (needed to create token)",
)
args, unknown = parser.parse_known_args()
# Create a meeting token for the given room with an expiration 1 hour in the future.
room_name: str = urllib.parse.urlparse(args.url).path[1:]
expiration: float = time.time() + 60 * 60
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={"Authorization": f"Bearer {args.apikey}"},
json={
"properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
},
)
if res.status_code != 200:
raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
asyncio.run(main(args.url, token))

Binary file not shown.

View File

@@ -0,0 +1,54 @@
import argparse
import asyncio
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):
# create a transport service object using environment variables for
# the transport service's API key, room url, and any other configuration.
# services can all define and document the environment variables they use.
# services all also take an optional config object that is used instead of
# environment variables.
#
# the abstract transport service APIs presumably can map pretty closely
# to the daily-python basic API
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Say One Thing",
meeting_duration_minutes,
)
transport.mic_enabled = True
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
# Register an event handler so we can play the audio when the participant joins.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
if participant["info"]["isLocal"]:
return
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
await transport.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
args, unknown = parser.parse_known_args()
asyncio.run(main(args.url))

View File

@@ -0,0 +1,55 @@
import asyncio
import time
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
async def main(room_url):
# create a transport service object using environment variables for
# the transport service's API key, room url, and any other configuration.
# services can all define and document the environment variables they use.
# services all also take an optional config object that is used instead of
# environment variables.
#
# the abstract transport service APIs presumably can map pretty closely
# to the daily-python basic API
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Greeter",
meeting_duration_minutes,
)
transport.mic_enabled = True
# similarly, create a tts service
tts = DeepgramTTSService()
# Get the generator for the audio. This will start running in the background,
# and when we ask the generator for its items, we'll get what it's generated.
# Register an event handler so we can play the audio when the participant joins.
print("settting up handler")
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
print(f"participant joined: {participant['info']['userName']}")
if participant["info"]["isLocal"]:
return
audio_generator: AsyncGenerator[bytes, None] = tts.run_tts(f"Hello there, {participant['info']['userName']}!")
async for audio in audio_generator:
transport.output_queue.put(QueueFrame(FrameType.AUDIO, audio))
print("setting up call state handler")
@transport.event_handler("on_call_state_updated")
async def on_call_joined(transport, state):
print(f"call state callback: {state}")
await transport.run()
if __name__ == "__main__":
asyncio.run(main("https://chad-hq.daily.co/howdy"))

View File

@@ -0,0 +1,52 @@
import argparse
import asyncio
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Say One Thing From an LLM",
meeting_duration_minutes,
)
transport.mic_enabled = True
tts = ElevenLabsTTSService(voice_id="29vD33N1CtxCmqQRPOHJ")
llm = AzureLLMService()
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
tts_task = asyncio.create_task(
tts.run_to_queue(
transport.send_queue,
SentenceAggregator().run(
llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)])
)
)
)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts_task
await transport.stop_when_done()
await transport.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
args, unknown = parser.parse_known_args()
asyncio.run(main(args.url))

View File

@@ -0,0 +1,44 @@
import argparse
import asyncio
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.open_ai_services import OpenAIImageGenService
local_joined = False
participant_joined = False
async def main(room_url):
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Show a still frame image",
meeting_duration_minutes,
)
transport.mic_enabled = False
transport.camera_enabled = True
transport.camera_width = 1024
transport.camera_height = 1024
imagegen = OpenAIImageGenService(image_size="1024x1024")
image_task = asyncio.create_task(
imagegen.run_to_queue(transport.send_queue, [QueueFrame(FrameType.IMAGE_DESCRIPTION, "a cat in the style of picasso")])
)
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
await image_task
await transport.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
args, unknown = parser.parse_known_args()
asyncio.run(main(args.url))

View File

@@ -0,0 +1,79 @@
import argparse
import asyncio
import re
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url:str):
global transport
global llm
global tts
transport = DailyTransportService(
room_url,
None,
"Say Two Things Bot",
1,
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
llm = AzureLLMService()
azure_tts = AzureTTSService()
elevenlabs_tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
# speak the LLM response.
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
elevenlabs_tts.run_to_queue(
buffer_queue,
SentenceAggregator().run(
llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)])
),
True,
)
)
@transport.event_handler("on_participant_joined")
async def on_joined(transport, participant):
if participant["id"] == transport.my_participant_id:
return
await azure_tts.run_to_queue(
transport.send_queue,
[QueueFrame(FrameType.SENTENCE, "My friend the LLM is now going to tell a joke about llamas.")]
)
async def buffer_to_send_queue():
while True:
frame = await buffer_queue.get()
await transport.send_queue.put(frame)
buffer_queue.task_done()
if frame.frame_type == FrameType.END_STREAM:
break
await asyncio.gather(llm_response_task, buffer_to_send_queue())
await transport.stop_when_done()
await transport.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
args, unknown = parser.parse_known_args()
asyncio.run(main(args.url))

View File

@@ -0,0 +1,108 @@
import argparse
import asyncio
from asyncio.queues import Queue
import re
from dailyai.queue_frame import QueueFrame, FrameType
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAIImageGenService
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fal_ai_services import FalImageGenService
async def main(room_url):
meeting_duration_minutes = 5
transport = DailyTransportService(
room_url,
None,
"Month Narration Bot",
meeting_duration_minutes,
)
transport.mic_enabled = True
transport.camera_enabled = True
transport.mic_sample_rate = 16000
transport.camera_width = 1024
transport.camera_height = 1024
llm = AzureLLMService()
dalle = FalImageGenService(image_size="1024x1024")
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
# dalle = OpenAIImageGenService(image_size="1024x1024")
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
return all_audio
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
image_description = await llm.run_llm(messages)
to_speak = f"{month}: {image_description}"
(audio, image_data) = await asyncio.gather(
get_all_audio(to_speak), dalle.run_image_gen(image_description)
)
return {
"month": month,
"text": image_description,
"image": image_data[1],
"audio": audio,
}
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
data = await month_data_task
await transport.send_queue.put(
[
QueueFrame(FrameType.IMAGE, data["image"]),
QueueFrame(FrameType.AUDIO, data["audio"]),
]
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
await transport.run()
if __name__=="__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
args, unknown = parser.parse_known_args()
asyncio.run(main(args.url))

View File

@@ -0,0 +1,94 @@
import argparse
import asyncio
import requests
import time
import urllib.parse
from dailyai.services.ai_services import SentenceAggregator
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import QueueFrame, FrameType
async def main(room_url:str, token):
global transport
global llm
global tts
transport = DailyTransportService(
room_url,
token,
"Respond bot",
1,
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
llm = AzureLLMService()
tts = AzureTTSService()
async def handle_transcriptions():
messages = [
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
]
sentence = ""
async for frame in transport.get_receive_frames():
if frame.frame_type != FrameType.TRANSCRIPTION:
continue
message = frame.frame_data
if message["session_id"] == transport.my_participant_id:
continue
# todo: we could differentiate between transcriptions from different participants
sentence += message["text"]
if sentence.endswith((".", "?", "!")):
messages.append({"role": "user", "content": sentence})
sentence = ''
full_response = ""
async for response in llm.run_llm_async_sentences(messages):
full_response += response
async for audio in tts.run_tts(response):
await transport.send_queue.put(QueueFrame(FrameType.AUDIO, audio))
messages.append({"role": "assistant", "content": full_response})
transport.transcription_settings["extra"]["punctuate"] = True
await asyncio.gather(transport.run(), handle_transcriptions())
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=True,
help="Daily API Key (needed to create token)",
)
args, unknown = parser.parse_known_args()
# Create a meeting token for the given room with an expiration 1 hour in the future.
room_name: str = urllib.parse.urlparse(args.url).path[1:]
expiration: float = time.time() + 60 * 60
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={"Authorization": f"Bearer {args.apikey}"},
json={
"properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
},
)
if res.status_code != 200:
raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
asyncio.run(main(args.url, token))