#!/usr/bin/env python3
"""
Simple WebSocket client for testing voice conversation.
Uses PyAudio for more reliable audio playback on Windows.

Usage:
    python examples/simple_client.py
    python examples/simple_client.py --text "Hello"
"""

import argparse
import asyncio
import json
import sys
import time
import wave
import io
from typing import Optional

try:
    import numpy as np
except ImportError:
    print("pip install numpy")
    sys.exit(1)

try:
    import websockets
except ImportError:
    print("pip install websockets")
    sys.exit(1)

# Try PyAudio first (more reliable on Windows)
try:
    import pyaudio
    PYAUDIO_AVAILABLE = True
except ImportError:
    PYAUDIO_AVAILABLE = False
    print("PyAudio not available, trying sounddevice...")

# Probed unconditionally so SD_AVAILABLE is always defined, whichever
# backend ends up being used for playback.
try:
    import sounddevice as sd
    SD_AVAILABLE = True
except ImportError:
    SD_AVAILABLE = False

if not PYAUDIO_AVAILABLE and not SD_AVAILABLE:
    print("Please install pyaudio or sounddevice:")
    print(" pip install pyaudio")
    print(" or: pip install sounddevice")
    sys.exit(1)


class SimpleVoiceClient:
    """Simple voice client with reliable audio playback.

    Connects to a WebSocket endpoint, performs the ``hello`` /
    ``session.start`` handshake, sends text input, and plays every binary
    frame it receives as 16-bit mono PCM at ``sample_rate``.
    """

    def __init__(
        self,
        url: str,
        sample_rate: int = 16000,
        app_id: str = "assistant_demo",
        channel: str = "simple_client",
        config_version_id: str = "local-dev",
        track_debug: bool = False,
    ):
        """Store connection settings and set up the audio backend.

        Args:
            url: WebSocket URL to connect to.
            sample_rate: Sample rate in Hz, announced in ``session.start``
                and used for playback.
            app_id: Sent as ``metadata.appId`` in ``session.start``.
            channel: Sent as ``metadata.channel`` in ``session.start``.
            config_version_id: Sent as ``metadata.configVersionId``.
            track_debug: When true, print a ``[track-debug]`` line for
                every JSON event received.
        """
        self.url = url
        self.sample_rate = sample_rate
        self.app_id = app_id
        self.channel = channel
        self.config_version_id = config_version_id
        self.track_debug = track_debug
        self.ws = None
        self.running = False

        # Audio buffer
        self.audio_buffer = b""

        # PyAudio setup. Both attributes always exist so close() and
        # play_audio() never hit AttributeError on the sounddevice path.
        self.pa = None
        self.stream = None
        if PYAUDIO_AVAILABLE:
            self.pa = pyaudio.PyAudio()

        # Stats
        self.bytes_received = 0

        # TTFB tracking (Time to First Byte)
        self.request_start_time = None
        self.first_audio_received = False

        # Interrupt handling - discard audio until next trackStart
        self._discard_audio = False

    @staticmethod
    def _event_ids_suffix(event: dict) -> str:
        """Format the correlation IDs found in ``event`` for log output.

        Each known ID key is looked up in ``event["data"]`` (when that is a
        dict) and falls back to the top level of ``event``.

        Returns:
            ``" [key=value ...]"`` for the truthy IDs found, or ``""`` when
            there are none.
        """
        data = event.get("data")
        if not isinstance(data, dict):
            data = {}
        keys = ("turn_id", "utterance_id", "response_id", "tool_call_id", "tts_id")
        parts = []
        for key in keys:
            value = data.get(key, event.get(key))
            if value:
                parts.append(f"{key}={value}")
        return f" [{' '.join(parts)}]" if parts else ""

    async def connect(self):
        """Connect to server and send the WS v1 handshake."""
        print(f"Connecting to {self.url}...")
        self.ws = await websockets.connect(self.url)
        self.running = True
        print("Connected!")

        # WS v1 handshake: hello -> session.start
        await self.ws.send(json.dumps({
            "type": "hello",
            "version": "v1",
        }))
        await self.ws.send(json.dumps({
            "type": "session.start",
            "audio": {
                "encoding": "pcm_s16le",
                "sample_rate_hz": self.sample_rate,
                "channels": 1,
            },
            "metadata": {
                "appId": self.app_id,
                "channel": self.channel,
                "configVersionId": self.config_version_id,
            },
        }))
        print("-> hello/session.start")

    async def send_chat(self, text: str):
        """Send a text message and restart TTFB measurement."""
        # Reset TTFB tracking for new request. A monotonic clock is used
        # because only the elapsed interval matters and wall-clock
        # adjustments must not distort it.
        self.request_start_time = time.monotonic()
        self.first_audio_received = False

        await self.ws.send(json.dumps({"type": "input.text", "text": text}))
        print(f"-> input.text: {text}")

    def play_audio(self, audio_data: bytes):
        """Play a chunk of 16-bit mono PCM audio, blocking until written.

        Args:
            audio_data: Raw little-endian int16 samples. Empty input is
                ignored.
        """
        if len(audio_data) == 0:
            return

        if PYAUDIO_AVAILABLE:
            # Use PyAudio - more reliable on Windows
            if self.stream is None:
                self.stream = self.pa.open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=self.sample_rate,
                    output=True,
                    frames_per_buffer=1024,
                )
            self.stream.write(audio_data)
        elif SD_AVAILABLE:
            # Use sounddevice. np.frombuffer rejects buffers that are not a
            # whole number of int16 samples, so drop a trailing odd byte.
            usable = len(audio_data) - (len(audio_data) % 2)
            if usable == 0:
                return
            samples = np.frombuffer(audio_data[:usable], dtype=np.int16)
            # 32768 maps the full int16 range into [-1.0, 1.0).
            samples = samples.astype(np.float32) / 32768.0
            sd.play(samples, self.sample_rate, blocking=True)

    async def _handle_audio(self, msg: bytes):
        """Account for, log, and play one binary audio frame."""
        self.bytes_received += len(msg)
        # 2 bytes per sample (16-bit mono).
        duration_ms = len(msg) / (self.sample_rate * 2) * 1000

        # Check if we should discard this audio (after interrupt)
        if self._discard_audio:
            print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms) [DISCARDED]")
            return

        # Calculate and display TTFB for first audio packet
        if not self.first_audio_received and self.request_start_time is not None:
            client_ttfb_ms = (time.monotonic() - self.request_start_time) * 1000
            self.first_audio_received = True
            print(f"<- [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")

        print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms)")

        # Play in an executor so the blocking write does not stall the
        # event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.play_audio, msg)

    def _handle_event(self, msg):
        """Parse and log one JSON text frame, updating client state.

        Sets ``self.running`` to False on ``hangup`` / ``session.stopped``.
        Frames that are not a JSON object are reported and ignored so one
        bad frame does not end the session.
        """
        try:
            event = json.loads(msg)
        except json.JSONDecodeError as exc:
            print(f"<- [warn] ignoring non-JSON text frame: {exc}")
            return
        if not isinstance(event, dict):
            print(f"<- [warn] ignoring unexpected JSON payload: {event!r}")
            return

        etype = event.get("type", event.get("event", "?"))
        ids = self._event_ids_suffix(event)
        if self.track_debug:
            print(f"[track-debug] event={etype} trackId={event.get('trackId')}{ids}")

        if etype in {"transcript", "transcript.delta", "transcript.final"}:
            # User speech transcription
            text = event.get("text", "")
            is_final = etype == "transcript.final" or bool(event.get("isFinal"))
            if is_final:
                print(f"<- You said: {text}{ids}")
            else:
                print(f"<- [listening] {text}", end="\r")
        elif etype in {"ttfb", "metrics.ttfb"}:
            # Server-side TTFB event
            latency_ms = event.get("latencyMs", 0)
            print(f"<- [TTFB] Server reported latency: {latency_ms}ms")
        elif etype in {"trackStart", "output.audio.start"}:
            # New track starting - accept audio again
            self._discard_audio = False
            print(f"<- {etype}{ids}")
        elif etype in {"interrupt", "response.interrupted"}:
            # Interrupt - discard audio until next trackStart
            self._discard_audio = True
            print(f"<- {etype}{ids} (discarding audio until new track)")
        elif etype in {"hangup", "session.stopped"}:
            print(f"<- {etype}{ids}")
            self.running = False
        elif etype == "config.resolved":
            config = event.get("config")
            output = config.get("output", {}) if isinstance(config, dict) else {}
            print(f"<- config.resolved {output}{ids}")
        else:
            print(f"<- {etype}{ids}")

    async def receive_loop(self):
        """Receive frames until stopped, playing audio and logging events.

        Ends when ``self.running`` becomes False (set externally or by a
        ``hangup`` / ``session.stopped`` event) or the connection closes.
        """
        print("\nWaiting for response...")

        while self.running:
            # Short timeout so the loop re-checks self.running regularly.
            try:
                msg = await asyncio.wait_for(self.ws.recv(), timeout=0.1)
            except asyncio.TimeoutError:
                continue
            except websockets.ConnectionClosed:
                print("Connection closed")
                self.running = False
                break

            if isinstance(msg, bytes):
                await self._handle_audio(msg)
            else:
                self._handle_event(msg)

    async def run(self, text: Optional[str] = None, response_timeout: float = 30):
        """Run the client.

        Args:
            text: When given, send it once and wait for the response;
                otherwise read messages interactively from stdin until
                ``quit`` or EOF.
            response_timeout: Maximum seconds to wait for the response in
                one-shot mode. The wait ends earlier if the receiver stops
                (hangup or closed connection).
        """
        recv_task = None
        try:
            await self.connect()
            await asyncio.sleep(0.5)

            # Start receiver
            recv_task = asyncio.create_task(self.receive_loop())

            if text:
                await self.send_chat(text)
                # Wait for the response, but return as soon as the receiver
                # finishes instead of always sleeping the full timeout.
                await asyncio.wait({recv_task}, timeout=response_timeout)
            else:
                # Interactive mode
                print("\nType a message and press Enter (or 'quit' to exit):")
                loop = asyncio.get_running_loop()
                while self.running:
                    try:
                        # input() blocks, so it runs in the default
                        # executor to keep the receiver running.
                        user_input = await loop.run_in_executor(None, input, "> ")
                    except EOFError:
                        break
                    if user_input.strip().lower() == "quit":
                        break
                    if user_input.strip():
                        await self.send_chat(user_input)
        finally:
            self.running = False
            try:
                # Stop the receiver on every exit path, including errors.
                if recv_task is not None:
                    recv_task.cancel()
                    try:
                        await recv_task
                    except asyncio.CancelledError:
                        pass
            finally:
                await self.close()

    async def close(self):
        """Close the audio backend and the WebSocket, then print totals.

        The WebSocket is closed even if tearing down audio raises.
        """
        self.running = False

        try:
            if self.stream is not None:
                stream, self.stream = self.stream, None
                stream.stop_stream()
                stream.close()
            if self.pa is not None:
                pa, self.pa = self.pa, None
                pa.terminate()
        finally:
            if self.ws:
                await self.ws.close()

            print(f"\nTotal audio received: {self.bytes_received / 1024:.1f} KB")


