#!/usr/bin/env python3
"""
WAV file client for testing duplex voice conversation.

This client reads audio from a WAV file, sends it to the server,
and saves a stereo WAV file with the input audio on the left channel
and the AI's voice response on the right channel.

Usage:
    python examples/wav_client.py --input input.wav --output response.wav
    python examples/wav_client.py --input input.wav --output response.wav --url ws://localhost:8000/ws
    python examples/wav_client.py --input input.wav --output response.wav --wait-time 10
    python wav_client.py --input ../data/audio_examples/two_utterances.wav -o response.wav

Requirements:
    pip install soundfile websockets numpy
"""

import argparse
import asyncio
import json
import sys
import time
import wave
from pathlib import Path
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

try:
    import numpy as np
except ImportError:
    print("Please install numpy: pip install numpy")
    sys.exit(1)

# soundfile and websockets are only needed once a session actually runs, so a
# missing package is reported by _require_runtime_dependencies() (called from
# main() and WavFileClient.run()) instead of terminating whoever imports this
# module.
try:
    import soundfile as sf
except ImportError:
    sf = None

try:
    import websockets
except ImportError:
    websockets = None


def _require_runtime_dependencies() -> None:
    """Exit with status 1 and an install hint if soundfile or websockets is missing."""
    if sf is None:
        print("Please install soundfile: pip install soundfile")
        sys.exit(1)
    if websockets is None:
        print("Please install websockets: pip install websockets")
        sys.exit(1)


