Compare commits

...

5 Commits

Author SHA1 Message Date
James Hush
a11fa05370 Save progress 2025-01-02 08:35:05 +08:00
James Hush
ed7b512f40 Change logic to launch 4 bots per min 2024-12-20 15:40:48 +08:00
James Hush
6377825cff Add way to specify phone number 2024-12-19 14:40:54 +08:00
James Hush
875ecf7fcb Add command line args
Fix
2024-12-19 14:40:50 +08:00
James Hush
8fb75cf6a7 test: batch dialout demo
Rename

Save changes

Batches of 50

No sort

Only write dialout false

Add rate limit handling

Save

Improvements

Save progress with recording

Fixed stop

Add backoff

Save logs

Make table nicer

Move to new folder
2024-12-19 14:40:47 +08:00
6 changed files with 2267 additions and 1 deletions

View File

@@ -0,0 +1,21 @@
import json
with open("logs.json", "r") as file:
logs = json.load(file)
# Find IDs with duration less than 245 seconds
ids_with_short_duration = [
[entry["id"], entry["mtgSessionId"], entry["duration"]]
for entry in logs["data"]
if entry["duration"] < 245
]
print("\n=== Entries with Duration < 245 seconds ===")
print(f"Total entries found: {len(ids_with_short_duration)} / 144")
print("-" * 100)
print(f"{'Recording ID':36} | {'Session ID':36} | {'Duration':10}")
print("-" * 100)
for rec_id, session_id, duration in ids_with_short_duration:
print(f"{rec_id:20} | {session_id:20} | {duration:8} seconds")
print("-" * 100 + "\n")

View File

