""" Step 1: Minimal WebSocket Echo Server This is the simplest possible WebSocket audio server. It accepts connections and echoes back events. What you'll learn: - How to create a FastAPI WebSocket endpoint - How to handle mixed text/binary frames - Basic event sending Test with: python main.py python test_client.py """ import asyncio import json import uuid from fastapi import FastAPI, WebSocket from loguru import logger # Configure logging logger.remove() logger.add(lambda msg: print(msg, end=""), level="INFO", format="{time:HH:mm:ss} | {level} | {message}") # Create FastAPI app app = FastAPI(title="Voice Gateway - Step 1") @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "step": "1_minimal_echo"} @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """ WebSocket endpoint for audio streaming. This is a minimal echo server that: 1. Accepts WebSocket connections 2. Sends a welcome event 3. Receives text commands and binary audio 4. Echoes speaking events back """ await websocket.accept() # Generate unique session ID session_id = str(uuid.uuid4()) logger.info(f"[{session_id}] Client connected") try: # Send welcome event (answer) await websocket.send_json({ "event": "answer", "trackId": session_id, "timestamp": _get_timestamp_ms() }) logger.info(f"[{session_id}] Sent answer event") # Message receive loop while True: message = await websocket.receive() # Handle binary audio data if "bytes" in message: audio_bytes = message["bytes"] logger.info(f"[{session_id}] Received audio: {len(audio_bytes)} bytes") # Send speaking event (echo back) await websocket.send_json({ "event": "speaking", "trackId": session_id, "timestamp": _get_timestamp_ms(), "startTime": _get_timestamp_ms() }) # Handle text commands elif "text" in message: text_data = message["text"] logger.info(f"[{session_id}] Received text: {text_data[:100]}...") try: data = json.loads(text_data) command = data.get("command", "unknown") logger.info(f"[{session_id}] Command: {command}") # Handle basic commands if command == "invite": await websocket.send_json({ "event": "answer", "trackId": session_id, "timestamp": _get_timestamp_ms() }) logger.info(f"[{session_id}] Responded to invite") elif command == "hangup": logger.info(f"[{session_id}] Hangup requested") break elif command == "ping": await websocket.send_json({ "event": "pong", "timestamp": _get_timestamp_ms() }) except json.JSONDecodeError as e: logger.error(f"[{session_id}] Invalid JSON: {e}") except Exception as e: logger.error(f"[{session_id}] Error: {e}") finally: logger.info(f"[{session_id}] Connection closed") def _get_timestamp_ms() -> int: """Get current timestamp in milliseconds.""" import time return int(time.time() * 1000) if __name__ == "__main__": import uvicorn logger.info("🚀 Starting Step 1: Minimal WebSocket Echo Server") logger.info("📡 Server: ws://localhost:8000/ws") logger.info("🩺 Health: http://localhost:8000/health") uvicorn.run( app, host="0.0.0.0", port=8000, log_level="info" )