class WavFileClient:
    """
    WAV file client for voice conversation testing.

    Features:
    - Read audio from WAV file
    - Send audio to WebSocket server
    - Receive and save stereo conversation audio
    - Event logging
    """

    def __init__(
        self,
        url: str,
        input_file: str,
        output_file: str,
        assistant_id: str = "default",
        channel: str = "wav_client",
        sample_rate: int = 16000,
        chunk_duration_ms: int = 20,
        wait_time: float = 15.0,
        verbose: bool = False,
        track_debug: bool = False,
        tail_silence_ms: int = 800,
    ):
        """
        Initialize WAV file client.

        Args:
            url: WebSocket server URL
            input_file: Input WAV file path
            output_file: Output WAV file path
            assistant_id: Value sent as the ``assistant_id`` query parameter
            channel: Channel name sent in the ``session.start`` metadata
            sample_rate: Audio sample rate (Hz)
            chunk_duration_ms: Audio chunk duration (ms) for sending
            wait_time: Time to wait for response after sending (seconds)
            verbose: Enable verbose output
            track_debug: Print every received event type with its trackId
            tail_silence_ms: Silence sent after the input audio and appended
                to the saved input channel (negative values are treated as 0)

        Raises:
            ValueError: If ``sample_rate`` and ``chunk_duration_ms`` do not
                yield at least one sample per chunk.
        """
        self.url = url
        self.input_file = Path(input_file)
        self.output_file = Path(output_file)
        self.assistant_id = assistant_id
        self.channel = channel
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.chunk_samples = int(sample_rate * chunk_duration_ms / 1000)
        # A chunk of zero samples would make audio_sender() spin forever
        # without ever advancing through the input.
        if sample_rate <= 0 or self.chunk_samples <= 0:
            raise ValueError(
                "sample_rate and chunk_duration_ms must yield at least one sample per chunk "
                f"(got sample_rate={sample_rate}, chunk_duration_ms={chunk_duration_ms})"
            )
        self.wait_time = wait_time
        self.verbose = verbose
        self.track_debug = track_debug
        self.tail_silence_ms = max(0, int(tail_silence_ms))
        # NOTE(review): hard-coded for 16 kHz mono pcm_s16le at 20 ms. With
        # other settings audio_sender() zero-pads every chunk up to a multiple
        # of this size. TODO confirm whether the server requires 640-byte
        # frames before deriving this from sample_rate / chunk_duration_ms.
        self.frame_bytes = 640  # 16k mono pcm_s16le, 20ms

        # WebSocket connection
        self.ws = None
        self.running = False

        # Audio buffers
        self.input_audio = np.array([], dtype=np.int16)
        self.received_audio = bytearray()
        self.output_segments: list[dict[str, object]] = []
        self.current_output_segment: bytearray | None = None

        # Statistics
        self.bytes_sent = 0
        self.bytes_received = 0

        # TTFB tracking (per response)
        self.send_start_time = None
        self.response_start_time = None  # set on each output.audio.start
        self.waiting_for_first_audio = False
        self.ttfb_ms = None  # last TTFB for summary
        self.ttfb_list = []  # TTFB for each response

        # State tracking
        self.track_started = False
        self.track_ended = False
        self.send_completed = False
        self.session_ready = False

        # Events log
        self.events_log = []

    def log_event(self, direction: str, message: str) -> None:
        """Record an event in ``events_log`` with a timestamp and print it."""
        timestamp = time.time()
        self.events_log.append(
            {
                "timestamp": timestamp,
                "direction": direction,
                "message": message,
            }
        )
        try:
            print(f"{direction} {message}")
        except UnicodeEncodeError:
            # The console encoding cannot represent the message; print an
            # ASCII-only version instead of failing.
            safe_message = message.encode("ascii", errors="replace").decode("ascii")
            print(f"{direction} {safe_message}")

    @staticmethod
    def _event_ids_suffix(event: dict) -> str:
        """Format the correlation IDs carried by an event as a log suffix.

        Each known ID key is looked up in ``event["data"]`` (when that is a
        dict) and falls back to the top level of the event. Returns an empty
        string when no ID is present.
        """
        payload = event.get("data")
        data = payload if isinstance(payload, dict) else {}
        keys = ("turn_id", "utterance_id", "response_id", "tool_call_id", "tts_id")
        parts = []
        for key in keys:
            value = data.get(key, event.get(key))
            if value:
                parts.append(f"{key}={value}")
        return f" [{' '.join(parts)}]" if parts else ""

    def _session_url(self) -> str:
        """Return ``self.url`` with the ``assistant_id`` query parameter set."""
        parts = urlsplit(self.url)
        query = dict(parse_qsl(parts.query, keep_blank_values=True))
        query["assistant_id"] = self.assistant_id
        return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment))

    def _current_timeline_sample(self) -> int:
        """Return current sample position relative to input send start."""
        if self.send_start_time is None:
            return 0
        elapsed_seconds = max(0.0, time.time() - self.send_start_time)
        return int(round(elapsed_seconds * self.sample_rate))

    def _start_output_segment(self) -> None:
        """Create a new assistant-audio segment if one is not active."""
        if self.current_output_segment is not None:
            return
        self.current_output_segment = bytearray()
        self.output_segments.append(
            {
                "start_sample": self._current_timeline_sample(),
                "audio": self.current_output_segment,
            }
        )

    def _close_output_segment(self) -> None:
        """Close the active assistant-audio segment, if any."""
        self.current_output_segment = None

    def _build_input_track(self) -> np.ndarray:
        """Build the saved left channel: the input audio plus the tail silence."""
        input_track = self.input_audio.astype(np.int16, copy=True)
        tail_samples = int(round(self.sample_rate * self.tail_silence_ms / 1000.0))
        if tail_samples <= 0:
            return input_track
        if input_track.size == 0:
            return np.zeros(tail_samples, dtype=np.int16)
        return np.concatenate((input_track, np.zeros(tail_samples, dtype=np.int16)))

    def _build_output_track(self) -> np.ndarray:
        """Build the saved right channel using received assistant audio.

        Each segment is placed at its recorded ``start_sample``; overlapping
        segments are summed and the result is clipped to the int16 range.
        A trailing partial sample (odd byte count) in a segment is dropped.
        """
        if not self.output_segments:
            return np.zeros(0, dtype=np.int16)

        total_samples = max(
            int(segment["start_sample"]) + (len(segment["audio"]) // 2)
            for segment in self.output_segments
        )
        # Mix in int32 so overlapping segments cannot overflow before clipping.
        mixed_track = np.zeros(total_samples, dtype=np.int32)
        for segment in self.output_segments:
            start_sample = int(segment["start_sample"])
            raw_audio = segment["audio"]
            # np.frombuffer rejects buffers that are not a whole number of
            # int16 samples, so keep only complete samples.
            whole_bytes = (len(raw_audio) // 2) * 2
            if whole_bytes == 0:
                continue
            segment_audio = np.frombuffer(bytes(raw_audio[:whole_bytes]), dtype=np.int16).astype(np.int32)
            end_sample = start_sample + segment_audio.size
            mixed_track[start_sample:end_sample] += segment_audio

        np.clip(mixed_track, -32768, 32767, out=mixed_track)
        return mixed_track.astype(np.int16)

    async def connect(self) -> None:
        """Connect to WebSocket server and send the ``session.start`` command."""
        session_url = self._session_url()
        self.log_event("->", f"Connecting to {session_url}...")
        self.ws = await websockets.connect(session_url)
        self.running = True
        self.log_event("->", "Connected!")

        await self.send_command(
            {
                "type": "session.start",
                "audio": {
                    "encoding": "pcm_s16le",
                    "sample_rate_hz": self.sample_rate,
                    "channels": 1,
                },
                "metadata": {
                    "channel": self.channel,
                    "source": "wav_client",
                },
            }
        )

    async def send_command(self, cmd: dict) -> None:
        """Send JSON command to server (no-op when not connected)."""
        if self.ws:
            await self.ws.send(json.dumps(cmd))
            self.log_event("->", f"Command: {cmd.get('type', 'unknown')}")

    async def send_hangup(self, reason: str = "Session complete") -> None:
        """Send hangup command."""
        await self.send_command({"type": "session.stop", "reason": reason})

    def load_wav_file(self) -> tuple[np.ndarray, int]:
        """
        Load and prepare WAV file for sending.

        Multi-channel audio is averaged to mono, resampled to
        ``self.sample_rate`` by linear interpolation when the rates differ,
        and converted to int16. The prepared audio is also stored in
        ``self.input_audio``.

        Returns:
            Tuple of (audio_data as int16 numpy array, original sample rate)

        Raises:
            FileNotFoundError: If the input file does not exist.
            ValueError: If the file contains no audio samples.
        """
        if not self.input_file.exists():
            raise FileNotFoundError(f"Input file not found: {self.input_file}")

        audio_data, file_sample_rate = sf.read(self.input_file)
        self.log_event("->", f"Loaded: {self.input_file}")
        self.log_event("->", f" Original sample rate: {file_sample_rate} Hz")
        self.log_event("->", f" Duration: {len(audio_data) / file_sample_rate:.2f}s")

        # np.interp / np.max below fail with an obscure error on empty input.
        if len(audio_data) == 0:
            raise ValueError(f"Input file contains no audio: {self.input_file}")

        if len(audio_data.shape) > 1:
            audio_data = audio_data.mean(axis=1)
            self.log_event("->", " Converted stereo to mono")

        if file_sample_rate != self.sample_rate:
            duration = len(audio_data) / file_sample_rate
            num_samples = int(duration * self.sample_rate)
            if num_samples == 0:
                raise ValueError(f"Input file contains no audio: {self.input_file}")
            indices = np.linspace(0, len(audio_data) - 1, num_samples)
            audio_data = np.interp(indices, np.arange(len(audio_data)), audio_data)
            self.log_event("->", f" Resampled to {self.sample_rate} Hz")

        if audio_data.dtype != np.int16:
            # Scale down only when the peak exceeds full scale (1.0), then
            # map to the int16 range.
            max_val = np.max(np.abs(audio_data))
            if max_val > 1.0:
                audio_data = audio_data / max_val
            audio_data = (audio_data * 32767).astype(np.int16)

        self.log_event("->", f" Prepared: {len(audio_data)} samples ({len(audio_data) / self.sample_rate:.2f}s)")
        self.input_audio = audio_data.copy()
        return audio_data, file_sample_rate

    async def audio_sender(self, audio_data: np.ndarray) -> None:
        """Send audio data to server in chunks, paced at ``chunk_duration_ms``.

        After the input audio, ``tail_silence_ms`` of zeroed frames is sent.
        Sets ``send_start_time`` at the start and ``send_completed`` at the end.
        """
        total_samples = len(audio_data)
        chunk_size = self.chunk_samples
        # max(1, ...) keeps the progress modulus non-zero for tiny sample rates.
        progress_interval = max(1, self.sample_rate // 2)
        sent_samples = 0
        self.send_start_time = time.time()
        self.log_event("->", f"Starting audio transmission ({total_samples} samples)...")

        while sent_samples < total_samples and self.running:
            end_sample = min(sent_samples + chunk_size, total_samples)
            chunk_bytes = audio_data[sent_samples:end_sample].tobytes()
            # Zero-pad so every message is a whole number of frames.
            remainder = len(chunk_bytes) % self.frame_bytes
            if remainder:
                chunk_bytes += b"\x00" * (self.frame_bytes - remainder)
            if self.ws:
                await self.ws.send(chunk_bytes)
                self.bytes_sent += len(chunk_bytes)
            sent_samples = end_sample

            if self.verbose and sent_samples % progress_interval == 0:
                progress = (sent_samples / total_samples) * 100
                print(f" Sending: {progress:.0f}%", end="\r")

            await asyncio.sleep(self.chunk_duration_ms / 1000)

        if self.tail_silence_ms > 0 and self.ws:
            tail_frames = max(1, self.tail_silence_ms // 20)
            silence = b"\x00" * self.frame_bytes
            for _ in range(tail_frames):
                await self.ws.send(silence)
                self.bytes_sent += len(silence)
                await asyncio.sleep(0.02)
            self.log_event("->", f"Sent trailing silence: {self.tail_silence_ms}ms")

        self.send_completed = True
        elapsed = time.time() - self.send_start_time
        self.log_event("->", f"Audio transmission complete ({elapsed:.2f}s, {self.bytes_sent / 1024:.1f} KB)")

    async def receiver(self) -> None:
        """Receive messages from server until stopped or the connection closes.

        Binary messages are stored as assistant audio; any other message is
        parsed as JSON and passed to ``_handle_event``.
        """
        try:
            while self.running:
                try:
                    # Short timeout so the loop notices self.running changes.
                    message = await asyncio.wait_for(self.ws.recv(), timeout=0.1)

                    if isinstance(message, bytes):
                        self.bytes_received += len(message)
                        self.received_audio.extend(message)
                        self._start_output_segment()
                        self.current_output_segment.extend(message)

                        if self.waiting_for_first_audio and self.response_start_time is not None:
                            ttfb_ms = (time.time() - self.response_start_time) * 1000
                            self.ttfb_ms = ttfb_ms
                            self.ttfb_list.append(ttfb_ms)
                            self.waiting_for_first_audio = False
                            self.log_event("<-", f"[TTFB] First audio latency: {ttfb_ms:.0f}ms")

                        if self.verbose:
                            duration_ms = len(message) / (self.sample_rate * 2) * 1000
                            total_ms = len(self.received_audio) / (self.sample_rate * 2) * 1000
                            print(f"<- Audio: +{duration_ms:.0f}ms (total: {total_ms:.0f}ms)", end="\r")
                    else:
                        event = json.loads(message)
                        await self._handle_event(event)

                except asyncio.TimeoutError:
                    continue
                except websockets.ConnectionClosed:
                    self.log_event("<-", "Connection closed")
                    self.running = False
                    break

        except asyncio.CancelledError:
            # run() cancels this task on shutdown; treat it as a normal exit.
            pass
        except Exception as exc:
            self.log_event("!", f"Receiver error: {exc}")
            self.running = False

    async def _handle_event(self, event: dict) -> None:
        """Handle incoming event: update session state and log it."""
        event_type = event.get("type", "unknown")
        ids = self._event_ids_suffix(event)
        if self.track_debug:
            print(f"[track-debug] event={event_type} trackId={event.get('trackId')}{ids}")

        if event_type == "session.started":
            self.session_ready = True
            self.log_event("<-", f"Session ready!{ids}")
        elif event_type == "config.resolved":
            config = event.get("config", {})
            self.log_event("<-", f"Config resolved (output={config.get('output', {})}){ids}")
        elif event_type == "input.speech_started":
            self.log_event("<-", f"Speech detected{ids}")
        elif event_type == "input.speech_stopped":
            self.log_event("<-", f"Silence detected{ids}")
        elif event_type == "transcript.delta":
            text = event.get("text", "")
            display_text = text[:60] + "..." if len(text) > 60 else text
            print(f" [listening] {display_text}".ljust(80), end="\r")
        elif event_type == "transcript.final":
            text = event.get("text", "")
            # Blank out the in-place "[listening]" line before logging.
            print(" " * 80, end="\r")
            self.log_event("<-", f"You: {text}{ids}")
        elif event_type == "metrics.ttfb":
            latency_ms = event.get("latencyMs", 0)
            self.log_event("<-", f"[TTFB] Server latency: {latency_ms}ms")
        elif event_type == "assistant.response.delta":
            text = event.get("text", "")
            if self.verbose and text:
                self.log_event("<-", f"LLM: {text}{ids}")
        elif event_type == "assistant.response.final":
            text = event.get("text", "")
            if text:
                summary = text[:100] + ("..." if len(text) > 100 else "")
                self.log_event("<-", f"LLM Response (final): {summary}{ids}")
        elif event_type == "output.audio.start":
            self.track_started = True
            # A new response is playing, so an earlier response's end must not
            # let run() stop waiting while this one is still in progress.
            self.track_ended = False
            self.response_start_time = time.time()
            self.waiting_for_first_audio = True
            self._close_output_segment()
            self.log_event("<-", f"Bot started speaking{ids}")
        elif event_type == "output.audio.end":
            self.track_ended = True
            self._close_output_segment()
            self.log_event("<-", f"Bot finished speaking{ids}")
        elif event_type == "response.interrupted":
            self._close_output_segment()
            self.log_event("<-", f"Bot interrupted!{ids}")
        elif event_type == "error":
            self.log_event("!", f"Error: {event.get('message')}{ids}")
        elif event_type == "session.stopped":
            self.log_event("<-", f"Session stopped: {event.get('reason')}{ids}")
            self.running = False
        else:
            self.log_event("<-", f"Event: {event_type}{ids}")

    def save_output_wav(self) -> None:
        """Save the conversation to a stereo WAV file.

        The left channel is the input track and the right channel is the
        assistant track; the shorter one is zero-padded to equal length.
        Nothing is written when both tracks are empty.
        """
        input_track = self._build_input_track()
        output_track = self._build_output_track()

        if input_track.size == 0 and output_track.size == 0:
            self.log_event("!", "No audio available to save")
            return

        if not self.received_audio:
            self.log_event("!", "No assistant audio received; saving silent right channel")

        total_samples = max(input_track.size, output_track.size)
        if input_track.size < total_samples:
            input_track = np.pad(input_track, (0, total_samples - input_track.size))
        if output_track.size < total_samples:
            output_track = np.pad(output_track, (0, total_samples - output_track.size))

        stereo_audio = np.column_stack((input_track, output_track)).astype(np.int16, copy=False)

        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        with wave.open(str(self.output_file), "wb") as wav_file:
            wav_file.setnchannels(2)
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(self.sample_rate)
            wav_file.writeframes(stereo_audio.tobytes())

        duration = total_samples / self.sample_rate
        self.log_event("->", f"Saved stereo output: {self.output_file}")
        self.log_event("->", f" Duration: {duration:.2f}s ({total_samples} samples/channel)")
        self.log_event("->", " Channels: left=input, right=assistant")
        self.log_event("->", f" Size: {stereo_audio.nbytes / 1024:.1f} KB")

    async def run(self) -> None:
        """Run the WAV file test.

        Loads the input, connects, streams the audio, waits up to
        ``wait_time`` seconds for the response, then saves the stereo output
        and prints a summary. Any failure is printed and exits the process
        with status 1; the connection is always closed.
        """
        _require_runtime_dependencies()
        try:
            audio_data, _ = self.load_wav_file()
            await self.connect()

            receiver_task = asyncio.create_task(self.receiver())

            ready_start = time.time()
            while self.running and not self.session_ready:
                if time.time() - ready_start > 8.0:
                    raise TimeoutError("Timeout waiting for session.started")
                await asyncio.sleep(0.05)

            await self.audio_sender(audio_data)

            self.log_event("->", f"Waiting {self.wait_time}s for response...")
            wait_start = time.time()
            while self.running and (time.time() - wait_start) < self.wait_time:
                if self.track_ended and self.send_completed:
                    # Grace period for trailing audio before stopping.
                    await asyncio.sleep(1.0)
                    break
                await asyncio.sleep(0.1)

            self.running = False
            receiver_task.cancel()
            try:
                await receiver_task
            except asyncio.CancelledError:
                pass

            self.save_output_wav()
            self._print_summary()

        except FileNotFoundError as exc:
            print(f"Error: {exc}")
            sys.exit(1)
        except ConnectionRefusedError:
            print(f"Error: Could not connect to {self.url}")
            print("Make sure the server is running.")
            sys.exit(1)
        except Exception as exc:
            print(f"Error: {exc}")
            import traceback

            traceback.print_exc()
            sys.exit(1)
        finally:
            await self.close()

    def _print_summary(self) -> None:
        """Print session summary."""
        print("\n" + "=" * 50)
        print("Session Summary")
        print("=" * 50)
        print(f" Input file: {self.input_file}")
        print(f" Output file: {self.output_file}")
        print(f" Bytes sent: {self.bytes_sent / 1024:.1f} KB")
        print(f" Bytes received: {self.bytes_received / 1024:.1f} KB")
        if self.ttfb_list:
            if len(self.ttfb_list) == 1:
                print(f" TTFB: {self.ttfb_list[0]:.0f} ms")
            else:
                values = ", ".join(f"{ttfb:.0f}ms" for ttfb in self.ttfb_list)
                print(f" TTFB (per response): {values}")
        if self.received_audio:
            duration = len(self.received_audio) / (self.sample_rate * 2)
            print(f" Response duration: {duration:.2f}s")
        print("=" * 50)

    async def close(self) -> None:
        """Close the connection."""
        self.running = False
        if self.ws:
            try:
                await self.ws.close()
            except Exception:
                # Best-effort shutdown: a failure to close must not mask the
                # result or error of the run itself.
                pass


async def main():
    """Parse command-line arguments and run a single WAV client session."""
    # Checked before argument parsing so a missing package is reported first.
    _require_runtime_dependencies()

    parser = argparse.ArgumentParser(
        description="WAV file client for testing duplex voice conversation"
    )
    parser.add_argument(
        "--input",
        "-i",
        required=True,
        help="Input WAV file path",
    )
    parser.add_argument(
        "--output",
        "-o",
        required=True,
        help="Output WAV file path for stereo conversation audio",
    )
    parser.add_argument(
        "--url",
        default="ws://localhost:8000/ws",
        help="WebSocket server URL (default: ws://localhost:8000/ws)",
    )
    parser.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        help="Target sample rate for audio (default: 16000)",
    )
    parser.add_argument(
        "--assistant-id",
        default="default",
        help="Assistant identifier used in websocket query parameter",
    )
    parser.add_argument(
        "--channel",
        default="wav_client",
        help="Client channel name",
    )
    parser.add_argument(
        "--chunk-duration",
        type=int,
        default=20,
        help="Chunk duration in ms for sending (default: 20)",
    )
    parser.add_argument(
        "--wait-time",
        "-w",
        type=float,
        default=15.0,
        help="Time to wait for response after sending (default: 15.0)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose output",
    )
    parser.add_argument(
        "--track-debug",
        action="store_true",
        help="Print event trackId for protocol debugging",
    )
    parser.add_argument(
        "--tail-silence-ms",
        type=int,
        default=800,
        help="Trailing silence to send after WAV playback for EOU detection (default: 800)",
    )

    args = parser.parse_args()

    client = WavFileClient(
        url=args.url,
        input_file=args.input,
        output_file=args.output,
        assistant_id=args.assistant_id,
        channel=args.channel,
        sample_rate=args.sample_rate,
        chunk_duration_ms=args.chunk_duration,
        wait_time=args.wait_time,
        verbose=args.verbose,
        track_debug=args.track_debug,
        tail_silence_ms=args.tail_silence_ms,
    )

    await client.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nInterrupted by user")