Merge pull request #12 from daily-co/frame_sync

Speaking / waiting images
This commit is contained in:
Moishe Lettvin
2024-01-26 10:17:01 -05:00
committed by GitHub
7 changed files with 187 additions and 25 deletions

View File

@@ -2,7 +2,7 @@ import asyncio
import copy
import functools
from typing import AsyncGenerator, Awaitable, Callable
from dailyai.queue_aggregators import LLMContextAggregator
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame
@@ -17,8 +17,8 @@ class InterruptibleConversationWrapper:
interrupt: Callable[[], None],
my_participant_id: str | None,
llm_messages: list[dict[str, str]],
llm_context_aggregator_in=LLMContextAggregator,
llm_context_aggregator_out=LLMContextAggregator,
llm_context_aggregator_in=LLMUserContextAggregator,
llm_context_aggregator_out=LLMAssistantContextAggregator,
delay_before_speech_seconds: float = 1.0,
):
self._frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]] = frame_generator
@@ -43,10 +43,10 @@ class InterruptibleConversationWrapper:
async def speak_after_delay(self, user_speech, messages):
# Sleeps for the configured pre-speech delay, builds the inbound and outbound
# context aggregators over `messages`, then awaits the runner with the user's
# speech and both aggregators.
# NOTE(review): this span is a rendered diff with indentation stripped. Each
# aggregator call below lists two argument lines: the first passes an explicit
# role string and appears to be the pre-change line; the second matches the
# role-less constructors of LLMUserContextAggregator /
# LLMAssistantContextAggregator -- confirm against the real file.
await asyncio.sleep(self._delay_before_speech_seconds)
tma_in = self._llm_context_aggregator_in(
messages, "user", self._my_participant_id, False
messages, self._my_participant_id, complete_sentences=False
)
tma_out = self._llm_context_aggregator_out(
messages, "assistant", self._my_participant_id
messages, self._my_participant_id
)
await self._runner(user_speech, tma_in, tma_out)

View File

@@ -1,6 +1,6 @@
import asyncio
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
from dailyai.services.ai_services import AIService
from typing import AsyncGenerator, List
@@ -32,24 +32,55 @@ class LLMContextAggregator(AIService):
messages: list[dict],
role: str,
bot_participant_id=None,
complete_sentences=True):
complete_sentences=True,
pass_through=True):
self.messages = messages
self.bot_participant_id = bot_participant_id
self.role = role
self.sentence = ""
self.complete_sentences = complete_sentences
self.pass_through = pass_through
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
# NOTE(review): this span is a rendered diff with indentation stripped. It
# appears to show the pre-change body (a TextQueueFrame branch followed by an
# unconditional `yield frame`) interleaved with its replacement -- confirm
# against the real file before relying on statement order here.
#
# Replacement flow, as far as the lines below show:
#   1. Frames that are not TextQueueFrame are yielded unchanged and the
#      method returns.
#   2. TranscriptionQueueFrames whose participantId equals
#      self.bot_participant_id are dropped.
#   3. The frame is re-yielded only when self.pass_through is set.
#   4. With complete_sentences, text accumulates in self.sentence until it
#      ends with ".", "?" or "!"; otherwise each frame's text is appended
#      right away. Either way the message is appended to self.messages under
#      self.role and an LLMMessagesQueueFrame carrying self.messages is yielded.
# TODO: split up transcription by participant
if isinstance(frame, TextQueueFrame):
if self.complete_sentences:
self.sentence += frame.text
if self.sentence.endswith((".", "?", "!")):
self.messages.append({"role": self.role, "content": self.sentence})
self.sentence = ""
yield LLMMessagesQueueFrame(self.messages)
else:
self.messages.append({"role": self.role, "content": frame.text})
yield LLMMessagesQueueFrame(self.messages)
# We don't do anything with non-text frames, pass it along to next in the pipeline.
if not isinstance(frame, TextQueueFrame):
yield frame
return
yield frame
# Ignore transcription frames from the bot
if isinstance(frame, TranscriptionQueueFrame):
if frame.participantId == self.bot_participant_id:
return
# The common case for "pass through" is receiving frames from the LLM that we'll
# use to update the "assistant" LLM messages, but also passing the text frames
# along to a TTS service to be spoken to the user.
if self.pass_through:
yield frame
# TODO: split up transcription by participant
if self.complete_sentences:
self.sentence += frame.text # type: ignore -- the linter thinks this isn't a TextQueueFrame, even though we check it above
if self.sentence.endswith((".", "?", "!")):
self.messages.append({"role": self.role, "content": self.sentence})
self.sentence = ""
yield LLMMessagesQueueFrame(self.messages)
else:
self.messages.append({"role": self.role, "content": frame.text}) # type: ignore -- the linter thinks this isn't a TextQueueFrame, even though we check it above
yield LLMMessagesQueueFrame(self.messages)
class LLMUserContextAggregator(LLMContextAggregator):
    """Context aggregator preconfigured for the "user" role.

    Forwards its arguments to ``LLMContextAggregator`` with the role fixed to
    ``"user"`` and ``pass_through`` fixed to ``False``.

    Args:
        messages: Message list handed to the base aggregator.
        bot_participant_id: Participant id handed to the base aggregator.
        complete_sentences: Handed to the base aggregator unchanged.
    """

    def __init__(
        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
    ):
        super().__init__(
            messages,
            role="user",
            bot_participant_id=bot_participant_id,
            complete_sentences=complete_sentences,
            pass_through=False,
        )
