diff --git a/personal/khk/bot-and-runner/bot.py b/personal/khk/bot-and-runner/bot.py new file mode 100644 index 000000000..688a76155 --- /dev/null +++ b/personal/khk/bot-and-runner/bot.py @@ -0,0 +1,190 @@ +import asyncio +import sys +import os +import argparse +import json +import struct +import math +import time + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.runner import PipelineRunner +from pipecat.processors.frameworks.rtvi import ( + RTVIConfig, + RTVILLMConfig, + RTVIProcessor, + RTVISetup, + RTVITTSConfig) +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer +from pipecat.vad.vad_analyzer import VADParams +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.frames.frames import ( + Frame, + AudioRawFrame, + TranscriptionFrame, + MetricsFrame, + EndFrame +) + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + +class DebugLogger(FrameProcessor): + def __init__(self): + super().__init__() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + logger.debug(frame) + await self.push_frame(frame, direction) + + +class TranscriptionTimingLogger(FrameProcessor): + def __init__(self, avt): + super().__init__(name="Transcription") + self._avt = avt + + def can_generate_metrics(self) -> bool: + return True + + async def process_frame(self, frame: Frame, direction: FrameDirection): + try: + await super().process_frame(frame, direction) + if isinstance(frame, TranscriptionFrame): + elapsed = time.time() - self._avt.last_transition_ts + logger.debug(f"Transcription TTFB: {elapsed}") + # await self.push_frame(MetricsFrame(ttfb={self.name: elapsed})) + await self.push_frame(MetricsFrame(ttfb=[{"processor": self.name, "value": elapsed}])) + + await self.push_frame(frame, direction) + except Exception as e: + logger.debug(f"Exception {e}") + +class AudioVolumeTimer(FrameProcessor): + def __init__(self): + super().__init__() + self.last_transition_ts = 0 + self._prev_volume = -80 + self._speech_volume_threshold = -50 + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, AudioRawFrame): + volume = self.calculate_volume(frame) + # print(f"Audio volume: {volume:.2f} dB") + if (volume >= self._speech_volume_threshold and + self._prev_volume < self._speech_volume_threshold): + # logger.debug("transition above speech volume threshold") + self.last_transition_ts = time.time() + elif (volume < self._speech_volume_threshold and + self._prev_volume >= self._speech_volume_threshold): + # logger.debug("transition below non-speech volume threshold") + self.last_transition_ts = time.time() + self._prev_volume = volume + + await self.push_frame(frame, direction) + + def calculate_volume(self, frame: AudioRawFrame) -> float: + if frame.num_channels != 1: + raise ValueError(f"Expected 1 channel, got {frame.num_channels}") + + # Unpack audio data into 16-bit integers + fmt = f"{len(frame.audio) // 2}h" + audio_samples = struct.unpack(fmt, frame.audio) + + # Calculate RMS + sum_squares = sum(sample**2 for sample in audio_samples) + rms = math.sqrt(sum_squares / len(audio_samples)) + + # Convert RMS to decibels (dB) + # Reference: maximum value for 16-bit audio is 32767 + if rms > 0: + db = 20 * math.log10(rms / 32767) + else: + db = -96 # Minimum value (almost silent) + + return db + +async def main(room_url, token, bot_config): + transport = DailyTransport( + room_url, + token, + "Realtime AI", + DailyParams( + audio_out_enabled=True, + # transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams( + stop_secs=float(os.getenv("VAD_STOP_SECS", "0.3")) + )), + vad_audio_passthrough=True + )) + + avt = AudioVolumeTimer() + tl = TranscriptionTimingLogger(avt) + + stt = DeepgramSTTService( + name="STT", + api_key=os.getenv("DEEPGRAM_API_KEY", ""), + url=os.getenv("DEEPGRAM_STT_BASE_URL", "wss://api.deepgram.com") + ) + + rtai = RTVIProcessor( + transport=transport, + setup=RTVISetup(config=RTVIConfig(**bot_config)), + llm_api_key=os.getenv("OPENAI_API_KEY", ""), + tts_api_key=os.getenv("CARTESIA_API_KEY", "")) + + runner = PipelineRunner() + + pipeline = Pipeline([transport.input(), avt, stt, tl, rtai]) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + # report_only_initial_ttfb=True + )) + + @ transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + logger.info("First participant joined") + + @ transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + await task.queue_frame(EndFrame()) + logger.info("Partcipant left. Exiting.") + + @ transport.event_handler("on_call_state_updated") + async def on_call_state_updated(transport, state): + logger.info("Call state %s " % state) + if state == "left": + await task.queue_frame(EndFrame()) + + await runner.run(task) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Pipecat Bot") + parser.add_argument("-u", type=str, help="Room URL") + parser.add_argument("-t", type=str, help="Token") + parser.add_argument("-c", type=str, help="Bot configuration blob") + config = parser.parse_args() + + bot_config = json.loads(config.c) if config.c else {} + # bot_config = {"llm":{"model":"llama3-70b-8192","messages":[{"role":"system","content":"You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello."}]},"tts":{"voice":"79a125e8-cd45-4c13-8a67-188112f4dd22"}} + + if config.u and config.t and bot_config: + asyncio.run(main(config.u, config.t, bot_config)) + else: + logger.error("Room URL and Token are required") diff --git a/personal/khk/bot-and-runner/sqs-runner.py b/personal/khk/bot-and-runner/sqs-runner.py new file mode 100644 index 000000000..585b793b1 --- /dev/null +++ b/personal/khk/bot-and-runner/sqs-runner.py @@ -0,0 +1,152 @@ +# +# requires AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables +# + +import boto3 +import json +import subprocess +import signal +import os +import time + +from pydantic import BaseModel, ValidationError +from typing import Optional + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams + +from dotenv import load_dotenv +load_dotenv(override=True) + +# SQS queue URL +QUEUE_URL = os.environ.get('SQS_QUEUE_URL') + +# The program to be spawned +SUBPROCESS_PROGRAM = 'your_subprocess_program.py' + + +def escape_bash_arg(s): + return "'" + s.replace("'", "'\\''") + "'" + +# ------------ Configuration ------------ # + + +MAX_SESSION_TIME = 5 * 60 # 5 minutes +REQUIRED_ENV_VARS = [ + 'DAILY_API_KEY', + 'CARTESIA_API_KEY', + 'SQS_QUEUE_URL', + 'AWS_ACCESS_KEY_ID', + 'AWS_SECRET_ACCESS_KEY'] + +print("DAILY_API_KEY", os.getenv("DAILY_API_KEY")) +daily_rest_helper = DailyRESTHelper( + os.getenv("DAILY_API_KEY", ""), + os.getenv("DAILY_API_URL", 'https://api.daily.co/v1')) + +# ----------------- API ----------------- # + + +def setup_sqs(): + """Set up the SQS client.""" + return boto3.client( + 'sqs', + region_name='us-west-2', + # use an iam user instead of role because passing a role into an eks pod requires + # adding an add-on and we don't want to change the eks cluster configuration if we + # can avoid it + aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY') + ) + + +def receive_message(sqs): + """Receive a message from the SQS queue.""" + response = sqs.receive_message( + QueueUrl=QUEUE_URL, + MaxNumberOfMessages=1, + WaitTimeSeconds=20 # Long polling + ) + + messages = response.get('Messages', []) + if messages: + return messages[0] + return None + + +def delete_message(sqs, receipt_handle): + """Delete a message from the queue after processing.""" + sqs.delete_message( + QueueUrl=QUEUE_URL, + ReceiptHandle=receipt_handle + ) + + +def run_bot(msg): + try: + print(f"Getting room from URL: {'https://rtvi.daily.co/' + msg['room']}") + room: DailyRoomObject = daily_rest_helper.get_room_from_url( + 'https://rtvi.daily.co/' + msg['room']) + except Exception: + raise HTTPException( + status_code=500, detail=f"Room not found: {msg['room']}") + + # Give the agent a token to join the session + token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) + + if not room or not token: + raise HTTPException( + status_code=500, detail=f"Failed to get token for room: {room.url}") + + # Spawn a new agent, and join the user session + try: + bot_settings_str = json.dumps(msg['config']) + + print(f"Starting bot with settings: {room.url}, {token}, {bot_settings_str}") + + process = subprocess.Popen( + [f"python3 -m bot -u {room.url} -t {token} -c {escape_bash_arg(bot_settings_str)}"], + shell=True, + bufsize=1, + cwd=os.path.dirname(os.path.abspath(__file__))) + + start_time = time.time() + while time.time() - start_time < (MAX_SESSION_TIME + 10): + if process.poll() is not None: + # process has finished + print("BOT EXITED BEFORE TIMEOUT") + return + time.sleep(1) + # process did not exit. need to kill -9 it + print("KILLING BOT PROCESS") + os.kill(process.pid, signal.SIGKILL) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to start subprocess: {e}") + + +def main(): + print("sqs-runner.py - started.") + sqs = setup_sqs() + + while True: + print("sqs-runner.py - polling") + message = receive_message(sqs) + if message: + delete_message(sqs, message['ReceiptHandle']) + + message_body = json.loads(message['Body']) + print(f"Received message. {message_body}") + + run_bot(message_body) + + +if __name__ == "__main__": + # Check environment variables + for env_var in REQUIRED_ENV_VARS: + if env_var not in os.environ: + raise Exception(f"Missing environment variable: {env_var}.") + + try: + main() + except KeyboardInterrupt: + print("Pipecat runner shutting down...") diff --git a/personal/khk/lambda/lambda_function.py b/personal/khk/lambda/lambda_function.py new file mode 100644 index 000000000..7730de7da --- /dev/null +++ b/personal/khk/lambda/lambda_function.py @@ -0,0 +1,148 @@ +import json +import os +import requests +import boto3 +import time +from typing import Dict, Any + + +def get_sqs_client(): + if __name__ == "__main__": + # for local testing, load temporary credentials. see notes.md + try: + with open('assume_role_output.json', 'r') as f: + credentials = json.load(f)['Credentials'] + + session = boto3.Session( + aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'] + ) + return session.client('sqs', region_name='us-west-2') + except FileNotFoundError: + print("Warning: assume_role_output.json not found. Using default credentials.") + return boto3.client('sqs') + else: + # When running in Lambda, use the role attached to the function + return boto3.client('sqs') + + +sqs = get_sqs_client() +DAILY_API_KEY = os.environ.get('DAILY_API_KEY') +SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL') + + +def create_daily_room() -> Dict[str, Any]: + if not DAILY_API_KEY: + raise ValueError("DAILY_API_KEY environment variable is not set") + + url = "https://api.daily.co/v1/rooms" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {DAILY_API_KEY}" + } + + exp = int(time.time()) + 180 + payload = { + "properties": { + "exp": exp, + "eject_at_room_exp": True + } + } + + response = requests.post(url, headers=headers, json=payload) + response = requests.post(url, headers=headers, json=payload) + response.raise_for_status() + return response.json() + + +def create_token(room_name: str) -> Dict[str, Any]: + if not DAILY_API_KEY: + raise ValueError("DAILY_API_KEY environment variable is not set") + + url = f"https://api.daily.co/v1/meeting-tokens" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {DAILY_API_KEY}" + } + + exp = int(time.time()) + 180 + payload = { + "properties": { + "exp": exp, + "room_name": room_name + } + } + + response = requests.post(url, headers=headers, json=payload) + response.raise_for_status() + return response.json() + + +def send_to_sqs(room_info: Dict[str, Any]) -> None: + if not SQS_QUEUE_URL: + raise ValueError("SQS_QUEUE_URL environment variable is not set") + + if sqs: + sqs.send_message( + QueueUrl=SQS_QUEUE_URL, + MessageBody=json.dumps(room_info) + ) + print(f"Message sent to SQS queue: {SQS_QUEUE_URL}") + else: + print(f"Simulated sending message to SQS queue: {SQS_QUEUE_URL}") + print(f"Message body: {json.dumps(room_info, indent=2)}") + + +def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: + try: + path = event.get('rawPath', '') + path = path.rstrip('/') + + if path == '/authenticate': + print("Creating Daily room...") + daily_room = create_daily_room() + print(f"Daily room created: {json.dumps(daily_room, indent=2)}") + + token = create_token(daily_room['name']) + print(f"Meeting token created: {json.dumps(token, indent=2)}") + + return { + 'statusCode': 200, + 'body': json.dumps({"room": daily_room['name'], "token": token['token']}) + } + elif path == '/start_bot': + body = json.loads(event['body']) + + print("Sending room info to SQS...") + + send_to_sqs(body) + return { + 'statusCode': 200, + 'body': json.dumps({"status": "healthy"}) + } + else: + return { + 'statusCode': 200, + 'body': json.dumps({"success": True}) + } + except Exception as e: + print(f"Error occurred: {str(e)}") + return { + 'statusCode': 500, + 'body': json.dumps(f'Error: {str(e)}') + } + + +if __name__ == "__main__": + # Simulate the Lambda event and context + # event = {} + event = { + # "rawPath": "/authenticate" + "rawPath": "/start_bot", + "body": json.dumps({"room": "test-room", "token": "test"}) + } + context = None + + result = lambda_handler(event, context) + print(f"Lambda handler result: {json.dumps(result, indent=2)}")