#!/usr/bin/env python3
"""
Microphone client for testing duplex voice conversation.

This client captures audio from the microphone, sends it to the server,
and plays back the AI's voice response through the speakers.
It also displays the LLM's text responses in the console.

Usage:
    python examples/mic_client.py --url ws://localhost:8000/ws
    python examples/mic_client.py --url ws://localhost:8000/ws --chat "Hello!"
    python examples/mic_client.py --url ws://localhost:8000/ws --verbose

Requirements:
    pip install sounddevice soundfile websockets numpy
"""

import argparse
import asyncio
import json
import queue
import sys
import threading
import time
import traceback
from pathlib import Path
from typing import Optional

try:
    import numpy as np
except ImportError:
    print("Please install numpy: pip install numpy")
    sys.exit(1)

try:
    import sounddevice as sd
except ImportError:
    print("Please install sounddevice: pip install sounddevice")
    sys.exit(1)

try:
    import websockets
except ImportError:
    print("Please install websockets: pip install websockets")
    sys.exit(1)


# Audio on the wire and in the playback buffer is 16-bit mono PCM.
_BYTES_PER_SAMPLE = 2
_INT16_MAX = 32767

# Size of one playback write, in seconds.
_PLAYBACK_CHUNK_S = 0.05

# Maximum number of characters shown for interim/streaming text.
_PREVIEW_CHARS = 60


def _preview(text: str, limit: int = _PREVIEW_CHARS) -> str:
    """Return *text* cut to *limit* characters, with "..." appended if cut."""
    return text[:limit] + "..." if len(text) > limit else text


