Compare commits
65 Commits
khk/simple
...
cb/09-bots
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50869e8cfd | ||
|
|
690cf2e47d | ||
|
|
ba89e41c5b | ||
|
|
c134598a77 | ||
|
|
b51abd2969 | ||
|
|
3fda9b0ecb | ||
|
|
c7c816feb3 | ||
|
|
95c92e5304 | ||
|
|
b443fbdb60 | ||
|
|
ccd2fa31e5 | ||
|
|
9b65286216 | ||
|
|
6ae733ebfe | ||
|
|
6c17594f1f | ||
|
|
c47f3d0c86 | ||
|
|
1071dede1a | ||
|
|
f03e208580 | ||
|
|
74e2c571c1 | ||
|
|
d9311195c9 | ||
|
|
9a018403c7 | ||
|
|
ceeb93dd46 | ||
|
|
f55333844e | ||
|
|
0245f98eb5 | ||
|
|
f9f2e2d7ea | ||
|
|
0d21768d00 | ||
|
|
13f2f792af | ||
|
|
a3ac0d84e8 | ||
|
|
6e8ebbd34c | ||
|
|
b1e6a89acd | ||
|
|
115b744861 | ||
|
|
755059c358 | ||
|
|
cfaccefe9c | ||
|
|
5b49597854 | ||
|
|
869d557ded | ||
|
|
a42f6bc531 | ||
|
|
9bbfc8ad05 | ||
|
|
ad427bea3a | ||
|
|
ec1f2362c5 | ||
|
|
37207745c9 | ||
|
|
b9b82695c6 | ||
|
|
7ca7764be3 | ||
|
|
df1bbef653 | ||
|
|
7229fc806e | ||
|
|
cd204ebd21 | ||
|
|
cb63307ddf | ||
|
|
37f48d9f04 | ||
|
|
a29cb1a5ad | ||
|
|
20a9c6a938 | ||
|
|
534a4e2fa1 | ||
|
|
11a553240f | ||
|
|
4efeae46bc | ||
|
|
a59cec526f | ||
|
|
ac0e4b0c27 | ||
|
|
6cace129fd | ||
|
|
290c1e7efa | ||
|
|
d95bca479d | ||
|
|
0451ae498f | ||
|
|
712ab97f88 | ||
|
|
b5c7e30efa | ||
|
|
7f51c0c9b2 | ||
|
|
b48a377b17 | ||
|
|
5b4c085cd2 | ||
|
|
cd2c9700ad | ||
|
|
fcd9a248d9 | ||
|
|
c68703749b | ||
|
|
aab06f4441 |
5
.gitignore
vendored
@@ -2,6 +2,7 @@
|
||||
env/
|
||||
__pycache__/
|
||||
*~
|
||||
venv
|
||||
#*#
|
||||
|
||||
# Distribution / packaging
|
||||
@@ -22,4 +23,6 @@ share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
MANIFEST
|
||||
.DS_Store
|
||||
.env
|
||||
|
||||
98
README.md
@@ -5,12 +5,14 @@ This SDK can help you build applications that participate in WebRTC meetings and
|
||||
## Build/Install
|
||||
|
||||
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
|
||||
|
||||
```
|
||||
python3 -m venv env
|
||||
source env/bin/activate
|
||||
```
|
||||
|
||||
From the root of this repo, run the following:
|
||||
|
||||
```
|
||||
pip install -r requirements.txt
|
||||
python -m build
|
||||
@@ -19,10 +21,11 @@ python -m build
|
||||
This builds the package. To use the package locally (eg to run sample files), run
|
||||
|
||||
```
|
||||
pip install .
|
||||
pip install --editable .
|
||||
```
|
||||
|
||||
If you want to use this package from another directory, you can run:
|
||||
|
||||
```
|
||||
pip install path_to_this_repo
|
||||
```
|
||||
@@ -32,7 +35,7 @@ pip install path_to_this_repo
|
||||
Tou can run the simple sample like so:
|
||||
|
||||
```
|
||||
python src/samples/simple-sample/simple-sample.py -u your_room_url -k your_daily_api_key
|
||||
python src/samples/theoretical-to-real/01-say-one-thing.py -u <url of your Daily meeting> -k <your Daily API Key>
|
||||
```
|
||||
|
||||
Note that the sample uses Azure's TTS and LLM services. You'll need to set the following environment variables for the sample to work:
|
||||
@@ -44,3 +47,94 @@ AZURE_CHATGPT_KEY
|
||||
AZURE_CHATGPT_ENDPOINT
|
||||
AZURE_CHATGPT_DEPLOYMENT_ID
|
||||
```
|
||||
|
||||
If you have those environment variables stored in an .env file, you can quickly load them into your terminal's environment by running this:
|
||||
|
||||
```bash
|
||||
export $(grep -v '^#' .env | xargs)
|
||||
```
|
||||
|
||||
## Overview
|
||||
The Daily AI SDK allows you to build applications that can participate in WebRTC sessions and interact with AI Services. Some examples of what you can build with this:
|
||||
* conversational bots that interact 1:1 with a user, using voice recognition and text-to-speech
|
||||
* assistant bots that aggregate transcriptions from multiple participants in a meeting and provide realtime summaries or other AI-generated output.
|
||||
* image-recognition bots
|
||||
* etc
|
||||
## Concepts
|
||||
### Transport Service
|
||||
The SDK provides one “transport service”, which is a wrapper around Daily’s `daily-python` client (tk add link). You can use this service to listen for events related to a WebRTC session, such as “a participant joined the meeting”.
|
||||
The transport service also exposes a send queue, and a receive queue. You can use the send queue to send audio and video to the WebRTC session, and you can listen to the receive queue to see audio, video and transcription data from the WebRTC session.
|
||||
### AI Services
|
||||
The AI Service classes provide wrappers around various AI providers, and allow you to query LLMs, convert text to speech and make images from text. The audio and images can then be placed on the transport service’s send queue, where they’ll be sent to the WebRTC session.
|
||||
### Queue Frames
|
||||
Communication between the transport service and AI services, and between various AI services, takes place in Queue Frames. These frames contain an indication of the type of data as well as the data itself.
|
||||
## Using Transports, AI Services and Frames
|
||||
AI Services all define a `.run` method. This method consumes and generates `QueueFrame` frames. The kind of frames that can be consumed and generated depend on the kind of service. For instance, an LLM AI Service consumes `LLM_MESSAGE` frames (which define a history of interaction with an LLM) and emit `TEXT` frames (the response from the LLM).
|
||||
|
||||
The `.run` method is an `AsyncIterable`, and it takes an `iterable`, `AsyncIterable` or `asyncio.Queue` that produces QueueFrames as a parameter. This makes it easy to chain AI Services, and consume input from the Transport’s `receive_queue` .
|
||||
|
||||
AI Services also have a `.run_to_queue` method. This method is not an AsyncIterable, but instead sends processed QueueFrames to a queue. This makes it easy to send the output of an AI Service to the Transport’s `send_queue`.
|
||||
|
||||
AI Services also define convenience functions that let you bypass creating QueueFrames for some simple cases (eg. using the TTS service to convert a string to audio output and send that audio to the transport’s `send_queue`). See below for examples.
|
||||
## Examples
|
||||
### Say Something
|
||||
The base TTS AI service exposes a `.say` method. After creating a transport and TTS service, you can use this method like so:
|
||||
```
|
||||
transport = DailyTransportService(...)
|
||||
tts = AzureTTSService()
|
||||
await tts.say("hello world", transport.send_queue)
|
||||
```
|
||||
This will call the TTS service to render the text to audio frames, then put the audio frames on the transport’s send queue. The transport will then send those frames along to the WebRTC session.
|
||||
|
||||
### Speak an LLM response
|
||||
Given a system prompt contained in a `messages` array, you can emit the LLM’s response as audio with a chain like this:
|
||||
```
|
||||
transport = DailyTransportService(...) # setup parameters omitted
|
||||
tts = AzureTTSService()
|
||||
llm = AzureLLMService()
|
||||
messages = [...] # system prompt omitted for brevity
|
||||
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([QueueFrame.LLM_MESSAGES, messages])
|
||||
)
|
||||
```
|
||||
In this code, the LLM service object sends the messages to Azure’s OpenAI implementation, which streams chunks back asynchronously. Those chunks are aggregated by the TTS Service to ensure the best audio response (TTS works best when it gets complete sentence, so it can inflect correctly), then sent to Azure’s TTS service, converted to audio frames, and sent to the WebRTC session via the Daily transport.
|
||||
|
||||
### Pre-cache an LLM response
|
||||
Sometimes LLMs can be slower than we’d like for natural-feeling communication. Here’s an example where we take advantage of the time it takes to speak some pre-defined text to get a head start on the LLM response:
|
||||
|
||||
(TK link to 04- sample)
|
||||
|
||||
In this sample, we set up a buffer queue to receive the audio frames from the LLM response before while we are joining the call and start an asynchronous task to start filling this buffer:
|
||||
```
|
||||
buffer_queue = asyncio.Queue()
|
||||
llm_response_task = asyncio.create_task(
|
||||
elevenlabs_tts.run_to_queue(
|
||||
buffer_queue,
|
||||
llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]),
|
||||
True,
|
||||
)
|
||||
)
|
||||
```
|
||||
|
||||
Then, when we’ve joined the call, we speak the static text:
|
||||
```
|
||||
await azure_tts.say("My friend...", transport.send_queue)
|
||||
```
|
||||
|
||||
As that text is being spoken, the asynchronous LLM task continues in the background. When the text is done, we pull the frames off the buffer queue and put them in the transport’s `send_queue`:
|
||||
```
|
||||
async def buffer_to_send_queue():
|
||||
while True:
|
||||
frame = await buffer_queue.get()
|
||||
await transport.send_queue.put(frame)
|
||||
buffer_queue.task_done()
|
||||
if frame.frame_type == FrameType.END_STREAM:
|
||||
break
|
||||
|
||||
await asyncio.gather(llm_response_task, buffer_to_send_queue())
|
||||
|
||||
```
|
||||
|
||||
One thing to note here is the last parameter to `run_to_queue` in the first code clause above: this causes the `run_to_queue` method to send an `END_STREAM` frame when it’s done rendering. This lets us know when to stop our `buffer_to_send_queue` task above.
|
||||
|
||||
@@ -14,7 +14,9 @@ dependencies = [
|
||||
"google-cloud-texttospeech",
|
||||
"azure-cognitiveservices-speech",
|
||||
"pyht",
|
||||
"opentelemetry-sdk"
|
||||
"opentelemetry-sdk",
|
||||
"aiohttp",
|
||||
"fal"
|
||||
]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
|
||||
@@ -9,7 +9,7 @@ from queue import Queue, PriorityQueue, Empty
|
||||
from threading import Event, Semaphore, Thread
|
||||
from typing import Any, Generator, Iterator, Optional, Type
|
||||
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
|
||||
@@ -268,10 +268,10 @@ class LLMResponse(OrchestratorResponse):
|
||||
if out.strip():
|
||||
yield out.strip()
|
||||
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
|
||||
return [OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame)]
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
|
||||
return [QueueFrame(FrameType.AUDIO, audio_frame)]
|
||||
|
||||
def get_frames_from_chunk(self, chunk) -> Generator[list[OutputQueueFrame], Any, None]:
|
||||
def get_frames_from_chunk(self, chunk) -> Generator[list[QueueFrame], Any, None]:
|
||||
for audio_frame in self.services.tts.run_tts(chunk):
|
||||
yield self.get_frames_from_tts_response(audio_frame)
|
||||
|
||||
@@ -317,7 +317,7 @@ class LLMResponse(OrchestratorResponse):
|
||||
break
|
||||
|
||||
if not self.has_sent_first_frame:
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.START_STREAM, None))
|
||||
self.output_queue.put(QueueFrame(FrameType.START_STREAM, None))
|
||||
self.has_sent_first_frame = True
|
||||
|
||||
for frame in frames:
|
||||
|
||||
@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
|
||||
OrchestratorResponse,
|
||||
LLMResponse,
|
||||
)
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
|
||||
@@ -197,7 +197,7 @@ class Orchestrator(EventHandler):
|
||||
self.logger.info("Camera thread stopped")
|
||||
|
||||
self.logger.info("Put stop in output queue")
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None))
|
||||
self.output_queue.put(QueueFrame(FrameType.END_STREAM, None))
|
||||
|
||||
self.frame_consumer_thread.join()
|
||||
self.logger.info("Orchestrator stopped.")
|
||||
@@ -209,6 +209,9 @@ class Orchestrator(EventHandler):
|
||||
|
||||
def on_intro_finished(self, intro):
|
||||
self.logger.info(f"Introduction has finished")
|
||||
waiting = self.conversation_processors.waiting(self.services, self.message_handler, self.output_queue)
|
||||
waiting.prepare()
|
||||
waiting.play()
|
||||
|
||||
def on_response_played(self, response):
|
||||
response.finalize()
|
||||
@@ -364,7 +367,7 @@ class Orchestrator(EventHandler):
|
||||
all_audio_frames = bytearray()
|
||||
while True:
|
||||
try:
|
||||
frame:OutputQueueFrame = self.output_queue.get()
|
||||
frame:QueueFrame = self.output_queue.get()
|
||||
if frame.frame_type == FrameType.END_STREAM:
|
||||
self.logger.info("Stopping frame consumer thread")
|
||||
return
|
||||
@@ -372,7 +375,7 @@ class Orchestrator(EventHandler):
|
||||
# if interrupted, we just pull frames off the queue and discard them
|
||||
if not self.is_interrupted.is_set():
|
||||
if frame:
|
||||
if frame.frame_type == FrameType.AUDIO_FRAME:
|
||||
if frame.frame_type == FrameType.AUDIO:
|
||||
chunk = frame.frame_data
|
||||
|
||||
all_audio_frames.extend(chunk)
|
||||
@@ -382,7 +385,7 @@ class Orchestrator(EventHandler):
|
||||
if l:
|
||||
self.mic.write_frames(bytes(b[:l]))
|
||||
b = b[l:]
|
||||
elif frame.frame_type == FrameType.IMAGE_FRAME:
|
||||
elif frame.frame_type == FrameType.IMAGE:
|
||||
self.set_image(frame.frame_data)
|
||||
elif len(b):
|
||||
self.mic.write_frames(bytes(b))
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
|
||||
class FrameType(Enum):
|
||||
AUDIO_FRAME = 1
|
||||
IMAGE_FRAME = 2
|
||||
START_STREAM = 3
|
||||
END_STREAM = 4
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OutputQueueFrame:
|
||||
frame_type: FrameType
|
||||
frame_data: bytes
|
||||
47
src/dailyai/queue_aggregators.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import asyncio
|
||||
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame
|
||||
from dailyai.services.ai_services import AIService
|
||||
|
||||
from typing import AsyncGenerator, List
|
||||
|
||||
class QueueTee:
|
||||
async def run_to_queue_and_generate(
|
||||
self,
|
||||
output_queue: asyncio.Queue,
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
async for frame in generator:
|
||||
await output_queue.put(frame)
|
||||
yield frame
|
||||
|
||||
async def run_to_queues(
|
||||
self,
|
||||
output_queues: List[asyncio.Queue],
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
):
|
||||
async for frame in generator:
|
||||
for queue in output_queues:
|
||||
await queue.put(frame)
|
||||
|
||||
class LLMContextAggregator(AIService):
|
||||
def __init__(self, messages: list[dict], role:str, bot_participant_id=None):
|
||||
self.messages = messages
|
||||
self.bot_participant_id = bot_participant_id
|
||||
self.role = role
|
||||
self.sentence = ""
|
||||
|
||||
async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
content: str = ""
|
||||
|
||||
# TODO: split up transcription by participant
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
content = frame.text
|
||||
|
||||
self.sentence += content
|
||||
if self.sentence.endswith((".", "?", "!")):
|
||||
self.messages.append({"role": self.role, "content": self.sentence})
|
||||
self.sentence = ""
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
|
||||
yield frame
|
||||
38
src/dailyai/queue_frame.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
class QueueFrame:
|
||||
pass
|
||||
|
||||
class StartStreamQueueFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
class EndStreamQueueFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
@dataclass()
|
||||
class AudioQueueFrame(QueueFrame):
|
||||
data: bytes
|
||||
|
||||
@dataclass()
|
||||
class ImageQueueFrame(QueueFrame):
|
||||
url: str | None
|
||||
image: bytes
|
||||
|
||||
@dataclass()
|
||||
class TextQueueFrame(QueueFrame):
|
||||
text: str
|
||||
|
||||
@dataclass()
|
||||
class TranscriptionQueueFrame(TextQueueFrame):
|
||||
participantId: str
|
||||
timestamp: str
|
||||
|
||||
@dataclass()
|
||||
class LLMMessagesQueueFrame(QueueFrame):
|
||||
messages: list[dict[str,str]] # TODO: define this more concretely!
|
||||
|
||||
class AppMessageQueueFrame(QueueFrame):
|
||||
message: Any
|
||||
participantId: str
|
||||
@@ -1,2 +1,2 @@
|
||||
Pillow==10.1.0
|
||||
typing_extensions==4.9.0
|
||||
typing_extensions==4.9.0
|
||||
@@ -1,53 +1,155 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
AudioQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
)
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, AsyncIterable, Iterable
|
||||
from dataclasses import dataclass
|
||||
from typing import Generator
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class AIService:
|
||||
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger("dailyai")
|
||||
|
||||
def close(self):
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
|
||||
async for frame in self.run(frames):
|
||||
await queue.put(frame)
|
||||
|
||||
if add_end_of_stream:
|
||||
await queue.put(EndStreamQueueFrame())
|
||||
|
||||
async def run(
|
||||
self,
|
||||
frames: Iterable[QueueFrame]
|
||||
| AsyncIterable[QueueFrame]
|
||||
| asyncio.Queue[QueueFrame],
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
try:
|
||||
if isinstance(frames, AsyncIterable):
|
||||
async for frame in frames:
|
||||
async for output_frame in self.process_frame(frame):
|
||||
yield output_frame
|
||||
elif isinstance(frames, Iterable):
|
||||
for frame in frames:
|
||||
async for output_frame in self.process_frame(frame):
|
||||
yield output_frame
|
||||
elif isinstance(frames, asyncio.Queue):
|
||||
while True:
|
||||
frame = await frames.get()
|
||||
async for output_frame in self.process_frame(frame):
|
||||
yield output_frame
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
else:
|
||||
raise Exception("Frames must be an iterable or async iterable")
|
||||
|
||||
async for output_frame in self.finalize():
|
||||
yield output_frame
|
||||
except Exception as e:
|
||||
self.logger.error("Exception occurred while running AI service", e)
|
||||
raise e
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
# This is a trick for the interpreter (and linter) to know that this is a generator.
|
||||
if False:
|
||||
yield QueueFrame()
|
||||
|
||||
@abstractmethod
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
# This is a trick for the interpreter (and linter) to know that this is a generator.
|
||||
if False:
|
||||
yield QueueFrame()
|
||||
|
||||
class LLMService(AIService):
|
||||
# Generate a set of responses to a prompt. Yields a list of responses.
|
||||
@abstractmethod
|
||||
def run_llm_async(
|
||||
self, messages
|
||||
) -> Generator[str, None, None]:
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
yield ""
|
||||
|
||||
@abstractmethod
|
||||
async def run_llm(self, messages) -> str:
|
||||
pass
|
||||
|
||||
# Generate a responses to a prompt. Returns the response
|
||||
@abstractmethod
|
||||
def run_llm(
|
||||
self, messages
|
||||
) -> str or None:
|
||||
pass
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, LLMMessagesQueueFrame):
|
||||
async for text_chunk in self.run_llm_async(frame.messages):
|
||||
yield TextQueueFrame(text_chunk)
|
||||
|
||||
|
||||
class TTSService(AIService):
|
||||
def __init__(self, aggregate_sentences=True):
|
||||
super().__init__()
|
||||
self.aggregate_sentences: bool = aggregate_sentences
|
||||
self.current_sentence: str = ""
|
||||
|
||||
# Some TTS services require a specific sample rate. We default to 16k
|
||||
def get_mic_sample_rate(self):
|
||||
return 16000
|
||||
|
||||
# Converts the sentence to audio. Yields a list of audio frames that can
|
||||
# Converts the text to audio. Yields a list of audio frames that can
|
||||
# be sent to the microphone device
|
||||
@abstractmethod
|
||||
def run_tts(self, sentence) -> Generator[bytes, None, None]:
|
||||
pass
|
||||
async def run_tts(self, text) -> AsyncGenerator[bytes, None]:
|
||||
# yield empty bytes here, so linting can infer what this method does
|
||||
yield bytes()
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
text: str | None = None
|
||||
if not self.aggregate_sentences:
|
||||
text = frame.text
|
||||
else:
|
||||
self.current_sentence += frame.text
|
||||
if self.current_sentence.endswith((".", "?", "!")):
|
||||
text = self.current_sentence
|
||||
self.current_sentence = ""
|
||||
|
||||
if text:
|
||||
async for audio_chunk in self.run_tts(text):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
|
||||
async def finalize(self):
|
||||
if self.current_sentence:
|
||||
async for audio_chunk in self.run_tts(self.current_sentence):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
|
||||
# Convenience function to send the audio for a sentence to the given queue
|
||||
async def say(self, sentence, queue: asyncio.Queue):
|
||||
await self.run_to_queue(queue, [TextQueueFrame(sentence)])
|
||||
|
||||
|
||||
class ImageGenService(AIService):
|
||||
def __init__(self, image_size, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.image_size = image_size
|
||||
|
||||
# Renders the image. Returns an Image object.
|
||||
@abstractmethod
|
||||
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
|
||||
async def run_image_gen(self, sentence:str) -> tuple[str, bytes]:
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
return
|
||||
|
||||
(url, image_data) = await self.run_image_gen(frame.text)
|
||||
yield ImageQueueFrame(url, image_data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AIServiceConfig:
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import json
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import io
|
||||
from openai import AzureOpenAI
|
||||
import json
|
||||
from openai import AsyncAzureOpenAI
|
||||
|
||||
import os
|
||||
import requests
|
||||
|
||||
from typing import Generator
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
|
||||
from PIL import Image
|
||||
@@ -23,7 +25,7 @@ class AzureTTSService(TTSService):
|
||||
self.speech_config = SpeechConfig(subscription=speech_key, region=speech_region)
|
||||
self.speech_synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=None)
|
||||
|
||||
def run_tts(self, sentence) -> Generator[bytes, None, None]:
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
self.logger.info("Running azure tts")
|
||||
ssml = "<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' " \
|
||||
"xmlns:mstts='http://www.w3.org/2001/mstts'>" \
|
||||
@@ -33,7 +35,10 @@ class AzureTTSService(TTSService):
|
||||
"<prosody rate='1.05'>" \
|
||||
f"{sentence}" \
|
||||
"</prosody></mstts:express-as></voice></speak> "
|
||||
result = self.speech_synthesizer.speak_ssml(ssml)
|
||||
try:
|
||||
result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))
|
||||
except Exception as e:
|
||||
self.logger.error("Error in azure tts", e)
|
||||
self.logger.info("Got azure tts result")
|
||||
if result.reason == ResultReason.SynthesizingAudioCompleted:
|
||||
self.logger.info("Returning result")
|
||||
@@ -49,45 +54,91 @@ class AzureLLMService(LLMService):
|
||||
def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None):
|
||||
super().__init__()
|
||||
api_key = api_key or os.getenv("AZURE_CHATGPT_KEY")
|
||||
|
||||
azure_endpoint = azure_endpoint or os.getenv("AZURE_CHATGPT_ENDPOINT")
|
||||
if not azure_endpoint:
|
||||
raise Exception("No azure endpoint specified for Azure LLM, please set AZURE_CHATGPT_ENDPOINT in the environment or pass it to the AzureLLMService constructor")
|
||||
|
||||
model: str | None = model or os.getenv("AZURE_CHATGPT_DEPLOYMENT_ID")
|
||||
if not model:
|
||||
raise Exception("No model specified for Azure LLM, please set AZURE_CHATGPT_DEPLOYMENT_ID in the environment or pass it to the AzureLLMService constructor")
|
||||
self.model: str = model
|
||||
|
||||
api_version = api_version or "2023-12-01-preview"
|
||||
self.client = AzureOpenAI(
|
||||
self.client = AsyncAzureOpenAI(
|
||||
api_key=api_key,
|
||||
azure_endpoint=azure_endpoint,
|
||||
api_version=api_version,
|
||||
)
|
||||
self.model = model or os.getenv("AZURE_CHATGPT_DEPLOYMENT_ID")
|
||||
|
||||
def get_response(self, messages, stream):
|
||||
return self.client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self.model,
|
||||
)
|
||||
|
||||
def run_llm_async(self, messages) -> Generator[str, None, None]:
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
|
||||
|
||||
response = self.get_response(messages, stream=True)
|
||||
|
||||
for chunk in response:
|
||||
chunks = await self.client.chat.completions.create(model=self.model, stream=True, messages=messages)
|
||||
async for chunk in chunks:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
def run_llm(self, messages) -> str | None:
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
|
||||
|
||||
response = self.get_response(messages, stream=False)
|
||||
response = await self.client.chat.completions.create(model=self.model, stream=False, messages=messages)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
def __init__(self, image_size:str, api_key=None, azure_endpoint=None, api_version=None, model=None):
|
||||
super().__init__(image_size=image_size)
|
||||
self.api_key = api_key or os.getenv("AZURE_DALLE_KEY")
|
||||
self.azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
|
||||
self.api_version = api_version or "2023-06-01-preview"
|
||||
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
|
||||
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
# TODO hoist the session to app-level
|
||||
async with aiohttp.ClientSession() as session:
|
||||
url = f"{self.azure_endpoint}openai/images/generations:submit?api-version={self.api_version}"
|
||||
headers= { "api-key": self.api_key, "Content-Type": "application/json" }
|
||||
body = {
|
||||
# Enter your prompt text here
|
||||
"prompt": sentence,
|
||||
"size": self.image_size,
|
||||
"n": 1,
|
||||
}
|
||||
async with session.post(url, headers=headers, json=body) as submission:
|
||||
operation_location = submission.headers['operation-location']
|
||||
|
||||
status = ""
|
||||
attempts_left = 120
|
||||
json_response = None
|
||||
while status != "succeeded":
|
||||
attempts_left -= 1
|
||||
if attempts_left == 0:
|
||||
raise Exception("Image generation timed out")
|
||||
|
||||
await asyncio.sleep(1)
|
||||
response = await session.get(operation_location, headers=headers)
|
||||
json_response = await response.json()
|
||||
status = json_response["status"]
|
||||
|
||||
image_url = json_response["result"]["data"][0]["url"] if json_response else None
|
||||
if not image_url:
|
||||
raise Exception("Image generation failed")
|
||||
|
||||
# Load the image from the url
|
||||
async with session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
|
||||
class AzureImageGenService(ImageGenService):
|
||||
|
||||
@@ -96,7 +147,7 @@ class AzureImageGenService(ImageGenService):
|
||||
|
||||
api_key = api_key or os.getenv("AZURE_DALLE_KEY")
|
||||
azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
|
||||
api_version = api_version or "2023-12-01-preview"
|
||||
api_version = api_version or "2023-06-01-preview"
|
||||
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
|
||||
|
||||
self.client = AzureOpenAI(
|
||||
@@ -105,20 +156,20 @@ class AzureImageGenService(ImageGenService):
|
||||
api_version=api_version,
|
||||
)
|
||||
|
||||
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
self.logger.info("Generating azure image", sentence)
|
||||
|
||||
image = self.client.images.generate(
|
||||
model=self.model,
|
||||
prompt=sentence,
|
||||
n=1,
|
||||
size=f"1024x1024",
|
||||
size=self.image_size,
|
||||
)
|
||||
|
||||
url = image["data"][0]["url"]
|
||||
response = requests.get(url)
|
||||
|
||||
dalle_stream = io.BytesIO(response.content)
|
||||
dalle_im = Image.open(dalle_stream)
|
||||
dalle_im = Image.open(dalle_stream.tobytes())
|
||||
|
||||
return (url, dalle_im)
|
||||
|
||||
378
src/dailyai/services/daily_transport_service.py
Normal file
@@ -0,0 +1,378 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
|
||||
from functools import partial
|
||||
from queue import Queue, Empty
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
AudioQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
QueueFrame,
|
||||
StartStreamQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
|
||||
from threading import Thread, Event
|
||||
|
||||
from daily import (
|
||||
EventHandler,
|
||||
CallClient,
|
||||
Daily,
|
||||
VirtualCameraDevice,
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
|
||||
class DailyTransportService(EventHandler):
|
||||
_daily_initialized = False
|
||||
_lock = threading.Lock()
|
||||
def __init__(
|
||||
self,
|
||||
room_url: str,
|
||||
token: str | None,
|
||||
bot_name: str,
|
||||
duration: float = 10,
|
||||
min_others_count: int = 1,
|
||||
):
|
||||
super().__init__()
|
||||
self.bot_name: str = bot_name
|
||||
self.room_url: str = room_url
|
||||
self.token: str | None = token
|
||||
self.duration: float = duration
|
||||
self.expiration = time.time() + duration * 60
|
||||
self.min_others_count = min_others_count
|
||||
|
||||
# This queue is used to marshal frames from the async send queue to the thread that emits audio & video.
|
||||
# We need this to maintain the asynchronous behavior of asyncio queues -- to give async functions
|
||||
# a chance to run while waiting for queue items -- but also to maintain thread safety and have a threaded
|
||||
# handler to send frames, to ensure that sending isn't subject to pauses in the async thread.
|
||||
self.threadsafe_send_queue = Queue()
|
||||
|
||||
self.is_interrupted = Event()
|
||||
self.stop_threads = Event()
|
||||
self.story_started = False
|
||||
self.mic_enabled = False
|
||||
self.mic_sample_rate = 16000
|
||||
self.camera_width = 1024
|
||||
self.camera_height = 768
|
||||
self.camera_enabled = False
|
||||
|
||||
self.send_queue = asyncio.Queue(maxsize=5)
|
||||
self.receive_queue = asyncio.Queue()
|
||||
|
||||
self.other_participant_has_joined = False
|
||||
self.my_participant_id = None
|
||||
|
||||
self.camera_thread = None
|
||||
self.frame_consumer_thread = None
|
||||
|
||||
self.transcription_settings = {
|
||||
"language": "en",
|
||||
"tier": "nova",
|
||||
"model": "2-conversationalai",
|
||||
"profanity_filter": True,
|
||||
"redact": False,
|
||||
"extra": {
|
||||
"endpointing": True,
|
||||
"punctuate": False,
|
||||
},
|
||||
}
|
||||
|
||||
self.logger: logging.Logger = logging.getLogger("dailyai")
|
||||
|
||||
self.event_handlers = {}
|
||||
|
||||
try:
|
||||
self.loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
self.loop = None
|
||||
|
||||
def patch_method(self, event_name, *args, **kwargs):
|
||||
try:
|
||||
for handler in self.event_handlers[event_name]:
|
||||
if inspect.iscoroutinefunction(handler):
|
||||
if self.loop:
|
||||
asyncio.run_coroutine_threadsafe(handler(*args, **kwargs), self.loop)
|
||||
else:
|
||||
raise Exception("No event loop to run coroutine. In order to use async event handlers, you must run the DailyTransportService in an asyncio event loop.")
|
||||
else:
|
||||
handler(*args, **kwargs)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Exception in event handler {event_name}: {e}")
|
||||
|
||||
def add_event_handler(self, event_name: str, handler):
|
||||
if not event_name.startswith("on_"):
|
||||
raise Exception(f"Event handler {event_name} must start with 'on_'")
|
||||
|
||||
methods = inspect.getmembers(self, predicate=inspect.ismethod)
|
||||
if event_name not in [method[0] for method in methods]:
|
||||
raise Exception(f"Event handler {event_name} not found")
|
||||
|
||||
if not event_name in self.event_handlers:
|
||||
self.event_handlers[event_name] = [getattr(self, event_name), types.MethodType(handler, self)]
|
||||
setattr(self, event_name, partial(self.patch_method, event_name))
|
||||
else:
|
||||
self.event_handlers[event_name].append(types.MethodType(handler, self))
|
||||
|
||||
def event_handler(self, event_name: str):
|
||||
def decorator(handler):
|
||||
self.add_event_handler(event_name, handler)
|
||||
return handler
|
||||
|
||||
return decorator
|
||||
|
||||
def configure_daily(self):
|
||||
# Only initialize Daily once
|
||||
if not DailyTransportService._daily_initialized:
|
||||
with DailyTransportService._lock:
|
||||
Daily.init()
|
||||
DailyTransportService._daily_initialized = True
|
||||
self.client = CallClient(event_handler=self)
|
||||
|
||||
if self.mic_enabled:
|
||||
self.mic: VirtualMicrophoneDevice = Daily.create_microphone_device(
|
||||
"mic", sample_rate=self.mic_sample_rate, channels=1
|
||||
)
|
||||
|
||||
if self.camera_enabled:
|
||||
self.camera: VirtualCameraDevice = Daily.create_camera_device(
|
||||
"camera", width=self.camera_width, height=self.camera_height, color_format="RGB"
|
||||
)
|
||||
|
||||
self.speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
|
||||
"speaker", sample_rate=16000, channels=1
|
||||
)
|
||||
|
||||
self.image: bytes | None = None
|
||||
self.camera_thread = Thread(target=self.run_camera, daemon=True)
|
||||
self.camera_thread.start()
|
||||
|
||||
self.logger.info("Starting frame consumer thread")
|
||||
self.frame_consumer_thread = Thread(target=self.frame_consumer, daemon=True)
|
||||
self.frame_consumer_thread.start()
|
||||
|
||||
Daily.select_speaker_device("speaker")
|
||||
|
||||
self.client.set_user_name(self.bot_name)
|
||||
self.client.join(self.room_url, self.token, completion=self.call_joined)
|
||||
self.my_participant_id = self.client.participants()["local"]["id"]
|
||||
|
||||
self.client.update_inputs(
|
||||
{
|
||||
"camera": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "camera",
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "mic",
|
||||
"customConstraints": {
|
||||
"autoGainControl": {"exact": False},
|
||||
"echoCancellation": {"exact": False},
|
||||
"noiseSuppression": {"exact": False},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
self.client.update_publishing(
|
||||
{
|
||||
"camera": {
|
||||
"sendSettings": {
|
||||
"maxQuality": "low",
|
||||
"encodings": {
|
||||
"low": {
|
||||
"maxBitrate": 250000,
|
||||
"scaleResolutionDownBy": 1.333,
|
||||
"maxFramerate": 8,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if self.token:
|
||||
self.client.start_transcription(self.transcription_settings)
|
||||
|
||||
async def get_receive_frames(self):
|
||||
while True:
|
||||
frame = await self.receive_queue.get()
|
||||
yield frame
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
def get_async_send_queue(self):
|
||||
return self.send_queue
|
||||
|
||||
async def marshal_frames(self):
|
||||
while True:
|
||||
frame: QueueFrame | list = await self.send_queue.get()
|
||||
self.threadsafe_send_queue.put(frame)
|
||||
self.send_queue.task_done()
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
async def wait_for_send_queue_to_empty(self):
|
||||
await self.send_queue.join()
|
||||
self.threadsafe_send_queue.join()
|
||||
|
||||
async def stop_when_done(self):
|
||||
await self.wait_for_send_queue_to_empty()
|
||||
self.stop()
|
||||
|
||||
async def run(self) -> None:
|
||||
self.configure_daily()
|
||||
|
||||
self.do_shutdown = False
|
||||
|
||||
async_output_queue_marshal_task = asyncio.create_task(self.marshal_frames())
|
||||
|
||||
try:
|
||||
participant_count: int = len(self.client.participants())
|
||||
self.logger.info(f"{participant_count} participants in room")
|
||||
while time.time() < self.expiration and not self.do_shutdown and not self.stop_threads.is_set():
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Exception {e}")
|
||||
finally:
|
||||
self.client.leave()
|
||||
|
||||
self.stop_threads.set()
|
||||
|
||||
await self.receive_queue.put(EndStreamQueueFrame())
|
||||
await self.send_queue.put(EndStreamQueueFrame())
|
||||
await async_output_queue_marshal_task
|
||||
|
||||
if self.camera_thread and self.camera_thread.is_alive():
|
||||
self.camera_thread.join()
|
||||
if self.frame_consumer_thread and self.frame_consumer_thread.is_alive():
|
||||
self.frame_consumer_thread.join()
|
||||
|
||||
def stop(self):
|
||||
self.stop_threads.set()
|
||||
|
||||
def on_first_other_participant_joined(self):
|
||||
pass
|
||||
|
||||
def call_joined(self, join_data, client_error):
|
||||
self.logger.info(f"Call_joined: {join_data}, {client_error}")
|
||||
|
||||
def on_error(self, error):
|
||||
self.logger.error(f"on_error: {error}")
|
||||
|
||||
def on_call_state_updated(self, state):
|
||||
pass
|
||||
|
||||
def on_participant_joined(self, participant):
|
||||
if not self.other_participant_has_joined and participant["id"] != self.my_participant_id:
|
||||
self.other_participant_has_joined = True
|
||||
self.on_first_other_participant_joined()
|
||||
|
||||
def on_participant_left(self, participant, reason):
|
||||
if len(self.client.participants()) < self.min_others_count + 1:
|
||||
self.do_shutdown = True
|
||||
pass
|
||||
|
||||
def on_app_message(self, message, sender):
|
||||
pass
|
||||
|
||||
def on_transcription_message(self, message:dict):
|
||||
if self.loop:
|
||||
participantId = ""
|
||||
if "participantId" in message:
|
||||
participantId = message["participantId"]
|
||||
elif "session_id" in message:
|
||||
participantId = message["session_id"]
|
||||
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
|
||||
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self.loop)
|
||||
|
||||
def on_transcription_stopped(self, stopped_by, stopped_by_error):
|
||||
pass
|
||||
|
||||
def on_transcription_error(self, message):
|
||||
pass
|
||||
|
||||
def on_transcription_started(self, status):
|
||||
pass
|
||||
|
||||
def set_image(self, image: bytes):
|
||||
self.image: bytes | None = image
|
||||
|
||||
def run_camera(self):
|
||||
try:
|
||||
while not self.stop_threads.is_set():
|
||||
if self.image:
|
||||
self.camera.write_frame(self.image)
|
||||
|
||||
time.sleep(1.0 / 8) # 8 fps
|
||||
except Exception as e:
|
||||
self.logger.error(f"Exception {e} in camera thread.")
|
||||
|
||||
def frame_consumer(self):
|
||||
self.logger.info("🎬 Starting frame consumer thread")
|
||||
b = bytearray()
|
||||
smallest_write_size = 3200
|
||||
all_audio_frames = bytearray()
|
||||
while True:
|
||||
try:
|
||||
frames_or_frame: QueueFrame | list[QueueFrame] = self.threadsafe_send_queue.get()
|
||||
if isinstance(frames_or_frame, QueueFrame):
|
||||
frames: list[QueueFrame] = [frames_or_frame]
|
||||
elif isinstance(frames_or_frame, list):
|
||||
frames: list[QueueFrame] = frames_or_frame
|
||||
else:
|
||||
raise Exception("Unknown type in output queue")
|
||||
|
||||
for frame in frames:
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
self.logger.info("Stopping frame consumer thread")
|
||||
self.threadsafe_send_queue.task_done()
|
||||
return
|
||||
|
||||
# if interrupted, we just pull frames off the queue and discard them
|
||||
if not self.is_interrupted.is_set():
|
||||
if frame:
|
||||
if isinstance(frame, AudioQueueFrame):
|
||||
chunk = frame.data
|
||||
|
||||
all_audio_frames.extend(chunk)
|
||||
|
||||
b.extend(chunk)
|
||||
l = len(b) - (len(b) % smallest_write_size)
|
||||
if l:
|
||||
self.mic.write_frames(bytes(b[:l]))
|
||||
b = b[l:]
|
||||
elif isinstance(frame, ImageQueueFrame):
|
||||
self.set_image(frame.image)
|
||||
elif len(b):
|
||||
self.mic.write_frames(bytes(b))
|
||||
b = bytearray()
|
||||
else:
|
||||
if self.interrupt_time:
|
||||
self.logger.info(
|
||||
f"Lag to stop stream after interruption {time.perf_counter() - self.interrupt_time}"
|
||||
)
|
||||
self.interrupt_time = None
|
||||
|
||||
if isinstance(frame, StartStreamQueueFrame):
|
||||
self.is_interrupted.clear()
|
||||
|
||||
self.threadsafe_send_queue.task_done()
|
||||
except Empty:
|
||||
try:
|
||||
if len(b):
|
||||
self.mic.write_frames(bytes(b))
|
||||
except Exception as e:
|
||||
self.logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
|
||||
|
||||
b = bytearray()
|
||||
29
src/dailyai/services/deepgram_ai_services.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
import requests
|
||||
|
||||
from collections.abc import AsyncGenerator
|
||||
from dailyai.services.ai_services import TTSService
|
||||
|
||||
class DeepgramTTSService(TTSService):
|
||||
def __init__(self, speech_key=None, voice=None):
|
||||
super().__init__()
|
||||
|
||||
self.voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2"
|
||||
self.speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY")
|
||||
|
||||
def get_mic_sample_rate(self):
|
||||
return 24000
|
||||
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
self.logger.info(f"Running deepgram tts for {sentence}")
|
||||
base_url = "https://api.beta.deepgram.com/v1/speak"
|
||||
request_url = f"{base_url}?model={self.voice}&encoding=linear16&container=none&sample_rate=16000"
|
||||
headers = {"authorization": f"token {self.speech_key}"}
|
||||
body = { "text": sentence }
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(request_url, headers=headers, json=body) as r:
|
||||
async for data in r.content:
|
||||
yield data
|
||||
@@ -1,38 +1,36 @@
|
||||
import aiohttp
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
|
||||
from typing import Generator
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from ..services.ai_services import TTSService
|
||||
from dailyai.services.ai_services import TTSService
|
||||
|
||||
|
||||
class ElevenLabsTTSService(TTSService):
|
||||
def __init__(self):
|
||||
def __init__(self, api_key=None, voice_id=None):
|
||||
super().__init__()
|
||||
|
||||
self.api_key = os.getenv("ELEVENLABS_API_KEY")
|
||||
self.voice_id = os.getenv("ELEVENLABS_VOICE_ID")
|
||||
self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
|
||||
self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID")
|
||||
|
||||
def run_tts(self, sentence) -> Generator[bytes, None, None]:
|
||||
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
|
||||
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
|
||||
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
|
||||
headers = {
|
||||
"xi-api-key": self.api_key,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
|
||||
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
|
||||
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
|
||||
headers = {
|
||||
"xi-api-key": self.api_key,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
async with session.post(url, json=payload, headers=headers, params=querystring) as r:
|
||||
if r.status != 200:
|
||||
self.logger.error(
|
||||
f"audio fetch status code: {r.status}, error: {r.text}"
|
||||
)
|
||||
return
|
||||
|
||||
r = requests.request(
|
||||
"POST", url, json=payload, headers=headers, params=querystring, stream=True
|
||||
)
|
||||
|
||||
if r.status_code != 200:
|
||||
self.logger.error(
|
||||
f"audio fetch status code: {r.status_code}, error: {r.text}"
|
||||
)
|
||||
return
|
||||
|
||||
for chunk in r.iter_content(chunk_size=3200):
|
||||
if chunk:
|
||||
yield chunk
|
||||
async for chunk in r.content:
|
||||
if chunk:
|
||||
yield chunk
|
||||
|
||||
48
src/dailyai/services/fal_ai_services.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import fal
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
from PIL import Image
|
||||
|
||||
|
||||
from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
|
||||
# Fal expects FAL_KEY_ID and FAL_KEY_SECRET to be set in the env
|
||||
class FalImageGenService(ImageGenService):
|
||||
def __init__(self, image_size):
|
||||
super().__init__(image_size)
|
||||
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
def get_image_url(sentence, size):
|
||||
print("starting fal submit...")
|
||||
handler = fal.apps.submit(
|
||||
"110602490-fast-sdxl",
|
||||
arguments={
|
||||
"prompt": sentence
|
||||
},
|
||||
)
|
||||
print("past fal handler init, about to wait for iter_events...")
|
||||
for event in handler.iter_events():
|
||||
if isinstance(event, fal.apps.InProgress):
|
||||
pass
|
||||
|
||||
result = handler.get()
|
||||
|
||||
image_url = result["images"][0]["url"] if result else None
|
||||
if not image_url:
|
||||
raise Exception("Image generation failed")
|
||||
|
||||
return image_url
|
||||
print(f"fetching image url...")
|
||||
image_url = await asyncio.to_thread(get_image_url, sentence, self.image_size)
|
||||
print(f"got image url, downloading image...")
|
||||
# Load the image from the url
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(image_url) as response:
|
||||
print("got image response")
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
print("read image stream")
|
||||
image = Image.open(image_stream)
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
# return (image_url, dalle_im.tobytes())
|
||||
@@ -1,67 +0,0 @@
|
||||
from dailyai.services.ai_services import AIService, TTSService, LLMService, ImageGenService
|
||||
from typing import Generator
|
||||
|
||||
import requests
|
||||
from PIL import Image
|
||||
import io
|
||||
from openai import OpenAI
|
||||
|
||||
import os
|
||||
import json
|
||||
|
||||
class OpenAILLMService(LLMService):
|
||||
def __init__(self, api_key=None, model=None):
|
||||
super().__init__()
|
||||
api_key = api_key or os.getenv("OPEN_AI_KEY")
|
||||
self.model = model or os.getenv("OPEN_AI_MODEL")
|
||||
self.client = OpenAI(api_key=api_key)
|
||||
|
||||
def get_response(self, messages, stream):
|
||||
return self.client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self.model
|
||||
)
|
||||
|
||||
def run_llm_async(self, messages) -> Generator[str, None, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = self.get_response(messages, stream=True)
|
||||
|
||||
for chunk in response:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
def run_llm(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
|
||||
|
||||
response = self.get_response(messages, stream=False)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
class OpenAIImageGenService(ImageGenService):
|
||||
def __init__(self, api_key=None, model=None):
|
||||
super().__init__()
|
||||
api_key = api_key or os.getenv("OPEN_AI_KEY")
|
||||
self.model = model or os.getenv("OPEN_AI_MODEL")
|
||||
self.client = OpenAI(api_key=api_key)
|
||||
|
||||
def run_image_gen(self, sentence) -> tuple[str, Image.Image]:
|
||||
image = self.client.images.generate(
|
||||
prompt=sentence,
|
||||
n=1,
|
||||
size=f"1024x1024"
|
||||
)
|
||||
image_url = image.data[0].url
|
||||
response = requests.get(image_url)
|
||||
dalle_stream = io.BytesIO(response.content)
|
||||
dalle_im = Image.open(dalle_stream)
|
||||
|
||||
return (image_url, dalle_im)
|
||||
79
src/dailyai/services/open_ai_services.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import requests
|
||||
import aiohttp
|
||||
import asyncio
|
||||
from PIL import Image
|
||||
import io
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
import os
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from dailyai.services.ai_services import AIService, TTSService, LLMService, ImageGenService
|
||||
|
||||
|
||||
class OpenAILLMService(LLMService):
|
||||
def __init__(self, api_key=None, model=None):
|
||||
super().__init__()
|
||||
api_key = api_key or os.getenv("OPEN_AI_KEY")
|
||||
self.model = model or os.getenv("OPEN_AI_LLM_MODEL") or "gpt-4"
|
||||
self.client = AsyncOpenAI(api_key=api_key)
|
||||
|
||||
async def get_response(self, messages, stream):
|
||||
return await self.client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self.model
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = await self.get_response(messages, stream=True)
|
||||
|
||||
for chunk in response:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = await self.get_response(messages, stream=False)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
class OpenAIImageGenService(ImageGenService):
|
||||
def __init__(self, image_size:str, api_key=None, model=None):
|
||||
super().__init__(image_size=image_size)
|
||||
api_key = api_key or os.getenv("OPEN_AI_KEY")
|
||||
self.model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3"
|
||||
self.client = AsyncOpenAI(api_key=api_key)
|
||||
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
self.logger.info("Generating OpenAI image", sentence)
|
||||
|
||||
image = await self.client.images.generate(
|
||||
prompt=sentence,
|
||||
model=self.model,
|
||||
n=1,
|
||||
size=self.image_size
|
||||
)
|
||||
image_url = image.data[0].url
|
||||
if not image_url:
|
||||
raise Exception("No image provided in response", image)
|
||||
|
||||
# Load the image from the url
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
return (image_url, dalle_im.tobytes())
|
||||
@@ -20,7 +20,6 @@ class GoogleAIService(AIService):
|
||||
)
|
||||
|
||||
def run_tts(self, sentence):
|
||||
print("running google tts")
|
||||
synthesis_input = texttospeech.SynthesisInput(text = sentence.strip())
|
||||
result = self.client.synthesize_speech(input=synthesis_input, voice=self.voice, audio_config=self.audio_config)
|
||||
return result
|
||||
|
||||
@@ -13,7 +13,6 @@ class HuggingFaceAIService(AIService):
|
||||
# available models at https://huggingface.co/Helsinki-NLP (**not all models use 2-character language codes**)
|
||||
def run_text_translation(self, sentence, source_language, target_language):
|
||||
translator = pipeline(f"translation", model=f"Helsinki-NLP/opus-mt-{source_language}-{target_language}")
|
||||
print(translator(sentence))
|
||||
|
||||
return translator(sentence)[0]["translation_text"]
|
||||
|
||||
|
||||
48
src/dailyai/tests/test_ai_services.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from re import A
|
||||
import unittest
|
||||
|
||||
from typing import AsyncGenerator, Generator
|
||||
|
||||
from dailyai.services.ai_services import AIService
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TextQueueFrame
|
||||
|
||||
class SimpleAIService(AIService):
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
yield frame
|
||||
|
||||
class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_async_input(self):
|
||||
service = SimpleAIService()
|
||||
|
||||
input_frames = [
|
||||
TextQueueFrame("hello"),
|
||||
EndStreamQueueFrame()
|
||||
]
|
||||
async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
|
||||
for frame in input_frames:
|
||||
yield frame
|
||||
|
||||
output_frames = []
|
||||
async for frame in service.run(iterate_frames()):
|
||||
output_frames.append(frame)
|
||||
|
||||
self.assertEqual(input_frames, output_frames)
|
||||
|
||||
async def test_nonasync_input(self):
|
||||
service = SimpleAIService()
|
||||
|
||||
input_frames = [TextQueueFrame("hello"), EndStreamQueueFrame()]
|
||||
|
||||
def iterate_frames() -> Generator[QueueFrame, None, None]:
|
||||
for frame in input_frames:
|
||||
yield frame
|
||||
|
||||
output_frames = []
|
||||
async for frame in service.run(iterate_frames()):
|
||||
output_frames.append(frame)
|
||||
|
||||
self.assertEqual(input_frames, output_frames)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -11,14 +11,14 @@ from dailyai.async_processor.async_processor import (
|
||||
LLMResponse,
|
||||
)
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.ai_services import (
|
||||
AIServiceConfig,
|
||||
ImageGenService,
|
||||
LLMService,
|
||||
TTSService,
|
||||
)
|
||||
|
||||
"""
|
||||
class MockTTSService(TTSService):
|
||||
def run_tts(self, sentence):
|
||||
for word in sentence.split(' '):
|
||||
@@ -71,7 +71,7 @@ class TestResponse(unittest.TestCase):
|
||||
output_queue.task_done()
|
||||
|
||||
while expected_words:
|
||||
actual_word:OutputQueueFrame = output_queue.get()
|
||||
actual_word:QueueFrame = output_queue.get()
|
||||
word = expected_words.pop(0)
|
||||
self.assertEqual(actual_word.frame_type, FrameType.AUDIO_FRAME)
|
||||
self.assertEqual(actual_word.frame_data, bytes(word, "utf-8"))
|
||||
@@ -127,7 +127,7 @@ class TestResponse(unittest.TestCase):
|
||||
expected_words = ["Hello", "there.", "How", "are", "you?", "I", "hope", "you", "are", "well."]
|
||||
while expected_words and not stop_processing_output_queue.is_set():
|
||||
try:
|
||||
actual_word:OutputQueueFrame = output_queue.get_nowait()
|
||||
actual_word:QueueFrame = output_queue.get_nowait()
|
||||
if actual_word.frame_type == FrameType.AUDIO_FRAME:
|
||||
time.sleep(0.1)
|
||||
word = expected_words.pop(0)
|
||||
@@ -177,3 +177,4 @@ class TestResponse(unittest.TestCase):
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
"""
|
||||
|
||||
1
src/samples/deprecated/README.md
Normal file
@@ -0,0 +1 @@
|
||||
These samples need to be updated! Don't rely on them.
|
||||
@@ -13,16 +13,18 @@ from dailyai.orchestrator import OrchestratorConfig, Orchestrator
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
|
||||
def add_bot_to_room(room_url, token, expiration) -> None:
|
||||
|
||||
# A simple prompt for a simple sample.
|
||||
message_handler = MessageHandler(
|
||||
"""
|
||||
You are a sample bot, meant to demonstrate how to use an LLM with transcription at TTS.
|
||||
You are a sample bot in a WebRTC session. You'll receive input as transcriptions of user's
|
||||
speech, and your responses will be converted to audio via a TTS service.
|
||||
Answer user's questions and be friendly, and if you can, give some ideas about how someone
|
||||
could use a bot like you in a more in-depth way. Because your responses will be spoken,
|
||||
try to keep them short and sweet.
|
||||
try to keep them short.
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -33,12 +35,6 @@ def add_bot_to_room(room_url, token, expiration) -> None:
|
||||
# - AZURE_CHATGPT_KEY
|
||||
# - AZURE_CHATGPT_ENDPOINT
|
||||
# - AZURE_CHATGPT_DEPLOYMENT_ID
|
||||
#
|
||||
# This demo doesn't use image generation, but if you extend it to do so,
|
||||
# you'll also need to set:
|
||||
# - AZURE_DALLE_KEY
|
||||
# - AZURE_DALLE_ENDPOINT
|
||||
# - AZURE_DALLE_DEPLOYMENT_ID
|
||||
|
||||
services = AIServiceConfig(
|
||||
tts=AzureTTSService(), image=None, llm=AzureLLMService()
|
||||
@@ -15,7 +15,7 @@ from dailyai.async_processor.async_processor import (
|
||||
OrchestratorResponse
|
||||
)
|
||||
from dailyai.orchestrator import OrchestratorConfig, Orchestrator
|
||||
from dailyai.output_queue import OutputQueueFrame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.message_handler.message_handler import MessageHandler
|
||||
from dailyai.services.ai_services import AIServiceConfig
|
||||
from dailyai.services.azure_ai_services import AzureImageGenService, AzureTTSService, AzureLLMService
|
||||
@@ -40,7 +40,7 @@ class StaticSpriteResponse(OrchestratorResponse):
|
||||
self.image_bytes = img.tobytes()
|
||||
|
||||
def do_play(self) -> None:
|
||||
self.output_queue.put(OutputQueueFrame(FrameType.IMAGE_FRAME, self.image_bytes))
|
||||
self.output_queue.put(QueueFrame(FrameType.IMAGE, self.image_bytes))
|
||||
|
||||
|
||||
class IntroSpriteResponse(StaticSpriteResponse):
|
||||
@@ -71,10 +71,10 @@ class AnimatedSpriteLLMResponse(LLMResponse):
|
||||
with Image.open(full_path) as img:
|
||||
self.image_bytes.append(img.tobytes())
|
||||
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
|
||||
def get_frames_from_tts_response(self, audio_frame) -> list[QueueFrame]:
|
||||
return [
|
||||
OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame),
|
||||
OutputQueueFrame(FrameType.IMAGE_FRAME, random.choice(self.image_bytes))
|
||||
QueueFrame(FrameType.AUDIO, audio_frame),
|
||||
QueueFrame(FrameType.IMAGE, random.choice(self.image_bytes))
|
||||
]
|
||||
|
||||
|
||||
@@ -83,10 +83,11 @@ def add_bot_to_room(room_url, token, expiration) -> None:
|
||||
# A simple prompt for a simple sample.
|
||||
message_handler = MessageHandler(
|
||||
"""
|
||||
You are a sample bot, meant to demonstrate how to use an LLM with transcription at TTS.
|
||||
You are a sample bot in a WebRTC session. You'll receive input as transcriptions of user's
|
||||
speech, and your responses will be converted to audio via a TTS service.
|
||||
Answer user's questions and be friendly, and if you can, give some ideas about how someone
|
||||
could use a bot like you in a more in-depth way. Because your responses will be spoken,
|
||||
try to keep them short and sweet.
|
||||
try to keep them short.
|
||||
"""
|
||||
)
|
||||
|
||||
|
Before Width: | Height: | Size: 871 KiB After Width: | Height: | Size: 871 KiB |
|
Before Width: | Height: | Size: 870 KiB After Width: | Height: | Size: 870 KiB |
|
Before Width: | Height: | Size: 871 KiB After Width: | Height: | Size: 871 KiB |
|
Before Width: | Height: | Size: 868 KiB After Width: | Height: | Size: 868 KiB |
51
src/samples/foundational/01-say-one-thing.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
async def main(room_url):
|
||||
# create a transport service object using environment variables for
|
||||
# the transport service's API key, room url, and any other configuration.
|
||||
# services can all define and document the environment variables they use.
|
||||
# services all also take an optional config object that is used instead of
|
||||
# environment variables.
|
||||
#
|
||||
# the abstract transport service APIs presumably can map pretty closely
|
||||
# to the daily-python basic API
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Say One Thing",
|
||||
meeting_duration_minutes,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
|
||||
|
||||
# Register an event handler so we can play the audio when the participant joins.
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
|
||||
await tts.say(
|
||||
"Hello there, " + participant["info"]["userName"] + "!",
|
||||
transport.send_queue,
|
||||
)
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
59
src/samples/foundational/01a-greet-user.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import time
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureTTSService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
|
||||
async def main(room_url):
|
||||
# create a transport service object using environment variables for
|
||||
# the transport service's API key, room url, and any other configuration.
|
||||
# services can all define and document the environment variables they use.
|
||||
# services all also take an optional config object that is used instead of
|
||||
# environment variables.
|
||||
#
|
||||
# the abstract transport service APIs presumably can map pretty closely
|
||||
# to the daily-python basic API
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Greeter",
|
||||
meeting_duration_minutes,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
|
||||
# similarly, create a tts service
|
||||
tts = DeepgramTTSService()
|
||||
|
||||
# Get the generator for the audio. This will start running in the background,
|
||||
# and when we ask the generator for its items, we'll get what it's generated.
|
||||
|
||||
# Register an event handler so we can play the audio when the participant joins.
|
||||
print("settting up handler")
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
print(f"participant joined: {participant['info']['userName']}")
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
await tts.say(f"Hello there, {participant['info']['userName']}!", transport.send_queue)
|
||||
|
||||
print("setting up call state handler")
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_joined(transport, state):
|
||||
print(f"call state callback: {state}")
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
asyncio.run(main(args.url))
|
||||
48
src/samples/foundational/02-llm-say-one-thing.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
async def main(room_url):
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Say One Thing From an LLM",
|
||||
meeting_duration_minutes,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
|
||||
tts = ElevenLabsTTSService(voice_id="29vD33N1CtxCmqQRPOHJ")
|
||||
llm = AzureLLMService()
|
||||
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
tts_task = asyncio.create_task(
|
||||
tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
)
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts_task
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
asyncio.run(main(args.url))
|
||||
44
src/samples/foundational/03-still-frame.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from dailyai.queue_frame import TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
|
||||
local_joined = False
|
||||
participant_joined = False
|
||||
|
||||
async def main(room_url):
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Show a still frame image",
|
||||
meeting_duration_minutes,
|
||||
)
|
||||
transport.mic_enabled = False
|
||||
transport.camera_enabled = True
|
||||
transport.camera_width = 1024
|
||||
transport.camera_height = 1024
|
||||
|
||||
imagegen = OpenAIImageGenService(image_size="1024x1024")
|
||||
image_task = asyncio.create_task(
|
||||
imagegen.run_to_queue(transport.send_queue, [TextQueueFrame("a cat in the style of picasso")])
|
||||
)
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
await image_task
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
73
src/samples/foundational/04-utterance-and-speech.py
Normal file
@@ -0,0 +1,73 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
async def main(room_url:str):
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Say Two Things Bot",
|
||||
1,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
transport.camera_enabled = False
|
||||
|
||||
llm = AzureLLMService()
|
||||
azure_tts = AzureTTSService()
|
||||
elevenlabs_tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
|
||||
|
||||
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
|
||||
|
||||
# Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task
|
||||
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
|
||||
# speak the LLM response.
|
||||
buffer_queue = asyncio.Queue()
|
||||
llm_response_task = asyncio.create_task(
|
||||
elevenlabs_tts.run_to_queue(
|
||||
buffer_queue,
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
True,
|
||||
)
|
||||
)
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_joined(transport, participant):
|
||||
if participant["id"] == transport.my_participant_id:
|
||||
return
|
||||
|
||||
await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue)
|
||||
|
||||
async def buffer_to_send_queue():
|
||||
while True:
|
||||
frame = await buffer_queue.get()
|
||||
await transport.send_queue.put(frame)
|
||||
buffer_queue.task_done()
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
await asyncio.gather(llm_response_task, buffer_to_send_queue())
|
||||
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
110
src/samples/foundational/05-sync-speech-and-text.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
|
||||
async def main(room_url):
|
||||
meeting_duration_minutes = 5
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Month Narration Bot",
|
||||
meeting_duration_minutes,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
|
||||
transport.camera_enabled = True
|
||||
transport.camera_width = 1024
|
||||
transport.camera_height = 1024
|
||||
|
||||
llm = AzureLLMService()
|
||||
dalle = FalImageGenService(image_size="1024x1024")
|
||||
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
|
||||
# dalle = OpenAIImageGenService(image_size="1024x1024")
|
||||
|
||||
# Get a complete audio chunk from the given text. Splitting this into its own
|
||||
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
|
||||
async def get_all_audio(text):
|
||||
all_audio = bytearray()
|
||||
async for audio in tts.run_tts(text):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return all_audio
|
||||
|
||||
async def get_month_data(month):
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
|
||||
}
|
||||
]
|
||||
|
||||
image_description = await llm.run_llm(messages)
|
||||
if not image_description:
|
||||
return
|
||||
|
||||
to_speak = f"{month}: {image_description}"
|
||||
audio_task = asyncio.create_task(get_all_audio(to_speak))
|
||||
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
|
||||
(audio, image_data) = await asyncio.gather(
|
||||
audio_task, image_task
|
||||
)
|
||||
|
||||
return {
|
||||
"month": month,
|
||||
"text": image_description,
|
||||
"image_url": image_data[0],
|
||||
"image": image_data[1],
|
||||
"audio": audio,
|
||||
}
|
||||
|
||||
months: list[str] = [
|
||||
"January",
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June",
|
||||
"July",
|
||||
"August",
|
||||
"September",
|
||||
"October",
|
||||
"November",
|
||||
"December",
|
||||
]
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# This will play the months in the order they're completed. The benefit
|
||||
# is we'll have as little delay as possible before the first month, and
|
||||
# likely no delay between months, but the months won't display in order.
|
||||
for month_data_task in asyncio.as_completed(month_tasks):
|
||||
data = await month_data_task
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageQueueFrame(data["image_url"], data["image"]),
|
||||
AudioQueueFrame(data["audio"]),
|
||||
]
|
||||
)
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
|
||||
|
||||
await transport.run()
|
||||
|
||||
if __name__=="__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
91
src/samples/foundational/06-listen-and-respond.py
Normal file
@@ -0,0 +1,91 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import requests
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.queue_aggregators import LLMContextAggregator
|
||||
|
||||
async def main(room_url:str, token):
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
5,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
transport.camera_enabled = False
|
||||
|
||||
llm = AzureLLMService()
|
||||
tts = AzureTTSService()
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
|
||||
]
|
||||
|
||||
tma_in = LLMContextAggregator(
|
||||
messages, "user", transport.my_participant_id
|
||||
)
|
||||
tma_out = LLMContextAggregator(
|
||||
messages, "assistant", transport.my_participant_id
|
||||
)
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Daily API Key (needed to create token)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in the future.
|
||||
room_name: str = urllib.parse.urlparse(args.url).path[1:]
|
||||
expiration: float = time.time() + 60 * 60
|
||||
|
||||
res: requests.Response = requests.post(
|
||||
f"https://api.daily.co/v1/meeting-tokens",
|
||||
headers={"Authorization": f"Bearer {args.apikey}"},
|
||||
json={
|
||||
"properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
|
||||
},
|
||||
)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||
|
||||
token: str = res.json()["token"]
|
||||
|
||||
asyncio.run(main(args.url, token))
|
||||
67
src/samples/foundational/09-bots-arguing.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import requests
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
async def main(room_url:str):
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
5,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
transport.camera_enabled = False
|
||||
|
||||
llm = AzureLLMService()
|
||||
tts1 = AzureTTSService()
|
||||
tts2 = ElevenLabsTTSService()
|
||||
|
||||
async def argue():
|
||||
bot1_messages = [
|
||||
{"role": "system", "content": "You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. Your responses should only be a few sentences long."},
|
||||
]
|
||||
bot2_messages = [
|
||||
{"role": "system", "content": "You strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences."},
|
||||
]
|
||||
|
||||
for i in range(1, 5):
|
||||
print(f"In iteration {i}")
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot1_msg = await llm.run_llm(bot1_messages)
|
||||
print(f"bot1_msg: {bot1_msg}")
|
||||
bot1_messages.append({"role": "assistant", "content": bot1_msg})
|
||||
bot2_messages.append({"role": "user", "content": bot1_msg})
|
||||
|
||||
await tts1.say(bot1_msg, transport.send_queue)
|
||||
|
||||
bot2_msg = await llm.run_llm(bot2_messages)
|
||||
print(f"bot2_msg: {bot2_msg}")
|
||||
bot2_messages.append({"role": "assistant", "content": bot2_msg})
|
||||
bot1_messages.append({"role": "user", "content": bot2_msg})
|
||||
|
||||
await tts2.say(bot2_msg, transport.send_queue)
|
||||
|
||||
await asyncio.gather(transport.run(), argue())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
109
src/samples/foundational/09a-characters-arguing.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import requests
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
async def main(room_url:str):
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
600,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
transport.camera_enabled = True
|
||||
transport.camera_width = 1024
|
||||
transport.camera_height = 1024
|
||||
|
||||
llm = AzureLLMService()
|
||||
tts1 = AzureTTSService()
|
||||
tts2 = ElevenLabsTTSService()
|
||||
dalle = FalImageGenService(image_size="1024x1024")
|
||||
# dalle = OpenAIImageGenService(image_size="1024x1024")
|
||||
|
||||
bot1_messages = [
|
||||
{"role": "system", "content": "You are a stern librarian. You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever compromise on the fundamental truth that a hot dog is a sandwich. Your responses should only be a few sentences long."},
|
||||
]
|
||||
bot2_messages = [
|
||||
{"role": "system", "content": "You are a silly cat, and you strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences. Don't ever accept that a hot dog is a sandwich."},
|
||||
]
|
||||
|
||||
async def get_bot1_statement():
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot1_msg = await llm.run_llm(bot1_messages)
|
||||
print(f"bot1_msg: {bot1_msg}")
|
||||
bot1_messages.append({"role": "assistant", "content": bot1_msg})
|
||||
bot2_messages.append({"role": "user", "content": bot1_msg})
|
||||
|
||||
all_audio = bytearray()
|
||||
async for audio in tts1.run_tts(bot1_msg):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return all_audio
|
||||
|
||||
async def get_bot2_statement():
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot2_msg = await llm.run_llm(bot2_messages)
|
||||
print(f"bot2_msg: {bot2_msg}")
|
||||
bot2_messages.append({"role": "assistant", "content": bot2_msg})
|
||||
bot1_messages.append({"role": "user", "content": bot2_msg})
|
||||
|
||||
all_audio = bytearray()
|
||||
async for audio in tts2.run_tts(bot2_msg):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return all_audio
|
||||
|
||||
async def argue():
|
||||
for i in range(100):
|
||||
print(f"In iteration {i}")
|
||||
|
||||
bot1_description = "A woman conservatively dressed as a librarian in a library surrounded by books, cartoon, serious, highly detailed"
|
||||
|
||||
(audio1, image_data1) = await asyncio.gather(
|
||||
get_bot1_statement(), dalle.run_image_gen(bot1_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
QueueFrame(FrameType.IMAGE, image_data1[1]),
|
||||
QueueFrame(FrameType.AUDIO, audio1),
|
||||
]
|
||||
)
|
||||
|
||||
bot2_description = "A cat dressed in a hot dog costume, cartoon, bright colors, funny, highly detailed"
|
||||
|
||||
(audio2, image_data2) = await asyncio.gather(
|
||||
get_bot2_statement(), dalle.run_image_gen(bot2_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
QueueFrame(FrameType.IMAGE, image_data2[1]),
|
||||
QueueFrame(FrameType.AUDIO, audio2),
|
||||
]
|
||||
)
|
||||
|
||||
await asyncio.gather(transport.run(), argue())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
119
src/samples/foundational/09b-debate-generator.py
Normal file
@@ -0,0 +1,119 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import requests
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
|
||||
async def main(room_url:str):
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
600,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
transport.camera_enabled = True
|
||||
transport.camera_width = 1024
|
||||
transport.camera_height = 1024
|
||||
|
||||
llm = AzureLLMService()
|
||||
tts1 = AzureTTSService()
|
||||
tts2 = ElevenLabsTTSService()
|
||||
dalle = FalImageGenService(image_size="1024x1024")
|
||||
# dalle = OpenAIImageGenService(image_size="1024x1024")
|
||||
|
||||
topic = "Are pokemon edible?"
|
||||
affirmative = "A woman dressed as a cowboy, outside on a ranch"
|
||||
negative = "Pikachu in a business suit"
|
||||
|
||||
topic = "Is a hot dog a sandwich?"
|
||||
affirmative = "A woman conservatively dressed as a librarian in a library surrounded by books"
|
||||
negative = "A cat dressed in a hot dog costume"
|
||||
|
||||
|
||||
|
||||
bot1_messages = [
|
||||
{"role": "system", "content": f"You are {affirmative}. You're in a debate, and the topic is: '{topic}'. You're arguing the affirmative. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever agree with the user. Your responses should only be a few sentences long."},
|
||||
]
|
||||
bot2_messages = [
|
||||
{"role": "system", "content": f"You are {negative}. You're in a debate, and the topic is: '{topic}'. You're arguing the negative. Debate this with the user, only responding with a few sentences. Don't ever agree with the user."},
|
||||
]
|
||||
|
||||
async def get_bot1_statement():
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot1_msg = await llm.run_llm(bot1_messages)
|
||||
print(f"bot1_msg: {bot1_msg}")
|
||||
bot1_messages.append({"role": "assistant", "content": bot1_msg})
|
||||
bot2_messages.append({"role": "user", "content": bot1_msg})
|
||||
|
||||
all_audio = bytearray()
|
||||
async for audio in tts1.run_tts(bot1_msg):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return all_audio
|
||||
|
||||
async def get_bot2_statement():
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot2_msg = await llm.run_llm(bot2_messages)
|
||||
print(f"bot2_msg: {bot2_msg}")
|
||||
bot2_messages.append({"role": "assistant", "content": bot2_msg})
|
||||
bot1_messages.append({"role": "user", "content": bot2_msg})
|
||||
|
||||
all_audio = bytearray()
|
||||
async for audio in tts2.run_tts(bot2_msg):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return all_audio
|
||||
|
||||
async def argue():
|
||||
for i in range(100):
|
||||
print(f"In iteration {i}")
|
||||
|
||||
bot1_description = f"{affirmative}, cartoon, highly detailed"
|
||||
|
||||
(audio1, image_data1) = await asyncio.gather(
|
||||
get_bot1_statement(), dalle.run_image_gen(bot1_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
QueueFrame(FrameType.IMAGE, image_data1[1]),
|
||||
QueueFrame(FrameType.AUDIO, audio1),
|
||||
]
|
||||
)
|
||||
|
||||
bot2_description = f"{negative}, cartoon, bright colors, funny, highly detailed"
|
||||
|
||||
(audio2, image_data2) = await asyncio.gather(
|
||||
get_bot2_statement(), dalle.run_image_gen(bot2_description)
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
QueueFrame(FrameType.IMAGE, image_data2[1]),
|
||||
QueueFrame(FrameType.AUDIO, audio2),
|
||||
]
|
||||
)
|
||||
|
||||
await asyncio.gather(transport.run(), argue())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
asyncio.run(main(args.url))
|
||||
113
src/samples/image-gen.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import requests
|
||||
import time
|
||||
import urllib.parse
|
||||
import random
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
async def main(room_url:str, token):
|
||||
global transport
|
||||
global llm
|
||||
global tts
|
||||
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Imagebot",
|
||||
1,
|
||||
)
|
||||
transport.mic_enabled = True
|
||||
transport.camera_enabled = True
|
||||
transport.mic_sample_rate = 16000
|
||||
transport.camera_width = 1024
|
||||
transport.camera_height = 1024
|
||||
|
||||
llm = AzureLLMService()
|
||||
tts = AzureTTSService()
|
||||
img = FalImageGenService()
|
||||
|
||||
|
||||
async def handle_transcriptions():
|
||||
print("handle_transcriptions got called")
|
||||
|
||||
sentence = ""
|
||||
async for message in transport.get_transcriptions():
|
||||
print(f"transcription message: {message}")
|
||||
if message["session_id"] == transport.my_participant_id:
|
||||
continue
|
||||
finder = message["text"].find("start over")
|
||||
print(f"finder: {finder}")
|
||||
if finder >= 0:
|
||||
async for audio in tts.run_tts(f"Resetting."):
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
sentence = ""
|
||||
continue
|
||||
# todo: we could differentiate between transcriptions from different participants
|
||||
sentence += f" {message['text']}"
|
||||
print(f"sentence is now: {sentence}")
|
||||
# TODO: Cache this audio
|
||||
phrase = random.choice(["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
|
||||
async for audio in tts.run_tts(phrase):
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
img_result = img.run_image_gen(sentence, "1024x1024")
|
||||
awaited_img = await asyncio.gather(img_result)
|
||||
transport.output_queue.put(
|
||||
[
|
||||
QueueFrame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
|
||||
]
|
||||
)
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
print(f"participant joined: {participant['info']['userName']}")
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
async for audio in tts.run_tts("Describe an image, and I'll create it."):
|
||||
audio_generator = tts.run_tts(f"Hello, {participant['info']['userName']}! Describe an image and I'll create it. To start over, just say 'start over'.")
|
||||
async for audio in audio_generator:
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = False
|
||||
transport.transcription_settings["extra"]["endpointing"] = False
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Daily API Key (needed to create token)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in the future.
|
||||
room_name: str = urllib.parse.urlparse(args.url).path[1:]
|
||||
expiration: float = time.time() + 60 * 60
|
||||
|
||||
res: requests.Response = requests.post(
|
||||
f"https://api.daily.co/v1/meeting-tokens",
|
||||
headers={"Authorization": f"Bearer {args.apikey}"},
|
||||
json={
|
||||
"properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
|
||||
},
|
||||
)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||
|
||||
token: str = res.json()["token"]
|
||||
|
||||
asyncio.run(main(args.url, token))
|
||||