Compare commits
10 Commits
cb/telestr
...
khk-functi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d6d674ff6 | ||
|
|
1e552958aa | ||
|
|
17edfe98bd | ||
|
|
5100a7599b | ||
|
|
18c2b37358 | ||
|
|
0244f358d2 | ||
|
|
85fe6c0580 | ||
|
|
ae7482ed18 | ||
|
|
90d928be99 | ||
|
|
0703b926a3 |
32
.github/workflows/lint.yaml
vendored
@@ -1,32 +0,0 @@
|
||||
name: lint
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
pull_request:
|
||||
branches:
|
||||
- "**"
|
||||
paths-ignore:
|
||||
- "docs/**"
|
||||
|
||||
concurrency:
|
||||
group: build-lint-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
autopep8:
|
||||
name: "Formatting lints"
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
- name: autopep8
|
||||
id: autopep8
|
||||
uses: peter-evans/autopep8@v2
|
||||
with:
|
||||
args: --exit-code -r -d -a -a src/
|
||||
- name: Fail if autopep8 requires changes
|
||||
if: steps.autopep8.outputs.exit-code == 2
|
||||
run: exit 1
|
||||
196
README.md
@@ -1,78 +1,21 @@
|
||||
# dailyai — an open source framework for real-time, multi-modal, conversational AI applications
|
||||
# Daily AI SDK
|
||||
|
||||
Build things like this:
|
||||
Build conversational, multi-modal AI apps with real-time voice and video, like this:
|
||||
|
||||
[](https://www.youtube.com/watch?v=lDevgsp9vn0)
|
||||
_Demo Video to come_
|
||||
|
||||
With built-in support for many of the best AI platforms (or [add your own](/docs)):
|
||||
|
||||
- Azure - DALL-E, ChatGPT, and Azure AI Text-to-Speech
|
||||
- Deepgram - Speech-to-text, and Aura text-to-speech
|
||||
- Eleven Labs text-to-speech
|
||||
- Fal.ai image generation
|
||||
- OpenAI DALL-E and ChatGPT
|
||||
- Whisper local speech-to-text
|
||||
|
||||
## Step 1: Get Started
|
||||
|
||||
**`dailyai` started as a toolkit for implementing generative AI voice bots.** Things like personal coaches, meeting assistants, story-telling toys for kids, customer support bots, and snarky social companions.
|
||||
|
||||
|
||||
In 2023 a *lot* of us got excited about the possibility of having open-ended conversations with LLMs. It became clear pretty quickly that we were all solving the same [low-level problems](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/):
|
||||
- low-latency, reliable audio transport
|
||||
- echo cancellation
|
||||
- phrase endpointing (knowing when the bot should respond to human speech)
|
||||
- interruptibility
|
||||
- writing clean code to stream data through "pipelines" of speech-to-text, LLM inference, and text-to-speech models
|
||||
|
||||
As our applications expanded to include additional things like image generation, function calling, and vision models, we started to think about what a complete framework for these kinds of apps could look like.
|
||||
|
||||
Today, `dailyai` is:
|
||||
|
||||
1. a set of code building blocks for interacting with generative AI services and creating low-latency, interruptible data pipelines that use multiple services
|
||||
2. transport services that moves audio, video, and events across the Internet
|
||||
3. implementations of specific generative AI services
|
||||
|
||||
Currently implemented services:
|
||||
- Speech-to-text
|
||||
- Deepgram
|
||||
- Whisper
|
||||
- LLMs
|
||||
- Azure
|
||||
- OpenAI
|
||||
- Image generation
|
||||
- Azure
|
||||
- Fal
|
||||
- OpenAI
|
||||
- Text-to-speech
|
||||
- Azure
|
||||
- Deepgram
|
||||
- ElevenLabs
|
||||
- Transport
|
||||
- Daily
|
||||
- Local (in progress, intended as a quick start example service)
|
||||
|
||||
If you'd like to [implement a service]((https://github.com/daily-co/daily-ai-sdk/tree/main/src/dailyai/services)), we welcome PRs! Our goal is to support lots of services in all of the above categories, plus new categories (like real-time video) as they emerge.
|
||||
|
||||
## Step 1: Get started
|
||||
|
||||
Today, the easiest way to get started with `dailyai` is to use [Daily](https://www.daily.co/) as your transport service. This toolkit started life as an internal SDK at Daily and millions of minutes of AI conversation have been served using it and its earlier prototype incarnations. (The [transport base class](https://github.com/daily-co/daily-ai-sdk/blob/main/src/dailyai/services/base_transport_service.py) is easy to extend, though, so feel free to submit PRs if you'd like to implement another transport service.)
|
||||
|
||||
```
|
||||
# install the module
|
||||
pip install dailyai
|
||||
|
||||
# set up an .env file with API keys
|
||||
cp dot-env.template .env
|
||||
|
||||
# sign up for a free Daily account, if you don't already have one, and
|
||||
# join the Daily room URL directly from a browser tab, then run one of the
|
||||
# samples
|
||||
python src/examples/foundational/02-llm-say-one-thing.py
|
||||
```
|
||||
|
||||
## Code examples
|
||||
|
||||
There are two directories of examples:
|
||||
|
||||
- [foundational](https://github.com/daily-co/daily-ai-sdk/tree/main/src/examples/foundational) — demos that build on each other, introducing one or two concepts at a time
|
||||
- [starter apps](https://github.com/daily-co/daily-ai-sdk/tree/main/src/examples/starter-apps) — complete applications that you can use as starting points for development
|
||||
|
||||
|
||||
|
||||
## Hacking on the framework itself
|
||||
## Build/Install
|
||||
|
||||
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
|
||||
|
||||
@@ -99,3 +42,118 @@ If you want to use this package from another directory, you can run:
|
||||
```
|
||||
pip install path_to_this_repo
|
||||
```
|
||||
|
||||
## Running the samples
|
||||
|
||||
Tou can run the simple sample like so:
|
||||
|
||||
```
|
||||
python src/examples/theoretical-to-real/01-say-one-thing.py -u <url of your Daily meeting> -k <your Daily API Key>
|
||||
```
|
||||
## Overview
|
||||
|
||||
The Daily AI SDK allows you to build applications that can participate in WebRTC sessions and interact with AI Services. Some examples of what you can build with this:
|
||||
|
||||
- conversational bots that interact 1:1 with a user, using voice recognition and text-to-speech
|
||||
- assistant bots that aggregate transcriptions from multiple participants in a meeting and provide realtime summaries or other AI-generated output.
|
||||
- image-recognition bots
|
||||
- etc
|
||||
|
||||
## Concepts
|
||||
|
||||
### Transport Service
|
||||
|
||||
The SDK provides one “transport service”, which is a wrapper around Daily’s `daily-python` client (tk add link). You can use this service to listen for events related to a WebRTC session, such as “a participant joined the meeting”.
|
||||
The transport service also exposes a send queue, and a receive queue. You can use the send queue to send audio and video to the WebRTC session, and you can listen to the receive queue to see audio, video and transcription data from the WebRTC session.
|
||||
|
||||
### AI Services
|
||||
|
||||
The AI Service classes provide wrappers around various AI providers, and allow you to query LLMs, convert text to speech and make images from text. The audio and images can then be placed on the transport service’s send queue, where they’ll be sent to the WebRTC session.
|
||||
|
||||
### Queue Frames
|
||||
|
||||
Communication between the transport service and AI services, and between various AI services, takes place in Queue Frames. These frames contain an indication of the type of data as well as the data itself.
|
||||
|
||||
## Using Transports, AI Services and Frames
|
||||
|
||||
AI Services all define a `.run` method. This method consumes and generates `QueueFrame` frames. The kind of frames that can be consumed and generated depend on the kind of service. For instance, an LLM AI Service consumes `LLM_MESSAGE` frames (which define a history of interaction with an LLM) and emit `TEXT` frames (the response from the LLM).
|
||||
|
||||
The `.run` method is an `AsyncIterable`, and it takes an `iterable`, `AsyncIterable` or `asyncio.Queue` that produces QueueFrames as a parameter. This makes it easy to chain AI Services, and consume input from the Transport’s `receive_queue` .
|
||||
|
||||
AI Services also have a `.run_to_queue` method. This method is not an AsyncIterable, but instead sends processed QueueFrames to a queue. This makes it easy to send the output of an AI Service to the Transport’s `send_queue`.
|
||||
|
||||
AI Services also define convenience functions that let you bypass creating QueueFrames for some simple cases (eg. using the TTS service to convert a string to audio output and send that audio to the transport’s `send_queue`). See below for examples.
|
||||
|
||||
## Examples
|
||||
|
||||
### Say Something
|
||||
|
||||
The base TTS AI service exposes a `.say` method. After creating a transport and TTS service, you can use this method like so:
|
||||
|
||||
```
|
||||
transport = DailyTransportService(...)
|
||||
tts = AzureTTSService()
|
||||
await tts.say("hello world", transport.send_queue)
|
||||
```
|
||||
|
||||
This will call the TTS service to render the text to audio frames, then put the audio frames on the transport’s send queue. The transport will then send those frames along to the WebRTC session.
|
||||
|
||||
### Speak an LLM response
|
||||
|
||||
Given a system prompt contained in a `messages` array, you can emit the LLM’s response as audio with a chain like this:
|
||||
|
||||
```
|
||||
transport = DailyTransportService(...) # setup parameters omitted
|
||||
tts = AzureTTSService()
|
||||
llm = AzureLLMService()
|
||||
messages = [...] # system prompt omitted for brevity
|
||||
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([QueueFrame.LLM_MESSAGES, messages])
|
||||
)
|
||||
```
|
||||
|
||||
In this code, the LLM service object sends the messages to Azure’s OpenAI implementation, which streams chunks back asynchronously. Those chunks are aggregated by the TTS Service to ensure the best audio response (TTS works best when it gets complete sentence, so it can inflect correctly), then sent to Azure’s TTS service, converted to audio frames, and sent to the WebRTC session via the Daily transport.
|
||||
|
||||
### Pre-cache an LLM response
|
||||
|
||||
Sometimes LLMs can be slower than we’d like for natural-feeling communication. Here’s an example where we take advantage of the time it takes to speak some pre-defined text to get a head start on the LLM response:
|
||||
|
||||
(TK link to 04- sample)
|
||||
|
||||
In this sample, we set up a buffer queue to receive the audio frames from the LLM response before while we are joining the call and start an asynchronous task to start filling this buffer:
|
||||
|
||||
```
|
||||
buffer_queue = asyncio.Queue()
|
||||
llm_response_task = asyncio.create_task(
|
||||
elevenlabs_tts.run_to_queue(
|
||||
buffer_queue,
|
||||
llm.run([QueueFrame(FrameType.LLM_MESSAGE, messages)]),
|
||||
True,
|
||||
)
|
||||
)
|
||||
```
|
||||
|
||||
Then, when we’ve joined the call, we speak the static text:
|
||||
|
||||
```
|
||||
await azure_tts.say("My friend...", transport.send_queue)
|
||||
```
|
||||
|
||||
As that text is being spoken, the asynchronous LLM task continues in the background. When the text is done, we pull the frames off the buffer queue and put them in the transport’s `send_queue`:
|
||||
|
||||
```
|
||||
async def buffer_to_send_queue():
|
||||
while True:
|
||||
frame = await buffer_queue.get()
|
||||
await transport.send_queue.put(frame)
|
||||
buffer_queue.task_done()
|
||||
if frame.frame_type == FrameType.END_STREAM:
|
||||
break
|
||||
|
||||
await asyncio.gather(llm_response_task, buffer_to_send_queue())
|
||||
|
||||
```
|
||||
|
||||
One thing to note here is the last parameter to `run_to_queue` in the first code clause above: this causes the `run_to_queue` method to send an `END_STREAM` frame when it’s done rendering. This lets us know when to stop our `buffer_to_send_queue` task above.
|
||||
|
||||
@@ -1,17 +1,2 @@
|
||||
# Daily AI SDK Architecture Guide
|
||||
|
||||
## Frames
|
||||
|
||||
Frames can represent discrete chunks of data, for instance a chunk of text, a chunk of audio, or an image. They can also be used to as control flow, for instance a frame that indicates that there is no more data available, or that a user started or stopped talking. They can also represent more complex data structures, such as a message array used for an LLM completion.
|
||||
|
||||
## FrameProcessors
|
||||
|
||||
Frame processors operate on frames. Every frame processor implements a `process_frame` method that consumes one frame and produces zero or more frames. Frame processors can do simple transforms, such as concatenating text fragments into sentences, or they can treat frames as input for an AI Service, and emit chat completions based on message arrays or transform text into audio or images.
|
||||
|
||||
## Pipelines
|
||||
|
||||
Pipelines are lists of frame processors that read from a source queue and send the processed frames to a sink queue. A very simple pipeline might chain an LLM frame processor to a text-to-speech frame processor, with a transport's send queue as its sync. Placing LLM message frames on the pipeline's source queue will cause the LLM's response to be spoken. See example #2 for an implementation of this.
|
||||
|
||||
## Transports
|
||||
|
||||
Transports provide a receive queue, which is input from "the outside world", and a sink queue, which is data that will be sent "to the outside world". The `LocalTransportService` does this with the local camera, mic, display and speaker. The `DailyTransportService` does this with a WebRTC session joined to a Daily.co room.
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
OPENAI_API_KEY=...
|
||||
ELEVENLABS_API_KEY=...
|
||||
ELEVENLABS_VOICE_ID=...
|
||||
DAILY_API_KEY=...
|
||||
DAILY_SAMPLE_ROOM_URL=https://...
|
||||
@@ -3,29 +3,16 @@ requires = ["setuptools"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "dailyai"
|
||||
version = "0.0.3.1"
|
||||
description = "An open source framework for real-time, multi-modal, conversational AI applications"
|
||||
license = { text = "BSD 2-Clause License" }
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.7"
|
||||
keywords = ["webrtc", "audio", "video", "ai"]
|
||||
classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: BSD License",
|
||||
"Topic :: Communications :: Conferencing",
|
||||
"Topic :: Multimedia :: Sound/Audio",
|
||||
"Topic :: Multimedia :: Video",
|
||||
"Topic :: Scientific/Engineering :: Artificial Intelligence"
|
||||
]
|
||||
name = "daily_ai"
|
||||
version = "0.0.1"
|
||||
description = "Orchestrator for AI bots with Daily"
|
||||
dependencies = [
|
||||
"aiohttp",
|
||||
"anthropic",
|
||||
"azure-cognitiveservices-speech",
|
||||
"daily-python",
|
||||
"fal",
|
||||
"faster_whisper",
|
||||
"groq",
|
||||
"google-cloud-texttospeech",
|
||||
"numpy",
|
||||
"openai",
|
||||
@@ -38,10 +25,6 @@ dependencies = [
|
||||
"typing-extensions"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Source = "https://github.com/daily-co/daily-ai-sdk"
|
||||
Website = "https://daily.co"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
# All the following settings are optional:
|
||||
where = ["src"]
|
||||
|
||||
77
src/dailyai/conversation_wrappers.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import asyncio
|
||||
import copy
|
||||
import functools
|
||||
from typing import AsyncGenerator, Awaitable, Callable
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TranscriptionQueueFrame
|
||||
|
||||
|
||||
class InterruptibleConversationWrapper:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]],
|
||||
runner: Callable[
|
||||
[str, LLMContextAggregator, LLMContextAggregator], Awaitable[None]
|
||||
],
|
||||
interrupt: Callable[[], None],
|
||||
my_participant_id: str | None,
|
||||
llm_messages: list[dict[str, str]],
|
||||
llm_context_aggregator_in=LLMUserContextAggregator,
|
||||
llm_context_aggregator_out=LLMAssistantContextAggregator,
|
||||
delay_before_speech_seconds: float = 1.0,
|
||||
):
|
||||
self._frame_generator: Callable[[], AsyncGenerator[QueueFrame, None]] = frame_generator
|
||||
self._runner: Callable[
|
||||
[str, LLMContextAggregator, LLMContextAggregator], Awaitable[None]
|
||||
] = runner
|
||||
self._interrupt: Callable[[], None] = interrupt
|
||||
self._my_participant_id = my_participant_id
|
||||
self._messages: list[dict[str, str]] = llm_messages
|
||||
self._delay_before_speech_seconds = delay_before_speech_seconds
|
||||
self._llm_context_aggregator_in = llm_context_aggregator_in
|
||||
self._llm_context_aggregator_out = llm_context_aggregator_out
|
||||
|
||||
self._current_phrase = ""
|
||||
|
||||
def update_messages(self, new_messages: list[dict[str, str]], task: asyncio.Task | None):
|
||||
if task:
|
||||
if not task.cancelled():
|
||||
self._current_phrase = ""
|
||||
self._messages = new_messages
|
||||
|
||||
async def speak_after_delay(self, user_speech, messages):
|
||||
await asyncio.sleep(self._delay_before_speech_seconds)
|
||||
tma_in = self._llm_context_aggregator_in(
|
||||
messages, self._my_participant_id, complete_sentences=False
|
||||
)
|
||||
tma_out = self._llm_context_aggregator_out(
|
||||
messages, self._my_participant_id
|
||||
)
|
||||
|
||||
await self._runner(user_speech, tma_in, tma_out)
|
||||
|
||||
async def run_conversation(self):
|
||||
current_response_task = None
|
||||
|
||||
async for frame in self._frame_generator():
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
elif not isinstance(frame, TranscriptionQueueFrame):
|
||||
continue
|
||||
|
||||
if frame.participantId == self._my_participant_id:
|
||||
continue
|
||||
|
||||
if current_response_task:
|
||||
current_response_task.cancel()
|
||||
self._interrupt()
|
||||
|
||||
self._current_phrase += " " + frame.text
|
||||
current_llm_messages = copy.deepcopy(self._messages)
|
||||
current_response_task = asyncio.create_task(
|
||||
self.speak_after_delay(self._current_phrase, current_llm_messages)
|
||||
)
|
||||
current_response_task.add_done_callback(
|
||||
functools.partial(self.update_messages, current_llm_messages)
|
||||
)
|
||||
@@ -1,400 +0,0 @@
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
|
||||
from dailyai.pipeline.frames import (
|
||||
EndFrame,
|
||||
AudioFrame,
|
||||
EndPipeFrame,
|
||||
Frame,
|
||||
ImageFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
TextFrame,
|
||||
TranscriptionQueueFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.ai_services import AIService
|
||||
|
||||
from typing import AsyncGenerator, Callable, Coroutine, List
|
||||
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
|
||||
class ResponseAggregator(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
messages: list[dict] | None,
|
||||
role: str,
|
||||
start_frame,
|
||||
end_frame,
|
||||
accumulator_frame,
|
||||
pass_through=True,
|
||||
):
|
||||
self.aggregation = ""
|
||||
self.aggregating = False
|
||||
self.messages = messages
|
||||
self._role = role
|
||||
self._start_frame = start_frame
|
||||
self._end_frame = end_frame
|
||||
self._accumulator_frame = accumulator_frame
|
||||
self._pass_through = pass_through
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if not self.messages:
|
||||
return
|
||||
|
||||
if isinstance(frame, self._start_frame):
|
||||
self.aggregating = True
|
||||
elif isinstance(frame, self._end_frame):
|
||||
self.aggregating = False
|
||||
# Sometimes VAD triggers quickly on and off. If we don't get any transcription,
|
||||
# it creates empty LLM message queue frames
|
||||
if len(self.aggregation) > 0:
|
||||
self.messages.append(
|
||||
{"role": self._role, "content": self.aggregation})
|
||||
self.aggregation = ""
|
||||
yield self._end_frame()
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
elif isinstance(frame, self._accumulator_frame) and self.aggregating:
|
||||
self.aggregation += f" {frame.text}"
|
||||
if self._pass_through:
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class LLMResponseAggregator(ResponseAggregator):
|
||||
def __init__(self, messages: list[dict]):
|
||||
super().__init__(
|
||||
messages=messages,
|
||||
role="assistant",
|
||||
start_frame=LLMResponseStartFrame,
|
||||
end_frame=LLMResponseEndFrame,
|
||||
accumulator_frame=TextFrame,
|
||||
)
|
||||
|
||||
|
||||
class UserResponseAggregator(ResponseAggregator):
|
||||
def __init__(self, messages: list[dict]):
|
||||
super().__init__(
|
||||
messages=messages,
|
||||
role="user",
|
||||
start_frame=UserStartedSpeakingFrame,
|
||||
end_frame=UserStoppedSpeakingFrame,
|
||||
accumulator_frame=TranscriptionQueueFrame,
|
||||
pass_through=False,
|
||||
)
|
||||
|
||||
|
||||
class LLMContextAggregator(AIService):
|
||||
def __init__(
|
||||
self,
|
||||
messages: list[dict],
|
||||
role: str,
|
||||
bot_participant_id=None,
|
||||
complete_sentences=True,
|
||||
pass_through=True,
|
||||
):
|
||||
super().__init__()
|
||||
self.messages = messages
|
||||
self.bot_participant_id = bot_participant_id
|
||||
self.role = role
|
||||
self.sentence = ""
|
||||
self.complete_sentences = complete_sentences
|
||||
self.pass_through = pass_through
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
# We don't do anything with non-text frames, pass it along to next in
|
||||
# the pipeline.
|
||||
if not isinstance(frame, TextFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
# Ignore transcription frames from the bot
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
if frame.participantId == self.bot_participant_id:
|
||||
return
|
||||
|
||||
# The common case for "pass through" is receiving frames from the LLM that we'll
|
||||
# use to update the "assistant" LLM messages, but also passing the text frames
|
||||
# along to a TTS service to be spoken to the user.
|
||||
if self.pass_through:
|
||||
yield frame
|
||||
|
||||
# TODO: split up transcription by participant
|
||||
if self.complete_sentences:
|
||||
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
|
||||
# though we check it above
|
||||
self.sentence += frame.text
|
||||
if self.sentence.endswith((".", "?", "!")):
|
||||
self.messages.append(
|
||||
{"role": self.role, "content": self.sentence})
|
||||
self.sentence = ""
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
else:
|
||||
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
|
||||
# though we check it above
|
||||
self.messages.append({"role": self.role, "content": frame.text})
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
|
||||
|
||||
class LLMUserContextAggregator(LLMContextAggregator):
|
||||
def __init__(
|
||||
self,
|
||||
messages: list[dict],
|
||||
bot_participant_id=None,
|
||||
complete_sentences=True):
|
||||
super().__init__(
|
||||
messages,
|
||||
"user",
|
||||
bot_participant_id,
|
||||
complete_sentences,
|
||||
pass_through=False)
|
||||
|
||||
|
||||
class LLMAssistantContextAggregator(LLMContextAggregator):
|
||||
def __init__(
|
||||
self,
|
||||
messages: list[dict],
|
||||
bot_participant_id=None,
|
||||
complete_sentences=True):
|
||||
super().__init__(
|
||||
messages,
|
||||
"assistant",
|
||||
bot_participant_id,
|
||||
complete_sentences,
|
||||
pass_through=True,
|
||||
)
|
||||
|
||||
|
||||
class SentenceAggregator(FrameProcessor):
|
||||
"""This frame processor aggregates text frames into complete sentences.
|
||||
|
||||
Frame input/output:
|
||||
TextFrame("Hello,") -> None
|
||||
TextFrame(" world.") -> TextFrame("Hello world.")
|
||||
|
||||
Doctest:
|
||||
>>> async def print_frames(aggregator, frame):
|
||||
... async for frame in aggregator.process_frame(frame):
|
||||
... print(frame.text)
|
||||
|
||||
>>> aggregator = SentenceAggregator()
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello,")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
|
||||
Hello, world.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.aggregation = ""
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, TextFrame):
|
||||
m = re.search("(.*[?.!])(.*)", frame.text)
|
||||
if m:
|
||||
yield TextFrame(self.aggregation + m.group(1))
|
||||
self.aggregation = m.group(2)
|
||||
else:
|
||||
self.aggregation += frame.text
|
||||
elif isinstance(frame, EndFrame):
|
||||
if self.aggregation:
|
||||
yield TextFrame(self.aggregation)
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class LLMFullResponseAggregator(FrameProcessor):
|
||||
"""This class aggregates Text frames until it receives a
|
||||
LLMResponseEndFrame, then emits the concatenated text as
|
||||
a single text frame.
|
||||
|
||||
given the following frames:
|
||||
|
||||
TextFrame("Hello,")
|
||||
TextFrame(" world.")
|
||||
TextFrame(" I am")
|
||||
TextFrame(" an LLM.")
|
||||
LLMResponseEndFrame()]
|
||||
|
||||
this processor will yield nothing for the first 4 frames, then
|
||||
|
||||
TextFrame("Hello, world. I am an LLM.")
|
||||
LLMResponseEndFrame()
|
||||
|
||||
when passed the last frame.
|
||||
|
||||
>>> async def print_frames(aggregator, frame):
|
||||
... async for frame in aggregator.process_frame(frame):
|
||||
... if isinstance(frame, TextFrame):
|
||||
... print(frame.text)
|
||||
... else:
|
||||
... print(frame.__class__.__name__)
|
||||
|
||||
>>> aggregator = LLMFullResponseAggregator()
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello,")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" I am")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" an LLM.")))
|
||||
>>> asyncio.run(print_frames(aggregator, LLMResponseEndFrame()))
|
||||
Hello, world. I am an LLM.
|
||||
LLMResponseEndFrame
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.aggregation = ""
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if not isinstance(frame, AudioFrame):
|
||||
print(f"^^^ LFRA got frame: {frame}")
|
||||
if isinstance(frame, TextFrame):
|
||||
self.aggregation += frame.text
|
||||
print(
|
||||
f"^^^ LFRA got textframe. aggregation is now {self.aggregation}")
|
||||
elif isinstance(frame, LLMResponseEndFrame):
|
||||
print(
|
||||
f"^^^ LFRA got an llmresponseendframe. About to yield aggregation: {self.aggregation}")
|
||||
yield TextFrame(self.aggregation)
|
||||
yield frame
|
||||
self.aggregation = ""
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class StatelessTextTransformer(FrameProcessor):
|
||||
"""This processor calls the given function on any text in a text frame.
|
||||
|
||||
>>> async def print_frames(aggregator, frame):
|
||||
... async for frame in aggregator.process_frame(frame):
|
||||
... print(frame.text)
|
||||
|
||||
>>> aggregator = StatelessTextTransformer(lambda x: x.upper())
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello")))
|
||||
HELLO
|
||||
"""
|
||||
|
||||
def __init__(self, transform_fn):
|
||||
self.transform_fn = transform_fn
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, TextFrame):
|
||||
result = self.transform_fn(frame.text)
|
||||
if isinstance(result, Coroutine):
|
||||
result = await result
|
||||
|
||||
yield TextFrame(result)
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class ParallelPipeline(FrameProcessor):
|
||||
"""Run multiple pipelines in parallel.
|
||||
|
||||
This class takes frames from its source queue and sends them to each
|
||||
sub-pipeline. Each sub-pipeline emits its frames into this class's
|
||||
sink queue. No guarantees are made about the ordering of frames in
|
||||
the sink queue (that is, no sub-pipeline has higher priority than
|
||||
any other, frames are put on the sink in the order they're emitted
|
||||
by the sub-pipelines).
|
||||
|
||||
After each frame is taken from this class's source queue and placed
|
||||
in each sub-pipeline's source queue, an EndPipeFrame is put on each
|
||||
sub-pipeline's source queue. This indicates to the sub-pipe runner
|
||||
that it should exit.
|
||||
|
||||
Since frame handlers pass through unhandled frames by convention, this
|
||||
class de-dupes frames in its sink before yielding them.
|
||||
"""
|
||||
|
||||
def __init__(self, pipeline_definitions: List[List[FrameProcessor]]):
|
||||
self.sources = [asyncio.Queue() for _ in pipeline_definitions]
|
||||
self.sink: asyncio.Queue[Frame] = asyncio.Queue()
|
||||
self.pipelines: list[Pipeline] = [
|
||||
Pipeline(
|
||||
pipeline_definition,
|
||||
source,
|
||||
self.sink,
|
||||
)
|
||||
for source, pipeline_definition in zip(self.sources, pipeline_definitions)
|
||||
]
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
for source in self.sources:
|
||||
await source.put(frame)
|
||||
await source.put(EndPipeFrame())
|
||||
|
||||
await asyncio.gather(*[pipeline.run_pipeline() for pipeline in self.pipelines])
|
||||
|
||||
seen_ids = set()
|
||||
while not self.sink.empty():
|
||||
frame = await self.sink.get()
|
||||
|
||||
# de-dup frames. Because the convention is to yield a frame that isn't processed,
|
||||
# each pipeline will likely yield the same frame, so we will end up with _n_ copies
|
||||
# of unprocessed frames where _n_ is the number of parallel pipes that don't
|
||||
# process that frame.
|
||||
if id(frame) in seen_ids:
|
||||
continue
|
||||
seen_ids.add(id(frame))
|
||||
|
||||
# Skip passing along EndParallelPipeQueueFrame, because we use them
|
||||
# for our own flow control.
|
||||
if not isinstance(frame, EndPipeFrame):
|
||||
yield frame
|
||||
|
||||
|
||||
class GatedAggregator(FrameProcessor):
|
||||
"""Accumulate frames, with custom functions to start and stop accumulation.
|
||||
Yields gate-opening frame before any accumulated frames, then ensuing frames
|
||||
until and not including the gate-closed frame.
|
||||
|
||||
>>> async def print_frames(aggregator, frame):
|
||||
... async for frame in aggregator.process_frame(frame):
|
||||
... if isinstance(frame, TextFrame):
|
||||
... print(frame.text)
|
||||
... else:
|
||||
... print(frame.__class__.__name__)
|
||||
|
||||
>>> aggregator = GatedAggregator(
|
||||
... gate_close_fn=lambda x: isinstance(x, LLMResponseStartFrame),
|
||||
... gate_open_fn=lambda x: isinstance(x, ImageFrame),
|
||||
... start_open=False)
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Hello again.")))
|
||||
>>> asyncio.run(print_frames(aggregator, ImageFrame(url='', image=bytes([]))))
|
||||
ImageFrame
|
||||
Hello
|
||||
Hello again.
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame("Goodbye.")))
|
||||
Goodbye.
|
||||
"""
|
||||
|
||||
def __init__(self, gate_open_fn, gate_close_fn, start_open):
|
||||
self.gate_open_fn = gate_open_fn
|
||||
self.gate_close_fn = gate_close_fn
|
||||
self.gate_open = start_open
|
||||
self.accumulator: List[Frame] = []
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if self.gate_open:
|
||||
if self.gate_close_fn(frame):
|
||||
self.gate_open = False
|
||||
else:
|
||||
if self.gate_open_fn(frame):
|
||||
self.gate_open = True
|
||||
|
||||
if self.gate_open:
|
||||
yield frame
|
||||
if self.accumulator:
|
||||
for frame in self.accumulator:
|
||||
yield frame
|
||||
self.accumulator = []
|
||||
else:
|
||||
self.accumulator.append(frame)
|
||||
@@ -1,33 +0,0 @@
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.pipeline.frames import ControlFrame, Frame
|
||||
|
||||
|
||||
class FrameProcessor:
|
||||
"""This is the base class for all frame processors. Frame processors consume a frame
|
||||
and yield 0 or more frames. Generally frame processors are used as part of a pipeline
|
||||
where frames come from a source queue, are processed by a series of frame processors,
|
||||
then placed on a sink queue.
|
||||
|
||||
By convention, FrameProcessors should immediately yield any frames they don't process.
|
||||
|
||||
Stateful FrameProcessors should watch for the EndStreamQueueFrame and finalize their
|
||||
output, eg. yielding an unfinished sentence if they're aggregating LLM output to full
|
||||
sentences. EndStreamQueueFrame is also a chance to clean up any services that need to
|
||||
be closed, del'd, etc.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(
|
||||
self, frame: Frame
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
"""Process a single frame and yield 0 or more frames."""
|
||||
if isinstance(frame, ControlFrame):
|
||||
yield frame
|
||||
yield frame
|
||||
|
||||
@abstractmethod
|
||||
async def interrupted(self) -> None:
|
||||
"""Handle any cleanup if the pipeline was interrupted."""
|
||||
pass
|
||||
@@ -1,211 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, List
|
||||
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
|
||||
class Frame:
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}"
|
||||
|
||||
|
||||
class ControlFrame(Frame):
|
||||
# Control frames should contain no instance data, so
|
||||
# equality is based solely on the class.
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, self.__class__)
|
||||
|
||||
|
||||
class StartFrame(ControlFrame):
|
||||
"""Used (but not required) to start a pipeline, and is also used to
|
||||
indicate that an interruption has ended and the transport should start
|
||||
processing frames again."""
|
||||
pass
|
||||
|
||||
|
||||
class EndFrame(ControlFrame):
|
||||
"""Indicates that a pipeline has ended and frame processors and pipelines
|
||||
should be shut down. If the transport receives this frame, it will stop
|
||||
sending frames to its output channel(s) and close all its threads."""
|
||||
pass
|
||||
|
||||
|
||||
class EndPipeFrame(ControlFrame):
|
||||
"""Indicates that a pipeline has ended but that the transport should
|
||||
continue processing. This frame is used in parallel pipelines and other
|
||||
sub-pipelines."""
|
||||
pass
|
||||
|
||||
|
||||
class PipelineStartedFrame(ControlFrame):
|
||||
"""
|
||||
Used by the transport to indicate that execution of a pipeline is starting
|
||||
(or restarting). It should be the first frame your app receives when it
|
||||
starts, or when an interruptible pipeline has been interrupted.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class LLMResponseStartFrame(ControlFrame):
|
||||
"""Used to indicate the beginning of an LLM response. Following TextFrames
|
||||
are part of the LLM response until an LLMResponseEndFrame"""
|
||||
pass
|
||||
|
||||
|
||||
class LLMResponseEndFrame(ControlFrame):
|
||||
"""Indicates the end of an LLM response."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class AudioFrame(Frame):
|
||||
"""A chunk of audio. Will be played by the transport if the transport's mic
|
||||
has been enabled."""
|
||||
data: bytes
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}, size: {len(self.data)} B"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class ImageFrame(Frame):
|
||||
"""An image. Will be shown by the transport if the transport's camera is
|
||||
enabled."""
|
||||
url: str | None
|
||||
image: bytes
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}, url: {self.url}, image size: {len(self.image)} B"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SpriteFrame(Frame):
|
||||
"""An animated sprite. Will be shown by the transport if the transport's
|
||||
camera is enabled. Will play at the framerate specified in the transport's
|
||||
`fps` constructor parameter."""
|
||||
images: list[bytes]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}, list size: {len(self.images)}"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TextFrame(Frame):
|
||||
"""A chunk of text. Emitted by LLM services, consumed by TTS services, can
|
||||
be used to send text through pipelines."""
|
||||
text: str
|
||||
|
||||
def __str__(self):
|
||||
return f'{self.__class__.__name__}: "{self.text}"'
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TranscriptionQueueFrame(TextFrame):
|
||||
"""A text frame with transcription-specific data. Will be placed in the
|
||||
transport's receive queue when a participant speaks."""
|
||||
participantId: str
|
||||
timestamp: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMMessagesQueueFrame(Frame):
|
||||
"""A frame containing a list of LLM messages. Used to signal that an LLM
|
||||
service should run a chat completion and emit an LLMStartFrames, TextFrames
|
||||
and an LLMEndFrame.
|
||||
Note that the messages property on this class is mutable, and will be
|
||||
be updated by various ResponseAggregator frame processors."""
|
||||
messages: List[dict]
|
||||
|
||||
|
||||
@dataclass()
|
||||
class OpenAILLMContextFrame(Frame):
|
||||
"""Like an LLMMessagesQueueFrame, but with extra context specific to the
|
||||
OpenAI API. The context in this message is also mutable, and will be
|
||||
changed by the OpenAIContextAggregator frame processor."""
|
||||
context: OpenAILLMContext
|
||||
|
||||
|
||||
@dataclass()
|
||||
class ReceivedAppMessageFrame(Frame):
|
||||
message: Any
|
||||
sender: str
|
||||
|
||||
def __str__(self):
|
||||
return f"ReceivedAppMessageFrame: sender: {self.sender}, message: {self.message}"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SendAppMessageFrame(Frame):
|
||||
message: Any
|
||||
participantId: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"SendAppMessageFrame: participantId: {self.participantId}, message: {self.message}"
|
||||
|
||||
|
||||
class UserStartedSpeakingFrame(Frame):
|
||||
"""Emitted by VAD to indicate that a participant has started speaking.
|
||||
This can be used for interruptions or other times when detecting that
|
||||
someone is speaking is more important than knowing what they're saying
|
||||
(as you will with a TranscriptionFrame)"""
|
||||
pass
|
||||
|
||||
|
||||
class UserStoppedSpeakingFrame(Frame):
|
||||
"""Emitted by the VAD to indicate that a user stopped speaking."""
|
||||
pass
|
||||
|
||||
|
||||
class BotStartedSpeakingFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
class BotStoppedSpeakingFrame(Frame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMFunctionStartFrame(Frame):
|
||||
"""Emitted when the LLM receives the beginning of a function call
|
||||
completion. A frame processor can use this frame to indicate that it should
|
||||
start preparing to make a function call, if it can do so in the absence of
|
||||
any arguments."""
|
||||
function_name: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMFunctionCallFrame(Frame):
|
||||
"""Emitted when the LLM has received an entire function call completion."""
|
||||
function_name: str
|
||||
arguments: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class VideoImageFrame(Frame):
|
||||
"""Contains a still image from a partcipant's video stream."""
|
||||
participantId: str
|
||||
image: bytes
|
||||
|
||||
# def __str__(self):
|
||||
# return f"{self.__class__.__name__}, participantId: {self.participantId}, image size: {len(self.image)} B"
|
||||
|
||||
|
||||
class TelestratorImageFrame(ImageFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class VisionFrame(Frame):
|
||||
prompt: str
|
||||
image: bytes
|
||||
|
||||
# def __str__(self):
|
||||
# return f"{self.__class__.__name__}, prompt: {self.prompt}, image size: {len(self.image)} B"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class RequestVideoImageFrame(Frame):
|
||||
"""Send to the transport to request a new video image from a specific participant. Leave participantId
|
||||
empty to request a frame from all participants."""
|
||||
participantId: str | None
|
||||
@@ -1,24 +0,0 @@
|
||||
from typing import List
|
||||
from dailyai.pipeline.frames import EndFrame, EndPipeFrame
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
|
||||
class SequentialMergePipeline(Pipeline):
|
||||
"""This class merges the sink queues from a list of pipelines. Frames from
|
||||
each pipeline's sink are merged in the order of pipelines in the list."""
|
||||
|
||||
def __init__(self, pipelines: List[Pipeline]):
|
||||
super().__init__([])
|
||||
self.pipelines = pipelines
|
||||
|
||||
async def run_pipeline(self):
|
||||
for pipeline in self.pipelines:
|
||||
while True:
|
||||
frame = await pipeline.sink.get()
|
||||
if isinstance(
|
||||
frame, EndFrame) or isinstance(
|
||||
frame, EndPipeFrame):
|
||||
break
|
||||
await self.sink.put(frame)
|
||||
|
||||
await self.sink.put(EndFrame())
|
||||
@@ -1,109 +0,0 @@
|
||||
from typing import Any, AsyncGenerator, Callable
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
from dailyai.pipeline.frames import (
|
||||
Frame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
OpenAILLMContextFrame,
|
||||
TextFrame,
|
||||
TranscriptionQueueFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from openai.types.chat import ChatCompletionRole
|
||||
|
||||
|
||||
class OpenAIContextAggregator(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
context: OpenAILLMContext,
|
||||
aggregator: Callable[[Frame, str | None], str | None],
|
||||
role: ChatCompletionRole,
|
||||
start_frame: type,
|
||||
end_frame: type,
|
||||
accumulator_frame: type,
|
||||
pass_through=True,
|
||||
):
|
||||
if not (
|
||||
issubclass(start_frame, Frame)
|
||||
and issubclass(end_frame, Frame)
|
||||
and issubclass(accumulator_frame, Frame)
|
||||
):
|
||||
raise TypeError(
|
||||
"start_frame, end_frame and accumulator_frame must be instances of Frame"
|
||||
)
|
||||
|
||||
self._context: OpenAILLMContext = context
|
||||
self._aggregator: Callable[[Frame, str | None], None] = aggregator
|
||||
self._role: ChatCompletionRole = role
|
||||
self._start_frame = start_frame
|
||||
self._end_frame = end_frame
|
||||
self._accumulator_frame = accumulator_frame
|
||||
self._pass_through = pass_through
|
||||
|
||||
self._aggregating = False
|
||||
self._aggregation = None
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, self._start_frame):
|
||||
self._aggregating = True
|
||||
elif isinstance(frame, self._end_frame):
|
||||
self._aggregating = False
|
||||
if self._aggregation:
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": self._role,
|
||||
"content": self._aggregation,
|
||||
"name": self._role,
|
||||
} # type: ignore
|
||||
)
|
||||
self._aggregation = None
|
||||
yield OpenAILLMContextFrame(self._context)
|
||||
elif isinstance(frame, self._accumulator_frame) and self._aggregating:
|
||||
self._aggregation = self._aggregator(frame, self._aggregation)
|
||||
if self._pass_through:
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
|
||||
def string_aggregator(
|
||||
self,
|
||||
frame: Frame,
|
||||
aggregation: str | None) -> str | None:
|
||||
if not isinstance(frame, TextFrame):
|
||||
raise TypeError(
|
||||
"Frame must be a TextFrame instance to be aggregated by a string aggregator."
|
||||
)
|
||||
if not aggregation:
|
||||
aggregation = ""
|
||||
return " ".join([aggregation, frame.text])
|
||||
|
||||
|
||||
class OpenAIUserContextAggregator(OpenAIContextAggregator):
|
||||
def __init__(self, context: OpenAILLMContext):
|
||||
super().__init__(
|
||||
context=context,
|
||||
aggregator=self.string_aggregator,
|
||||
role="user",
|
||||
start_frame=UserStartedSpeakingFrame,
|
||||
end_frame=UserStoppedSpeakingFrame,
|
||||
accumulator_frame=TranscriptionQueueFrame,
|
||||
pass_through=False,
|
||||
)
|
||||
|
||||
|
||||
class OpenAIAssistantContextAggregator(OpenAIContextAggregator):
|
||||
|
||||
def __init__(self, context: OpenAILLMContext):
|
||||
super().__init__(
|
||||
context,
|
||||
aggregator=self.string_aggregator,
|
||||
role="assistant",
|
||||
start_frame=LLMResponseStartFrame,
|
||||
end_frame=LLMResponseEndFrame,
|
||||
accumulator_frame=TextFrame,
|
||||
pass_through=True,
|
||||
)
|
||||
@@ -1,110 +0,0 @@
|
||||
import asyncio
|
||||
from typing import AsyncGenerator, AsyncIterable, Iterable, List
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
|
||||
from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame
|
||||
|
||||
|
||||
class Pipeline:
|
||||
"""
|
||||
This class manages a pipe of FrameProcessors, and runs them in sequence. The "source"
|
||||
and "sink" queues are managed by the caller. You can use this class stand-alone to
|
||||
perform specialized processing, or you can use the Transport's run_pipeline method to
|
||||
instantiate and run a pipeline with the Transport's sink and source queues.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
processors: List[FrameProcessor],
|
||||
source: asyncio.Queue | None = None,
|
||||
sink: asyncio.Queue[Frame] | None = None
|
||||
):
|
||||
"""Create a new pipeline. By default we create the sink and source queues
|
||||
if they're not provided, but these can be overridden to point to other
|
||||
queues. If this pipeline is run by a transport, its sink and source queues
|
||||
will be overridden.
|
||||
"""
|
||||
self.processors: List[FrameProcessor] = processors
|
||||
|
||||
self.source: asyncio.Queue[Frame] = source or asyncio.Queue()
|
||||
self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue()
|
||||
|
||||
def set_source(self, source: asyncio.Queue[Frame]):
|
||||
"""Set the source queue for this pipeline. Frames from this queue
|
||||
will be processed by each frame_processor in the pipeline, or order
|
||||
from first to last."""
|
||||
self.source = source
|
||||
|
||||
def set_sink(self, sink: asyncio.Queue[Frame]):
|
||||
"""Set the sink queue for this pipeline. After the last frame_processor
|
||||
has processed a frame, its output will be placed on this queue."""
|
||||
self.sink = sink
|
||||
|
||||
async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
|
||||
"""Convenience function to get the next frame from the source queue. This
|
||||
lets us consistently have an AsyncGenerator yield frames, from either the
|
||||
source queue or a frame_processor."""
|
||||
|
||||
yield await self.source.get()
|
||||
|
||||
async def queue_frames(
|
||||
self,
|
||||
frames: Iterable[Frame] | AsyncIterable[Frame],
|
||||
) -> None:
|
||||
"""Insert frames directly into a pipeline. This is typically used inside a transport
|
||||
participant_joined callback to prompt a bot to start a conversation, for example."""
|
||||
|
||||
if isinstance(frames, AsyncIterable):
|
||||
async for frame in frames:
|
||||
await self.source.put(frame)
|
||||
elif isinstance(frames, Iterable):
|
||||
for frame in frames:
|
||||
await self.source.put(frame)
|
||||
else:
|
||||
raise Exception("Frames must be an iterable or async iterable")
|
||||
|
||||
async def run_pipeline(self):
|
||||
"""Run the pipeline. Take each frame from the source queue, pass it to
|
||||
the first frame_processor, pass the output of that frame_processor to the
|
||||
next in the list, etc. until the last frame_processor has processed the
|
||||
resulting frames, then place those frames in the sink queue.
|
||||
|
||||
The source and sink queues must be set before calling this method.
|
||||
|
||||
This method will exit when an EndStreamQueueFrame is placed on the sink queue.
|
||||
No more frames will be placed on the sink queue after an EndStreamQueueFrame, even
|
||||
if it's not the last frame yielded by the last frame_processor in the pipeline..
|
||||
"""
|
||||
|
||||
try:
|
||||
while True:
|
||||
initial_frame = await self.source.get()
|
||||
async for frame in self._run_pipeline_recursively(
|
||||
initial_frame, self.processors
|
||||
):
|
||||
await self.sink.put(frame)
|
||||
|
||||
if isinstance(initial_frame, EndFrame) or isinstance(
|
||||
initial_frame, EndPipeFrame
|
||||
):
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
# this means there's been an interruption, do any cleanup necessary
|
||||
# here.
|
||||
for processor in self.processors:
|
||||
await processor.interrupted()
|
||||
pass
|
||||
|
||||
async def _run_pipeline_recursively(
|
||||
self, initial_frame: Frame, processors: List[FrameProcessor]
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
"""Internal function to add frames to the pipeline as they're yielded
|
||||
by each processor."""
|
||||
if processors:
|
||||
async for frame in processors[0].process_frame(initial_frame):
|
||||
async for final_frame in self._run_pipeline_recursively(
|
||||
frame, processors[1:]
|
||||
):
|
||||
yield final_frame
|
||||
else:
|
||||
yield initial_frame
|
||||
98
src/dailyai/queue_aggregators.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import asyncio
|
||||
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame, QueueFrame, TextQueueFrame, TranscriptionQueueFrame
|
||||
from dailyai.services.ai_services import AIService
|
||||
|
||||
from typing import AsyncGenerator, List
|
||||
|
||||
|
||||
class QueueTee:
|
||||
async def run_to_queue_and_generate(
|
||||
self,
|
||||
output_queue: asyncio.Queue,
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
async for frame in generator:
|
||||
await output_queue.put(frame)
|
||||
yield frame
|
||||
|
||||
async def run_to_queues(
|
||||
self,
|
||||
output_queues: List[asyncio.Queue],
|
||||
generator: AsyncGenerator[QueueFrame, None]
|
||||
):
|
||||
async for frame in generator:
|
||||
for queue in output_queues:
|
||||
await queue.put(frame)
|
||||
|
||||
|
||||
class LLMContextAggregator(AIService):
|
||||
def __init__(
|
||||
self,
|
||||
messages: list[dict],
|
||||
role: str,
|
||||
bot_participant_id=None,
|
||||
complete_sentences=True,
|
||||
pass_through=True):
|
||||
super().__init__()
|
||||
self.messages = messages
|
||||
self.bot_participant_id = bot_participant_id
|
||||
self.role = role
|
||||
self.sentence = ""
|
||||
self.complete_sentences = complete_sentences
|
||||
self.pass_through = pass_through
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
# We don't do anything with non-text frames, pass it along to next in the pipeline.
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
# Ignore transcription frames from the bot
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
if frame.participantId == self.bot_participant_id:
|
||||
return
|
||||
|
||||
# The common case for "pass through" is receiving frames from the LLM that we'll
|
||||
# use to update the "assistant" LLM messages, but also passing the text frames
|
||||
# along to a TTS service to be spoken to the user.
|
||||
if self.pass_through:
|
||||
yield frame
|
||||
|
||||
# TODO: split up transcription by participant
|
||||
if self.complete_sentences:
|
||||
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
|
||||
# though we check it above
|
||||
self.sentence += frame.text
|
||||
if self.sentence.endswith((".", "?", "!")):
|
||||
self.messages.append({"role": self.role, "content": self.sentence})
|
||||
self.sentence = ""
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
else:
|
||||
# type: ignore -- the linter thinks this isn't a TextQueueFrame, even
|
||||
# though we check it above
|
||||
self.messages.append({"role": self.role, "content": frame.text})
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
# Send any dangling words that weren't finished with punctuation.
|
||||
if self.complete_sentences and self.sentence:
|
||||
self.messages.append({"role": self.role, "content": self.sentence})
|
||||
yield LLMMessagesQueueFrame(self.messages)
|
||||
|
||||
|
||||
class LLMUserContextAggregator(LLMContextAggregator):
|
||||
def __init__(self,
|
||||
messages: list[dict],
|
||||
bot_participant_id=None,
|
||||
complete_sentences=True):
|
||||
super().__init__(messages, "user", bot_participant_id, complete_sentences, pass_through=False)
|
||||
|
||||
|
||||
class LLMAssistantContextAggregator(LLMContextAggregator):
|
||||
def __init__(
|
||||
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
|
||||
):
|
||||
super().__init__(
|
||||
messages, "assistant", bot_participant_id, complete_sentences, pass_through=True
|
||||
)
|
||||
79
src/dailyai/queue_frame.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
class QueueFrame:
|
||||
pass
|
||||
|
||||
|
||||
class ControlQueueFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class StartStreamQueueFrame(ControlQueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class EndStreamQueueFrame(ControlQueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class LLMResponseEndQueueFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class UserStartedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
class UserStoppedSpeakingFrame(QueueFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class AudioQueueFrame(QueueFrame):
|
||||
data: bytes
|
||||
|
||||
|
||||
@dataclass()
|
||||
class ImageQueueFrame(QueueFrame):
|
||||
url: str | None
|
||||
image: bytes
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SpriteQueueFrame(QueueFrame):
|
||||
images: list[bytes]
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TextQueueFrame(QueueFrame):
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TextQueueOutOfBandFrame(TextQueueFrame):
|
||||
outOfBand: bool = True
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TTSCompletedFrame(QueueFrame):
|
||||
text: str
|
||||
outOfBand: bool = False
|
||||
|
||||
|
||||
@dataclass()
|
||||
class TranscriptionQueueFrame(TextQueueFrame):
|
||||
participantId: str
|
||||
timestamp: str
|
||||
|
||||
|
||||
@dataclass()
|
||||
class LLMMessagesQueueFrame(QueueFrame):
|
||||
messages: list[dict[str, str]] # TODO: define this more concretely!
|
||||
|
||||
|
||||
class AppMessageQueueFrame(QueueFrame):
|
||||
message: Any
|
||||
participantId: str
|
||||
3
src/dailyai/requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
Pillow==10.1.0
|
||||
typing_extensions==4.9.0
|
||||
faster-whisper==0.10.0
|
||||
@@ -2,39 +2,119 @@ import asyncio
|
||||
import io
|
||||
import logging
|
||||
import time
|
||||
import datetime
|
||||
import wave
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
|
||||
from dailyai.pipeline.frames import (
|
||||
AudioFrame,
|
||||
EndFrame,
|
||||
EndPipeFrame,
|
||||
ImageFrame,
|
||||
from dailyai.queue_frame import (
|
||||
QueueFrame,
|
||||
AudioQueueFrame,
|
||||
ControlQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
LLMFunctionStartFrame,
|
||||
LLMFunctionCallFrame,
|
||||
Frame,
|
||||
TextFrame,
|
||||
LLMResponseEndQueueFrame,
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
TTSCompletedFrame,
|
||||
TranscriptionQueueFrame,
|
||||
VisionFrame
|
||||
UserStoppedSpeakingFrame
|
||||
)
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator, BinaryIO
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
class AIService(FrameProcessor):
|
||||
class AIService:
|
||||
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger("dailyai")
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
async def run_to_queue(self, queue: asyncio.Queue, frames, add_end_of_stream=False) -> None:
|
||||
async for frame in self.run(frames):
|
||||
await queue.put(frame)
|
||||
|
||||
if add_end_of_stream:
|
||||
await queue.put(EndStreamQueueFrame())
|
||||
|
||||
async def run(
|
||||
self,
|
||||
frames: Iterable[QueueFrame]
|
||||
| AsyncIterable[QueueFrame]
|
||||
| asyncio.Queue[QueueFrame],
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
try:
|
||||
if isinstance(frames, AsyncIterable):
|
||||
async for frame in frames:
|
||||
async for output_frame in self.process_frame(frame):
|
||||
yield output_frame
|
||||
elif isinstance(frames, Iterable):
|
||||
for frame in frames:
|
||||
async for output_frame in self.process_frame(frame):
|
||||
yield output_frame
|
||||
elif isinstance(frames, asyncio.Queue):
|
||||
while True:
|
||||
frame = await frames.get()
|
||||
async for output_frame in self.process_frame(frame):
|
||||
yield output_frame
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
else:
|
||||
raise Exception("Frames must be an iterable or async iterable")
|
||||
|
||||
async for output_frame in self.finalize():
|
||||
yield output_frame
|
||||
except Exception as e:
|
||||
self.logger.error("Exception occurred while running AI service", e)
|
||||
raise e
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, ControlQueueFrame):
|
||||
yield frame
|
||||
|
||||
@abstractmethod
|
||||
async def finalize(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
# This is a trick for the interpreter (and linter) to know that this is a generator.
|
||||
if False:
|
||||
yield QueueFrame()
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
"""This class is a no-op but serves as a base class for LLM services."""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, context):
|
||||
super().__init__()
|
||||
self._context = context
|
||||
|
||||
@abstractmethod
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
yield ""
|
||||
|
||||
@abstractmethod
|
||||
async def run_llm(self, messages) -> str:
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
print(f"##### process frame got a frame, {type(frame)}")
|
||||
if isinstance(frame, UserStoppedSpeakingFrame):
|
||||
print(
|
||||
f"### Got a user stopped speaking frame, context is {self._context}")
|
||||
async for chunk in self.run_llm_async(self._context):
|
||||
# if we get a string, wrap it in a frame
|
||||
if isinstance(chunk, str):
|
||||
yield TextQueueFrame(chunk)
|
||||
# if we get a frame, pass it through
|
||||
elif isinstance(chunk, QueueFrame):
|
||||
print(f"### Got a frame chunk: {chunk}")
|
||||
yield chunk
|
||||
else:
|
||||
print(f"### Got an unknown chunk: {chunk}")
|
||||
yield LLMResponseEndQueueFrame()
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class TTSService(AIService):
|
||||
@@ -54,15 +134,14 @@ class TTSService(AIService):
|
||||
# yield empty bytes here, so linting can infer what this method does
|
||||
yield bytes()
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
|
||||
if self.current_sentence:
|
||||
async for audio_chunk in self.run_tts(self.current_sentence):
|
||||
yield AudioFrame(audio_chunk)
|
||||
yield TextFrame(self.current_sentence)
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
# We don't want transcription frames, which are a subclass
|
||||
yield frame
|
||||
return
|
||||
|
||||
if not isinstance(frame, TextFrame):
|
||||
print(f"*** tts yielding non-text: {frame}")
|
||||
# TODO-CB: Clean this up
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
@@ -71,18 +150,26 @@ class TTSService(AIService):
|
||||
text = frame.text
|
||||
else:
|
||||
self.current_sentence += frame.text
|
||||
if self.current_sentence.strip().endswith((".", "?", "!")):
|
||||
if self.current_sentence.endswith((".", "?", "!")):
|
||||
text = self.current_sentence
|
||||
self.current_sentence = ""
|
||||
|
||||
if text:
|
||||
async for audio_chunk in self.run_tts(text):
|
||||
yield AudioFrame(audio_chunk)
|
||||
size = 8000
|
||||
for i in range(0, len(audio_chunk), size):
|
||||
yield AudioQueueFrame(audio_chunk[i: i+size])
|
||||
print("### ABOUT TO YIELD TTS COMPLETED FRAME", frame)
|
||||
yield TTSCompletedFrame(text, hasattr(frame, 'outOfBand') and frame.outOfBand)
|
||||
|
||||
# note we pass along the text frame *after* the audio, so the text
|
||||
# frame is completed after the audio is processed.
|
||||
print(f"*** tts yielding text: {text}")
|
||||
yield TextFrame(text)
|
||||
async def finalize(self):
|
||||
if self.current_sentence:
|
||||
async for audio_chunk in self.run_tts(self.current_sentence):
|
||||
yield AudioQueueFrame(audio_chunk)
|
||||
|
||||
# Convenience function to send the audio for a sentence to the given queue
|
||||
async def say(self, sentence, queue: asyncio.Queue):
|
||||
await self.run_to_queue(queue, [TextQueueFrame(sentence)])
|
||||
|
||||
|
||||
class ImageGenService(AIService):
|
||||
@@ -95,13 +182,13 @@ class ImageGenService(AIService):
|
||||
async def run_image_gen(self, sentence: str) -> tuple[str, bytes]:
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if not isinstance(frame, TextFrame):
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if not isinstance(frame, TextQueueFrame):
|
||||
yield frame
|
||||
return
|
||||
|
||||
(url, image_data) = await self.run_image_gen(frame.text)
|
||||
yield ImageFrame(url, image_data)
|
||||
yield ImageQueueFrame(url, image_data)
|
||||
|
||||
|
||||
class STTService(AIService):
|
||||
@@ -118,9 +205,9 @@ class STTService(AIService):
|
||||
"""Returns transcript as a string"""
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
"""Processes a frame of audio data, either buffering or transcribing it."""
|
||||
if not isinstance(frame, AudioFrame):
|
||||
if not isinstance(frame, AudioQueueFrame):
|
||||
return
|
||||
|
||||
data = frame.data
|
||||
@@ -133,28 +220,7 @@ class STTService(AIService):
|
||||
ww.close()
|
||||
content.seek(0)
|
||||
text = await self.run_stt(content)
|
||||
yield TranscriptionQueueFrame(text, "", str(time.time()))
|
||||
|
||||
|
||||
class VisionService(AIService):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
# Renders the image. Returns an Image object.
|
||||
# TODO-CB: return type
|
||||
@abstractmethod
|
||||
async def run_vision(self, prompt: str, image: bytes):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, VisionFrame):
|
||||
async for frame in self.run_vision(frame.prompt, frame.image):
|
||||
print(
|
||||
f"&&& visionservce processframe got frame to yield: {frame}")
|
||||
yield frame
|
||||
yield LLMResponseEndFrame()
|
||||
else:
|
||||
yield frame
|
||||
yield TranscriptionQueueFrame(text, '', str(time.time()))
|
||||
|
||||
|
||||
class FrameLogger(AIService):
|
||||
@@ -162,11 +228,11 @@ class FrameLogger(AIService):
|
||||
super().__init__(**kwargs)
|
||||
self.prefix = prefix
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, (AudioFrame)):
|
||||
# self.logger.info(f"{self.prefix}: {type(frame)}")
|
||||
pass
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, (AudioQueueFrame, ImageQueueFrame)):
|
||||
self.logger.info(
|
||||
f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {type(frame)}")
|
||||
else:
|
||||
print(f"{self.prefix}: {frame}")
|
||||
print(f"{datetime.datetime.utcnow().isoformat()} {self.prefix}: {frame}")
|
||||
|
||||
yield frame
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import AsyncGenerator
|
||||
from anthropic import AsyncAnthropic
|
||||
from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame, TextFrame
|
||||
|
||||
from dailyai.services.ai_services import LLMService
|
||||
|
||||
|
||||
class AnthropicLLMService(LLMService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_key,
|
||||
model="claude-3-opus-20240229",
|
||||
max_tokens=1024):
|
||||
super().__init__()
|
||||
self.client = AsyncAnthropic(api_key=api_key)
|
||||
self.model = model
|
||||
self.max_tokens = max_tokens
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if not isinstance(frame, LLMMessagesQueueFrame):
|
||||
yield frame
|
||||
|
||||
stream = await self.client.messages.create(
|
||||
max_tokens=self.max_tokens,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Hello, Claude",
|
||||
}
|
||||
],
|
||||
model=self.model,
|
||||
stream=True,
|
||||
)
|
||||
async for event in stream:
|
||||
if event.type == "content_block_delta":
|
||||
yield TextFrame(event.delta.text)
|
||||
@@ -2,7 +2,6 @@ import aiohttp
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import time
|
||||
from openai import AsyncAzureOpenAI
|
||||
|
||||
import os
|
||||
@@ -14,37 +13,27 @@ from dailyai.services.ai_services import LLMService, TTSService, ImageGenService
|
||||
from PIL import Image
|
||||
|
||||
# See .env.example for Azure configuration needed
|
||||
from azure.cognitiveservices.speech import (
|
||||
SpeechSynthesizer,
|
||||
SpeechConfig,
|
||||
ResultReason,
|
||||
CancellationReason,
|
||||
)
|
||||
|
||||
from dailyai.services.openai_api_llm_service import BaseOpenAILLMService
|
||||
from azure.cognitiveservices.speech import SpeechSynthesizer, SpeechConfig, ResultReason, CancellationReason
|
||||
|
||||
|
||||
class AzureTTSService(TTSService):
|
||||
def __init__(self, *, api_key, region, voice="en-US-SaraNeural"):
|
||||
def __init__(self, *, api_key, region):
|
||||
super().__init__()
|
||||
|
||||
self.speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
self.speech_synthesizer = SpeechSynthesizer(
|
||||
speech_config=self.speech_config, audio_config=None
|
||||
)
|
||||
self._voice = voice
|
||||
speech_config=self.speech_config, audio_config=None)
|
||||
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
self.logger.info("Running azure tts")
|
||||
ssml = (
|
||||
"<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' "
|
||||
"xmlns:mstts='http://www.w3.org/2001/mstts'>"
|
||||
f"<voice name='{self._voice}'>"
|
||||
"<mstts:silence type='Sentenceboundary' value='20ms' />"
|
||||
"<mstts:express-as style='lyrical' styledegree='2' role='SeniorFemale'>"
|
||||
"<prosody rate='1.05'>"
|
||||
f"{sentence}"
|
||||
"</prosody></mstts:express-as></voice></speak> ")
|
||||
ssml = "<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' " \
|
||||
"xmlns:mstts='http://www.w3.org/2001/mstts'>" \
|
||||
"<voice name='en-US-SaraNeural'>" \
|
||||
"<mstts:silence type='Sentenceboundary' value='20ms' />" \
|
||||
"<mstts:express-as style='lyrical' styledegree='2' role='SeniorFemale'>" \
|
||||
"<prosody rate='1.05'>" \
|
||||
f"{sentence}" \
|
||||
"</prosody></mstts:express-as></voice></speak> "
|
||||
result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))
|
||||
self.logger.info("Got azure tts result")
|
||||
if result.reason == ResultReason.SynthesizingAudioCompleted:
|
||||
@@ -53,49 +42,58 @@ class AzureTTSService(TTSService):
|
||||
yield result.audio_data[44:]
|
||||
elif result.reason == ResultReason.Canceled:
|
||||
cancellation_details = result.cancellation_details
|
||||
self.logger.info(
|
||||
"Speech synthesis canceled: {}".format(
|
||||
cancellation_details.reason))
|
||||
self.logger.info("Speech synthesis canceled: {}".format(
|
||||
cancellation_details.reason))
|
||||
if cancellation_details.reason == CancellationReason.Error:
|
||||
self.logger.info(
|
||||
"Error details: {}".format(
|
||||
cancellation_details.error_details))
|
||||
self.logger.info("Error details: {}".format(
|
||||
cancellation_details.error_details))
|
||||
|
||||
|
||||
class AzureLLMService(BaseOpenAILLMService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key,
|
||||
endpoint,
|
||||
api_version="2023-12-01-preview",
|
||||
model):
|
||||
self._endpoint = endpoint
|
||||
self._api_version = api_version
|
||||
|
||||
super().__init__(api_key=api_key, model=model)
|
||||
class AzureLLMService(LLMService):
|
||||
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model, context):
|
||||
super().__init__(context)
|
||||
self._model: str = model
|
||||
|
||||
def create_client(self, api_key=None, base_url=None):
|
||||
self._client = AsyncAzureOpenAI(
|
||||
api_key=api_key,
|
||||
azure_endpoint=self._endpoint,
|
||||
api_version=self._api_version,
|
||||
azure_endpoint=endpoint,
|
||||
api_version=api_version,
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
|
||||
|
||||
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
|
||||
async for chunk in chunks:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via azure: {messages_for_log}")
|
||||
|
||||
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_version="2023-06-01-preview",
|
||||
image_size: str,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
api_key,
|
||||
endpoint,
|
||||
model,
|
||||
):
|
||||
self,
|
||||
*,
|
||||
api_version="2023-06-01-preview",
|
||||
image_size: str,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
api_key,
|
||||
endpoint,
|
||||
model):
|
||||
super().__init__(image_size=image_size)
|
||||
|
||||
self._api_key = api_key
|
||||
@@ -106,9 +104,8 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
async def run_image_gen(self, sentence) -> tuple[str, bytes]:
|
||||
url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}"
|
||||
headers = {
|
||||
"api-key": self._api_key,
|
||||
"Content-Type": "application/json"}
|
||||
headers = {"api-key": self._api_key,
|
||||
"Content-Type": "application/json"}
|
||||
body = {
|
||||
# Enter your prompt text here
|
||||
"prompt": sentence,
|
||||
@@ -118,10 +115,11 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
async with self._aiohttp_session.post(
|
||||
url, headers=headers, json=body
|
||||
) as submission:
|
||||
print(f"submission: {submission}")
|
||||
# We never get past this line, because this header isn't
|
||||
# defined on a 429 response, but something is eating our
|
||||
# exceptions!
|
||||
operation_location = submission.headers["operation-location"]
|
||||
# defined on a 429 response, but something is eating our exceptions!
|
||||
operation_location = submission.headers['operation-location']
|
||||
print(f"submission status: {submission.status}")
|
||||
status = ""
|
||||
attempts_left = 120
|
||||
json_response = None
|
||||
@@ -137,12 +135,12 @@ class AzureImageGenServiceREST(ImageGenService):
|
||||
json_response = await response.json()
|
||||
status = json_response["status"]
|
||||
|
||||
image_url = (
|
||||
json_response["result"]["data"][0]["url"] if json_response else None)
|
||||
image_url = json_response["result"]["data"][0]["url"] if json_response else None
|
||||
if not image_url:
|
||||
raise Exception("Image generation failed")
|
||||
# Load the image from the url
|
||||
async with self._aiohttp_session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
print("i got an image file!")
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
@@ -1,60 +1,66 @@
|
||||
from abc import abstractmethod
|
||||
import asyncio
|
||||
import copy
|
||||
import functools
|
||||
import itertools
|
||||
import logging
|
||||
import numpy as np
|
||||
import pyaudio
|
||||
import torch
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, AsyncGenerator
|
||||
from typing import AsyncGenerator
|
||||
import numpy as np
|
||||
import pyaudio
|
||||
import torch
|
||||
import torchaudio
|
||||
from enum import Enum
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
import datetime
|
||||
import traceback
|
||||
|
||||
from dailyai.pipeline.frames import (
|
||||
SendAppMessageFrame,
|
||||
AudioFrame,
|
||||
EndFrame,
|
||||
ImageFrame,
|
||||
Frame,
|
||||
PipelineStartedFrame,
|
||||
SpriteFrame,
|
||||
StartFrame,
|
||||
TextFrame,
|
||||
from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
|
||||
|
||||
from dailyai.queue_frame import (
|
||||
AudioQueueFrame,
|
||||
EndStreamQueueFrame,
|
||||
ImageQueueFrame,
|
||||
QueueFrame,
|
||||
SpriteQueueFrame,
|
||||
StartStreamQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
TTSCompletedFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
RequestVideoImageFrame,
|
||||
TelestratorImageFrame
|
||||
UserStoppedSpeakingFrame
|
||||
)
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.ai_services import TTSService
|
||||
|
||||
torch.set_num_threads(1)
|
||||
|
||||
model, utils = torch.hub.load(
|
||||
repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
|
||||
)
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
model='silero_vad',
|
||||
force_reload=False)
|
||||
|
||||
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils
|
||||
(get_speech_timestamps,
|
||||
save_audio,
|
||||
read_audio,
|
||||
VADIterator,
|
||||
collect_chunks) = utils
|
||||
|
||||
# Taken from utils_vad.py
|
||||
|
||||
|
||||
def validate(model, inputs: torch.Tensor):
|
||||
def validate(model,
|
||||
inputs: torch.Tensor):
|
||||
with torch.no_grad():
|
||||
outs = model(inputs)
|
||||
return outs
|
||||
|
||||
|
||||
# Provided by Alexander Veysov
|
||||
|
||||
|
||||
def int2float(sound):
|
||||
abs_max = np.abs(sound).max()
|
||||
sound = sound.astype("float32")
|
||||
sound = sound.astype('float32')
|
||||
if abs_max > 0:
|
||||
sound *= 1 / 32768
|
||||
sound *= 1/32768
|
||||
sound = sound.squeeze() # depends on the use case
|
||||
return sound
|
||||
|
||||
@@ -74,7 +80,7 @@ class VADState(Enum):
|
||||
STOPPING = 4
|
||||
|
||||
|
||||
class BaseTransportService:
|
||||
class BaseTransportService():
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -89,16 +95,8 @@ class BaseTransportService:
|
||||
self._speaker_sample_rate = kwargs.get("speaker_sample_rate") or 16000
|
||||
self._fps = kwargs.get("fps") or 8
|
||||
self._vad_start_s = kwargs.get("vad_start_s") or 0.2
|
||||
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.8
|
||||
self._vad_stop_s = kwargs.get("vad_stop_s") or 0.5
|
||||
self._context = kwargs.get("context") or []
|
||||
self._vad_enabled = kwargs.get("vad_enabled") or False
|
||||
self._receive_video = kwargs.get("receive_video") or False
|
||||
self._receive_video_fps = kwargs.get("receive_video_fps") or 0.0
|
||||
self._participant_frame_times = {}
|
||||
if self._vad_enabled and self._speaker_enabled:
|
||||
raise Exception(
|
||||
"Sorry, you can't use speaker_enabled and vad_enabled at the same time. Please set one to False."
|
||||
)
|
||||
|
||||
self._vad_samples = 1536
|
||||
vad_frame_s = self._vad_samples / SAMPLE_RATE
|
||||
@@ -107,7 +105,6 @@ class BaseTransportService:
|
||||
self._vad_starting_count = 0
|
||||
self._vad_stopping_count = 0
|
||||
self._vad_state = VADState.QUIET
|
||||
self._user_is_speaking = False
|
||||
|
||||
duration_minutes = kwargs.get("duration_minutes") or 10
|
||||
self._expiration = time.time() + duration_minutes * 60
|
||||
@@ -115,11 +112,11 @@ class BaseTransportService:
|
||||
self.send_queue = asyncio.Queue()
|
||||
self.receive_queue = asyncio.Queue()
|
||||
|
||||
self.completed_queue = asyncio.Queue()
|
||||
|
||||
self._threadsafe_send_queue = queue.Queue()
|
||||
|
||||
self._images = None
|
||||
self._user_is_speaking = False
|
||||
self._current_phrase = ""
|
||||
|
||||
try:
|
||||
self._loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
|
||||
@@ -131,7 +128,73 @@ class BaseTransportService:
|
||||
|
||||
self._logger: logging.Logger = logging.getLogger()
|
||||
|
||||
async def run(self, pipeline: Pipeline | None = None, override_pipeline_source_queue=True):
|
||||
def update_messages(self, new_context: list[dict[str, str]], task: asyncio.Task | None):
|
||||
if task:
|
||||
if not task.cancelled():
|
||||
self._current_phrase = ""
|
||||
self._context = new_context
|
||||
|
||||
def append_to_context(self, role, chunk_or_text):
|
||||
print("IN APPEND", chunk_or_text)
|
||||
# if we get a non-string, append it to the context without further error checking
|
||||
# unless the outOfBand property is True
|
||||
if not isinstance(chunk_or_text, str):
|
||||
|
||||
if not chunk_or_text.get("outOfBand") == True:
|
||||
self._context.append(chunk_or_text)
|
||||
return
|
||||
|
||||
text = chunk_or_text
|
||||
last_context_item = self._context[-1]
|
||||
|
||||
print("TEXT", text)
|
||||
print("LAST CONTEXT ITEM", last_context_item)
|
||||
traceback.print_stack()
|
||||
|
||||
if last_context_item and last_context_item['role'] == role:
|
||||
last_context_item['content'] += f" {text}"
|
||||
else:
|
||||
self._context.append({"role": role, "content": text})
|
||||
|
||||
async def run_pipeline(self, frame):
|
||||
print(f"starting to speak_after_delay, {frame}")
|
||||
# TODO-CB: This exception for missing class gets eaten!
|
||||
await self._runner(frame)
|
||||
|
||||
async def run_conversation(self, runner: Iterable[QueueFrame]
|
||||
| AsyncIterable[QueueFrame]
|
||||
| asyncio.Queue[QueueFrame],
|
||||
) -> AsyncGenerator[QueueFrame, None]:
|
||||
current_response_task = None
|
||||
self._runner = runner
|
||||
|
||||
async for frame in self.get_receive_frames():
|
||||
print(f"got frame of type: {type(frame)}, {frame}")
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
# elif not isinstance(frame, TranscriptionQueueFrame):
|
||||
# continue
|
||||
# TODO-CB: Verify this is an accurate replacement
|
||||
# if hasattr(frame, 'participantId') and frame.participantId == self._my_participant_id:
|
||||
if not isinstance(frame, UserStoppedSpeakingFrame):
|
||||
continue
|
||||
|
||||
if current_response_task:
|
||||
# TODO-CB: Maybe not always interrupt? Are there frame types we can pass through?
|
||||
current_response_task.cancel()
|
||||
self.interrupt()
|
||||
|
||||
# self._current_phrase += " " + frame.text
|
||||
# current_llm_context = copy.deepcopy(self._context)
|
||||
current_response_task = asyncio.create_task(
|
||||
self.run_pipeline(
|
||||
frame)
|
||||
)
|
||||
current_response_task.add_done_callback(
|
||||
functools.partial(self.update_messages, self._context)
|
||||
)
|
||||
|
||||
async def run(self):
|
||||
self._prerun()
|
||||
|
||||
async_output_queue_marshal_task = asyncio.create_task(
|
||||
@@ -142,28 +205,23 @@ class BaseTransportService:
|
||||
self._camera_thread.start()
|
||||
|
||||
self._frame_consumer_thread = threading.Thread(
|
||||
target=self._frame_consumer, daemon=True
|
||||
)
|
||||
target=self._frame_consumer, daemon=True)
|
||||
self._frame_consumer_thread.start()
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._receive_audio_thread = threading.Thread(
|
||||
target=self._receive_audio, daemon=True
|
||||
)
|
||||
self._receive_audio_thread.start()
|
||||
# TODO-CB: This is interesting
|
||||
# self._receive_audio_thread = threading.Thread(
|
||||
# target=self._receive_audio, daemon=True)
|
||||
# self._receive_audio_thread.start()
|
||||
|
||||
if self._vad_enabled:
|
||||
self._vad_thread = threading.Thread(target=self._vad, daemon=True)
|
||||
self._vad_thread.start()
|
||||
|
||||
pipeline_task = None
|
||||
if pipeline:
|
||||
pipeline_task = asyncio.create_task(
|
||||
self.run_pipeline(pipeline, override_pipeline_source_queue)
|
||||
)
|
||||
|
||||
try:
|
||||
while time.time() < self._expiration and not self._stop_threads.is_set():
|
||||
while (
|
||||
time.time() < self._expiration
|
||||
and not self._stop_threads.is_set()
|
||||
):
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
self._logger.error(f"Exception {e}")
|
||||
@@ -174,88 +232,14 @@ class BaseTransportService:
|
||||
|
||||
self._stop_threads.set()
|
||||
|
||||
if pipeline_task:
|
||||
pipeline_task.cancel()
|
||||
|
||||
await self.send_queue.put(EndFrame())
|
||||
|
||||
await self.send_queue.put(EndStreamQueueFrame())
|
||||
await async_output_queue_marshal_task
|
||||
await self.send_queue.join()
|
||||
self._frame_consumer_thread.join()
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._receive_audio_thread.join()
|
||||
|
||||
if self._vad_enabled:
|
||||
self._vad_thread.join()
|
||||
|
||||
async def run_pipeline(self, pipeline: Pipeline, override_pipeline_source_queue=True):
|
||||
pipeline.set_sink(self.send_queue)
|
||||
if override_pipeline_source_queue:
|
||||
pipeline.set_source(self.receive_queue)
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
async def run_interruptible_pipeline(
|
||||
self,
|
||||
pipeline: Pipeline,
|
||||
allow_interruptions=True,
|
||||
pre_processor=None,
|
||||
post_processor: FrameProcessor | None = None,
|
||||
):
|
||||
pipeline.set_sink(self.send_queue)
|
||||
source_queue = asyncio.Queue()
|
||||
pipeline.set_source(source_queue)
|
||||
pipeline.set_sink(self.send_queue)
|
||||
pipeline_task = asyncio.create_task(pipeline.run_pipeline())
|
||||
|
||||
async def yield_frame(frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
yield frame
|
||||
|
||||
async def post_process(post_processor: FrameProcessor):
|
||||
while True:
|
||||
frame = await self.completed_queue.get()
|
||||
|
||||
# We ignore the output of the post_processor's process frame;
|
||||
# this is called to update the post-processor's state.
|
||||
async for frame in post_processor.process_frame(frame):
|
||||
pass
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
if post_processor:
|
||||
post_process_task = asyncio.create_task(
|
||||
post_process(post_processor))
|
||||
|
||||
started = False
|
||||
|
||||
async for frame in self.get_receive_frames():
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
pipeline_task.cancel()
|
||||
self.interrupt()
|
||||
pipeline_task = asyncio.create_task(pipeline.run_pipeline())
|
||||
started = False
|
||||
|
||||
if not started:
|
||||
await self.send_queue.put(StartFrame())
|
||||
|
||||
if pre_processor:
|
||||
frame_generator = pre_processor.process_frame(frame)
|
||||
else:
|
||||
frame_generator = yield_frame(frame)
|
||||
|
||||
async for frame in frame_generator:
|
||||
await source_queue.put(frame)
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
await asyncio.gather(pipeline_task, post_process_task)
|
||||
|
||||
async def say(self, text: str, tts: TTSService):
|
||||
"""Say a phrase. Use with caution; this bypasses any running pipelines."""
|
||||
async for frame in tts.process_frame(TextFrame(text)):
|
||||
await self.send_queue.put(frame)
|
||||
|
||||
def _post_run(self):
|
||||
# Note that this function must be idempotent! It can be called multiple times
|
||||
# if, for example, a keyboard interrupt occurs.
|
||||
@@ -322,45 +306,44 @@ class BaseTransportService:
|
||||
case VADState.STOPPING:
|
||||
self._vad_stopping_count += 1
|
||||
|
||||
if (
|
||||
self._vad_state == VADState.STARTING
|
||||
and self._vad_starting_count >= self._vad_start_frames
|
||||
):
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStartedSpeakingFrame()), self._loop)
|
||||
# self.interrupt()
|
||||
if self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames:
|
||||
print(
|
||||
f'!!! {datetime.datetime.utcnow().isoformat()} queueing start frame')
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStartedSpeakingFrame()), self._loop
|
||||
)
|
||||
print(f"!!! VAD started, calling interrupt")
|
||||
self.interrupt()
|
||||
self._vad_state = VADState.SPEAKING
|
||||
self._vad_starting_count = 0
|
||||
if (
|
||||
self._vad_state == VADState.STOPPING
|
||||
and self._vad_stopping_count >= self._vad_stop_frames
|
||||
):
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStoppedSpeakingFrame()), self._loop)
|
||||
if self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames:
|
||||
print(
|
||||
f'!!! {datetime.datetime.utcnow().isoformat()} queueing stop frame')
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
UserStoppedSpeakingFrame()), self._loop
|
||||
)
|
||||
self._vad_state = VADState.QUIET
|
||||
self._vad_stopping_count = 0
|
||||
|
||||
async def _marshal_frames(self):
|
||||
while True:
|
||||
frame: Frame | list = await self.send_queue.get()
|
||||
frame: QueueFrame | list = await self.send_queue.get()
|
||||
self._threadsafe_send_queue.put(frame)
|
||||
self.send_queue.task_done()
|
||||
if isinstance(frame, EndFrame):
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
def interrupt(self):
|
||||
self._logger.debug("### Interrupting")
|
||||
print(f"!!! setting interrupt")
|
||||
self._is_interrupted.set()
|
||||
|
||||
async def get_receive_frames(self) -> AsyncGenerator[Frame, None]:
|
||||
async def get_receive_frames(self) -> AsyncGenerator[QueueFrame, None]:
|
||||
while True:
|
||||
frame = await self.receive_queue.get()
|
||||
yield frame
|
||||
if isinstance(frame, EndFrame):
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
def _receive_audio(self):
|
||||
@@ -373,14 +356,14 @@ class BaseTransportService:
|
||||
while not self._stop_threads.is_set():
|
||||
buffer = self.read_audio_frames(desired_frame_count)
|
||||
if len(buffer) > 0:
|
||||
frame = AudioFrame(buffer)
|
||||
frame = AudioQueueFrame(buffer)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop
|
||||
)
|
||||
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
EndFrame()), self._loop)
|
||||
self.receive_queue.put(EndStreamQueueFrame()), self._loop
|
||||
)
|
||||
|
||||
def _set_image(self, image: bytes):
|
||||
self._images = itertools.cycle([image])
|
||||
@@ -388,10 +371,6 @@ class BaseTransportService:
|
||||
def _set_images(self, images: list[bytes], start_frame=0):
|
||||
self._images = itertools.cycle(images)
|
||||
|
||||
def send_app_message(self, message: Any, participantId: str | None):
|
||||
""" Child classes should override this to send a custom message to the room. """
|
||||
pass
|
||||
|
||||
def _run_camera(self):
|
||||
try:
|
||||
while not self._stop_threads.is_set():
|
||||
@@ -408,46 +387,31 @@ class BaseTransportService:
|
||||
self._logger.info("🎬 Starting frame consumer thread")
|
||||
b = bytearray()
|
||||
smallest_write_size = 3200
|
||||
largest_write_size = 8000
|
||||
all_audio_frames = bytearray()
|
||||
while True:
|
||||
try:
|
||||
frames_or_frame: Frame | list[Frame] = self._threadsafe_send_queue.get(
|
||||
frames_or_frame: QueueFrame | list[QueueFrame] = (
|
||||
self._threadsafe_send_queue.get()
|
||||
)
|
||||
if (
|
||||
isinstance(frames_or_frame, AudioFrame)
|
||||
and len(frames_or_frame.data) > largest_write_size
|
||||
):
|
||||
# subdivide large audio frames to enable interruption
|
||||
frames = []
|
||||
for i in range(0, len(frames_or_frame.data),
|
||||
largest_write_size):
|
||||
frames.append(AudioFrame(
|
||||
frames_or_frame.data[i: i + largest_write_size]))
|
||||
elif isinstance(frames_or_frame, Frame):
|
||||
frames: list[Frame] = [frames_or_frame]
|
||||
if isinstance(frames_or_frame, QueueFrame):
|
||||
frames: list[QueueFrame] = [frames_or_frame]
|
||||
elif isinstance(frames_or_frame, list):
|
||||
frames: list[Frame] = frames_or_frame
|
||||
frames: list[QueueFrame] = frames_or_frame
|
||||
else:
|
||||
raise Exception("Unknown type in output queue")
|
||||
|
||||
for frame in frames:
|
||||
if isinstance(frame, EndFrame):
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
self._logger.info("Stopping frame consumer thread")
|
||||
self._stop_threads.set()
|
||||
self._threadsafe_send_queue.task_done()
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.completed_queue.put(frame), self._loop
|
||||
)
|
||||
return
|
||||
|
||||
# if interrupted, we just pull frames off the queue and
|
||||
# discard them
|
||||
# if interrupted, we just pull frames off the queue and discard them
|
||||
if not self._is_interrupted.is_set():
|
||||
if frame:
|
||||
|
||||
if isinstance(frame, AudioFrame):
|
||||
if isinstance(frame, AudioQueueFrame):
|
||||
chunk = frame.data
|
||||
all_audio_frames.extend(chunk)
|
||||
|
||||
b.extend(chunk)
|
||||
truncated_length: int = len(b) - (
|
||||
@@ -457,51 +421,28 @@ class BaseTransportService:
|
||||
self.write_frame_to_mic(
|
||||
bytes(b[:truncated_length]))
|
||||
b = b[truncated_length:]
|
||||
elif isinstance(frame, TelestratorImageFrame):
|
||||
elif isinstance(frame, ImageQueueFrame):
|
||||
self._set_image(frame.image)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame),
|
||||
self._loop,
|
||||
)
|
||||
elif isinstance(frame, ImageFrame):
|
||||
self._set_image(frame.image)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
elif isinstance(frame, SpriteQueueFrame):
|
||||
self._set_images(frame.images)
|
||||
elif isinstance(frame, SendAppMessageFrame):
|
||||
self.send_app_message(
|
||||
frame.message, frame.participantId)
|
||||
elif isinstance(frame, RequestVideoImageFrame):
|
||||
# removing one or all participant IDs from _participant_frame_times
|
||||
# will cause the transport to send the next available frame from
|
||||
# that participant
|
||||
if frame.participantId:
|
||||
self._participant_frame_times.pop(
|
||||
frame.participantId, None)
|
||||
else:
|
||||
self._participant_frame_times.clear()
|
||||
elif isinstance(frame, TTSCompletedFrame) and not frame.outOfBand:
|
||||
self.append_to_context(
|
||||
"assistant", frame.text)
|
||||
elif len(b):
|
||||
self.write_frame_to_mic(bytes(b))
|
||||
b = bytearray()
|
||||
else:
|
||||
# if there are leftover audio bytes, write them now; failing to do so
|
||||
# can cause static in the audio stream.
|
||||
print(f"!!! interrupted, flushing audio")
|
||||
if len(b):
|
||||
truncated_length = len(b) - (len(b) % 160)
|
||||
self.write_frame_to_mic(
|
||||
bytes(b[:truncated_length]))
|
||||
b = bytearray()
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
if isinstance(frame, StartStreamQueueFrame):
|
||||
self._is_interrupted.clear()
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(PipelineStartedFrame()),
|
||||
self._loop,
|
||||
)
|
||||
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.completed_queue.put(frame), self._loop
|
||||
)
|
||||
|
||||
self._threadsafe_send_queue.task_done()
|
||||
except queue.Empty:
|
||||
|
||||
@@ -1,23 +1,4 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import signal
|
||||
import time
|
||||
import threading
|
||||
import types
|
||||
|
||||
from functools import partial
|
||||
from typing import Any
|
||||
|
||||
from dailyai.pipeline.frames import (
|
||||
ReceivedAppMessageFrame,
|
||||
TranscriptionQueueFrame,
|
||||
VideoImageFrame,
|
||||
TelestratorImageFrame
|
||||
)
|
||||
|
||||
from threading import Event
|
||||
|
||||
from dailyai.services.base_transport_service import BaseTransportService
|
||||
from daily import (
|
||||
EventHandler,
|
||||
CallClient,
|
||||
@@ -26,8 +7,61 @@ from daily import (
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
from threading import Event
|
||||
from dailyai.queue_frame import (
|
||||
TranscriptionQueueFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
|
||||
)
|
||||
from functools import partial
|
||||
import types
|
||||
import pyaudio
|
||||
import torchaudio
|
||||
import asyncio
|
||||
import inspect
|
||||
import io
|
||||
import logging
|
||||
import numpy as np
|
||||
import signal
|
||||
import threading
|
||||
import torch
|
||||
torch.set_num_threads(1)
|
||||
|
||||
from dailyai.services.base_transport_service import BaseTransportService
|
||||
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||
model='silero_vad',
|
||||
force_reload=False)
|
||||
|
||||
(get_speech_timestamps,
|
||||
save_audio,
|
||||
read_audio,
|
||||
VADIterator,
|
||||
collect_chunks) = utils
|
||||
|
||||
# Taken from utils_vad.py
|
||||
|
||||
|
||||
def validate(model,
|
||||
inputs: torch.Tensor):
|
||||
with torch.no_grad():
|
||||
outs = model(inputs)
|
||||
return outs
|
||||
|
||||
# Provided by Alexander Veysov
|
||||
|
||||
|
||||
def int2float(sound):
|
||||
abs_max = np.abs(sound).max()
|
||||
sound = sound.astype('float32')
|
||||
if abs_max > 0:
|
||||
sound *= 1/32768
|
||||
sound = sound.squeeze() # depends on the use case
|
||||
return sound
|
||||
|
||||
|
||||
FORMAT = pyaudio.paInt16
|
||||
CHANNELS = 1
|
||||
SAMPLE_RATE = 16000
|
||||
CHUNK = int(SAMPLE_RATE / 10)
|
||||
|
||||
audio = pyaudio.PyAudio()
|
||||
|
||||
|
||||
class DailyTransportService(BaseTransportService, EventHandler):
|
||||
@@ -36,7 +70,6 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
|
||||
_speaker_enabled: bool
|
||||
_speaker_sample_rate: int
|
||||
_vad_enabled: bool
|
||||
|
||||
# This is necessary to override EventHandler's __new__ method.
|
||||
def __new__(cls, *args, **kwargs):
|
||||
@@ -87,12 +120,8 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
for handler in self._event_handlers[event_name]:
|
||||
if inspect.iscoroutinefunction(handler):
|
||||
if self._loop:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
handler(*args, **kwargs), self._loop)
|
||||
|
||||
# wait for the coroutine to finish. This will also
|
||||
# raise any exceptions raised by the coroutine.
|
||||
future.result()
|
||||
else:
|
||||
raise Exception(
|
||||
"No event loop to run coroutine. In order to use async event handlers, you must run the DailyTransportService in an asyncio event loop.")
|
||||
@@ -134,9 +163,6 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
def write_frame_to_mic(self, frame: bytes):
|
||||
self.mic.write_frames(frame)
|
||||
|
||||
def send_app_message(self, message: Any, participantId: str | None):
|
||||
self.client.send_app_message(message, participantId)
|
||||
|
||||
def read_audio_frames(self, desired_frame_count):
|
||||
bytes = self._speaker.read_frames(desired_frame_count)
|
||||
return bytes
|
||||
@@ -156,63 +182,58 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
|
||||
if self._camera_enabled:
|
||||
self.camera: VirtualCameraDevice = Daily.create_camera_device(
|
||||
"camera", width=self._camera_width, height=self._camera_height, color_format="RGB")
|
||||
"camera", width=self._camera_width, height=self._camera_height, color_format="RGB"
|
||||
)
|
||||
|
||||
if self._speaker_enabled or self._vad_enabled:
|
||||
if self._speaker_enabled:
|
||||
self._speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
|
||||
"speaker", sample_rate=self._speaker_sample_rate, channels=1
|
||||
)
|
||||
Daily.select_speaker_device("speaker")
|
||||
|
||||
self.client.set_user_name(self._bot_name)
|
||||
self.client.join(
|
||||
self._room_url,
|
||||
self._token,
|
||||
completion=self.call_joined,
|
||||
client_settings={
|
||||
"inputs": {
|
||||
"camera": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "camera",
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "mic",
|
||||
"customConstraints": {
|
||||
"autoGainControl": {"exact": False},
|
||||
"echoCancellation": {"exact": False},
|
||||
"noiseSuppression": {"exact": False},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"publishing": {
|
||||
"camera": {
|
||||
"sendSettings": {
|
||||
"maxQuality": "low",
|
||||
"encodings": {
|
||||
"low": {
|
||||
"maxBitrate": 250000,
|
||||
"scaleResolutionDownBy": 1.333,
|
||||
"maxFramerate": 8,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
self.client.join(self._room_url, self._token,
|
||||
completion=self.call_joined)
|
||||
self._my_participant_id = self.client.participants()["local"]["id"]
|
||||
|
||||
if not self._receive_video:
|
||||
self.client.update_subscription_profiles({
|
||||
"base": {
|
||||
"camera": "unsubscribed",
|
||||
self.client.update_inputs(
|
||||
{
|
||||
"camera": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "camera",
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": True,
|
||||
"settings": {
|
||||
"deviceId": "mic",
|
||||
"customConstraints": {
|
||||
"autoGainControl": {"exact": False},
|
||||
"echoCancellation": {"exact": False},
|
||||
"noiseSuppression": {"exact": False},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
self.client.update_publishing(
|
||||
{
|
||||
"camera": {
|
||||
"sendSettings": {
|
||||
"maxQuality": "low",
|
||||
"encodings": {
|
||||
"low": {
|
||||
"maxBitrate": 250000,
|
||||
"scaleResolutionDownBy": 1.333,
|
||||
"maxFramerate": 8,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
if self._token and self._start_transcription:
|
||||
self.client.start_transcription(self.transcription_settings)
|
||||
@@ -227,39 +248,12 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
|
||||
def _post_run(self):
|
||||
self.client.leave()
|
||||
self.client.release()
|
||||
|
||||
def _handle_video_frame(self, participant_id, video_frame):
|
||||
"""If receive_video is true, this function is called once for each frame from each participant. We
|
||||
don't need to send every frame to the pipeline, so there are two ways to decide how to send frames:
|
||||
1. Set a greater-than-zero value for receive_video_fps. The transport will track the last send time
|
||||
for each participant and send a new frame when the requested frame rate has elapsed. This
|
||||
guarantees an image every second, for example.
|
||||
2. Set receive_video_fps less than or equal to zero to disable timed frame sending. Then, put a
|
||||
RequestVideoImageFrame in the pipeline to get a new frame for one or all participants. By
|
||||
sending a RequestVideoImageFrame immediately after successfully processing an image, you can
|
||||
ensure you don't end up queueing up frames faster than you can process them.
|
||||
"""
|
||||
send_frame = False
|
||||
if not participant_id in self._participant_frame_times:
|
||||
# then it's a new participant; send the first frame
|
||||
send_frame = True
|
||||
elif self._receive_video_fps > 0 and time.time() > self._participant_frame_times[participant_id] + 1.0/self._receive_video_fps:
|
||||
# Then it's an existing participant who is due to send a new frame
|
||||
send_frame = True
|
||||
|
||||
if send_frame:
|
||||
self._participant_frame_times[participant_id] = time.time()
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(
|
||||
VideoImageFrame(participant_id, video_frame)), self._loop)
|
||||
|
||||
def on_first_other_participant_joined(self):
|
||||
pass
|
||||
|
||||
def call_joined(self, join_data, client_error):
|
||||
# self._logger.info(f"Call_joined: {join_data}, {client_error}")
|
||||
pass
|
||||
self._logger.info(f"Call_joined: {join_data}, {client_error}")
|
||||
|
||||
def dialout(self, number):
|
||||
self.client.start_dialout({"phoneNumber": number})
|
||||
@@ -277,40 +271,52 @@ class DailyTransportService(BaseTransportService, EventHandler):
|
||||
if not self._other_participant_has_joined and participant["id"] != self._my_participant_id:
|
||||
self._other_participant_has_joined = True
|
||||
self.on_first_other_participant_joined()
|
||||
if self._receive_video:
|
||||
self.client.set_video_renderer(
|
||||
participant["id"], self._handle_video_frame)
|
||||
|
||||
def on_participant_left(self, participant, reason):
|
||||
if len(self.client.participants()) < self._min_others_count + 1:
|
||||
self._stop_threads.set()
|
||||
|
||||
def on_app_message(self, message: Any, sender: str):
|
||||
async def insert_speech(self, text, sender, date):
|
||||
await self.receive_queue.put(UserStartedSpeakingFrame())
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# frame = TranscriptionQueueFrame(text, sender, date)
|
||||
# await self.receive_queue.put(frame)
|
||||
self.on_transcription_message({
|
||||
"text": text,
|
||||
"participantId": "cb65b845-aac0-4fc8-987d-2e7ce3c7d8f0",
|
||||
"timestamp": date
|
||||
})
|
||||
|
||||
await asyncio.sleep(0.3)
|
||||
await self.receive_queue.put(UserStoppedSpeakingFrame())
|
||||
|
||||
def on_app_message(self, message, sender):
|
||||
if self._loop:
|
||||
frame = ReceivedAppMessageFrame(message, sender)
|
||||
print(frame)
|
||||
print("APP MESSAGE", message)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop
|
||||
)
|
||||
self.insert_speech(message["message"], sender, message["date"]), self._loop)
|
||||
|
||||
def on_transcription_message(self, message: dict):
|
||||
if self._loop:
|
||||
print(f"transcription: {message}")
|
||||
participantId = ""
|
||||
if "participantId" in message:
|
||||
participantId = message["participantId"]
|
||||
elif "session_id" in message:
|
||||
participantId = message["session_id"]
|
||||
frame = TranscriptionQueueFrame(
|
||||
message["text"], participantId, message["timestamp"])
|
||||
if self._my_participant_id and participantId != self._my_participant_id:
|
||||
frame = TranscriptionQueueFrame(
|
||||
message["text"], participantId, message["timestamp"])
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop)
|
||||
|
||||
def on_transcription_error(self, message):
|
||||
self._logger.error(f"Transcription error: {message}")
|
||||
|
||||
def on_transcription_started(self, status):
|
||||
pass
|
||||
self.append_to_context("user", message["text"])
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.receive_queue.put(frame), self._loop)
|
||||
|
||||
def on_transcription_stopped(self, stopped_by, stopped_by_error):
|
||||
pass
|
||||
|
||||
def on_transcription_error(self, message):
|
||||
pass
|
||||
|
||||
def on_transcription_started(self, status):
|
||||
pass
|
||||
|
||||
@@ -25,9 +25,7 @@ class DeepgramAIService(TTSService):
|
||||
self.logger.info(f"Running deepgram tts for {sentence}")
|
||||
base_url = "https://api.beta.deepgram.com/v1/speak"
|
||||
request_url = f"{base_url}?model={self._voice}&encoding=linear16&container=none&sample_rate={self._sample_rate}"
|
||||
headers = {
|
||||
"authorization": f"token {self._api_key}",
|
||||
"Content-Type": "application/json"}
|
||||
headers = {"authorization": f"token {self._api_key}", "Content-Type": "application/json"}
|
||||
data = {"text": sentence}
|
||||
|
||||
async with self._aiohttp_session.post(
|
||||
|
||||
@@ -9,12 +9,7 @@ from dailyai.services.ai_services import TTSService
|
||||
|
||||
|
||||
class DeepgramTTSService(TTSService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
aiohttp_session,
|
||||
api_key,
|
||||
voice="alpha-asteria-en-v2"):
|
||||
def __init__(self, *, aiohttp_session, api_key, voice="alpha-asteria-en-v2"):
|
||||
super().__init__()
|
||||
|
||||
self._voice = voice
|
||||
|
||||
@@ -15,28 +15,22 @@ class ElevenLabsTTSService(TTSService):
|
||||
*,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
api_key,
|
||||
narrator,
|
||||
model="eleven_turbo_v2",
|
||||
aggregate_sentences=True
|
||||
voice_id,
|
||||
):
|
||||
super().__init__(aggregate_sentences)
|
||||
super().__init__()
|
||||
|
||||
self._api_key = api_key
|
||||
self._narrator = narrator
|
||||
self._voice_id = voice_id
|
||||
self._aiohttp_session = aiohttp_session
|
||||
self._model = model
|
||||
|
||||
async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
|
||||
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._narrator['narrator']['voice_id']}/stream"
|
||||
payload = {"text": sentence, "model_id": self._model}
|
||||
querystring = {
|
||||
"output_format": "pcm_16000",
|
||||
"optimize_streaming_latency": 2}
|
||||
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
|
||||
payload = {"text": sentence, "model_id": "eleven_turbo_v2"}
|
||||
querystring = {"output_format": "pcm_16000", "optimize_streaming_latency": 2}
|
||||
headers = {
|
||||
"xi-api-key": self._api_key,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with self._aiohttp_session.post(
|
||||
url, json=payload, headers=headers, params=querystring
|
||||
) as r:
|
||||
|
||||
@@ -9,19 +9,17 @@ from dailyai.services.ai_services import ImageGenService
|
||||
|
||||
|
||||
from dailyai.services.ai_services import ImageGenService
|
||||
|
||||
# Fal expects FAL_KEY_ID and FAL_KEY_SECRET to be set in the env
|
||||
|
||||
|
||||
class FalImageGenService(ImageGenService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
image_size,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
key_id=None,
|
||||
key_secret=None
|
||||
):
|
||||
self,
|
||||
*,
|
||||
image_size,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
key_id=None,
|
||||
key_secret=None):
|
||||
super().__init__(image_size)
|
||||
self._aiohttp_session = aiohttp_session
|
||||
if key_id:
|
||||
@@ -33,8 +31,10 @@ class FalImageGenService(ImageGenService):
|
||||
def get_image_url(sentence, size):
|
||||
handler = fal.apps.submit(
|
||||
"110602490-fast-sdxl",
|
||||
# "fal-ai/fast-sdxl",
|
||||
arguments={"prompt": sentence},
|
||||
arguments={
|
||||
"prompt": sentence,
|
||||
"seed": 23
|
||||
},
|
||||
)
|
||||
for event in handler.iter_events():
|
||||
if isinstance(event, fal.apps.InProgress):
|
||||
@@ -47,13 +47,9 @@ class FalImageGenService(ImageGenService):
|
||||
raise Exception("Image generation failed")
|
||||
|
||||
return image_url
|
||||
|
||||
image_url = await asyncio.to_thread(get_image_url, sentence, self.image_size)
|
||||
# Load the image from the url
|
||||
async with self._aiohttp_session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
image_bytes = image.tobytes()
|
||||
print(f"!!! fal image tobytes is:")
|
||||
print(image)
|
||||
return (image_url, image_bytes)
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
122
src/dailyai/services/fireworks_ai_services.py
Normal file
@@ -0,0 +1,122 @@
|
||||
import aiohttp
|
||||
from PIL import Image
|
||||
import io
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from dailyai.services.ai_services import LLMService, ImageGenService
|
||||
|
||||
from dailyai.queue_frame import (TextQueueFrame, TextQueueOutOfBandFrame)
|
||||
|
||||
|
||||
class FireworksLLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="", tools=[], context, change_appearance, transport=""):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
self._tools = tools
|
||||
self._change_appearance = change_appearance
|
||||
self._transport = transport
|
||||
self._client = AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url="https://api.fireworks.ai/inference/v1"
|
||||
)
|
||||
|
||||
async def get_response(self, messages, stream):
|
||||
print("GET RESPONSE ... WHEN DO WE EXPECT THIS TO BE CALLED?")
|
||||
return await self._client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self._model,
|
||||
temperature=0.1,
|
||||
tools=self._tools
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
print("IN ASYNC")
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
chunks = await self._client.chat.completions.create(
|
||||
model=self._model,
|
||||
stream=True, # BLARGH
|
||||
messages=messages,
|
||||
temperature=0.1,
|
||||
tools=self._tools
|
||||
)
|
||||
|
||||
tool_call = {}
|
||||
|
||||
async for chunk in chunks:
|
||||
print(f"CHUNK: {chunk}")
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
if chunk.choices[0].delta.tool_calls:
|
||||
print(f"TOOL CALLS: {chunk.choices[0].delta.tool_calls[0]}")
|
||||
if chunk.choices[0].delta.tool_calls[0].function.name:
|
||||
tool_call["id"] = chunk.choices[0].delta.tool_calls[0].id
|
||||
tool_call["name"] = chunk.choices[0].delta.tool_calls[0].function.name
|
||||
tool_call["arguments"] = ''
|
||||
if chunk.choices[0].delta.tool_calls[0].function.arguments:
|
||||
tool_call["arguments"] += chunk.choices[0].delta.tool_calls[0].function.arguments
|
||||
|
||||
if chunk.choices[0].finish_reason:
|
||||
print(f"TOOL CALLS ACCUM -- {tool_call}")
|
||||
if tool_call.get("name"):
|
||||
# hard coding tool call action for now. we should assemble the tool call
|
||||
# from the streaming response, then yield it to the pipeline.
|
||||
# this approach works for the first few change appearance requests but
|
||||
# then the model starts refusing. need to read more about function
|
||||
# calling, try this with the OpenAI APIs, and talk to the Fireworks people.
|
||||
self._transport.append_to_context("assistant", {
|
||||
# pipeline will append the content to this context after it goes
|
||||
# through tts. we need to manually append the tool call, though
|
||||
"content": "",
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": tool_call["id"],
|
||||
"type": "function",
|
||||
"index": 0,
|
||||
"function": {
|
||||
"name": tool_call["name"],
|
||||
"arguments": tool_call["arguments"]
|
||||
},
|
||||
}
|
||||
],
|
||||
})
|
||||
self._transport.append_to_context("tool", {
|
||||
"content": "image generated by prompt arguments: " + tool_call["arguments"],
|
||||
"role": "tool",
|
||||
"tool_call_id": tool_call["id"]
|
||||
})
|
||||
self._transport.append_to_context("assistant", {
|
||||
"content": f"call to {tool_call['name']} function succeeded",
|
||||
"role": "assistant",
|
||||
})
|
||||
print("APPENDED TO CONTEXT")
|
||||
image_prompt = json.loads(
|
||||
tool_call["arguments"]).get("appearance")
|
||||
print("IMAGE PROMPT", image_prompt)
|
||||
asyncio.create_task(
|
||||
self._change_appearance(image_prompt))
|
||||
yield TextQueueOutOfBandFrame("Sure, let me work on that for you!")
|
||||
# yield {"content": "Sure, let me work on that for you!"}
|
||||
# yield "Sure, let me work on that for you!"
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
print("--> IN SYNC ... WHEN DO WE EXPECT THIS TO BE CALLED?")
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
33
src/dailyai/services/groq_ai_services.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import os
|
||||
import groq
|
||||
from groq import AsyncGroq
|
||||
from dailyai.services.ai_services import LLMService
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
|
||||
class GroqLLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="mixtral-8x7b-32768", context):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
# os.environ["GROQ_SECRET_ACCESS_KEY"] = api_key
|
||||
|
||||
self._client = AsyncGroq()
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
print(f"messages are {messages}")
|
||||
try:
|
||||
resp = await self._client.chat.completions.create(messages=messages, model=self._model)
|
||||
print(f"got chunks from groq: {resp}")
|
||||
|
||||
if resp.choices[0].message.content:
|
||||
yield resp.choices[0].message.content
|
||||
except groq.APIConnectionError as e:
|
||||
print("The server could not be reached")
|
||||
print(e.__cause__) # an underlying Exception, likely raised within httpx.
|
||||
except groq.RateLimitError as e:
|
||||
print("A 429 status code was received; we should back off a bit.")
|
||||
except groq.APIStatusError as e:
|
||||
print("Another non-200-range status code was received")
|
||||
print(e.status_code)
|
||||
print(e.response)
|
||||
|
||||
@@ -4,7 +4,7 @@ import math
|
||||
import time
|
||||
from typing import AsyncGenerator
|
||||
import wave
|
||||
from dailyai.pipeline.frames import AudioFrame, Frame, TranscriptionQueueFrame
|
||||
from dailyai.queue_frame import AudioQueueFrame, QueueFrame, TranscriptionQueueFrame
|
||||
from dailyai.services.ai_services import STTService
|
||||
|
||||
|
||||
@@ -39,9 +39,9 @@ class LocalSTTService(STTService):
|
||||
ww.setframerate(self._frame_rate)
|
||||
self._wave = ww
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
"""Processes a frame of audio data, either buffering or transcribing it."""
|
||||
if not isinstance(frame, AudioFrame):
|
||||
if not isinstance(frame, AudioQueueFrame):
|
||||
return
|
||||
|
||||
data = frame.data
|
||||
|
||||
@@ -15,15 +15,13 @@ class LocalTransportService(BaseTransportService):
|
||||
self._tk_root = kwargs.get("tk_root") or None
|
||||
|
||||
if self._camera_enabled and not self._tk_root:
|
||||
raise ValueError(
|
||||
"If camera is enabled, a tkinter root must be provided")
|
||||
raise ValueError("If camera is enabled, a tkinter root must be provided")
|
||||
|
||||
if self._speaker_enabled:
|
||||
self._speaker_buffer_pending = bytearray()
|
||||
|
||||
async def _write_frame_to_tkinter(self, frame: bytes):
|
||||
data = f"P6 {self._camera_width} {self._camera_height} 255 ".encode() + \
|
||||
frame
|
||||
data = f"P6 {self._camera_width} {self._camera_height} 255 ".encode() + frame
|
||||
photo = tk.PhotoImage(
|
||||
width=self._camera_width,
|
||||
height=self._camera_height,
|
||||
@@ -31,8 +29,7 @@ class LocalTransportService(BaseTransportService):
|
||||
format="PPM")
|
||||
self._image_label.config(image=photo)
|
||||
|
||||
# This holds a reference to the photo, preventing it from being garbage
|
||||
# collected.
|
||||
# This holds a reference to the photo, preventing it from being garbage collected.
|
||||
self._image_label.image = photo # type: ignore
|
||||
|
||||
def write_frame_to_camera(self, frame: bytes):
|
||||
@@ -64,13 +61,8 @@ class LocalTransportService(BaseTransportService):
|
||||
if self._camera_enabled:
|
||||
# Start with a neutral gray background.
|
||||
array = np.ones((1024, 1024, 3)) * 128
|
||||
data = f"P5 {1024} {1024} 255 ".encode(
|
||||
) + array.astype(np.uint8).tobytes()
|
||||
photo = tk.PhotoImage(
|
||||
width=1024,
|
||||
height=1024,
|
||||
data=data,
|
||||
format="PPM")
|
||||
data = f"P5 {1024} {1024} 255 ".encode() + array.astype(np.uint8).tobytes()
|
||||
photo = tk.PhotoImage(width=1024, height=1024, data=data, format="PPM")
|
||||
self._image_label = tk.Label(self._tk_root, image=photo)
|
||||
self._image_label.pack()
|
||||
|
||||
|
||||
@@ -1,7 +1,42 @@
|
||||
from dailyai.services.openai_api_llm_service import BaseOpenAILLMService
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from dailyai.services.ai_services import LLMService
|
||||
|
||||
|
||||
class OLLamaLLMService(BaseOpenAILLMService):
|
||||
class OLLamaLLMService(LLMService):
|
||||
def __init__(self, model="llama2", base_url='http://localhost:11434/v1'):
|
||||
super().__init__()
|
||||
self._model = model
|
||||
self._client = AsyncOpenAI(api_key="ollama", base_url=base_url)
|
||||
|
||||
def __init__(self, model="llama2", base_url="http://localhost:11434/v1"):
|
||||
super().__init__(model=model, base_url=base_url, api_key="ollama")
|
||||
async def get_response(self, messages, stream):
|
||||
return await self._client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self._model
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
|
||||
async for chunk in chunks:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
@@ -1,29 +1,48 @@
|
||||
import aiohttp
|
||||
from PIL import Image
|
||||
import io
|
||||
import time
|
||||
import base64
|
||||
from openai import AsyncOpenAI, AsyncStream
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
import json
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletion,
|
||||
ChatCompletionChunk,
|
||||
ChatCompletionMessageParam,
|
||||
)
|
||||
|
||||
from daily import VideoFrame
|
||||
from dailyai.services.ai_services import LLMService, ImageGenService, VisionService
|
||||
from dailyai.services.openai_api_llm_service import BaseOpenAILLMService
|
||||
from dailyai.pipeline.frames import TextFrame
|
||||
from dailyai.services.ai_services import LLMService, ImageGenService
|
||||
|
||||
|
||||
class OpenAILLMService(BaseOpenAILLMService):
|
||||
class OpenAILLMService(LLMService):
|
||||
def __init__(self, *, api_key, model="gpt-4-turbo-preview", context):
|
||||
super().__init__(context)
|
||||
self._model = model
|
||||
self._client = AsyncOpenAI(api_key=api_key)
|
||||
|
||||
def __init__(self, model="gpt-4", * args, **kwargs):
|
||||
super().__init__(model, *args, **kwargs)
|
||||
async def get_response(self, messages, stream):
|
||||
return await self._client.chat.completions.create(
|
||||
stream=stream,
|
||||
messages=messages,
|
||||
model=self._model
|
||||
)
|
||||
|
||||
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
|
||||
async for chunk in chunks:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
async def run_llm(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response = await self._client.chat.completions.create(model=self._model, stream=False, messages=messages)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class OpenAIImageGenService(ImageGenService):
|
||||
@@ -38,6 +57,7 @@ class OpenAIImageGenService(ImageGenService):
|
||||
):
|
||||
super().__init__(image_size=image_size)
|
||||
self._model = model
|
||||
print(f"api key: {api_key}")
|
||||
self._client = AsyncOpenAI(api_key=api_key)
|
||||
self._aiohttp_session = aiohttp_session
|
||||
|
||||
@@ -59,67 +79,3 @@ class OpenAIImageGenService(ImageGenService):
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
return (image_url, image.tobytes())
|
||||
|
||||
|
||||
class OpenAIVisionService(VisionService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model="gpt-4-vision-preview",
|
||||
api_key,
|
||||
):
|
||||
self._model = model
|
||||
self._client = AsyncOpenAI(api_key=api_key)
|
||||
|
||||
async def run_vision(self, prompt: str, image: bytes):
|
||||
if isinstance(image, VideoFrame):
|
||||
# Then it's from a daily video frame
|
||||
print("### processing daily video frame for recognition")
|
||||
IMAGE_WIDTH = image.width
|
||||
IMAGE_HEIGHT = image.height
|
||||
COLOR_FORMAT = image.color_format
|
||||
a_image = Image.frombytes(
|
||||
'RGBA', (IMAGE_WIDTH, IMAGE_HEIGHT), image.buffer)
|
||||
new_image = a_image.convert('RGB')
|
||||
else:
|
||||
# handle it as a byte stream from image gen
|
||||
new_image = Image.frombytes('RGB', (1024, 1024), image)
|
||||
# Uncomment these lines to write the frame to a jpg in the same directory.
|
||||
# current_path = os.getcwd()
|
||||
# image_path = os.path.join(current_path, "image.jpg")
|
||||
# image.save(image_path, format="JPEG")
|
||||
|
||||
jpeg_buffer = io.BytesIO()
|
||||
|
||||
new_image.save(jpeg_buffer, format='JPEG')
|
||||
|
||||
jpeg_bytes = jpeg_buffer.getvalue()
|
||||
base64_image = base64.b64encode(jpeg_bytes).decode('utf-8')
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": prompt},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": f"data:image/jpeg;base64,{base64_image}"
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
chunks: AsyncStream[ChatCompletionChunk] = (
|
||||
await self._client.chat.completions.create(
|
||||
model=self._model,
|
||||
stream=True,
|
||||
messages=messages,
|
||||
)
|
||||
)
|
||||
async for chunk in chunks:
|
||||
print(f"%%% chunk: {chunk}")
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
if chunk.choices[0].delta.content:
|
||||
yield TextFrame(chunk.choices[0].delta.content)
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
import json
|
||||
import time
|
||||
from typing import AsyncGenerator, List
|
||||
from openai import AsyncOpenAI, AsyncStream
|
||||
from dailyai.pipeline.frames import (
|
||||
Frame,
|
||||
LLMFunctionCallFrame,
|
||||
LLMFunctionStartFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
OpenAILLMContextFrame,
|
||||
TextFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import LLMService
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletion,
|
||||
ChatCompletionChunk,
|
||||
ChatCompletionMessageParam,
|
||||
)
|
||||
|
||||
|
||||
class BaseOpenAILLMService(LLMService):
|
||||
"""This is the base for all services that use the AsyncOpenAI client.
|
||||
|
||||
This service consumes OpenAILLMContextFrame frames, which contain a reference
|
||||
to an OpenAILLMContext frame. The OpenAILLMContext object defines the context
|
||||
sent to the LLM for a completion. This includes user, assistant and system messages
|
||||
as well as tool choices and the tool, which is used if requesting function
|
||||
calls from the LLM.
|
||||
"""
|
||||
|
||||
def __init__(self, model: str, api_key=None, base_url=None):
|
||||
super().__init__()
|
||||
self._model: str = model
|
||||
self.create_client(api_key=api_key, base_url=base_url)
|
||||
|
||||
def create_client(self, api_key=None, base_url=None):
|
||||
self._client = AsyncOpenAI(api_key=api_key, base_url=base_url)
|
||||
|
||||
async def _stream_chat_completions(
|
||||
self, context: OpenAILLMContext
|
||||
) -> AsyncStream[ChatCompletionChunk]:
|
||||
messages: List[ChatCompletionMessageParam] = context.get_messages()
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
start_time = time.time()
|
||||
chunks: AsyncStream[ChatCompletionChunk] = (
|
||||
await self._client.chat.completions.create(
|
||||
model=self._model,
|
||||
stream=True,
|
||||
messages=messages,
|
||||
tools=context.tools,
|
||||
tool_choice=context.tool_choice,
|
||||
)
|
||||
)
|
||||
self.logger.info(f"=== OpenAI LLM TTFB: {time.time() - start_time}")
|
||||
return chunks
|
||||
|
||||
async def _chat_completions(self, messages) -> str | None:
|
||||
messages_for_log = json.dumps(messages)
|
||||
self.logger.debug(f"Generating chat via openai: {messages_for_log}")
|
||||
|
||||
response: ChatCompletion = await self._client.chat.completions.create(
|
||||
model=self._model, stream=False, messages=messages
|
||||
)
|
||||
if response and len(response.choices) > 0:
|
||||
return response.choices[0].message.content
|
||||
else:
|
||||
return None
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
context: OpenAILLMContext = frame.context
|
||||
elif isinstance(frame, LLMMessagesQueueFrame):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
else:
|
||||
yield frame
|
||||
return
|
||||
|
||||
function_name = ""
|
||||
arguments = ""
|
||||
|
||||
yield LLMResponseStartFrame()
|
||||
chunk_stream: AsyncStream[ChatCompletionChunk] = (
|
||||
await self._stream_chat_completions(context)
|
||||
)
|
||||
async for chunk in chunk_stream:
|
||||
if len(chunk.choices) == 0:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.tool_calls:
|
||||
# We're streaming the LLM response to enable the fastest response times.
|
||||
# For text, we just yield each chunk as we receive it and count on consumers
|
||||
# to do whatever coalescing they need (eg. to pass full sentences to TTS)
|
||||
#
|
||||
# If the LLM is a function call, we'll do some coalescing here.
|
||||
# If the response contains a function name, we'll yield a frame to tell consumers
|
||||
# that they can start preparing to call the function with that name.
|
||||
# We accumulate all the arguments for the rest of the streamed response, then when
|
||||
# the response is done, we package up all the arguments and the function name and
|
||||
# yield a frame containing the function name and the arguments.
|
||||
|
||||
tool_call = chunk.choices[0].delta.tool_calls[0]
|
||||
if tool_call.function and tool_call.function.name:
|
||||
function_name += tool_call.function.name
|
||||
yield LLMFunctionStartFrame(function_name=tool_call.function.name)
|
||||
if tool_call.function and tool_call.function.arguments:
|
||||
# Keep iterating through the response to collect all the argument fragments and
|
||||
# yield a complete LLMFunctionCallFrame after run_llm_async
|
||||
# completes
|
||||
arguments += tool_call.function.arguments
|
||||
elif chunk.choices[0].delta.content:
|
||||
yield TextFrame(chunk.choices[0].delta.content)
|
||||
|
||||
# if we got a function name and arguments, yield the frame with all the info so
|
||||
# frame consumers can take action based on the function call.
|
||||
if function_name and arguments:
|
||||
yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments)
|
||||
|
||||
yield LLMResponseEndFrame()
|
||||
@@ -1,54 +0,0 @@
|
||||
from typing import List
|
||||
from openai._types import NOT_GIVEN, NotGiven
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletionToolParam,
|
||||
ChatCompletionToolChoiceOptionParam,
|
||||
ChatCompletionMessageParam,
|
||||
)
|
||||
|
||||
|
||||
class OpenAILLMContext:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
messages: List[ChatCompletionMessageParam] | None = None,
|
||||
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
|
||||
tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN
|
||||
):
|
||||
self.messages: List[ChatCompletionMessageParam] = messages if messages else [
|
||||
]
|
||||
self.tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = tool_choice
|
||||
self.tools: List[ChatCompletionToolParam] | NotGiven = tools
|
||||
|
||||
@staticmethod
|
||||
def from_messages(messages: List[dict]) -> "OpenAILLMContext":
|
||||
context = OpenAILLMContext()
|
||||
for message in messages:
|
||||
context.add_message({
|
||||
"content": message["content"],
|
||||
"role": message["role"],
|
||||
"name": message["name"] if "name" in message else message["role"]
|
||||
})
|
||||
return context
|
||||
|
||||
# def __deepcopy__(self, memo):
|
||||
|
||||
def add_message(self, message: ChatCompletionMessageParam):
|
||||
self.messages.append(message)
|
||||
|
||||
def get_messages(self) -> List[ChatCompletionMessageParam]:
|
||||
return self.messages
|
||||
|
||||
def set_tool_choice(
|
||||
self, tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven
|
||||
):
|
||||
self.tool_choice = tool_choice
|
||||
|
||||
def set_tools(
|
||||
self,
|
||||
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN):
|
||||
if tools != NOT_GIVEN and len(tools) == 0:
|
||||
tools = NOT_GIVEN
|
||||
|
||||
self.tools = tools
|
||||
@@ -17,10 +17,7 @@ class CloudflareAIService(AIService):
|
||||
|
||||
# base endpoint, used by the others
|
||||
def run(self, model, input):
|
||||
response = requests.post(
|
||||
f"{self.api_base_url}{model}",
|
||||
headers=self.headers,
|
||||
json=input)
|
||||
response = requests.post(f"{self.api_base_url}{model}", headers=self.headers, json=input)
|
||||
return response.json()
|
||||
|
||||
# https://developers.cloudflare.com/workers-ai/models/llm/
|
||||
@@ -44,8 +41,7 @@ class CloudflareAIService(AIService):
|
||||
|
||||
# https://developers.cloudflare.com/workers-ai/models/sentiment-analysis/
|
||||
def run_text_sentiment(self, sentence):
|
||||
return self.run("@cf/huggingface/distilbert-sst-2-int8",
|
||||
{"text": sentence})
|
||||
return self.run("@cf/huggingface/distilbert-sst-2-int8", {"text": sentence})
|
||||
|
||||
# https://developers.cloudflare.com/workers-ai/models/image-classification/
|
||||
def run_image_classification(self, image_url):
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
from dailyai.pipeline.frames import (
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletionSystemMessageParam,
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
async def test_chat():
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
context = OpenAILLMContext()
|
||||
message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
|
||||
content="Please tell the world hello.", name="system", role="system")
|
||||
context.add_message(message)
|
||||
frame = OpenAILLMContextFrame(context)
|
||||
async for s in llm.process_frame(frame):
|
||||
print(s)
|
||||
|
||||
asyncio.run(test_chat())
|
||||
@@ -1,23 +0,0 @@
|
||||
import asyncio
|
||||
from dailyai.pipeline.frames import (
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletionSystemMessageParam,
|
||||
)
|
||||
from dailyai.services.ollama_ai_services import OLLamaLLMService
|
||||
|
||||
if __name__ == "__main__":
|
||||
async def test_chat():
|
||||
llm = OLLamaLLMService()
|
||||
context = OpenAILLMContext()
|
||||
message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
|
||||
content="Please tell the world hello.", name="system", role="system")
|
||||
context.add_message(message)
|
||||
frame = OpenAILLMContextFrame(context)
|
||||
async for s in llm.process_frame(frame):
|
||||
print(s)
|
||||
|
||||
asyncio.run(test_chat())
|
||||
@@ -1,85 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
from dailyai.pipeline.frames import (
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from dailyai.services.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletionSystemMessageParam,
|
||||
ChatCompletionToolParam,
|
||||
ChatCompletionUserMessageParam,
|
||||
)
|
||||
|
||||
from dailyai.services.openai_api_llm_service import BaseOpenAILLMService
|
||||
|
||||
if __name__ == "__main__":
|
||||
async def test_functions():
|
||||
tools = [
|
||||
ChatCompletionToolParam(
|
||||
type="function",
|
||||
function={
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"celsius",
|
||||
"fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": [
|
||||
"location",
|
||||
"format"],
|
||||
},
|
||||
})]
|
||||
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
|
||||
llm = BaseOpenAILLMService(
|
||||
api_key=api_key or "",
|
||||
model="gpt-4-1106-preview",
|
||||
)
|
||||
context = OpenAILLMContext(tools=tools)
|
||||
system_message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
|
||||
content="Ask the user to ask for a weather report", name="system", role="system"
|
||||
)
|
||||
user_message: ChatCompletionUserMessageParam = ChatCompletionUserMessageParam(
|
||||
content="Could you tell me the weather for Boulder, Colorado",
|
||||
name="user",
|
||||
role="user",
|
||||
)
|
||||
context.add_message(system_message)
|
||||
context.add_message(user_message)
|
||||
frame = OpenAILLMContextFrame(context)
|
||||
async for s in llm.process_frame(frame):
|
||||
print(s)
|
||||
|
||||
async def test_chat():
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
|
||||
llm = BaseOpenAILLMService(
|
||||
api_key=api_key or "",
|
||||
model="gpt-4-1106-preview",
|
||||
)
|
||||
context = OpenAILLMContext()
|
||||
message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
|
||||
content="Please tell the world hello.", name="system", role="system")
|
||||
context.add_message(message)
|
||||
frame = OpenAILLMContextFrame(context)
|
||||
async for s in llm.process_frame(frame):
|
||||
print(s)
|
||||
|
||||
async def run_tests():
|
||||
await test_functions()
|
||||
await test_chat()
|
||||
|
||||
asyncio.run(run_tests())
|
||||
@@ -1,129 +0,0 @@
|
||||
import asyncio
|
||||
import doctest
|
||||
import functools
|
||||
import unittest
|
||||
|
||||
from dailyai.pipeline.aggregators import (
|
||||
GatedAggregator,
|
||||
ParallelPipeline,
|
||||
SentenceAggregator,
|
||||
StatelessTextTransformer,
|
||||
)
|
||||
from dailyai.pipeline.frames import (
|
||||
AudioFrame,
|
||||
EndFrame,
|
||||
ImageFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
Frame,
|
||||
TextFrame,
|
||||
)
|
||||
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
|
||||
class TestDailyFrameAggregators(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_sentence_aggregator(self):
|
||||
sentence = "Hello, world. How are you? I am fine"
|
||||
expected_sentences = ["Hello, world.", " How are you?", " I am fine "]
|
||||
aggregator = SentenceAggregator()
|
||||
for word in sentence.split(" "):
|
||||
async for sentence in aggregator.process_frame(TextFrame(word + " ")):
|
||||
self.assertIsInstance(sentence, TextFrame)
|
||||
if isinstance(sentence, TextFrame):
|
||||
self.assertEqual(sentence.text, expected_sentences.pop(0))
|
||||
|
||||
async for sentence in aggregator.process_frame(EndFrame()):
|
||||
if len(expected_sentences):
|
||||
self.assertIsInstance(sentence, TextFrame)
|
||||
if isinstance(sentence, TextFrame):
|
||||
self.assertEqual(sentence.text, expected_sentences.pop(0))
|
||||
else:
|
||||
self.assertIsInstance(sentence, EndFrame)
|
||||
|
||||
self.assertEqual(expected_sentences, [])
|
||||
|
||||
async def test_gated_accumulator(self):
|
||||
gated_aggregator = GatedAggregator(
|
||||
gate_open_fn=lambda frame: isinstance(
|
||||
frame, ImageFrame), gate_close_fn=lambda frame: isinstance(
|
||||
frame, LLMResponseStartFrame), start_open=False, )
|
||||
|
||||
frames = [
|
||||
LLMResponseStartFrame(),
|
||||
TextFrame("Hello, "),
|
||||
TextFrame("world."),
|
||||
AudioFrame(b"hello"),
|
||||
ImageFrame("image", b"image"),
|
||||
AudioFrame(b"world"),
|
||||
LLMResponseEndFrame(),
|
||||
]
|
||||
|
||||
expected_output_frames = [
|
||||
ImageFrame("image", b"image"),
|
||||
LLMResponseStartFrame(),
|
||||
TextFrame("Hello, "),
|
||||
TextFrame("world."),
|
||||
AudioFrame(b"hello"),
|
||||
AudioFrame(b"world"),
|
||||
LLMResponseEndFrame(),
|
||||
]
|
||||
for frame in frames:
|
||||
async for out_frame in gated_aggregator.process_frame(frame):
|
||||
self.assertEqual(out_frame, expected_output_frames.pop(0))
|
||||
self.assertEqual(expected_output_frames, [])
|
||||
|
||||
async def test_parallel_pipeline(self):
|
||||
|
||||
async def slow_add(sleep_time: float, name: str, x: str):
|
||||
await asyncio.sleep(sleep_time)
|
||||
return ":".join([x, name])
|
||||
|
||||
pipe1_annotation = StatelessTextTransformer(
|
||||
functools.partial(slow_add, 0.1, 'pipe1'))
|
||||
pipe2_annotation = StatelessTextTransformer(
|
||||
functools.partial(slow_add, 0.2, 'pipe2'))
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
add_dots = StatelessTextTransformer(lambda x: x + ".")
|
||||
|
||||
source = asyncio.Queue()
|
||||
sink = asyncio.Queue()
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
ParallelPipeline(
|
||||
[[pipe1_annotation], [sentence_aggregator, pipe2_annotation]]
|
||||
),
|
||||
add_dots,
|
||||
],
|
||||
source,
|
||||
sink,
|
||||
)
|
||||
|
||||
frames = [
|
||||
TextFrame("Hello, "),
|
||||
TextFrame("world."),
|
||||
EndFrame()
|
||||
]
|
||||
|
||||
expected_output_frames: list[Frame] = [
|
||||
TextFrame(text='Hello, :pipe1.'),
|
||||
TextFrame(text='world.:pipe1.'),
|
||||
TextFrame(text='Hello, world.:pipe2.'),
|
||||
EndFrame()
|
||||
]
|
||||
|
||||
for frame in frames:
|
||||
await source.put(frame)
|
||||
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
while not sink.empty():
|
||||
frame = await sink.get()
|
||||
self.assertEqual(frame, expected_output_frames.pop(0))
|
||||
|
||||
|
||||
def load_tests(loader, tests, ignore):
|
||||
""" Run doctests on the aggregators module. """
|
||||
from dailyai.pipeline import aggregators
|
||||
tests.addTests(doctest.DocTestSuite(aggregators))
|
||||
return tests
|
||||
@@ -3,27 +3,45 @@ import unittest
|
||||
from typing import AsyncGenerator, Generator
|
||||
|
||||
from dailyai.services.ai_services import AIService
|
||||
from dailyai.pipeline.frames import EndFrame, Frame, TextFrame
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, QueueFrame, TextQueueFrame
|
||||
|
||||
|
||||
class SimpleAIService(AIService):
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
yield frame
|
||||
|
||||
|
||||
class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_simple_processing(self):
|
||||
async def test_async_input(self):
|
||||
service = SimpleAIService()
|
||||
|
||||
input_frames = [
|
||||
TextFrame("hello"),
|
||||
EndFrame()
|
||||
TextQueueFrame("hello"),
|
||||
EndStreamQueueFrame()
|
||||
]
|
||||
|
||||
async def iterate_frames() -> AsyncGenerator[QueueFrame, None]:
|
||||
for frame in input_frames:
|
||||
yield frame
|
||||
|
||||
output_frames = []
|
||||
for input_frame in input_frames:
|
||||
async for output_frame in service.process_frame(input_frame):
|
||||
output_frames.append(output_frame)
|
||||
async for frame in service.run(iterate_frames()):
|
||||
output_frames.append(frame)
|
||||
|
||||
self.assertEqual(input_frames, output_frames)
|
||||
|
||||
async def test_nonasync_input(self):
|
||||
service = SimpleAIService()
|
||||
|
||||
input_frames = [TextQueueFrame("hello"), EndStreamQueueFrame()]
|
||||
|
||||
def iterate_frames() -> Generator[QueueFrame, None, None]:
|
||||
for frame in input_frames:
|
||||
yield frame
|
||||
|
||||
output_frames = []
|
||||
async for frame in service.run(iterate_frames()):
|
||||
output_frames.append(frame)
|
||||
|
||||
self.assertEqual(input_frames, output_frames)
|
||||
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from dailyai.pipeline.frames import AudioFrame, ImageFrame
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
|
||||
|
||||
class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -25,8 +24,6 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
self.assertTrue(was_called)
|
||||
|
||||
"""
|
||||
TODO: fix this test, it broke when I added the `.result` call in the patch.
|
||||
async def test_event_handler_async(self):
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
|
||||
@@ -37,21 +34,14 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def test_event_handler(transport):
|
||||
nonlocal event
|
||||
print("sleeping")
|
||||
await asyncio.sleep(0.1)
|
||||
print("setting")
|
||||
event.set()
|
||||
print("returning")
|
||||
|
||||
thread = threading.Thread(target=transport.on_first_other_participant_joined)
|
||||
thread.start()
|
||||
thread.join()
|
||||
transport.on_first_other_participant_joined()
|
||||
|
||||
await asyncio.wait_for(event.wait(), timeout=1)
|
||||
self.assertTrue(event.is_set())
|
||||
"""
|
||||
|
||||
"""
|
||||
@patch("dailyai.services.daily_transport_service.CallClient")
|
||||
@patch("dailyai.services.daily_transport_service.Daily")
|
||||
async def test_run_with_camera_and_mic(self, daily_mock, callclient_mock):
|
||||
@@ -89,4 +79,3 @@ class TestDailyTransport(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
camera.write_frame.assert_called_with(b"test")
|
||||
mic.write_frames.assert_called()
|
||||
"""
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
|
||||
from dailyai.pipeline.frames import EndFrame, TextFrame
|
||||
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
|
||||
class TestDailyPipeline(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
async def test_pipeline_simple(self):
|
||||
aggregator = SentenceAggregator()
|
||||
|
||||
outgoing_queue = asyncio.Queue()
|
||||
incoming_queue = asyncio.Queue()
|
||||
pipeline = Pipeline([aggregator], incoming_queue, outgoing_queue)
|
||||
|
||||
await incoming_queue.put(TextFrame("Hello, "))
|
||||
await incoming_queue.put(TextFrame("world."))
|
||||
await incoming_queue.put(EndFrame())
|
||||
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
self.assertEqual(await outgoing_queue.get(), TextFrame("Hello, world."))
|
||||
self.assertIsInstance(await outgoing_queue.get(), EndFrame)
|
||||
|
||||
async def test_pipeline_multiple_stages(self):
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
to_upper = StatelessTextTransformer(lambda x: x.upper())
|
||||
add_space = StatelessTextTransformer(lambda x: x + " ")
|
||||
|
||||
outgoing_queue = asyncio.Queue()
|
||||
incoming_queue = asyncio.Queue()
|
||||
pipeline = Pipeline(
|
||||
[add_space, sentence_aggregator, to_upper],
|
||||
incoming_queue,
|
||||
outgoing_queue
|
||||
)
|
||||
|
||||
sentence = "Hello, world. It's me, a pipeline."
|
||||
for c in sentence:
|
||||
await incoming_queue.put(TextFrame(c))
|
||||
await incoming_queue.put(EndFrame())
|
||||
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
self.assertEqual(
|
||||
await outgoing_queue.get(), TextFrame("H E L L O , W O R L D .")
|
||||
)
|
||||
self.assertEqual(
|
||||
await outgoing_queue.get(),
|
||||
TextFrame(" I T ' S M E , A P I P E L I N E ."),
|
||||
)
|
||||
# leftover little bit because of the spacing
|
||||
self.assertEqual(
|
||||
await outgoing_queue.get(),
|
||||
TextFrame(" "),
|
||||
)
|
||||
self.assertIsInstance(await outgoing_queue.get(), EndFrame)
|
||||
@@ -1,49 +1,62 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from dailyai.pipeline.frames import EndFrame, TextFrame
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.playht_ai_service import PlayHTAIService
|
||||
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# create a transport service object using environment variables for
|
||||
# the transport service's API key, room url, and any other configuration.
|
||||
# services can all define and document the environment variables they use.
|
||||
# services all also take an optional config object that is used instead of
|
||||
# environment variables.
|
||||
#
|
||||
# the abstract transport service APIs presumably can map pretty closely
|
||||
# to the daily-python basic API
|
||||
meeting_duration_minutes = 5
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Say One Thing",
|
||||
mic_enabled=True,
|
||||
meeting_duration_minutes,
|
||||
mic_enabled=True
|
||||
)
|
||||
|
||||
"""
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
"""
|
||||
tts = PlayHTAIService(
|
||||
api_key=os.getenv("PLAY_HT_API_KEY"),
|
||||
user_id=os.getenv("PLAY_HT_USER_ID"),
|
||||
voice_url=os.getenv("PLAY_HT_VOICE_URL"),
|
||||
)
|
||||
|
||||
pipeline = Pipeline([tts])
|
||||
|
||||
# Register an event handler so we can play the audio when the
|
||||
# participant joins.
|
||||
# Register an event handler so we can play the audio when the participant joins.
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
nonlocal tts
|
||||
if participant["info"]["isLocal"]:
|
||||
return
|
||||
|
||||
participant_name = participant["info"]["userName"] or ''
|
||||
await pipeline.queue_frames([TextFrame("Hello there, " + participant_name + "!"), EndFrame()])
|
||||
await tts.say(
|
||||
"Hello there, " + participant["info"]["userName"] + "!",
|
||||
transport.send_queue,
|
||||
)
|
||||
|
||||
await transport.run(pipeline)
|
||||
del tts
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
del(tts)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,21 +1,17 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.local_transport_service import LocalTransportService
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
meeting_duration_minutes = 1
|
||||
transport = LocalTransportService(
|
||||
duration_minutes=meeting_duration_minutes, mic_enabled=True
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
mic_enabled=True
|
||||
)
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
|
||||
@@ -1,54 +1,58 @@
|
||||
import asyncio
|
||||
import os
|
||||
import logging
|
||||
|
||||
import aiohttp
|
||||
|
||||
from dailyai.pipeline.frames import EndFrame, LLMMessagesQueueFrame
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.queue_frame import LLMMessagesQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Say One Thing From an LLM",
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
mic_enabled=True,
|
||||
speaker_enabled=True
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
# llm = OpenAILLMService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world."
|
||||
}]
|
||||
tts_task = asyncio.create_task(
|
||||
tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
)
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
|
||||
}]
|
||||
|
||||
pipeline = Pipeline([llm, tts])
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await pipeline.queue_frames([LLMMessagesQueueFrame(messages), EndFrame()])
|
||||
await tts_task
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run(pipeline)
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,52 +1,51 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
|
||||
from dailyai.pipeline.frames import EndFrame, TextFrame
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.queue_frame import TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
from dailyai.services.azure_ai_services import AzureImageGenServiceREST
|
||||
|
||||
from examples.support.runner import configure
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
local_joined = False
|
||||
participant_joined = False
|
||||
|
||||
|
||||
async def main(room_url):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
meeting_duration_minutes = 1
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Show a still frame image",
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
mic_enabled=False,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=1024,
|
||||
duration_minutes=1
|
||||
camera_height=1024
|
||||
)
|
||||
|
||||
imagegen = FalImageGenService(
|
||||
image_size="square_hd",
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"),
|
||||
)
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
# imagegen = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
|
||||
# imagegen = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
|
||||
|
||||
pipeline = Pipeline([imagegen])
|
||||
image_task = asyncio.create_task(
|
||||
imagegen.run_to_queue(
|
||||
transport.send_queue, [
|
||||
TextQueueFrame("a cat in the style of picasso")]))
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# Note that we do not put an EndFrame() item in the pipeline for this demo.
|
||||
# This means that the bot will stay in the channel until it times out.
|
||||
# An EndFrame() in the pipeline would cause the transport to shut
|
||||
# down.
|
||||
await pipeline.queue_frames(
|
||||
[TextFrame("a cat in the style of picasso")]
|
||||
)
|
||||
await image_task
|
||||
|
||||
await transport.run(pipeline)
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,18 +1,13 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
|
||||
import tkinter as tk
|
||||
|
||||
from dailyai.pipeline.frames import TextFrame
|
||||
from dailyai.queue_frame import TextQueueFrame
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.local_transport_service import LocalTransportService
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
local_joined = False
|
||||
participant_joined = False
|
||||
|
||||
@@ -39,8 +34,9 @@ async def main():
|
||||
)
|
||||
image_task = asyncio.create_task(
|
||||
imagegen.run_to_queue(
|
||||
transport.send_queue, [
|
||||
TextFrame("a cat in the style of picasso")]))
|
||||
transport.send_queue, [TextQueueFrame("a cat in the style of picasso")]
|
||||
)
|
||||
)
|
||||
|
||||
async def run_tk():
|
||||
while not transport._stop_threads.is_set():
|
||||
@@ -50,6 +46,5 @@ async def main():
|
||||
|
||||
await asyncio.gather(transport.run(), image_task, run_tk())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
@@ -1,21 +1,14 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
from dailyai.pipeline.merge_pipeline import SequentialMergePipeline
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.pipeline.frames import EndFrame, EndPipeFrame, LLMMessagesQueueFrame, TextFrame
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, LLMMessagesQueueFrame
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str):
|
||||
@@ -27,53 +20,52 @@ async def main(room_url: str):
|
||||
duration_minutes=1,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
azure_tts = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"),
|
||||
)
|
||||
|
||||
deepgram_tts = DeepgramTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
)
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
elevenlabs_tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
|
||||
messages = [{"role": "system",
|
||||
"content": "tell the user a joke about llamas"}]
|
||||
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
|
||||
|
||||
# Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task
|
||||
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
|
||||
# speak the LLM response.
|
||||
llm_pipeline = Pipeline([llm, elevenlabs_tts])
|
||||
await llm_pipeline.queue_frames([LLMMessagesQueueFrame(messages), EndPipeFrame()])
|
||||
|
||||
simple_tts_pipeline = Pipeline([azure_tts])
|
||||
await simple_tts_pipeline.queue_frames(
|
||||
[
|
||||
TextFrame("My friend the LLM is going to tell a joke about llamas"),
|
||||
EndPipeFrame(),
|
||||
]
|
||||
buffer_queue = asyncio.Queue()
|
||||
llm_response_task = asyncio.create_task(
|
||||
elevenlabs_tts.run_to_queue(
|
||||
buffer_queue,
|
||||
llm.run([LLMMessagesQueueFrame(messages)]),
|
||||
True,
|
||||
)
|
||||
)
|
||||
|
||||
merge_pipeline = SequentialMergePipeline(
|
||||
[simple_tts_pipeline, llm_pipeline])
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await azure_tts.say("My friend the LLM is now going to tell a joke about llamas.", transport.send_queue)
|
||||
|
||||
await asyncio.gather(
|
||||
transport.run(merge_pipeline),
|
||||
simple_tts_pipeline.run_pipeline(),
|
||||
llm_pipeline.run_pipeline(),
|
||||
)
|
||||
async def buffer_to_send_queue():
|
||||
while True:
|
||||
frame = await buffer_queue.get()
|
||||
await transport.send_queue.put(frame)
|
||||
buffer_queue.task_done()
|
||||
if isinstance(frame, EndStreamQueueFrame):
|
||||
break
|
||||
|
||||
await asyncio.gather(llm_response_task, buffer_to_send_queue())
|
||||
|
||||
await transport.stop_when_done()
|
||||
|
||||
await transport.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,143 +1,133 @@
|
||||
import asyncio
|
||||
from re import S
|
||||
import aiohttp
|
||||
import os
|
||||
import logging
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.pipeline.aggregators import (
|
||||
GatedAggregator,
|
||||
LLMFullResponseAggregator,
|
||||
ParallelPipeline,
|
||||
SentenceAggregator,
|
||||
)
|
||||
from dailyai.pipeline.frames import (
|
||||
Frame,
|
||||
TextFrame,
|
||||
EndFrame,
|
||||
ImageFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
LLMResponseStartFrame,
|
||||
)
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureImageGenServiceREST, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.open_ai_services import OpenAIImageGenService
|
||||
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MonthFrame(Frame):
|
||||
month: str
|
||||
|
||||
|
||||
class MonthPrepender(FrameProcessor):
|
||||
def __init__(self):
|
||||
self.most_recent_month = "Placeholder, month frame not yet received"
|
||||
self.prepend_to_next_text_frame = False
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, MonthFrame):
|
||||
self.most_recent_month = frame.month
|
||||
elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame):
|
||||
yield TextFrame(f"{self.most_recent_month}: {frame.text}")
|
||||
self.prepend_to_next_text_frame = False
|
||||
elif isinstance(frame, LLMResponseStartFrame):
|
||||
self.prepend_to_next_text_frame = True
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
meeting_duration_minutes = 5
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
None,
|
||||
"Month Narration Bot",
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
mic_enabled=True,
|
||||
camera_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_width=1024,
|
||||
camera_height=1024,
|
||||
camera_height=1024
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
voice_id="ErXwobaYiN019PkySvjV")
|
||||
# tts = AzureTTSService(api_key=os.getenv("AZURE_SPEECH_API_KEY"), region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
imagegen = FalImageGenService(
|
||||
image_size="square_hd",
|
||||
dalle = FalImageGenService(
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"),
|
||||
)
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
# dalle = OpenAIImageGenService(aiohttp_session=session, api_key=os.getenv("OPENAI_DALLE_API_KEY"), image_size="1024x1024")
|
||||
# dalle = AzureImageGenServiceREST(image_size="1024x1024", aiohttp_session=session, api_key=os.getenv("AZURE_DALLE_API_KEY"), endpoint=os.getenv("AZURE_DALLE_ENDPOINT"), model=os.getenv("AZURE_DALLE_MODEL"))
|
||||
|
||||
gated_aggregator = GatedAggregator(
|
||||
gate_open_fn=lambda frame: isinstance(
|
||||
frame, ImageFrame), gate_close_fn=lambda frame: isinstance(
|
||||
frame, LLMResponseStartFrame), start_open=False, )
|
||||
# Get a complete audio chunk from the given text. Splitting this into its own
|
||||
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
|
||||
async def get_all_audio(text):
|
||||
all_audio = bytearray()
|
||||
async for audio in tts.run_tts(text):
|
||||
all_audio.extend(audio)
|
||||
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
month_prepender = MonthPrepender()
|
||||
llm_full_response_aggregator = LLMFullResponseAggregator()
|
||||
return all_audio
|
||||
|
||||
pipeline = Pipeline(
|
||||
processors=[
|
||||
llm,
|
||||
sentence_aggregator,
|
||||
ParallelPipeline(
|
||||
[[month_prepender, tts], [llm_full_response_aggregator, imagegen]]
|
||||
),
|
||||
gated_aggregator,
|
||||
],
|
||||
)
|
||||
|
||||
frames = []
|
||||
for month in [
|
||||
"January",
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June",
|
||||
"July",
|
||||
"August",
|
||||
"September",
|
||||
"October",
|
||||
"November",
|
||||
"December",
|
||||
]:
|
||||
async def get_month_data(month):
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
|
||||
}
|
||||
]
|
||||
frames.append(MonthFrame(month))
|
||||
frames.append(LLMMessagesQueueFrame(messages))
|
||||
|
||||
frames.append(EndFrame())
|
||||
await pipeline.queue_frames(frames)
|
||||
image_description = await llm.run_llm(messages)
|
||||
if not image_description:
|
||||
return
|
||||
|
||||
await transport.run(pipeline, override_pipeline_source_queue=False)
|
||||
to_speak = f"{month}: {image_description}"
|
||||
audio_task = asyncio.create_task(get_all_audio(to_speak))
|
||||
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
|
||||
print(f"about to gather tasks for {month}")
|
||||
(audio, image_data) = await asyncio.gather(
|
||||
audio_task, image_task
|
||||
)
|
||||
print(f"about to return from get_month_data for {month}")
|
||||
return {
|
||||
"month": month,
|
||||
"text": image_description,
|
||||
"image_url": image_data[0],
|
||||
"image": image_data[1],
|
||||
"audio": audio,
|
||||
}
|
||||
|
||||
months: list[str] = [
|
||||
"January",
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June"
|
||||
]
|
||||
"""
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June",
|
||||
"July",
|
||||
"August",
|
||||
"September",
|
||||
"October",
|
||||
"November",
|
||||
"December",
|
||||
"""
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# This will play the months in the order they're completed. The benefit
|
||||
# is we'll have as little delay as possible before the first month, and
|
||||
# likely no delay between months, but the months won't display in order.
|
||||
for month_data_task in asyncio.as_completed(month_tasks):
|
||||
print(f"month_data_task: {month_data_task}")
|
||||
try:
|
||||
data = await month_data_task
|
||||
except Exception:
|
||||
print("OMG EXCEPTION!!!!")
|
||||
if data:
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageQueueFrame(data["image_url"], data["image"]),
|
||||
AudioQueueFrame(data["audio"]),
|
||||
]
|
||||
)
|
||||
|
||||
# wait for the output queue to be empty, then leave the meeting
|
||||
await transport.stop_when_done()
|
||||
|
||||
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
|
||||
|
||||
await transport.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
|
||||
@@ -1,20 +1,15 @@
|
||||
import aiohttp
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import tkinter as tk
|
||||
import os
|
||||
|
||||
from dailyai.pipeline.frames import AudioFrame, ImageFrame
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.local_transport_service import LocalTransportService
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def main(room_url):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
@@ -31,16 +26,16 @@ async def main(room_url):
|
||||
tk_root=tk_root,
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
voice_id="ErXwobaYiN019PkySvjV",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
dalle = FalImageGenService(
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
@@ -49,8 +44,7 @@ async def main(room_url):
|
||||
)
|
||||
|
||||
# Get a complete audio chunk from the given text. Splitting this into its own
|
||||
# coroutine lets us ensure proper ordering of the audio chunks on the
|
||||
# send queue.
|
||||
# coroutine lets us ensure proper ordering of the audio chunks on the send queue.
|
||||
async def get_all_audio(text):
|
||||
all_audio = bytearray()
|
||||
async for audio in tts.run_tts(text):
|
||||
@@ -72,9 +66,10 @@ async def main(room_url):
|
||||
|
||||
to_speak = f"{month}: {image_description}"
|
||||
audio_task = asyncio.create_task(get_all_audio(to_speak))
|
||||
image_task = asyncio.create_task(
|
||||
dalle.run_image_gen(image_description))
|
||||
(audio, image_data) = await asyncio.gather(audio_task, image_task)
|
||||
image_task = asyncio.create_task(dalle.run_image_gen(image_description))
|
||||
(audio, image_data) = await asyncio.gather(
|
||||
audio_task, image_task
|
||||
)
|
||||
|
||||
return {
|
||||
"month": month,
|
||||
@@ -102,15 +97,14 @@ async def main(room_url):
|
||||
async def show_images():
|
||||
# This will play the months in the order they're completed. The benefit
|
||||
# is we'll have as little delay as possible before the first month, and
|
||||
# likely no delay between months, but the months won't display in
|
||||
# order.
|
||||
# likely no delay between months, but the months won't display in order.
|
||||
for month_data_task in asyncio.as_completed(month_tasks):
|
||||
data = await month_data_task
|
||||
if data:
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(data["image_url"], data["image"]),
|
||||
AudioFrame(data["audio"]),
|
||||
ImageQueueFrame(data["image_url"], data["image"]),
|
||||
AudioQueueFrame(data["audio"]),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -125,21 +119,15 @@ async def main(room_url):
|
||||
tk_root.update_idletasks()
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
month_tasks = [
|
||||
asyncio.create_task(
|
||||
get_month_data(month)) for month in months]
|
||||
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
|
||||
|
||||
await asyncio.gather(transport.run(), show_images(), run_tk())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--url",
|
||||
type=str,
|
||||
required=True,
|
||||
help="URL of the Daily room to join")
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
|
||||
@@ -1,83 +1,68 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from dailyai.pipeline.frames import LLMMessagesQueueFrame
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMContextAggregator, LLMUserContextAggregator
|
||||
from examples.foundational.support.runner import configure
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
vad_enabled=True,
|
||||
)
|
||||
context = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
speaker_enabled=True,
|
||||
context=context
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
fl = FrameLogger("transport")
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
fl = FrameLogger("Inner")
|
||||
fl2 = FrameLogger("Outer")
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
async def handle_transcriptions():
|
||||
|
||||
tma_in = LLMUserContextAggregator(
|
||||
messages, transport._my_participant_id)
|
||||
context, transport._my_participant_id)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
pipeline = Pipeline(
|
||||
processors=[
|
||||
fl,
|
||||
tma_in,
|
||||
llm,
|
||||
fl2,
|
||||
tts,
|
||||
tma_out,
|
||||
],
|
||||
context, transport._my_participant_id)
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
fl.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await pipeline.queue_frames([LLMMessagesQueueFrame(messages)])
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await transport.run(pipeline)
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
await asyncio.gather(transport.run(), handle_transcriptions())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,29 +1,22 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import logging
|
||||
from typing import AsyncGenerator
|
||||
import aiohttp
|
||||
import requests
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from PIL import Image
|
||||
from dailyai.queue_frame import ImageQueueFrame, QueueFrame
|
||||
|
||||
from dailyai.pipeline.frames import ImageFrame, Frame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.ai_services import AIService
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.queue_aggregators import LLMAssistantContextAggregator, LLMUserContextAggregator
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
class ImageSyncAggregator(AIService):
|
||||
@@ -34,10 +27,10 @@ class ImageSyncAggregator(AIService):
|
||||
self._waiting_image = Image.open(waiting_path)
|
||||
self._waiting_image_bytes = self._waiting_image.tobytes()
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
yield ImageFrame(None, self._speaking_image_bytes)
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
yield ImageQueueFrame(None, self._speaking_image_bytes)
|
||||
yield frame
|
||||
yield ImageFrame(None, self._waiting_image_bytes)
|
||||
yield ImageQueueFrame(None, self._waiting_image_bytes)
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
@@ -54,22 +47,18 @@ async def main(room_url: str, token):
|
||||
transport._mic_enabled = True
|
||||
transport._mic_sample_rate = 16000
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
img = FalImageGenService(
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"),
|
||||
)
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
|
||||
async def get_images():
|
||||
get_speaking_task = asyncio.create_task(
|
||||
@@ -91,26 +80,30 @@ async def main(room_url: str, token):
|
||||
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
|
||||
]
|
||||
|
||||
tma_in = LLMUserContextAggregator(
|
||||
messages, transport._my_participant_id)
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
image_sync_aggregator = ImageSyncAggregator(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__), "assets", "speaking.png"), os.path.join(
|
||||
os.path.dirname(__file__), "assets", "waiting.png"), )
|
||||
os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),
|
||||
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
|
||||
)
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
image_sync_aggregator.run(
|
||||
tma_out.run(llm.run(tma_in.run(transport.get_receive_frames())))
|
||||
),
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.services.groq_ai_services import GroqLLMService
|
||||
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
context = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
# TODO-CB: Should this be VAD enabled or something?
|
||||
speaker_enabled=True,
|
||||
context=context
|
||||
)
|
||||
|
||||
# llm = AzureLLMService(
|
||||
# api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
# endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
# model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
# context=context)
|
||||
llm = OpenAILLMService(
|
||||
context=context, api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
# llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), context=context)
|
||||
# tts = AzureTTSService(
|
||||
# api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
# region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
# tts = DeepgramTTSService(aiohttp_session=session, api_key=os.getenv("DEEPGRAM_API_KEY"), voice=os.getenv("DEEPGRAM_VOICE"))
|
||||
fl = FrameLogger("just outside the innermost layer")
|
||||
|
||||
async def run_response(in_frame):
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
# tma_out.run(
|
||||
llm.run(
|
||||
# tma_in.run(
|
||||
fl.run(
|
||||
[StartStreamQueueFrame(), in_frame]
|
||||
)
|
||||
# )
|
||||
)
|
||||
# ),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), transport.run_conversation(run_response))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
@@ -1,24 +1,14 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMResponseAggregator,
|
||||
LLMUserContextAggregator,
|
||||
UserResponseAggregator,
|
||||
)
|
||||
from dailyai.conversation_wrappers import InterruptibleConversationWrapper
|
||||
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.queue_frame import StartStreamQueueFrame, TextQueueFrame
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
@@ -32,38 +22,45 @@ async def main(room_url: str, token):
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
vad_enabled=True,
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts])
|
||||
async def run_response(user_speech, tma_in, tma_out):
|
||||
await tts.run_to_queue(
|
||||
transport.send_queue,
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(
|
||||
[StartStreamQueueFrame(), TextQueueFrame(user_speech)]
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await transport.say("Hi, I'm listening!", tts)
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
|
||||
async def run_conversation():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
|
||||
]
|
||||
|
||||
await transport.run_interruptible_pipeline(
|
||||
pipeline,
|
||||
post_processor=LLMResponseAggregator(messages),
|
||||
pre_processor=UserResponseAggregator(messages),
|
||||
conversation_wrapper = InterruptibleConversationWrapper(
|
||||
frame_generator=transport.get_receive_frames,
|
||||
runner=run_response,
|
||||
interrupt=transport.interrupt,
|
||||
my_participant_id=transport._my_participant_id,
|
||||
llm_messages=messages,
|
||||
)
|
||||
await conversation_wrapper.run_conversation()
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = False
|
||||
await asyncio.gather(transport.run(), run_conversation())
|
||||
|
||||
@@ -1,21 +1,14 @@
|
||||
from typing import Tuple
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from dailyai.pipeline.aggregators import SentenceAggregator
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesQueueFrame, TextFrame
|
||||
from examples.support.runner import configure
|
||||
from dailyai.queue_frame import AudioQueueFrame, ImageQueueFrame
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str):
|
||||
@@ -29,83 +22,62 @@ async def main(room_url: str):
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=1024,
|
||||
camera_height=1024
|
||||
)
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"),
|
||||
)
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts1 = AzureTTSService(
|
||||
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
|
||||
region=os.getenv("AZURE_SPEECH_REGION"),
|
||||
)
|
||||
region=os.getenv("AZURE_SPEECH_REGION"))
|
||||
tts2 = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl",
|
||||
)
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl")
|
||||
dalle = FalImageGenService(
|
||||
image_size="1024x1024",
|
||||
aiohttp_session=session,
|
||||
key_id=os.getenv("FAL_KEY_ID"),
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"),
|
||||
)
|
||||
key_secret=os.getenv("FAL_KEY_SECRET"))
|
||||
|
||||
bot1_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a stern librarian. You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever compromise on the fundamental truth that a hot dog is a sandwich. Your responses should only be a few sentences long.",
|
||||
},
|
||||
{"role": "system", "content": "You are a stern librarian. You strongly believe that a hot dog is a sandwich. Start by stating this fact in a few sentences, then be prepared to debate this with the user. You shouldn't ever compromise on the fundamental truth that a hot dog is a sandwich. Your responses should only be a few sentences long."},
|
||||
]
|
||||
bot2_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a silly cat, and you strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences. Don't ever accept that a hot dog is a sandwich.",
|
||||
},
|
||||
"content": "You are a silly cat, and you strongly believe that a hot dog is not a sandwich. Debate this with the user, only responding with a few sentences. Don't ever accept that a hot dog is a sandwich."},
|
||||
]
|
||||
|
||||
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
|
||||
"""This function streams text from the LLM and uses the TTS service to convert
|
||||
that text to speech as it's received. """
|
||||
source_queue = asyncio.Queue()
|
||||
sink_queue = asyncio.Queue()
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
pipeline = Pipeline(
|
||||
[llm, sentence_aggregator, tts1], source_queue, sink_queue
|
||||
)
|
||||
|
||||
await source_queue.put(LLMMessagesQueueFrame(messages))
|
||||
await source_queue.put(EndFrame())
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
message = ""
|
||||
all_audio = bytearray()
|
||||
while sink_queue.qsize():
|
||||
frame = sink_queue.get_nowait()
|
||||
if isinstance(frame, TextFrame):
|
||||
message += frame.text
|
||||
elif isinstance(frame, AudioFrame):
|
||||
all_audio.extend(frame.data)
|
||||
|
||||
return (message, all_audio)
|
||||
|
||||
async def get_bot1_statement():
|
||||
message, audio = await get_text_and_audio(bot1_messages)
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot1_msg = await llm.run_llm(bot1_messages)
|
||||
print(f"bot1_msg: {bot1_msg}")
|
||||
if bot1_msg:
|
||||
bot1_messages.append({"role": "assistant", "content": bot1_msg})
|
||||
bot2_messages.append({"role": "user", "content": bot1_msg})
|
||||
|
||||
bot1_messages.append({"role": "assistant", "content": message})
|
||||
bot2_messages.append({"role": "user", "content": message})
|
||||
all_audio = bytearray()
|
||||
async for audio in tts1.run_tts(bot1_msg):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return audio
|
||||
return all_audio
|
||||
|
||||
async def get_bot2_statement():
|
||||
message, audio = await get_text_and_audio(bot2_messages)
|
||||
# Run the LLMs synchronously for the back-and-forth
|
||||
bot2_msg = await llm.run_llm(bot2_messages)
|
||||
print(f"bot2_msg: {bot2_msg}")
|
||||
if bot2_msg:
|
||||
bot2_messages.append({"role": "assistant", "content": bot2_msg})
|
||||
bot1_messages.append({"role": "user", "content": bot2_msg})
|
||||
|
||||
bot2_messages.append({"role": "assistant", "content": message})
|
||||
bot1_messages.append({"role": "user", "content": message})
|
||||
all_audio = bytearray()
|
||||
async for audio in tts2.run_tts(bot2_msg):
|
||||
all_audio.extend(audio)
|
||||
|
||||
return audio
|
||||
return all_audio
|
||||
|
||||
async def argue():
|
||||
for i in range(100):
|
||||
@@ -118,8 +90,8 @@ async def main(room_url: str):
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(None, image_data1[1]),
|
||||
AudioFrame(audio1),
|
||||
ImageQueueFrame(None, image_data1[1]),
|
||||
AudioQueueFrame(audio1),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -130,8 +102,8 @@ async def main(room_url: str):
|
||||
)
|
||||
await transport.send_queue.put(
|
||||
[
|
||||
ImageFrame(None, image_data2[1]),
|
||||
AudioFrame(audio2),
|
||||
ImageQueueFrame(None, image_data2[1]),
|
||||
AudioQueueFrame(audio2),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -1,41 +1,36 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMUserContextAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
)
|
||||
from dailyai.pipeline.frames import (
|
||||
Frame,
|
||||
TextFrame,
|
||||
ImageFrame,
|
||||
SpriteFrame,
|
||||
from dailyai.queue_aggregators import LLMUserContextAggregator, LLMAssistantContextAggregator
|
||||
from dailyai.queue_frame import (
|
||||
QueueFrame,
|
||||
TextQueueFrame,
|
||||
ImageQueueFrame,
|
||||
SpriteQueueFrame,
|
||||
TranscriptionQueueFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import AIService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
sprites = {}
|
||||
image_files = [
|
||||
"sc-default.png",
|
||||
"sc-talk.png",
|
||||
"sc-listen-1.png",
|
||||
"sc-think-1.png",
|
||||
"sc-think-2.png",
|
||||
"sc-think-3.png",
|
||||
"sc-think-4.png",
|
||||
'sc-default.png',
|
||||
'sc-talk.png',
|
||||
'sc-listen-1.png',
|
||||
'sc-think-1.png',
|
||||
'sc-think-2.png',
|
||||
'sc-think-3.png',
|
||||
'sc-think-4.png'
|
||||
]
|
||||
|
||||
script_dir = os.path.dirname(__file__)
|
||||
@@ -50,28 +45,26 @@ for file in image_files:
|
||||
sprites[file] = img.tobytes()
|
||||
|
||||
# When the bot isn't talking, show a static image of the cat listening
|
||||
quiet_frame = ImageFrame("", sprites["sc-listen-1.png"])
|
||||
quiet_frame = ImageQueueFrame("", sprites["sc-listen-1.png"])
|
||||
# When the bot is talking, build an animation from two sprites
|
||||
talking_list = [sprites["sc-default.png"], sprites["sc-talk.png"]]
|
||||
talking_list = [sprites['sc-default.png'], sprites['sc-talk.png']]
|
||||
talking = [random.choice(talking_list) for x in range(30)]
|
||||
talking_frame = SpriteFrame(images=talking)
|
||||
talking_frame = SpriteQueueFrame(images=talking)
|
||||
|
||||
# TODO: Support "thinking" as soon as we get a valid transcript, while LLM
|
||||
# is processing
|
||||
# TODO: Support "thinking" as soon as we get a valid transcript, while LLM is processing
|
||||
thinking_list = [
|
||||
sprites["sc-think-1.png"],
|
||||
sprites["sc-think-2.png"],
|
||||
sprites["sc-think-3.png"],
|
||||
sprites["sc-think-4.png"],
|
||||
]
|
||||
thinking_frame = SpriteFrame(images=thinking_list)
|
||||
sprites['sc-think-1.png'],
|
||||
sprites['sc-think-2.png'],
|
||||
sprites['sc-think-3.png'],
|
||||
sprites['sc-think-4.png']]
|
||||
thinking_frame = SpriteQueueFrame(images=thinking_list)
|
||||
|
||||
|
||||
class TranscriptFilter(AIService):
|
||||
def __init__(self, bot_participant_id=None):
|
||||
self.bot_participant_id = bot_participant_id
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, TranscriptionQueueFrame):
|
||||
if frame.participantId != self.bot_participant_id:
|
||||
yield frame
|
||||
@@ -82,11 +75,11 @@ class NameCheckFilter(AIService):
|
||||
self.names = names
|
||||
self.sentence = ""
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
content: str = ""
|
||||
|
||||
# TODO: split up transcription by participant
|
||||
if isinstance(frame, TextFrame):
|
||||
if isinstance(frame, TextQueueFrame):
|
||||
content = frame.text
|
||||
|
||||
self.sentence += content
|
||||
@@ -94,7 +87,7 @@ class NameCheckFilter(AIService):
|
||||
if any(name in self.sentence for name in self.names):
|
||||
out = self.sentence
|
||||
self.sentence = ""
|
||||
yield TextFrame(out)
|
||||
yield TextQueueFrame(out)
|
||||
else:
|
||||
out = self.sentence
|
||||
self.sentence = ""
|
||||
@@ -104,7 +97,7 @@ class ImageSyncAggregator(AIService):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
yield talking_frame
|
||||
yield frame
|
||||
yield quiet_frame
|
||||
@@ -122,7 +115,7 @@ async def main(room_url: str, token):
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=720,
|
||||
camera_height=1280,
|
||||
camera_height=1280
|
||||
)
|
||||
transport._mic_enabled = True
|
||||
transport._mic_sample_rate = 16000
|
||||
@@ -130,34 +123,28 @@ async def main(room_url: str, token):
|
||||
transport._camera_width = 720
|
||||
transport._camera_height = 1280
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl",
|
||||
)
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl")
|
||||
isa = ImageSyncAggregator()
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say(
|
||||
"Hi! If you want to talk to me, just say 'hey Santa Cat'.",
|
||||
transport.send_queue,
|
||||
)
|
||||
await tts.say("Hi! If you want to talk to me, just say 'hey Santa Cat'.", transport.send_queue)
|
||||
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Santa Cat, a cat that lives in Santa's workshop at the North Pole. You should be clever, and a bit sarcastic. You should also tell jokes every once in a while. Your responses should only be a few sentences long.",
|
||||
},
|
||||
{"role": "system", "content": "You are Santa Cat, a cat that lives in Santa's workshop at the North Pole. You should be clever, and a bit sarcastic. You should also tell jokes every once in a while. Your responses should only be a few sentences long."},
|
||||
]
|
||||
|
||||
tma_in = LLMUserContextAggregator(
|
||||
messages, transport._my_participant_id)
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
@@ -168,10 +155,16 @@ async def main(room_url: str, token):
|
||||
isa.run(
|
||||
tma_out.run(
|
||||
llm.run(
|
||||
tma_in.run(ncf.run(tf.run(transport.get_receive_frames())))
|
||||
tma_in.run(
|
||||
ncf.run(
|
||||
tf.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
async def starting_image():
|
||||
|
||||
@@ -5,30 +5,24 @@ import os
|
||||
import wave
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
)
|
||||
from dailyai.queue_aggregators import LLMContextAggregator, LLMUserContextAggregator, LLMAssistantContextAggregator
|
||||
from dailyai.services.ai_services import AIService, FrameLogger
|
||||
from dailyai.pipeline.frames import (
|
||||
Frame,
|
||||
AudioFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
)
|
||||
from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from examples.support.runner import configure
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") # or whatever
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
sounds = {}
|
||||
sound_files = ["ding1.wav", "ding2.wav"]
|
||||
sound_files = [
|
||||
'ding1.wav',
|
||||
'ding2.wav'
|
||||
]
|
||||
|
||||
script_dir = os.path.dirname(__file__)
|
||||
|
||||
@@ -46,9 +40,9 @@ class OutboundSoundEffectWrapper(AIService):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, LLMResponseEndFrame):
|
||||
yield AudioFrame(sounds["ding1.wav"])
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, LLMResponseEndQueueFrame):
|
||||
yield AudioQueueFrame(sounds["ding1.wav"])
|
||||
# In case anything else up the stack needs it
|
||||
yield frame
|
||||
else:
|
||||
@@ -59,9 +53,9 @@ class InboundSoundEffectWrapper(AIService):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, LLMMessagesQueueFrame):
|
||||
yield AudioFrame(sounds["ding2.wav"])
|
||||
yield AudioQueueFrame(sounds["ding2.wav"])
|
||||
# In case anything else up the stack needs it
|
||||
yield frame
|
||||
else:
|
||||
@@ -77,34 +71,31 @@ async def main(room_url: str, token):
|
||||
duration_minutes=5,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
camera_enabled=False
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
llm = AzureLLMService(
|
||||
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
|
||||
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
|
||||
model=os.getenv("AZURE_CHATGPT_MODEL"))
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="ErXwobaYiN019PkySvjV",
|
||||
)
|
||||
voice_id="ErXwobaYiN019PkySvjV")
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
await transport.send_queue.put(AudioFrame(sounds["ding1.wav"]))
|
||||
await transport.send_queue.put(AudioQueueFrame(sounds["ding1.wav"]))
|
||||
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
{"role": "system", "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way."},
|
||||
]
|
||||
|
||||
tma_in = LLMUserContextAggregator(
|
||||
messages, transport._my_participant_id)
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
messages, transport._my_participant_id
|
||||
)
|
||||
@@ -120,13 +111,15 @@ async def main(room_url: str, token):
|
||||
llm.run(
|
||||
fl2.run(
|
||||
in_sound.run(
|
||||
tma_in.run(transport.get_receive_frames())
|
||||
tma_in.run(
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.pipeline.frames import Frame, LLMMessagesQueueFrame, RequestVideoImageFrame, LLMResponseEndFrame
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.pipeline.frame_processor import FrameProcessor
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService, OpenAIVisionService
|
||||
from dailyai.services.deepgram_ai_services import DeepgramTTSService
|
||||
from dailyai.services.ai_services import FrameLogger
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from dailyai.pipeline.frames import VideoImageFrame, VisionFrame
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class VideoImageFrameProcessor(FrameProcessor):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, VideoImageFrame):
|
||||
yield VisionFrame("Describe the image in one sentence.", frame.image)
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class ImageRefresher(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, LLMResponseEndFrame):
|
||||
yield RequestVideoImageFrame(participantId=None)
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=False,
|
||||
vad_enabled=True,
|
||||
receive_video=True,
|
||||
receive_video_fps=0
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
vs = OpenAIVisionService(api_key=os.getenv("OPENAI_CHATGPT_API_KEY"))
|
||||
vifp = VideoImageFrameProcessor()
|
||||
ir = ImageRefresher()
|
||||
pipeline = Pipeline(
|
||||
processors=[
|
||||
vifp,
|
||||
vs,
|
||||
llm,
|
||||
tts,
|
||||
ir,
|
||||
],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await pipeline.queue_frames([RequestVideoImageFrame(participantId=None)])
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await transport.run(pipeline)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
@@ -1,13 +1,9 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.whisper_ai_services import WhisperSTTService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
|
||||
async def main(room_url: str):
|
||||
@@ -18,7 +14,7 @@ async def main(room_url: str):
|
||||
start_transcription=True,
|
||||
mic_enabled=False,
|
||||
camera_enabled=False,
|
||||
speaker_enabled=True,
|
||||
speaker_enabled=True
|
||||
)
|
||||
|
||||
stt = WhisperSTTService()
|
||||
@@ -32,9 +28,9 @@ async def main(room_url: str):
|
||||
|
||||
async def handle_speaker():
|
||||
await stt.run_to_queue(
|
||||
transcription_output_queue, transport.get_receive_frames()
|
||||
transcription_output_queue,
|
||||
transport.get_receive_frames()
|
||||
)
|
||||
|
||||
await asyncio.gather(transport.run(), handle_speaker(), handle_transcription())
|
||||
|
||||
|
||||
|
||||
@@ -1,16 +1,11 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import wave
|
||||
from dailyai.pipeline.frames import EndFrame, TranscriptionQueueFrame
|
||||
from dailyai.queue_frame import EndStreamQueueFrame, TranscriptionQueueFrame
|
||||
|
||||
from dailyai.services.local_transport_service import LocalTransportService
|
||||
from dailyai.services.whisper_ai_services import WhisperSTTService
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def main(room_url: str):
|
||||
global transport
|
||||
@@ -22,7 +17,7 @@ async def main(room_url: str):
|
||||
camera_enabled=False,
|
||||
speaker_enabled=True,
|
||||
duration_minutes=meeting_duration_minutes,
|
||||
start_transcription=True,
|
||||
start_transcription=True
|
||||
)
|
||||
stt = WhisperSTTService()
|
||||
transcription_output_queue = asyncio.Queue()
|
||||
@@ -35,7 +30,7 @@ async def main(room_url: str):
|
||||
print("got item from queue", item)
|
||||
if isinstance(item, TranscriptionQueueFrame):
|
||||
print(item.text)
|
||||
elif isinstance(item, EndFrame):
|
||||
elif isinstance(item, EndStreamQueueFrame):
|
||||
break
|
||||
print("handle_transcription done")
|
||||
|
||||
@@ -43,7 +38,7 @@ async def main(room_url: str):
|
||||
await stt.run_to_queue(
|
||||
transcription_output_queue, transport.get_receive_frames()
|
||||
)
|
||||
await transcription_output_queue.put(EndFrame())
|
||||
await transcription_output_queue.put(EndStreamQueueFrame())
|
||||
print("handle speaker done.")
|
||||
|
||||
async def run_until_done():
|
||||
@@ -57,11 +52,8 @@ async def main(room_url: str):
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--url",
|
||||
type=str,
|
||||
required=True,
|
||||
help="URL of the Daily room to join")
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
asyncio.run(main(args.url))
|
||||
|
||||
@@ -11,11 +11,8 @@ load_dotenv()
|
||||
def configure():
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--url",
|
||||
type=str,
|
||||
required=False,
|
||||
help="URL of the Daily room to join")
|
||||
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
@@ -36,25 +33,20 @@ def configure():
|
||||
if not key:
|
||||
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
# Create a meeting token for the given room with an expiration 1 hour in the future.
|
||||
room_name: str = urllib.parse.urlparse(url).path[1:]
|
||||
expiration: float = time.time() + 60 * 60
|
||||
|
||||
res: requests.Response = requests.post(
|
||||
f"https://api.daily.co/v1/meeting-tokens",
|
||||
headers={
|
||||
"Authorization": f"Bearer {key}"},
|
||||
headers={"Authorization": f"Bearer {key}"},
|
||||
json={
|
||||
"properties": {
|
||||
"room_name": room_name,
|
||||
"is_owner": True,
|
||||
"exp": expiration}},
|
||||
"properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
|
||||
},
|
||||
)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(
|
||||
f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||
raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||
|
||||
token: str = res.json()["token"]
|
||||
|
||||
@@ -7,7 +7,7 @@ import random
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.pipeline.frames import Frame, FrameType
|
||||
from dailyai.queue_frame import QueueFrame, FrameType
|
||||
from dailyai.services.fal_ai_services import FalImageGenService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
|
||||
@@ -45,24 +45,21 @@ async def main(room_url: str, token):
|
||||
print(f"finder: {finder}")
|
||||
if finder >= 0:
|
||||
async for audio in tts.run_tts(f"Resetting."):
|
||||
transport.output_queue.put(
|
||||
Frame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
sentence = ""
|
||||
continue
|
||||
# todo: we could differentiate between transcriptions from
|
||||
# different participants
|
||||
# todo: we could differentiate between transcriptions from different participants
|
||||
sentence += f" {message['text']}"
|
||||
print(f"sentence is now: {sentence}")
|
||||
# TODO: Cache this audio
|
||||
phrase = random.choice(
|
||||
["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
|
||||
phrase = random.choice(["OK.", "Got it.", "Sure.", "You bet.", "Sure thing."])
|
||||
async for audio in tts.run_tts(phrase):
|
||||
transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
img_result = img.run_image_gen(sentence, "1024x1024")
|
||||
awaited_img = await asyncio.gather(img_result)
|
||||
transport.output_queue.put(
|
||||
[
|
||||
Frame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
|
||||
QueueFrame(FrameType.IMAGE_FRAME, awaited_img[0][1]),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -75,7 +72,7 @@ async def main(room_url: str, token):
|
||||
audio_generator = tts.run_tts(
|
||||
f"Hello, {participant['info']['userName']}! Describe an image and I'll create it. To start over, just say 'start over'.")
|
||||
async for audio in audio_generator:
|
||||
transport.output_queue.put(Frame(FrameType.AUDIO_FRAME, audio))
|
||||
transport.output_queue.put(QueueFrame(FrameType.AUDIO_FRAME, audio))
|
||||
|
||||
transport.transcription_settings["extra"]["punctuate"] = False
|
||||
transport.transcription_settings["extra"]["endpointing"] = False
|
||||
@@ -85,11 +82,8 @@ async def main(room_url: str, token):
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--url",
|
||||
type=str,
|
||||
required=True,
|
||||
help="URL of the Daily room to join")
|
||||
"-u", "--url", type=str, required=True, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
@@ -100,25 +94,20 @@ if __name__ == "__main__":
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
# Create a meeting token for the given room with an expiration 1 hour in the future.
|
||||
room_name: str = urllib.parse.urlparse(args.url).path[1:]
|
||||
expiration: float = time.time() + 60 * 60
|
||||
|
||||
res: requests.Response = requests.post(
|
||||
f"https://api.daily.co/v1/meeting-tokens",
|
||||
headers={
|
||||
"Authorization": f"Bearer {args.apikey}"},
|
||||
headers={"Authorization": f"Bearer {args.apikey}"},
|
||||
json={
|
||||
"properties": {
|
||||
"room_name": room_name,
|
||||
"is_owner": True,
|
||||
"exp": expiration}},
|
||||
"properties": {"room_name": room_name, "is_owner": True, "exp": expiration}
|
||||
},
|
||||
)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(
|
||||
f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||
raise Exception(f"Failed to create meeting token: {res.status_code} {res.text}")
|
||||
|
||||
token: str = res.json()["token"]
|
||||
|
||||
|
||||
@@ -5,12 +5,12 @@ import wave
|
||||
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
|
||||
from dailyai.pipeline.aggregators import LLMContextAggregator
|
||||
from dailyai.queue_aggregators import LLMContextAggregator
|
||||
from dailyai.services.ai_services import AIService, FrameLogger
|
||||
from dailyai.pipeline.frames import Frame, AudioFrame, LLMResponseEndFrame, LLMMessagesQueueFrame
|
||||
from dailyai.queue_frame import QueueFrame, AudioQueueFrame, LLMResponseEndQueueFrame, LLMMessagesQueueFrame
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from examples.support.runner import configure
|
||||
from examples.foundational.support.runner import configure
|
||||
|
||||
sounds = {}
|
||||
sound_files = [
|
||||
@@ -34,9 +34,9 @@ class OutboundSoundEffectWrapper(AIService):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, LLMResponseEndFrame):
|
||||
yield AudioFrame(sounds["ding1.wav"])
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, LLMResponseEndQueueFrame):
|
||||
yield AudioQueueFrame(sounds["ding1.wav"])
|
||||
# In case anything else up the stack needs it
|
||||
yield frame
|
||||
else:
|
||||
@@ -47,9 +47,9 @@ class InboundSoundEffectWrapper(AIService):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
async def process_frame(self, frame: QueueFrame) -> AsyncGenerator[QueueFrame, None]:
|
||||
if isinstance(frame, LLMMessagesQueueFrame):
|
||||
yield AudioFrame(sounds["ding2.wav"])
|
||||
yield AudioQueueFrame(sounds["ding2.wav"])
|
||||
# In case anything else up the stack needs it
|
||||
yield frame
|
||||
else:
|
||||
@@ -79,7 +79,7 @@ async def main(room_url: str, token, phone):
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await tts.say("Hi, I'm listening!", transport.send_queue)
|
||||
await transport.send_queue.put(AudioFrame(sounds["ding1.wav"]))
|
||||
await transport.send_queue.put(AudioQueueFrame(sounds["ding1.wav"]))
|
||||
|
||||
async def handle_transcriptions():
|
||||
messages = [
|
||||
|
||||
@@ -24,8 +24,7 @@ def get_meeting_token(room_name, daily_api_key, token_expiry):
|
||||
'is_owner': True,
|
||||
'exp': token_expiry}})
|
||||
if res.status_code != 200:
|
||||
return jsonify(
|
||||
{'error': 'Unable to create meeting token', 'detail': res.text}), 500
|
||||
return jsonify({'error': 'Unable to create meeting token', 'detail': res.text}), 500
|
||||
meeting_token = res.json()['token']
|
||||
return meeting_token
|
||||
|
||||
|
||||
@@ -14,16 +14,14 @@ load_dotenv()
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
|
||||
print(
|
||||
f"I loaded an environment, and my FAL_KEY_ID is {os.getenv('FAL_KEY_ID')}")
|
||||
print(f"I loaded an environment, and my FAL_KEY_ID is {os.getenv('FAL_KEY_ID')}")
|
||||
|
||||
|
||||
def start_bot(bot_path, args=None):
|
||||
daily_api_key = os.getenv("DAILY_API_KEY")
|
||||
api_path = os.getenv("DAILY_API_PATH") or "https://api.daily.co/v1"
|
||||
|
||||
timeout = int(os.getenv("DAILY_ROOM_TIMEOUT")
|
||||
or os.getenv("DAILY_BOT_MAX_DURATION") or 300)
|
||||
timeout = int(os.getenv("DAILY_ROOM_TIMEOUT") or os.getenv("DAILY_BOT_MAX_DURATION") or 300)
|
||||
exp = time.time() + timeout
|
||||
res = requests.post(
|
||||
f"{api_path}/rooms",
|
||||
@@ -61,13 +59,14 @@ def start_bot(bot_path, args=None):
|
||||
extra_args = ""
|
||||
|
||||
proc = subprocess.Popen(
|
||||
[f"python {bot_path} -u {room_url} -t {meeting_token} -k {daily_api_key} {extra_args}"],
|
||||
[
|
||||
f"python {bot_path} -u {room_url} -t {meeting_token} -k {daily_api_key} {extra_args}"
|
||||
],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
)
|
||||
|
||||
# Don't return until the bot has joined the room, but wait for at most 2
|
||||
# seconds.
|
||||
# Don't return until the bot has joined the room, but wait for at most 2 seconds.
|
||||
attempts = 0
|
||||
while attempts < 20:
|
||||
time.sleep(0.1)
|
||||
@@ -83,13 +82,11 @@ def start_bot(bot_path, args=None):
|
||||
# Additional client config
|
||||
config = {}
|
||||
if os.getenv("CLIENT_VAD_TIMEOUT_SEC"):
|
||||
config['vad_timeout_sec'] = float(
|
||||
os.getenv("DAILY_CLIENT_VAD_TIMEOUT_SEC"))
|
||||
config['vad_timeout_sec'] = float(os.getenv("DAILY_CLIENT_VAD_TIMEOUT_SEC"))
|
||||
else:
|
||||
config['vad_timeout_sec'] = 1.5
|
||||
|
||||
# return jsonify({"room_url": room_url, "token": meeting_token, "config":
|
||||
# config}), 200
|
||||
# return jsonify({"room_url": room_url, "token": meeting_token, "config": config}), 200
|
||||
return redirect(room_url, code=301)
|
||||
|
||||
|
||||
|
||||
|
Before Width: | Height: | Size: 1.1 MiB |
|
Before Width: | Height: | Size: 1.1 MiB |
|
Before Width: | Height: | Size: 759 KiB |
|
Before Width: | Height: | Size: 884 KiB |
|
Before Width: | Height: | Size: 876 KiB |
|
Before Width: | Height: | Size: 881 KiB |
|
Before Width: | Height: | Size: 866 KiB |
|
Before Width: | Height: | Size: 874 KiB |
|
Before Width: | Height: | Size: 882 KiB |
|
Before Width: | Height: | Size: 885 KiB |
|
Before Width: | Height: | Size: 888 KiB |
|
Before Width: | Height: | Size: 890 KiB |
|
Before Width: | Height: | Size: 898 KiB |
|
Before Width: | Height: | Size: 836 KiB |
|
Before Width: | Height: | Size: 903 KiB |
|
Before Width: | Height: | Size: 908 KiB |
|
Before Width: | Height: | Size: 908 KiB |
|
Before Width: | Height: | Size: 905 KiB |
|
Before Width: | Height: | Size: 903 KiB |
|
Before Width: | Height: | Size: 866 KiB |
|
Before Width: | Height: | Size: 849 KiB |
|
Before Width: | Height: | Size: 866 KiB |
|
Before Width: | Height: | Size: 866 KiB |
|
Before Width: | Height: | Size: 864 KiB |
|
Before Width: | Height: | Size: 858 KiB |
|
Before Width: | Height: | Size: 875 KiB |
|
Before Width: | Height: | Size: 881 KiB |
@@ -1,144 +0,0 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from PIL import Image
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from dailyai.pipeline.aggregators import (
|
||||
LLMResponseAggregator,
|
||||
UserResponseAggregator,
|
||||
)
|
||||
from dailyai.pipeline.frames import (
|
||||
ImageFrame,
|
||||
SpriteFrame,
|
||||
Frame,
|
||||
LLMResponseEndFrame,
|
||||
LLMMessagesQueueFrame,
|
||||
AudioFrame,
|
||||
PipelineStartedFrame,
|
||||
)
|
||||
from dailyai.services.ai_services import AIService
|
||||
from dailyai.pipeline.pipeline import Pipeline
|
||||
from dailyai.services.daily_transport_service import DailyTransportService
|
||||
from dailyai.services.open_ai_services import OpenAILLMService
|
||||
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
|
||||
from examples.support.runner import configure
|
||||
|
||||
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("dailyai")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
sprites = []
|
||||
|
||||
script_dir = os.path.dirname(__file__)
|
||||
|
||||
for i in range(1, 26):
|
||||
# Build the full path to the image file
|
||||
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
|
||||
# Get the filename without the extension to use as the dictionary key
|
||||
# Open the image and convert it to bytes
|
||||
with Image.open(full_path) as img:
|
||||
sprites.append(img.tobytes())
|
||||
|
||||
flipped = sprites[::-1]
|
||||
sprites.extend(flipped)
|
||||
# When the bot isn't talking, show a static image of the cat listening
|
||||
quiet_frame = ImageFrame("", sprites[0])
|
||||
talking_frame = SpriteFrame(images=sprites)
|
||||
|
||||
|
||||
class TalkingAnimation(AIService):
|
||||
"""
|
||||
This class starts a talking animation when it receives an first AudioFrame,
|
||||
and then returns to a "quiet" sprite when it sees a LLMResponseEndFrame.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._is_talking = False
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, AudioFrame):
|
||||
if not self._is_talking:
|
||||
yield talking_frame
|
||||
yield frame
|
||||
self._is_talking = True
|
||||
else:
|
||||
yield frame
|
||||
elif isinstance(frame, LLMResponseEndFrame):
|
||||
yield quiet_frame
|
||||
yield frame
|
||||
self._is_talking = False
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
class AnimationInitializer(AIService):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
|
||||
if isinstance(frame, PipelineStartedFrame):
|
||||
yield quiet_frame
|
||||
yield frame
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransportService(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
duration_minutes=5,
|
||||
start_transcription=True,
|
||||
mic_enabled=True,
|
||||
mic_sample_rate=16000,
|
||||
camera_enabled=True,
|
||||
camera_width=1024,
|
||||
camera_height=576,
|
||||
vad_enabled=True,
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="pNInz6obpgDQGcFmaJgB",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_CHATGPT_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
|
||||
ta = TalkingAnimation()
|
||||
ai = AnimationInitializer()
|
||||
pipeline = Pipeline([ai, llm, tts, ta])
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
|
||||
},
|
||||
]
|
||||
|
||||
@transport.event_handler("on_first_other_participant_joined")
|
||||
async def on_first_other_participant_joined(transport):
|
||||
await pipeline.queue_frames([LLMMessagesQueueFrame(messages)])
|
||||
|
||||
async def run_conversation():
|
||||
|
||||
await transport.run_interruptible_pipeline(
|
||||
pipeline,
|
||||
post_processor=LLMResponseAggregator(messages),
|
||||
pre_processor=UserResponseAggregator(messages),
|
||||
)
|
||||
|
||||
transport.transcription_settings["extra"]["endpointing"] = True
|
||||
transport.transcription_settings["extra"]["punctuate"] = True
|
||||
await asyncio.gather(transport.run(), run_conversation())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||