@@ -0,0 +1,217 @@
import argparse
import asyncio
import os
import sys
import aiohttp
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
logger.remove(0)
logger.add(sys.stderr, level="INFO")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
class DialoutBot:
def __init__(self, room_url: str, token: str, callId: int, run_number: int, phone_number: str):
self.recording_id = None
self.room_url = room_url
self.token = token
self.callId = callId
self.run_number = run_number
self.phone_number = phone_number
async def run(self):
transport = DailyTransport(
self.room_url,
self.token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = DeepgramTTSService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
def get_phone_number(callId: int, run_number: int) -> str:
# if self.phone_number:
# return self.phone_number
if run_number % 2 == 0:
phone_numbers = [
"+12097135124", # James
"+12097135125", # James
self.phone_number,
# "+19499870006", # Varun
]
return phone_numbers[callId % len(phone_numbers)]
else:
phone_numbers = [
"+14155204406", # James
"+18187229086", # James (Avoca)
self.phone_number,
# "+16673870006", # Varun
]
return phone_numbers[callId % len(phone_numbers)]
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
logger.info(f"on_call_state_updated, state: {state}")
# dialout_id = None
if state == "joined":
logger.info(f"on_call_state_updated {state}")
backoff_time = 1 # Initial backoff time in seconds
for _ in range(5):
try:
phone_number = get_phone_number(self.callId, self.run_number)
logger.info(f"Starting dialout to {phone_number}")
settings = {
"phoneNumber": phone_number,
"display_name": "Dialout User",
}
await transport.start_dialout(settings)
break # Break out of the loop if start_dialout is successful
except Exception as e:
logger.error(f"Error starting dialout: {e}")
await asyncio.sleep(backoff_time) # Wait for the current backoff time
backoff_time *= 2 # Double the backoff time for the next attempt
if state == "left":
logger.info(f"on_call_state_updated {state}")
# await transport.stop_dialout(dialout_id)
async with aiohttp.ClientSession() as aiohttp_session:
print(f"Deleting room: {self.room_url}")
rest = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
await rest.delete_room_by_url(self.room_url)
# @transport.event_handler("on_first_participant_joined")
# async def on_first_participant_joined(transport, participant):
# await transport.capture_participant_transcription(participant["id"])
# await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_dialout_answered")
async def on_dialout_answered(transport, participant):
logger.info(f"on_dialout_answered {participant["participantId"]}")
streaming_settings = {
"minIdleTimeOut": 10,
"layout": {
"preset": "audio-only",
},
}
backoff_time = 1 # Initial backoff time in seconds
for _ in range(5):
try:
await transport.start_recording(streaming_settings=streaming_settings)
break # Break out of the loop if start_dialout is successful
except Exception as e:
logger.error(f"Error starting recording: {e}")
await asyncio.sleep(backoff_time) # Wait for the current backoff time
backoff_time *= 2 # Double the backoff time for the next attempt
await transport.capture_participant_transcription(participant["participantId"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_recording_started")
async def on_recording_started(transport, stream_id):
self.recording_id = stream_id["streamId"]
logger.info(f"Recording started: {self.recording_id}")
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
logger.info(f"Participant left: {participant}, reason: {reason}")
logger.info(f"Stopping recording: {self.recording_id}")
await transport.stop_recording(self.recording_id)
await task.queue_frame(EndFrame())
@transport.event_handler("on_recording_error")
async def on_recording_error(transport, error):
logger.error(f"Recording error: {error}")
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialout_error")
async def on_dialout_error(transport, error):
logger.error(f"Dialout error: {error}")
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
parser.add_argument("-i", type=str, help="Call ID")
parser.add_argument("-r", type=str, help="Run Number")
parser.add_argument("-p", type=str, help="Phone Number")
config = parser.parse_args()
bot = DialoutBot(config.u, config.t, int(config.i), int(config.r), config.p)
try:
asyncio.run(bot.run())
except Exception as e:
logger.error(f"++++++++++++++ Error: {e}")
sys.exit(1)

View File

@@ -0,0 +1,109 @@
import argparse
import asyncio
import csv
import os
import subprocess
import time
from datetime import datetime
import aiohttp
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomParams,
DailyRoomProperties,
)
async def run_bot(id: int, run_number: int, bot_run_time: int, phone_number: str, csv_writer):
async with aiohttp.ClientSession() as aiohttp_session:
print(f"Starting bot number: {id}")
rest = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create daily.co room with dialin and dialout enabled
exp = time.time() + bot_run_time + 600 + 60
room_params = DailyRoomParams(
properties=DailyRoomProperties(
exp=exp,
enable_dialout=True,
eject_at_room_exp=True,
enable_recording="cloud",
)
)
try:
# Create the room with the specified parameters
room = await rest.create_room(room_params)
# Create token with owner permissions
token = await rest.get_token(
room_url=room.url,
expiry_time=60 * 60,
owner=True, # Ensure the token has owner permissions
)
# print(f"{id}: Room Token: {token}")
room_info = await rest.get_room_from_url(room.url)
# print(f"{id}: Room Info: {room_info}")
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Other party joined or not and start dialout joined
csv_writer.writerow([id, room_info.config.enable_dialout, current_time])
except Exception as e:
print(f"Error creating room for bot {id}: {e}")
print("Sleeping for 10 seconds")
await asyncio.sleep(10)
csv_writer.writerow(
[id, "Rate Limit Error", datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]]
)
await asyncio.sleep(15)
bot_proc = f"python3 -m batch_dialout_bot -u {room.url} -t {token} -i {id} -r {run_number} -p {phone_number}"
try:
subprocess.Popen(
[bot_proc], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
)
except Exception as e:
print(f"Failed to start subprocess: {e}")
async def main():
parser = argparse.ArgumentParser(description="Pipecat Simple Dialout Bot")
parser.add_argument("-b", type=int, help="Number of bots per run")
parser.add_argument("-r", type=int, help="Number of runs")
parser.add_argument("-t", type=int, help="Time to run the bot in seconds")
parser.add_argument("-p", type=str, help="Phone Number")
config = parser.parse_args()
number_of_batches = int(config.r)
number_of_bots = int(config.b)
bot_run_time = int(config.t)
phone_number = config.p
# Open the CSV file in append mode
with open("output.csv", mode="w", newline="") as file:
csv_writer = csv.writer(file)
# Write the header row
csv_writer.writerow(["bot_id", "enable_dialout", "timestamp"])
for run_number in range(number_of_batches):
print(f"-- Starting batch run number: {run_number} of {number_of_batches}")
bots = [
run_bot(i, run_number, bot_run_time, phone_number, csv_writer)
for i in range(number_of_bots)
]
print(f"-- Number of bots: {len(bots)}")
await asyncio.gather(*bots)
print(f"-- Waiting {bot_run_time} seconds...")
await asyncio.sleep(bot_run_time)
print(f"-- Finished waiting {bot_run_time} seconds...")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Parent process interrupted")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,41 @@
bot_id,enable_dialout,timestamp
0,True,2024-12-20 17:18:47.197
1,True,2024-12-20 17:18:47.247
0,True,2024-12-20 17:20:03.366
1,True,2024-12-20 17:20:03.392
1,True,2024-12-20 17:21:19.477
0,True,2024-12-20 17:21:19.514
0,True,2024-12-20 17:22:35.613
1,True,2024-12-20 17:22:35.637
3,True,2024-12-20 17:10:15.329
2,True,2024-12-20 17:10:15.356
1,True,2024-12-20 17:10:15.359
0,True,2024-12-20 17:10:15.389
3,True,2024-12-20 17:11:31.428
0,True,2024-12-20 17:11:31.469
2,True,2024-12-20 17:11:31.511
1,True,2024-12-20 17:11:31.670
0,True,2024-12-20 17:12:47.680
3,True,2024-12-20 17:12:47.753
1,True,2024-12-20 17:12:47.770
2,True,2024-12-20 17:12:47.779
3,True,2024-12-20 17:14:03.844
1,True,2024-12-20 17:14:03.847
2,True,2024-12-20 17:14:03.883
0,True,2024-12-20 17:14:03.977
3,True,2024-12-20 17:15:20.026
0,True,2024-12-20 17:15:20.069
1,True,2024-12-20 17:15:20.071
2,True,2024-12-20 17:15:20.127
0,True,2024-12-20 17:16:36.155
1,True,2024-12-20 17:16:36.166
3,True,2024-12-20 17:16:36.217
2,True,2024-12-20 17:16:36.248
3,True,2024-12-20 17:17:52.320
2,True,2024-12-20 17:17:52.357
0,True,2024-12-20 17:17:52.401
1,True,2024-12-20 17:17:52.477
1,True,2024-12-20 17:19:08.543
3,True,2024-12-20 17:19:08.553
2,True,2024-12-20 17:19:08.578
0,True,2024-12-20 17:19:08.656
1 bot_id enable_dialout timestamp
2 0 True 2024-12-20 17:18:47.197
3 1 True 2024-12-20 17:18:47.247
4 0 True 2024-12-20 17:20:03.366
5 1 True 2024-12-20 17:20:03.392
6 1 True 2024-12-20 17:21:19.477
7 0 True 2024-12-20 17:21:19.514
8 0 True 2024-12-20 17:22:35.613
9 1 True 2024-12-20 17:22:35.637
10 3 True 2024-12-20 17:10:15.329
11 2 True 2024-12-20 17:10:15.356
12 1 True 2024-12-20 17:10:15.359
13 0 True 2024-12-20 17:10:15.389
14 3 True 2024-12-20 17:11:31.428
15 0 True 2024-12-20 17:11:31.469
16 2 True 2024-12-20 17:11:31.511
17 1 True 2024-12-20 17:11:31.670
18 0 True 2024-12-20 17:12:47.680
19 3 True 2024-12-20 17:12:47.753
20 1 True 2024-12-20 17:12:47.770
21 2 True 2024-12-20 17:12:47.779
22 3 True 2024-12-20 17:14:03.844
23 1 True 2024-12-20 17:14:03.847
24 2 True 2024-12-20 17:14:03.883
25 0 True 2024-12-20 17:14:03.977
26 3 True 2024-12-20 17:15:20.026
27 0 True 2024-12-20 17:15:20.069
28 1 True 2024-12-20 17:15:20.071
29 2 True 2024-12-20 17:15:20.127
30 0 True 2024-12-20 17:16:36.155
31 1 True 2024-12-20 17:16:36.166
32 3 True 2024-12-20 17:16:36.217
33 2 True 2024-12-20 17:16:36.248
34 3 True 2024-12-20 17:17:52.320
35 2 True 2024-12-20 17:17:52.357
36 0 True 2024-12-20 17:17:52.401
37 1 True 2024-12-20 17:17:52.477
38 1 True 2024-12-20 17:19:08.543
39 3 True 2024-12-20 17:19:08.553
40 2 True 2024-12-20 17:19:08.578
41 0 True 2024-12-20 17:19:08.656

View File

@@ -170,7 +170,8 @@ async def daily_start_bot(request: Request) -> JSONResponse:
# forward the call to the SIP URi when dialin_ready fires.
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
# room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
room_url = None
# Get the dial-in properties from the request
try:
data = await request.json()