Compare commits

...

4 Commits

Author SHA1 Message Date
Ubuntu
c67296a1be sqs Dockerfile 2024-07-21 15:44:31 +00:00
Kwindla Hultman Kramer
85d80d8e0b a couple of comments 2024-07-21 08:06:59 -07:00
Kwindla Hultman Kramer
729d266481 change room timeout 2024-07-21 07:45:11 -07:00
Kwindla Hultman Kramer
d9d22aa4b5 end-to-end working sqs-runner.py 2024-07-21 07:06:31 -07:00
6 changed files with 722 additions and 0 deletions

View File

@@ -0,0 +1,101 @@
import json
import os
import requests
import boto3
import time
from typing import Dict, Any
def get_sqs_client():
if __name__ == "__main__":
# for local testing, load temporary credentials. see notes.md
try:
with open('assume_role_output.json', 'r') as f:
credentials = json.load(f)['Credentials']
session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return session.client('sqs', region_name='us-west-2')
except FileNotFoundError:
print("Warning: assume_role_output.json not found. Using default credentials.")
return boto3.client('sqs')
else:
# When running in Lambda, use the role attached to the function
return boto3.client('sqs')
sqs = get_sqs_client()
DAILY_API_KEY = os.environ.get('DAILY_API_KEY')
SQS_QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/955740203061/khk-sqs-launch-day-demos'
def create_daily_room() -> Dict[str, Any]:
if not DAILY_API_KEY:
raise ValueError("DAILY_API_KEY environment variable is not set")
url = "https://api.daily.co/v1/rooms"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {DAILY_API_KEY}"
}
exp = int(time.time()) + 305
payload = {
"properties": {
"exp": exp,
"eject_at_room_exp": True
}
}
response = requests.post(url, headers=headers, json=payload)
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
return response.json()
def send_to_sqs(room_info: Dict[str, Any]) -> None:
if not SQS_QUEUE_URL:
raise ValueError("SQS_QUEUE_URL environment variable is not set")
if sqs:
sqs.send_message(
QueueUrl=SQS_QUEUE_URL,
MessageBody=json.dumps(room_info)
)
print(f"Message sent to SQS queue: {SQS_QUEUE_URL}")
else:
print(f"Simulated sending message to SQS queue: {SQS_QUEUE_URL}")
print(f"Message body: {json.dumps(room_info, indent=2)}")
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
try:
print("Creating Daily room...")
daily_room = create_daily_room()
print(f"Daily room created: {json.dumps(daily_room, indent=2)}")
print("Sending room info to SQS...")
send_to_sqs(daily_room)
return {
'statusCode': 200,
'body': json.dumps({"room_url": daily_room['url']})
}
except Exception as e:
print(f"Error occurred: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
if __name__ == "__main__":
# Simulate the Lambda event and context
event = {}
context = None
result = lambda_handler(event, context)
print(f"Lambda handler result: {json.dumps(result, indent=2)}")

View File

@@ -0,0 +1,2 @@
requests
boto3

21
khk/sqs-runner/Dockerfile Normal file
View File

@@ -0,0 +1,21 @@
FROM python:3.10-bullseye
RUN apt-get update
RUN apt-get install -y python3-dev portaudio19-dev
RUN git clone https://github.com/pipecat-ai/pipecat.git
WORKDIR /pipecat
RUN git fetch
RUN git checkout khk/tuesday-sqs
RUN python3 -m venv venv
RUN . venv/bin/activate
RUN pip install -r dev-requirements.txt -r linux-py3.10-requirements.txt
RUN pip install boto3
RUN python -m build
RUN pip install --editable .
EXPOSE 7860
WORKDIR /pipecat/khk/sqs-runner
CMD ["python", "sqs-runner.py"]

172
khk/sqs-runner/bot.py Normal file
View File

@@ -0,0 +1,172 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional
from pydantic import BaseModel, ValidationError
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
ClearableDeepgramTTSService,
AudioVolumeTimer,
TranscriptionTimingLogger
)
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
class BotSettings(BaseModel):
room_url: str
room_token: str
bot_name: str = "Pipecat"
prompt: Optional[str] = "You are a helpful assistant."
deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
deepgram_tts_base_url: Optional[str] = os.getenv(
"DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
deepgram_stt_base_url: Optional[str] = os.getenv(
"DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None),
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None),
openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
vad_stop_secs: Optional[float] = os.getenv("VAD_STOP_SECS", 0.200)
async def main(settings: BotSettings):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
settings.room_url,
settings.room_token,
settings.bot_name,
DailyParams(
audio_out_sample_rate=44100,
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(
stop_secs=settings.vad_stop_secs
)),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(
name="STT",
api_key=settings.deepgram_api_key,
url=settings.deepgram_stt_base_url
)
# tts = ClearableDeepgramTTSService(
# name="Voice",
# aiohttp_session=session,
# api_key=settings.deepgram_api_key,
# voice=settings.deepgram_voice,
# **({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
# )
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
sample_rate=44100,
)
llm = OpenAILLMService(
name="Groq Llama 3 70B",
api_key=settings.openai_api_key,
model=settings.openai_model,
base_url=settings.openai_base_url,
)
messages = [
{
"role": "system",
"content": settings.prompt,
},
]
avt = AudioVolumeTimer()
tl = TranscriptionTimingLogger(avt)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
avt, # Audio volume timer
stt, # Speech-to-text
tl, # Transcription timing logger
tma_in, # User responses
llm, # LLM
tts, # TTS
tma_out, # Assistant spoken responses
transport.output(), # Transport bot output
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
))
# When the participant leaves, we exit the bot.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Provide some air whilst tracks subscribe
time.sleep(2)
messages.append(
{
"role": "system",
"content": "Introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
args, unknown = parser.parse_known_args()
try:
settings = BotSettings.model_validate_json(args.settings)
print(f"settings: {settings.json()}")
asyncio.run(main(settings))
except ValidationError as e:
print(e)

267
khk/sqs-runner/helpers.py Normal file
View File

@@ -0,0 +1,267 @@
from loguru import logger
import asyncio
import math
import struct
import time
from dataclasses import dataclass, field
from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
TextFrame,
StartInterruptionFrame,
LLMFullResponseStartFrame,
TTSStoppedFrame,
MetricsFrame
)
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
class GreedyLLMAggregator(FrameProcessor):
def __init__(self, context: OpenAILLMContext = None, **kwargs):
super().__init__(**kwargs)
self.context: OpenAILLMContext = context if context else OpenAILLMContext()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
logger.debug(f"{frame}")
try:
if isinstance(frame, InterimTranscriptionFrame):
return
if isinstance(frame, TranscriptionFrame):
# append transcribed text to last "user" frame
if self.context.messages and self.context.messages[-1]["role"] == "user":
last_frame = self.context.messages.pop()
else:
last_frame = {"role": "user", "content": ""}
last_frame["content"] += " " + frame.text
self.context.messages.append(last_frame)
oai_context_frame = OpenAILLMContextFrame(context=self.context)
logger.debug(f"pushing frame {oai_context_frame}")
await self.push_frame(oai_context_frame)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
class ClearableDeepgramTTSService(DeepgramTTSService):
def __init___(self, **kwargs):
super().__init(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
self._current_sentence = ""
@dataclass
class BufferedSentence:
audio_frames: List[AudioRawFrame] = field(default_factory=list)
text_frame: TextFrame = None
class VADGate(FrameProcessor):
def __init__(
self,
vad_analyzer: VADAnalyzer = None,
context: OpenAILLMContext = None,
**kwargs):
super().__init__(**kwargs)
self.vad_analyzer = vad_analyzer
self.context = context
self._audio_pusher_task = None
self._expect_text_frame_next = False
self._sentences: List[BufferedSentence] = []
# queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
# each text frame.
#
# start a coroutine to service the queue and send sentences down the pipeline when possible.
# 1. do not send anything when we are not in VADState.QUIET
# 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
# to output, sleep until it's time to send another sentence
# 3. each time we send a sentence, append it to the conversation context
# 3. when the sentence buffer becomes empty, cancel the coroutine
# 4. if we get a new LLMFullResponse, treat that as a cancellation, too
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
try:
# A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
# then a TextFrame.
if self._expect_text_frame_next:
self._expect_text_frame_next = False
if isinstance(frame, TextFrame):
self._sentences[-1].text_frame = frame
else:
logger.debug(f"expected a text frame, but received {frame}")
await self.push_frame(frame, direction)
return
else:
if isinstance(frame, TextFrame):
logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")
if isinstance(frame, AudioRawFrame):
# if our buffer is empty or has a "finished" sentence at the end,
# then we need to start buffering a new sentence
if not self._sentences or self._sentences[-1].text_frame:
self._sentences.append(BufferedSentence())
self._sentences[-1].audio_frames.append(frame)
await self.maybe_start_audio_pusher_task()
return
if isinstance(frame, TTSStoppedFrame):
self._expect_text_frame_next = True
await self.push_frame(frame, direction)
return
# There are two ways we can be interrupted. During greedy inference, a new
# LLM response can start. Or, during playout, we can get a traditional
# user interruption frame.
if (isinstance(frame, LLMFullResponseStartFrame) or
isinstance(frame, StartInterruptionFrame)):
logger.debug(f"{frame} - Handle interruption in VADGate")
self._sentences = []
if self._audio_pusher_task:
self._audio_pusher_task.cancel()
self._audio_pusher_task = None
await self.push_frame(frame, direction)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
async def maybe_start_audio_pusher_task(self):
try:
if self._audio_pusher_task:
return
self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
except Exception as e:
logger.debug(f"Exception {e}")
async def push_audio(self):
try:
while True:
if not self._sentences:
await asyncio.sleep(0.01)
continue
if self.vad_analyzer._vad_state != VADState.QUIET:
await asyncio.sleep(0.01)
continue
# we only want to push completed sentence buffers
if not self._sentences[0].text_frame:
await asyncio.sleep(0.01)
continue
s = self._sentences.pop(0)
if not s.audio_frames:
continue
sample_rate = s.audio_frames[0].sample_rate
duration = 0
logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
for frame in s.audio_frames:
await self.push_frame(frame)
# assume linear16 encoding (2 bytes per sample). todo: add some more
# metadata to AudioRawFrame, maybe
duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
await asyncio.sleep(duration - 20 / 1000)
if self.context:
logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
self.context.messages.append(
{"role": "assistant", "content": s.text_frame.text}
)
await self.push_frame(s.text_frame)
except Exception as e:
logger.debug(f"Exception {e}")
class TranscriptionTimingLogger(FrameProcessor):
def __init__(self, avt):
super().__init__()
self.name = "Transcription"
self._avt = avt
async def process_frame(self, frame: Frame, direction: FrameDirection):
try:
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
elapsed = time.time() - self._avt.last_transition_ts
logger.debug(f"Transcription TTF: {elapsed}")
await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"Exception {e}")
class AudioVolumeTimer(FrameProcessor):
def __init__(self):
super().__init__()
self.last_transition_ts = 0
self._prev_volume = -80
self._speech_volume_threshold = -50
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
volume = self.calculate_volume(frame)
# print(f"Audio volume: {volume:.2f} dB")
if (volume >= self._speech_volume_threshold and
self._prev_volume < self._speech_volume_threshold):
# logger.debug("transition above speech volume threshold")
self.last_transition_ts = time.time()
elif (volume < self._speech_volume_threshold and
self._prev_volume >= self._speech_volume_threshold):
# logger.debug("transition below non-speech volume threshold")
self.last_transition_ts = time.time()
self._prev_volume = volume
await self.push_frame(frame, direction)
def calculate_volume(self, frame: AudioRawFrame) -> float:
if frame.num_channels != 1:
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
# Unpack audio data into 16-bit integers
fmt = f"{len(frame.audio) // 2}h"
audio_samples = struct.unpack(fmt, frame.audio)
# Calculate RMS
sum_squares = sum(sample**2 for sample in audio_samples)
rms = math.sqrt(sum_squares / len(audio_samples))
# Convert RMS to decibels (dB)
# Reference: maximum value for 16-bit audio is 32767
if rms > 0:
db = 20 * math.log10(rms / 32767)
else:
db = -96 # Minimum value (almost silent)
return db

View File

@@ -0,0 +1,159 @@
#
# requires AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables
#
import boto3
import json
import subprocess
import signal
import os
import time
from pydantic import BaseModel, ValidationError
from typing import Optional
from bot import BotSettings
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from dotenv import load_dotenv
load_dotenv(override=True)
# SQS queue URL
QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/955740203061/khk-sqs-launch-day-demos'
# The program to be spawned
SUBPROCESS_PROGRAM = 'your_subprocess_program.py'
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['DAILY_API_KEY', 'CARTESIA_API_KEY']
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
class RunnerSettings(BaseModel):
prompt: Optional[
str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. The technology powering you is Daily for transport, Groq for AI inference, Llama 3 (70-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are running on servers in Oregon. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
test: Optional[bool] = None
# ----------------- API ----------------- #
def setup_sqs():
"""Set up the SQS client."""
return boto3.client(
'sqs',
region_name='us-west-2',
# use an iam user instead of role because passing a role into an eks pod requires
# adding an add-on and we don't want to change the eks cluster configuration if we
# can avoid it
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY')
)
def receive_message(sqs):
"""Receive a message from the SQS queue."""
response = sqs.receive_message(
QueueUrl=QUEUE_URL,
MaxNumberOfMessages=1,
WaitTimeSeconds=20 # Long polling
)
messages = response.get('Messages', [])
if messages:
return messages[0]
return None
def delete_message(sqs, receipt_handle):
"""Delete a message from the queue after processing."""
sqs.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=receipt_handle
)
def run_bot(room_url):
runner_settings = RunnerSettings()
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
try:
bot_settings = BotSettings(
room_url=room.url,
room_token=token,
prompt=runner_settings.prompt,
deepgram_voice=runner_settings.deepgram_voice,
openai_model=runner_settings.openai_model,
openai_api_key=runner_settings.openai_api_key,
)
bot_settings_str = bot_settings.model_dump_json(exclude_none=True)
process = subprocess.Popen(
[f"python3 -m bot -s '{bot_settings_str}'"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
start_time = time.time()
while time.time() - start_time < (MAX_SESSION_TIME + 10):
if process.poll() is not None:
# process has finished
print("BOT EXITED BEFORE TIMEOUT")
return
time.sleep(1)
# process did not exit. need to kill -9 it
print("KILLING BOT PROCESS")
os.kill(process.pid, signal.SIGKILL)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
def main():
print("sqs-runner.py - started.")
sqs = setup_sqs()
while True:
print("sqs-runner.py - polling")
message = receive_message(sqs)
if message:
delete_message(sqs, message['ReceiptHandle'])
message_body = json.loads(message['Body'])
print(f"Received message. {message_body}")
run_bot(message_body['url'])
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
try:
main()
except KeyboardInterrupt:
print("Pipecat runner shutting down...")