class LLMAssistantContextAggregator(LLMContextAggregator):
    """Context aggregator preconfigured for the "assistant" role.

    Forwards its arguments to ``LLMContextAggregator`` with the role fixed to
    ``"assistant"`` and ``pass_through`` fixed to ``True``.

    Args:
        messages: Message list handed to the base aggregator.
        bot_participant_id: Participant id handed to the base aggregator.
        complete_sentences: Handed to the base aggregator unchanged.
    """

    def __init__(
        self, messages: list[dict], bot_participant_id=None, complete_sentences=True
    ):
        # The role string is written verbatim into every appended message, so it
        # must be spelled exactly "assistant" (it was previously "assistan").
        super().__init__(
            messages, "assistant", bot_participant_id, complete_sentences, pass_through=True
        )

View File

@@ -86,9 +86,7 @@ class LLMService(AIService):
pass
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
yield frame
elif isinstance(frame, LLMMessagesQueueFrame):
if isinstance(frame, LLMMessagesQueueFrame):
async for text_chunk in self.run_llm_async(frame.messages):
yield TextQueueFrame(text_chunk)
@@ -111,11 +109,9 @@ class TTSService(AIService):
yield bytes()
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
if isinstance(frame, ControlQueueFrame):
if not isinstance(frame, TextQueueFrame):
yield frame
return
elif not isinstance(frame, TextQueueFrame):
return
text: str | None = None
if not self.aggregate_sentences:

View File

@@ -0,0 +1,134 @@
import argparse
import asyncio
from typing import AsyncGenerator
import requests
import time
import urllib.parse
from PIL import Image
from dailyai.queue_frame import ImageQueueFrame, QueueFrame
from dailyai.services.daily_transport_service import DailyTransportService
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.ai_services import AIService
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
from dailyai.services.fal_ai_services import FalImageGenService
class ImageSyncAggregator(AIService):
    """Wraps every frame between a "speaking" image and a "waiting" image.

    Both images are opened once at construction time and their raw bytes are
    kept, so each processed frame only builds two ImageQueueFrames.
    """

    def __init__(self, speaking_path:str, waiting_path:str):
        # Open and convert each image in turn, keeping both the image object
        # and its byte form.
        speaking = Image.open(speaking_path)
        self._speaking_image = speaking
        self._speaking_image_bytes = speaking.tobytes()

        waiting = Image.open(waiting_path)
        self._waiting_image = waiting
        self._waiting_image_bytes = waiting.tobytes()

    async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
        """Yield the speaking image, then ``frame`` itself, then the waiting image."""
        speaking_frame = ImageQueueFrame(None, self._speaking_image_bytes)
        yield speaking_frame

        yield frame

        waiting_frame = ImageQueueFrame(None, self._waiting_image_bytes)
        yield waiting_frame