def list_audio_devices():
    """List available audio output devices for each installed backend."""
    print("\n=== Audio Devices ===")

    if PYAUDIO_AVAILABLE:
        pa = pyaudio.PyAudio()
        try:
            print("\nPyAudio devices:")
            # Look the default up once; PyAudio raises IOError (OSError)
            # when the system has no default output device.
            try:
                default_index = pa.get_default_output_device_info()['index']
            except OSError:
                default_index = None
            for i in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(i)
                if info['maxOutputChannels'] > 0:
                    default = " [DEFAULT]" if i == default_index else ""
                    print(f" {i}: {info['name']}{default}")
        finally:
            pa.terminate()

    if SD_AVAILABLE:
        print("\nSounddevice devices:")
        # Index 1 of sd.default.device is the output device.
        default_output = sd.default.device[1]
        for i, d in enumerate(sd.query_devices()):
            if d['max_output_channels'] > 0:
                default = " [DEFAULT]" if i == default_output else ""
                print(f" {i}: {d['name']}{default}")


async def main():
    """Parse command-line arguments and run the client."""
    parser = argparse.ArgumentParser(description="Simple voice client")
    parser.add_argument("--url", default="ws://localhost:8000/ws")
    parser.add_argument("--text", help="Send text and play response")
    parser.add_argument("--list-devices", action="store_true")
    parser.add_argument("--sample-rate", type=int, default=16000)
    parser.add_argument("--app-id", default="assistant_demo")
    parser.add_argument("--channel", default="simple_client")
    parser.add_argument("--config-version-id", default="local-dev")
    parser.add_argument("--track-debug", action="store_true")

    args = parser.parse_args()

    if args.list_devices:
        list_audio_devices()
        return

    client = SimpleVoiceClient(
        args.url,
        args.sample_rate,
        app_id=args.app_id,
        channel=args.channel,
        config_version_id=args.config_version_id,
        track_debug=args.track_debug,
    )
    await client.run(args.text)


if __name__ == "__main__":
    asyncio.run(main())