diff --git a/src/dailyai/conversation_wrappers.py b/src/dailyai/conversation_wrappers.py
index bc7dd902d..7f688477c 100644
--- a/src/dailyai/conversation_wrappers.py
+++ b/src/dailyai/conversation_wrappers.py
@@ -2,7 +2,7 @@ import asyncio
 import copy
 import functools
 from typing import AsyncGenerator, Awaitable, Callable
-from dailyai.queue_aggregators import LLMContextAggregator
+from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
 from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame
 
 
@@ -17,8 +17,8 @@ class InterruptibleConversationWrapper:
         interrupt: Callable[[], None],
         my_participant_id: str | None,
         llm_messages: list[dict[str, str]],
-        llm_context_aggregator_in=LLMContextAggregator,
-        llm_context_aggregator_out=LLMContextAggregator,
+        llm_context_aggregator_in=LLMUserContextAggregator,
+        llm_context_aggregator_out=LLMAssistantContextAggregator,
         delay_before_speech_seconds: float = 1.0,
     ):
         self._frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]] = frame_generator
@@ -43,10 +43,10 @@ class InterruptibleConversationWrapper:
     async def speak_after_delay(self, user_speech, messages):
         await asyncio.sleep(self._delay_before_speech_seconds)
         tma_in = self._llm_context_aggregator_in(
-            messages, "user", self._my_participant_id, False
+            messages, self._my_participant_id, complete_sentences=False
         )
         tma_out = self._llm_context_aggregator_out(
-            messages, "assistant", self._my_participant_id
+            messages, self._my_participant_id
         )
         await self._runner(user_speech, tma_in, tma_out)
 
diff --git a/src/dailyai/queue_aggregators.py b/src/dailyai/queue_aggregators.py
index e2e5ff7fd..55461e29c 100644
--- a/src/dailyai/queue_aggregators.py
+++ b/src/dailyai/queue_aggregators.py
@@ -1,6 +1,6 @@
 import asyncio
 
-from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame
+from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
 from dailyai.services.ai_services import AIService
 
 from typing import AsyncGenerator, List
@@ -32,24 +32,60 @@ class LLMContextAggregator(AIService):
         messages: list[dict],
         role: str,
         bot_participant_id=None,
-        complete_sentences=True):
+        complete_sentences=True,
+        pass_through=True):
         self.messages = messages
         self.bot_participant_id = bot_participant_id
         self.role = role
         self.sentence = ""
         self.complete_sentences = complete_sentences
+        self.pass_through = pass_through
 
     async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        # TODO: split up transcription by participant
-        if isinstance(frame, TextQueueFrame):
-            if self.complete_sentences:
-                self.sentence += frame.text
-                if self.sentence.endswith((".", "?", "!")):
-                    self.messages.append({"role": self.role, "content": self.sentence})
-                    self.sentence = ""
-                    yield LLMMessagesQueueFrame(self.messages)
-            else:
-                self.messages.append({"role": self.role, "content": frame.text})
-                yield LLMMessagesQueueFrame(self.messages)
+        """Append text frames to self.messages, yielding LLMMessagesQueueFrame on each append."""
+        # We don't do anything with non-text frames, pass it along to next in the pipeline.
+        if not isinstance(frame, TextQueueFrame):
+            yield frame
+            return
 
-        yield frame
+        # Ignore transcription frames from the bot
+        if isinstance(frame, TranscriptionQueueFrame):
+            if frame.participantId == self.bot_participant_id:
+                return
+
+        # The common case for "pass through" is receiving frames from the LLM that we'll
+        # use to update the "assistant" LLM messages, but also passing the text frames
+        # along to a TTS service to be spoken to the user.
+        if self.pass_through:
+            yield frame
+
+        # TODO: split up transcription by participant
+        if self.complete_sentences:
+            self.sentence += frame.text  # type: ignore -- the linter thinks this isn't a TextQueueFrame, even though we check it above
+            if self.sentence.endswith((".", "?", "!")):
+                self.messages.append({"role": self.role, "content": self.sentence})
+                self.sentence = ""
+                yield LLMMessagesQueueFrame(self.messages)
+        else:
+            self.messages.append({"role": self.role, "content": frame.text})  # type: ignore -- the linter thinks this isn't a TextQueueFrame, even though we check it above
+            yield LLMMessagesQueueFrame(self.messages)
+
+
+class LLMUserContextAggregator(LLMContextAggregator):
+    """Aggregate the user's text into "user" messages without forwarding the text frames."""
+    def __init__(
+        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
+    ):
+        super().__init__(
+            messages, "user", bot_participant_id, complete_sentences, pass_through=False
+        )
+
+
+class LLMAssistantContextAggregator(LLMContextAggregator):
+    """Aggregate LLM output into "assistant" messages while forwarding the text frames."""
+    def __init__(
+        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
+    ):
+        super().__init__(
+            messages, "assistant", bot_participant_id, complete_sentences, pass_through=True
+        )
diff --git a/src/dailyai/services/ai_services.py b/src/dailyai/services/ai_services.py
index 3c5dba3c4..263cf186e 100644
--- a/src/dailyai/services/ai_services.py
+++ b/src/dailyai/services/ai_services.py
@@ -86,9 +86,7 @@ class LLMService(AIService):
         pass
 
     async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, ControlQueueFrame):
-            yield frame
-        elif isinstance(frame, LLMMessagesQueueFrame):
+        if isinstance(frame, LLMMessagesQueueFrame):
             async for text_chunk in self.run_llm_async(frame.messages):
                 yield TextQueueFrame(text_chunk)
 
@@ -111,11 +109,9 @@ class TTSService(AIService):
         yield bytes()
 
     async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
