Compare commits

...

2 Commits

Author SHA1 Message Date
Ubuntu
c44ca86a08 messy Dockerfile for an in-checkout build 2024-07-30 17:23:29 +00:00
Kwindla Hultman Kramer
aafc7d8fb4 make sqs queue configurable 2024-07-25 13:26:37 -07:00
4 changed files with 513 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
# Builds pipecat from a pinned commit and starts the SQS-driven bot runner.
FROM python:3.10-bullseye

# Refresh the package index and install in ONE layer so a stale cached
# `apt-get update` layer is never combined with a newer install step.
RUN apt-get update \
    && apt-get install -y python3-dev portaudio19-dev \
    && rm -rf /var/lib/apt/lists/*

RUN git clone https://github.com/pipecat-ai/pipecat.git
WORKDIR /pipecat
RUN echo "forcing rebuild"
RUN git fetch
RUN git checkout khk/gpu-demo-0725 && echo "I hope that worked you know?"
RUN git checkout aafc7d8fb49f3d640b4af5266db04a6e36099384

# Every RUN starts a fresh shell, so sourcing venv/bin/activate in its own RUN
# does not affect later instructions. Putting the venv's bin directory first on
# PATH makes `python` and `pip` resolve to the venv for all following steps and
# for CMD.
RUN python3 -m venv venv
ENV VIRTUAL_ENV=/pipecat/venv
ENV PATH="/pipecat/venv/bin:$PATH"

RUN pip install -r dev-requirements.txt -r linux-py3.10-requirements.txt
RUN pip install boto3
RUN python -m build
RUN pip install --editable .

EXPOSE 7860
WORKDIR /pipecat/personal/khk/bot-and-runner
CMD ["python", "sqs-runner.py"]

View File

@@ -0,0 +1,190 @@
import asyncio
import sys
import os
import argparse
import json
import struct
import math
import time
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.frameworks.rtvi import (
RTVIConfig,
RTVILLMConfig,
RTVIProcessor,
RTVISetup,
RTVITTSConfig)
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.vad.vad_analyzer import VADParams
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
TranscriptionFrame,
MetricsFrame,
EndFrame
)
from loguru import logger
from dotenv import load_dotenv
# Load variables from a .env file before anything reads os.environ/os.getenv.
load_dotenv(override=True)
# Drop the logger handler with id 0 and log to stderr at DEBUG level instead.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class DebugLogger(FrameProcessor):
    """Pass-through processor that logs every frame it sees at DEBUG level."""

    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Log ``frame`` and forward it unchanged in the same direction."""
        await super().process_frame(frame, direction)
        logger.debug(frame)
        await self.push_frame(frame, direction)
class TranscriptionTimingLogger(FrameProcessor):
    """Emit a TTFB metric for each transcription frame.

    The elapsed time is measured from ``avt.last_transition_ts`` (the wall-clock
    time recorded by the supplied timer object) to the moment a
    ``TranscriptionFrame`` reaches this processor.
    """

    def __init__(self, avt):
        """Store ``avt``, the object whose ``last_transition_ts`` is read."""
        super().__init__(name="Transcription")
        self._avt = avt

    def can_generate_metrics(self) -> bool:
        """Always report that this processor produces metrics."""
        return True

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Forward ``frame``; for transcriptions, push a MetricsFrame first.

        Any exception raised while processing is logged at DEBUG level and
        not re-raised.
        """
        try:
            await super().process_frame(frame, direction)
            if isinstance(frame, TranscriptionFrame):
                ttfb_secs = time.time() - self._avt.last_transition_ts
                logger.debug(f"Transcription TTFB: {ttfb_secs}")
                ttfb_entry = {"processor": self.name, "value": ttfb_secs}
                await self.push_frame(MetricsFrame(ttfb=[ttfb_entry]))
            await self.push_frame(frame, direction)
        except Exception as err:
            logger.debug(f"Exception {err}")
class AudioVolumeTimer(FrameProcessor):
    """Record when the audio level last crossed the speech-volume threshold.

    ``last_transition_ts`` holds the ``time.time()`` value of the most recent
    crossing (in either direction) of ``_speech_volume_threshold``; it is 0
    until the first crossing is observed.
    """

    # Level returned for buffers with no measurable signal.
    _SILENCE_DB = -96

    def __init__(self):
        super().__init__()
        self.last_transition_ts = 0
        self._prev_volume = -80
        self._speech_volume_threshold = -50

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Update the transition timestamp for audio frames, then forward."""
        await super().process_frame(frame, direction)
        if isinstance(frame, AudioRawFrame):
            volume = self.calculate_volume(frame)
            threshold = self._speech_volume_threshold
            # A transition is any change of side relative to the threshold,
            # whether rising above it or falling below it.
            if (volume >= threshold) != (self._prev_volume >= threshold):
                self.last_transition_ts = time.time()
            self._prev_volume = volume
        await self.push_frame(frame, direction)

    def calculate_volume(self, frame: AudioRawFrame) -> float:
        """Return the RMS level of ``frame.audio`` in dB relative to 32767.

        The buffer is read as 16-bit signed integers. A trailing odd byte is
        ignored, and a buffer with no complete sample yields the silence
        floor instead of raising.

        Raises:
            ValueError: if the frame is not single-channel.
        """
        if frame.num_channels != 1:
            raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
        sample_count = len(frame.audio) // 2
        if sample_count == 0:
            # Nothing to average; avoids a division by zero below.
            return self._SILENCE_DB
        # unpack_from tolerates a buffer longer than the format, so an
        # odd-length buffer no longer raises struct.error.
        audio_samples = struct.unpack_from(f"{sample_count}h", frame.audio)
        # Calculate RMS
        sum_squares = sum(sample * sample for sample in audio_samples)
        rms = math.sqrt(sum_squares / sample_count)
        # Convert RMS to decibels (dB)
        # Reference: maximum value for 16-bit audio is 32767
        if rms > 0:
            return 20 * math.log10(rms / 32767)
        return self._SILENCE_DB
async def main(room_url, token, bot_config):
    """Join the Daily room and run the bot pipeline until the call ends.

    The pipeline is: transport input -> AudioVolumeTimer -> Deepgram STT ->
    TranscriptionTimingLogger -> RTVIProcessor. An ``EndFrame`` is queued when
    a participant leaves or the call state becomes ``"left"``.

    Args:
        room_url: Daily room URL passed to the transport.
        token: Daily meeting token passed to the transport.
        bot_config: dict expanded into ``RTVIConfig(**bot_config)``.
    """
    vad_stop_secs = float(os.getenv("VAD_STOP_SECS", "0.3"))
    transport_params = DailyParams(
        audio_out_enabled=True,
        # transcription_enabled=True,
        vad_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=vad_stop_secs)),
        vad_audio_passthrough=True,
    )
    transport = DailyTransport(room_url, token, "Realtime AI", transport_params)

    avt = AudioVolumeTimer()
    tl = TranscriptionTimingLogger(avt)

    stt = DeepgramSTTService(
        name="STT",
        api_key=os.getenv("DEEPGRAM_API_KEY", ""),
        url=os.getenv("DEEPGRAM_STT_BASE_URL", "wss://api.deepgram.com"),
    )

    rtvi_setup = RTVISetup(config=RTVIConfig(**bot_config))
    rtai = RTVIProcessor(
        transport=transport,
        setup=rtvi_setup,
        llm_api_key=os.getenv("OPENAI_API_KEY", ""),
        tts_api_key=os.getenv("CARTESIA_API_KEY", ""),
    )

    runner = PipelineRunner()

    stages = [transport.input(), avt, stt, tl, rtai]
    task_params = PipelineParams(
        allow_interruptions=True,
        enable_metrics=True,
        # report_only_initial_ttfb=True
    )
    task = PipelineTask(Pipeline(stages), params=task_params)

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        transport.capture_participant_transcription(participant["id"])
        logger.info("First participant joined")

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        await task.queue_frame(EndFrame())
        logger.info("Partcipant left. Exiting.")

    @transport.event_handler("on_call_state_updated")
    async def on_call_state_updated(transport, state):
        logger.info("Call state %s " % state)
        if state == "left":
            await task.queue_frame(EndFrame())

    await runner.run(task)
if __name__ == "__main__":
    # Command-line entry point: -u room URL, -t meeting token, -c JSON config.
    parser = argparse.ArgumentParser(description="Pipecat Bot")
    parser.add_argument("-u", type=str, help="Room URL")
    parser.add_argument("-t", type=str, help="Token")
    parser.add_argument("-c", type=str, help="Bot configuration blob")
    config = parser.parse_args()

    bot_config = json.loads(config.c) if config.c else {}
    # Example configuration for local testing:
    # bot_config = {"llm":{"model":"llama3-70b-8192","messages":[{"role":"system","content":"You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello."}]},"tts":{"voice":"79a125e8-cd45-4c13-8a67-188112f4dd22"}}

    # All three inputs are required; name exactly which ones are absent so the
    # message is not misleading when only the configuration is missing.
    required = (
        ("Room URL (-u)", config.u),
        ("Token (-t)", config.t),
        ("Bot configuration (-c)", bot_config),
    )
    missing = [label for label, value in required if not value]
    if missing:
        logger.error("Missing required arguments: " + ", ".join(missing))
        # Exit non-zero so a supervising process can detect the failure.
        sys.exit(1)

    asyncio.run(main(config.u, config.t, bot_config))

View File

@@ -0,0 +1,152 @@
#
# requires AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables
#
import boto3
import json
import subprocess
import signal
import os
import time
from pydantic import BaseModel, ValidationError
from typing import Optional
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from dotenv import load_dotenv
# Load variables from a .env file before the os.environ lookups below.
load_dotenv(override=True)
# SQS queue URL
QUEUE_URL = os.environ.get('SQS_QUEUE_URL')
# The program to be spawned
# NOTE(review): placeholder value that is not referenced anywhere in the
# visible code (run_bot launches `python3 -m bot`) — confirm it can be removed.
SUBPROCESS_PROGRAM = 'your_subprocess_program.py'
def escape_bash_arg(s):
    """Return ``s`` wrapped in single quotes for use as one shell word.

    Each embedded single quote is emitted as: close quote, backslash-escaped
    quote, reopen quote.
    """
    escaped_quote = "'\\''"
    # Joining the quote-separated pieces with the escape sequence is the same
    # as replacing every single quote with it.
    return "'{}'".format(escaped_quote.join(s.split("'")))
# ------------ Configuration ------------ #
# Session length in seconds; passed to get_token and used (plus a 10 second
# margin) as the limit after which run_bot kills the bot process.
MAX_SESSION_TIME = 5 * 60  # 5 minutes
# Environment variables that must be present before the runner starts.
REQUIRED_ENV_VARS = [
    'DAILY_API_KEY',
    'CARTESIA_API_KEY',
    'SQS_QUEUE_URL',
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
]
# Report only whether the key is configured; never write the secret itself
# to stdout, where it would end up in container logs.
print("DAILY_API_KEY", "is set" if os.getenv("DAILY_API_KEY") else "is NOT set")
daily_rest_helper = DailyRESTHelper(
    os.getenv("DAILY_API_KEY", ""),
    os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
# ----------------- API ----------------- #
def setup_sqs():
    """Create an SQS client for us-west-2 using IAM user keys from the environment."""
    # use an iam user instead of role because passing a role into an eks pod requires
    # adding an add-on and we don't want to change the eks cluster configuration if we
    # can avoid it
    iam_user_keys = {
        'aws_access_key_id': os.environ.get('AWS_ACCESS_KEY_ID'),
        'aws_secret_access_key': os.environ.get('AWS_SECRET_ACCESS_KEY'),
    }
    return boto3.client('sqs', region_name='us-west-2', **iam_user_keys)
def receive_message(sqs):
    """Long-poll the queue for one message; return it, or None if none arrived."""
    reply = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,  # Long polling
    )
    batch = reply.get('Messages', [])
    return batch[0] if batch else None
def delete_message(sqs, receipt_handle):
    """Remove the message identified by ``receipt_handle`` from the queue."""
    request = {'QueueUrl': QUEUE_URL, 'ReceiptHandle': receipt_handle}
    sqs.delete_message(**request)
def run_bot(msg):
    """Start a bot for the room named in ``msg`` and wait for it to finish.

    Looks up the Daily room, obtains a meeting token, launches
    ``python3 -m bot`` in this file's directory and blocks until the bot exits
    or ``MAX_SESSION_TIME + 10`` seconds pass, at which point the bot is
    killed.

    Args:
        msg: decoded queue message; reads ``msg['room']`` (room name) and
            ``msg['config']`` (JSON-serialisable bot configuration).

    Raises:
        RuntimeError: if the room cannot be fetched, no token is returned, or
            the subprocess cannot be started.
    """
    room_url = 'https://rtvi.daily.co/' + msg['room']
    try:
        print(f"Getting room from URL: {room_url}")
        room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
    except Exception as e:
        raise RuntimeError(f"Room not found: {msg['room']}") from e
    # Check the room before touching room.url below.
    if not room:
        raise RuntimeError(f"Room not found: {msg['room']}")

    # Give the agent a token to join the session
    token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise RuntimeError(f"Failed to get token for room: {room.url}")

    # Spawn a new agent, and join the user session
    try:
        bot_settings_str = json.dumps(msg['config'])
        # The token is a credential, so it is deliberately left out of the log.
        print(f"Starting bot with settings: {room.url}, {bot_settings_str}")
        # Pass an argument list without a shell: nothing from the message is
        # ever interpreted as shell syntax, and the child process IS the bot,
        # so killing it below stops the bot rather than an intermediate shell.
        process = subprocess.Popen(
            ["python3", "-m", "bot",
             "-u", room.url,
             "-t", token,
             "-c", bot_settings_str],
            cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise RuntimeError(f"Failed to start subprocess: {e}") from e

    try:
        process.wait(timeout=MAX_SESSION_TIME + 10)
        print("BOT EXITED BEFORE TIMEOUT")
    except subprocess.TimeoutExpired:
        # process did not exit. need to kill -9 it
        print("KILLING BOT PROCESS")
        process.kill()
        # Reap the child so it does not linger as a zombie.
        process.wait()
def main():
    """Poll the SQS queue forever, running one bot per received message.

    Each message is deleted before it is processed, so a message that fails
    is not redelivered. A failure while decoding or handling one message is
    reported and the loop keeps polling instead of terminating the runner.
    """
    print("sqs-runner.py - started.")
    sqs = setup_sqs()
    while True:
        print("sqs-runner.py - polling")
        message = receive_message(sqs)
        if not message:
            continue
        delete_message(sqs, message['ReceiptHandle'])
        try:
            message_body = json.loads(message['Body'])
            print(f"Received message. {message_body}")
            run_bot(message_body)
        except Exception as e:
            # KeyboardInterrupt is not an Exception subclass, so Ctrl-C still
            # propagates to the caller.
            print(f"sqs-runner.py - failed to handle message: {e!r}")
if __name__ == "__main__":
    # Check environment variables: fail on the first required one that is absent.
    first_missing = next(
        (name for name in REQUIRED_ENV_VARS if name not in os.environ), None)
    if first_missing is not None:
        raise Exception(f"Missing environment variable: {first_missing}.")

    try:
        main()
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")

View File

@@ -0,0 +1,148 @@
import json
import os
import requests
import boto3
import time
from typing import Dict, Any
def get_sqs_client():
    """Return an SQS client appropriate for how this module is being run.

    When imported as a module, the default credential chain is used. When run
    as a script, temporary credentials are read from
    ``assume_role_output.json``; if that file is missing, a warning is printed
    and the default credentials are used.
    """
    if __name__ != "__main__":
        # When running in Lambda, use the role attached to the function
        return boto3.client('sqs')

    # for local testing, load temporary credentials. see notes.md
    try:
        with open('assume_role_output.json', 'r') as creds_file:
            temp_creds = json.load(creds_file)['Credentials']
        local_session = boto3.Session(
            aws_access_key_id=temp_creds['AccessKeyId'],
            aws_secret_access_key=temp_creds['SecretAccessKey'],
            aws_session_token=temp_creds['SessionToken'],
        )
        return local_session.client('sqs', region_name='us-west-2')
    except FileNotFoundError:
        print("Warning: assume_role_output.json not found. Using default credentials.")
        return boto3.client('sqs')
# Module-level SQS client, created once at import time and used by send_to_sqs.
sqs = get_sqs_client()
# Daily API key and target queue URL; each is None when its variable is unset
# (the functions below raise ValueError in that case).
DAILY_API_KEY = os.environ.get('DAILY_API_KEY')
SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL')
def create_daily_room() -> Dict[str, Any]:
    """Create a Daily room that expires 180 seconds from now.

    Returns:
        The decoded JSON body of the Daily ``/rooms`` response.

    Raises:
        ValueError: if ``DAILY_API_KEY`` is not set.
        requests.HTTPError: if the API responds with an error status.
    """
    if not DAILY_API_KEY:
        raise ValueError("DAILY_API_KEY environment variable is not set")
    url = "https://api.daily.co/v1/rooms"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DAILY_API_KEY}"
    }
    exp = int(time.time()) + 180
    payload = {
        "properties": {
            "exp": exp,
            "eject_at_room_exp": True
        }
    }
    # Post exactly once: a second identical request would create another room
    # and leave the first one unused.
    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    return response.json()
def create_token(room_name: str) -> Dict[str, Any]:
    """Request a Daily meeting token for ``room_name`` that expires in 180 seconds.

    Returns the decoded JSON response. Raises ValueError when DAILY_API_KEY is
    unset, and whatever ``raise_for_status`` raises on an error response.
    """
    if not DAILY_API_KEY:
        raise ValueError("DAILY_API_KEY environment variable is not set")

    auth_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DAILY_API_KEY}",
    }
    expires_at = int(time.time()) + 180
    token_request = {"properties": {"exp": expires_at, "room_name": room_name}}

    reply = requests.post(
        "https://api.daily.co/v1/meeting-tokens",
        headers=auth_headers,
        json=token_request,
    )
    reply.raise_for_status()
    return reply.json()
def send_to_sqs(room_info: Dict[str, Any]) -> None:
    """Send ``room_info`` as a JSON message to the configured SQS queue.

    If the module-level ``sqs`` client is falsy, the send is only simulated
    by printing the message. Raises ValueError when SQS_QUEUE_URL is unset.
    """
    if not SQS_QUEUE_URL:
        raise ValueError("SQS_QUEUE_URL environment variable is not set")

    if not sqs:
        print(f"Simulated sending message to SQS queue: {SQS_QUEUE_URL}")
        print(f"Message body: {json.dumps(room_info, indent=2)}")
        return

    serialized = json.dumps(room_info)
    sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=serialized)
    print(f"Message sent to SQS queue: {SQS_QUEUE_URL}")
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Route a request by ``event['rawPath']`` (trailing slashes ignored).

    * ``/authenticate`` creates a Daily room and token and returns both.
    * ``/start_bot`` forwards the JSON request body to SQS.
    * any other path returns a 200 success response.

    Any exception is reported as a 500 response whose body is the JSON-encoded
    error text.
    """
    try:
        route = event.get('rawPath', '').rstrip('/')

        if route == '/authenticate':
            print("Creating Daily room...")
            daily_room = create_daily_room()
            print(f"Daily room created: {json.dumps(daily_room, indent=2)}")
            token = create_token(daily_room['name'])
            print(f"Meeting token created: {json.dumps(token, indent=2)}")
            credentials = {"room": daily_room['name'], "token": token['token']}
            return {'statusCode': 200, 'body': json.dumps(credentials)}

        if route == '/start_bot':
            request_body = json.loads(event['body'])
            print("Sending room info to SQS...")
            send_to_sqs(request_body)
            return {'statusCode': 200, 'body': json.dumps({"status": "healthy"})}

        return {'statusCode': 200, 'body': json.dumps({"success": True})}
    except Exception as err:
        reason = str(err)
        print(f"Error occurred: {reason}")
        return {'statusCode': 500, 'body': json.dumps(f'Error: {reason}')}
if __name__ == "__main__":
    # Simulate the Lambda event and context
    # event = {}
    # Alternative route for manual testing: "rawPath": "/authenticate"
    simulated_body = json.dumps({"room": "test-room", "token": "test"})
    event = {"rawPath": "/start_bot", "body": simulated_body}
    context = None

    result = lambda_handler(event, context)
    print(f"Lambda handler result: {json.dumps(result, indent=2)}")