Compare commits
4 Commits
hush/spell
...
khk/tuesda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f895f30495 | ||
|
|
85d80d8e0b | ||
|
|
729d266481 | ||
|
|
d9d22aa4b5 |
146
khk/lambda/lambda_function.py
Normal file
146
khk/lambda/lambda_function.py
Normal file
@@ -0,0 +1,146 @@
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import boto3
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
|
||||
|
||||
def get_sqs_client():
|
||||
if __name__ == "__main__":
|
||||
# for local testing, load temporary credentials. see notes.md
|
||||
try:
|
||||
with open('assume_role_output.json', 'r') as f:
|
||||
credentials = json.load(f)['Credentials']
|
||||
|
||||
session = boto3.Session(
|
||||
aws_access_key_id=credentials['AccessKeyId'],
|
||||
aws_secret_access_key=credentials['SecretAccessKey'],
|
||||
aws_session_token=credentials['SessionToken']
|
||||
)
|
||||
return session.client('sqs', region_name='us-west-2')
|
||||
except FileNotFoundError:
|
||||
print("Warning: assume_role_output.json not found. Using default credentials.")
|
||||
return boto3.client('sqs')
|
||||
else:
|
||||
# When running in Lambda, use the role attached to the function
|
||||
return boto3.client('sqs')
|
||||
|
||||
|
||||
sqs = get_sqs_client()
|
||||
DAILY_API_KEY = os.environ.get('DAILY_API_KEY')
|
||||
SQS_QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/955740203061/khk-sqs-launch-day-demos'
|
||||
|
||||
|
||||
def create_daily_room() -> Dict[str, Any]:
|
||||
if not DAILY_API_KEY:
|
||||
raise ValueError("DAILY_API_KEY environment variable is not set")
|
||||
|
||||
url = "https://api.daily.co/v1/rooms"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {DAILY_API_KEY}"
|
||||
}
|
||||
|
||||
exp = int(time.time()) + 305
|
||||
payload = {
|
||||
"properties": {
|
||||
"exp": exp,
|
||||
"eject_at_room_exp": True
|
||||
}
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
def create_token(room_name: str) -> Dict[str, Any]:
|
||||
if not DAILY_API_KEY:
|
||||
raise ValueError("DAILY_API_KEY environment variable is not set")
|
||||
|
||||
url = f"https://api.daily.co/v1/meeting-tokens"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {DAILY_API_KEY}"
|
||||
}
|
||||
|
||||
exp = int(time.time()) + 305
|
||||
payload = {
|
||||
"properties": {
|
||||
"exp": exp,
|
||||
"room_name": room_name
|
||||
}
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
def send_to_sqs(room_info: Dict[str, Any]) -> None:
|
||||
if not SQS_QUEUE_URL:
|
||||
raise ValueError("SQS_QUEUE_URL environment variable is not set")
|
||||
|
||||
if sqs:
|
||||
sqs.send_message(
|
||||
QueueUrl=SQS_QUEUE_URL,
|
||||
MessageBody=json.dumps(room_info)
|
||||
)
|
||||
print(f"Message sent to SQS queue: {SQS_QUEUE_URL}")
|
||||
else:
|
||||
print(f"Simulated sending message to SQS queue: {SQS_QUEUE_URL}")
|
||||
print(f"Message body: {json.dumps(room_info, indent=2)}")
|
||||
|
||||
|
||||
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
|
||||
try:
|
||||
path = event.get('rawPath', '')
|
||||
path = path.rstrip('/')
|
||||
|
||||
if path == '/authenticate':
|
||||
print("Creating Daily room...")
|
||||
daily_room = create_daily_room()
|
||||
print(f"Daily room created: {json.dumps(daily_room, indent=2)}")
|
||||
|
||||
token = create_token(daily_room['name'])
|
||||
print(f"Meeting token created: {json.dumps(token, indent=2)}")
|
||||
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({"room": daily_room['name'], "token": token['token']})
|
||||
}
|
||||
elif path == '/start_bot':
|
||||
body = json.loads(event['body'])
|
||||
|
||||
print("Sending room info to SQS...")
|
||||
|
||||
send_to_sqs(body)
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({"status": "healthy"})
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({"success": True})
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Error occurred: {str(e)}")
|
||||
return {
|
||||
'statusCode': 500,
|
||||
'body': json.dumps(f'Error: {str(e)}')
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Simulate the Lambda event and context
|
||||
# event = {}
|
||||
event = {
|
||||
"rawPath": "/authenticate"
|
||||
}
|
||||
context = None
|
||||
|
||||
result = lambda_handler(event, context)
|
||||
print(f"Lambda handler result: {json.dumps(result, indent=2)}")
|
||||
2
khk/lambda/requirements.txt
Normal file
2
khk/lambda/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
requests
|
||||
boto3
|
||||
172
khk/sqs-runner/bot.py
Normal file
172
khk/sqs-runner/bot.py
Normal file
@@ -0,0 +1,172 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from loguru import logger
|
||||
import argparse
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from pipecat.vad.vad_analyzer import VADParams
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
|
||||
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator
|
||||
)
|
||||
|
||||
from helpers import (
|
||||
ClearableDeepgramTTSService,
|
||||
AudioVolumeTimer,
|
||||
TranscriptionTimingLogger
|
||||
)
|
||||
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
|
||||
|
||||
|
||||
class BotSettings(BaseModel):
|
||||
room_url: str
|
||||
room_token: str
|
||||
bot_name: str = "Pipecat"
|
||||
prompt: Optional[str] = "You are a helpful assistant."
|
||||
deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
|
||||
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
|
||||
deepgram_tts_base_url: Optional[str] = os.getenv(
|
||||
"DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
|
||||
deepgram_stt_base_url: Optional[str] = os.getenv(
|
||||
"DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
|
||||
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None),
|
||||
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None),
|
||||
openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
|
||||
vad_stop_secs: Optional[float] = os.getenv("VAD_STOP_SECS", 0.200)
|
||||
|
||||
|
||||
async def main(settings: BotSettings):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransport(
|
||||
settings.room_url,
|
||||
settings.room_token,
|
||||
settings.bot_name,
|
||||
DailyParams(
|
||||
audio_out_sample_rate=44100,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(
|
||||
stop_secs=settings.vad_stop_secs
|
||||
)),
|
||||
vad_audio_passthrough=True
|
||||
)
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(
|
||||
name="STT",
|
||||
api_key=settings.deepgram_api_key,
|
||||
url=settings.deepgram_stt_base_url
|
||||
)
|
||||
|
||||
# tts = ClearableDeepgramTTSService(
|
||||
# name="Voice",
|
||||
# aiohttp_session=session,
|
||||
# api_key=settings.deepgram_api_key,
|
||||
# voice=settings.deepgram_voice,
|
||||
# **({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
|
||||
# )
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
|
||||
sample_rate=44100,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
name="Groq Llama 3 70B",
|
||||
api_key=settings.openai_api_key,
|
||||
model=settings.openai_model,
|
||||
base_url=settings.openai_base_url,
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": settings.prompt,
|
||||
},
|
||||
]
|
||||
|
||||
avt = AudioVolumeTimer()
|
||||
tl = TranscriptionTimingLogger(avt)
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Transport user input
|
||||
avt, # Audio volume timer
|
||||
stt, # Speech-to-text
|
||||
tl, # Transcription timing logger
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
tma_out, # Assistant spoken responses
|
||||
transport.output(), # Transport bot output
|
||||
])
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
))
|
||||
|
||||
# When the participant leaves, we exit the bot.
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
# When the first participant joins, the bot should introduce itself.
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
# Provide some air whilst tracks subscribe
|
||||
time.sleep(2)
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Bot")
|
||||
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
try:
|
||||
settings = BotSettings.model_validate_json(args.settings)
|
||||
print(f"settings: {settings.json()}")
|
||||
asyncio.run(main(settings))
|
||||
except ValidationError as e:
|
||||
print(e)
|
||||
267
khk/sqs-runner/helpers.py
Normal file
267
khk/sqs-runner/helpers.py
Normal file
@@ -0,0 +1,267 @@
|
||||
from loguru import logger
|
||||
import asyncio
|
||||
import math
|
||||
import struct
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List
|
||||
|
||||
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
AudioRawFrame,
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
TextFrame,
|
||||
StartInterruptionFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
TTSStoppedFrame,
|
||||
MetricsFrame
|
||||
)
|
||||
|
||||
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
|
||||
from pipecat.services.deepgram import DeepgramTTSService
|
||||
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
|
||||
|
||||
|
||||
class GreedyLLMAggregator(FrameProcessor):
|
||||
def __init__(self, context: OpenAILLMContext = None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.context: OpenAILLMContext = context if context else OpenAILLMContext()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
logger.debug(f"{frame}")
|
||||
|
||||
try:
|
||||
if isinstance(frame, InterimTranscriptionFrame):
|
||||
return
|
||||
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
# append transcribed text to last "user" frame
|
||||
if self.context.messages and self.context.messages[-1]["role"] == "user":
|
||||
last_frame = self.context.messages.pop()
|
||||
else:
|
||||
last_frame = {"role": "user", "content": ""}
|
||||
|
||||
last_frame["content"] += " " + frame.text
|
||||
self.context.messages.append(last_frame)
|
||||
|
||||
oai_context_frame = OpenAILLMContextFrame(context=self.context)
|
||||
logger.debug(f"pushing frame {oai_context_frame}")
|
||||
await self.push_frame(oai_context_frame)
|
||||
return
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.debug(f"error: {e}")
|
||||
|
||||
|
||||
class ClearableDeepgramTTSService(DeepgramTTSService):
|
||||
def __init___(self, **kwargs):
|
||||
super().__init(**kwargs)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
self._current_sentence = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class BufferedSentence:
|
||||
audio_frames: List[AudioRawFrame] = field(default_factory=list)
|
||||
text_frame: TextFrame = None
|
||||
|
||||
|
||||
class VADGate(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
vad_analyzer: VADAnalyzer = None,
|
||||
context: OpenAILLMContext = None,
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.vad_analyzer = vad_analyzer
|
||||
self.context = context
|
||||
|
||||
self._audio_pusher_task = None
|
||||
self._expect_text_frame_next = False
|
||||
self._sentences: List[BufferedSentence] = []
|
||||
|
||||
# queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
|
||||
# each text frame.
|
||||
#
|
||||
# start a coroutine to service the queue and send sentences down the pipeline when possible.
|
||||
# 1. do not send anything when we are not in VADState.QUIET
|
||||
# 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
|
||||
# to output, sleep until it's time to send another sentence
|
||||
# 3. each time we send a sentence, append it to the conversation context
|
||||
# 3. when the sentence buffer becomes empty, cancel the coroutine
|
||||
# 4. if we get a new LLMFullResponse, treat that as a cancellation, too
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
try:
|
||||
|
||||
# A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
|
||||
# then a TextFrame.
|
||||
|
||||
if self._expect_text_frame_next:
|
||||
self._expect_text_frame_next = False
|
||||
if isinstance(frame, TextFrame):
|
||||
self._sentences[-1].text_frame = frame
|
||||
else:
|
||||
logger.debug(f"expected a text frame, but received {frame}")
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
else:
|
||||
if isinstance(frame, TextFrame):
|
||||
logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
# if our buffer is empty or has a "finished" sentence at the end,
|
||||
# then we need to start buffering a new sentence
|
||||
if not self._sentences or self._sentences[-1].text_frame:
|
||||
self._sentences.append(BufferedSentence())
|
||||
self._sentences[-1].audio_frames.append(frame)
|
||||
await self.maybe_start_audio_pusher_task()
|
||||
return
|
||||
|
||||
if isinstance(frame, TTSStoppedFrame):
|
||||
self._expect_text_frame_next = True
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# There are two ways we can be interrupted. During greedy inference, a new
|
||||
# LLM response can start. Or, during playout, we can get a traditional
|
||||
# user interruption frame.
|
||||
if (isinstance(frame, LLMFullResponseStartFrame) or
|
||||
isinstance(frame, StartInterruptionFrame)):
|
||||
logger.debug(f"{frame} - Handle interruption in VADGate")
|
||||
self._sentences = []
|
||||
if self._audio_pusher_task:
|
||||
self._audio_pusher_task.cancel()
|
||||
self._audio_pusher_task = None
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.debug(f"error: {e}")
|
||||
|
||||
async def maybe_start_audio_pusher_task(self):
|
||||
try:
|
||||
if self._audio_pusher_task:
|
||||
return
|
||||
self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Exception {e}")
|
||||
|
||||
async def push_audio(self):
|
||||
try:
|
||||
while True:
|
||||
if not self._sentences:
|
||||
await asyncio.sleep(0.01)
|
||||
continue
|
||||
|
||||
if self.vad_analyzer._vad_state != VADState.QUIET:
|
||||
await asyncio.sleep(0.01)
|
||||
continue
|
||||
|
||||
# we only want to push completed sentence buffers
|
||||
if not self._sentences[0].text_frame:
|
||||
await asyncio.sleep(0.01)
|
||||
continue
|
||||
|
||||
s = self._sentences.pop(0)
|
||||
if not s.audio_frames:
|
||||
continue
|
||||
sample_rate = s.audio_frames[0].sample_rate
|
||||
duration = 0
|
||||
logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
|
||||
for frame in s.audio_frames:
|
||||
await self.push_frame(frame)
|
||||
# assume linear16 encoding (2 bytes per sample). todo: add some more
|
||||
# metadata to AudioRawFrame, maybe
|
||||
duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
|
||||
await asyncio.sleep(duration - 20 / 1000)
|
||||
if self.context:
|
||||
logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
|
||||
self.context.messages.append(
|
||||
{"role": "assistant", "content": s.text_frame.text}
|
||||
)
|
||||
await self.push_frame(s.text_frame)
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Exception {e}")
|
||||
|
||||
|
||||
class TranscriptionTimingLogger(FrameProcessor):
|
||||
def __init__(self, avt):
|
||||
super().__init__()
|
||||
self.name = "Transcription"
|
||||
self._avt = avt
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
try:
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
elapsed = time.time() - self._avt.last_transition_ts
|
||||
logger.debug(f"Transcription TTF: {elapsed}")
|
||||
await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.debug(f"Exception {e}")
|
||||
|
||||
|
||||
class AudioVolumeTimer(FrameProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.last_transition_ts = 0
|
||||
self._prev_volume = -80
|
||||
self._speech_volume_threshold = -50
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
volume = self.calculate_volume(frame)
|
||||
# print(f"Audio volume: {volume:.2f} dB")
|
||||
if (volume >= self._speech_volume_threshold and
|
||||
self._prev_volume < self._speech_volume_threshold):
|
||||
# logger.debug("transition above speech volume threshold")
|
||||
self.last_transition_ts = time.time()
|
||||
elif (volume < self._speech_volume_threshold and
|
||||
self._prev_volume >= self._speech_volume_threshold):
|
||||
# logger.debug("transition below non-speech volume threshold")
|
||||
self.last_transition_ts = time.time()
|
||||
self._prev_volume = volume
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def calculate_volume(self, frame: AudioRawFrame) -> float:
|
||||
if frame.num_channels != 1:
|
||||
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
|
||||
|
||||
# Unpack audio data into 16-bit integers
|
||||
fmt = f"{len(frame.audio) // 2}h"
|
||||
audio_samples = struct.unpack(fmt, frame.audio)
|
||||
|
||||
# Calculate RMS
|
||||
sum_squares = sum(sample**2 for sample in audio_samples)
|
||||
rms = math.sqrt(sum_squares / len(audio_samples))
|
||||
|
||||
# Convert RMS to decibels (dB)
|
||||
# Reference: maximum value for 16-bit audio is 32767
|
||||
if rms > 0:
|
||||
db = 20 * math.log10(rms / 32767)
|
||||
else:
|
||||
db = -96 # Minimum value (almost silent)
|
||||
|
||||
return db
|
||||
159
khk/sqs-runner/sqs-runner.py
Normal file
159
khk/sqs-runner/sqs-runner.py
Normal file
@@ -0,0 +1,159 @@
|
||||
#
|
||||
# requires AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables
|
||||
#
|
||||
|
||||
import boto3
|
||||
import json
|
||||
import subprocess
|
||||
import signal
|
||||
import os
|
||||
import time
|
||||
|
||||
from pydantic import BaseModel, ValidationError
|
||||
from typing import Optional
|
||||
|
||||
from bot import BotSettings
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
# SQS queue URL
|
||||
QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/955740203061/khk-sqs-launch-day-demos'
|
||||
|
||||
# The program to be spawned
|
||||
SUBPROCESS_PROGRAM = 'your_subprocess_program.py'
|
||||
|
||||
# ------------ Configuration ------------ #
|
||||
|
||||
MAX_SESSION_TIME = 5 * 60 # 5 minutes
|
||||
REQUIRED_ENV_VARS = ['DAILY_API_KEY', 'CARTESIA_API_KEY']
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
os.getenv("DAILY_API_KEY", ""),
|
||||
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
|
||||
|
||||
|
||||
class RunnerSettings(BaseModel):
|
||||
prompt: Optional[
|
||||
str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. The technology powering you is Daily for transport, Groq for AI inference, Llama 3 (70-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are running on servers in Oregon. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
|
||||
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
|
||||
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
|
||||
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
|
||||
test: Optional[bool] = None
|
||||
|
||||
# ----------------- API ----------------- #
|
||||
|
||||
|
||||
def setup_sqs():
|
||||
"""Set up the SQS client."""
|
||||
return boto3.client(
|
||||
'sqs',
|
||||
region_name='us-west-2',
|
||||
# use an iam user instead of role because passing a role into an eks pod requires
|
||||
# adding an add-on and we don't want to change the eks cluster configuration if we
|
||||
# can avoid it
|
||||
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
|
||||
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY')
|
||||
)
|
||||
|
||||
|
||||
def receive_message(sqs):
|
||||
"""Receive a message from the SQS queue."""
|
||||
response = sqs.receive_message(
|
||||
QueueUrl=QUEUE_URL,
|
||||
MaxNumberOfMessages=1,
|
||||
WaitTimeSeconds=20 # Long polling
|
||||
)
|
||||
|
||||
messages = response.get('Messages', [])
|
||||
if messages:
|
||||
return messages[0]
|
||||
return None
|
||||
|
||||
|
||||
def delete_message(sqs, receipt_handle):
|
||||
"""Delete a message from the queue after processing."""
|
||||
sqs.delete_message(
|
||||
QueueUrl=QUEUE_URL,
|
||||
ReceiptHandle=receipt_handle
|
||||
)
|
||||
|
||||
|
||||
def run_bot(room_url):
|
||||
runner_settings = RunnerSettings()
|
||||
|
||||
# Check passed room URL exists, we should assume that it already has a sip set up
|
||||
try:
|
||||
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Room not found: {room_url}")
|
||||
|
||||
# Give the agent a token to join the session
|
||||
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||
|
||||
if not room or not token:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to get token for room: {room_url}")
|
||||
|
||||
# Spawn a new agent, and join the user session
|
||||
try:
|
||||
bot_settings = BotSettings(
|
||||
room_url=room.url,
|
||||
room_token=token,
|
||||
prompt=runner_settings.prompt,
|
||||
deepgram_voice=runner_settings.deepgram_voice,
|
||||
openai_model=runner_settings.openai_model,
|
||||
openai_api_key=runner_settings.openai_api_key,
|
||||
)
|
||||
bot_settings_str = bot_settings.model_dump_json(exclude_none=True)
|
||||
|
||||
process = subprocess.Popen(
|
||||
[f"python3 -m bot -s '{bot_settings_str}'"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < (MAX_SESSION_TIME + 10):
|
||||
if process.poll() is not None:
|
||||
# process has finished
|
||||
print("BOT EXITED BEFORE TIMEOUT")
|
||||
return
|
||||
time.sleep(1)
|
||||
# process did not exit. need to kill -9 it
|
||||
print("KILLING BOT PROCESS")
|
||||
os.kill(process.pid, signal.SIGKILL)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
print("sqs-runner.py - started.")
|
||||
sqs = setup_sqs()
|
||||
|
||||
while True:
|
||||
print("sqs-runner.py - polling")
|
||||
message = receive_message(sqs)
|
||||
if message:
|
||||
delete_message(sqs, message['ReceiptHandle'])
|
||||
|
||||
message_body = json.loads(message['Body'])
|
||||
print(f"Received message. {message_body}")
|
||||
|
||||
run_bot(message_body['url'])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check environment variables
|
||||
for env_var in REQUIRED_ENV_VARS:
|
||||
if env_var not in os.environ:
|
||||
raise Exception(f"Missing environment variable: {env_var}.")
|
||||
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print("Pipecat runner shutting down...")
|
||||
Reference in New Issue
Block a user