Files
AI-VideoAssistant/engine/examples/simple_client.py
2026-02-26 01:58:39 +08:00

338 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Simple WebSocket client for testing voice conversation.
Uses PyAudio for more reliable audio playback on Windows.
Usage:
python examples/simple_client.py
python examples/simple_client.py --text "Hello"
"""
import argparse
import asyncio
import json
import sys
import time
import wave
import io
# Hard requirements: exit with an install hint when either one is missing.
try:
    import numpy as np
except ImportError:
    print("pip install numpy")
    sys.exit(1)

try:
    import websockets
except ImportError:
    print("pip install websockets")
    sys.exit(1)

# Playback backends. PyAudio is tried first (more reliable on Windows);
# sounddevice is the fallback. The *_AVAILABLE flags record which of the two
# imported successfully and are consulted by the rest of the module.
try:
    import pyaudio
except ImportError:
    PYAUDIO_AVAILABLE = False
    print("PyAudio not available, trying sounddevice...")
else:
    PYAUDIO_AVAILABLE = True

try:
    import sounddevice as sd
except (ImportError, OSError):
    # sounddevice raises OSError at import time when the PortAudio shared
    # library cannot be located, even though the Python package is installed.
    SD_AVAILABLE = False
else:
    SD_AVAILABLE = True

# Without any playback backend the client cannot do its job.
if not (PYAUDIO_AVAILABLE or SD_AVAILABLE):
    print("Please install pyaudio or sounddevice:")
    print(" pip install pyaudio")
    print(" or: pip install sounddevice")
    sys.exit(1)
class SimpleVoiceClient:
    """Simple voice client with reliable audio playback.

    The client opens a WebSocket, performs the WS v1 handshake (``hello``
    followed by ``session.start``), sends text turns as ``input.text``
    messages, prints every JSON event it receives and plays binary frames as
    16-bit mono PCM at ``sample_rate`` through PyAudio or, when PyAudio is not
    available, through sounddevice.
    """

    def __init__(
        self,
        url: str,
        sample_rate: int = 16000,
        app_id: str = "assistant_demo",
        channel: str = "simple_client",
        config_version_id: str = "local-dev",
        track_debug: bool = False,
    ):
        """Store the session settings and prepare the playback backend.

        Args:
            url: WebSocket endpoint to connect to.
            sample_rate: Sample rate in Hz, announced in ``session.start`` and
                used for playback.
            app_id: Sent as ``metadata.appId`` in ``session.start``.
            channel: Sent as ``metadata.channel`` in ``session.start``.
            config_version_id: Sent as ``metadata.configVersionId``.
            track_debug: When true, print a ``[track-debug]`` line for every
                JSON event received.
        """
        self.url = url
        self.sample_rate = sample_rate
        self.app_id = app_id
        self.channel = channel
        self.config_version_id = config_version_id
        self.track_debug = track_debug
        self.ws = None
        self.running = False
        # Audio buffer (not read anywhere in this class; kept as an attribute)
        self.audio_buffer = b""
        # Playback handles. Both attributes always exist, whichever backend is
        # installed, so close() and play_audio() never hit AttributeError.
        self.pa = pyaudio.PyAudio() if PYAUDIO_AVAILABLE else None
        self.stream = None
        # Stats
        self.bytes_received = 0
        # TTFB tracking (Time to First Byte); the start time is a
        # time.perf_counter() reading, not an epoch timestamp.
        self.request_start_time = None
        self.first_audio_received = False
        # Interrupt handling - discard audio until next trackStart
        self._discard_audio = False

    @staticmethod
    def _event_ids_suffix(event: dict) -> str:
        """Return a ``" [key=value ...]"`` suffix with the IDs found in *event*.

        Each known ID key is looked up in ``event["data"]`` (when that is a
        dict) and falls back to the top level of *event* when the key is
        absent from ``data``. Falsy values are skipped. Returns ``""`` when no
        ID is present.
        """
        data = event.get("data") if isinstance(event.get("data"), dict) else {}
        keys = ("turn_id", "utterance_id", "response_id", "tool_call_id", "tts_id")
        parts = []
        for key in keys:
            value = data.get(key, event.get(key))
            if value:
                parts.append(f"{key}={value}")
        return f" [{' '.join(parts)}]" if parts else ""

    async def connect(self):
        """Connect to the server and send the ``hello``/``session.start`` handshake."""
        print(f"Connecting to {self.url}...")
        self.ws = await websockets.connect(self.url)
        self.running = True
        print("Connected!")
        # WS v1 handshake: hello -> session.start
        await self.ws.send(json.dumps({
            "type": "hello",
            "version": "v1",
        }))
        await self.ws.send(json.dumps({
            "type": "session.start",
            "audio": {
                "encoding": "pcm_s16le",
                "sample_rate_hz": self.sample_rate,
                "channels": 1,
            },
            "metadata": {
                "appId": self.app_id,
                "channel": self.channel,
                "configVersionId": self.config_version_id,
            },
        }))
        print("-> hello/session.start")

    async def send_chat(self, text: str):
        """Send *text* as an ``input.text`` message and restart TTFB tracking."""
        # perf_counter is monotonic, so the measured latency cannot be skewed
        # by wall-clock adjustments between request and first audio.
        self.request_start_time = time.perf_counter()
        self.first_audio_received = False
        await self.ws.send(json.dumps({"type": "input.text", "text": text}))
        print(f"-> input.text: {text}")

    def play_audio(self, audio_data: bytes):
        """Play one chunk of 16-bit mono PCM; returns when the backend call returns.

        Empty chunks are ignored. With PyAudio the output stream is opened
        lazily on the first chunk and reused afterwards.
        """
        if not audio_data:
            return
        if PYAUDIO_AVAILABLE:
            # Use PyAudio - more reliable on Windows
            if self.stream is None:
                self.stream = self.pa.open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=self.sample_rate,
                    output=True,
                    frames_per_buffer=1024,
                )
            self.stream.write(audio_data)
        elif SD_AVAILABLE:
            # Use sounddevice. Dividing by 32768 maps the whole int16 range
            # into [-1.0, 1.0); 32767 would push -32768 slightly below -1.0.
            samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
            sd.play(samples, self.sample_rate, blocking=True)

    def _register_audio(self, chunk: bytes) -> bool:
        """Account for one binary frame; return True when it should be played."""
        self.bytes_received += len(chunk)
        # 2 bytes per sample, single channel
        duration_ms = len(chunk) / (self.sample_rate * 2) * 1000
        # Check if we should discard this audio (after interrupt)
        if self._discard_audio:
            print(f"<- audio: {len(chunk)} bytes ({duration_ms:.0f}ms) [DISCARDED]")
            return False
        # Calculate and display TTFB for first audio packet
        if not self.first_audio_received and self.request_start_time is not None:
            client_ttfb_ms = (time.perf_counter() - self.request_start_time) * 1000
            self.first_audio_received = True
            print(f"<- [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
        print(f"<- audio: {len(chunk)} bytes ({duration_ms:.0f}ms)")
        return True

    def _handle_event(self, raw) -> bool:
        """Print one text frame as a JSON event; return False when the session ended.

        Frames that are not valid JSON, or whose JSON value is not an object,
        are reported and skipped instead of aborting the receive loop.
        """
        try:
            event = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass
            print(f"<- [warn] ignoring non-JSON message: {raw!r}")
            return True
        if not isinstance(event, dict):
            print(f"<- [warn] ignoring non-object JSON message: {event!r}")
            return True

        etype = event.get("type", event.get("event", "?"))
        ids = self._event_ids_suffix(event)
        if self.track_debug:
            print(f"[track-debug] event={etype} trackId={event.get('trackId')}{ids}")

        # Tuples (not sets) so an unhashable "type" value cannot raise here.
        if etype in ("transcript", "transcript.delta", "transcript.final"):
            # User speech transcription
            text = event.get("text", "")
            is_final = etype == "transcript.final" or bool(event.get("isFinal"))
            if is_final:
                print(f"<- You said: {text}{ids}")
            else:
                print(f"<- [listening] {text}", end="\r")
        elif etype in ("ttfb", "metrics.ttfb"):
            # Server-side TTFB event
            latency_ms = event.get("latencyMs", 0)
            print(f"<- [TTFB] Server reported latency: {latency_ms}ms")
        elif etype in ("trackStart", "output.audio.start"):
            # New track starting - accept audio again
            self._discard_audio = False
            print(f"<- {etype}{ids}")
        elif etype in ("interrupt", "response.interrupted"):
            # Interrupt - discard audio until next trackStart
            self._discard_audio = True
            print(f"<- {etype}{ids} (discarding audio until new track)")
        elif etype in ("hangup", "session.stopped"):
            print(f"<- {etype}{ids}")
            self.running = False
            return False
        elif etype == "config.resolved":
            config = event.get("config")
            # A null or non-object "config" is treated like a missing one.
            output = config.get("output", {}) if isinstance(config, dict) else {}
            print(f"<- config.resolved {output}{ids}")
        else:
            print(f"<- {etype}{ids}")
        return True

    async def receive_loop(self):
        """Receive messages until the session ends, playing audio as it arrives.

        Binary frames are audio, text frames are JSON events. The loop stops
        when ``self.running`` is cleared, the server sends ``hangup`` /
        ``session.stopped``, or the connection closes.
        """
        print("\nWaiting for response...")
        loop = asyncio.get_running_loop()
        while self.running:
            try:
                # Short timeout so a cleared self.running is noticed promptly.
                msg = await asyncio.wait_for(self.ws.recv(), timeout=0.1)
            except asyncio.TimeoutError:
                continue
            except websockets.ConnectionClosed:
                print("Connection closed")
                self.running = False
                break

            if isinstance(msg, bytes):
                if self._register_audio(msg):
                    # Play in the default executor so the event loop is not
                    # blocked; the next message is read once play_audio returns.
                    await loop.run_in_executor(None, self.play_audio, msg)
            elif not self._handle_event(msg):
                break

    async def run(self, text: str = None, response_timeout: float = 30.0):
        """Run the client.

        Args:
            text: When given, send this single message and wait for the
                response; otherwise read messages interactively from stdin
                until ``quit``, EOF, or the end of the session.
            response_timeout: Maximum number of seconds to wait for the
                response in single-message mode. The wait ends earlier when
                the receive loop finishes (hangup or closed connection).
        """
        recv_task = None
        try:
            await self.connect()
            await asyncio.sleep(0.5)
            # Start receiver
            recv_task = asyncio.create_task(self.receive_loop())
            if text:
                await self.send_chat(text)
                # Wait for the response, but no longer than needed.
                await asyncio.wait({recv_task}, timeout=response_timeout)
            else:
                # Interactive mode
                print("\nType a message and press Enter (or 'quit' to exit):")
                loop = asyncio.get_running_loop()
                while self.running:
                    try:
                        user_input = await loop.run_in_executor(None, input, "> ")
                    except EOFError:
                        break
                    if user_input.lower() == 'quit':
                        break
                    if not self.running:
                        # The session ended while input() was blocking.
                        break
                    if user_input.strip():
                        await self.send_chat(user_input)
        except websockets.ConnectionClosed:
            # A send raced with the connection going away.
            print("Connection closed")
        finally:
            self.running = False
            try:
                # Always stop the receiver, including when a send failed.
                if recv_task is not None:
                    recv_task.cancel()
                    try:
                        await recv_task
                    except asyncio.CancelledError:
                        pass
            finally:
                await self.close()

    async def close(self):
        """Close connections.

        Releases the audio stream, the PyAudio instance and the WebSocket, in
        that order; a failure in one step does not skip the following ones.
        The audio handles are cleared first, so a second call does not touch
        them again.
        """
        self.running = False
        stream, self.stream = self.stream, None
        pa, self.pa = self.pa, None
        try:
            if stream is not None:
                stream.stop_stream()
                stream.close()
        finally:
            try:
                if pa is not None:
                    pa.terminate()
            finally:
                if self.ws is not None:
                    await self.ws.close()
        print(f"\nTotal audio received: {self.bytes_received / 1024:.1f} KB")
def list_audio_devices():
    """List available audio devices.

    Prints every device with at least one output channel for each backend
    that imported successfully, tagging the backend's default output device
    with ``[DEFAULT]``.
    """
    print("\n=== Audio Devices ===")
    if PYAUDIO_AVAILABLE:
        pa = pyaudio.PyAudio()
        try:
            print("\nPyAudio devices:")
            # Look the default up once rather than once per device; PyAudio
            # raises IOError (an OSError alias) when there is no default
            # output device, in which case nothing is tagged.
            try:
                default_index = pa.get_default_output_device_info()['index']
            except OSError:
                default_index = None
            for i in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(i)
                if info['maxOutputChannels'] > 0:
                    default = " [DEFAULT]" if i == default_index else ""
                    print(f" {i}: {info['name']}{default}")
        finally:
            # Release PortAudio even if enumeration failed part-way.
            pa.terminate()
    if SD_AVAILABLE:
        print("\nSounddevice devices:")
        # sd.default.device is an (input, output) pair; index 1 is the output.
        default_output = sd.default.device[1]
        for i, d in enumerate(sd.query_devices()):
            if d['max_output_channels'] > 0:
                default = " [DEFAULT]" if i == default_output else ""
                print(f" {i}: {d['name']}{default}")
async def main():
    """Parse the command line, then either list audio devices or run a session."""
    cli = argparse.ArgumentParser(description="Simple voice client")
    add_option = cli.add_argument
    add_option("--url", default="ws://localhost:8000/ws")
    add_option("--text", help="Send text and play response")
    add_option("--list-devices", action="store_true")
    add_option("--sample-rate", type=int, default=16000)
    add_option("--app-id", default="assistant_demo")
    add_option("--channel", default="simple_client")
    add_option("--config-version-id", default="local-dev")
    add_option("--track-debug", action="store_true")
    opts = cli.parse_args()

    if opts.list_devices:
        # Device listing is a standalone action; no connection is made.
        list_audio_devices()
    else:
        voice_client = SimpleVoiceClient(
            url=opts.url,
            sample_rate=opts.sample_rate,
            app_id=opts.app_id,
            channel=opts.channel,
            config_version_id=opts.config_version_id,
            track_debug=opts.track_debug,
        )
        # With --text a single message is sent; without it run() goes interactive.
        await voice_client.run(opts.text)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() cancels the main task (running its cleanup) and then
        # re-raises; catch it here so Ctrl+C exits without a traceback.
        print("\nInterrupted")