-        if isinstance(frame, ControlQueueFrame):
+        if not isinstance(frame, TextQueueFrame):
             yield frame
             return
-        elif not isinstance(frame, TextQueueFrame):
-            return
 
         text: str | None = None
         if not self.aggregate_sentences:
diff --git a/src/samples/foundational/06a-image-sync.py b/src/samples/foundational/06a-image-sync.py
new file mode 100644
index 000000000..47bb025de
--- /dev/null
+++ b/src/samples/foundational/06a-image-sync.py
@@ -0,0 +1,139 @@
+import argparse
+import asyncio
+import os
+from typing import AsyncGenerator
+import requests
+import time
+import urllib.parse
+
+from PIL import Image
+from dailyai.queue_frame import ImageQueueFrame, QueueFrame
+
+from dailyai.services.daily_transport_service import DailyTransportService
+from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
+from dailyai.services.ai_services import AIService
+from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
+from dailyai.services.fal_ai_services import FalImageGenService
+
+
+class ImageSyncAggregator(AIService):
+    """Emit the "speaking" image before each frame and the "waiting" image after it."""
+    def __init__(self, speaking_path: str, waiting_path: str):
+        self._speaking_image = Image.open(speaking_path)
+        self._speaking_image_bytes = self._speaking_image.tobytes()
+
+        self._waiting_image = Image.open(waiting_path)
+        self._waiting_image_bytes = self._waiting_image.tobytes()
+
+    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
+        yield ImageQueueFrame(None, self._speaking_image_bytes)
+        yield frame
+        yield ImageQueueFrame(None, self._waiting_image_bytes)
+
+
+async def main(room_url: str, token):
+    global transport
+    global llm
+    global tts
+
+    transport = DailyTransportService(
+        room_url,
+        token,
+        "Respond bot",
+        5,
+    )
+    transport.camera_enabled = True
+    transport.camera_width = 1024
+    transport.camera_height = 1024
+    transport.mic_enabled = True
+    transport.mic_sample_rate = 16000
+
+    llm = AzureLLMService()
+    tts = AzureTTSService()
+    img = FalImageGenService(image_size="1024x1024")
+
+    async def get_images():
+        get_speaking_task = asyncio.create_task(
+            img.run_image_gen("An image of a cat speaking")
+        )
+        get_waiting_task = asyncio.create_task(
+            img.run_image_gen("An image of a cat waiting")
+        )
+
+        (speaking_data, waiting_data) = await asyncio.gather(
+            get_speaking_task, get_waiting_task
+        )
+
+        return speaking_data, waiting_data
+
+    @transport.event_handler("on_first_other_participant_joined")
+    async def on_first_other_participant_joined(transport):
+        await tts.say("Hi, I'm listening!", transport.send_queue)
+
+    async def handle_transcriptions():
+        messages = [
+            {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
+        ]
+
+        tma_in = LLMUserContextAggregator(
+            messages, transport.my_participant_id
+        )
+        tma_out = LLMAssistantContextAggregator(
+            messages, transport.my_participant_id
+        )
+        # The images ship alongside this sample, so resolve them relative to this file.
+        sample_dir = os.path.dirname(os.path.abspath(__file__))
+        image_sync_aggregator = ImageSyncAggregator(
+            os.path.join(sample_dir, "speaking.png"),
+            os.path.join(sample_dir, "waiting.png"),
+        )
+        await tts.run_to_queue(
+            transport.send_queue,
+            image_sync_aggregator.run(
+                tma_out.run(
+                    llm.run(
+                        tma_in.run(
+                            transport.get_receive_frames()
+                        )
+                    )
+                )
+            )
+        )
+
+    transport.transcription_settings["extra"]["punctuate"] = True
+    await asyncio.gather(transport.run(), handle_transcriptions())
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
+    parser.add_argument(
+        "-u", "--url", type=str, required=True, help="URL of the Daily room to join"
+    )
+    parser.add_argument(
+        "-k",
+        "--apikey",
+        type=str,
+        required=True,
+        help="Daily API Key (needed to create token)",
+    )
+
+    args, unknown = parser.parse_known_args()
+
+    # Create a meeting token for the given room with an expiration 1 hour in the future.
+    room_name: str = urllib.parse.urlparse(args.url).path[1:]
+    expiration: float = time.time() + 60 * 60
+
+    res: requests.Response = requests.post(
+        "https://api.daily.co/v1/meeting-tokens",
+        headers={"Authorization": f"Bearer {args.apikey}"},
+        json={
+            "properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
+        },
+    )
+
+    if res.status_code != 200:
+        raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
+
+    token: str = res.json()["token"]
+
+    asyncio.run(main(args.url, token))
diff --git a/src/samples/foundational/07-interruptible.py b/src/samples/foundational/07-interruptible.py
index cd18804af..927a5670f 100644
--- a/src/samples/foundational/07-interruptible.py
+++ b/src/samples/foundational/07-interruptible.py
@@ -25,6 +25,7 @@ async def main(room_url: str, token):
     transport.mic_enabled = True
     transport.mic_sample_rate = 16000
     transport.camera_enabled = False
+    transport.start_transcription = True
 
     llm = AzureLLMService()
     tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")
diff --git a/src/samples/foundational/speaking.png b/src/samples/foundational/speaking.png
new file mode 100644
index 000000000..d7bab6fd7
Binary files /dev/null and b/src/samples/foundational/speaking.png differ
diff --git a/src/samples/foundational/waiting.png b/src/samples/foundational/waiting.png
new file mode 100644
index 000000000..01db5e7cd
Binary files /dev/null and b/src/samples/foundational/waiting.png differ