Don't create aiohttp sessions inside services

This commit is contained in:
Moishe Lettvin
2024-01-26 12:30:37 -05:00
parent fcceb32bd7
commit e81f247845
13 changed files with 401 additions and 408 deletions

View File

@@ -101,84 +101,53 @@ class AzureImageGenServiceREST(ImageGenService):
def __init__(
self,
image_size: str,
aiohttp_session:aiohttp.ClientSession,
api_key=None,
azure_endpoint=None,
api_version=None,
model=None):
super().__init__(image_size=image_size)
self.api_key = api_key or os.getenv("AZURE_DALLE_KEY")
self.azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
self.api_version = api_version or "2023-06-01-preview"
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
self._api_key = api_key or os.getenv("AZURE_DALLE_KEY")
self._azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
self._api_version = api_version or "2023-06-01-preview"
self._model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
self._aiohttp_session = aiohttp_session
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
# TODO hoist the session to app-level
async with aiohttp.ClientSession() as session:
url = f"{self.azure_endpoint}openai/images/generations:submit?api-version={self.api_version}"
headers = {"api-key": self.api_key, "Content-Type": "application/json"}
body = {
# Enter your prompt text here
"prompt": sentence,
"size": self.image_size,
"n": 1,
}
async with session.post(url, headers=headers, json=body) as submission:
operation_location = submission.headers['operation-location']
url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}"
headers = {"api-key": self._api_key, "Content-Type": "application/json"}
body = {
# Enter your prompt text here
"prompt": sentence,
"size": self.image_size,
"n": 1,
}
async with self._aiohttp_session.post(
url, headers=headers, json=body
) as submission:
operation_location = submission.headers['operation-location']
status = ""
attempts_left = 120
json_response = None
while status != "succeeded":
attempts_left -= 1
if attempts_left == 0:
raise Exception("Image generation timed out")
status = ""
attempts_left = 120
json_response = None
while status != "succeeded":
attempts_left -= 1
if attempts_left == 0:
raise Exception("Image generation timed out")
await asyncio.sleep(1)
response = await session.get(operation_location, headers=headers)
json_response = await response.json()
status = json_response["status"]
await asyncio.sleep(1)
response = await self._aiohttp_session.get(
operation_location, headers=headers
)
json_response = await response.json()
status = json_response["status"]
image_url = json_response["result"]["data"][0]["url"] if json_response else None
if not image_url:
raise Exception("Image generation failed")
image_url = json_response["result"]["data"][0]["url"] if json_response else None
if not image_url:
raise Exception("Image generation failed")
# Load the image from the url
async with session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())
class AzureImageGenService(ImageGenService):
def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None):
super().__init__()
api_key = api_key or os.getenv("AZURE_DALLE_KEY")
azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT")
api_version = api_version or "2023-06-01-preview"
self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID")
self.client = AzureOpenAI(
api_key=api_key,
azure_endpoint=azure_endpoint,
api_version=api_version,
)
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
self.logger.info("Generating azure image", sentence)
image = self.client.images.generate(
model=self.model,
prompt=sentence,
n=1,
size=self.image_size,
)
url = image["data"][0]["url"]
response = requests.get(url)
dalle_stream = io.BytesIO(response.content)
dalle_im = Image.open(dalle_stream.tobytes())
return (url, dalle_im)
# Load the image from the url
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())

View File

@@ -15,7 +15,6 @@ from dailyai.queue_frame import (
ImageQueueFrame,
QueueFrame,
StartStreamQueueFrame,
TextQueueFrame,
TranscriptionQueueFrame,
)
@@ -414,3 +413,6 @@ class DailyTransportService(EventHandler):
raise e
b = bytearray()
except Exception as e:
print("!!!!", e)
raise e

View File