async def main(room_url: str, token):
    """Join a Daily room and run a bot that shows speaking/waiting images.

    Wires the pipeline: received frames -> user context aggregator -> LLM ->
    assistant context aggregator -> image sync -> TTS, and sends the result to
    the transport's send queue while the transport itself runs.

    Args:
        room_url: URL of the Daily room to join.
        token: Meeting token handed to the transport.
    """
    global transport
    global llm
    global tts

    # Imported locally so this function does not depend on a change to the
    # file's top-level import section.
    from pathlib import Path

    transport = DailyTransportService(
        room_url,
        token,
        "Respond bot",
        5,
    )
    transport.camera_enabled = True
    transport.camera_width = 1024
    transport.camera_height = 1024
    transport.mic_enabled = True
    transport.mic_sample_rate = 16000

    llm = AzureLLMService()
    tts = AzureTTSService()
    img = FalImageGenService(image_size="1024x1024")

    # NOTE(review): nothing in this function calls get_images(); the sample
    # uses the two image files on disk instead. Kept as-is.
    async def get_images():
        get_speaking_task = asyncio.create_task(
            img.run_image_gen("An image of a cat speaking")
        )
        get_waiting_task = asyncio.create_task(
            img.run_image_gen("An image of a cat waiting")
        )

        (speaking_data, waiting_data) = await asyncio.gather(
            get_speaking_task, get_waiting_task
        )

        return speaking_data, waiting_data

    @transport.event_handler("on_first_other_participant_joined")
    async def on_first_other_participant_joined(transport):
        await tts.say("Hi, I'm listening!", transport.send_queue)

    async def handle_transcriptions():
        messages = [
            {"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
        ]

        # Both aggregators share the same message list.
        tma_in = LLMUserContextAggregator(
            messages, transport.my_participant_id
        )
        tma_out = LLMAssistantContextAggregator(
            messages, transport.my_participant_id
        )

        # Resolve the images relative to this script instead of a
        # machine-specific absolute path, so the sample runs from any checkout.
        # NOTE(review): assumes speaking.png and waiting.png sit in the same
        # directory as this file -- confirm.
        sample_dir = Path(__file__).resolve().parent
        image_sync_aggregator = ImageSyncAggregator(
            str(sample_dir / "speaking.png"),
            str(sample_dir / "waiting.png"),
        )

        await tts.run_to_queue(
            transport.send_queue,
            image_sync_aggregator.run(
                tma_out.run(
                    llm.run(
                        tma_in.run(
                            transport.get_receive_frames()
                        )
                    )
                )
            )
        )

    transport.transcription_settings["extra"]["punctuate"] = True
    await asyncio.gather(transport.run(), handle_transcriptions())
if __name__ == "__main__":
    # Script entry point: parse the room URL and API key, mint a one-hour
    # owner meeting token through the Daily REST API, then run the bot.
    parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=True, help="URL of the Daily room to join"
    )
    parser.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=True,
        help="Daily API Key (needed to create token)",
    )

    args, unknown = parser.parse_known_args()

    # Create a meeting token for the given room with an expiration 1 hour in the future.
    # The room name is the URL path without its leading slash.
    room_name: str = urllib.parse.urlparse(args.url).path[1:]
    expiration: float = time.time() + 60 * 60

    res: requests.Response = requests.post(
        "https://api.daily.co/v1/meeting-tokens",
        headers={"Authorization": f"Bearer {args.apikey}"},
        json={
            "properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
        },
        # requests waits forever by default; bound the call so a stalled
        # connection fails instead of hanging the script.
        timeout=30,
    )

    if res.status_code != 200:
        raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")

    token: str = res.json()["token"]

    asyncio.run(main(args.url, token))

View File

@@ -25,6 +25,7 @@ async def main(room_url: str, token):
transport.mic_enabled = True
transport.mic_sample_rate = 16000
transport.camera_enabled = False
transport.start_transcription = True
llm = AzureLLMService()
tts = ElevenLabsTTSService(voice_id="ErXwobaYiN019PkySvjV")

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB