Compare commits

36 Commits

Author SHA1 Message Date
Xin Wang
7be8fda424 Fix microphone talk eou missing and clean chat log 2026-02-06 11:36:39 +08:00
Xin Wang
c8c0e30bc3 Update web client 2026-02-06 11:25:05 +08:00
Xin Wang
960690ba80 Remove invite button, correct stream asr tts transcription 2026-02-06 11:20:52 +08:00
Xin Wang
cb35d87eb4 Update web client 2026-02-06 10:46:24 +08:00
Xin Wang
5c03cf2b1f Update web client layout 2026-02-06 10:34:09 +08:00
Xin Wang
876ca8221c Put web client together 2026-02-06 09:57:45 +08:00
Xin Wang
a8e7c7e2ef Add web client to app server 2026-02-06 09:54:23 +08:00
Xin Wang
9d42f3cca1 Fix list devices on web client 2026-02-06 09:40:52 +08:00
Xin Wang
f81a561e0e Fix indent error 2026-02-06 08:40:42 +08:00
Xin Wang
a70970fee5 Add web client 2026-02-06 08:36:00 +08:00
Xin Wang
e511cf9077 Fix Potential state duplication on barge-in. 2026-02-06 08:30:37 +08:00
Xin Wang
0576231d8d Fix Race risks if process_audio is called concurrently. 2026-02-06 08:26:56 +08:00
Xin Wang
26458faa6c Fix Unbounded _audio_buffer growth. 2026-02-06 08:11:14 +08:00
Xin Wang
605968a639 Fix _stop_current_speech doesn’t cancel LLM/TTS services. 2026-02-06 08:05:33 +08:00
Xin Wang
31d24a7428 Merge branch 'master' of https://gitea.xiaowang.eu.org/wx44wx/py-active-call 2026-02-06 08:00:39 +08:00
Xin Wang
7846e4cebc Fix No cancellation of existing turn on new EOU. 2026-02-06 07:59:31 +08:00
Xin Wang
d9dc14d03a update sentences ends 2026-02-06 07:58:54 +08:00
Xin Wang
294a3e405c sentences ends update 2026-02-06 07:55:06 +08:00
Xin Wang
6831f5316c Merge branch 'master' of https://gitea.xiaowang.eu.org/wx44wx/py-active-call 2026-02-06 07:52:54 +08:00
Xin Wang
65128b0eb0 update client latency and three utterances example 2026-02-06 07:52:06 +08:00
Xin Wang
9954e8d18f update client latency and three utterances example 2026-02-06 07:51:09 +08:00
Xin Wang
4ceb3ec96f Fix Duplicate / inconsistent EOU 2026-02-06 07:23:31 +08:00
Xin Wang
da52a88006 Fix _on_end_of_utterance sets state to LISTENING even when no text. 2026-02-05 18:47:56 +08:00
Xin Wang
2de427b92c Add energy based vad fallback 2026-02-05 17:21:52 +08:00
Xin Wang
b72e09f263 Add heartbeat 2026-02-04 23:16:30 +08:00
Xin Wang
77d54d284f remove pipeline because it is just a vad integration 2026-02-04 15:01:05 +08:00
Xin Wang
0835f6a617 update gitignore 2026-02-04 14:16:27 +08:00
Xin Wang
d9d5d523ec change default logs path in main 2026-02-04 13:34:39 +08:00
Xin Wang
2b41648a87 add audio_examples and update gitignore 2026-02-04 13:25:55 +08:00
Xin Wang
911bbb5bf4 add audio samples and update wav client 2026-02-04 13:22:17 +08:00
Xin Wang
7d255468ab api has llm response event 2026-02-04 12:00:52 +08:00
Xin Wang
5aa9a12ca8 Add generate test audio script 2026-02-04 10:45:10 +08:00
Xin Wang
8bc24ded59 fix long run bug 2026-02-03 12:05:09 +08:00
Xin Wang
a2e341b433 Merge branch 'master' of https://gitea.xiaowang.eu.org/wx44wx/py-active-call2 2026-02-02 23:19:26 +08:00
d27f230532 Merge pull request 'Add basic README' (#1) from add-readme into master
Reviewed-on: #1
2026-01-30 09:07:26 +00:00
Xin Wang
cf7d3b23bc Update bargin in duration ms 2026-01-30 16:24:47 +08:00
22 changed files with 2280 additions and 267 deletions

5
.gitignore vendored
View File

@@ -143,9 +143,6 @@ cython_debug/
*~
# Project specific
assets/*.onnx
*.wav
*.mp3
*.pcm
recordings/
logs/
running/

README.md
View File

@@ -5,3 +5,21 @@ Python Active-Call: real-time audio streaming with WebSocket and WebRTC.
This repo contains a Python 3.11+ codebase for building low-latency voice
pipelines (capture, stream, and process audio) using WebRTC and WebSockets.
It is currently in an early, experimental stage.
# Usage
Start the server
```
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```
Test
```
python examples/test_websocket.py
```
```
python mic_client.py
```
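For offline testing without a microphone, the WAV file client added in this compare can be used as shown in its docstring:
```
python examples/wav_client.py --input input.wav --output response.wav
```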

app/config.py
View File

@@ -64,8 +64,8 @@ class Settings(BaseSettings):
# Barge-in (interruption) Configuration
barge_in_min_duration_ms: int = Field(
default=50,
description="Minimum speech duration (ms) required to trigger barge-in. 50-100ms recommended."
default=200,
description="Minimum speech duration (ms) required to trigger barge-in. Lower=more sensitive."
)
# Logging
@@ -84,6 +84,10 @@ class Settings(BaseSettings):
description="ICE servers configuration"
)
# WebSocket heartbeat and inactivity
inactivity_timeout_sec: int = Field(default=60, description="Close connection after no message from client (seconds)")
heartbeat_interval_sec: int = Field(default=50, description="Send heartBeat event to client every N seconds")
@property
def chunk_size_bytes(self) -> int:
"""Calculate chunk size in bytes based on sample rate and duration."""

app/main.py
View File

@@ -1,11 +1,14 @@
"""FastAPI application with WebSocket and WebRTC endpoints."""
import uuid
import asyncio
import json
from typing import Dict, Any, Optional
import time
import uuid
from pathlib import Path
from typing import Dict, Any, Optional, List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, FileResponse
from loguru import logger
# Try to import aiortc (optional for WebRTC functionality)
@@ -17,13 +20,52 @@ except ImportError:
logger.warning("aiortc not available - WebRTC endpoint will be disabled")
from app.config import settings
from core.transports import SocketTransport, WebRtcTransport
from core.transports import SocketTransport, WebRtcTransport, BaseTransport
from core.session import Session
from processors.tracks import Resampled16kTrack
from core.events import get_event_bus, reset_event_bus
# Check interval for heartbeat/timeout (seconds)
_HEARTBEAT_CHECK_INTERVAL_SEC = 5
async def heartbeat_and_timeout_task(
transport: BaseTransport,
session: Session,
session_id: str,
last_received_at: List[float],
last_heartbeat_at: List[float],
inactivity_timeout_sec: int,
heartbeat_interval_sec: int,
) -> None:
"""
Background task: send heartBeat every ~heartbeat_interval_sec and close
connection if no message from client for inactivity_timeout_sec.
"""
while True:
await asyncio.sleep(_HEARTBEAT_CHECK_INTERVAL_SEC)
if transport.is_closed:
break
now = time.monotonic()
if now - last_received_at[0] > inactivity_timeout_sec:
logger.info(f"Session {session_id}: {inactivity_timeout_sec}s no message, closing")
await session.cleanup()
break
if now - last_heartbeat_at[0] >= heartbeat_interval_sec:
try:
await transport.send_event({
"event": "heartBeat",
"timestamp": int(time.time() * 1000),
})
last_heartbeat_at[0] = now
except Exception as e:
logger.debug(f"Session {session_id}: heartbeat send failed: {e}")
break
# Initialize FastAPI
app = FastAPI(title="Python Active-Call", version="0.1.0")
_WEB_CLIENT_PATH = Path(__file__).resolve().parent.parent / "examples" / "web_client.html"
# Configure CORS
app.add_middleware(
@@ -40,7 +82,7 @@ active_sessions: Dict[str, Session] = {}
# Configure logging
logger.remove()
logger.add(
"../logs/active_call_{time}.log",
"./logs/active_call_{time}.log",
rotation="1 day",
retention="7 days",
level=settings.log_level,
@@ -59,6 +101,24 @@ async def health_check():
return {"status": "healthy", "sessions": len(active_sessions)}
@app.get("/")
async def web_client_root():
"""Serve the web client."""
if not _WEB_CLIENT_PATH.exists():
raise HTTPException(status_code=404, detail="Web client not found")
return FileResponse(_WEB_CLIENT_PATH)
@app.get("/client")
async def web_client_alias():
"""Alias for the web client."""
if not _WEB_CLIENT_PATH.exists():
raise HTTPException(status_code=404, detail="Web client not found")
return FileResponse(_WEB_CLIENT_PATH)
@app.get("/iceservers")
async def get_ice_servers():
"""Get ICE servers configuration for WebRTC."""
@@ -112,10 +172,25 @@ async def websocket_endpoint(websocket: WebSocket):
logger.info(f"WebSocket connection established: {session_id}")
last_received_at: List[float] = [time.monotonic()]
last_heartbeat_at: List[float] = [0.0]
hb_task = asyncio.create_task(
heartbeat_and_timeout_task(
transport,
session,
session_id,
last_received_at,
last_heartbeat_at,
settings.inactivity_timeout_sec,
settings.heartbeat_interval_sec,
)
)
try:
# Receive loop
while True:
message = await websocket.receive()
last_received_at[0] = time.monotonic()
# Handle binary audio data
if "bytes" in message:
@@ -132,6 +207,11 @@ async def websocket_endpoint(websocket: WebSocket):
logger.error(f"WebSocket error: {e}", exc_info=True)
finally:
hb_task.cancel()
try:
await hb_task
except asyncio.CancelledError:
pass
# Cleanup session
if session_id in active_sessions:
await session.cleanup()
@@ -165,6 +245,20 @@ async def webrtc_endpoint(websocket: WebSocket):
logger.info(f"WebRTC connection established: {session_id}")
last_received_at: List[float] = [time.monotonic()]
last_heartbeat_at: List[float] = [0.0]
hb_task = asyncio.create_task(
heartbeat_and_timeout_task(
transport,
session,
session_id,
last_received_at,
last_heartbeat_at,
settings.inactivity_timeout_sec,
settings.heartbeat_interval_sec,
)
)
# Track handler for incoming audio
@pc.on("track")
def on_track(track):
@@ -202,6 +296,7 @@ async def webrtc_endpoint(websocket: WebSocket):
if "text" not in message:
continue
last_received_at[0] = time.monotonic()
data = json.loads(message["text"])
# Handle SDP offer/answer
@@ -238,6 +333,11 @@ async def webrtc_endpoint(websocket: WebSocket):
logger.error(f"WebRTC error: {e}", exc_info=True)
finally:
hb_task.cancel()
try:
await hb_task
except asyncio.CancelledError:
pass
# Cleanup
await pc.close()
if session_id in active_sessions:
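On the client side, the new heartbeat/inactivity behaviour only requires that the client keep reading: the server emits a heartBeat event roughly every heartbeat_interval_sec and closes the session after inactivity_timeout_sec without any client message. A minimal listener sketch, assuming the websockets package the example clients use; it is illustrative, not one of the clients in this compare:
```
# Illustrative listener, not one of the clients in this repo.
import asyncio
import json

import websockets  # pip install websockets

async def listen(url: str = "ws://localhost:8000/ws") -> None:
    async with websockets.connect(url) as ws:
        async for msg in ws:
            if isinstance(msg, bytes):
                continue  # binary PCM audio from the server
            event = json.loads(msg)
            if event.get("event") == "heartBeat":
                # Any audio frame or command the client sends resets the
                # server's inactivity timer; heartBeat itself needs no reply.
                print(f"server heartBeat at {event.get('timestamp')}")

asyncio.run(listen())
```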

core/__init__.py
View File

@@ -2,7 +2,6 @@
from core.events import EventBus, get_event_bus
from core.transports import BaseTransport, SocketTransport, WebRtcTransport
from core.pipeline import AudioPipeline
from core.session import Session
from core.conversation import ConversationManager, ConversationState, ConversationTurn
from core.duplex_pipeline import DuplexPipeline
@@ -13,7 +12,6 @@ __all__ = [
"BaseTransport",
"SocketTransport",
"WebRtcTransport",
"AudioPipeline",
"Session",
"ConversationManager",
"ConversationState",

core/duplex_pipeline.py
View File

@@ -85,8 +85,8 @@ class DuplexPipeline:
# Initialize EOU detector
self.eou_detector = EouDetector(
silence_threshold_ms=600,
min_speech_duration_ms=200
silence_threshold_ms=settings.vad_eou_threshold_ms,
min_speech_duration_ms=settings.vad_min_speech_duration_ms
)
# Initialize services
@@ -108,11 +108,18 @@ class DuplexPipeline:
self._is_bot_speaking = False
self._current_turn_task: Optional[asyncio.Task] = None
self._audio_buffer: bytes = b""
max_buffer_seconds = settings.max_audio_buffer_seconds if hasattr(settings, "max_audio_buffer_seconds") else 30
self._max_audio_buffer_bytes = int(settings.sample_rate * 2 * max_buffer_seconds)
self._last_vad_status: str = "Silence"
self._process_lock = asyncio.Lock()
# Interruption handling
self._interrupt_event = asyncio.Event()
# Latency tracking - TTFB (Time to First Byte)
self._turn_start_time: Optional[float] = None
self._first_audio_sent: bool = False
# Barge-in filtering - require minimum speech duration to interrupt
self._barge_in_speech_start_time: Optional[float] = None
self._barge_in_min_duration_ms: int = settings.barge_in_min_duration_ms if hasattr(settings, 'barge_in_min_duration_ms') else 50
@@ -202,6 +209,7 @@ class DuplexPipeline:
return
try:
async with self._process_lock:
# 1. Process through VAD
vad_result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
@@ -259,6 +267,9 @@ class DuplexPipeline:
# 3. Buffer audio for ASR
if vad_status == "Speech" or self.conversation.state == ConversationState.LISTENING:
self._audio_buffer += pcm_bytes
if len(self._audio_buffer) > self._max_audio_buffer_bytes:
# Keep only the most recent audio to cap memory usage
self._audio_buffer = self._audio_buffer[-self._max_audio_buffer_bytes:]
await self.asr_service.send_audio(pcm_bytes)
# For SiliconFlow ASR, trigger interim transcription periodically
@@ -364,7 +375,8 @@ class DuplexPipeline:
# Reset for next utterance
self._audio_buffer = b""
self._last_sent_transcript = ""
await self.conversation.start_user_turn()
# Return to idle; don't force LISTENING which causes buffering on silence
await self.conversation.set_state(ConversationState.IDLE)
return
logger.info(f"EOU detected - user said: {user_text[:100]}...")
@@ -383,6 +395,8 @@ class DuplexPipeline:
self._last_sent_transcript = ""
# Process the turn - trigger LLM response
# Cancel any existing turn to avoid overlapping assistant responses
await self._stop_current_speech()
await self.conversation.end_user_turn(user_text)
self._current_turn_task = asyncio.create_task(self._handle_turn(user_text))
@@ -396,6 +410,10 @@ class DuplexPipeline:
user_text: User's transcribed text
"""
try:
# Start latency tracking
self._turn_start_time = time.time()
self._first_audio_sent = False
# Get AI response (streaming)
messages = self.conversation.get_messages()
full_response = ""
@@ -406,7 +424,7 @@ class DuplexPipeline:
# Sentence buffer for streaming TTS
sentence_buffer = ""
sentence_ends = {'.', '!', '?', '。', '！', '？', '；', '\n'}
sentence_ends = {'。', '！', '？', '；', '\n'}
first_audio_sent = False
# Stream LLM response and TTS sentence by sentence
@@ -418,6 +436,15 @@ class DuplexPipeline:
sentence_buffer += text_chunk
await self.conversation.update_assistant_text(text_chunk)
# Send LLM response streaming event to client
await self.transport.send_event({
"event": "llmResponse",
"trackId": self.session_id,
"text": text_chunk,
"isFinal": False,
"timestamp": self._get_timestamp_ms()
})
# Check for sentence completion - synthesize immediately for low latency
while any(end in sentence_buffer for end in sentence_ends):
# Find first sentence end
@@ -446,6 +473,16 @@ class DuplexPipeline:
else:
break
# Send final LLM response event
if full_response and not self._interrupt_event.is_set():
await self.transport.send_event({
"event": "llmResponse",
"trackId": self.session_id,
"text": full_response,
"isFinal": True,
"timestamp": self._get_timestamp_ms()
})
# Speak any remaining text
if sentence_buffer.strip() and not self._interrupt_event.is_set():
if not first_audio_sent:
@@ -495,10 +532,33 @@ class DuplexPipeline:
try:
async for chunk in self.tts_service.synthesize_stream(text):
# Check interrupt at the start of each iteration
if self._interrupt_event.is_set():
logger.debug("TTS sentence interrupted")
break
# Track and log first audio packet latency (TTFB)
if not self._first_audio_sent and self._turn_start_time:
ttfb_ms = (time.time() - self._turn_start_time) * 1000
self._first_audio_sent = True
logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
"event": "ttfb",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"latencyMs": round(ttfb_ms)
})
# Double-check interrupt right before sending audio
if self._interrupt_event.is_set():
break
await self.transport.send_audio(chunk.audio)
await asyncio.sleep(0.005) # Small delay to prevent flooding
except asyncio.CancelledError:
logger.debug("TTS sentence cancelled")
except Exception as e:
logger.error(f"TTS sentence error: {e}")
@@ -513,6 +573,10 @@ class DuplexPipeline:
return
try:
# Start latency tracking for greeting
speak_start_time = time.time()
first_audio_sent = False
# Send track start event
await self.transport.send_event({
"event": "trackStart",
@@ -528,6 +592,20 @@ class DuplexPipeline:
logger.info("TTS interrupted by barge-in")
break
# Track and log first audio packet latency (TTFB)
if not first_audio_sent:
ttfb_ms = (time.time() - speak_start_time) * 1000
first_audio_sent = True
logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
"event": "ttfb",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"latencyMs": round(ttfb_ms)
})
# Send audio to client
await self.transport.send_audio(chunk.audio)
@@ -561,8 +639,17 @@ class DuplexPipeline:
self._barge_in_speech_frames = 0
self._barge_in_silence_frames = 0
# Signal interruption
# IMPORTANT: Signal interruption FIRST to stop audio sending
self._interrupt_event.set()
self._is_bot_speaking = False
# Send interrupt event to client IMMEDIATELY
# This must happen BEFORE canceling services, so client knows to discard in-flight audio
await self.transport.send_event({
"event": "interrupt",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
# Cancel TTS
if self.tts_service:
@@ -572,18 +659,12 @@ class DuplexPipeline:
if self.llm_service and hasattr(self.llm_service, 'cancel'):
self.llm_service.cancel()
# Interrupt conversation
# Interrupt conversation only if there is no active turn task.
# When a turn task exists, it will handle end_assistant_turn() to avoid double callbacks.
if not (self._current_turn_task and not self._current_turn_task.done()):
await self.conversation.interrupt()
# Send interrupt event to client
await self.transport.send_event({
"event": "interrupt",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
# Reset for new user turn
self._is_bot_speaking = False
await self.conversation.start_user_turn()
self._audio_buffer = b""
self.eou_detector.reset()
@@ -598,6 +679,12 @@ class DuplexPipeline:
except asyncio.CancelledError:
pass
# Ensure underlying services are cancelled to avoid leaking work/audio
if self.tts_service:
await self.tts_service.cancel()
if self.llm_service and hasattr(self.llm_service, 'cancel'):
self.llm_service.cancel()
self._is_bot_speaking = False
self._interrupt_event.clear()
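The barge-in fields added above (_barge_in_speech_start_time, _barge_in_min_duration_ms) imply a duration gate whose actual check is not visible in these hunks; a minimal sketch of that idea, with placeholder names rather than the pipeline's real method:
```
import time

def should_barge_in(speech_start_time: float | None,
                    is_bot_speaking: bool,
                    min_duration_ms: int = 200) -> bool:
    """Hypothetical gate: treat user speech as a barge-in only after it has
    lasted at least min_duration_ms while the bot is speaking."""
    if not is_bot_speaking or speech_start_time is None:
        return False
    elapsed_ms = (time.time() - speech_start_time) * 1000
    return elapsed_ms >= min_duration_ms
```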

core/pipeline.py
View File

@@ -1,131 +0,0 @@
"""Audio processing pipeline."""
import asyncio
from typing import Optional
from loguru import logger
from core.transports import BaseTransport
from core.events import EventBus, get_event_bus
from processors.vad import VADProcessor, SileroVAD
from app.config import settings
class AudioPipeline:
"""
Audio processing pipeline.
Processes incoming audio through VAD and emits events.
"""
def __init__(self, transport: BaseTransport, session_id: str):
"""
Initialize audio pipeline.
Args:
transport: Transport instance for sending events/audio
session_id: Session identifier for event tracking
"""
self.transport = transport
self.session_id = session_id
self.event_bus = get_event_bus()
# Initialize VAD
self.vad_model = SileroVAD(
model_path=settings.vad_model_path,
sample_rate=settings.sample_rate
)
self.vad_processor = VADProcessor(
vad_model=self.vad_model,
threshold=settings.vad_threshold,
silence_threshold_ms=settings.vad_eou_threshold_ms,
min_speech_duration_ms=settings.vad_min_speech_duration_ms
)
# State
self.is_bot_speaking = False
self.interrupt_signal = asyncio.Event()
self._running = True
logger.info(f"Audio pipeline initialized for session {session_id}")
async def process_input(self, pcm_bytes: bytes) -> None:
"""
Process incoming audio chunk.
Args:
pcm_bytes: PCM audio data (16-bit, mono, 16kHz)
"""
if not self._running:
return
try:
# Process through VAD
result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
if result:
event_type, probability = result
# Emit event through event bus
await self.event_bus.publish(event_type, {
"trackId": self.session_id,
"probability": probability
})
# Send event to client
if event_type == "speaking":
logger.info(f"User speaking started (session {self.session_id})")
await self.transport.send_event({
"event": "speaking",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"startTime": self._get_timestamp_ms()
})
elif event_type == "silence":
logger.info(f"User speaking stopped (session {self.session_id})")
await self.transport.send_event({
"event": "silence",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"startTime": self._get_timestamp_ms(),
"duration": 0 # TODO: Calculate actual duration
})
elif event_type == "eou":
logger.info(f"EOU detected (session {self.session_id})")
await self.transport.send_event({
"event": "eou",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
except Exception as e:
logger.error(f"Pipeline processing error: {e}", exc_info=True)
async def process_text_input(self, text: str) -> None:
"""
Process text input (chat command).
Args:
text: Text input
"""
logger.info(f"Processing text input: {text[:50]}...")
# TODO: Implement text processing (LLM integration, etc.)
# For now, just log it
async def interrupt(self) -> None:
"""Interrupt current audio playback."""
if self.is_bot_speaking:
self.interrupt_signal.set()
logger.info(f"Pipeline interrupted for session {self.session_id}")
async def cleanup(self) -> None:
"""Cleanup pipeline resources."""
logger.info(f"Cleaning up pipeline for session {self.session_id}")
self._running = False
self.interrupt_signal.set()
def _get_timestamp_ms(self) -> int:
"""Get current timestamp in milliseconds."""
import time
return int(time.time() * 1000)

core/session.py
View File

@@ -6,7 +6,7 @@ from typing import Optional, Dict, Any
from loguru import logger
from core.transports import BaseTransport
from core.pipeline import AudioPipeline
from core.duplex_pipeline import DuplexPipeline
from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand
from app.config import settings
@@ -16,7 +16,7 @@ class Session:
Manages a single call session.
Handles command routing, audio processing, and session lifecycle.
Supports both basic audio pipeline and full duplex voice conversation.
Uses full duplex voice conversation pipeline.
"""
def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None):
@@ -30,20 +30,14 @@ class Session:
"""
self.id = session_id
self.transport = transport
# Determine pipeline mode
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
if self.use_duplex:
from core.duplex_pipeline import DuplexPipeline
self.pipeline = DuplexPipeline(
transport=transport,
session_id=session_id,
system_prompt=settings.duplex_system_prompt,
greeting=settings.duplex_greeting
)
else:
self.pipeline = AudioPipeline(transport, session_id)
# Session state
self.created_at = None
@@ -129,10 +123,7 @@ class Session:
audio_bytes: PCM audio data
"""
try:
if self.use_duplex:
await self.pipeline.process_audio(audio_bytes)
else:
await self.pipeline.process_input(audio_bytes)
except Exception as e:
logger.error(f"Session {self.id} handle_audio error: {e}", exc_info=True)
@@ -148,8 +139,8 @@ class Session:
"timestamp": self._get_timestamp_ms()
})
# Start duplex pipeline if enabled
if self.use_duplex and not self._pipeline_started:
# Start duplex pipeline
if not self._pipeline_started:
try:
await self.pipeline.start()
self._pipeline_started = True
@@ -228,9 +219,6 @@ class Session:
logger.info(f"Session {self.id} graceful interrupt")
else:
logger.info(f"Session {self.id} immediate interrupt")
if self.use_duplex:
await self.pipeline.interrupt()
else:
await self.pipeline.interrupt()
async def _handle_pause(self) -> None:
@@ -267,11 +255,7 @@ class Session:
async def _handle_chat(self, command: ChatCommand) -> None:
"""Handle chat command."""
logger.info(f"Session {self.id} chat: {command.text[:50]}...")
# Process text input through pipeline
if self.use_duplex:
await self.pipeline.process_text(command.text)
else:
await self.pipeline.process_text_input(command.text)
async def _send_error(self, sender: str, error_message: str) -> None:
"""

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,96 @@
<svg width="1200" height="620" viewBox="0 0 1200 620" xmlns="http://www.w3.org/2000/svg">
<defs>
<style>
.box { fill:#11131a; stroke:#3a3f4b; stroke-width:1.2; rx:10; ry:10; }
.title { font: 600 14px 'Arial'; fill:#f2f3f7; }
.text { font: 12px 'Arial'; fill:#c8ccd8; }
.arrow { stroke:#7aa2ff; stroke-width:1.6; marker-end:url(#arrow); fill:none; }
.arrow2 { stroke:#2dd4bf; stroke-width:1.6; marker-end:url(#arrow); fill:none; }
.arrow3 { stroke:#ff6b6b; stroke-width:1.6; marker-end:url(#arrow); fill:none; }
.label { font: 11px 'Arial'; fill:#9aa3b2; }
</style>
<marker id="arrow" markerWidth="8" markerHeight="8" refX="7" refY="4" orient="auto">
<path d="M0,0 L8,4 L0,8 Z" fill="#7aa2ff"/>
</marker>
</defs>
<rect x="40" y="40" width="250" height="120" class="box"/>
<text x="60" y="70" class="title">Web Client</text>
<text x="60" y="95" class="text">WS JSON commands</text>
<text x="60" y="115" class="text">WS binary PCM audio</text>
<rect x="350" y="40" width="250" height="120" class="box"/>
<text x="370" y="70" class="title">FastAPI /ws</text>
<text x="370" y="95" class="text">Session + Transport</text>
<rect x="660" y="40" width="250" height="120" class="box"/>
<text x="680" y="70" class="title">DuplexPipeline</text>
<text x="680" y="95" class="text">process_audio / process_text</text>
<rect x="920" y="40" width="240" height="120" class="box"/>
<text x="940" y="70" class="title">ConversationManager</text>
<text x="940" y="95" class="text">turns + state</text>
<rect x="660" y="200" width="180" height="100" class="box"/>
<text x="680" y="230" class="title">VADProcessor</text>
<text x="680" y="255" class="text">speech/silence</text>
<rect x="860" y="200" width="180" height="100" class="box"/>
<text x="880" y="230" class="title">EOU Detector</text>
<text x="880" y="255" class="text">end-of-utterance</text>
<rect x="1060" y="200" width="120" height="100" class="box"/>
<text x="1075" y="230" class="title">ASR</text>
<text x="1075" y="255" class="text">transcripts</text>
<rect x="920" y="350" width="240" height="110" class="box"/>
<text x="940" y="380" class="title">LLM (stream)</text>
<text x="940" y="405" class="text">llmResponse events</text>
<rect x="660" y="350" width="220" height="110" class="box"/>
<text x="680" y="380" class="title">TTS (stream)</text>
<text x="680" y="405" class="text">PCM audio</text>
<rect x="40" y="350" width="250" height="110" class="box"/>
<text x="60" y="380" class="title">Web Client</text>
<text x="60" y="405" class="text">audio playback + UI</text>
<path d="M290 80 L350 80" class="arrow"/>
<text x="300" y="70" class="label">JSON / PCM</text>
<path d="M600 80 L660 80" class="arrow"/>
<text x="615" y="70" class="label">dispatch</text>
<path d="M910 80 L920 80" class="arrow"/>
<text x="880" y="70" class="label">turn mgmt</text>
<path d="M750 160 L750 200" class="arrow"/>
<text x="705" y="190" class="label">audio chunks</text>
<path d="M840 250 L860 250" class="arrow"/>
<text x="835" y="240" class="label">vad status</text>
<path d="M1040 250 L1060 250" class="arrow"/>
<text x="1010" y="240" class="label">audio buffer</text>
<path d="M950 300 L950 350" class="arrow2"/>
<text x="930" y="340" class="label">EOU -> LLM</text>
<path d="M880 405 L920 405" class="arrow2"/>
<text x="870" y="395" class="label">text stream</text>
<path d="M660 405 L290 405" class="arrow2"/>
<text x="430" y="395" class="label">PCM audio</text>
<path d="M660 450 L350 450" class="arrow"/>
<text x="420" y="440" class="label">events: trackStart/End</text>
<path d="M350 450 L290 450" class="arrow"/>
<text x="315" y="440" class="label">UI updates</text>
<path d="M750 200 L750 160" class="arrow3"/>
<text x="700" y="145" class="label">barge-in detection</text>
<path d="M760 170 L920 170" class="arrow3"/>
<text x="820" y="160" class="label">interrupt event + cancel</text>
</svg>


187
docs/proejct_todo.md Normal file
View File

@@ -0,0 +1,187 @@
# OmniSense: 12-Week Sprint Board + Tech Stack (Python Backend) — TODO
## Scope
- [ ] Build a realtime AI SaaS (OmniSense) focused on web-first audio + video with WebSocket + WebRTC endpoints
- [ ] Deliver assistant builder, tool execution, observability, evals, optional telephony later
- [ ] Keep scope aligned to 2-person team, self-hosted services
---
## Sprint Board (12 weeks, 2-week sprints)
Team assumption: 2 engineers. Scope prioritized to web-first audio + video, with BYO-SFU adapters.
### Sprint 1 (Weeks 1-2) — Realtime Core MVP (WebSocket + WebRTC Audio)
- Deliverables
- [ ] WebSocket transport: audio in/out streaming (1:1)
- [ ] WebRTC transport: audio in/out streaming (1:1)
- [ ] Adapter contract wired into runtime (transport-agnostic session core)
- [ ] ASR → LLM → TTS pipeline, streaming both directions
- [ ] Basic session state (start/stop, silence timeout)
- [ ] Transcript persistence
- Acceptance criteria
- [ ] < 1.5s median round-trip for short responses
- [ ] Stable streaming for 10+ minute session
### Sprint 2 (Weeks 3-4) — Video + Realtime UX
- Deliverables
- [ ] WebRTC video capture + streaming (assistant can “see” frames)
- [ ] WebSocket video streaming for local/dev mode
- [ ] Low-latency UI: push-to-talk, live captions, speaking indicator
- [ ] Recording + transcript storage (web sessions)
- Acceptance criteria
- [ ] Video < 2.5s end-to-end latency for analysis
- [ ] Audio quality acceptable (no clipping, jitter handling)
### Sprint 3 (Weeks 5-6) — Assistant Builder v1
- Deliverables
- [ ] Assistant schema + versioning
- [ ] UI: Model/Voice/Transcriber/Tools/Video/Transport tabs
- [ ] “Test/Chat/Talk to Assistant” (web)
- Acceptance criteria
- [ ] Create/publish assistant and run a live web session
- [ ] All config changes tracked by version
### Sprint 4 (Weeks 7-8) — Tooling + Structured Outputs
- Deliverables
- [ ] Tool registry + custom HTTP tools
- [ ] Tool auth secrets management
- [ ] Structured outputs (JSON extraction)
- Acceptance criteria
- [ ] Tool calls executed with retries/timeouts
- [ ] Structured JSON stored per call/session
### Sprint 5 (Weeks 9-10) — Observability + QA + Dev Platform
- Deliverables
- [ ] Session logs + chat logs + media logs
- [ ] Evals engine + test suites
- [ ] Basic analytics dashboard
- [ ] Public WebSocket API spec + message schema
- [ ] JS/TS SDK (connect, send audio/video, receive transcripts)
- Acceptance criteria
- [ ] Reproducible test suite runs
- [ ] Log filters by assistant/time/status
- [ ] SDK demo app runs end-to-end
### Sprint 6 (Weeks 11-12) — SaaS Hardening
- Deliverables
- [ ] Org/RBAC + API keys + rate limits
- [ ] Usage metering + credits
- [ ] Stripe billing integration
- [ ] Self-hosted DB ops (migrations, backup/restore, monitoring)
- Acceptance criteria
- [ ] Metered usage per org
- [ ] Credits decrement correctly
- [ ] Optional telephony spike documented (defer build)
- [ ] Enterprise adapter guide published (BYO-SFU)
---
## Tech Stack by Service (Self-Hosted, Web-First)
### 1) Transport Gateway (Realtime)
- [ ] WebRTC (browser) + WebSocket (lightweight/dev) protocols
- [ ] BYO-SFU adapter (enterprise) + LiveKit optional adapter + WS transport server
- [ ] Python core (FastAPI + asyncio) + Node.js mediasoup adapters when needed
- [ ] Media: Opus/VP8, jitter buffer, VAD, echo cancellation
- [ ] Storage: S3-compatible (MinIO) for recordings
### 2) ASR Service
- [ ] Whisper (self-hosted) baseline
- [ ] gRPC/WebSocket streaming transport
- [ ] Python native service
- [ ] Optional cloud provider fallback (later)
### 3) TTS Service
- [ ] Piper or Coqui TTS (self-hosted)
- [ ] gRPC/WebSocket streaming transport
- [ ] Python native service
- [ ] Redis cache for common phrases
### 4) LLM Orchestrator
- [ ] Self-hosted (vLLM + open model)
- [ ] Python (FastAPI + asyncio)
- [ ] Streaming, tool calling, JSON mode
- [ ] Safety filters + prompt templates
### 5) Assistant Config Service
- [ ] PostgreSQL
- [ ] Python (SQLAlchemy or SQLModel)
- [ ] Versioning, publish/rollback
### 6) Session Service
- [ ] PostgreSQL + Redis
- [ ] Python
- [ ] State machine, timeouts, events
### 7) Tool Execution Layer
- [ ] PostgreSQL
- [ ] Python
- [ ] Auth secret vault, retry policies, tool schemas
### 8) Observability + Logs
- [ ] Postgres (metadata), ClickHouse (logs/metrics)
- [ ] OpenSearch for search
- [ ] Prometheus + Grafana metrics
- [ ] OpenTelemetry tracing
### 9) Billing + Usage Metering
- [ ] Stripe billing
- [ ] PostgreSQL
- [ ] NATS JetStream (events) + Redis counters
### 10) Web App (Dashboard)
- [ ] React + Next.js
- [ ] Tailwind or Radix UI
- [ ] WebRTC client + WS client; adapter-based RTC integration
- [ ] ECharts/Recharts
### 11) Auth + RBAC
- [ ] Keycloak (self-hosted) or custom JWT
- [ ] Org/user/role tables in Postgres
### 12) Public WebSocket API + SDK
- [ ] WS API: versioned schema, binary audio frames + JSON control messages
- [ ] SDKs: JS/TS first, optional Python/Go clients
- [ ] Docs: quickstart, auth flow, session lifecycle, examples
---
## Infrastructure (Self-Hosted)
- [ ] Docker Compose → k3s (later)
- [ ] Redis Streams or NATS
- [ ] MinIO object store
- [ ] GitHub Actions + Helm or kustomize
- [ ] Self-hosted Postgres + pgbackrest backups
- [ ] Vault for secrets
---
## Suggested MVP Sequence
- [ ] WebRTC demo + ASR/LLM/TTS streaming
- [ ] Assistant schema + versioning (web-first)
- [ ] Video capture + multimodal analysis
- [ ] Tool execution + structured outputs
- [ ] Logs + evals + public WS API + SDK
- [ ] Telephony (optional, later)
---
## Public WebSocket API (Minimum Spec)
- [ ] Auth: API key or JWT in initial `hello` message
- [ ] Core messages: `session.start`, `session.stop`, `audio.append`, `audio.commit`, `video.append`, `transcript.delta`, `assistant.response`, `tool.call`, `tool.result`, `error` (message-shape sketch after this list)
- [ ] Binary payloads: PCM/Opus frames with metadata in control channel
- [ ] Versioning: `v1` schema with backward compatibility rules
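A possible shape for these messages, purely illustrative: only the message names come from the checklist above, while the envelope fields (type, session_id, data) are assumptions.
```
import json

# Hypothetical v1 control messages: only the message names come from the
# checklist above; the envelope fields (type, session_id, data) are assumptions.
hello = {"type": "hello", "version": "v1", "auth": "<api-key-or-jwt>"}
start = {"type": "session.start", "session_id": "abc123",
         "data": {"codec": "pcm", "sample_rate": 16000}}
commit = {"type": "audio.commit", "session_id": "abc123"}

for msg in (hello, start, commit):
    print(json.dumps(msg))
# Binary PCM/Opus frames travel as separate binary WebSocket frames;
# metadata like the above stays on the JSON control channel.
```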
---
## Self-Hosted DB Ops Checklist
- [ ] Postgres in Docker/k3s with persistent volumes
- [ ] Migrations: `alembic` or `atlas`
- [ ] Backups: `pgbackrest` nightly + on-demand
- [ ] Monitoring: postgres_exporter + alerts
---
## RTC Adapter Contract (BYO-SFU First)
- [ ] Keep RTC pluggable; LiveKit optional, not core dependency
- [ ] Define adapter interface (TypeScript sketch; a Python analogue is sketched below)
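Since this repo is Python-first, a hedged Python analogue of such an adapter contract (the checklist calls for a TypeScript sketch; method names here are placeholders, not an agreed interface):
```
from typing import Awaitable, Callable, Protocol

class RTCAdapter(Protocol):
    """Hypothetical pluggable RTC adapter (BYO-SFU, LiveKit, plain WS).

    Method names are placeholders that only illustrate the contract idea.
    """

    async def connect(self, room: str, token: str) -> None: ...
    async def publish_audio(self, pcm_chunk: bytes) -> None: ...
    def on_remote_audio(self, handler: Callable[[bytes], Awaitable[None]]) -> None: ...
    async def close(self) -> None: ...
```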

examples/mic_client.py
View File

@@ -4,10 +4,12 @@ Microphone client for testing duplex voice conversation.
This client captures audio from the microphone, sends it to the server,
and plays back the AI's voice response through the speakers.
It also displays the LLM's text responses in the console.
Usage:
python examples/mic_client.py --url ws://localhost:8000/ws
python examples/mic_client.py --url ws://localhost:8000/ws --chat "Hello!"
python examples/mic_client.py --url ws://localhost:8000/ws --verbose
Requirements:
pip install sounddevice soundfile websockets numpy
@@ -17,6 +19,7 @@ import argparse
import asyncio
import json
import sys
import time
import threading
import queue
from pathlib import Path
@@ -93,6 +96,17 @@ class MicrophoneClient:
self.is_recording = True
self.is_playing = True
# TTFB tracking (Time to First Byte)
self.request_start_time = None
self.first_audio_received = False
# Interrupt handling - discard audio until next trackStart
self._discard_audio = False
self._audio_sequence = 0 # Track audio sequence to detect stale chunks
# Verbose mode for streaming LLM responses
self.verbose = False
async def connect(self) -> None:
"""Connect to WebSocket server."""
print(f"Connecting to {self.url}...")
@@ -117,6 +131,10 @@ class MicrophoneClient:
async def send_chat(self, text: str) -> None:
"""Send chat message (text input)."""
# Reset TTFB tracking for new request
self.request_start_time = time.time()
self.first_audio_received = False
await self.send_command({
"command": "chat",
"text": text
@@ -236,9 +254,21 @@ class MicrophoneClient:
# Audio data received
self.bytes_received += len(message)
# Check if we should discard this audio (after interrupt)
if self._discard_audio:
duration_ms = len(message) / (self.sample_rate * 2) * 1000
print(f"← Audio: {duration_ms:.0f}ms (DISCARDED - waiting for new track)")
continue
if self.is_playing:
self._add_audio_to_buffer(message)
# Calculate and display TTFB for first audio packet
if not self.first_audio_received and self.request_start_time:
client_ttfb_ms = (time.time() - self.request_start_time) * 1000
self.first_audio_received = True
print(f"← [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
# Show progress (less verbose)
with self.audio_output_lock:
buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
@@ -285,20 +315,47 @@ class MicrophoneClient:
# Interim result - show with indicator (overwrite same line)
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [listening] {display_text}".ljust(80), end="\r")
elif event_type == "ttfb":
# Server-side TTFB event
latency_ms = event.get("latencyMs", 0)
print(f"← [TTFB] Server reported latency: {latency_ms}ms")
elif event_type == "llmResponse":
# LLM text response
text = event.get("text", "")
is_final = event.get("isFinal", False)
if is_final:
# Print final LLM response
print(f"← AI: {text}")
elif self.verbose:
# Show streaming chunks only in verbose mode
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [streaming] {display_text}")
elif event_type == "trackStart":
print("← Bot started speaking")
# IMPORTANT: Accept audio again after trackStart
self._discard_audio = False
self._audio_sequence += 1
# Reset TTFB tracking for voice responses (when no chat was sent)
if self.request_start_time is None:
self.request_start_time = time.time()
self.first_audio_received = False
# Clear any old audio in buffer
with self.audio_output_lock:
self.audio_output_buffer = b""
elif event_type == "trackEnd":
print("← Bot finished speaking")
# Reset TTFB tracking after response completes
self.request_start_time = None
self.first_audio_received = False
elif event_type == "interrupt":
print("← Bot interrupted!")
# IMPORTANT: Clear audio buffer immediately on interrupt
# IMPORTANT: Discard all audio until next trackStart
self._discard_audio = True
# Clear audio buffer immediately
with self.audio_output_lock:
buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
self.audio_output_buffer = b""
print(f" (cleared {buffer_ms:.0f}ms of buffered audio)")
print(f" (cleared {buffer_ms:.0f}ms, discarding audio until new track)")
elif event_type == "error":
print(f"← Error: {event.get('error')}")
elif event_type == "hangup":
@@ -511,6 +568,11 @@ async def main():
action="store_true",
help="Disable interactive mode"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Show streaming LLM response chunks"
)
args = parser.parse_args()
@@ -524,6 +586,7 @@ async def main():
input_device=args.input_device,
output_device=args.output_device
)
client.verbose = args.verbose
await client.run(
chat_message=args.chat,

View File

@@ -12,6 +12,7 @@ import argparse
import asyncio
import json
import sys
import time
import wave
import io
@@ -68,6 +69,13 @@ class SimpleVoiceClient:
# Stats
self.bytes_received = 0
# TTFB tracking (Time to First Byte)
self.request_start_time = None
self.first_audio_received = False
# Interrupt handling - discard audio until next trackStart
self._discard_audio = False
async def connect(self):
"""Connect to server."""
print(f"Connecting to {self.url}...")
@@ -84,6 +92,10 @@ class SimpleVoiceClient:
async def send_chat(self, text: str):
"""Send chat message."""
# Reset TTFB tracking for new request
self.request_start_time = time.time()
self.first_audio_received = False
await self.ws.send(json.dumps({"command": "chat", "text": text}))
print(f"-> chat: {text}")
@@ -120,6 +132,18 @@ class SimpleVoiceClient:
# Audio data
self.bytes_received += len(msg)
duration_ms = len(msg) / (self.sample_rate * 2) * 1000
# Check if we should discard this audio (after interrupt)
if self._discard_audio:
print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms) [DISCARDED]")
continue
# Calculate and display TTFB for first audio packet
if not self.first_audio_received and self.request_start_time:
client_ttfb_ms = (time.time() - self.request_start_time) * 1000
self.first_audio_received = True
print(f"<- [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms)")
# Play immediately in executor to not block
@@ -138,6 +162,18 @@ class SimpleVoiceClient:
print(f"<- You said: {text}")
else:
print(f"<- [listening] {text}", end="\r")
elif etype == "ttfb":
# Server-side TTFB event
latency_ms = event.get("latencyMs", 0)
print(f"<- [TTFB] Server reported latency: {latency_ms}ms")
elif etype == "trackStart":
# New track starting - accept audio again
self._discard_audio = False
print(f"<- {etype}")
elif etype == "interrupt":
# Interrupt - discard audio until next trackStart
self._discard_audio = True
print(f"<- {etype} (discarding audio until new track)")
elif etype == "hangup":
print(f"<- {etype}")
self.running = False

504
examples/wav_client.py Normal file
View File

@@ -0,0 +1,504 @@
#!/usr/bin/env python3
"""
WAV file client for testing duplex voice conversation.
This client reads audio from a WAV file, sends it to the server,
and saves the AI's voice response to an output WAV file.
Usage:
python examples/wav_client.py --input input.wav --output response.wav
python examples/wav_client.py --input input.wav --output response.wav --url ws://localhost:8000/ws
python examples/wav_client.py --input input.wav --output response.wav --wait-time 10
python wav_client.py --input ../data/audio_examples/two_utterances.wav -o response.wav
Requirements:
pip install soundfile websockets numpy
"""
import argparse
import asyncio
import json
import sys
import time
import wave
from pathlib import Path
try:
import numpy as np
except ImportError:
print("Please install numpy: pip install numpy")
sys.exit(1)
try:
import soundfile as sf
except ImportError:
print("Please install soundfile: pip install soundfile")
sys.exit(1)
try:
import websockets
except ImportError:
print("Please install websockets: pip install websockets")
sys.exit(1)
class WavFileClient:
"""
WAV file client for voice conversation testing.
Features:
- Read audio from WAV file
- Send audio to WebSocket server
- Receive and save response audio
- Event logging
"""
def __init__(
self,
url: str,
input_file: str,
output_file: str,
sample_rate: int = 16000,
chunk_duration_ms: int = 20,
wait_time: float = 15.0,
verbose: bool = False
):
"""
Initialize WAV file client.
Args:
url: WebSocket server URL
input_file: Input WAV file path
output_file: Output WAV file path
sample_rate: Audio sample rate (Hz)
chunk_duration_ms: Audio chunk duration (ms) for sending
wait_time: Time to wait for response after sending (seconds)
verbose: Enable verbose output
"""
self.url = url
self.input_file = Path(input_file)
self.output_file = Path(output_file)
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_samples = int(sample_rate * chunk_duration_ms / 1000)
self.wait_time = wait_time
self.verbose = verbose
# WebSocket connection
self.ws = None
self.running = False
# Audio buffers
self.received_audio = bytearray()
# Statistics
self.bytes_sent = 0
self.bytes_received = 0
# TTFB tracking (per response)
self.send_start_time = None
self.response_start_time = None # set on each trackStart
self.waiting_for_first_audio = False
self.ttfb_ms = None # last TTFB for summary
self.ttfb_list = [] # TTFB for each response
# State tracking
self.track_started = False
self.track_ended = False
self.send_completed = False
# Events log
self.events_log = []
def log_event(self, direction: str, message: str):
"""Log an event with timestamp."""
timestamp = time.time()
self.events_log.append({
"timestamp": timestamp,
"direction": direction,
"message": message
})
# Handle encoding errors on Windows
try:
print(f"{direction} {message}")
except UnicodeEncodeError:
# Replace problematic characters for console output
safe_message = message.encode('ascii', errors='replace').decode('ascii')
print(f"{direction} {safe_message}")
async def connect(self) -> None:
"""Connect to WebSocket server."""
self.log_event("", f"Connecting to {self.url}...")
self.ws = await websockets.connect(self.url)
self.running = True
self.log_event("", "Connected!")
# Send invite command
await self.send_command({
"command": "invite",
"option": {
"codec": "pcm",
"sampleRate": self.sample_rate
}
})
async def send_command(self, cmd: dict) -> None:
"""Send JSON command to server."""
if self.ws:
await self.ws.send(json.dumps(cmd))
self.log_event("", f"Command: {cmd.get('command', 'unknown')}")
async def send_hangup(self, reason: str = "Session complete") -> None:
"""Send hangup command."""
await self.send_command({
"command": "hangup",
"reason": reason
})
def load_wav_file(self) -> tuple[np.ndarray, int]:
"""
Load and prepare WAV file for sending.
Returns:
Tuple of (audio_data as int16 numpy array, original sample rate)
"""
if not self.input_file.exists():
raise FileNotFoundError(f"Input file not found: {self.input_file}")
# Load audio file
audio_data, file_sample_rate = sf.read(self.input_file)
self.log_event("", f"Loaded: {self.input_file}")
self.log_event("", f" Original sample rate: {file_sample_rate} Hz")
self.log_event("", f" Duration: {len(audio_data) / file_sample_rate:.2f}s")
# Convert stereo to mono if needed
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1)
self.log_event("", " Converted stereo to mono")
# Resample if needed
if file_sample_rate != self.sample_rate:
# Simple resampling using numpy
duration = len(audio_data) / file_sample_rate
num_samples = int(duration * self.sample_rate)
indices = np.linspace(0, len(audio_data) - 1, num_samples)
audio_data = np.interp(indices, np.arange(len(audio_data)), audio_data)
self.log_event("", f" Resampled to {self.sample_rate} Hz")
# Convert to int16
if audio_data.dtype != np.int16:
# Normalize to [-1, 1] if needed
max_val = np.max(np.abs(audio_data))
if max_val > 1.0:
audio_data = audio_data / max_val
audio_data = (audio_data * 32767).astype(np.int16)
self.log_event("", f" Prepared: {len(audio_data)} samples ({len(audio_data)/self.sample_rate:.2f}s)")
return audio_data, file_sample_rate
async def audio_sender(self, audio_data: np.ndarray) -> None:
"""Send audio data to server in chunks."""
total_samples = len(audio_data)
chunk_size = self.chunk_samples
sent_samples = 0
self.send_start_time = time.time()
self.log_event("", f"Starting audio transmission ({total_samples} samples)...")
while sent_samples < total_samples and self.running:
# Get next chunk
end_sample = min(sent_samples + chunk_size, total_samples)
chunk = audio_data[sent_samples:end_sample]
chunk_bytes = chunk.tobytes()
# Send to server
if self.ws:
await self.ws.send(chunk_bytes)
self.bytes_sent += len(chunk_bytes)
sent_samples = end_sample
# Progress logging (every 500ms worth of audio)
if self.verbose and sent_samples % (self.sample_rate // 2) == 0:
progress = (sent_samples / total_samples) * 100
print(f" Sending: {progress:.0f}%", end="\r")
# Delay to simulate real-time streaming
# Server expects audio at real-time pace for VAD/ASR to work properly
await asyncio.sleep(self.chunk_duration_ms / 1000)
self.send_completed = True
elapsed = time.time() - self.send_start_time
self.log_event("", f"Audio transmission complete ({elapsed:.2f}s, {self.bytes_sent/1024:.1f} KB)")
async def receiver(self) -> None:
"""Receive messages from server."""
try:
while self.running:
try:
message = await asyncio.wait_for(self.ws.recv(), timeout=0.1)
if isinstance(message, bytes):
# Audio data received
self.bytes_received += len(message)
self.received_audio.extend(message)
# Calculate TTFB on first audio of each response
if self.waiting_for_first_audio and self.response_start_time is not None:
ttfb_ms = (time.time() - self.response_start_time) * 1000
self.ttfb_ms = ttfb_ms
self.ttfb_list.append(ttfb_ms)
self.waiting_for_first_audio = False
self.log_event("", f"[TTFB] First audio latency: {ttfb_ms:.0f}ms")
# Log progress
duration_ms = len(message) / (self.sample_rate * 2) * 1000
total_ms = len(self.received_audio) / (self.sample_rate * 2) * 1000
if self.verbose:
print(f"← Audio: +{duration_ms:.0f}ms (total: {total_ms:.0f}ms)", end="\r")
else:
# JSON event
event = json.loads(message)
await self._handle_event(event)
except asyncio.TimeoutError:
continue
except websockets.ConnectionClosed:
self.log_event("", "Connection closed")
self.running = False
break
except asyncio.CancelledError:
pass
except Exception as e:
self.log_event("!", f"Receiver error: {e}")
self.running = False
async def _handle_event(self, event: dict) -> None:
"""Handle incoming event."""
event_type = event.get("event", "unknown")
if event_type == "answer":
self.log_event("", "Session ready!")
elif event_type == "speaking":
self.log_event("", "Speech detected")
elif event_type == "silence":
self.log_event("", "Silence detected")
elif event_type == "transcript":
# ASR transcript (interim = asrDelta-style, final = asrFinal-style)
text = event.get("text", "")
is_final = event.get("isFinal", False)
if is_final:
# Clear interim line and print final
print(" " * 80, end="\r")
self.log_event("", f"→ You: {text}")
else:
# Interim result - show with indicator (overwrite same line, as in mic_client)
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [listening] {display_text}".ljust(80), end="\r")
elif event_type == "ttfb":
latency_ms = event.get("latencyMs", 0)
self.log_event("", f"[TTFB] Server latency: {latency_ms}ms")
elif event_type == "llmResponse":
text = event.get("text", "")
is_final = event.get("isFinal", False)
if is_final:
self.log_event("", f"LLM Response (final): {text[:100]}{'...' if len(text) > 100 else ''}")
elif self.verbose:
# Show streaming chunks only in verbose mode
self.log_event("", f"LLM: {text}")
elif event_type == "trackStart":
self.track_started = True
self.response_start_time = time.time()
self.waiting_for_first_audio = True
self.log_event("", "Bot started speaking")
elif event_type == "trackEnd":
self.track_ended = True
self.log_event("", "Bot finished speaking")
elif event_type == "interrupt":
self.log_event("", "Bot interrupted!")
elif event_type == "error":
self.log_event("!", f"Error: {event.get('error')}")
elif event_type == "hangup":
self.log_event("", f"Hangup: {event.get('reason')}")
self.running = False
else:
self.log_event("", f"Event: {event_type}")
def save_output_wav(self) -> None:
"""Save received audio to output WAV file."""
if not self.received_audio:
self.log_event("!", "No audio received to save")
return
# Convert bytes to numpy array
audio_data = np.frombuffer(bytes(self.received_audio), dtype=np.int16)
# Ensure output directory exists
self.output_file.parent.mkdir(parents=True, exist_ok=True)
# Save using wave module for compatibility
with wave.open(str(self.output_file), 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(self.sample_rate)
wav_file.writeframes(audio_data.tobytes())
duration = len(audio_data) / self.sample_rate
self.log_event("", f"Saved output: {self.output_file}")
self.log_event("", f" Duration: {duration:.2f}s ({len(audio_data)} samples)")
self.log_event("", f" Size: {len(self.received_audio)/1024:.1f} KB")
async def run(self) -> None:
"""Run the WAV file test."""
try:
# Load input WAV file
audio_data, _ = self.load_wav_file()
# Connect to server
await self.connect()
# Wait for answer
await asyncio.sleep(0.5)
# Start receiver task
receiver_task = asyncio.create_task(self.receiver())
# Send audio
await self.audio_sender(audio_data)
# Wait for response
self.log_event("", f"Waiting {self.wait_time}s for response...")
wait_start = time.time()
while self.running and (time.time() - wait_start) < self.wait_time:
# Check if track has ended (response complete)
if self.track_ended and self.send_completed:
# Give a little extra time for any remaining audio
await asyncio.sleep(1.0)
break
await asyncio.sleep(0.1)
# Cleanup
self.running = False
receiver_task.cancel()
try:
await receiver_task
except asyncio.CancelledError:
pass
# Save output
self.save_output_wav()
# Print summary
self._print_summary()
except FileNotFoundError as e:
print(f"Error: {e}")
sys.exit(1)
except ConnectionRefusedError:
print(f"Error: Could not connect to {self.url}")
print("Make sure the server is running.")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
await self.close()
def _print_summary(self):
"""Print session summary."""
print("\n" + "=" * 50)
print("Session Summary")
print("=" * 50)
print(f" Input file: {self.input_file}")
print(f" Output file: {self.output_file}")
print(f" Bytes sent: {self.bytes_sent / 1024:.1f} KB")
print(f" Bytes received: {self.bytes_received / 1024:.1f} KB")
if self.ttfb_list:
if len(self.ttfb_list) == 1:
print(f" TTFB: {self.ttfb_list[0]:.0f} ms")
else:
print(f" TTFB (per response): {', '.join(f'{t:.0f}ms' for t in self.ttfb_list)}")
if self.received_audio:
duration = len(self.received_audio) / (self.sample_rate * 2)
print(f" Response duration: {duration:.2f}s")
print("=" * 50)
async def close(self) -> None:
"""Close the connection."""
self.running = False
if self.ws:
try:
await self.ws.close()
except:
pass
async def main():
parser = argparse.ArgumentParser(
description="WAV file client for testing duplex voice conversation"
)
parser.add_argument(
"--input", "-i",
required=True,
help="Input WAV file path"
)
parser.add_argument(
"--output", "-o",
required=True,
help="Output WAV file path for response"
)
parser.add_argument(
"--url",
default="ws://localhost:8000/ws",
help="WebSocket server URL (default: ws://localhost:8000/ws)"
)
parser.add_argument(
"--sample-rate",
type=int,
default=16000,
help="Target sample rate for audio (default: 16000)"
)
parser.add_argument(
"--chunk-duration",
type=int,
default=20,
help="Chunk duration in ms for sending (default: 20)"
)
parser.add_argument(
"--wait-time", "-w",
type=float,
default=15.0,
help="Time to wait for response after sending (default: 15.0)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose output"
)
args = parser.parse_args()
client = WavFileClient(
url=args.url,
input_file=args.input,
output_file=args.output,
sample_rate=args.sample_rate,
chunk_duration_ms=args.chunk_duration,
wait_time=args.wait_time,
verbose=args.verbose
)
await client.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nInterrupted by user")

742
examples/web_client.html Normal file
View File

@@ -0,0 +1,742 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Duplex Voice Web Client</title>
<style>
@import url("https://fonts.googleapis.com/css2?family=Fraunces:opsz,wght@9..144,300;9..144,500;9..144,700&family=Recursive:wght@300;400;600;700&display=swap");
:root {
--bg: #0b0b0f;
--panel: #14141c;
--panel-2: #101018;
--ink: #f2f3f7;
--muted: #a7acba;
--accent: #ff6b6b;
--accent-2: #ffd166;
--good: #2dd4bf;
--bad: #f87171;
--grid: rgba(255, 255, 255, 0.06);
--shadow: 0 20px 60px rgba(0, 0, 0, 0.45);
}
* {
box-sizing: border-box;
}
html,
body {
height: 100%;
margin: 0;
color: var(--ink);
background: radial-gradient(1200px 600px at 20% -10%, #1d1d2a 0%, transparent 60%),
radial-gradient(800px 800px at 110% 10%, #20203a 0%, transparent 50%),
var(--bg);
font-family: "Recursive", ui-sans-serif, system-ui, -apple-system, "Segoe UI", sans-serif;
}
.noise {
position: fixed;
inset: 0;
background-image: url("data:image/svg+xml;utf8,<svg xmlns='http://www.w3.org/2000/svg' width='120' height='120' viewBox='0 0 120 120'><filter id='n'><feTurbulence type='fractalNoise' baseFrequency='0.9' numOctaves='2' stitchTiles='stitch'/></filter><rect width='120' height='120' filter='url(%23n)' opacity='0.06'/></svg>");
pointer-events: none;
mix-blend-mode: soft-light;
}
header {
padding: 32px 28px 18px;
border-bottom: 1px solid var(--grid);
}
h1 {
font-family: "Fraunces", serif;
font-weight: 600;
margin: 0 0 6px;
letter-spacing: 0.4px;
}
.subtitle {
color: var(--muted);
font-size: 0.95rem;
}
main {
display: grid;
grid-template-columns: 1.1fr 1.4fr;
gap: 24px;
padding: 24px 28px 40px;
}
.panel {
background: linear-gradient(180deg, rgba(255, 255, 255, 0.02), transparent),
var(--panel);
border: 1px solid var(--grid);
border-radius: 16px;
padding: 20px;
box-shadow: var(--shadow);
}
.panel h2 {
margin: 0 0 12px;
font-size: 1.05rem;
font-weight: 600;
}
.stack {
display: grid;
gap: 12px;
}
label {
display: block;
font-size: 0.85rem;
color: var(--muted);
margin-bottom: 6px;
}
input,
select,
button,
textarea {
font-family: inherit;
}
input,
select,
textarea {
width: 100%;
padding: 10px 12px;
border-radius: 10px;
border: 1px solid var(--grid);
background: var(--panel-2);
color: var(--ink);
outline: none;
}
textarea {
min-height: 80px;
resize: vertical;
}
.row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
}
.btn-row {
display: flex;
flex-wrap: wrap;
gap: 10px;
}
button {
border: none;
border-radius: 999px;
padding: 10px 16px;
font-weight: 600;
background: var(--ink);
color: #111;
cursor: pointer;
transition: transform 0.2s ease, box-shadow 0.2s ease;
}
button.secondary {
background: transparent;
color: var(--ink);
border: 1px solid var(--grid);
}
button.accent {
background: linear-gradient(120deg, var(--accent), #f97316);
color: #0b0b0f;
}
button.good {
background: linear-gradient(120deg, var(--good), #22c55e);
color: #07261f;
}
button.bad {
background: linear-gradient(120deg, var(--bad), #f97316);
color: #2a0b0b;
}
button:active {
transform: translateY(1px) scale(0.99);
}
.status {
display: flex;
align-items: center;
gap: 12px;
padding: 12px;
background: rgba(255, 255, 255, 0.03);
border-radius: 12px;
border: 1px dashed var(--grid);
font-size: 0.9rem;
}
.dot {
width: 10px;
height: 10px;
border-radius: 999px;
background: var(--bad);
box-shadow: 0 0 12px rgba(248, 113, 113, 0.5);
}
.dot.on {
background: var(--good);
box-shadow: 0 0 12px rgba(45, 212, 191, 0.7);
}
.log {
height: 320px;
overflow: auto;
padding: 12px;
background: #0d0d14;
border-radius: 12px;
border: 1px solid var(--grid);
font-size: 0.85rem;
line-height: 1.4;
}
.chat {
height: 260px;
overflow: auto;
padding: 12px;
background: #0d0d14;
border-radius: 12px;
border: 1px solid var(--grid);
font-size: 0.9rem;
line-height: 1.45;
}
.chat-entry {
padding: 8px 10px;
margin-bottom: 8px;
border-radius: 10px;
background: rgba(255, 255, 255, 0.04);
border: 1px solid rgba(255, 255, 255, 0.06);
}
.chat-entry.user {
border-left: 3px solid var(--accent-2);
}
.chat-entry.ai {
border-left: 3px solid var(--good);
}
.chat-entry.interim {
opacity: 0.7;
font-style: italic;
}
.log-entry {
padding: 6px 8px;
border-bottom: 1px dashed rgba(255, 255, 255, 0.06);
}
.log-entry:last-child {
border-bottom: none;
}
.tag {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 2px 8px;
border-radius: 999px;
font-size: 0.7rem;
text-transform: uppercase;
letter-spacing: 0.6px;
background: rgba(255, 255, 255, 0.08);
color: var(--muted);
}
.tag.event {
background: rgba(255, 107, 107, 0.18);
color: #ffc1c1;
}
.tag.audio {
background: rgba(45, 212, 191, 0.2);
color: #c5f9f0;
}
.tag.sys {
background: rgba(255, 209, 102, 0.2);
color: #ffefb0;
}
.muted {
color: var(--muted);
}
footer {
padding: 0 28px 28px;
color: var(--muted);
font-size: 0.8rem;
}
@media (max-width: 1100px) {
main {
grid-template-columns: 1fr;
}
.log {
height: 360px;
}
.chat {
height: 260px;
}
}
</style>
</head>
<body>
<div class="noise"></div>
<header>
<h1>Duplex Voice Client</h1>
<div class="subtitle">Browser client for the WebSocket duplex pipeline. Device selection + event logging.</div>
</header>
<main>
<section class="panel stack">
<h2>Connection</h2>
<div>
<label for="wsUrl">WebSocket URL</label>
<input id="wsUrl" value="ws://localhost:8000/ws" />
</div>
<div class="btn-row">
<button class="accent" id="connectBtn">Connect</button>
<button class="secondary" id="disconnectBtn">Disconnect</button>
</div>
<div class="status">
<div id="statusDot" class="dot"></div>
<div>
<div id="statusText">Disconnected</div>
<div class="muted" id="statusSub">Waiting for connection</div>
</div>
</div>
<h2>Devices</h2>
<div class="row">
<div>
<label for="inputSelect">Input (Mic)</label>
<select id="inputSelect"></select>
</div>
<div>
<label for="outputSelect">Output (Speaker)</label>
<select id="outputSelect"></select>
</div>
</div>
<div class="btn-row">
<button class="secondary" id="refreshDevicesBtn">Refresh Devices</button>
<button class="good" id="startMicBtn">Start Mic</button>
<button class="secondary" id="stopMicBtn">Stop Mic</button>
</div>
<h2>Chat</h2>
<div class="stack">
<textarea id="chatInput" placeholder="Type a message, press Send"></textarea>
<div class="btn-row">
<button class="accent" id="sendChatBtn">Send Chat</button>
<button class="secondary" id="clearLogBtn">Clear Log</button>
</div>
</div>
</section>
<section class="stack">
<div class="panel stack">
<h2>Chat History</h2>
<div class="chat" id="chatHistory"></div>
</div>
<div class="panel stack">
<h2>Event Log</h2>
<div class="log" id="log"></div>
</div>
</section>
</main>
<footer>
Output device selection requires HTTPS + a browser that supports <code>setSinkId</code>.
Audio is sent as 16-bit PCM @ 16 kHz, matching <code>examples/mic_client.py</code>.
</footer>
<audio id="audioOut" autoplay></audio>
<script>
const wsUrl = document.getElementById("wsUrl");
const connectBtn = document.getElementById("connectBtn");
const disconnectBtn = document.getElementById("disconnectBtn");
const inputSelect = document.getElementById("inputSelect");
const outputSelect = document.getElementById("outputSelect");
const startMicBtn = document.getElementById("startMicBtn");
const stopMicBtn = document.getElementById("stopMicBtn");
const refreshDevicesBtn = document.getElementById("refreshDevicesBtn");
const sendChatBtn = document.getElementById("sendChatBtn");
const clearLogBtn = document.getElementById("clearLogBtn");
const chatInput = document.getElementById("chatInput");
const logEl = document.getElementById("log");
const chatHistory = document.getElementById("chatHistory");
const statusDot = document.getElementById("statusDot");
const statusText = document.getElementById("statusText");
const statusSub = document.getElementById("statusSub");
const audioOut = document.getElementById("audioOut");
let ws = null;
let audioCtx = null;
let micStream = null;
let processor = null;
let micSource = null;
let playbackDest = null;
let playbackTime = 0;
let discardAudio = false;
let playbackSources = [];
let interimUserEl = null;
let interimAiEl = null;
let interimUserText = "";
let interimAiText = "";
const targetSampleRate = 16000;
function logLine(type, text, data) {
const time = new Date().toLocaleTimeString();
const entry = document.createElement("div");
entry.className = "log-entry";
const tag = document.createElement("span");
tag.className = `tag ${type}`;
tag.textContent = type.toUpperCase();
const msg = document.createElement("span");
msg.style.marginLeft = "10px";
msg.textContent = `[${time}] ${text}`;
entry.appendChild(tag);
entry.appendChild(msg);
if (data) {
const pre = document.createElement("div");
pre.className = "muted";
pre.textContent = JSON.stringify(data);
pre.style.marginTop = "4px";
entry.appendChild(pre);
}
logEl.appendChild(entry);
logEl.scrollTop = logEl.scrollHeight;
}
function addChat(role, text) {
const entry = document.createElement("div");
entry.className = `chat-entry ${role === "AI" ? "ai" : "user"}`;
entry.textContent = `${role}: ${text}`;
chatHistory.appendChild(entry);
chatHistory.scrollTop = chatHistory.scrollHeight;
}
function setInterim(role, text) {
const isAi = role === "AI";
let el = isAi ? interimAiEl : interimUserEl;
if (!text) {
if (el) el.remove();
if (isAi) interimAiEl = null;
else interimUserEl = null;
if (isAi) interimAiText = "";
else interimUserText = "";
return;
}
if (!el) {
el = document.createElement("div");
el.className = `chat-entry ${isAi ? "ai" : "user"} interim`;
chatHistory.appendChild(el);
if (isAi) interimAiEl = el;
else interimUserEl = el;
}
el.textContent = `${role} (interim): ${text}`;
chatHistory.scrollTop = chatHistory.scrollHeight;
}
function stopPlayback() {
discardAudio = true;
playbackTime = audioCtx ? audioCtx.currentTime : 0;
playbackSources.forEach((s) => {
try {
s.stop();
} catch (err) {}
});
playbackSources = [];
}
function setStatus(connected, detail) {
statusDot.classList.toggle("on", connected);
statusText.textContent = connected ? "Connected" : "Disconnected";
statusSub.textContent = detail || "";
}
async function ensureAudioContext() {
if (audioCtx) return;
audioCtx = new (window.AudioContext || window.webkitAudioContext)();
playbackDest = audioCtx.createMediaStreamDestination();
audioOut.srcObject = playbackDest.stream;
try {
await audioOut.play();
} catch (err) {
logLine("sys", "Audio playback blocked (user gesture needed)", { err: String(err) });
}
if (outputSelect.value) {
await setOutputDevice(outputSelect.value);
}
}
function downsampleBuffer(buffer, inRate, outRate) {
if (outRate === inRate) return buffer;
const ratio = inRate / outRate;
const newLength = Math.round(buffer.length / ratio);
const result = new Float32Array(newLength);
let offsetResult = 0;
let offsetBuffer = 0;
while (offsetResult < result.length) {
const nextOffsetBuffer = Math.round((offsetResult + 1) * ratio);
let accum = 0;
let count = 0;
for (let i = offsetBuffer; i < nextOffsetBuffer && i < buffer.length; i++) {
accum += buffer[i];
count++;
}
result[offsetResult] = accum / count;
offsetResult++;
offsetBuffer = nextOffsetBuffer;
}
return result;
}
function floatTo16BitPCM(float32) {
const out = new Int16Array(float32.length);
for (let i = 0; i < float32.length; i++) {
const s = Math.max(-1, Math.min(1, float32[i]));
out[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
}
return out;
}
function schedulePlayback(int16Data) {
if (!audioCtx || !playbackDest) return;
if (discardAudio) return;
const float32 = new Float32Array(int16Data.length);
for (let i = 0; i < int16Data.length; i++) {
float32[i] = int16Data[i] / 32768;
}
const buffer = audioCtx.createBuffer(1, float32.length, targetSampleRate);
buffer.copyToChannel(float32, 0);
const source = audioCtx.createBufferSource();
source.buffer = buffer;
source.connect(playbackDest);
const startTime = Math.max(audioCtx.currentTime + 0.02, playbackTime);
source.start(startTime);
playbackTime = startTime + buffer.duration;
playbackSources.push(source);
source.onended = () => {
playbackSources = playbackSources.filter((s) => s !== source);
};
}
async function connect() {
if (ws && ws.readyState === WebSocket.OPEN) return;
ws = new WebSocket(wsUrl.value.trim());
ws.binaryType = "arraybuffer";
ws.onopen = () => {
setStatus(true, "Session open");
logLine("sys", "WebSocket connected");
ensureAudioContext();
sendCommand({ command: "invite", option: { codec: "pcm", sampleRate: targetSampleRate } });
};
ws.onclose = () => {
setStatus(false, "Connection closed");
logLine("sys", "WebSocket closed");
ws = null;
};
ws.onerror = (err) => {
logLine("sys", "WebSocket error", { err: String(err) });
};
ws.onmessage = (msg) => {
if (typeof msg.data === "string") {
const event = JSON.parse(msg.data);
handleEvent(event);
} else {
const audioBuf = msg.data;
const int16 = new Int16Array(audioBuf);
schedulePlayback(int16);
logLine("audio", `Audio ${Math.round((int16.length / targetSampleRate) * 1000)}ms`);
}
};
}
function disconnect() {
if (ws) ws.close();
ws = null;
setStatus(false, "Disconnected");
}
function sendCommand(cmd) {
if (!ws || ws.readyState !== WebSocket.OPEN) {
logLine("sys", "Not connected");
return;
}
ws.send(JSON.stringify(cmd));
logLine("sys", `${cmd.command}`, cmd);
}
function handleEvent(event) {
const type = event.event || "unknown";
logLine("event", type, event);
if (type === "transcript") {
if (event.isFinal && event.text) {
setInterim("You", "");
addChat("You", event.text);
} else if (event.text) {
interimUserText += event.text;
setInterim("You", interimUserText);
}
}
if (type === "llmResponse") {
if (event.isFinal && event.text) {
setInterim("AI", "");
addChat("AI", event.text);
} else if (event.text) {
interimAiText += event.text;
setInterim("AI", interimAiText);
}
}
if (type === "trackStart") {
// New bot audio: stop any previous playback to avoid overlap
stopPlayback();
discardAudio = false;
}
if (type === "speaking") {
// User started speaking: clear any in-flight audio to avoid overlap
stopPlayback();
}
if (type === "interrupt") {
stopPlayback();
}
}
async function startMic() {
if (!ws || ws.readyState !== WebSocket.OPEN) {
logLine("sys", "Connect before starting mic");
return;
}
await ensureAudioContext();
const deviceId = inputSelect.value || undefined;
micStream = await navigator.mediaDevices.getUserMedia({
audio: deviceId ? { deviceId: { exact: deviceId } } : true,
});
micSource = audioCtx.createMediaStreamSource(micStream);
processor = audioCtx.createScriptProcessor(2048, 1, 1);
processor.onaudioprocess = (e) => {
if (!ws || ws.readyState !== WebSocket.OPEN) return;
const input = e.inputBuffer.getChannelData(0);
const downsampled = downsampleBuffer(input, audioCtx.sampleRate, targetSampleRate);
const pcm16 = floatTo16BitPCM(downsampled);
ws.send(pcm16.buffer);
};
micSource.connect(processor);
processor.connect(audioCtx.destination);
logLine("sys", "Microphone started");
}
function stopMic() {
if (processor) {
processor.disconnect();
processor = null;
}
if (micSource) {
micSource.disconnect();
micSource = null;
}
if (micStream) {
micStream.getTracks().forEach((t) => t.stop());
micStream = null;
}
logLine("sys", "Microphone stopped");
}
async function refreshDevices() {
const devices = await navigator.mediaDevices.enumerateDevices();
inputSelect.innerHTML = "";
outputSelect.innerHTML = "";
devices.forEach((d) => {
if (d.kind === "audioinput") {
const opt = document.createElement("option");
opt.value = d.deviceId;
opt.textContent = d.label || `Mic ${inputSelect.length + 1}`;
inputSelect.appendChild(opt);
}
if (d.kind === "audiooutput") {
const opt = document.createElement("option");
opt.value = d.deviceId;
opt.textContent = d.label || `Output ${outputSelect.length + 1}`;
outputSelect.appendChild(opt);
}
});
}
async function requestDeviceAccess() {
// Needed to reveal device labels in most browsers
try {
const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
stream.getTracks().forEach((t) => t.stop());
logLine("sys", "Microphone permission granted");
} catch (err) {
logLine("sys", "Microphone permission denied", { err: String(err) });
}
}
async function setOutputDevice(deviceId) {
if (!audioOut.setSinkId) {
logLine("sys", "setSinkId not supported in this browser");
return;
}
await audioOut.setSinkId(deviceId);
logLine("sys", `Output device set`, { deviceId });
}
connectBtn.addEventListener("click", connect);
disconnectBtn.addEventListener("click", disconnect);
refreshDevicesBtn.addEventListener("click", async () => {
await requestDeviceAccess();
await refreshDevices();
});
startMicBtn.addEventListener("click", startMic);
stopMicBtn.addEventListener("click", stopMic);
sendChatBtn.addEventListener("click", () => {
const text = chatInput.value.trim();
if (!text) return;
ensureAudioContext();
addChat("You", text);
sendCommand({ command: "chat", text });
chatInput.value = "";
});
clearLogBtn.addEventListener("click", () => {
logEl.innerHTML = "";
chatHistory.innerHTML = "";
setInterim("You", "");
setInterim("AI", "");
interimUserText = "";
interimAiText = "";
});
inputSelect.addEventListener("change", () => {
if (micStream) {
stopMic();
startMic();
}
});
outputSelect.addEventListener("change", () => setOutputDevice(outputSelect.value));
navigator.mediaDevices.addEventListener("devicechange", refreshDevices);
refreshDevices().catch(() => {});
</script>
</body>
</html>
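For quick manual testing of the page above, the following is a minimal stand-in peer, not part of this changeset. It assumes the third-party websockets package (>= 10) and simply logs the 16-bit PCM @ 16 kHz frames and JSON commands the browser sends, replying with a heartBeat event; the actual app server in this repo implements the full duplex pipeline.
import asyncio
import json
import time
import websockets  # assumption: pip install websockets
SAMPLE_RATE = 16000  # matches targetSampleRate in web_client.html
async def handler(websocket):
    async for message in websocket:
        if isinstance(message, (bytes, bytearray)):
            # Binary frames are 16-bit mono PCM at 16 kHz.
            ms = len(message) / 2 / SAMPLE_RATE * 1000
            print(f"audio frame: {ms:.0f} ms")
        else:
            cmd = json.loads(message)
            print("command:", cmd.get("command"))
            # Reply with a heartBeat event so the client's log shows traffic.
            await websocket.send(json.dumps(
                {"event": "heartBeat", "timestamp": int(time.time() * 1000)}))
async def main():
    async with websockets.serve(handler, "localhost", 8000):
        await asyncio.Future()  # serve until interrupted
if __name__ == "__main__":
    asyncio.run(main())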

View File

@@ -179,6 +179,13 @@ class DTMFEvent(BaseEvent):
digit: str = Field(..., description="DTMF digit (0-9, *, #, A-D)")
class HeartBeatEvent(BaseModel):
"""Server-to-client heartbeat to keep connection alive."""
event: str = Field(default="heartBeat", description="Event type")
timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp in milliseconds")
# Event type mapping
EVENT_TYPES = {
"incoming": IncomingEvent,
@@ -198,6 +205,7 @@ EVENT_TYPES = {
"metrics": MetricsEvent,
"addHistory": AddHistoryEvent,
"dtmf": DTMFEvent,
"heartBeat": HeartBeatEvent,
}
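A hypothetical usage sketch for the HeartBeatEvent added above; the import path and pydantic v2 are assumptions (pydantic v1 would use .json() instead of model_dump_json()).
from events import HeartBeatEvent  # assumption: importable events module
hb = HeartBeatEvent()              # event="heartBeat", timestamp defaults to now (ms)
payload = hb.model_dump_json()     # pydantic v2; with v1 use hb.json()
# A session loop might send this periodically to keep the WebSocket alive:
# await websocket.send(payload)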

View File

@@ -6,7 +6,6 @@ from typing import Tuple, Optional
import numpy as np
from loguru import logger
from processors.eou import EouDetector
# Try to import onnxruntime (optional for VAD functionality)
try:
@@ -64,6 +63,7 @@ class SileroVAD:
self.min_chunk_size = 512
self.last_label = "Silence"
self.last_probability = 0.0
self._energy_noise_floor = 1e-4
def _reset_state(self):
# Silero VAD V4+ expects state shape [2, 1, 128]
@@ -82,8 +82,27 @@ class SileroVAD:
Tuple of (label, probability) where label is "Speech" or "Silence"
"""
if self.session is None or not ONNX_AVAILABLE:
# If model not loaded or onnxruntime not available, assume speech
return "Speech", 1.0
# Fallback energy-based VAD with adaptive noise floor.
if not pcm_bytes:
return "Silence", 0.0
audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)
if audio_int16.size == 0:
return "Silence", 0.0
audio_float = audio_int16.astype(np.float32) / 32768.0
rms = float(np.sqrt(np.mean(audio_float * audio_float)))
# Update the adaptive noise floor (falls quickly toward quieter input, rises slowly)
if rms < self._energy_noise_floor:
self._energy_noise_floor = 0.95 * self._energy_noise_floor + 0.05 * rms
else:
self._energy_noise_floor = 0.995 * self._energy_noise_floor + 0.005 * rms
# Compute SNR-like ratio and map to probability
denom = max(self._energy_noise_floor, 1e-6)
snr = max(0.0, (rms - denom) / denom)
probability = min(1.0, snr / 3.0) # ~3x above noise => strong speech
label = "Speech" if probability >= 0.5 else "Silence"
return label, probability
# Convert bytes to numpy array of int16
audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)
@@ -148,25 +167,19 @@ class VADProcessor:
Tracks speech/silence state and emits events on transitions.
"""
def __init__(self, vad_model: SileroVAD, threshold: float = 0.5,
silence_threshold_ms: int = 1000, min_speech_duration_ms: int = 250):
def __init__(self, vad_model: SileroVAD, threshold: float = 0.5):
"""
Initialize VAD processor.
Args:
vad_model: Silero VAD model instance
threshold: Speech detection threshold
silence_threshold_ms: EOU silence threshold in ms (longer = one EOU across short pauses)
min_speech_duration_ms: EOU min speech duration in ms (ignore very short noises)
"""
self.vad = vad_model
self.threshold = threshold
self._eou_silence_ms = silence_threshold_ms
self._eou_min_speech_ms = min_speech_duration_ms
self.is_speaking = False
self.speech_start_time: Optional[float] = None
self.silence_start_time: Optional[float] = None
self.eou_detector = EouDetector(silence_threshold_ms, min_speech_duration_ms)
def process(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Optional[Tuple[str, float]]:
"""
@@ -184,10 +197,6 @@ class VADProcessor:
# Check if this is speech based on threshold
is_speech = probability >= self.threshold
# Check EOU
if self.eou_detector.process("Speech" if is_speech else "Silence"):
return ("eou", probability)
# State transition: Silence -> Speech
if is_speech and not self.is_speaking:
self.is_speaking = True
@@ -210,4 +219,3 @@ class VADProcessor:
self.is_speaking = False
self.speech_start_time = None
self.silence_start_time = None
self.eou_detector = EouDetector(self._eou_silence_ms, self._eou_min_speech_ms)
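A standalone illustration, not part of the diff, of the asymmetric noise-floor EMA used by the energy-based fallback added to SileroVAD above: the floor follows quieter input quickly and louder input slowly, so a sudden loud frame stands out as speech.
floor = 1e-4
for rms in (5e-5, 5e-5, 5e-5, 2e-3, 2e-3, 5e-5):
    if rms < floor:
        floor = 0.95 * floor + 0.05 * rms    # fall fast toward quieter input
    else:
        floor = 0.995 * floor + 0.005 * rms  # rise slowly on louder input
    snr = max(0.0, (rms - floor) / max(floor, 1e-6))
    prob = min(1.0, snr / 3.0)               # ~3x above the floor => strong speech
    print(f"rms={rms:.1e}  floor={floor:.2e}  speech_prob={prob:.2f}")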

1
scripts/README.md Normal file
View File

@@ -0,0 +1 @@
# Development Script

View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
Generate test audio file with utterances using SiliconFlow TTS API.
Creates a 16kHz mono WAV file with real speech segments separated by
configurable silence (for VAD/testing).
Usage:
python generate_test_audio.py [OPTIONS]
Options:
-o, --output PATH Output WAV path (default: data/audio_examples/two_utterances_16k.wav)
-u, --utterance TEXT Utterance text; repeat for multiple (ignored if -j is set)
-j, --json PATH JSON file: array of strings or {"utterances": [...]}
--silence-ms MS Silence in ms between utterances (default: 500)
--lead-silence-ms MS Silence in ms at start (default: 200)
--trail-silence-ms MS Silence in ms at end (default: 300)
Examples:
# Default utterances and output
python generate_test_audio.py
# Custom output path
python generate_test_audio.py -o out.wav
# Utterances from command line
python generate_test_audio.py -u "Hello" -u "World" -o test.wav
# Utterances from a JSON file
python generate_test_audio.py -j utterances.json -o test.wav
# Custom silence (1s between utterances)
python generate_test_audio.py -u "One" -u "Two" --silence-ms 1000 -o test.wav
Requires SILICONFLOW_API_KEY in .env.
"""
import wave
import struct
import argparse
import asyncio
import aiohttp
import json
import os
from pathlib import Path
from dotenv import load_dotenv
# Load .env file from project root
project_root = Path(__file__).parent.parent.parent
load_dotenv(project_root / ".env")
# SiliconFlow TTS Configuration
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/audio/speech"
SILICONFLOW_MODEL = "FunAudioLLM/CosyVoice2-0.5B"
# Available voices
VOICES = {
"alex": "FunAudioLLM/CosyVoice2-0.5B:alex",
"anna": "FunAudioLLM/CosyVoice2-0.5B:anna",
"bella": "FunAudioLLM/CosyVoice2-0.5B:bella",
"benjamin": "FunAudioLLM/CosyVoice2-0.5B:benjamin",
"charles": "FunAudioLLM/CosyVoice2-0.5B:charles",
"claire": "FunAudioLLM/CosyVoice2-0.5B:claire",
"david": "FunAudioLLM/CosyVoice2-0.5B:david",
"diana": "FunAudioLLM/CosyVoice2-0.5B:diana",
}
def generate_silence(duration_ms: int, sample_rate: int = 16000) -> bytes:
"""Generate silence as PCM bytes."""
num_samples = int(sample_rate * (duration_ms / 1000.0))
return b'\x00\x00' * num_samples
async def synthesize_speech(
text: str,
api_key: str,
voice: str = "anna",
sample_rate: int = 16000,
speed: float = 1.0
) -> bytes:
"""
Synthesize speech using SiliconFlow TTS API.
Args:
text: Text to synthesize
api_key: SiliconFlow API key
voice: Voice name (alex, anna, bella, benjamin, charles, claire, david, diana)
sample_rate: Output sample rate (8000, 16000, 24000, 32000, 44100)
speed: Speech speed (0.25 to 4.0)
Returns:
PCM audio bytes (16-bit signed, little-endian)
"""
# Resolve voice name
full_voice = VOICES.get(voice, voice)
payload = {
"model": SILICONFLOW_MODEL,
"input": text,
"voice": full_voice,
"response_format": "pcm",
"sample_rate": sample_rate,
"stream": False,
"speed": speed
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.post(SILICONFLOW_API_URL, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise RuntimeError(f"SiliconFlow TTS error: {response.status} - {error_text}")
return await response.read()
async def generate_test_audio(
output_path: str,
utterances: list[str],
silence_ms: int = 500,
lead_silence_ms: int = 200,
trail_silence_ms: int = 300,
voice: str = "anna",
sample_rate: int = 16000,
speed: float = 1.0
):
"""
Generate test audio with multiple utterances separated by silence.
Args:
output_path: Path to save the WAV file
utterances: List of text strings for each utterance
silence_ms: Silence duration between utterances (milliseconds)
lead_silence_ms: Silence at the beginning (milliseconds)
trail_silence_ms: Silence at the end (milliseconds)
voice: TTS voice to use
sample_rate: Audio sample rate
speed: TTS speech speed
"""
api_key = os.getenv("SILICONFLOW_API_KEY")
if not api_key:
raise ValueError(
"SILICONFLOW_API_KEY not found in environment.\n"
"Please set it in your .env file:\n"
" SILICONFLOW_API_KEY=your-api-key-here"
)
print(f"Using SiliconFlow TTS API")
print(f" Voice: {voice}")
print(f" Sample rate: {sample_rate}Hz")
print(f" Speed: {speed}x")
print()
segments = []
# Lead-in silence
if lead_silence_ms > 0:
segments.append(generate_silence(lead_silence_ms, sample_rate))
print(f" [silence: {lead_silence_ms}ms]")
# Generate each utterance with silence between
for i, text in enumerate(utterances):
print(f" Synthesizing utterance {i + 1}: \"{text}\"")
audio = await synthesize_speech(
text=text,
api_key=api_key,
voice=voice,
sample_rate=sample_rate,
speed=speed
)
segments.append(audio)
# Add silence between utterances (not after the last one)
if i < len(utterances) - 1:
segments.append(generate_silence(silence_ms, sample_rate))
print(f" [silence: {silence_ms}ms]")
# Trail silence
if trail_silence_ms > 0:
segments.append(generate_silence(trail_silence_ms, sample_rate))
print(f" [silence: {trail_silence_ms}ms]")
# Concatenate all segments
audio_data = b''.join(segments)
# Write WAV file
with wave.open(output_path, 'wb') as wf:
wf.setnchannels(1) # Mono
wf.setsampwidth(2) # 16-bit
wf.setframerate(sample_rate)
wf.writeframes(audio_data)
duration_sec = len(audio_data) / (sample_rate * 2)
print()
print(f"Generated: {output_path}")
print(f" Duration: {duration_sec:.2f}s")
print(f" Sample rate: {sample_rate}Hz")
print(f" Format: 16-bit mono PCM WAV")
print(f" Size: {len(audio_data):,} bytes")
def load_utterances_from_json(path: Path) -> list[str]:
"""
Load utterances from a JSON file.
Accepts either:
- A JSON array: ["utterance 1", "utterance 2"]
- A JSON object with "utterances" key: {"utterances": ["a", "b"]}
"""
with open(path, encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [str(s) for s in data]
if isinstance(data, dict) and "utterances" in data:
return [str(s) for s in data["utterances"]]
raise ValueError(
f"JSON file must be an array of strings or an object with 'utterances' key. "
f"Got: {type(data).__name__}"
)
def parse_args():
"""Parse command-line arguments."""
script_dir = Path(__file__).parent
default_output = script_dir.parent / "data" / "audio_examples" / "two_utterances_16k.wav"
parser = argparse.ArgumentParser(description="Generate test audio with SiliconFlow TTS (utterances + silence).")
parser.add_argument(
"-o", "--output",
type=Path,
default=default_output,
help=f"Output WAV file path (default: {default_output})"
)
parser.add_argument(
"-u", "--utterance",
action="append",
dest="utterances",
metavar="TEXT",
help="Utterance text (repeat for multiple). Ignored if --json is set."
)
parser.add_argument(
"-j", "--json",
type=Path,
metavar="PATH",
help="JSON file with utterances: array of strings or object with 'utterances' key"
)
parser.add_argument(
"--silence-ms",
type=int,
default=500,
metavar="MS",
help="Silence in ms between utterances (default: 500)"
)
parser.add_argument(
"--lead-silence-ms",
type=int,
default=200,
metavar="MS",
help="Silence in ms at start of file (default: 200)"
)
parser.add_argument(
"--trail-silence-ms",
type=int,
default=300,
metavar="MS",
help="Silence in ms at end of file (default: 300)"
)
return parser.parse_args()
async def main():
"""Main entry point."""
args = parse_args()
output_path = args.output
output_path.parent.mkdir(parents=True, exist_ok=True)
# Resolve utterances: JSON file > -u args > defaults
if args.json is not None:
if not args.json.is_file():
raise FileNotFoundError(f"Utterances JSON file not found: {args.json}")
utterances = load_utterances_from_json(args.json)
if not utterances:
raise ValueError(f"JSON file has no utterances: {args.json}")
elif args.utterances:
utterances = args.utterances
else:
utterances = [
"Hello, how are you doing today?",
"I'm doing great, thank you for asking!"
]
await generate_test_audio(
output_path=str(output_path),
utterances=utterances,
silence_ms=args.silence_ms,
lead_silence_ms=args.lead_silence_ms,
trail_silence_ms=args.trail_silence_ms,
voice="anna",
sample_rate=16000,
speed=1.0
)
if __name__ == "__main__":
asyncio.run(main())
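A hypothetical programmatic call to generate_test_audio() above; the output path and import path are assumptions, and SILICONFLOW_API_KEY must still be set in .env.
import asyncio
from generate_test_audio import generate_test_audio  # assumption: script importable as a module
asyncio.run(generate_test_audio(
    output_path="data/audio_examples/three_utterances_16k.wav",
    utterances=["One.", "Two.", "Three."],
    silence_ms=1000,  # 1 s gaps so VAD/EOU logic sees clear pauses
))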