Compare commits

...

3 Commits

Author SHA1 Message Date
Moishe Lettvin
358166f347 Merge pull request #59 from daily-co/remove-requirements
Remove unused requirements file
2024-03-13 16:23:42 -04:00
Moishe Lettvin
c006c123b2 Remove unused requirements file 2024-03-13 16:19:03 -04:00
chadbailey59
cf302fb765 Storybot and Chatbot examples (#58)
* storybot

* storybot

* added pipeline.queue_frames

* fixup
2024-03-13 15:12:59 -05:00
41 changed files with 594 additions and 505 deletions

View File

@@ -5,6 +5,7 @@ from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import (
EndFrame,
AudioFrame,
EndPipeFrame,
Frame,
ImageFrame,
@@ -14,7 +15,7 @@ from dailyai.pipeline.frames import (
TextFrame,
TranscriptionQueueFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
UserStoppedSpeakingFrame,
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import AIService
@@ -23,6 +24,7 @@ from typing import AsyncGenerator, Callable, Coroutine, List
from dailyai.services.openai_llm_context import OpenAILLMContext
class ResponseAggregator(FrameProcessor):
def __init__(
@@ -44,9 +46,7 @@ class ResponseAggregator(FrameProcessor):
self._accumulator_frame = accumulator_frame
self._pass_through = pass_through
async def process_frame(
self, frame: Frame
) -> AsyncGenerator[Frame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if not self.messages:
return
@@ -54,9 +54,13 @@ class ResponseAggregator(FrameProcessor):
self.aggregating = True
elif isinstance(frame, self._end_frame):
self.aggregating = False
self.messages.append({"role": self._role, "content": self.aggregation})
self.aggregation = ""
yield LLMMessagesQueueFrame(self.messages)
# Sometimes VAD triggers quickly on and off. If we don't get any transcription,
# it creates empty LLM message queue frames
if len(self.aggregation) > 0:
self.messages.append({"role": self._role, "content": self.aggregation})
self.aggregation = ""
yield self._end_frame()
yield LLMMessagesQueueFrame(self.messages)
elif isinstance(frame, self._accumulator_frame) and self.aggregating:
self.aggregation += f" {frame.text}"
if self._pass_through:
@@ -64,6 +68,7 @@ class ResponseAggregator(FrameProcessor):
else:
yield frame
class LLMResponseAggregator(ResponseAggregator):
def __init__(self, messages: list[dict]):
super().__init__(
@@ -71,9 +76,10 @@ class LLMResponseAggregator(ResponseAggregator):
role="assistant",
start_frame=LLMResponseStartFrame,
end_frame=LLMResponseEndFrame,
accumulator_frame=TextFrame
accumulator_frame=TextFrame,
)
class UserResponseAggregator(ResponseAggregator):
def __init__(self, messages: list[dict]):
super().__init__(
@@ -82,7 +88,7 @@ class UserResponseAggregator(ResponseAggregator):
start_frame=UserStartedSpeakingFrame,
end_frame=UserStoppedSpeakingFrame,
accumulator_frame=TranscriptionQueueFrame,
pass_through=False
pass_through=False,
)
@@ -103,9 +109,7 @@ class LLMContextAggregator(AIService):
self.complete_sentences = complete_sentences
self.pass_through = pass_through
async def process_frame(
self, frame: Frame
) -> AsyncGenerator[Frame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
# We don't do anything with non-text frames, pass it along to next in the pipeline.
if not isinstance(frame, TextFrame):
yield frame
@@ -137,6 +141,7 @@ class LLMContextAggregator(AIService):
self.messages.append({"role": self.role, "content": frame.text})
yield LLMMessagesQueueFrame(self.messages)
class LLMUserContextAggregator(LLMContextAggregator):
def __init__(
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
@@ -176,12 +181,11 @@ class SentenceAggregator(FrameProcessor):
>>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
Hello, world.
"""
def __init__(self):
self.aggregation = ""
async def process_frame(
self, frame: Frame
) -> AsyncGenerator[Frame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, TextFrame):
m = re.search("(.*[?.!])(.*)", frame.text)
if m:
@@ -233,12 +237,11 @@ class LLMFullResponseAggregator(FrameProcessor):
Hello, world. I am an LLM.
LLMResponseEndFrame
"""
def __init__(self):
self.aggregation = ""
async def process_frame(
self, frame: Frame
) -> AsyncGenerator[Frame, None]:
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, TextFrame):
self.aggregation += frame.text
elif isinstance(frame, LLMResponseEndFrame):
@@ -274,8 +277,9 @@ class StatelessTextTransformer(FrameProcessor):
else:
yield frame
class ParallelPipeline(FrameProcessor):
""" Run multiple pipelines in parallel.
"""Run multiple pipelines in parallel.
This class takes frames from its source queue and sends them to each
sub-pipeline. Each sub-pipeline emits its frames into this class's
@@ -292,6 +296,7 @@ class ParallelPipeline(FrameProcessor):
Since frame handlers pass through unhandled frames by convention, this
class de-dupes frames in its sink before yielding them.
"""
def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
self.sources = [asyncio.Queue() for _ in pipeline_definitions]
self.sink: asyncio.Queue[Frame] = asyncio.Queue()
@@ -327,6 +332,7 @@ class ParallelPipeline(FrameProcessor):
if not isinstance(frame, EndPipeFrame):
yield frame
class GatedAggregator(FrameProcessor):
"""Accumulate frames, with custom functions to start and stop accumulation.
Yields gate-opening frame before any accumulated frames, then ensuing frames
@@ -352,6 +358,7 @@ class GatedAggregator(FrameProcessor):
>>> asyncio.run(print_frames(aggregator, TextFrame("Goodbye.")))
Goodbye.
"""
def __init__(self, gate_open_fn, gate_close_fn, start_open):
self.gate_open_fn = gate_open_fn
self.gate_close_fn = gate_close_fn

View File

@@ -5,7 +5,9 @@ from dailyai.services.openai_llm_context import OpenAILLMContext
class Frame:
pass
def __str__(self):
return f"{self.__class__.__name__}"
class ControlFrame(Frame):
# Control frames should contain no instance data, so
@@ -21,10 +23,21 @@ class StartFrame(ControlFrame):
class EndFrame(ControlFrame):
pass
class EndPipeFrame(ControlFrame):
pass
class PipelineStartedFrame(ControlFrame):
"""
Used by the transport to indicate that execution of a pipeline is starting
(or restarting). It should be the first frame your app receives when it
starts, or when an interruptible pipeline has been interrupted.
"""
pass
class LLMResponseStartFrame(ControlFrame):
pass
@@ -37,22 +50,34 @@ class LLMResponseEndFrame(ControlFrame):
class AudioFrame(Frame):
data: bytes
def __str__(self):
return f"{self.__class__.__name__}, size: {len(self.data)} B"
@dataclass()
class ImageFrame(Frame):
url: str | None
image: bytes
def __str__(self):
return f"{self.__class__.__name__}, url: {self.url}, image size: {len(self.image)} B"
@dataclass()
class SpriteFrame(Frame):
images: list[bytes]
def __str__(self):
return f"{self.__class__.name__}, list size: {len(self.images)}"
@dataclass()
class TextFrame(Frame):
text: str
def __str__(self):
return f'{self.__class__.__name__}: "{self.text}"'
@dataclass()
class TranscriptionQueueFrame(TextFrame):
@@ -74,15 +99,28 @@ class AppMessageQueueFrame(Frame):
message: Any
participantId: str
class UserStartedSpeakingFrame(Frame):
pass
class UserStoppedSpeakingFrame(Frame):
pass
class BotStartedSpeakingFrame(Frame):
pass
class BotStoppedSpeakingFrame(Frame):
pass
@dataclass()
class LLMFunctionStartFrame(Frame):
function_name: str
@dataclass()
class LLMFunctionCallFrame(Frame):
function_name: str

View File

@@ -19,7 +19,7 @@ class Pipeline:
source: asyncio.Queue | None = None,
sink: asyncio.Queue[Frame] | None = None,
):
""" Create a new pipeline. By default neither the source nor sink
"""Create a new pipeline. By default neither the source nor sink
queues are set, so you'll need to pass them to this constructor or
call set_source and set_sink before using the pipeline. Note that
the transport's run_*_pipeline methods will set the source and sink
@@ -30,18 +30,18 @@ class Pipeline:
self.sink: asyncio.Queue[Frame] | None = sink
def set_source(self, source: asyncio.Queue[Frame]):
""" Set the source queue for this pipeline. Frames from this queue
"""Set the source queue for this pipeline. Frames from this queue
will be processed by each frame_processor in the pipeline, or order
from first to last. """
from first to last."""
self.source = source
def set_sink(self, sink: asyncio.Queue[Frame]):
""" Set the sink queue for this pipeline. After the last frame_processor
"""Set the sink queue for this pipeline. After the last frame_processor
has processed a frame, its output will be placed on this queue."""
self.sink = sink
async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
""" Convenience function to get the next frame from the source queue. This
"""Convenience function to get the next frame from the source queue. This
lets us consistently have an AsyncGenerator yield frames, from either the
source queue or a frame_processor."""
if self.source is None:
@@ -53,13 +53,15 @@ class Pipeline:
) -> AsyncGenerator[Frame, None]:
if processors:
async for frame in processors[0].process_frame(initial_frame):
async for final_frame in self.run_pipeline_recursively(frame, processors[1:]):
async for final_frame in self.run_pipeline_recursively(
frame, processors[1:]
):
yield final_frame
else:
yield initial_frame
async def run_pipeline(self):
""" Run the pipeline. Take each frame from the source queue, pass it to
"""Run the pipeline. Take each frame from the source queue, pass it to
the first frame_processor, pass the output of that frame_processor to the
next in the list, etc. until the last frame_processor has processed the
resulting frames, then place those frames in the sink queue.
@@ -68,7 +70,8 @@ class Pipeline:
This method will exit when an EndStreamQueueFrame is placed on the sink queue.
No more frames will be placed on the sink queue after an EndStreamQueueFrame, even
if it's not the last frame yielded by the last frame_processor in the pipeline.."""
if it's not the last frame yielded by the last frame_processor in the pipeline..
"""
if self.source is None or self.sink is None:
raise ValueError("Source or sink queue not set")
@@ -76,13 +79,26 @@ class Pipeline:
try:
while True:
initial_frame = await self.source.get()
async for frame in self.run_pipeline_recursively(initial_frame, self.processors):
async for frame in self.run_pipeline_recursively(
initial_frame, self.processors
):
await self.sink.put(frame)
if isinstance(initial_frame, EndFrame) or isinstance(initial_frame, EndPipeFrame):
if isinstance(initial_frame, EndFrame) or isinstance(
initial_frame, EndPipeFrame
):
break
except asyncio.CancelledError:
# this means there's been an interruption, do any cleanup necessary here.
for processor in self.processors:
await processor.interrupted()
pass
async def queue_frames(self, frames: Frame | List[Frame]):
"""Insert frames directly into a pipeline. This is typically used inside a transport
participant_joined callback to prompt a bot to start a conversation, for example.
"""
if not isinstance(frames, List):
frames = [frames]
for f in frames:
await self.source.put(f)

View File

@@ -1,3 +0,0 @@
Pillow==10.1.0
typing_extensions==4.9.0
faster-whisper==0.10.0

View File

@@ -16,12 +16,13 @@ from dailyai.pipeline.frames import (
LLMFunctionCallFrame,
Frame,
TextFrame,
TranscriptionQueueFrame
TranscriptionQueueFrame,
)
from abc import abstractmethod
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List
class AIService(FrameProcessor):
def __init__(self):
@@ -30,7 +31,9 @@ class AIService(FrameProcessor):
def stop(self):
pass
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
async def run_to_queue(
self, queue: asyncio.Queue, frames, add_end_of_stream=False
) -> None:
async for frame in self.run(frames):
await queue.put(frame)
@@ -39,9 +42,7 @@ class AIService(FrameProcessor):
async def run(
self,
frames: Iterable[Frame]
| AsyncIterable[Frame]
| asyncio.Queue[Frame],
frames: Iterable[Frame] | AsyncIterable[Frame] | asyncio.Queue[Frame],
) -> AsyncGenerator[Frame, None]:
try:
if isinstance(frames, AsyncIterable):
@@ -67,7 +68,8 @@ class AIService(FrameProcessor):
class LLMService(AIService):
""" This class is a no-op but serves as a base class for LLM services. """
"""This class is a no-op but serves as a base class for LLM services."""
def __init__(self):
super().__init__()
@@ -105,7 +107,7 @@ class TTSService(AIService):
text = frame.text
else:
self.current_sentence += frame.text
if self.current_sentence.endswith((".", "?", "!")):
if self.current_sentence.strip().endswith((".", "?", "!")):
text = self.current_sentence
self.current_sentence = ""
@@ -118,7 +120,9 @@ class TTSService(AIService):
# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
await self.run_to_queue(queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()])
await self.run_to_queue(
queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()]
)
class ImageGenService(AIService):
@@ -169,7 +173,7 @@ class STTService(AIService):
ww.close()
content.seek(0)
text = await self.run_stt(content)
yield TranscriptionQueueFrame(text, '', str(time.time()))
yield TranscriptionQueueFrame(text, "", str(time.time()))
class FrameLogger(AIService):

View File

@@ -17,43 +17,40 @@ from dailyai.pipeline.frames import (
EndFrame,
ImageFrame,
Frame,
PipelineStartedFrame,
SpriteFrame,
StartFrame,
TranscriptionQueueFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
UserStoppedSpeakingFrame,
)
from dailyai.pipeline.pipeline import Pipeline
torch.set_num_threads(1)
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=False)
model, utils = torch.hub.load(
repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
)
(get_speech_timestamps,
save_audio,
read_audio,
VADIterator,
collect_chunks) = utils
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils
# Taken from utils_vad.py
def validate(model,
inputs: torch.Tensor):
def validate(model, inputs: torch.Tensor):
with torch.no_grad():
outs = model(inputs)
return outs
# Provided by Alexander Veysov
def int2float(sound):
abs_max = np.abs(sound).max()
sound = sound.astype('float32')
sound = sound.astype("float32")
if abs_max > 0:
sound *= 1/32768
sound *= 1 / 32768
sound = sound.squeeze() # depends on the use case
return sound
@@ -73,7 +70,7 @@ class VADState(Enum):
STOPPING = 4
class BaseTransportService():
class BaseTransportService:
def __init__(
self,
@@ -94,7 +91,8 @@ class BaseTransportService():
if self._vad_enabled and self._speaker_enabled:
raise Exception(
"Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False.")
"Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False."
)
self._vad_samples = 1536
vad_frame_s = self._vad_samples / SAMPLE_RATE
@@ -130,20 +128,20 @@ class BaseTransportService():
async def run(self):
self._prerun()
async_output_queue_marshal_task = asyncio.create_task(
self._marshal_frames())
async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames())
self._camera_thread = threading.Thread(
target=self._run_camera, daemon=True)
self._camera_thread = threading.Thread(target=self._run_camera, daemon=True)
self._camera_thread.start()
self._frame_consumer_thread = threading.Thread(
target=self._frame_consumer, daemon=True)
target=self._frame_consumer, daemon=True
)
self._frame_consumer_thread.start()
if self._speaker_enabled:
self._receive_audio_thread = threading.Thread(
target=self._receive_audio, daemon=True)
target=self._receive_audio, daemon=True
)
self._receive_audio_thread.start()
if self._vad_enabled:
@@ -151,10 +149,7 @@ class BaseTransportService():
self._vad_thread.start()
try:
while (
time.time() < self._expiration
and not self._stop_threads.is_set()
):
while time.time() < self._expiration and not self._stop_threads.is_set():
await asyncio.sleep(1)
except Exception as e:
self._logger.error(f"Exception {e}")
@@ -278,8 +273,7 @@ class BaseTransportService():
audio_chunk = self.read_audio_frames(self._vad_samples)
audio_int16 = np.frombuffer(audio_chunk, np.int16)
audio_float32 = int2float(audio_int16)
new_confidence = model(
torch.from_numpy(audio_float32), 16000).item()
new_confidence = model(torch.from_numpy(audio_float32), 16000).item()
speaking = new_confidence > 0.5
if speaking:
@@ -303,18 +297,22 @@ class BaseTransportService():
case VADState.STOPPING:
self._vad_stopping_count += 1
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
if (
self._vad_state == VADState.STARTING
and self._vad_starting_count >= self._vad_start_frames
):
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(
UserStartedSpeakingFrame()), self._loop
self.receive_queue.put(UserStartedSpeakingFrame()), self._loop
)
# self.interrupt()
self._vad_state = VADState.SPEAKING
self._vad_starting_count = 0
if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
if (
self._vad_state == VADState.STOPPING
and self._vad_stopping_count >= self._vad_stop_frames
):
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(
UserStoppedSpeakingFrame()), self._loop
self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop
)
self._vad_state = VADState.QUIET
self._vad_stopping_count = 0
@@ -353,9 +351,7 @@ class BaseTransportService():
self.receive_queue.put(frame), self._loop
)
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(EndFrame()), self._loop
)
asyncio.run_coroutine_threadsafe(self.receive_queue.put(EndFrame()), self._loop)
def _set_image(self, image: bytes):
self._images = itertools.cycle([image])
@@ -382,15 +378,17 @@ class BaseTransportService():
largest_write_size = 8000
while True:
try:
frames_or_frame: Frame | list[Frame] = (
self._threadsafe_send_queue.get()
)
if isinstance(frames_or_frame, AudioFrame) and len(frames_or_frame.data) > largest_write_size:
frames_or_frame: Frame | list[Frame] = self._threadsafe_send_queue.get()
if (
isinstance(frames_or_frame, AudioFrame)
and len(frames_or_frame.data) > largest_write_size
):
# subdivide large audio frames to enable interruption
frames = []
for i in range(0, len(frames_or_frame.data), largest_write_size):
frames.append(AudioFrame(
frames_or_frame.data[i: i+largest_write_size]))
frames.append(
AudioFrame(frames_or_frame.data[i : i + largest_write_size])
)
elif isinstance(frames_or_frame, Frame):
frames: list[Frame] = [frames_or_frame]
elif isinstance(frames_or_frame, list):
@@ -419,8 +417,7 @@ class BaseTransportService():
len(b) % smallest_write_size
)
if truncated_length:
self.write_frame_to_mic(
bytes(b[:truncated_length]))
self.write_frame_to_mic(bytes(b[:truncated_length]))
b = b[truncated_length:]
elif isinstance(frame, ImageFrame):
self._set_image(frame.image)
@@ -434,12 +431,15 @@ class BaseTransportService():
# can cause static in the audio stream.
if len(b):
truncated_length = len(b) - (len(b) % 160)
self.write_frame_to_mic(
bytes(b[:truncated_length]))
self.write_frame_to_mic(bytes(b[:truncated_length]))
b = bytearray()
if isinstance(frame, StartFrame):
self._is_interrupted.clear()
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(PipelineStartedFrame()),
self._loop,
)
if self._loop:
asyncio.run_coroutine_threadsafe(
@@ -453,6 +453,5 @@ class BaseTransportService():
b = bytearray()
except Exception as e:
self._logger.error(
f"Exception in frame_consumer: {e}, {len(b)}")
self._logger.error(f"Exception in frame_consumer: {e}, {len(b)}")
raise e

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 759 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 884 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 876 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 881 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 874 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 882 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 885 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 888 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 890 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 898 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 836 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 908 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 908 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 905 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 849 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 864 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 858 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 875 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 881 KiB

Binary file not shown.

View File

@@ -0,0 +1,150 @@
import asyncio
import aiohttp
import logging
import os
from PIL import Image
from typing import AsyncGenerator
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
LLMResponseAggregator,
LLMUserContextAggregator,
UserResponseAggregator,
)
from dailyai.pipeline.frames import (
ImageFrame,
SpriteFrame,
Frame,
LLMResponseEndFrame,
LLMResponseStartFrame,
LLMMessagesQueueFrame,
UserStartedSpeakingFrame,
AudioFrame,
PipelineStartedFrame,
)
from dailyai.services.ai_services import AIService
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import FrameLogger
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from examples.support.runner import configure
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
sprites = []
script_dir = os.path.dirname(__file__)
for i in range(1, 26):
# Build the full path to the image file
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(img.tobytes())
flipped = sprites[::-1]
sprites.extend(flipped)
# When the bot isn't talking, show a static image of the cat listening
quiet_frame = ImageFrame("", sprites[0])
talking_frame = SpriteFrame(images=sprites)
class TalkingAnimation(AIService):
"""
This class starts a talking animation when it receives an first AudioFrame,
and then returns to a "quiet" sprite when it sees a LLMResponseEndFrame.
"""
def __init__(self):
super().__init__()
self._is_talking = False
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, AudioFrame):
if not self._is_talking:
yield talking_frame
yield frame
self._is_talking = True
else:
yield frame
elif isinstance(frame, LLMResponseEndFrame):
yield quiet_frame
yield frame
self._is_talking = False
else:
yield frame
class AnimationInitializer(AIService):
def __init__(self):
super().__init__()
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, PipelineStartedFrame):
yield quiet_frame
yield frame
else:
yield frame
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransportService(
room_url,
token,
"Chatbot",
duration_minutes=5,
start_transcription=True,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=True,
camera_width=1024,
camera_height=576,
vad_enabled=True,
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="pNInz6obpgDQGcFmaJgB",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"), model="gpt-4-turbo-preview"
)
ta = TalkingAnimation()
ai = AnimationInitializer()
pipeline = Pipeline([ai, llm, tts, ta])
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
},
]
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
print(f"!!! in here, pipeline.source is {pipeline.source}")
await pipeline.queue_frames(LLMMessagesQueueFrame(messages))
async def run_conversation():
await transport.run_interruptible_pipeline(
pipeline,
post_processor=LLMResponseAggregator(messages),
pre_processor=UserResponseAggregator(messages),
)
transport.transcription_settings["extra"]["endpointing"] = True
transport.transcription_settings["extra"]["punctuate"] = True
await asyncio.gather(transport.run(), run_conversation())
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -384,17 +384,13 @@ async def main(room_url: str, token):
checklist = ChecklistProcessor(context, llm)
fl = FrameLogger("FRAME LOGGER 1:")
fl2 = FrameLogger("FRAME LOGGER 2:")
pipeline = Pipeline(processors=[fl, llm, fl2, checklist, tts])
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
fl = FrameLogger("first other participant")
await tts.run_to_queue(
transport.send_queue,
llm.run([OpenAILLMContextFrame(context)]),
)
await pipeline.queue_frames([OpenAILLMContextFrame(context)])
async def handle_intake():
pipeline = Pipeline(processors=[fl, llm, fl2, checklist, tts])
await transport.run_interruptible_pipeline(
pipeline,
post_processor=OpenAIAssistantContextAggregator(context),

View File

@@ -0,0 +1,291 @@
import aiohttp
import asyncio
import json
import random
import logging
import os
import re
import wave
from typing import AsyncGenerator
from PIL import Image
from dailyai.pipeline.pipeline import Pipeline
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
LLMContextAggregator,
LLMUserContextAggregator,
ParallelPipeline,
UserResponseAggregator,
LLMResponseAggregator,
)
from examples.support.runner import configure
from dailyai.pipeline.frames import (
LLMMessagesQueueFrame,
TranscriptionQueueFrame,
Frame,
TextFrame,
LLMFunctionCallFrame,
LLMFunctionStartFrame,
LLMResponseEndFrame,
StartFrame,
AudioFrame,
SpriteFrame,
ImageFrame,
UserStoppedSpeakingFrame,
)
from dailyai.services.ai_services import FrameLogger, AIService
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
sounds = {}
images = {}
sound_files = ["talking.wav", "listening.wav", "ding3.wav"]
image_files = ["grandma-writing.png", "grandma-listening.png"]
script_dir = os.path.dirname(__file__)
for file in sound_files:
# Build the full path to the sound file
full_path = os.path.join(script_dir, "assets", file)
# Get the filename without the extension to use as the dictionary key
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = audio_file.readframes(-1)
for file in image_files:
# Build the full path to the image file
full_path = os.path.join(script_dir, "assets", file)
# Get the filename without the extension to use as the dictionary key
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with Image.open(full_path) as img:
images[file] = img.tobytes()
class StoryStartFrame(TextFrame):
pass
class StoryPageFrame(TextFrame):
pass
class StoryPromptFrame(TextFrame):
pass
class StoryProcessor(FrameProcessor):
def __init__(self, messages, story):
self._messages = messages
self._text = ""
self._story = story
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
"""
The response from the LLM service looks like:
A comment about the user's choice
[start] (when the cat starts telling parts of the story)
A sentence of the story
[break] (between each sentence/'page' of the story)
[prompt] (when the cat asks the user to make a decision)
Question about the next part of the story
1. Catch the frames that are generated by the LLM service
"""
if isinstance(frame, UserStoppedSpeakingFrame):
yield ImageFrame(None, images["grandma-writing.png"])
yield AudioFrame(sounds["talking.wav"])
elif isinstance(frame, TextFrame):
self._text += frame.text
if re.findall(r".*\[[sS]tart\].*", self._text):
# Then we have the intro. Send it to speech ASAP
self._text = self._text.replace("[Start]", "")
self._text = self._text.replace("[start]", "")
self._text = self._text.replace("\n", " ")
if len(self._text) > 2:
yield ImageFrame(None, images["grandma-writing.png"])
yield StoryStartFrame(self._text)
yield AudioFrame(sounds["ding3.wav"])
self._text = ""
elif re.findall(r".*\[[bB]reak\].*", self._text):
# Then it's a page of the story. Get an image too
self._text = self._text.replace("[Break]", "")
self._text = self._text.replace("[break]", "")
self._text = self._text.replace("\n", " ")
if len(self._text) > 2:
self._story.append(self._text)
yield StoryPageFrame(self._text)
yield AudioFrame(sounds["ding3.wav"])
self._text = ""
elif re.findall(r".*\[[pP]rompt\].*", self._text):
# Then it's question time. Flush any
# text here as a story page, then set
# the var to get to prompt mode
# cb: trying scene now
# self.handle_chunk(self._text)
self._text = self._text.replace("[Prompt]", "")
self._text = self._text.replace("[prompt]", "")
self._text = self._text.replace("\n", " ")
if len(self._text) > 2:
self._story.append(self._text)
yield StoryPageFrame(self._text)
else:
# After the prompt thing, we'll catch an LLM end to get the last bit
pass
elif isinstance(frame, LLMResponseEndFrame):
yield ImageFrame(None, images["grandma-writing.png"])
yield StoryPromptFrame(self._text)
self._text = ""
yield frame
yield ImageFrame(None, images["grandma-listening.png"])
yield AudioFrame(sounds["listening.wav"])
else:
# pass through everything that's not a TextFrame
yield frame
class StoryImageGenerator(FrameProcessor):
def __init__(self, story, llm, img):
self._story = story
self._llm = llm
self._img = img
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, StoryPageFrame):
if len(self._story) == 1:
prompt = f'You are an illustrator for a children\'s story book. Generate a prompt for DALL-E to create an illustration for the first page of the book, which reads: "{self._story[0]}"\n\n Your response should start with the phrase "Children\'s book illustration of".'
else:
prompt = f"You are an illustrator for a children's story book. Here is the story so far:\n\n\"{' '.join(self._story[:-1])}\"\n\nGenerate a prompt for DALL-E to create an illustration for the next page. Here's the sentence for the next page:\n\n\"{self._story[-1:][0]}\"\n\n Your response should start with the phrase \"Children's book illustration of\"."
msgs = [{"role": "system", "content": prompt}]
image_prompt = ""
async for f in self._llm.process_frame(LLMMessagesQueueFrame(msgs)):
if isinstance(f, TextFrame):
image_prompt += f.text
async for f in self._img.process_frame(TextFrame(image_prompt)):
yield f
# Yield the original StoryPageFrame for basic image/audio sync
yield frame
else:
yield frame
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
global transport
global llm
global tts
messages = [
{
"role": "system",
"content": "You are a storytelling grandma who loves to make up fantastic, fun, and educational stories for children between the ages of 5 and 10 years old. Your stories are full of friendly, magical creatures. Your stories are never scary. Each sentence of your story will become a page in a storybook. Stop after 3-4 sentences and give the child a choice to make that will influence the next part of the story. Once the child responds, start by saying something nice about the choice they made, then include [start] in your response. Include [break] after each sentence of the story. Include [prompt] between the story and the prompt.",
}
]
story = []
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
model="gpt-4-1106-preview",
) # gpt-4-1106-preview
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="Xb7hH8MSUJpSbSDYk0k2",
) # matilda
img = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"),
)
lra = LLMResponseAggregator(messages)
ura = UserResponseAggregator(messages)
sp = StoryProcessor(messages, story)
sig = StoryImageGenerator(story, llm, img)
transport = DailyTransportService(
room_url,
token,
"Storybot",
5,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=True,
camera_width=1024,
camera_height=1024,
start_transcription=True,
vad_enabled=True,
vad_stop_s=1.5,
)
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# We're being a bit tricky here by using a special system prompt to
# ask the user for a story topic. After their intial response, we'll
# use a different system prompt to create story pages.
intro_messages = [
{
"role": "system",
"content": "You are a storytelling grandma who loves to make up fantastic, fun, and educational stories for children between the ages of 5 and 10 years old. Your stories are full of friendly, magical creatures. Your stories are never scary. Begin by asking what a child wants you to tell a story about. Keep your reponse to only a few sentences.",
}
]
lca = LLMAssistantContextAggregator(messages)
await tts.run_to_queue(
transport.send_queue,
lca.run(
llm.run(
[
ImageFrame(None, images["grandma-listening.png"]),
LLMMessagesQueueFrame(intro_messages),
AudioFrame(sounds["listening.wav"]),
]
),
),
)
async def storytime():
fl = FrameLogger("### After Image Generation")
pipeline = Pipeline(
processors=[
ura,
llm,
sp,
sig,
fl,
tts,
lra,
]
)
await transport.run_uninterruptible_pipeline(
pipeline,
)
transport.transcription_settings["extra"]["endpointing"] = True
transport.transcription_settings["extra"]["punctuate"] = True
try:
await asyncio.gather(transport.run(), storytime())
except (asyncio.CancelledError, KeyboardInterrupt):
print("whoops")
transport.stop()
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,409 +0,0 @@
import aiohttp
import asyncio
import json
import random
import logging
import os
import re
import wave
from typing import AsyncGenerator
from PIL import Image
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
LLMContextAggregator,
LLMUserContextAggregator,
UserResponseAggregator,
LLMResponseAggregator,
)
from examples.support.runner import configure
from dailyai.pipeline.frames import (
LLMMessagesQueueFrame,
TranscriptionQueueFrame,
Frame,
TextFrame,
LLMFunctionCallFrame,
LLMFunctionStartFrame,
LLMResponseEndFrame,
StartFrame,
AudioFrame,
SpriteFrame,
ImageFrame,
)
from dailyai.services.ai_services import FrameLogger, AIService
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
sounds = {}
sound_files = ["clack-short.wav", "clack.wav", "clack-short-quiet.wav"]
script_dir = os.path.dirname(__file__)
for file in sound_files:
# Build the full path to the image file
full_path = os.path.join(script_dir, "assets", file)
# Get the filename without the extension to use as the dictionary key
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = audio_file.readframes(-1)
steps = [
{
"prompt": "Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
"run_async": False,
"failed": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
"tools": [
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}
},
},
},
}
],
},
{
"prompt": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_prescriptions",
"description": "Once the user has provided a list of their prescription medications, call this function.",
"parameters": {
"type": "object",
"properties": {
"prescriptions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"medication": {
"type": "string",
"description": "The medication's name",
},
"dosage": {
"type": "string",
"description": "The prescription's dosage",
},
},
},
}
},
},
},
}
],
},
{
"prompt": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_allergies",
"description": "Once the user has provided a list of their allergies, call this function.",
"parameters": {
"type": "object",
"properties": {
"allergies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "What the user is allergic to",
}
},
},
}
},
},
},
}
],
},
{
"prompt": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_conditions",
"description": "Once the user has provided a list of their medical conditions, call this function.",
"parameters": {
"type": "object",
"properties": {
"conditions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's medical condition",
}
},
},
}
},
},
},
},
],
},
{
"prompt": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
"run_async": True,
"tools": [
{
"type": "function",
"function": {
"name": "list_visit_reasons",
"description": "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.",
"parameters": {
"type": "object",
"properties": {
"visit_reasons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's reason for visiting the doctor",
}
},
},
}
},
},
},
}
],
},
{
"prompt": "Now, thank the user and end the conversation.",
"run_async": True,
"tools": [],
},
{"prompt": "", "run_async": True, "tools": []},
]
current_step = 0
class TranscriptFilter(AIService):
def __init__(self, bot_participant_id=None):
super().__init__()
self.bot_participant_id = bot_participant_id
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, TranscriptionQueueFrame):
if frame.participantId != self.bot_participant_id:
yield frame
class ChecklistProcessor(AIService):
def __init__(self, messages, llm, tools, *args, **kwargs):
super().__init__(*args, **kwargs)
self._messages = messages
self._llm = llm
self._tools = tools
self._id = "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous."
self._acks = ["One sec.", "Let me confirm that.", "Thanks.", "OK."]
# Create an allowlist of functions that the LLM can call
self._functions = [
"verify_birthday",
"list_prescriptions",
"list_allergies",
"list_conditions",
"list_visit_reasons",
]
messages.append(
{"role": "system", "content": f"{self._id} {steps[0]['prompt']}"}
)
def verify_birthday(self, args):
return args["birthday"] == "1983-01-01"
def list_prescriptions(self, args):
# print(f"--- Prescriptions: {args['prescriptions']}\n")
pass
def list_allergies(self, args):
# print(f"--- Allergies: {args['allergies']}\n")
pass
def list_conditions(self, args):
# print(f"--- Medical Conditions: {args['conditions']}")
pass
def list_visit_reasons(self, args):
# print(f"Visit Reasons: {args['visit_reasons']}")
pass
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
global current_step
this_step = steps[current_step]
# TODO-CB: forcing a global here :/
self._tools.clear()
self._tools.extend(this_step["tools"])
if isinstance(frame, LLMFunctionStartFrame):
print(f"... Preparing function call: {frame.function_name}")
self._function_name = frame.function_name
if this_step["run_async"]:
# Get the LLM talking about the next step before getting the rest
# of the function call completion
current_step += 1
self._messages.append(
{"role": "system", "content": steps[current_step]["prompt"]}
)
yield LLMMessagesQueueFrame(self._messages)
async for frame in llm.process_frame(
LLMMessagesQueueFrame(self._messages), tool_choice="none"
):
yield frame
else:
# Insert a quick response while we run the function
# yield AudioFrame(sounds["clack-short-quiet.wav"])
pass
elif isinstance(frame, LLMFunctionCallFrame):
if frame.function_name and frame.arguments:
print(f"--> Calling function: {frame.function_name} with arguments:")
pretty_json = re.sub(
"\n", "\n ", json.dumps(json.loads(frame.arguments), indent=2)
)
print(f"--> {pretty_json}\n")
if not frame.function_name in self._functions:
raise Exception(
f"The LLM tried to call a function named {frame.function_name}, which isn't in the list of known functions. Please check your prompt and/or self._functions."
)
fn = getattr(self, frame.function_name)
result = fn(json.loads(frame.arguments))
if not this_step["run_async"]:
if result:
current_step += 1
self._messages.append(
{"role": "system", "content": steps[current_step]["prompt"]}
)
yield LLMMessagesQueueFrame(self._messages)
async for frame in llm.process_frame(
LLMMessagesQueueFrame(self._messages), tool_choice="none"
):
yield frame
else:
self._messages.append(
{"role": "system", "content": this_step["failed"]}
)
yield LLMMessagesQueueFrame(self._messages)
async for frame in llm.process_frame(
LLMMessagesQueueFrame(self._messages), tool_choice="none"
):
yield frame
print(f"<-- Verify result: {result}\n")
else:
yield frame
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
global transport
global llm
global tts
transport = DailyTransportService(
room_url,
token,
"Story Cat",
5,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=False,
start_transcription=True,
vad_enabled=True,
)
# TODO-CB: Go back to vad_enabled
messages = []
tools = []
# llm = AzureLLMService(api_key=os.getenv("AZURE_CHATGPT_API_KEY"), endpoint=os.getenv(
# "AZURE_CHATGPT_ENDPOINT"), model=os.getenv("AZURE_CHATGPT_MODEL"))
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
model="gpt-4-1106-preview",
tools=tools,
) # gpt-4-1106-preview
# tts = AzureTTSService(api_key=os.getenv(
# "AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="XrExE9yKIg1WjnnlVkGX",
) # matilda
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv(
# "DEEPGRAM_API_KEY"), voice="aura-asteria-en")
# lca = LLMContextAggregator(
# messages=messages, bot_participant_id=transport._my_participant_id)
checklist = ChecklistProcessor(messages, llm, tools)
fl = FrameLogger("FRAME LOGGER 1:")
fl2 = FrameLogger("FRAME LOGGER 2:")
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
fl = FrameLogger("first other participant")
# TODO-CB: Make sure this message gets into the context somehow
await tts.run_to_queue(
transport.send_queue,
llm.run([LLMMessagesQueueFrame(messages)]),
)
async def handle_intake():
pipeline = Pipeline(processors=[fl, llm, fl2, checklist, tts])
await transport.run_interruptible_pipeline(
pipeline,
post_processor=LLMResponseAggregator(messages),
pre_processor=UserResponseAggregator(messages),
)
transport.transcription_settings["extra"]["endpointing"] = True
transport.transcription_settings["extra"]["punctuate"] = True
try:
await asyncio.gather(transport.run(), handle_intake())
except (asyncio.CancelledError, KeyboardInterrupt):
print("whoops")
transport.stop()
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))