class MicrophoneClient:
    """
    Full-duplex microphone client for voice conversation.

    Features:
    - Real-time microphone capture
    - Real-time speaker playback
    - WebSocket communication
    - Text chat support
    """

    def __init__(
        self,
        url: str,
        sample_rate: int = 16000,
        chunk_duration_ms: int = 20,
        input_device: Optional[int] = None,
        output_device: Optional[int] = None,
    ):
        """
        Initialize microphone client.

        Args:
            url: WebSocket server URL
            sample_rate: Audio sample rate (Hz)
            chunk_duration_ms: Audio chunk duration (ms)
            input_device: Input device ID (None for default)
            output_device: Output device ID (None for default)
        """
        self.url = url
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.chunk_samples = int(sample_rate * chunk_duration_ms / 1000)
        self.input_device = input_device
        self.output_device = output_device

        # WebSocket connection
        self.ws = None
        self.running = False

        # Audio buffers. The input queue is filled by the sounddevice callback
        # and drained by audio_sender(). The output buffer is filled by
        # receiver() and drained by the playback thread, so every access to it
        # goes through audio_output_lock.
        self.audio_input_queue = queue.Queue()
        # bytearray so appends and front-pops mutate in place instead of
        # re-copying the whole buffer the way immutable bytes would.
        self.audio_output_buffer = bytearray()
        self.audio_output_lock = threading.Lock()

        # Statistics
        self.bytes_sent = 0
        self.bytes_received = 0

        # State
        self.is_recording = True
        self.is_playing = True

        # TTFB tracking (Time to First Byte)
        self.request_start_time = None
        self.first_audio_received = False

        # Interrupt handling - discard audio until next trackStart
        self._discard_audio = False
        self._audio_sequence = 0  # Incremented on every trackStart event

        # Verbose mode for streaming LLM responses
        self.verbose = False

    def _bytes_to_ms(self, num_bytes: int) -> float:
        """Convert a byte count of 16-bit mono PCM into milliseconds."""
        return num_bytes / (self.sample_rate * _BYTES_PER_SAMPLE) * 1000

    async def connect(self) -> None:
        """Connect to WebSocket server and send the initial invite command."""
        print(f"Connecting to {self.url}...")
        self.ws = await websockets.connect(self.url)
        self.running = True
        print("Connected!")

        # Send invite command
        await self.send_command({
            "command": "invite",
            "option": {
                "codec": "pcm",
                "sampleRate": self.sample_rate,
            },
        })

    async def send_command(self, cmd: dict) -> None:
        """Send JSON command to server (no-op when not connected)."""
        if self.ws:
            await self.ws.send(json.dumps(cmd))
            print(f"→ Command: {cmd.get('command', 'unknown')}")

    async def send_chat(self, text: str) -> None:
        """Send chat message (text input)."""
        # Reset TTFB tracking for new request
        self.request_start_time = time.time()
        self.first_audio_received = False

        await self.send_command({
            "command": "chat",
            "text": text,
        })
        print(f"→ Chat: {text}")

    async def send_interrupt(self) -> None:
        """Send interrupt command."""
        await self.send_command({
            "command": "interrupt",
        })

    async def send_hangup(self, reason: str = "User quit") -> None:
        """Send hangup command."""
        await self.send_command({
            "command": "hangup",
            "reason": reason,
        })

    def _audio_input_callback(self, indata, frames, time, status):
        """
        Callback for audio input (microphone).

        Registered as the ``callback`` of the ``sd.InputStream`` created in
        run(). The ``time`` parameter shadows the ``time`` module inside this
        function only; the module is not used here.
        """
        if status:
            print(f"Input status: {status}")

        if self.is_recording and self.running:
            # Convert float samples to 16-bit PCM. Clip first so that values
            # outside [-1, 1] saturate instead of overflowing the int16 range.
            mono = np.clip(indata[:, 0], -1.0, 1.0)
            audio_data = (mono * _INT16_MAX).astype(np.int16).tobytes()
            self.audio_input_queue.put(audio_data)

    def _add_audio_to_buffer(self, audio_data: bytes):
        """Add audio data to playback buffer."""
        with self.audio_output_lock:
            self.audio_output_buffer.extend(audio_data)

    def _playback_thread_func(self):
        """
        Thread function for continuous audio playback.

        Writes fixed-size chunks to an output stream for as long as
        ``self.running`` is true. When the buffer holds less than one full
        chunk, a chunk of silence is written instead and the buffered bytes
        are left in place until enough has accumulated.
        """
        # Chunk size: 50ms of audio
        chunk_samples = int(self.sample_rate * _PLAYBACK_CHUNK_S)
        chunk_bytes = chunk_samples * _BYTES_PER_SAMPLE
        silence = b"\x00" * chunk_bytes

        # Device 0 is a valid id, so test against None rather than truthiness.
        device_label = "default" if self.output_device is None else self.output_device
        print(f"Audio playback thread started (device: {device_label})")

        try:
            with sd.OutputStream(
                samplerate=self.sample_rate,
                channels=1,
                dtype='int16',
                blocksize=chunk_samples,
                device=self.output_device,
                latency='low',
            ) as stream:
                while self.running:
                    # Take one chunk from the buffer, or fall back to silence
                    with self.audio_output_lock:
                        if len(self.audio_output_buffer) >= chunk_bytes:
                            audio_data = bytes(self.audio_output_buffer[:chunk_bytes])
                            del self.audio_output_buffer[:chunk_bytes]
                        else:
                            audio_data = silence

                    # Convert to a (frames, 1) int16 array and write to stream
                    samples = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 1)
                    stream.write(samples)

        except Exception as e:
            print(f"Playback thread error: {e}")
            traceback.print_exc()

    async def _playback_task(self):
        """Start playback thread and monitor it."""
        # Run playback in a dedicated thread for reliable timing
        playback_thread = threading.Thread(
            target=self._playback_thread_func, daemon=True
        )
        playback_thread.start()

        # Wait for client to stop (or for the thread to die on its own)
        while self.running and playback_thread.is_alive():
            await asyncio.sleep(0.1)

        print("Audio playback stopped")

    async def audio_sender(self) -> None:
        """Send audio from microphone to server."""
        loop = asyncio.get_running_loop()

        while self.running:
            try:
                # The queue is a blocking thread queue, so poll it from the
                # default executor with a short timeout; the timeout lets the
                # loop re-check self.running.
                try:
                    audio_data = await loop.run_in_executor(
                        None, lambda: self.audio_input_queue.get(timeout=0.1)
                    )
                except queue.Empty:
                    continue

                # Send to server
                if self.ws and self.is_recording:
                    await self.ws.send(audio_data)
                    self.bytes_sent += len(audio_data)

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Audio sender error: {e}")
                break

    def _handle_audio(self, message: bytes) -> None:
        """Account for one binary audio message and queue it for playback."""
        self.bytes_received += len(message)
        duration_ms = self._bytes_to_ms(len(message))

        # Check if we should discard this audio (after interrupt)
        if self._discard_audio:
            print(f"← Audio: {duration_ms:.0f}ms (DISCARDED - waiting for new track)")
            return

        if not self.is_playing:
            return

        self._add_audio_to_buffer(message)

        # Calculate and display TTFB for first audio packet
        if not self.first_audio_received and self.request_start_time:
            client_ttfb_ms = (time.time() - self.request_start_time) * 1000
            self.first_audio_received = True
            print(f"← [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")

        # Show progress. Only the length read needs the lock; printing is done
        # after releasing it so the playback thread is not held up by console I/O.
        with self.audio_output_lock:
            buffered_bytes = len(self.audio_output_buffer)
        buffer_ms = self._bytes_to_ms(buffered_bytes)
        print(f"← Audio: {duration_ms:.0f}ms (buffer: {buffer_ms:.0f}ms)")

    async def receiver(self) -> None:
        """
        Receive messages from server.

        Binary messages are treated as audio; anything else is parsed as a
        JSON event and passed to _handle_event(). A message that is not valid
        JSON, or not a JSON object, is reported and skipped rather than ending
        the session.
        """
        try:
            while self.running:
                try:
                    # Short timeout so the loop re-checks self.running
                    message = await asyncio.wait_for(self.ws.recv(), timeout=0.1)
                except asyncio.TimeoutError:
                    continue
                except websockets.ConnectionClosed:
                    print("Connection closed")
                    self.running = False
                    break

                if isinstance(message, bytes):
                    self._handle_audio(message)
                    continue

                try:
                    event = json.loads(message)
                except json.JSONDecodeError as e:
                    print(f"← Ignoring malformed event: {e}")
                    continue
                if not isinstance(event, dict):
                    print(f"← Ignoring non-object event: {event!r}")
                    continue

                await self._handle_event(event)

        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"Receiver error: {e}")
            self.running = False

    async def _handle_event(self, event: dict) -> None:
        """Handle incoming event, dispatching on its "event" field."""
        event_type = event.get("event", "unknown")

        if event_type == "answer":
            print("← Session ready!")

        elif event_type == "speaking":
            print("← User speech detected")

        elif event_type == "silence":
            print("← User silence detected")

        elif event_type == "transcript":
            # Display user speech transcription
            text = event.get("text", "")
            if event.get("isFinal", False):
                # Clear the interim line and print final
                print(" " * 80, end="\r")
                print(f"→ You: {text}")
            else:
                # Interim result - overwrite the same console line
                print(f" [listening] {_preview(text)}".ljust(80), end="\r")

        elif event_type == "ttfb":
            # Server-side TTFB event
            latency_ms = event.get("latencyMs", 0)
            print(f"← [TTFB] Server reported latency: {latency_ms}ms")

        elif event_type == "llmResponse":
            # LLM text response
            text = event.get("text", "")
            if event.get("isFinal", False):
                print(f"← AI: {text}")
            elif self.verbose:
                # Show streaming chunks only in verbose mode
                print(f" [streaming] {_preview(text)}")

        elif event_type == "trackStart":
            print("← Bot started speaking")
            # IMPORTANT: Accept audio again after trackStart
            self._discard_audio = False
            self._audio_sequence += 1

            # Reset TTFB tracking for voice responses (when no chat was sent)
            if self.request_start_time is None:
                self.request_start_time = time.time()
                self.first_audio_received = False

            # Clear any old audio in buffer
            with self.audio_output_lock:
                self.audio_output_buffer.clear()

        elif event_type == "trackEnd":
            print("← Bot finished speaking")
            # Reset TTFB tracking after response completes
            self.request_start_time = None
            self.first_audio_received = False

        elif event_type == "interrupt":
            print("← Bot interrupted!")
            # IMPORTANT: Discard all audio until next trackStart
            self._discard_audio = True

            # Clear audio buffer immediately
            with self.audio_output_lock:
                cleared_bytes = len(self.audio_output_buffer)
                self.audio_output_buffer.clear()
            buffer_ms = self._bytes_to_ms(cleared_bytes)
            print(f" (cleared {buffer_ms:.0f}ms, discarding audio until new track)")

        elif event_type == "error":
            print(f"← Error: {event.get('error')}")

        elif event_type == "hangup":
            print(f"← Hangup: {event.get('reason')}")
            self.running = False

        else:
            print(f"← Event: {event_type}")

    async def interactive_mode(self) -> None:
        """
        Run interactive mode for text chat.

        Reads lines from stdin until the session stops, EOF is reached, or the
        user enters /quit. Lines starting with "/" are commands; anything else
        is sent as a chat message.
        """
        print("\n" + "=" * 50)
        print("Voice Conversation Client")
        print("=" * 50)
        print("Speak into your microphone to talk to the AI.")
        print("Or type messages to send text.")
        print("")
        print("Commands:")
        print(" /quit - End conversation")
        print(" /mute - Mute microphone")
        print(" /unmute - Unmute microphone")
        print(" /interrupt - Interrupt AI speech")
        print(" /stats - Show statistics")
        print("=" * 50 + "\n")

        loop = asyncio.get_running_loop()

        while self.running:
            try:
                # input() blocks, so run it in the default executor to keep
                # the event loop free for the audio tasks.
                user_input = await loop.run_in_executor(None, input, "")

                # Strip before classifying so that " /quit" is still treated
                # as a command and whitespace-only lines are ignored.
                stripped = user_input.strip()
                if not stripped:
                    continue

                if not stripped.startswith("/"):
                    # Send as chat message
                    await self.send_chat(user_input)
                    continue

                # Handle commands
                cmd = stripped.lower()
                if cmd == "/quit":
                    await self.send_hangup("User quit")
                    break
                elif cmd == "/mute":
                    self.is_recording = False
                    print("Microphone muted")
                elif cmd == "/unmute":
                    self.is_recording = True
                    print("Microphone unmuted")
                elif cmd == "/interrupt":
                    await self.send_interrupt()
                elif cmd == "/stats":
                    print(f"Sent: {self.bytes_sent / 1024:.1f} KB")
                    print(f"Received: {self.bytes_received / 1024:.1f} KB")
                else:
                    print(f"Unknown command: {cmd}")

            except EOFError:
                break
            except Exception as e:
                print(f"Input error: {e}")

    async def run(
        self,
        chat_message: Optional[str] = None,
        interactive: bool = True,
        chat_wait_s: float = 15.0,
    ) -> None:
        """
        Run the client.

        Connects, starts microphone capture and the sender/receiver/playback
        tasks, then runs one of three modes. All resources are released in a
        ``finally`` block, so they are cleaned up on errors and cancellation
        as well as on normal exit.

        Args:
            chat_message: Optional single chat message to send
            interactive: Whether to run in interactive mode
            chat_wait_s: In single-message mode, the maximum number of seconds
                to keep the session open after sending ``chat_message``. The
                wait ends early if the session stops.
        """
        input_stream = None
        tasks = []

        try:
            await self.connect()

            # Wait for answer
            await asyncio.sleep(0.5)

            # Start audio input stream
            print("Starting audio streams...")
            input_stream = sd.InputStream(
                samplerate=self.sample_rate,
                channels=1,
                dtype=np.float32,
                blocksize=self.chunk_samples,
                device=self.input_device,
                callback=self._audio_input_callback,
            )
            input_stream.start()
            print("Audio streams started")

            # Start background tasks
            tasks = [
                asyncio.create_task(self.audio_sender()),
                asyncio.create_task(self.receiver()),
                asyncio.create_task(self._playback_task()),
            ]

            if chat_message:
                # Send single message and wait for the response to play out
                await self.send_chat(chat_message)
                deadline = time.monotonic() + chat_wait_s
                while self.running and time.monotonic() < deadline:
                    await asyncio.sleep(0.1)
            elif interactive:
                # Run interactive mode
                await self.interactive_mode()
            else:
                # Just wait
                while self.running:
                    await asyncio.sleep(0.1)

        except ConnectionRefusedError:
            print(f"Error: Could not connect to {self.url}")
            print("Make sure the server is running.")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # Cleanup. Setting running to False also tells the playback thread
            # and the input callback to stop.
            self.running = False
            try:
                for task in tasks:
                    task.cancel()
                if tasks:
                    # return_exceptions=True: a failure or cancellation in one
                    # task must not prevent waiting for the others.
                    await asyncio.gather(*tasks, return_exceptions=True)
                if input_stream is not None:
                    input_stream.stop()
                    input_stream.close()
            except Exception as e:
                print(f"Cleanup error: {e}")
            finally:
                await self.close()

    async def close(self) -> None:
        """Close the connection and print session totals."""
        self.running = False
        try:
            if self.ws:
                await self.ws.close()
        finally:
            # Print the summary even if closing the socket raised.
            print("\nSession ended")
            print(f" Total sent: {self.bytes_sent / 1024:.1f} KB")
            print(f" Total received: {self.bytes_received / 1024:.1f} KB")