@@ -9,11 +9,12 @@ from dailyai.services.ai_services import TTSService
class DeepgramTTSService(TTSService):
def __init__(self, speech_key=None, voice=None):
def __init__(self, aiohttp_session, speech_key=None, voice=None):
super().__init__()
self.voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2"
self.speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY")
self._voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2"
self._speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY")
self._aiohttp_session = aiohttp_session
def get_mic_sample_rate(self):
return 24000
@@ -21,10 +22,9 @@ class DeepgramTTSService(TTSService):
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
self.logger.info(f"Running deepgram tts for {sentence}")
base_url = "https://api.beta.deepgram.com/v1/speak"
request_url = f"{base_url}?model={self.voice}&encoding=linear16&container=none&sample_rate=16000"
headers = {"authorization": f"token {self.speech_key}"}
request_url = f"{base_url}?model={self._voice}&encoding=linear16&container=none&sample_rate=16000"
headers = {"authorization": f"token {self._speech_key}"}
body = {"text": sentence}
async with aiohttp.ClientSession() as session:
async with session.post(request_url, headers=headers, json=body) as r:
async for data in r.content:
yield data
async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r:
async for data in r.content:
yield data

View File

@@ -9,28 +9,36 @@ from dailyai.services.ai_services import TTSService
class ElevenLabsTTSService(TTSService):
def __init__(self, api_key=None, voice_id=None):
def __init__(
self,
aiohttp_session: aiohttp.ClientSession,
api_key=None,
voice_id=None,
):
super().__init__()
self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID")
self._api_key = api_key or os.getenv("ELEVENLABS_API_KEY")
self._voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID")
self._aiohttp_session = aiohttp_session
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
async with aiohttp.ClientSession() as session:
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream"
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
headers = {
"xi-api-key": self.api_key,
"Content-Type": "application/json",
}
async with session.post(url, json=payload, headers=headers, params=querystring) as r:
if r.status != 200:
self.logger.error(
f"audio fetch status code: {r.status}, error: {r.text}"
)
return
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
headers = {
"xi-api-key": self._api_key,
"Content-Type": "application/json",
}
async with self._aiohttp_session.post(
url, json=payload, headers=headers, params=querystring
) as r:
if r.status != 200:
self.logger.error(
f"audio fetch status code: {r.status}, error: {r.text}"
)
return
async for chunk in r.content:
if chunk:
yield chunk
async for chunk in r.content:
if chunk:
yield chunk

View File

@@ -11,23 +11,21 @@ from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
class FalImageGenService(ImageGenService):
def __init__(self, image_size):
def __init__(self, image_size, aiohttp_session:aiohttp.ClientSession):
super().__init__(image_size)
self._aiohttp_session = aiohttp_session
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
def get_image_url(sentence, size):
print("starting fal submit...")
handler = fal.apps.submit(
"110602490-fast-sdxl",
arguments={
"prompt": sentence
},
)
print("past fal handler init, about to wait for iter_events...")
for event in handler.iter_events():
if isinstance(event, fal.apps.InProgress):
print('Request in progress')
print(event.logs)
pass
result = handler.get()
@@ -36,16 +34,11 @@ class FalImageGenService(ImageGenService):
raise Exception("Image generation failed")
return image_url
print(f"fetching image url...")
image_url = await asyncio.to_thread(get_image_url, sentence, self.image_size)
print(f"got image url, downloading image...")
# Load the image from the url
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
print("got image response")
image_stream = io.BytesIO(await response.content.read())
print("read image stream")
image = Image.open(image_stream)
return (image_url, image.tobytes())
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())
# return (image_url, dalle_im.tobytes())

View File

@@ -51,18 +51,26 @@ class OpenAILLMService(LLMService):
class OpenAIImageGenService(ImageGenService):
def __init__(self, image_size: str, api_key=None, model=None):
def __init__(
self,
image_size: str,
aiohttp_session: aiohttp.ClientSession,
api_key=None,
model=None,
):
super().__init__(image_size=image_size)
api_key = api_key or os.getenv("OPEN_AI_KEY")
self.model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3"
self.client = AsyncOpenAI(api_key=api_key)
self._model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3"
self._client = AsyncOpenAI(api_key=api_key)
self._aiohttp_session = aiohttp_session
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
self.logger.info("Generating OpenAI image", sentence)
image = await self.client.images.generate(
image = await self._client.images.generate(
prompt=sentence,
model=self.model,
model=self._model,
n=1,
size=self.image_size
)
@@ -71,10 +79,7 @@ class OpenAIImageGenService(ImageGenService):
raise Exception("No image provided in response", image)
# Load the image from the url
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())
return (image_url, dalle_im.tobytes())
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
return (image_url, image.tobytes())

