"""WebSocket endpoint test client. Tests the /ws endpoint with sine wave or file audio streaming. Based on reference/py-active-call/exec/test_ws_endpoint/test_ws.py """ import asyncio import aiohttp import json import struct import math import argparse import os from datetime import datetime # Configuration SERVER_URL = "ws://localhost:8000/ws" SAMPLE_RATE = 16000 FREQUENCY = 440 # 440Hz Sine Wave CHUNK_DURATION_MS = 20 # 16kHz * 16-bit (2 bytes) * 20ms = 640 bytes per chunk CHUNK_SIZE_BYTES = int(SAMPLE_RATE * 2 * (CHUNK_DURATION_MS / 1000.0)) def generate_sine_wave(duration_ms=1000): """Generates sine wave audio (16kHz mono PCM 16-bit).""" num_samples = int(SAMPLE_RATE * (duration_ms / 1000.0)) audio_data = bytearray() for x in range(num_samples): # Generate sine wave sample value = int(32767.0 * math.sin(2 * math.pi * FREQUENCY * x / SAMPLE_RATE)) # Pack as little-endian 16-bit integer audio_data.extend(struct.pack('