Compare commits

...

9 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Chad Bailey | 9c154c3d49 | requirements cleanup | 2024-01-24 19:53:05 +00:00 |
| Chad Bailey | 96256e90cb | gunicorn | 2024-01-24 19:12:54 +00:00 |
| Chad Bailey | 6f75db4d54 | flask_cors | 2024-01-24 19:01:19 +00:00 |
| Chad Bailey | 127fddfb1e | added dotenv | 2024-01-24 18:49:40 +00:00 |
| Chad Bailey | 5231243795 | trying requirements.txt | 2024-01-24 18:45:18 +00:00 |
| Chad Bailey | cba14c2002 | added flask to module build | 2024-01-24 18:25:27 +00:00 |
| Chad Bailey | 8ae61bf2ac | added health check | 2024-01-24 18:22:59 +00:00 |
| Chad Bailey | bc6849b255 | added web server | 2024-01-24 18:05:41 +00:00 |
| Chad Bailey | 9bbd14d5e7 | WIP: golden kitty | 2024-01-23 21:46:02 +00:00 |
11 changed files with 301 additions and 4 deletions

@@ -55,20 +55,31 @@ export $(grep -v '^#' .env | xargs)
```
## Overview
The Daily AI SDK allows you to build applications that can participate in WebRTC sessions and interact with AI Services. Some examples of what you can build with this:
- conversational bots that interact 1:1 with a user, using voice recognition and text-to-speech
- assistant bots that aggregate transcriptions from multiple participants in a meeting and provide realtime summaries or other AI-generated output.
- image-recognition bots
- etc
## Concepts
### Transport Service
The SDK provides one “transport service”, which is a wrapper around Daily's `daily-python` client (tk add link). You can use this service to listen for events related to a WebRTC session, such as “a participant joined the meeting”.
The transport service also exposes a send queue and a receive queue. You can use the send queue to send audio and video to the WebRTC session, and you can listen to the receive queue to receive audio, video, and transcription data from the WebRTC session.
### AI Services
The AI Service classes provide wrappers around various AI providers, and allow you to query LLMs, convert text to speech, and make images from text. The audio and images can then be placed on the transport service's send queue, where they'll be sent to the WebRTC session.
### Queue Frames
Communication between the transport service and AI services, and between various AI services, takes place in Queue Frames. These frames contain an indication of the type of data as well as the data itself.
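As a rough illustration of the idea, the sketch below models frames as dataclasses whose class indicates the type of data, similar in spirit to the `TextQueueFrame` dataclass that appears elsewhere in this change set. The `data` field on `AudioQueueFrame` and the `describe` helper are assumptions made for this example, not part of the SDK.

```python
from dataclasses import dataclass


@dataclass()
class QueueFrame:
    """Base class; the concrete subclass indicates the type of data carried."""


@dataclass()
class TextQueueFrame(QueueFrame):
    text: str


@dataclass()
class AudioQueueFrame(QueueFrame):
    data: bytes  # assumed field name, for illustration only


def describe(frame: QueueFrame) -> str:
    """Hypothetical consumer that dispatches on the frame type."""
    if isinstance(frame, TextQueueFrame):
        return f"text: {frame.text}"
    if isinstance(frame, AudioQueueFrame):
        return f"audio: {len(frame.data)} bytes"
    return "unknown frame"
```

A consumer written this way can ignore frame types it does not understand, which is one plausible reason to carry the type alongside the data.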
## Using Transports, AI Services and Frames
AI Services all define a `.run` method. This method consumes and generates `QueueFrame` frames. The kinds of frames that can be consumed and generated depend on the kind of service. For instance, an LLM AI Service consumes `LLM_MESSAGE` frames (which define a history of interaction with an LLM) and emits `TEXT` frames (the response from the LLM).
The `.run` method is an `AsyncIterable`, and it takes an `iterable`, `AsyncIterable` or `asyncio.Queue` that produces QueueFrames as a parameter. This makes it easy to chain AI Services, and consume input from the transport's `receive_queue`.
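The chaining described above can be sketched with toy services. This is not the SDK's implementation: `iterate_frames`, `UppercaseService`, `ExclaimService` and `collect` are hypothetical names, plain strings stand in for frames, and the use of `None` to mark the end of a queue is an assumption made only for this example.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, Union

FrameSource = Union[Iterable, AsyncIterable, asyncio.Queue]


async def iterate_frames(source: FrameSource) -> AsyncIterator:
    """Yield frames from an iterable, an async iterable, or an asyncio.Queue."""
    if isinstance(source, asyncio.Queue):
        # Assumed convention for this sketch: a None item ends the stream.
        while True:
            frame = await source.get()
            if frame is None:
                return
            yield frame
    elif hasattr(source, "__aiter__"):
        async for frame in source:
            yield frame
    else:
        for frame in source:
            yield frame


class UppercaseService:
    """Toy stand-in for an AI service: consumes text, emits upper-cased text."""

    async def run(self, source: FrameSource) -> AsyncIterator:
        async for frame in iterate_frames(source):
            yield frame.upper()


class ExclaimService:
    """Second toy service, used to show that services can be chained."""

    async def run(self, source: FrameSource) -> AsyncIterator:
        async for frame in iterate_frames(source):
            yield frame + "!"


async def collect(source: AsyncIterable) -> list:
    """Gather everything an async iterable produces into a list."""
    return [frame async for frame in source]
```

Because each `run` accepts whatever the previous `run` returns, a chain is just nested calls, for example `ExclaimService().run(UppercaseService().run(["hi", "there"]))`.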
@@ -76,18 +87,25 @@ The `.run` method is an `AsyncIterable`, and it takes an `iterable`, `AsyncItera
AI Services also have a `.run_to_queue` method. This method is not an AsyncIterable, but instead sends processed QueueFrames to a queue. This makes it easy to send the output of an AI Service to the transport's `send_queue`.
AI Services also define convenience functions that let you bypass creating QueueFrames for some simple cases (e.g. using the TTS service to convert a string to audio output and send that audio to the transport's `send_queue`). See below for examples.
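To make the difference between the two methods concrete, here is a minimal sketch of what a `run_to_queue`-style helper might do: drain an async iterable and put each result on a queue. The free function `run_to_queue`, `fake_tts` and `demo` below are hypothetical stand-ins written for this example; the SDK's actual method lives on the service classes and may behave differently.

```python
import asyncio


async def run_to_queue(queue: asyncio.Queue, frames) -> None:
    """Drain an async iterable of frames into a queue instead of yielding them."""
    async for frame in frames:
        await queue.put(frame)


async def fake_tts(texts):
    """Toy async generator standing in for a service's run() output."""
    for text in texts:
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield f"audio({text})"


async def demo() -> list:
    send_queue = asyncio.Queue()
    await run_to_queue(send_queue, fake_tts(["hello", "world"]))
    # Read back what was queued, in order.
    queued = []
    while not send_queue.empty():
        queued.append(send_queue.get_nowait())
    return queued
```

Running `asyncio.run(demo())` returns the queued items in the order the toy service produced them.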
## Examples
### Say Something
The base TTS AI service exposes a `.say` method. After creating a transport and TTS service, you can use this method like so:
```
transport = DailyTransportService(...)
tts = AzureTTSService()
await tts.say("hello world", transport.send_queue)
```
This will call the TTS service to render the text to audio frames, then put the audio frames on the transport's send queue. The transport will then send those frames along to the WebRTC session.
### Speak an LLM response
Given a system prompt contained in a `messages` array, you can emit the LLM's response as audio with a chain like this:
```
transport = DailyTransportService(...) # setup parameters omitted
tts = AzureTTSService()
@@ -99,14 +117,17 @@ await tts.run_to_queue(
    llm.run([QueueFrame.LLM_MESSAGES, messages])
)
```
In this code, the LLM service object sends the messages to Azure's OpenAI implementation, which streams chunks back asynchronously. Those chunks are aggregated by the TTS Service to ensure the best audio response (TTS works best when it gets a complete sentence, so it can inflect correctly), then sent to Azure's TTS service, converted to audio frames, and sent to the WebRTC session via the Daily transport.
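The sentence aggregation step can be illustrated with a small async generator. This is only a sketch of the general technique, not the TTS Service's actual logic: `aggregate_sentences` is a hypothetical name, and the rule "a sentence ends at `.`, `!` or `?` followed by whitespace" is a simplifying assumption.

```python
import asyncio
import re

# Split wherever whitespace follows sentence-ending punctuation.
_SENTENCE_BOUNDARY = re.compile(r"(?<=[.!?])\s+")


async def aggregate_sentences(chunks):
    """Buffer streamed text chunks and yield complete sentences."""
    buffer = ""
    async for chunk in chunks:
        buffer += chunk
        parts = _SENTENCE_BOUNDARY.split(buffer)
        # Every part except the last is followed by a boundary, so it is complete.
        for sentence in parts[:-1]:
            yield sentence
        buffer = parts[-1]
    # Flush whatever is left once the stream ends.
    if buffer.strip():
        yield buffer.strip()


async def stream(chunks):
    """Toy stand-in for an LLM streaming its response in small pieces."""
    for chunk in chunks:
        await asyncio.sleep(0)
        yield chunk


async def demo(chunks) -> list:
    return [s async for s in aggregate_sentences(stream(chunks))]
```

One trade-off of this approach is that a finished sentence is only emitted once the next chunk arrives or the stream ends, since the sketch needs to see the whitespace after the punctuation.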
### Pre-cache an LLM response
Sometimes LLMs can be slower than we'd like for natural-feeling communication. Here's an example where we take advantage of the time it takes to speak some pre-defined text to get a head start on the LLM response:
(TK link to 04- sample)
In this sample, we set up a buffer queue to receive the audio frames from the LLM response while we are joining the call, and start an asynchronous task to begin filling this buffer:
```
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
@@ -119,11 +140,13 @@ In this sample, we set up a buffer queue to receive the audio frames from the LL
```
Then, when we've joined the call, we speak the static text:
```
await azure_tts.say("My friend...", transport.send_queue)
```
As that text is being spoken, the asynchronous LLM task continues in the background. When the text is done, we pull the frames off the buffer queue and put them in the transport's `send_queue`:
```
async def buffer_to_send_queue():
    while True:
@@ -138,3 +161,11 @@ As that text is being spoken, the asynchronous LLM task continues in the backgro
```
One thing to note here is the last parameter to `run_to_queue` in the first code block above: this causes the `run_to_queue` method to send an `END_STREAM` frame when it's done rendering. This lets us know when to stop our `buffer_to_send_queue` task above.
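The whole pattern can be tried without any AI services using plain `asyncio`. In the sketch below everything is a stand-in: `END_STREAM` is a hypothetical sentinel object rather than the SDK's frame, `slow_llm_audio` and `say` simulate the LLM/TTS chain and the static speech with short sleeps, and strings stand in for audio frames.

```python
import asyncio

END_STREAM = object()  # hypothetical sentinel standing in for an END_STREAM frame


async def slow_llm_audio(buffer_queue: asyncio.Queue) -> None:
    """Stand-in for the LLM + TTS chain that fills the buffer in the background."""
    for i in range(3):
        await asyncio.sleep(0.01)
        await buffer_queue.put(f"llm-audio-{i}")
    await buffer_queue.put(END_STREAM)


async def say(text: str, send_queue: asyncio.Queue) -> None:
    """Stand-in for speaking static text; the sleep simulates speaking time."""
    await asyncio.sleep(0.02)
    await send_queue.put(f"static-audio:{text}")


async def buffer_to_send_queue(buffer_queue: asyncio.Queue, send_queue: asyncio.Queue) -> None:
    """Move buffered frames to the send queue until the sentinel arrives."""
    while True:
        frame = await buffer_queue.get()
        if frame is END_STREAM:
            break
        await send_queue.put(frame)


async def main() -> list:
    buffer_queue = asyncio.Queue()
    send_queue = asyncio.Queue()
    # Start the slow work first so it overlaps with the static speech.
    llm_task = asyncio.create_task(slow_llm_audio(buffer_queue))
    await say("My friend...", send_queue)
    await buffer_to_send_queue(buffer_queue, send_queue)
    await llm_task
    sent = []
    while not send_queue.empty():
        sent.append(send_queue.get_nowait())
    return sent
```

Without the sentinel, `buffer_to_send_queue` would block forever on `buffer_queue.get()` after the last frame, which is why the end-of-stream marker matters here.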
## Test Server
To start the test server:
```bash
flask --app daily-bot-manager.py --debug run
```

auth.py (new file, 26 lines)

@@ -0,0 +1,26 @@
import time
import urllib.parse  # import the submodule explicitly; `import urllib` alone does not guarantee `urllib.parse` is loaded
from dotenv import load_dotenv
import requests
from flask import jsonify
import os

load_dotenv()


def get_meeting_token(room_name, daily_api_key, token_expiry):
    api_path = os.getenv('DAILY_API_PATH') or 'https://api.daily.co/v1'

    if not token_expiry:
        # Default to a token that expires 10 minutes from now.
        token_expiry = time.time() + 600
    res = requests.post(f'{api_path}/meeting-tokens',
                        headers={'Authorization': f'Bearer {daily_api_key}'},
                        json={'properties': {'room_name': room_name, 'is_owner': True, 'exp': token_expiry}})

    if res.status_code != 200:
        # Note: on failure this returns a Flask (response, status) tuple, not a token string.
        return jsonify({'error': 'Unable to create meeting token', 'detail': res.text}), 500
    meeting_token = res.json()['token']
    return meeting_token


def get_room_name(room_url):
    # The room name is the URL path without its leading slash.
    return urllib.parse.urlparse(room_url).path[1:]

daily-bot-manager.py (new file, 93 lines)

@@ -0,0 +1,93 @@
import os
import requests
import subprocess
import time
from flask import Flask, jsonify, request
from flask_cors import CORS
from auth import get_meeting_token
app = Flask(__name__)
CORS(app)
def start_bot(bot_path, args=None):
    daily_api_key = os.getenv("DAILY_API_KEY")
    api_path = os.getenv("DAILY_API_PATH") or "https://api.daily.co/v1"

    timeout = int(os.getenv("ROOM_TIMEOUT") or os.getenv("BOT_MAX_DURATION") or 300)
    exp = time.time() + timeout
    res = requests.post(
        f"{api_path}/rooms",
        headers={"Authorization": f"Bearer {daily_api_key}"},
        json={
            "properties": {
                "exp": exp,
                "enable_chat": True,
                "enable_emoji_reactions": True,
                "eject_at_room_exp": True,
                "enable_prejoin_ui": False,
            }
        },
    )
    if res.status_code != 200:
        return (
            jsonify(
                {
                    "error": "Unable to create room",
                    "status_code": res.status_code,
                    "text": res.text,
                }
            ),
            500,
        )
    room_url = res.json()["url"]
    room_name = res.json()["name"]

    meeting_token = get_meeting_token(room_name, daily_api_key, exp)

    if args:
        extra_args = " ".join([f'-{x[0]} "{x[1]}"' for x in args])
    else:
        extra_args = ""

    otel_path = "opentelemetry-instrument" if os.getenv("USE_OTEL") else ""
    print("using otel path: ", otel_path, os.getenv("USE_OTEL"))
    proc = subprocess.Popen(
        [
            f"{otel_path} python {bot_path} -u {room_url} -t {meeting_token} -k {daily_api_key} {extra_args}"
        ],
        shell=True,
        bufsize=1,
    )

    # Don't return until the bot has joined the room, but wait for at most 2 seconds.
    attempts = 0
    while attempts < 20:
        time.sleep(0.1)
        attempts += 1
        res = requests.get(
            f"{api_path}/rooms/{room_name}/get-session-data",
            headers={"Authorization": f"Bearer {daily_api_key}"},
        )
        if res.status_code == 200:
            break
    print(f"Took {attempts} attempts to join room {room_name}")

    # Additional client config
    config = {}
    if os.getenv("CLIENT_VAD_TIMEOUT_SEC"):
        config['vad_timeout_sec'] = float(os.getenv("CLIENT_VAD_TIMEOUT_SEC"))
    else:
        config['vad_timeout_sec'] = 1.5

    return jsonify({"room_url": room_url, "token": meeting_token, "config": config}), 200
@app.route("/spin-up-kitty", methods=["POST"])
def spin_up_kitty():
    return start_bot("./src/samples/foundational/06a-golden-kitty.py")


@app.route("/healthz")
def health_check():
    return "ok", 200

@@ -16,6 +16,7 @@ dependencies = [
"pyht",
"opentelemetry-sdk",
"aiohttp",
"flask",
"fal"
]

@@ -1,3 +1,8 @@
build==1.0.3
packaging==23.2
pyproject_hooks==1.0.0
aiohttp
flask
flask_cors
gunicorn
python-dotenv

@@ -20,6 +20,10 @@ class ImageQueueFrame(QueueFrame):
    url: str | None
    image: bytes

@dataclass()
class ImageListQueueFrame(QueueFrame):
    images: list[bytes] | None

@dataclass()
class TextQueueFrame(QueueFrame):
    text: str

@@ -12,6 +12,7 @@ from dailyai.queue_frame import (
AudioQueueFrame,
EndStreamQueueFrame,
ImageQueueFrame,
ImageListQueueFrame,
QueueFrame,
StartStreamQueueFrame,
TranscriptionQueueFrame,
@@ -149,6 +150,7 @@ class DailyTransportService(EventHandler):
)
self.image: bytes | None = None
self.images: list[bytes] | None = None
self.camera_thread = Thread(target=self.run_camera, daemon=True)
self.camera_thread.start()
@@ -307,12 +309,22 @@ class DailyTransportService(EventHandler):
    def set_image(self, image: bytes):
        self.image: bytes | None = image
        self.images: list[bytes] | None = None

    def set_images(self, images: list[bytes], start_frame=0):
        self.images: list[bytes] | None = images
        self.image = None
        self.current_frame = start_frame

    def run_camera(self):
        try:
            while not self.stop_threads.is_set():
                if self.image:
                    self.camera.write_frame(self.image)
                if self.images:
                    this_frame = self.current_frame % len(self.images)
                    self.camera.write_frame(self.sprites[self.images[this_frame]])
                    self.current_frame = this_frame + 1
                time.sleep(1.0 / 8)  # 8 fps
        except Exception as e:
@@ -354,6 +366,8 @@ class DailyTransportService(EventHandler):
b = b[l:]
elif isinstance(frame, ImageQueueFrame):
self.set_image(frame.image)
elif isinstance(frame, ImageListQueueFrame):
self.set_images(frame.images)
elif len(b):
self.mic.write_frames(bytes(b))
b = bytearray()

@@ -0,0 +1,123 @@
import argparse
import asyncio
import requests
import time
import urllib.parse
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAIImageGenService
from dailyai.queue_aggregators import LLMContextAggregator
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, List
class TranscriptFilter(AIService):
    def __init__(self, bot_participant_id=None):
        self.bot_participant_id = bot_participant_id

    async def process_frame(self, frame:QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        if frame.participantId != self.bot_participant_id:
            yield frame
async def main(room_url:str, token):
    global transport
    global llm
    global tts

    transport = DailyTransportService(
        room_url,
        token,
        "The Golden Kitty",
        5,
    )
    transport.mic_enabled = True
    transport.mic_sample_rate = 16000
    transport.camera_enabled = True
    transport.camera_width = 1024
    transport.camera_height = 1024

    llm = AzureLLMService()
    tts = ElevenLabsTTSService()

    @transport.event_handler("on_first_other_participant_joined")
    async def on_first_other_participant_joined(transport):
        await tts.say("Hi, I'm listening!", transport.send_queue)

    async def handle_transcriptions():
        messages = [
            {"role": "system", "content": "You are the Golden Kitty, the mascot for Product Hunt's annual awards. You are a cat who knows everything about all the cool new tech startups. You should be clever, and a bit sarcastic. You should also tell jokes every once in a while. Your responses should only be a few sentences long."},
        ]

        tma_in = LLMContextAggregator(
            messages, "user", transport.my_participant_id
        )
        tma_out = LLMContextAggregator(
            messages, "assistant", transport.my_participant_id
        )
        tf = TranscriptFilter(transport.my_participant_id)
        await tts.run_to_queue(
            transport.send_queue,
            tma_out.run(
                llm.run(
                    tma_in.run(
                        tf.run(
                            transport.get_receive_frames()
                        )
                    )
                )
            )
        )

    async def make_cats():
        imagegen = OpenAIImageGenService(image_size="1024x1024")
        while True:
            print("generating new image")
            await imagegen.run_to_queue(transport.send_queue, [TextQueueFrame("a golden kitty trophy, cartoon, colorful, detailed, 4k")])
            await asyncio.sleep(10)

    transport.transcription_settings["extra"]["punctuate"] = True
    await asyncio.gather(transport.run(), handle_transcriptions(), make_cats())
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=True, help="URL of the Daily room to join"
    )
    parser.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=True,
        help="Daily API Key (needed to create token)",
    )

    args, unknown = parser.parse_known_args()

    # Create a meeting token for the given room with an expiration 24 hours in the future.
    room_name: str = urllib.parse.urlparse(args.url).path[1:]
    expiration: float = time.time() + 60 * 60 * 24

    res: requests.Response = requests.post(
        f"https://api.daily.co/v1/meeting-tokens",
        headers={"Authorization": f"Bearer {args.apikey}"},
        json={
            "properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
        },
    )

    if res.status_code != 200:
        raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
    token: str = res.json()["token"]

    asyncio.run(main(args.url, token))

Binary files not shown: three added images, 1.7 MiB each.