Compare commits

..

8 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
2d3f718733 probably non-pythonic exception handling in 01 2024-03-08 10:41:28 -08:00
chadbailey59
824df8ca7c moved patient intake and example runner (#44) 2024-03-08 12:07:51 -06:00
chadbailey59
0db8a51b27 cleaned up function calling frames (#43) 2024-03-08 10:13:28 -06:00
chadbailey59
ce9c6ede66 function allowlist (#42) 2024-03-08 08:49:09 -06:00
Moishe Lettvin
192b46bbab Merge pull request #41 from daily-co/optimize-pipeline
Optimize pipeline processing
2024-03-07 21:01:03 -05:00
Moishe Lettvin
196279e342 Add endframe to sample 4 2024-03-07 19:24:27 -05:00
Moishe Lettvin
edd93bc4cb remove errant print statement 2024-03-07 19:05:03 -05:00
Moishe Lettvin
d0076dd4ee Optimize pipeline processing so we don't wait for the completion of one generator to move onto the next. 2024-03-07 18:59:47 -05:00
20 changed files with 105 additions and 99 deletions

View File

@@ -74,6 +74,9 @@ class UserStoppedSpeakingFrame(Frame):
pass
@dataclass()
class LLMFunctionStartFrame(Frame):
function_name: str
@dataclass()
class LLMFunctionCallFrame(Frame):
function_name: str
arguments: str

View File

@@ -48,6 +48,16 @@ class Pipeline:
raise ValueError("Source queue not set")
yield await self.source.get()
async def run_pipeline_recursively(
self, initial_frame: Frame, processors: List[FrameProcessor]
) -> AsyncGenerator[Frame, None]:
if processors:
async for frame in processors[0].process_frame(initial_frame):
async for final_frame in self.run_pipeline_recursively(frame, processors[1:]):
yield final_frame
else:
yield initial_frame
async def run_pipeline(self):
""" Run the pipeline. Take each frame from the source queue, pass it to
the first frame_processor, pass the output of that frame_processor to the
@@ -65,23 +75,12 @@ class Pipeline:
try:
while True:
frame_generators = [self.get_next_source_frame()]
for processor in self.processors:
next_frame_generators = []
for frame_generator in frame_generators:
async for frame in frame_generator:
next_frame_generators.append(processor.process_frame(frame))
frame_generators = next_frame_generators
initial_frame = await self.source.get()
async for frame in self.run_pipeline_recursively(initial_frame, self.processors):
await self.sink.put(frame)
for frame_generator in frame_generators:
async for frame in frame_generator:
await self.sink.put(frame)
if isinstance(
frame, EndFrame
) or isinstance(
frame, EndPipeFrame
):
return
if isinstance(initial_frame, EndFrame) or isinstance(initial_frame, EndPipeFrame):
break
except asyncio.CancelledError:
# this means there's been an interruption, do any cleanup necessary here.
for processor in self.processors:

View File

@@ -12,24 +12,17 @@ from dailyai.pipeline.frames import (
LLMMessagesQueueFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
LLMFunctionStartFrame,
LLMFunctionCallFrame,
Frame,
TextFrame,
TranscriptionQueueFrame,
TranscriptionQueueFrame
)
from abc import abstractmethod
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List
class AIService(FrameProcessor):
""" This is the base class for various AI services (LLM, TTS and Image)
This class adds some convenienence functions to run, effectively, a one-stage
pipeline where the incoming frames can come from an iterable or queue
and the processed frames go to a queue. Child classes extend those convenience
functions, eg. TTS's `say` method runs the TTS and emits the AudioFrames to a
queue.
"""
def __init__(self):
self.logger = logging.getLogger("dailyai")
@@ -37,17 +30,12 @@ class AIService(FrameProcessor):
def stop(self):
pass
async def run_to_queue(
self,
queue: asyncio.Queue,
frames: Iterable[Frame] | AsyncIterable[Frame] | asyncio.Queue[Frame]
) -> None:
""" Process the given frames (from an iterable or queue) and send them to
the given queue.
"""
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
async for frame in self.run(frames):
await queue.put(frame)
if add_end_of_stream:
await queue.put(EndFrame())
async def run(
self,
@@ -55,16 +43,6 @@ class AIService(FrameProcessor):
| AsyncIterable[Frame]
| asyncio.Queue[Frame],
) -> AsyncGenerator[Frame, None]:
""" Generates 0 or more frames from the given iterable or queue.
This is a convenience function to take a collection of frames, process
them, and yield processed frames.
The preferred way to use FrameProcessors is with a pipeline, but if you
have a very simple case (eg. a list of static text blocks you want to speak,
or a list of static image description you want to render) this function
will be helpful.
"""
try:
if isinstance(frames, AsyncIterable):
async for frame in frames:
@@ -95,31 +73,38 @@ class LLMService(AIService):
self._messages = messages
@abstractmethod
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
yield ""
@abstractmethod
async def run_llm(self, messages) -> str:
pass
async def process_frame(self, frame: Frame, tool_choice: str | None = None) -> AsyncGenerator[Frame, None]:
async def process_frame(self, frame: Frame, tool_choice: str = None) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
function_name = ""
arguments = ""
if isinstance(frame, LLMMessagesQueueFrame):
yield LLMResponseStartFrame()
async for text_chunk in self.run_llm_async(frame.messages, tool_choice):
# We're streaming the LLM response and returning individual TextFrames for each chunk because
# we want to enable quick TTS. But if the LLM response is a function call, we don't need to yield
# each chunk because the function call is only useful as a single frame. Instead, we'll emit a
# LLMFunctionStartFrame to let downstream services know a function call is coming, then we'll
# collect the function arguments and return the entire call in a single LLMFunctionCallFrame.
if isinstance(text_chunk, str):
yield TextFrame(text_chunk)
elif text_chunk.function:
if text_chunk.function.name:
# function_name += text_chunk.function.name
yield LLMFunctionCallFrame(function_name=text_chunk.function.name, arguments=None)
function_name += text_chunk.function.name
yield LLMFunctionStartFrame(function_name=text_chunk.function.name)
if text_chunk.function.arguments:
# arguments += text_chunk.function.arguments
yield LLMFunctionCallFrame(function_name=None, arguments=text_chunk.function.arguments)
# Keep iterating through the response to collect all the argument fragments and
# yield a complete LLMFunctionCallFrame after run_llm_async completes
arguments += text_chunk.function.arguments
if (function_name and arguments):
yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments)
function_name = ""
arguments = ""
yield LLMResponseEndFrame()

View File

@@ -328,7 +328,7 @@ class BaseTransportService():
break
def interrupt(self):
self._logger.debug("!!! Interrupting")
self._logger.debug("### Interrupting")
self._is_interrupted.set()
async def get_receive_frames(self) -> AsyncGenerator[Frame, None]:

View File

@@ -19,13 +19,11 @@ class OLLamaLLMService(LLMService):
model=self._model
)
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
chunks = await self._client.chat.completions.create(
model=self._model, stream=True, messages=messages
)
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
async for chunk in chunks:
if len(chunk.choices) == 0:
continue
@@ -35,7 +33,7 @@ class OLLamaLLMService(LLMService):
async def run_llm(self, messages) -> str | None:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via ollama: {messages_for_log}")
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
if response and len(response.choices) > 0:

View File

@@ -1,5 +1,4 @@
import asyncio
from doctest import OutputChecker
import unittest
from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from dailyai.pipeline.frames import EndFrame, TextFrame

View File

@@ -6,7 +6,7 @@ from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.playht_ai_service import PlayHTAIService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url):
@@ -44,19 +44,22 @@ async def main(room_url):
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
nonlocal tts
if participant["info"]["isLocal"]:
return
try:
if participant["info"]["isLocal"]:
return
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
)
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
)
except Exception as e:
print("Exception while speaking:", e)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
await transport.run()
del(tts)
del (tts)
if __name__ == "__main__":

View File

@@ -9,7 +9,7 @@ from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url):

View File

@@ -8,7 +8,7 @@ from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAIImageGenService
from dailyai.services.azure_ai_services import AzureImageGenServiceREST
from examples.foundational.support.runner import configure
from examples.support.runner import configure
local_joined = False
participant_joined = False

View File

@@ -9,7 +9,7 @@ from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url: str):
@@ -44,7 +44,8 @@ async def main(room_url: str):
buffer_queue = asyncio.Queue()
source_queue = asyncio.Queue()
pipeline = Pipeline(source = source_queue, sink=buffer_queue, processors=[llm, elevenlabs_tts])
source_queue.put_nowait(LLMMessagesQueueFrame(messages))
await source_queue.put(LLMMessagesQueueFrame(messages))
await source_queue.put(EndFrame())
pipeline_run_task = pipeline.run_pipeline()
@transport.event_handler("on_first_other_participant_joined")

View File

@@ -12,7 +12,7 @@ from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.services.open_ai_services import OpenAIImageGenService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url):
@@ -47,7 +47,20 @@ async def main(room_url):
source_queue = asyncio.Queue()
for month in ["January", "February"]:
for month in [
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]:
messages = [
{
"role": "system",

View File

@@ -6,7 +6,7 @@ from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.ai_services import FrameLogger
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url: str, token):
@@ -51,8 +51,8 @@ async def main(room_url: str, token):
tma_in,
llm,
fl2,
tts,
tma_out,
tts
],
)
await transport.run_uninterruptible_pipeline(pipeline)

View File

@@ -16,7 +16,7 @@ from dailyai.services.ai_services import AIService
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.services.fal_ai_services import FalImageGenService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
class ImageSyncAggregator(AIService):

View File

@@ -8,7 +8,7 @@ from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.pipeline.frames import AudioFrame, ImageFrame
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url: str):

View File

@@ -19,7 +19,7 @@ from dailyai.pipeline.frames import (
)
from dailyai.services.ai_services import AIService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
sprites = {}

View File

@@ -12,7 +12,7 @@ from dailyai.services.ai_services import AIService, FrameLogger
from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
from typing import AsyncGenerator
from examples.foundational.support.runner import configure
from examples.support.runner import configure
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") # or whatever
logger = logging.getLogger("dailyai")

View File

@@ -3,7 +3,7 @@ import asyncio
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.whisper_ai_services import WhisperSTTService
from examples.foundational.support.runner import configure
from examples.support.runner import configure
async def main(room_url: str):

View File

@@ -10,7 +10,7 @@ from dailyai.services.ai_services import AIService, FrameLogger
from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
from typing import AsyncGenerator
from examples.foundational.support.runner import configure
from examples.support.runner import configure
sounds = {}
sound_files = [

View File

@@ -8,6 +8,9 @@ import wave
from typing import AsyncGenerator
from PIL import Image
import sys
print('\n'.join(sys.path))
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
@@ -15,12 +18,12 @@ from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator, UserResponseAggregator, LLMResponseAggregator
from support.runner import configure
from dailyai.pipeline.frames import LLMMessagesQueueFrame, TranscriptionQueueFrame, Frame, TextFrame, LLMFunctionCallFrame, LLMResponseEndFrame, StartFrame, AudioFrame, SpriteFrame, ImageFrame
from examples.support.runner import configure
from dailyai.pipeline.frames import LLMMessagesQueueFrame, TranscriptionQueueFrame, Frame, TextFrame, LLMFunctionCallFrame, LLMFunctionStartFrame, LLMResponseEndFrame, StartFrame, AudioFrame, SpriteFrame, ImageFrame
from dailyai.services.ai_services import FrameLogger, AIService
import logging
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
sounds = {}
sound_files = [
@@ -207,11 +210,19 @@ class ChecklistProcessor(AIService):
self._messages = messages
self._llm = llm
self._tools = tools
self._function_name = ""
self._arguments = ""
self._id = "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous."
self._acks = ["One sec.", "Let me confirm that.", "Thanks.", "OK."]
# Create an allowlist of functions that the LLM can call
self._functions = [
"verify_birthday",
"list_prescriptions",
"list_allergies",
"list_conditions",
"list_visit_reasons"
]
messages.append(
{"role": "system", "content": f"{self._id} {steps[0]['prompt']}"})
@@ -240,17 +251,15 @@ class ChecklistProcessor(AIService):
# TODO-CB: forcing a global here :/
self._tools.clear()
self._tools.extend(this_step['tools'])
if isinstance(frame, LLMFunctionCallFrame) and frame.function_name:
if isinstance(frame, LLMFunctionStartFrame):
print(f"... Preparing function call: {frame.function_name}")
self._function_name = frame.function_name
if this_step['run_async']:
# Get the LLM talking about the next step before getting the rest
# of the function call completion
current_step += 1
# yield TextFrame(f"We should move on to Step {current_step}.")
self._messages.append({
"role": "system", "content": steps[current_step]['prompt']})
# yield LLMMessagesQueueFrame(self._messages)
yield LLMMessagesQueueFrame(self._messages)
async for frame in llm.process_frame(LLMMessagesQueueFrame(self._messages), tool_choice="none"):
yield frame
@@ -258,34 +267,30 @@ class ChecklistProcessor(AIService):
# Insert a quick response while we run the function
# yield AudioFrame(sounds["clack-short-quiet.wav"])
pass
elif isinstance(frame, LLMFunctionCallFrame) and frame.arguments:
self._arguments += frame.arguments
elif isinstance(frame, LLMResponseEndFrame):
elif isinstance(frame, LLMFunctionCallFrame):
if self._function_name and self._arguments:
if frame.function_name and frame.arguments:
print(
f"--> Calling function: {self._function_name} with arguments:")
f"--> Calling function: {frame.function_name} with arguments:")
pretty_json = re.sub("\n", "\n ", json.dumps(
json.loads(self._arguments), indent=2))
json.loads(frame.arguments), indent=2))
print(f"--> {pretty_json}\n")
fn = getattr(self, self._function_name)
result = fn(json.loads(self._arguments))
self._function_name = ""
self._arguments = ""
if not frame.function_name in self._functions:
raise Exception(f"The LLM tried to call a function named {frame.function_name}, which isn't in the list of known functions. Please check your prompt and/or self._functions.")
fn = getattr(self, frame.function_name)
result = fn(json.loads(frame.arguments))
if not this_step['run_async']:
if result:
current_step += 1
# yield TextFrame(f"We should move on to Step {current_step}.")
self._messages.append({
"role": "system", "content": steps[current_step]['prompt']})
# yield LLMMessagesQueueFrame(self._messages)
yield LLMMessagesQueueFrame(self._messages)
async for frame in llm.process_frame(LLMMessagesQueueFrame(self._messages), tool_choice="none"):
yield frame
else:
self._messages.append({
"role": "system", "content": this_step['failed']})
# yield LLMMessagesQueueFrame(self._messages)
yield LLMMessagesQueueFrame(self._messages)
async for frame in llm.process_frame(LLMMessagesQueueFrame(self._messages), tool_choice="none"):
yield frame