From e81f247845f2ac72a7caba7555c8602f8a1d4c63 Mon Sep 17 00:00:00 2001 From: Moishe Lettvin Date: Fri, 26 Jan 2024 12:30:37 -0500 Subject: [PATCH] Don't create aiohttp sessions inside services --- src/dailyai/services/azure_ai_services.py | 109 ++++-------- .../services/daily_transport_service.py | 4 +- src/dailyai/services/deepgram_ai_services.py | 18 +- src/dailyai/services/elevenlabs_ai_service.py | 48 ++--- src/dailyai/services/fal_ai_services.py | 21 +-- src/dailyai/services/open_ai_services.py | 29 +-- src/samples/foundational/01-say-one-thing.py | 63 +++---- src/samples/foundational/01a-greet-user.py | 78 ++++---- .../foundational/02-llm-say-one-thing.py | 57 +++--- src/samples/foundational/03-still-frame.py | 43 ++--- .../foundational/04-utterance-and-speech.py | 89 +++++----- .../foundational/05-sync-speech-and-text.py | 166 +++++++++--------- src/samples/foundational/07-interruptible.py | 84 +++++---- 13 files changed, 401 insertions(+), 408 deletions(-) diff --git a/src/dailyai/services/azure_ai_services.py b/src/dailyai/services/azure_ai_services.py index 6710534b9..c35584f54 100644 --- a/src/dailyai/services/azure_ai_services.py +++ b/src/dailyai/services/azure_ai_services.py @@ -101,84 +101,53 @@ class AzureImageGenServiceREST(ImageGenService): def __init__( self, image_size: str, + aiohttp_session:aiohttp.ClientSession, api_key=None, azure_endpoint=None, api_version=None, model=None): super().__init__(image_size=image_size) - self.api_key = api_key or os.getenv("AZURE_DALLE_KEY") - self.azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT") - self.api_version = api_version or "2023-06-01-preview" - self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID") + self._api_key = api_key or os.getenv("AZURE_DALLE_KEY") + self._azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT") + self._api_version = api_version or "2023-06-01-preview" + self._model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID") + self._aiohttp_session = aiohttp_session async def run_image_gen(self, sentence) -> tuple[str, bytes]: - # TODO hoist the session to app-level - async with aiohttp.ClientSession() as session: - url = f"{self.azure_endpoint}openai/images/generations:submit?api-version={self.api_version}" - headers = {"api-key": self.api_key, "Content-Type": "application/json"} - body = { - # Enter your prompt text here - "prompt": sentence, - "size": self.image_size, - "n": 1, - } - async with session.post(url, headers=headers, json=body) as submission: - operation_location = submission.headers['operation-location'] + url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}" + headers = {"api-key": self._api_key, "Content-Type": "application/json"} + body = { + # Enter your prompt text here + "prompt": sentence, + "size": self.image_size, + "n": 1, + } + async with self._aiohttp_session.post( + url, headers=headers, json=body + ) as submission: + operation_location = submission.headers['operation-location'] - status = "" - attempts_left = 120 - json_response = None - while status != "succeeded": - attempts_left -= 1 - if attempts_left == 0: - raise Exception("Image generation timed out") + status = "" + attempts_left = 120 + json_response = None + while status != "succeeded": + attempts_left -= 1 + if attempts_left == 0: + raise Exception("Image generation timed out") - await asyncio.sleep(1) - response = await session.get(operation_location, headers=headers) - json_response = await response.json() - status = json_response["status"] + await asyncio.sleep(1) + response = await self._aiohttp_session.get( + operation_location, headers=headers + ) + json_response = await response.json() + status = json_response["status"] - image_url = json_response["result"]["data"][0]["url"] if json_response else None - if not image_url: - raise Exception("Image generation failed") + image_url = json_response["result"]["data"][0]["url"] if json_response else None + if not image_url: + raise Exception("Image generation failed") - # Load the image from the url - async with session.get(image_url) as response: - image_stream = io.BytesIO(await response.content.read()) - image = Image.open(image_stream) - return (image_url, image.tobytes()) - - -class AzureImageGenService(ImageGenService): - - def __init__(self, api_key=None, azure_endpoint=None, api_version=None, model=None): - super().__init__() - - api_key = api_key or os.getenv("AZURE_DALLE_KEY") - azure_endpoint = azure_endpoint or os.getenv("AZURE_DALLE_ENDPOINT") - api_version = api_version or "2023-06-01-preview" - self.model = model or os.getenv("AZURE_DALLE_DEPLOYMENT_ID") - - self.client = AzureOpenAI( - api_key=api_key, - azure_endpoint=azure_endpoint, - api_version=api_version, - ) - - async def run_image_gen(self, sentence) -> tuple[str, bytes]: - self.logger.info("Generating azure image", sentence) - - image = self.client.images.generate( - model=self.model, - prompt=sentence, - n=1, - size=self.image_size, - ) - - url = image["data"][0]["url"] - response = requests.get(url) - - dalle_stream = io.BytesIO(response.content) - dalle_im = Image.open(dalle_stream.tobytes()) - - return (url, dalle_im) + # Load the image from the url + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + return (image_url, image.tobytes()) diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 2e2af9f87..ac4daae7a 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -15,7 +15,6 @@ from dailyai.queue_frame import ( ImageQueueFrame, QueueFrame, StartStreamQueueFrame, - TextQueueFrame, TranscriptionQueueFrame, ) @@ -414,3 +413,6 @@ class DailyTransportService(EventHandler): raise e b = bytearray() + except Exception as e: + print("!!!!", e) + raise e diff --git a/src/dailyai/services/deepgram_ai_services.py b/src/dailyai/services/deepgram_ai_services.py index 24fb2a604..781e1dd88 100644 --- a/src/dailyai/services/deepgram_ai_services.py +++ b/src/dailyai/services/deepgram_ai_services.py @@ -9,11 +9,12 @@ from dailyai.services.ai_services import TTSService class DeepgramTTSService(TTSService): - def __init__(self, speech_key=None, voice=None): + def __init__(self, aiohttp_session, speech_key=None, voice=None): super().__init__() - self.voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2" - self.speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY") + self._voice = voice or os.getenv("DEEPGRAM_VOICE") or "alpha-asteria-en-v2" + self._speech_key = speech_key or os.getenv("DEEPGRAM_API_KEY") + self._aiohttp_session = aiohttp_session def get_mic_sample_rate(self): return 24000 @@ -21,10 +22,9 @@ class DeepgramTTSService(TTSService): async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]: self.logger.info(f"Running deepgram tts for {sentence}") base_url = "https://api.beta.deepgram.com/v1/speak" - request_url = f"{base_url}?model={self.voice}&encoding=linear16&container=none&sample_rate=16000" - headers = {"authorization": f"token {self.speech_key}"} + request_url = f"{base_url}?model={self._voice}&encoding=linear16&container=none&sample_rate=16000" + headers = {"authorization": f"token {self._speech_key}"} body = {"text": sentence} - async with aiohttp.ClientSession() as session: - async with session.post(request_url, headers=headers, json=body) as r: - async for data in r.content: - yield data + async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r: + async for data in r.content: + yield data diff --git a/src/dailyai/services/elevenlabs_ai_service.py b/src/dailyai/services/elevenlabs_ai_service.py index 5d6514dec..e5951d933 100644 --- a/src/dailyai/services/elevenlabs_ai_service.py +++ b/src/dailyai/services/elevenlabs_ai_service.py @@ -9,28 +9,36 @@ from dailyai.services.ai_services import TTSService class ElevenLabsTTSService(TTSService): - def __init__(self, api_key=None, voice_id=None): + + def __init__( + self, + aiohttp_session: aiohttp.ClientSession, + api_key=None, + voice_id=None, + ): super().__init__() - self.api_key = api_key or os.getenv("ELEVENLABS_API_KEY") - self.voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID") + self._api_key = api_key or os.getenv("ELEVENLABS_API_KEY") + self._voice_id = voice_id or os.getenv("ELEVENLABS_VOICE_ID") + self._aiohttp_session = aiohttp_session async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]: - async with aiohttp.ClientSession() as session: - url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream" - payload = {"text": sentence, "model_id": "eleven_turbo_v2"} - querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2} - headers = { - "xi-api-key": self.api_key, - "Content-Type": "application/json", - } - async with session.post(url, json=payload, headers=headers, params=querystring) as r: - if r.status != 200: - self.logger.error( - f"audio fetch status code: {r.status}, error: {r.text}" - ) - return + url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream" + payload = {"text": sentence, "model_id": "eleven_turbo_v2"} + querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2} + headers = { + "xi-api-key": self._api_key, + "Content-Type": "application/json", + } + async with self._aiohttp_session.post( + url, json=payload, headers=headers, params=querystring + ) as r: + if r.status != 200: + self.logger.error( + f"audio fetch status code: {r.status}, error: {r.text}" + ) + return - async for chunk in r.content: - if chunk: - yield chunk + async for chunk in r.content: + if chunk: + yield chunk diff --git a/src/dailyai/services/fal_ai_services.py b/src/dailyai/services/fal_ai_services.py index a88dc2241..b4b185e4e 100644 --- a/src/dailyai/services/fal_ai_services.py +++ b/src/dailyai/services/fal_ai_services.py @@ -11,23 +11,21 @@ from dailyai.services.ai_services import LLMService, TTSService, ImageGenService class FalImageGenService(ImageGenService): - def __init__(self, image_size): + def __init__(self, image_size, aiohttp_session:aiohttp.ClientSession): super().__init__(image_size) + self._aiohttp_session = aiohttp_session async def run_image_gen(self, sentence) -> tuple[str, bytes]: def get_image_url(sentence, size): - print("starting fal submit...") handler = fal.apps.submit( "110602490-fast-sdxl", arguments={ "prompt": sentence }, ) - print("past fal handler init, about to wait for iter_events...") for event in handler.iter_events(): if isinstance(event, fal.apps.InProgress): - print('Request in progress') - print(event.logs) + pass result = handler.get() @@ -36,16 +34,11 @@ class FalImageGenService(ImageGenService): raise Exception("Image generation failed") return image_url - print(f"fetching image url...") image_url = await asyncio.to_thread(get_image_url, sentence, self.image_size) - print(f"got image url, downloading image...") # Load the image from the url - async with aiohttp.ClientSession() as session: - async with session.get(image_url) as response: - print("got image response") - image_stream = io.BytesIO(await response.content.read()) - print("read image stream") - image = Image.open(image_stream) - return (image_url, image.tobytes()) + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + return (image_url, image.tobytes()) # return (image_url, dalle_im.tobytes()) diff --git a/src/dailyai/services/open_ai_services.py b/src/dailyai/services/open_ai_services.py index f10e62e12..072f64022 100644 --- a/src/dailyai/services/open_ai_services.py +++ b/src/dailyai/services/open_ai_services.py @@ -51,18 +51,26 @@ class OpenAILLMService(LLMService): class OpenAIImageGenService(ImageGenService): - def __init__(self, image_size: str, api_key=None, model=None): + + def __init__( + self, + image_size: str, + aiohttp_session: aiohttp.ClientSession, + api_key=None, + model=None, + ): super().__init__(image_size=image_size) api_key = api_key or os.getenv("OPEN_AI_KEY") - self.model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3" - self.client = AsyncOpenAI(api_key=api_key) + self._model = model or os.getenv("OPEN_AI_IMAGE_MODEL") or "dall-e-3" + self._client = AsyncOpenAI(api_key=api_key) + self._aiohttp_session = aiohttp_session async def run_image_gen(self, sentence) -> tuple[str, bytes]: self.logger.info("Generating OpenAI image", sentence) - image = await self.client.images.generate( + image = await self._client.images.generate( prompt=sentence, - model=self.model, + model=self._model, n=1, size=self.image_size ) @@ -71,10 +79,7 @@ class OpenAIImageGenService(ImageGenService): raise Exception("No image provided in response", image) # Load the image from the url - async with aiohttp.ClientSession() as session: - async with session.get(image_url) as response: - image_stream = io.BytesIO(await response.content.read()) - image = Image.open(image_stream) - return (image_url, image.tobytes()) - - return (image_url, dalle_im.tobytes()) + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + return (image_url, image.tobytes()) diff --git a/src/samples/foundational/01-say-one-thing.py b/src/samples/foundational/01-say-one-thing.py index d55d605ff..780a2ce45 100644 --- a/src/samples/foundational/01-say-one-thing.py +++ b/src/samples/foundational/01-say-one-thing.py @@ -1,44 +1,47 @@ import argparse import asyncio +import aiohttp + from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService async def main(room_url): - # create a transport service object using environment variables for - # the transport service's API key, room url, and any other configuration. - # services can all define and document the environment variables they use. - # services all also take an optional config object that is used instead of - # environment variables. - # - # the abstract transport service APIs presumably can map pretty closely - # to the daily-python basic API - meeting_duration_minutes = 1 - transport = DailyTransportService( - room_url, - None, - "Say One Thing", - meeting_duration_minutes, - ) - transport.mic_enabled = True - tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV") - - # Register an event handler so we can play the audio when the participant joins. - @transport.event_handler("on_participant_joined") - async def on_participant_joined(transport, participant): - if participant["info"]["isLocal"]: - return - - await tts.say( - "Hello there, " + participant["info"]["userName"] + "!", - transport.send_queue, + async with aiohttp.ClientSession() as session: + # create a transport service object using environment variables for + # the transport service's API key, room url, and any other configuration. + # services can all define and document the environment variables they use. + # services all also take an optional config object that is used instead of + # environment variables. + # + # the abstract transport service APIs presumably can map pretty closely + # to the daily-python basic API + meeting_duration_minutes = 1 + transport = DailyTransportService( + room_url, + None, + "Say One Thing", + meeting_duration_minutes, ) + transport.mic_enabled = True + tts = ElevenLabsTTSService(session, voice_id="ErXwobaYiN019PkySvjV") - # wait for the output queue to be empty, then leave the meeting - await transport.stop_when_done() + # Register an event handler so we can play the audio when the participant joins. + @transport.event_handler("on_participant_joined") + async def on_participant_joined(transport, participant): + if participant["info"]["isLocal"]: + return - await transport.run() + await tts.say( + "Hello there, " + participant["info"]["userName"] + "!", + transport.send_queue, + ) + + # wait for the output queue to be empty, then leave the meeting + await transport.stop_when_done() + + await transport.run() if __name__ == "__main__": diff --git a/src/samples/foundational/01a-greet-user.py b/src/samples/foundational/01a-greet-user.py index 9bf3434c1..7aae1621c 100644 --- a/src/samples/foundational/01a-greet-user.py +++ b/src/samples/foundational/01a-greet-user.py @@ -2,57 +2,59 @@ import asyncio import time from typing import AsyncGenerator -from dailyai.queue_frame import QueueFrame, FrameType +import aiohttp + +from dailyai.queue_frame import AudioQueueFrame from dailyai.services.daily_transport_service import DailyTransportService -from dailyai.services.azure_ai_services import AzureTTSService from dailyai.services.deepgram_ai_services import DeepgramTTSService async def main(room_url): - # create a transport service object using environment variables for - # the transport service's API key, room url, and any other configuration. - # services can all define and document the environment variables they use. - # services all also take an optional config object that is used instead of - # environment variables. - # - # the abstract transport service APIs presumably can map pretty closely - # to the daily-python basic API - meeting_duration_minutes = 1 - transport = DailyTransportService( - room_url, - None, - "Greeter", - meeting_duration_minutes, - ) - transport.mic_enabled = True + async with aiohttp.ClientSession() as session: + # create a transport service object using environment variables for + # the transport service's API key, room url, and any other configuration. + # services can all define and document the environment variables they use. + # services all also take an optional config object that is used instead of + # environment variables. + # + # the abstract transport service APIs presumably can map pretty closely + # to the daily-python basic API + meeting_duration_minutes = 1 + transport = DailyTransportService( + room_url, + None, + "Greeter", + meeting_duration_minutes, + ) + transport.mic_enabled = True - # similarly, create a tts service - tts = DeepgramTTSService() + # similarly, create a tts service + tts = DeepgramTTSService(session) - # Get the generator for the audio. This will start running in the background, - # and when we ask the generator for its items, we'll get what it's generated. + # Get the generator for the audio. This will start running in the background, + # and when we ask the generator for its items, we'll get what it's generated. - # Register an event handler so we can play the audio when the participant joins. - print("settting up handler") + # Register an event handler so we can play the audio when the participant joins. + print("settting up handler") - @transport.event_handler("on_participant_joined") - async def on_participant_joined(transport, participant): - print(f"participant joined: {participant['info']['userName']}") - if participant["info"]["isLocal"]: - return - audio_generator: AsyncGenerator[bytes, None] = tts.run_tts( - f"Hello there, {participant['info']['userName']}!") + @transport.event_handler("on_participant_joined") + async def on_participant_joined(transport, participant): + print(f"participant joined: {participant['info']['userName']}") + if participant["info"]["isLocal"]: + return + audio_generator: AsyncGenerator[bytes, None] = tts.run_tts( + f"Hello there, {participant['info']['userName']}!") - async for audio in audio_generator: - transport.output_queue.put(QueueFrame(FrameType.AUDIO, audio)) + async for audio in audio_generator: + await transport.send_queue.put(AudioQueueFrame(audio)) - print("setting up call state handler") + print("setting up call state handler") - @transport.event_handler("on_call_state_updated") - async def on_call_joined(transport, state): - print(f"call state callback: {state}") + @transport.event_handler("on_call_state_updated") + async def on_call_joined(transport, state): + print(f"call state callback: {state}") - await transport.run() + await transport.run() if __name__ == "__main__": diff --git a/src/samples/foundational/02-llm-say-one-thing.py b/src/samples/foundational/02-llm-say-one-thing.py index 0da365d42..e1f5656f5 100644 --- a/src/samples/foundational/02-llm-say-one-thing.py +++ b/src/samples/foundational/02-llm-say-one-thing.py @@ -1,5 +1,8 @@ import argparse import asyncio +import logging + +import aiohttp from dailyai.queue_frame import LLMMessagesQueueFrame from dailyai.services.daily_transport_service import DailyTransportService @@ -8,35 +11,39 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService async def main(room_url): - meeting_duration_minutes = 1 - transport = DailyTransportService( - room_url, - None, - "Say One Thing From an LLM", - meeting_duration_minutes, - ) - transport.mic_enabled = True + async with aiohttp.ClientSession() as session: + logger = logging.getLogger("dailyai") + logger.setLevel(logging.DEBUG) - tts = ElevenLabsTTSService(voice_id="29vD33N1CtxCmqQRPOHJ") - llm = AzureLLMService() - - messages = [{ - "role": "system", - "content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world." - }] - tts_task = asyncio.create_task( - tts.run_to_queue( - transport.send_queue, - llm.run([LLMMessagesQueueFrame(messages)]), + meeting_duration_minutes = 1 + transport = DailyTransportService( + room_url, + None, + "Say One Thing From an LLM", + meeting_duration_minutes, ) - ) + transport.mic_enabled = True - @transport.event_handler("on_first_other_participant_joined") - async def on_first_other_participant_joined(transport): - await tts_task - await transport.stop_when_done() + tts = ElevenLabsTTSService(session, voice_id="29vD33N1CtxCmqQRPOHJ") + llm = AzureLLMService() - await transport.run() + messages = [{ + "role": "system", + "content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world." + }] + tts_task = asyncio.create_task( + tts.run_to_queue( + transport.send_queue, + llm.run([LLMMessagesQueueFrame(messages)]), + ) + ) + + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport): + await tts_task + await transport.stop_when_done() + + await transport.run() if __name__ == "__main__": diff --git a/src/samples/foundational/03-still-frame.py b/src/samples/foundational/03-still-frame.py index 8ea025b3b..5ff8dc186 100644 --- a/src/samples/foundational/03-still-frame.py +++ b/src/samples/foundational/03-still-frame.py @@ -1,6 +1,8 @@ import argparse import asyncio +import aiohttp + from dailyai.queue_frame import TextQueueFrame from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.open_ai_services import OpenAIImageGenService @@ -10,29 +12,30 @@ participant_joined = False async def main(room_url): - meeting_duration_minutes = 1 - transport = DailyTransportService( - room_url, - None, - "Show a still frame image", - meeting_duration_minutes, - ) - transport.mic_enabled = False - transport.camera_enabled = True - transport.camera_width = 1024 - transport.camera_height = 1024 + async with aiohttp.ClientSession() as session: + meeting_duration_minutes = 1 + transport = DailyTransportService( + room_url, + None, + "Show a still frame image", + meeting_duration_minutes, + ) + transport.mic_enabled = False + transport.camera_enabled = True + transport.camera_width = 1024 + transport.camera_height = 1024 - imagegen = OpenAIImageGenService(image_size="1024x1024") - image_task = asyncio.create_task( - imagegen.run_to_queue( - transport.send_queue, [ - TextQueueFrame("a cat in the style of picasso")])) + imagegen = OpenAIImageGenService(image_size="1024x1024", aiohttp_session=session) + image_task = asyncio.create_task( + imagegen.run_to_queue( + transport.send_queue, [ + TextQueueFrame("a cat in the style of picasso")])) - @transport.event_handler("on_participant_joined") - async def on_participant_joined(transport, participant): - await image_task + @transport.event_handler("on_participant_joined") + async def on_participant_joined(transport, participant): + await image_task - await transport.run() + await transport.run() if __name__ == "__main__": diff --git a/src/samples/foundational/04-utterance-and-speech.py b/src/samples/foundational/04-utterance-and-speech.py index 8da2147bc..cde9629b9 100644 --- a/src/samples/foundational/04-utterance-and-speech.py +++ b/src/samples/foundational/04-utterance-and-speech.py @@ -2,6 +2,8 @@ import argparse import asyncio import re +import aiohttp + from dailyai.services.daily_transport_service import DailyTransportService from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame @@ -9,58 +11,55 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService async def main(room_url: str): - global transport - global llm - global tts - - transport = DailyTransportService( - room_url, - None, - "Say Two Things Bot", - 1, - ) - transport.mic_enabled = True - transport.mic_sample_rate = 16000 - transport.camera_enabled = False - - llm = AzureLLMService() - azure_tts = AzureTTSService() - elevenlabs_tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV") - - messages = [{"role": "system", "content": "tell the user a joke about llamas"}] - - # Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task - # will run in parallel with generating and speaking the audio for static text, so there's no delay to - # speak the LLM response. - buffer_queue = asyncio.Queue() - llm_response_task = asyncio.create_task( - elevenlabs_tts.run_to_queue( - buffer_queue, - llm.run([LLMMessagesQueueFrame(messages)]), - True, + async with aiohttp.ClientSession() as session: + transport = DailyTransportService( + room_url, + None, + "Say Two Things Bot", + 1, ) - ) + transport.mic_enabled = True + transport.mic_sample_rate = 16000 + transport.camera_enabled = False - @transport.event_handler("on_participant_joined") - async def on_joined(transport, participant): - if participant["id"] == transport.my_participant_id: - return + llm = AzureLLMService() + azure_tts = AzureTTSService() + elevenlabs_tts = ElevenLabsTTSService(session, voice_id="ErXwobaYiN019PkySvjV") - await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue) + messages = [{"role": "system", "content": "tell the user a joke about llamas"}] - async def buffer_to_send_queue(): - while True: - frame = await buffer_queue.get() - await transport.send_queue.put(frame) - buffer_queue.task_done() - if isinstance(frame, EndStreamQueueFrame): - break + # Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task + # will run in parallel with generating and speaking the audio for static text, so there's no delay to + # speak the LLM response. + buffer_queue = asyncio.Queue() + llm_response_task = asyncio.create_task( + elevenlabs_tts.run_to_queue( + buffer_queue, + llm.run([LLMMessagesQueueFrame(messages)]), + True, + ) + ) - await asyncio.gather(llm_response_task, buffer_to_send_queue()) + @transport.event_handler("on_participant_joined") + async def on_joined(transport, participant): + if participant["id"] == transport.my_participant_id: + return - await transport.stop_when_done() + await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue) - await transport.run() + async def buffer_to_send_queue(): + while True: + frame = await buffer_queue.get() + await transport.send_queue.put(frame) + buffer_queue.task_done() + if isinstance(frame, EndStreamQueueFrame): + break + + await asyncio.gather(llm_response_task, buffer_to_send_queue()) + + await transport.stop_when_done() + + await transport.run() if __name__ == "__main__": diff --git a/src/samples/foundational/05-sync-speech-and-text.py b/src/samples/foundational/05-sync-speech-and-text.py index c08161566..3bc9be606 100644 --- a/src/samples/foundational/05-sync-speech-and-text.py +++ b/src/samples/foundational/05-sync-speech-and-text.py @@ -1,6 +1,8 @@ import argparse import asyncio +import aiohttp + from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame from dailyai.services.azure_ai_services import AzureLLMService from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService @@ -9,95 +11,97 @@ from dailyai.services.fal_ai_services import FalImageGenService async def main(room_url): - meeting_duration_minutes = 5 - transport = DailyTransportService( - room_url, - None, - "Month Narration Bot", - meeting_duration_minutes, - ) - transport.mic_enabled = True - transport.camera_enabled = True - transport.mic_sample_rate = 16000 - transport.camera_width = 1024 - transport.camera_height = 1024 - - llm = AzureLLMService() - dalle = FalImageGenService(image_size="1024x1024") - tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV") - # dalle = OpenAIImageGenService(image_size="1024x1024") - - # Get a complete audio chunk from the given text. Splitting this into its own - # coroutine lets us ensure proper ordering of the audio chunks on the send queue. - async def get_all_audio(text): - all_audio = bytearray() - async for audio in tts.run_tts(text): - all_audio.extend(audio) - - return all_audio - - async def get_month_data(month): - messages = [ - { - "role": "system", - "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", - } - ] - - image_description = await llm.run_llm(messages) - if not image_description: - return - - to_speak = f"{month}: {image_description}" - audio_task = asyncio.create_task(get_all_audio(to_speak)) - image_task = asyncio.create_task(dalle.run_image_gen(image_description)) - (audio, image_data) = await asyncio.gather( - audio_task, image_task + async with aiohttp.ClientSession() as session: + meeting_duration_minutes = 5 + transport = DailyTransportService( + room_url, + None, + "Month Narration Bot", + meeting_duration_minutes, ) + transport.mic_enabled = True + transport.camera_enabled = True + transport.mic_sample_rate = 16000 + transport.camera_width = 1024 + transport.camera_height = 1024 - return { - "month": month, - "text": image_description, - "image_url": image_data[0], - "image": image_data[1], - "audio": audio, - } + llm = AzureLLMService() + dalle = FalImageGenService(aiohttp_session=session, image_size="1024x1024") + tts = ElevenLabsTTSService(aiohttp_session=session, voice_id="ErXwobaYiN019PkySvjV") + # dalle = OpenAIImageGenService(image_size="1024x1024") - months: list[str] = [ - "January", - "February", - "March", - "April", - "May", - "June", - "July", - "August", - "September", - "October", - "November", - "December", - ] + # Get a complete audio chunk from the given text. Splitting this into its own + # coroutine lets us ensure proper ordering of the audio chunks on the send queue. + async def get_all_audio(text): + all_audio = bytearray() + async for audio in tts.run_tts(text): + all_audio.extend(audio) - @transport.event_handler("on_first_other_participant_joined") - async def on_first_other_participant_joined(transport): - # This will play the months in the order they're completed. The benefit - # is we'll have as little delay as possible before the first month, and - # likely no delay between months, but the months won't display in order. - for month_data_task in asyncio.as_completed(month_tasks): - data = await month_data_task - await transport.send_queue.put( - [ - ImageQueueFrame(data["image_url"], data["image"]), - AudioQueueFrame(data["audio"]), - ] + return all_audio + + async def get_month_data(month): + messages = [ + { + "role": "system", + "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", + } + ] + + image_description = await llm.run_llm(messages) + if not image_description: + return + + to_speak = f"{month}: {image_description}" + audio_task = asyncio.create_task(get_all_audio(to_speak)) + image_task = asyncio.create_task(dalle.run_image_gen(image_description)) + (audio, image_data) = await asyncio.gather( + audio_task, image_task ) - # wait for the output queue to be empty, then leave the meeting - await transport.stop_when_done() + return { + "month": month, + "text": image_description, + "image_url": image_data[0], + "image": image_data[1], + "audio": audio, + } - month_tasks = [asyncio.create_task(get_month_data(month)) for month in months] + months: list[str] = [ + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", + ] - await transport.run() + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport): + # This will play the months in the order they're completed. The benefit + # is we'll have as little delay as possible before the first month, and + # likely no delay between months, but the months won't display in order. + for month_data_task in asyncio.as_completed(month_tasks): + data = await month_data_task + if data: + await transport.send_queue.put( + [ + ImageQueueFrame(data["image_url"], data["image"]), + AudioQueueFrame(data["audio"]), + ] + ) + + # wait for the output queue to be empty, then leave the meeting + await transport.stop_when_done() + + month_tasks = [asyncio.create_task(get_month_data(month)) for month in months] + + await transport.run() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Simple Daily Bot Sample") diff --git a/src/samples/foundational/07-interruptible.py b/src/samples/foundational/07-interruptible.py index 927a5670f..9e84c0a5a 100644 --- a/src/samples/foundational/07-interruptible.py +++ b/src/samples/foundational/07-interruptible.py @@ -1,5 +1,6 @@ import argparse import asyncio +import aiohttp import requests import time import urllib.parse @@ -12,56 +13,53 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService async def main(room_url: str, token): - global transport - global llm - global tts + async with aiohttp.ClientSession() as session: + transport = DailyTransportService( + room_url, + token, + "Respond bot", + 5, + ) + transport.mic_enabled = True + transport.mic_sample_rate = 16000 + transport.camera_enabled = False + transport.start_transcription = True - transport = DailyTransportService( - room_url, - token, - "Respond bot", - 5, - ) - transport.mic_enabled = True - transport.mic_sample_rate = 16000 - transport.camera_enabled = False - transport.start_transcription = True + llm = AzureLLMService() + tts = ElevenLabsTTSService(session, voice_id="ErXwobaYiN019PkySvjV") - llm = AzureLLMService() - tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV") - - async def run_response(user_speech, tma_in, tma_out): - await tts.run_to_queue( - transport.send_queue, - tma_out.run( - llm.run( - tma_in.run( - [StartStreamQueueFrame(), TextQueueFrame(user_speech)] + async def run_response(user_speech, tma_in, tma_out): + await tts.run_to_queue( + transport.send_queue, + tma_out.run( + llm.run( + tma_in.run( + [StartStreamQueueFrame(), TextQueueFrame(user_speech)] + ) ) - ) - ), - ) + ), + ) - @transport.event_handler("on_first_other_participant_joined") - async def on_first_other_participant_joined(transport): - await tts.say("Hi, I'm listening!", transport.send_queue) + @transport.event_handler("on_first_other_participant_joined") + async def on_first_other_participant_joined(transport): + await tts.say("Hi, I'm listening!", transport.send_queue) - async def run_conversation(): - messages = [ - {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."}, - ] + async def run_conversation(): + messages = [ + {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."}, + ] - conversation_wrapper = InterruptibleConversationWrapper( - frame_generator=transport.get_receive_frames, - runner=run_response, - interrupt=transport.interrupt, - my_participant_id=transport.my_participant_id, - llm_messages=messages, - ) - await conversation_wrapper.run_conversation() + conversation_wrapper = InterruptibleConversationWrapper( + frame_generator=transport.get_receive_frames, + runner=run_response, + interrupt=transport.interrupt, + my_participant_id=transport.my_participant_id, + llm_messages=messages, + ) + await conversation_wrapper.run_conversation() - transport.transcription_settings["extra"]["punctuate"] = False - await asyncio.gather(transport.run(), run_conversation()) + transport.transcription_settings["extra"]["punctuate"] = False + await asyncio.gather(transport.run(), run_conversation()) if __name__ == "__main__":