def list_devices():
    """List available audio devices."""
    print("\nAvailable audio devices:")
    print("-" * 60)

    default_input, default_output = sd.default.device[0], sd.default.device[1]

    for i, device in enumerate(sd.query_devices()):
        direction = []
        if device['max_input_channels'] > 0:
            direction.append("IN")
        if device['max_output_channels'] > 0:
            direction.append("OUT")
        direction_str = "/".join(direction) if direction else "N/A"

        default = ""
        if i == default_input:
            default += " [DEFAULT INPUT]"
        if i == default_output:
            default += " [DEFAULT OUTPUT]"

        print(f" {i:2d}: {device['name'][:40]:40s} ({direction_str}){default}")

    print("-" * 60)


async def main():
    """Parse command-line arguments and run the microphone client."""
    parser = argparse.ArgumentParser(
        description="Microphone client for duplex voice conversation"
    )
    parser.add_argument(
        "--url",
        default="ws://localhost:8000/ws",
        help="WebSocket server URL",
    )
    parser.add_argument(
        "--chat",
        help="Send a single chat message instead of using microphone",
    )
    parser.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        help="Audio sample rate (default: 16000)",
    )
    parser.add_argument(
        "--input-device",
        type=int,
        help="Input device ID",
    )
    parser.add_argument(
        "--output-device",
        type=int,
        help="Output device ID",
    )
    parser.add_argument(
        "--list-devices",
        action="store_true",
        help="List available audio devices and exit",
    )
    parser.add_argument(
        "--no-interactive",
        action="store_true",
        help="Disable interactive mode",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show streaming LLM response chunks",
    )

    args = parser.parse_args()

    if args.list_devices:
        list_devices()
        return

    client = MicrophoneClient(
        url=args.url,
        sample_rate=args.sample_rate,
        input_device=args.input_device,
        output_device=args.output_device,
    )
    client.verbose = args.verbose

    await client.run(
        chat_message=args.chat,
        interactive=not args.no_interactive,
    )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nInterrupted by user")