Compare commits
5 Commits
hush/usage
...
hush/roomC
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a11fa05370 | ||
|
|
ed7b512f40 | ||
|
|
6377825cff | ||
|
|
875ecf7fcb | ||
|
|
8fb75cf6a7 |
21
examples/batch-dialout-chatbot/analyze_logs.py
Normal file
21
examples/batch-dialout-chatbot/analyze_logs.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import json
|
||||
|
||||
with open("logs.json", "r") as file:
|
||||
logs = json.load(file)
|
||||
|
||||
# Find IDs with duration less than 245 seconds
|
||||
ids_with_short_duration = [
|
||||
[entry["id"], entry["mtgSessionId"], entry["duration"]]
|
||||
for entry in logs["data"]
|
||||
if entry["duration"] < 245
|
||||
]
|
||||
|
||||
print("\n=== Entries with Duration < 245 seconds ===")
|
||||
print(f"Total entries found: {len(ids_with_short_duration)} / 144")
|
||||
print("-" * 100)
|
||||
print(f"{'Recording ID':36} | {'Session ID':36} | {'Duration':10}")
|
||||
print("-" * 100)
|
||||
|
||||
for rec_id, session_id, duration in ids_with_short_duration:
|
||||
print(f"{rec_id:20} | {session_id:20} | {duration:8} seconds")
|
||||
print("-" * 100 + "\n")
|
||||
217
examples/batch-dialout-chatbot/batch_dialout_bot.py
Normal file
217
examples/batch-dialout-chatbot/batch_dialout_bot.py
Normal file
@@ -0,0 +1,217 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="INFO")
|
||||
|
||||
daily_api_key = os.getenv("DAILY_API_KEY", "")
|
||||
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
class DialoutBot:
|
||||
def __init__(self, room_url: str, token: str, callId: int, run_number: int, phone_number: str):
|
||||
self.recording_id = None
|
||||
self.room_url = room_url
|
||||
self.token = token
|
||||
self.callId = callId
|
||||
self.run_number = run_number
|
||||
self.phone_number = phone_number
|
||||
|
||||
async def run(self):
|
||||
transport = DailyTransport(
|
||||
self.room_url,
|
||||
self.token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
tts = DeepgramTTSService(
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
voice="aura-helios-en",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
def get_phone_number(callId: int, run_number: int) -> str:
|
||||
# if self.phone_number:
|
||||
# return self.phone_number
|
||||
|
||||
if run_number % 2 == 0:
|
||||
phone_numbers = [
|
||||
"+12097135124", # James
|
||||
"+12097135125", # James
|
||||
self.phone_number,
|
||||
# "+19499870006", # Varun
|
||||
]
|
||||
return phone_numbers[callId % len(phone_numbers)]
|
||||
else:
|
||||
phone_numbers = [
|
||||
"+14155204406", # James
|
||||
"+18187229086", # James (Avoca)
|
||||
self.phone_number,
|
||||
# "+16673870006", # Varun
|
||||
]
|
||||
return phone_numbers[callId % len(phone_numbers)]
|
||||
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_state_updated(transport, state):
|
||||
logger.info(f"on_call_state_updated, state: {state}")
|
||||
# dialout_id = None
|
||||
|
||||
if state == "joined":
|
||||
logger.info(f"on_call_state_updated {state}")
|
||||
|
||||
backoff_time = 1 # Initial backoff time in seconds
|
||||
|
||||
for _ in range(5):
|
||||
try:
|
||||
phone_number = get_phone_number(self.callId, self.run_number)
|
||||
logger.info(f"Starting dialout to {phone_number}")
|
||||
settings = {
|
||||
"phoneNumber": phone_number,
|
||||
"display_name": "Dialout User",
|
||||
}
|
||||
await transport.start_dialout(settings)
|
||||
break # Break out of the loop if start_dialout is successful
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting dialout: {e}")
|
||||
await asyncio.sleep(backoff_time) # Wait for the current backoff time
|
||||
backoff_time *= 2 # Double the backoff time for the next attempt
|
||||
|
||||
if state == "left":
|
||||
logger.info(f"on_call_state_updated {state}")
|
||||
# await transport.stop_dialout(dialout_id)
|
||||
async with aiohttp.ClientSession() as aiohttp_session:
|
||||
print(f"Deleting room: {self.room_url}")
|
||||
rest = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY", ""),
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
await rest.delete_room_by_url(self.room_url)
|
||||
|
||||
# @transport.event_handler("on_first_participant_joined")
|
||||
# async def on_first_participant_joined(transport, participant):
|
||||
# await transport.capture_participant_transcription(participant["id"])
|
||||
# await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_dialout_answered")
|
||||
async def on_dialout_answered(transport, participant):
|
||||
logger.info(f"on_dialout_answered {participant["participantId"]}")
|
||||
streaming_settings = {
|
||||
"minIdleTimeOut": 10,
|
||||
"layout": {
|
||||
"preset": "audio-only",
|
||||
},
|
||||
}
|
||||
|
||||
backoff_time = 1 # Initial backoff time in seconds
|
||||
for _ in range(5):
|
||||
try:
|
||||
await transport.start_recording(streaming_settings=streaming_settings)
|
||||
break # Break out of the loop if start_dialout is successful
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting recording: {e}")
|
||||
await asyncio.sleep(backoff_time) # Wait for the current backoff time
|
||||
backoff_time *= 2 # Double the backoff time for the next attempt
|
||||
|
||||
await transport.capture_participant_transcription(participant["participantId"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_recording_started")
|
||||
async def on_recording_started(transport, stream_id):
|
||||
self.recording_id = stream_id["streamId"]
|
||||
logger.info(f"Recording started: {self.recording_id}")
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
logger.info(f"Participant left: {participant}, reason: {reason}")
|
||||
logger.info(f"Stopping recording: {self.recording_id}")
|
||||
await transport.stop_recording(self.recording_id)
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_recording_error")
|
||||
async def on_recording_error(transport, error):
|
||||
logger.error(f"Recording error: {error}")
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_dialout_error")
|
||||
async def on_dialout_error(transport, error):
|
||||
logger.error(f"Dialout error: {error}")
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
|
||||
parser.add_argument("-u", type=str, help="Room URL")
|
||||
parser.add_argument("-t", type=str, help="Token")
|
||||
parser.add_argument("-i", type=str, help="Call ID")
|
||||
parser.add_argument("-r", type=str, help="Run Number")
|
||||
parser.add_argument("-p", type=str, help="Phone Number")
|
||||
config = parser.parse_args()
|
||||
|
||||
bot = DialoutBot(config.u, config.t, int(config.i), int(config.r), config.p)
|
||||
|
||||
try:
|
||||
asyncio.run(bot.run())
|
||||
except Exception as e:
|
||||
logger.error(f"++++++++++++++ Error: {e}")
|
||||
sys.exit(1)
|
||||
109
examples/batch-dialout-chatbot/batch_dialout_demo.py
Normal file
109
examples/batch-dialout-chatbot/batch_dialout_demo.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import csv
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import (
|
||||
DailyRESTHelper,
|
||||
DailyRoomParams,
|
||||
DailyRoomProperties,
|
||||
)
|
||||
|
||||
|
||||
async def run_bot(id: int, run_number: int, bot_run_time: int, phone_number: str, csv_writer):
|
||||
async with aiohttp.ClientSession() as aiohttp_session:
|
||||
print(f"Starting bot number: {id}")
|
||||
rest = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY", ""),
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
# Create daily.co room with dialin and dialout enabled
|
||||
exp = time.time() + bot_run_time + 600 + 60
|
||||
room_params = DailyRoomParams(
|
||||
properties=DailyRoomProperties(
|
||||
exp=exp,
|
||||
enable_dialout=True,
|
||||
eject_at_room_exp=True,
|
||||
enable_recording="cloud",
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
# Create the room with the specified parameters
|
||||
room = await rest.create_room(room_params)
|
||||
# Create token with owner permissions
|
||||
token = await rest.get_token(
|
||||
room_url=room.url,
|
||||
expiry_time=60 * 60,
|
||||
owner=True, # Ensure the token has owner permissions
|
||||
)
|
||||
# print(f"{id}: Room Token: {token}")
|
||||
room_info = await rest.get_room_from_url(room.url)
|
||||
# print(f"{id}: Room Info: {room_info}")
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
||||
# Other party joined or not and start dialout joined
|
||||
csv_writer.writerow([id, room_info.config.enable_dialout, current_time])
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error creating room for bot {id}: {e}")
|
||||
print("Sleeping for 10 seconds")
|
||||
await asyncio.sleep(10)
|
||||
csv_writer.writerow(
|
||||
[id, "Rate Limit Error", datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]]
|
||||
)
|
||||
|
||||
await asyncio.sleep(15)
|
||||
bot_proc = f"python3 -m batch_dialout_bot -u {room.url} -t {token} -i {id} -r {run_number} -p {phone_number}"
|
||||
|
||||
try:
|
||||
subprocess.Popen(
|
||||
[bot_proc], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Failed to start subprocess: {e}")
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="Pipecat Simple Dialout Bot")
|
||||
parser.add_argument("-b", type=int, help="Number of bots per run")
|
||||
parser.add_argument("-r", type=int, help="Number of runs")
|
||||
parser.add_argument("-t", type=int, help="Time to run the bot in seconds")
|
||||
parser.add_argument("-p", type=str, help="Phone Number")
|
||||
config = parser.parse_args()
|
||||
|
||||
number_of_batches = int(config.r)
|
||||
number_of_bots = int(config.b)
|
||||
bot_run_time = int(config.t)
|
||||
phone_number = config.p
|
||||
|
||||
# Open the CSV file in append mode
|
||||
with open("output.csv", mode="w", newline="") as file:
|
||||
csv_writer = csv.writer(file)
|
||||
# Write the header row
|
||||
csv_writer.writerow(["bot_id", "enable_dialout", "timestamp"])
|
||||
|
||||
for run_number in range(number_of_batches):
|
||||
print(f"-- Starting batch run number: {run_number} of {number_of_batches}")
|
||||
bots = [
|
||||
run_bot(i, run_number, bot_run_time, phone_number, csv_writer)
|
||||
for i in range(number_of_bots)
|
||||
]
|
||||
print(f"-- Number of bots: {len(bots)}")
|
||||
await asyncio.gather(*bots)
|
||||
print(f"-- Waiting {bot_run_time} seconds...")
|
||||
await asyncio.sleep(bot_run_time)
|
||||
print(f"-- Finished waiting {bot_run_time} seconds...")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
print("Parent process interrupted")
|
||||
1877
examples/batch-dialout-chatbot/logs.json
Normal file
1877
examples/batch-dialout-chatbot/logs.json
Normal file
File diff suppressed because it is too large
Load Diff
41
examples/batch-dialout-chatbot/output.csv
Normal file
41
examples/batch-dialout-chatbot/output.csv
Normal file
@@ -0,0 +1,41 @@
|
||||
bot_id,enable_dialout,timestamp
|
||||
0,True,2024-12-20 17:18:47.197
|
||||
1,True,2024-12-20 17:18:47.247
|
||||
0,True,2024-12-20 17:20:03.366
|
||||
1,True,2024-12-20 17:20:03.392
|
||||
1,True,2024-12-20 17:21:19.477
|
||||
0,True,2024-12-20 17:21:19.514
|
||||
0,True,2024-12-20 17:22:35.613
|
||||
1,True,2024-12-20 17:22:35.637
|
||||
3,True,2024-12-20 17:10:15.329
|
||||
2,True,2024-12-20 17:10:15.356
|
||||
1,True,2024-12-20 17:10:15.359
|
||||
0,True,2024-12-20 17:10:15.389
|
||||
3,True,2024-12-20 17:11:31.428
|
||||
0,True,2024-12-20 17:11:31.469
|
||||
2,True,2024-12-20 17:11:31.511
|
||||
1,True,2024-12-20 17:11:31.670
|
||||
0,True,2024-12-20 17:12:47.680
|
||||
3,True,2024-12-20 17:12:47.753
|
||||
1,True,2024-12-20 17:12:47.770
|
||||
2,True,2024-12-20 17:12:47.779
|
||||
3,True,2024-12-20 17:14:03.844
|
||||
1,True,2024-12-20 17:14:03.847
|
||||
2,True,2024-12-20 17:14:03.883
|
||||
0,True,2024-12-20 17:14:03.977
|
||||
3,True,2024-12-20 17:15:20.026
|
||||
0,True,2024-12-20 17:15:20.069
|
||||
1,True,2024-12-20 17:15:20.071
|
||||
2,True,2024-12-20 17:15:20.127
|
||||
0,True,2024-12-20 17:16:36.155
|
||||
1,True,2024-12-20 17:16:36.166
|
||||
3,True,2024-12-20 17:16:36.217
|
||||
2,True,2024-12-20 17:16:36.248
|
||||
3,True,2024-12-20 17:17:52.320
|
||||
2,True,2024-12-20 17:17:52.357
|
||||
0,True,2024-12-20 17:17:52.401
|
||||
1,True,2024-12-20 17:17:52.477
|
||||
1,True,2024-12-20 17:19:08.543
|
||||
3,True,2024-12-20 17:19:08.553
|
||||
2,True,2024-12-20 17:19:08.578
|
||||
0,True,2024-12-20 17:19:08.656
|
||||
|
@@ -170,7 +170,8 @@ async def daily_start_bot(request: Request) -> JSONResponse:
|
||||
# forward the call to the SIP URi when dialin_ready fires.
|
||||
|
||||
# Use specified room URL, or create a new one if not specified
|
||||
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
|
||||
# room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
|
||||
room_url = None
|
||||
# Get the dial-in properties from the request
|
||||
try:
|
||||
data = await request.json()
|
||||
|
||||
Reference in New Issue
Block a user