View File

@@ -1,44 +1,47 @@
import argparse
import asyncio
import aiohttp
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):
# create a transport service object using environment variables for
# the transport service's API key, room url, and any other configuration.
# services can all define and document the environment variables they use.
# services all also take an optional config object that is used instead of
# environment variables.
#
# the abstract transport service APIs presumably can map pretty closely
# to the daily-python basic API
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Say One Thing",
meeting_duration_minutes,
)
transport.mic_enabled = True
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
# Register an event handler so we can play the audio when the participant joins.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
if participant["info"]["isLocal"]:
return
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
async with aiohttp.ClientSession() as session:
# create a transport service object using environment variables for
# the transport service's API key, room url, and any other configuration.
# services can all define and document the environment variables they use.
# services all also take an optional config object that is used instead of
# environment variables.
#
# the abstract transport service APIs presumably can map pretty closely
# to the daily-python basic API
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Say One Thing",
meeting_duration_minutes,
)
transport.mic_enabled = True
tts = ElevenLabsTTSService(session, voice_id="ErXwobaYiN019PkySvjV")
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
# Register an event handler so we can play the audio when the participant joins.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
if participant["info"]["isLocal"]:
return
await transport.run()
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
await transport.run()
if __name__ == "__main__":

View File

@@ -2,57 +2,59 @@ import asyncio
import time
from typing import AsyncGenerator
from dailyai.queue_frame import QueueFrame, FrameType
import aiohttp
from dailyai.queue_frame import AudioQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureTTSService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
async def main(room_url):
# create a transport service object using environment variables for
# the transport service's API key, room url, and any other configuration.
# services can all define and document the environment variables they use.
# services all also take an optional config object that is used instead of
# environment variables.
#
# the abstract transport service APIs presumably can map pretty closely
# to the daily-python basic API
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Greeter",
meeting_duration_minutes,
)
transport.mic_enabled = True
async with aiohttp.ClientSession() as session:
# create a transport service object using environment variables for
# the transport service's API key, room url, and any other configuration.
# services can all define and document the environment variables they use.
# services all also take an optional config object that is used instead of
# environment variables.
#
# the abstract transport service APIs presumably can map pretty closely
# to the daily-python basic API
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Greeter",
meeting_duration_minutes,
)
transport.mic_enabled = True
# similarly, create a tts service
tts = DeepgramTTSService()
# similarly, create a tts service
tts = DeepgramTTSService(session)
# Get the generator for the audio. This will start running in the background,
# and when we ask the generator for its items, we'll get what it's generated.
# Get the generator for the audio. This will start running in the background,
# and when we ask the generator for its items, we'll get what it's generated.
# Register an event handler so we can play the audio when the participant joins.
print("settting up handler")
# Register an event handler so we can play the audio when the participant joins.
print("settting up handler")
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
print(f"participant joined: {participant['info']['userName']}")
if participant["info"]["isLocal"]:
return
audio_generator: AsyncGenerator[bytes, None] = tts.run_tts(
f"Hello there, {participant['info']['userName']}!")
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
print(f"participant joined: {participant['info']['userName']}")
if participant["info"]["isLocal"]:
return
audio_generator: AsyncGenerator[bytes, None] = tts.run_tts(
f"Hello there, {participant['info']['userName']}!")
async for audio in audio_generator:
transport.output_queue.put(QueueFrame(FrameType.AUDIO, audio))
async for audio in audio_generator:
await transport.send_queue.put(AudioQueueFrame(audio))
print("setting up call state handler")
print("setting up call state handler")
@transport.event_handler("on_call_state_updated")
async def on_call_joined(transport, state):
print(f"call state callback: {state}")
@transport.event_handler("on_call_state_updated")
async def on_call_joined(transport, state):
print(f"call state callback: {state}")
await transport.run()
await transport.run()
if __name__ == "__main__":

View File

@@ -1,5 +1,8 @@
import argparse
import asyncio
import logging
import aiohttp
from dailyai.queue_frame import LLMMessagesQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
@@ -8,35 +11,39 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url):
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Say One Thing From an LLM",
meeting_duration_minutes,
)
transport.mic_enabled = True
async with aiohttp.ClientSession() as session:
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
tts = ElevenLabsTTSService(voice_id="29vD33N1CtxCmqQRPOHJ")
llm = AzureLLMService()
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
tts_task = asyncio.create_task(
tts.run_to_queue(
transport.send_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Say One Thing From an LLM",
meeting_duration_minutes,
)
)
transport.mic_enabled = True
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts_task
await transport.stop_when_done()
tts = ElevenLabsTTSService(session, voice_id="29vD33N1CtxCmqQRPOHJ")
llm = AzureLLMService()
await transport.run()
messages = [{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
}]
tts_task = asyncio.create_task(
tts.run_to_queue(
transport.send_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
)
)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts_task
await transport.stop_when_done()
await transport.run()
if __name__ == "__main__":

View File

@@ -1,6 +1,8 @@
import argparse
import asyncio
import aiohttp
from dailyai.queue_frame import TextQueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.open_ai_services import OpenAIImageGenService
@@ -10,29 +12,30 @@ participant_joined = False
async def main(room_url):
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Show a still frame image",
meeting_duration_minutes,
)
transport.mic_enabled = False
transport.camera_enabled = True
transport.camera_width = 1024
transport.camera_height = 1024
async with aiohttp.ClientSession() as session:
meeting_duration_minutes = 1
transport = DailyTransportService(
room_url,
None,
"Show a still frame image",
meeting_duration_minutes,
)
transport.mic_enabled = False
transport.camera_enabled = True
transport.camera_width = 1024
transport.camera_height = 1024
imagegen = OpenAIImageGenService(image_size="1024x1024")
image_task = asyncio.create_task(
imagegen.run_to_queue(
transport.send_queue, [
TextQueueFrame("a cat in the style of picasso")]))
imagegen = OpenAIImageGenService(image_size="1024x1024", aiohttp_session=session)
image_task = asyncio.create_task(
imagegen.run_to_queue(
transport.send_queue, [
TextQueueFrame("a cat in the style of picasso")]))
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
await image_task
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
await image_task
await transport.run()
await transport.run()
if __name__ == "__main__":

View File

@@ -2,6 +2,8 @@ import argparse
import asyncio
import re
import aiohttp
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
@@ -9,58 +11,55 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url: str):
global transport
global llm
global tts
transport = DailyTransportService(
room_url,
None,
"Say Two Things Bot",
1,
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
llm = AzureLLMService()
azure_tts = AzureTTSService()
elevenlabs_tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
# speak the LLM response.
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
elevenlabs_tts.run_to_queue(
buffer_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
True,
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
room_url,
None,
"Say Two Things Bot",
1,
)
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
@transport.event_handler("on_participant_joined")
async def on_joined(transport, participant):
if participant["id"] == transport.my_participant_id:
return
llm = AzureLLMService()
azure_tts = AzureTTSService()
elevenlabs_tts = ElevenLabsTTSService(session, voice_id="ErXwobaYiN019PkySvjV")
await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue)
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
async def buffer_to_send_queue():
while True:
frame = await buffer_queue.get()
await transport.send_queue.put(frame)
buffer_queue.task_done()
if isinstance(frame, EndStreamQueueFrame):
break
# Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
# speak the LLM response.
buffer_queue = asyncio.Queue()
llm_response_task = asyncio.create_task(
elevenlabs_tts.run_to_queue(
buffer_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
True,
)
)
await asyncio.gather(llm_response_task, buffer_to_send_queue())
@transport.event_handler("on_participant_joined")
async def on_joined(transport, participant):
if participant["id"] == transport.my_participant_id:
return
await transport.stop_when_done()
await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue)
await transport.run()
async def buffer_to_send_queue():
while True:
frame = await buffer_queue.get()
await transport.send_queue.put(frame)
buffer_queue.task_done()
if isinstance(frame, EndStreamQueueFrame):
break
await asyncio.gather(llm_response_task, buffer_to_send_queue())
await transport.stop_when_done()
await transport.run()
if __name__ == "__main__":

