Compare commits
2 Commits
hush/aggre
...
khk/gpu-de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c44ca86a08 | ||
|
|
aafc7d8fb4 |
23
personal/khk/bot-and-runner/Dockerfile
Normal file
23
personal/khk/bot-and-runner/Dockerfile
Normal file
@@ -0,0 +1,23 @@
|
||||
FROM python:3.10-bullseye
|
||||
|
||||
RUN apt-get update
|
||||
RUN apt-get install -y python3-dev portaudio19-dev
|
||||
|
||||
RUN git clone https://github.com/pipecat-ai/pipecat.git
|
||||
WORKDIR /pipecat
|
||||
RUN echo "forcing rebuild"
|
||||
RUN git fetch
|
||||
RUN git checkout khk/gpu-demo-0725 && echo "I hope that worked you know?"
|
||||
RUN git checkout aafc7d8fb49f3d640b4af5266db04a6e36099384
|
||||
RUN python3 -m venv venv
|
||||
RUN . venv/bin/activate
|
||||
RUN pip install -r dev-requirements.txt -r linux-py3.10-requirements.txt
|
||||
RUN pip install boto3
|
||||
RUN python -m build
|
||||
RUN pip install --editable .
|
||||
|
||||
EXPOSE 7860
|
||||
|
||||
WORKDIR /pipecat/personal/khk/bot-and-runner
|
||||
CMD ["python", "sqs-runner.py"]
|
||||
|
||||
190
personal/khk/bot-and-runner/bot.py
Normal file
190
personal/khk/bot-and-runner/bot.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
import json
|
||||
import struct
|
||||
import math
|
||||
import time
|
||||
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.processors.frameworks.rtvi import (
|
||||
RTVIConfig,
|
||||
RTVILLMConfig,
|
||||
RTVIProcessor,
|
||||
RTVISetup,
|
||||
RTVITTSConfig)
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.vad.vad_analyzer import VADParams
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
AudioRawFrame,
|
||||
TranscriptionFrame,
|
||||
MetricsFrame,
|
||||
EndFrame
|
||||
)
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
class DebugLogger(FrameProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
logger.debug(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class TranscriptionTimingLogger(FrameProcessor):
|
||||
def __init__(self, avt):
|
||||
super().__init__(name="Transcription")
|
||||
self._avt = avt
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
try:
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
elapsed = time.time() - self._avt.last_transition_ts
|
||||
logger.debug(f"Transcription TTFB: {elapsed}")
|
||||
# await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
|
||||
await self.push_frame(MetricsFrame(ttfb=[{"processor": self.name, "value": elapsed}]))
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.debug(f"Exception {e}")
|
||||
|
||||
class AudioVolumeTimer(FrameProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.last_transition_ts = 0
|
||||
self._prev_volume = -80
|
||||
self._speech_volume_threshold = -50
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
volume = self.calculate_volume(frame)
|
||||
# print(f"Audio volume: {volume:.2f} dB")
|
||||
if (volume >= self._speech_volume_threshold and
|
||||
self._prev_volume < self._speech_volume_threshold):
|
||||
# logger.debug("transition above speech volume threshold")
|
||||
self.last_transition_ts = time.time()
|
||||
elif (volume < self._speech_volume_threshold and
|
||||
self._prev_volume >= self._speech_volume_threshold):
|
||||
# logger.debug("transition below non-speech volume threshold")
|
||||
self.last_transition_ts = time.time()
|
||||
self._prev_volume = volume
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def calculate_volume(self, frame: AudioRawFrame) -> float:
|
||||
if frame.num_channels != 1:
|
||||
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
|
||||
|
||||
# Unpack audio data into 16-bit integers
|
||||
fmt = f"{len(frame.audio) // 2}h"
|
||||
audio_samples = struct.unpack(fmt, frame.audio)
|
||||
|
||||
# Calculate RMS
|
||||
sum_squares = sum(sample**2 for sample in audio_samples)
|
||||
rms = math.sqrt(sum_squares / len(audio_samples))
|
||||
|
||||
# Convert RMS to decibels (dB)
|
||||
# Reference: maximum value for 16-bit audio is 32767
|
||||
if rms > 0:
|
||||
db = 20 * math.log10(rms / 32767)
|
||||
else:
|
||||
db = -96 # Minimum value (almost silent)
|
||||
|
||||
return db
|
||||
|
||||
async def main(room_url, token, bot_config):
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Realtime AI",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
# transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(
|
||||
stop_secs=float(os.getenv("VAD_STOP_SECS", "0.3"))
|
||||
)),
|
||||
vad_audio_passthrough=True
|
||||
))
|
||||
|
||||
avt = AudioVolumeTimer()
|
||||
tl = TranscriptionTimingLogger(avt)
|
||||
|
||||
stt = DeepgramSTTService(
|
||||
name="STT",
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY", ""),
|
||||
url=os.getenv("DEEPGRAM_STT_BASE_URL", "wss://api.deepgram.com")
|
||||
)
|
||||
|
||||
rtai = RTVIProcessor(
|
||||
transport=transport,
|
||||
setup=RTVISetup(config=RTVIConfig(**bot_config)),
|
||||
llm_api_key=os.getenv("OPENAI_API_KEY", ""),
|
||||
tts_api_key=os.getenv("CARTESIA_API_KEY", ""))
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
pipeline = Pipeline([transport.input(), avt, stt, tl, rtai])
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
# report_only_initial_ttfb=True
|
||||
))
|
||||
|
||||
@ transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
logger.info("First participant joined")
|
||||
|
||||
@ transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
logger.info("Partcipant left. Exiting.")
|
||||
|
||||
@ transport.event_handler("on_call_state_updated")
|
||||
async def on_call_state_updated(transport, state):
|
||||
logger.info("Call state %s " % state)
|
||||
if state == "left":
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Bot")
|
||||
parser.add_argument("-u", type=str, help="Room URL")
|
||||
parser.add_argument("-t", type=str, help="Token")
|
||||
parser.add_argument("-c", type=str, help="Bot configuration blob")
|
||||
config = parser.parse_args()
|
||||
|
||||
bot_config = json.loads(config.c) if config.c else {}
|
||||
# bot_config = {"llm":{"model":"llama3-70b-8192","messages":[{"role":"system","content":"You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello."}]},"tts":{"voice":"79a125e8-cd45-4c13-8a67-188112f4dd22"}}
|
||||
|
||||
if config.u and config.t and bot_config:
|
||||
asyncio.run(main(config.u, config.t, bot_config))
|
||||
else:
|
||||
logger.error("Room URL and Token are required")
|
||||
152
personal/khk/bot-and-runner/sqs-runner.py
Normal file
152
personal/khk/bot-and-runner/sqs-runner.py
Normal file
@@ -0,0 +1,152 @@
|
||||
#
|
||||
# requires AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables
|
||||
#
|
||||
|
||||
import boto3
|
||||
import json
|
||||
import subprocess
|
||||
import signal
|
||||
import os
|
||||
import time
|
||||
|
||||
from pydantic import BaseModel, ValidationError
|
||||
from typing import Optional
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
# SQS queue URL
|
||||
QUEUE_URL = os.environ.get('SQS_QUEUE_URL')
|
||||
|
||||
# The program to be spawned
|
||||
SUBPROCESS_PROGRAM = 'your_subprocess_program.py'
|
||||
|
||||
|
||||
def escape_bash_arg(s):
|
||||
return "'" + s.replace("'", "'\\''") + "'"
|
||||
|
||||
# ------------ Configuration ------------ #
|
||||
|
||||
|
||||
MAX_SESSION_TIME = 5 * 60 # 5 minutes
|
||||
REQUIRED_ENV_VARS = [
|
||||
'DAILY_API_KEY',
|
||||
'CARTESIA_API_KEY',
|
||||
'SQS_QUEUE_URL',
|
||||
'AWS_ACCESS_KEY_ID',
|
||||
'AWS_SECRET_ACCESS_KEY']
|
||||
|
||||
print("DAILY_API_KEY", os.getenv("DAILY_API_KEY"))
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
os.getenv("DAILY_API_KEY", ""),
|
||||
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
|
||||
|
||||
# ----------------- API ----------------- #
|
||||
|
||||
|
||||
def setup_sqs():
|
||||
"""Set up the SQS client."""
|
||||
return boto3.client(
|
||||
'sqs',
|
||||
region_name='us-west-2',
|
||||
# use an iam user instead of role because passing a role into an eks pod requires
|
||||
# adding an add-on and we don't want to change the eks cluster configuration if we
|
||||
# can avoid it
|
||||
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
|
||||
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY')
|
||||
)
|
||||
|
||||
|
||||
def receive_message(sqs):
|
||||
"""Receive a message from the SQS queue."""
|
||||
response = sqs.receive_message(
|
||||
QueueUrl=QUEUE_URL,
|
||||
MaxNumberOfMessages=1,
|
||||
WaitTimeSeconds=20 # Long polling
|
||||
)
|
||||
|
||||
messages = response.get('Messages', [])
|
||||
if messages:
|
||||
return messages[0]
|
||||
return None
|
||||
|
||||
|
||||
def delete_message(sqs, receipt_handle):
|
||||
"""Delete a message from the queue after processing."""
|
||||
sqs.delete_message(
|
||||
QueueUrl=QUEUE_URL,
|
||||
ReceiptHandle=receipt_handle
|
||||
)
|
||||
|
||||
|
||||
def run_bot(msg):
|
||||
try:
|
||||
print(f"Getting room from URL: {'https://rtvi.daily.co/' + msg['room']}")
|
||||
room: DailyRoomObject = daily_rest_helper.get_room_from_url(
|
||||
'https://rtvi.daily.co/' + msg['room'])
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Room not found: {msg['room']}")
|
||||
|
||||
# Give the agent a token to join the session
|
||||
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||
|
||||
if not room or not token:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to get token for room: {room.url}")
|
||||
|
||||
# Spawn a new agent, and join the user session
|
||||
try:
|
||||
bot_settings_str = json.dumps(msg['config'])
|
||||
|
||||
print(f"Starting bot with settings: {room.url}, {token}, {bot_settings_str}")
|
||||
|
||||
process = subprocess.Popen(
|
||||
[f"python3 -m bot -u {room.url} -t {token} -c {escape_bash_arg(bot_settings_str)}"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < (MAX_SESSION_TIME + 10):
|
||||
if process.poll() is not None:
|
||||
# process has finished
|
||||
print("BOT EXITED BEFORE TIMEOUT")
|
||||
return
|
||||
time.sleep(1)
|
||||
# process did not exit. need to kill -9 it
|
||||
print("KILLING BOT PROCESS")
|
||||
os.kill(process.pid, signal.SIGKILL)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
print("sqs-runner.py - started.")
|
||||
sqs = setup_sqs()
|
||||
|
||||
while True:
|
||||
print("sqs-runner.py - polling")
|
||||
message = receive_message(sqs)
|
||||
if message:
|
||||
delete_message(sqs, message['ReceiptHandle'])
|
||||
|
||||
message_body = json.loads(message['Body'])
|
||||
print(f"Received message. {message_body}")
|
||||
|
||||
run_bot(message_body)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check environment variables
|
||||
for env_var in REQUIRED_ENV_VARS:
|
||||
if env_var not in os.environ:
|
||||
raise Exception(f"Missing environment variable: {env_var}.")
|
||||
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print("Pipecat runner shutting down...")
|
||||
148
personal/khk/lambda/lambda_function.py
Normal file
148
personal/khk/lambda/lambda_function.py
Normal file
@@ -0,0 +1,148 @@
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import boto3
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
|
||||
|
||||
def get_sqs_client():
|
||||
if __name__ == "__main__":
|
||||
# for local testing, load temporary credentials. see notes.md
|
||||
try:
|
||||
with open('assume_role_output.json', 'r') as f:
|
||||
credentials = json.load(f)['Credentials']
|
||||
|
||||
session = boto3.Session(
|
||||
aws_access_key_id=credentials['AccessKeyId'],
|
||||
aws_secret_access_key=credentials['SecretAccessKey'],
|
||||
aws_session_token=credentials['SessionToken']
|
||||
)
|
||||
return session.client('sqs', region_name='us-west-2')
|
||||
except FileNotFoundError:
|
||||
print("Warning: assume_role_output.json not found. Using default credentials.")
|
||||
return boto3.client('sqs')
|
||||
else:
|
||||
# When running in Lambda, use the role attached to the function
|
||||
return boto3.client('sqs')
|
||||
|
||||
|
||||
sqs = get_sqs_client()
|
||||
DAILY_API_KEY = os.environ.get('DAILY_API_KEY')
|
||||
SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL')
|
||||
|
||||
|
||||
def create_daily_room() -> Dict[str, Any]:
|
||||
if not DAILY_API_KEY:
|
||||
raise ValueError("DAILY_API_KEY environment variable is not set")
|
||||
|
||||
url = "https://api.daily.co/v1/rooms"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {DAILY_API_KEY}"
|
||||
}
|
||||
|
||||
exp = int(time.time()) + 180
|
||||
payload = {
|
||||
"properties": {
|
||||
"exp": exp,
|
||||
"eject_at_room_exp": True
|
||||
}
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
def create_token(room_name: str) -> Dict[str, Any]:
|
||||
if not DAILY_API_KEY:
|
||||
raise ValueError("DAILY_API_KEY environment variable is not set")
|
||||
|
||||
url = f"https://api.daily.co/v1/meeting-tokens"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {DAILY_API_KEY}"
|
||||
}
|
||||
|
||||
exp = int(time.time()) + 180
|
||||
payload = {
|
||||
"properties": {
|
||||
"exp": exp,
|
||||
"room_name": room_name
|
||||
}
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
def send_to_sqs(room_info: Dict[str, Any]) -> None:
|
||||
if not SQS_QUEUE_URL:
|
||||
raise ValueError("SQS_QUEUE_URL environment variable is not set")
|
||||
|
||||
if sqs:
|
||||
sqs.send_message(
|
||||
QueueUrl=SQS_QUEUE_URL,
|
||||
MessageBody=json.dumps(room_info)
|
||||
)
|
||||
print(f"Message sent to SQS queue: {SQS_QUEUE_URL}")
|
||||
else:
|
||||
print(f"Simulated sending message to SQS queue: {SQS_QUEUE_URL}")
|
||||
print(f"Message body: {json.dumps(room_info, indent=2)}")
|
||||
|
||||
|
||||
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
|
||||
try:
|
||||
path = event.get('rawPath', '')
|
||||
path = path.rstrip('/')
|
||||
|
||||
if path == '/authenticate':
|
||||
print("Creating Daily room...")
|
||||
daily_room = create_daily_room()
|
||||
print(f"Daily room created: {json.dumps(daily_room, indent=2)}")
|
||||
|
||||
token = create_token(daily_room['name'])
|
||||
print(f"Meeting token created: {json.dumps(token, indent=2)}")
|
||||
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({"room": daily_room['name'], "token": token['token']})
|
||||
}
|
||||
elif path == '/start_bot':
|
||||
body = json.loads(event['body'])
|
||||
|
||||
print("Sending room info to SQS...")
|
||||
|
||||
send_to_sqs(body)
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({"status": "healthy"})
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({"success": True})
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Error occurred: {str(e)}")
|
||||
return {
|
||||
'statusCode': 500,
|
||||
'body': json.dumps(f'Error: {str(e)}')
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Simulate the Lambda event and context
|
||||
# event = {}
|
||||
event = {
|
||||
# "rawPath": "/authenticate"
|
||||
"rawPath": "/start_bot",
|
||||
"body": json.dumps({"room": "test-room", "token": "test"})
|
||||
}
|
||||
context = None
|
||||
|
||||
result = lambda_handler(event, context)
|
||||
print(f"Lambda handler result: {json.dumps(result, indent=2)}")
|
||||
Reference in New Issue
Block a user