# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

This is a Python implementation of `active-call` (originally Rust), a high-performance Voice AI Gateway that bridges telephony protocols (WebSocket, WebRTC) with AI pipelines (LLM, ASR, TTS). The system follows a **decoupled architecture** where the Media Gateway (this service) handles low-level audio/signaling, while Business Logic (AI Agent) controls it via a WebSocket API.

**Technology Stack:**

- Python 3.11+ with `asyncio` for all I/O
- FastAPI + Uvicorn for WebSocket/WebRTC endpoints
- aiortc for WebRTC media transport (optional dependency)
- Silero VAD for voice activity detection (optional dependency)
- Pydantic for protocol validation
- Loguru for structured logging

## Common Development Commands

### Running the Server

```bash
# Start development server (with auto-reload)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# Start with specific host/port
HOST=0.0.0.0 PORT=8080 uvicorn app.main:app

# Using Docker
docker-compose up --build
```

### Testing

```bash
# Run WebSocket test client (sine wave generation)
python scripts/test_websocket.py --url ws://localhost:8000/ws --sine

# Run WebSocket test client (with audio file)
python scripts/test_websocket.py --url ws://localhost:8000/ws --file test_audio.wav

# Run WebRTC test client
python scripts/test_webrtc.py --url ws://localhost:8000/webrtc

# Run unit tests
pytest tests/ -v --cov=app --cov=core

# Run specific test file
pytest tests/test_session.py -v

# Run with coverage report
pytest tests/ --cov=app --cov=core --cov-report=html
```

### Code Quality

```bash
# Format code
black app/ core/ models/ processors/ utils/ scripts/

# Lint code
ruff check app/ core/ models/ processors/ utils/ scripts/

# Type checking
mypy app/ core/
```

### Dependency Management

```bash
# Install all dependencies
pip install -r requirements.txt

# Install development dependencies
pip install -r requirements-dev.txt

# Update dependencies
pip install --upgrade -r requirements.txt
```

## Architecture Overview

### Decoupled Design Pattern

The system implements a **decoupled architecture** separating concerns:

1. **Media Gateway Layer** (`app/`, `core/`, `processors/`)
   - Handles low-level audio transport (WebSocket, WebRTC)
   - Manages session lifecycle and state
   - Processes audio through a pipeline (VAD, resampling)
   - Emits events (speaking, silence, error) to the control layer
2. **Business Logic Layer** (External AI Agent)
   - Connects via WebSocket
   - Receives real-time events (speech detection, ASR transcripts)
   - Sends commands (tts, play, interrupt, hangup)

### Key Architecture Components

**Transport Abstraction** ([`core/transports.py`](core/transports.py)):

- `BaseTransport` - Abstract interface with `send_event()` and `send_audio()`
- `SocketTransport` - WebSocket with mixed text/binary frames; uses `asyncio.Lock` to prevent frame interleaving
- `WebRtcTransport` - WebSocket signaling + aiortc RTCPeerConnection for media

**Session Management** ([`core/session.py`](core/session.py)):

- Each WebSocket/WebRTC connection creates a `Session` with a unique UUID
- Routes incoming JSON commands to handlers via `parse_command()` from `models/commands.py`
- Routes binary audio data to `AudioPipeline`
- Manages session state: created → invited → accepted → ringing → hungup
- Cleans up on disconnect

**Audio Pipeline** ([`core/pipeline.py`](core/pipeline.py)):

- Processes audio through VAD (Voice Activity Detection)
- Emits events to the global event bus when VAD state changes
- Supports interruption for barge-in scenarios

**Event Bus** ([`core/events.py`](core/events.py)):

- Global pub/sub system for inter-component communication
- Subscribe to specific event types (speaking, silence, error)
- Async notification to all subscribers

### Protocol Compatibility

The implementation **must maintain protocol compatibility** with the original Rust API.
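As a sketch of the command-routing side of this contract — the authoritative models live in `models/commands.py` and use Pydantic; the dataclasses, field names, and command subset below are illustrative assumptions, kept stdlib-only:

```python
import json
from dataclasses import dataclass, field

# Hypothetical command models; the real repo defines these with Pydantic.
@dataclass
class InviteCommand:
    option: dict = field(default_factory=dict)  # codec configuration (field name assumed)

@dataclass
class TtsCommand:
    text: str

def parse_command(raw: str):
    """Route one JSON text frame to a typed command (pattern sketch)."""
    data = json.loads(raw)
    kind = data.get("command")
    if kind == "invite":
        return InviteCommand(option=data.get("option", {}))
    if kind == "tts":
        return TtsCommand(text=data["text"])
    raise ValueError(f"unknown command: {kind!r}")

cmd = parse_command('{"command": "tts", "text": "Hello"}')
print(type(cmd).__name__)  # TtsCommand
```

Centralizing parsing like this is what keeps both sides of the wire format in lockstep: a malformed or unknown frame fails loudly at the boundary instead of deep inside a handler.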
All commands and events are strictly defined in:

- [`models/commands.py`](models/commands.py) - Command models (invite, accept, reject, tts, play, interrupt, hangup, chat)
- [`models/events.py`](models/events.py) - Event models (answer, speaking, silence, trackStart, trackEnd, error)
- [`models/config.py`](models/config.py) - Configuration models (CallOption, VADOption, TTSOption, ASROption, etc.)

**Important:** Always use `parse_command()` from `models/commands.py` to parse incoming JSON - never manually parse command strings. This ensures type safety and validation.

### WebSocket Protocol (`/ws` endpoint)

**Mixed Frame Handling:**

- **Text frames** → JSON commands (invite, tts, play, interrupt, hangup, etc.)
- **Binary frames** → Raw PCM audio (16kHz, 16-bit, mono)

**Flow:**

1. Client connects and sends `invite` command with codec configuration
2. Server responds with `answer` event
3. Client streams binary audio frames
4. Server processes audio and emits events (speaking, silence)
5. Client can send commands at any time (tts, play, interrupt, hangup)

### WebRTC Protocol (`/webrtc` endpoint)

**Signaling Flow:**

1. Client connects via WebSocket
2. Client sends SDP offer (JSON with `sdp` and `type` fields)
3. Server creates an RTCPeerConnection and generates an SDP answer
4. Server responds with `answer` event containing the SDP
5. WebRTC media flows via UDP (managed by aiortc)
6. Commands can be sent via WebSocket text frames at any time

**Audio Track Handling:**

- When `pc.on("track")` fires, wrap the received track with `Resampled16kTrack`
- Pull frames from the track and convert them to bytes
- Feed the bytes to `session.handle_audio()`

### Session Lifecycle

```
1. Connection       → WebSocket/WebRTC endpoint accepts
2. Session creation → New Session(uuid, transport)
3. Invite           → Client sends invite command
4. Answer           → Server sends answer event
5. Audio streaming  → Client sends binary audio / WebRTC media
6. Commands         → Client sends JSON commands (tts, play, interrupt)
7. Hangup           → Client sends hangup command OR connection closes
8. Cleanup          → Session cleanup, remove from active_sessions
```

### Optional Dependencies

The following dependencies are **optional** - the code gracefully degrades without them:

- **aiortc + av (PyAV)** - Required for WebRTC functionality. Without them:
  - The `/webrtc` endpoint will reject connections
  - The WebRTC transport cannot be used
  - The WebSocket endpoint still works fine
- **onnxruntime** - Required for VAD functionality. Without it:
  - VAD always returns "Speech" with probability 1.0
  - speaking/silence events are still emitted but are not accurate

## Important Implementation Details

### Thread Safety in WebSocket Transport

The `SocketTransport` uses `asyncio.Lock()` because FastAPI WebSocket's `send_text()` and `send_bytes()` are NOT safe for concurrent use. Without the lock, rapidly sending text and binary frames can interleave, causing protocol violations.

```python
async def send_event(self, event: dict):
    async with self.lock:  # Critical: serializes writes to the socket
        await self.ws.send_text(json.dumps(event))

async def send_audio(self, pcm_bytes: bytes):
    async with self.lock:
        await self.ws.send_bytes(pcm_bytes)
```

### Event Bus Usage

Components subscribe to event types and are notified asynchronously:

```python
event_bus = get_event_bus()

# Subscribe to speaking events
event_bus.subscribe("speaking", my_callback)

# Publish events
await event_bus.publish("speaking", {"trackId": session_id, "probability": 0.9})
```

### Error Handling Pattern

All errors are sent as `error` events to the client:

```python
await self.transport.send_event({
    "event": "error",
    "trackId": self.current_track_id,
    "timestamp": self._get_timestamp_ms(),
    "sender": "server",  # or "asr", "tts", "media", etc.
    "error": error_message
})
```

### Configuration Management

Configuration is loaded from:

1. Environment variables
2. `.env` file (gitignored)
3. Default values in `app/config.py`

**Never commit `.env`** - it may contain sensitive keys.
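The precedence above (environment variable over `.env` over coded default) can be sketched in a few lines; `load_setting` is a hypothetical helper, not the repository's actual API, and assumes a `.env` loader such as python-dotenv has already merged file values into `os.environ`:

```python
import os

def load_setting(name: str, default: str) -> str:
    """Environment variable wins; otherwise fall back to the coded default.

    A .env file, if present, would have been merged into os.environ
    before this lookup runs, giving the 1 → 2 → 3 precedence above.
    """
    return os.environ.get(name, default)

# Usage mirrors the HOST/PORT variables used by the run commands earlier
host = load_setting("HOST", "0.0.0.0")
port = int(load_setting("PORT", "8000"))
```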
Use `.env.example` as a template.

### Audio Format Specifications

**Input/Output Audio:**

- Sample rate: 16kHz
- Bit depth: 16-bit (PCM)
- Channels: Mono
- Chunk size: 640 bytes (20ms at 16kHz)

**Format:** Little-endian signed 16-bit integers (int16)

## Key Files Reference

When working with this codebase, these files are the most critical:

- [`app/main.py`](app/main.py) - FastAPI endpoints, session lifecycle, event hooks
- [`core/transports.py`](core/transports.py) - Transport abstraction and WebSocket/WebRTC handling
- [`core/session.py`](core/session.py) - Command routing, session state management
- [`core/pipeline.py`](core/pipeline.py) - Audio processing, VAD integration, event emission
- [`models/commands.py`](models/commands.py) - Protocol command definitions and parsing
- [`models/events.py`](models/events.py) - Protocol event definitions
- [`processors/vad.py`](processors/vad.py) - Silero VAD implementation
- [`reference/active-call/docs/api.md`](reference/active-call/docs/api.md) - Complete API specification from the original Rust implementation

### Testing Strategy

When implementing new features:

1. **Unit tests** - Test individual components (transports, session, pipeline)
2. **Integration tests** - Test endpoint behavior with test clients
3. **Protocol tests** - Verify commands/events match the API specification
4. **Manual testing** - Use `scripts/test_websocket.py` and `scripts/test_webrtc.py`

### Reference Implementations

- **Original Rust implementation:** [`reference/active-call/`](reference/active-call/) - Complete feature set with SIP, ASR, TTS
- **Python reference:** [`reference/py-active-call/`](reference/py-active-call/) - Partial implementation with bot integration

Use these as references for:

- Protocol specification details
- Architecture patterns
- Testing approaches
- Edge case handling

### Common Patterns

**Creating a new command:**

1. Add a model to `models/commands.py`
2. Add it to the `COMMAND_TYPES` dict
3. Add a handler method in `core/session.py` (e.g., `_handle_mycommand`)
4. Route it in `Session.handle_text()` under the command type

**Adding a new event:**

1. Add a model to `models/events.py`
2. Add it to the `EVENT_TYPES` dict
3. Emit via `transport.send_event()` or `event_bus.publish()`

**Adding a new processor:**

1. Create it in `processors/myprocessor.py`
2. Integrate it into the `core/pipeline.py` AudioPipeline
3. Emit events through the event bus

### Session State Management

Sessions track state through these transitions:

- `created` - Initial state
- `invited` - Invite command received
- `accepted` - Accept command received
- `ringing` - Ringing command sent
- `hungup` - Hangup command or disconnect

The `state` attribute is updated in each handler and logged for debugging.

### Testing Endpoints Without Full Dependencies

The WebSocket endpoint (`/ws`) works without aiortc, av, or onnxruntime. Use this for testing core functionality:

```bash
# Install minimal dependencies
pip install fastapi uvicorn numpy pydantic python-dotenv loguru aiohttp

# Start server
uvicorn app.main:app

# Test with basic client
python scripts/test_websocket.py
```

The WebRTC endpoint requires aiortc+av (PyAV), which can be challenging to install on Windows. Consider Linux/macOS for full WebRTC development.

### Logging

Logs are written to:

- Console (stdout) - Real-time output
- `logs/active_call_YYYY-MM-DD.log` - Rotated daily, retained for 7 days

Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL

Set via the `LOG_LEVEL` environment variable or in `.env`.

### Dependencies Note

On Windows with Python 3.11, `aiortc` and `av` (PyAV) may have installation issues due to:

- Missing C compilers
- Incompatible binary wheel versions
- FFmpeg/library dependencies

The code gracefully handles missing optional dependencies with try/except imports and runtime checks. Consider using Docker for consistent development environments.