View File

@@ -1,6 +1,8 @@
import argparse
import asyncio
import aiohttp
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
from dailyai.services.azure_ai_services import AzureLLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
@@ -9,95 +11,97 @@ from dailyai.services.fal_ai_services import FalImageGenService
async def main(room_url):
meeting_duration_minutes = 5
transport = DailyTransportService(
room_url,
None,
"Month Narration Bot",
meeting_duration_minutes,
)
transport.mic_enabled = True
transport.camera_enabled = True
transport.mic_sample_rate = 16000
transport.camera_width = 1024
transport.camera_height = 1024
llm = AzureLLMService()
dalle = FalImageGenService(image_size="1024x1024")
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
# dalle = OpenAIImageGenService(image_size="1024x1024")
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
return all_audio
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
image_description = await llm.run_llm(messages)
if not image_description:
return
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
(audio, image_data) = await asyncio.gather(
audio_task, image_task
async with aiohttp.ClientSession() as session:
meeting_duration_minutes = 5
transport = DailyTransportService(
room_url,
None,
"Month Narration Bot",
meeting_duration_minutes,
)
transport.mic_enabled = True
transport.camera_enabled = True
transport.mic_sample_rate = 16000
transport.camera_width = 1024
transport.camera_height = 1024
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
llm = AzureLLMService()
dalle = FalImageGenService(aiohttp_session=session, image_size="1024x1024")
tts = ElevenLabsTTSService(aiohttp_session=session, voice_id="ErXwobaYiN019PkySvjV")
# dalle = OpenAIImageGenService(image_size="1024x1024")
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
data = await month_data_task
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
return all_audio
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
image_description = await llm.run_llm(messages)
if not image_description:
return
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
(audio, image_data) = await asyncio.gather(
audio_task, image_task
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]
await transport.run()
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in order.
for month_data_task in asyncio.as_completed(month_tasks):
data = await month_data_task
if data:
await transport.send_queue.put(
[
ImageQueueFrame(data["image_url"], data["image"]),
AudioQueueFrame(data["audio"]),
]
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
await transport.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")

View File

@@ -1,5 +1,6 @@
import argparse
import asyncio
import aiohttp
import requests
import time
import urllib.parse
@@ -12,56 +13,53 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
async def main(room_url: str, token):
global transport
global llm
global tts
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
room_url,
token,
"Respond bot",
5,
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
transport.start_transcription = True
transport = DailyTransportService(
room_url,
token,
"Respond bot",
5,
)
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
transport.start_transcription = True
llm = AzureLLMService()
tts = ElevenLabsTTSService(session, voice_id="ErXwobaYiN019PkySvjV")
llm = AzureLLMService()
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
async def run_response(user_speech, tma_in, tma_out):
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
[StartStreamQueueFrame(), TextQueueFrame(user_speech)]
async def run_response(user_speech, tma_in, tma_out):
await tts.run_to_queue(
transport.send_queue,
tma_out.run(
llm.run(
tma_in.run(
[StartStreamQueueFrame(), TextQueueFrame(user_speech)]
)
)
)
),
)
),
)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
async def run_conversation():
messages = [
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
]
async def run_conversation():
messages = [
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
]
conversation_wrapper = InterruptibleConversationWrapper(
frame_generator=transport.get_receive_frames,
runner=run_response,
interrupt=transport.interrupt,
my_participant_id=transport.my_participant_id,
llm_messages=messages,
)
await conversation_wrapper.run_conversation()
conversation_wrapper = InterruptibleConversationWrapper(
frame_generator=transport.get_receive_frames,
runner=run_response,
interrupt=transport.interrupt,
my_participant_id=transport.my_participant_id,
llm_messages=messages,
)
await conversation_wrapper.run_conversation()
transport.transcription_settings["extra"]["punctuate"] = False
await asyncio.gather(transport.run(), run_conversation())
transport.transcription_settings["extra"]["punctuate"] = False
await asyncio.gather(transport.run(), run_conversation())
if __name__ == "__main__":