Compare commits

..

16 Commits

Author SHA1 Message Date
Xin Wang  9954e8d18f  update client latency and three utterances example  2026-02-06 07:51:09 +08:00
Xin Wang  4ceb3ec96f  Fix Duplicate / inconsistent EOU  2026-02-06 07:23:31 +08:00
Xin Wang  da52a88006  Fix _on_end_of_utterance sets state to LISTENING even when no text.  2026-02-05 18:47:56 +08:00
Xin Wang  2de427b92c  Add energy based vad fallback  2026-02-05 17:21:52 +08:00
Xin Wang  b72e09f263  Add heartbeat  2026-02-04 23:16:30 +08:00
Xin Wang  77d54d284f  remove pipeline because it is just a vad integration  2026-02-04 15:01:05 +08:00
Xin Wang  0835f6a617  update gitignore  2026-02-04 14:16:27 +08:00
Xin Wang  d9d5d523ec  change default logs path in main  2026-02-04 13:34:39 +08:00
Xin Wang  2b41648a87  add audio_examples and update gitignore  2026-02-04 13:25:55 +08:00
Xin Wang  911bbb5bf4  add audio samples and update wav client  2026-02-04 13:22:17 +08:00
Xin Wang  7d255468ab  api has llm response event  2026-02-04 12:00:52 +08:00
Xin Wang  5aa9a12ca8  Add generate test audio script  2026-02-04 10:45:10 +08:00
Xin Wang  8bc24ded59  fix long run bug  2026-02-03 12:05:09 +08:00
Xin Wang  a2e341b433  Merge branch 'master' of https://gitea.xiaowang.eu.org/wx44wx/py-active-call2  2026-02-02 23:19:26 +08:00
d27f230532  Merge pull request 'Add basic README' (#1) from add-readme into master (Reviewed-on: #1)  2026-01-30 09:07:26 +00:00
Xin Wang  cf7d3b23bc  Update bargin in duration ms  2026-01-30 16:24:47 +08:00
20 changed files with 1332 additions and 202 deletions

.gitignore (vendored, 5 changed lines)
View File

@@ -143,9 +143,6 @@ cython_debug/
*~
# Project specific
assets/*.onnx
*.wav
*.mp3
*.pcm
recordings/
logs/
running/

View File

@@ -5,3 +5,21 @@ Python Active-Call: real-time audio streaming with WebSocket and WebRTC.
This repo contains a Python 3.11+ codebase for building low-latency voice
pipelines (capture, stream, and process audio) using WebRTC and WebSockets.
It is currently in an early, experimental stage.
# Usage
Start the server
```
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```
Run a test client
```
python examples/test_websocket.py
```
```
python mic_client.py
```
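For orientation (not part of the diff itself), here is a minimal text-chat client sketch against the `/ws` endpoint. The `invite`/`chat` command shapes and the `trackEnd`/`hangup` events are taken from `examples/wav_client.py` and the other example clients later in this changeset; everything else is an assumption.
```python
# Minimal sketch of a chat-only client; protocol details assumed from the
# example clients in this changeset (examples/wav_client.py, mic_client.py).
import asyncio
import json

import websockets  # pip install websockets


async def quick_chat(url: str = "ws://localhost:8000/ws") -> None:
    async with websockets.connect(url) as ws:
        # Open the session; codec/sampleRate mirror the example clients.
        await ws.send(json.dumps({
            "command": "invite",
            "option": {"codec": "pcm", "sampleRate": 16000},
        }))
        # Send a text turn instead of audio.
        await ws.send(json.dumps({"command": "chat", "text": "Hello!"}))

        while True:
            msg = await ws.recv()
            if isinstance(msg, bytes):
                print(f"<- audio frame: {len(msg)} bytes")  # PCM response audio
                continue
            event = json.loads(msg)
            print(f"<- event: {event.get('event')}")
            if event.get("event") in ("trackEnd", "hangup"):
                break


if __name__ == "__main__":
    asyncio.run(quick_chat())
```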

View File

@@ -64,8 +64,8 @@ class Settings(BaseSettings):
# Barge-in (interruption) Configuration
barge_in_min_duration_ms: int = Field(
default=50,
description="Minimum speech duration (ms) required to trigger barge-in. 50-100ms recommended."
default=200,
description="Minimum speech duration (ms) required to trigger barge-in. Lower=more sensitive."
)
# Logging
@@ -84,6 +84,10 @@ class Settings(BaseSettings):
description="ICE servers configuration"
)
# WebSocket heartbeat and inactivity
inactivity_timeout_sec: int = Field(default=60, description="Close connection after no message from client (seconds)")
heartbeat_interval_sec: int = Field(default=50, description="Send heartBeat event to client every N seconds")
@property
def chunk_size_bytes(self) -> int:
"""Calculate chunk size in bytes based on sample rate and duration."""

View File

@@ -1,8 +1,10 @@
"""FastAPI application with WebSocket and WebRTC endpoints."""
import uuid
import asyncio
import json
from typing import Dict, Any, Optional
import time
import uuid
from typing import Dict, Any, Optional, List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
@@ -17,11 +19,49 @@ except ImportError:
logger.warning("aiortc not available - WebRTC endpoint will be disabled")
from app.config import settings
from core.transports import SocketTransport, WebRtcTransport
from core.transports import SocketTransport, WebRtcTransport, BaseTransport
from core.session import Session
from processors.tracks import Resampled16kTrack
from core.events import get_event_bus, reset_event_bus
# Check interval for heartbeat/timeout (seconds)
_HEARTBEAT_CHECK_INTERVAL_SEC = 5
async def heartbeat_and_timeout_task(
transport: BaseTransport,
session: Session,
session_id: str,
last_received_at: List[float],
last_heartbeat_at: List[float],
inactivity_timeout_sec: int,
heartbeat_interval_sec: int,
) -> None:
"""
Background task: send heartBeat every ~heartbeat_interval_sec and close
connection if no message from client for inactivity_timeout_sec.
"""
while True:
await asyncio.sleep(_HEARTBEAT_CHECK_INTERVAL_SEC)
if transport.is_closed:
break
now = time.monotonic()
if now - last_received_at[0] > inactivity_timeout_sec:
logger.info(f"Session {session_id}: {inactivity_timeout_sec}s no message, closing")
await session.cleanup()
break
if now - last_heartbeat_at[0] >= heartbeat_interval_sec:
try:
await transport.send_event({
"event": "heartBeat",
"timestamp": int(time.time() * 1000),
})
last_heartbeat_at[0] = now
except Exception as e:
logger.debug(f"Session {session_id}: heartbeat send failed: {e}")
break
# Initialize FastAPI
app = FastAPI(title="Python Active-Call", version="0.1.0")
@@ -40,7 +80,7 @@ active_sessions: Dict[str, Session] = {}
# Configure logging
logger.remove()
logger.add(
"../logs/active_call_{time}.log",
"./logs/active_call_{time}.log",
rotation="1 day",
retention="7 days",
level=settings.log_level,
@@ -112,10 +152,25 @@ async def websocket_endpoint(websocket: WebSocket):
logger.info(f"WebSocket connection established: {session_id}")
last_received_at: List[float] = [time.monotonic()]
last_heartbeat_at: List[float] = [0.0]
hb_task = asyncio.create_task(
heartbeat_and_timeout_task(
transport,
session,
session_id,
last_received_at,
last_heartbeat_at,
settings.inactivity_timeout_sec,
settings.heartbeat_interval_sec,
)
)
try:
# Receive loop
while True:
message = await websocket.receive()
last_received_at[0] = time.monotonic()
# Handle binary audio data
if "bytes" in message:
@@ -132,6 +187,11 @@ async def websocket_endpoint(websocket: WebSocket):
logger.error(f"WebSocket error: {e}", exc_info=True)
finally:
hb_task.cancel()
try:
await hb_task
except asyncio.CancelledError:
pass
# Cleanup session
if session_id in active_sessions:
await session.cleanup()
@@ -165,6 +225,20 @@ async def webrtc_endpoint(websocket: WebSocket):
logger.info(f"WebRTC connection established: {session_id}")
last_received_at: List[float] = [time.monotonic()]
last_heartbeat_at: List[float] = [0.0]
hb_task = asyncio.create_task(
heartbeat_and_timeout_task(
transport,
session,
session_id,
last_received_at,
last_heartbeat_at,
settings.inactivity_timeout_sec,
settings.heartbeat_interval_sec,
)
)
# Track handler for incoming audio
@pc.on("track")
def on_track(track):
@@ -202,6 +276,7 @@ async def webrtc_endpoint(websocket: WebSocket):
if "text" not in message:
continue
last_received_at[0] = time.monotonic()
data = json.loads(message["text"])
# Handle SDP offer/answer
@@ -238,6 +313,11 @@ async def webrtc_endpoint(websocket: WebSocket):
logger.error(f"WebRTC error: {e}", exc_info=True)
finally:
hb_task.cancel()
try:
await hb_task
except asyncio.CancelledError:
pass
# Cleanup
await pc.close()
if session_id in active_sessions:
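One implementation note on the diff above: `last_received_at` and `last_heartbeat_at` are single-element lists so the receive loop and the background heartbeat task can share a mutable timestamp without introducing a wrapper class. A tiny standalone illustration of the same idea (names are hypothetical):
```python
# Illustration only: a single-element list acts as a mutable cell shared
# between two coroutines, mirroring last_received_at[0] in the diff above.
import asyncio
import time
from typing import List


async def watchdog(last_seen: List[float]) -> None:
    for _ in range(3):
        await asyncio.sleep(0.1)
        # Reads the latest value written by the other coroutine.
        print(f"idle for {time.monotonic() - last_seen[0]:.2f}s")


async def main() -> None:
    last_seen: List[float] = [time.monotonic()]
    task = asyncio.create_task(watchdog(last_seen))
    await asyncio.sleep(0.15)
    last_seen[0] = time.monotonic()  # what the receive loop does per message
    await task


asyncio.run(main())
```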

View File

@@ -2,7 +2,6 @@
from core.events import EventBus, get_event_bus
from core.transports import BaseTransport, SocketTransport, WebRtcTransport
from core.pipeline import AudioPipeline
from core.session import Session
from core.conversation import ConversationManager, ConversationState, ConversationTurn
from core.duplex_pipeline import DuplexPipeline
@@ -13,7 +12,6 @@ __all__ = [
"BaseTransport",
"SocketTransport",
"WebRtcTransport",
"AudioPipeline",
"Session",
"ConversationManager",
"ConversationState",

View File

@@ -85,8 +85,8 @@ class DuplexPipeline:
# Initialize EOU detector
self.eou_detector = EouDetector(
silence_threshold_ms=600,
min_speech_duration_ms=200
silence_threshold_ms=settings.vad_eou_threshold_ms,
min_speech_duration_ms=settings.vad_min_speech_duration_ms
)
# Initialize services
@@ -113,6 +113,10 @@ class DuplexPipeline:
# Interruption handling
self._interrupt_event = asyncio.Event()
# Latency tracking - TTFB (Time to First Byte)
self._turn_start_time: Optional[float] = None
self._first_audio_sent: bool = False
# Barge-in filtering - require minimum speech duration to interrupt
self._barge_in_speech_start_time: Optional[float] = None
self._barge_in_min_duration_ms: int = settings.barge_in_min_duration_ms if hasattr(settings, 'barge_in_min_duration_ms') else 50
@@ -364,7 +368,8 @@ class DuplexPipeline:
# Reset for next utterance
self._audio_buffer = b""
self._last_sent_transcript = ""
await self.conversation.start_user_turn()
# Return to idle; don't force LISTENING which causes buffering on silence
await self.conversation.set_state(ConversationState.IDLE)
return
logger.info(f"EOU detected - user said: {user_text[:100]}...")
@@ -396,6 +401,10 @@ class DuplexPipeline:
user_text: User's transcribed text
"""
try:
# Start latency tracking
self._turn_start_time = time.time()
self._first_audio_sent = False
# Get AI response (streaming)
messages = self.conversation.get_messages()
full_response = ""
@@ -418,6 +427,15 @@ class DuplexPipeline:
sentence_buffer += text_chunk
await self.conversation.update_assistant_text(text_chunk)
# Send LLM response streaming event to client
await self.transport.send_event({
"event": "llmResponse",
"trackId": self.session_id,
"text": text_chunk,
"isFinal": False,
"timestamp": self._get_timestamp_ms()
})
# Check for sentence completion - synthesize immediately for low latency
while any(end in sentence_buffer for end in sentence_ends):
# Find first sentence end
@@ -446,6 +464,16 @@ class DuplexPipeline:
else:
break
# Send final LLM response event
if full_response and not self._interrupt_event.is_set():
await self.transport.send_event({
"event": "llmResponse",
"trackId": self.session_id,
"text": full_response,
"isFinal": True,
"timestamp": self._get_timestamp_ms()
})
# Speak any remaining text
if sentence_buffer.strip() and not self._interrupt_event.is_set():
if not first_audio_sent:
@@ -495,10 +523,33 @@ class DuplexPipeline:
try:
async for chunk in self.tts_service.synthesize_stream(text):
# Check interrupt at the start of each iteration
if self._interrupt_event.is_set():
logger.debug("TTS sentence interrupted")
break
# Track and log first audio packet latency (TTFB)
if not self._first_audio_sent and self._turn_start_time:
ttfb_ms = (time.time() - self._turn_start_time) * 1000
self._first_audio_sent = True
logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
"event": "ttfb",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"latencyMs": round(ttfb_ms)
})
# Double-check interrupt right before sending audio
if self._interrupt_event.is_set():
break
await self.transport.send_audio(chunk.audio)
await asyncio.sleep(0.005) # Small delay to prevent flooding
except asyncio.CancelledError:
logger.debug("TTS sentence cancelled")
except Exception as e:
logger.error(f"TTS sentence error: {e}")
@@ -513,6 +564,10 @@ class DuplexPipeline:
return
try:
# Start latency tracking for greeting
speak_start_time = time.time()
first_audio_sent = False
# Send track start event
await self.transport.send_event({
"event": "trackStart",
@@ -528,6 +583,20 @@ class DuplexPipeline:
logger.info("TTS interrupted by barge-in")
break
# Track and log first audio packet latency (TTFB)
if not first_audio_sent:
ttfb_ms = (time.time() - speak_start_time) * 1000
first_audio_sent = True
logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
"event": "ttfb",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"latencyMs": round(ttfb_ms)
})
# Send audio to client
await self.transport.send_audio(chunk.audio)
@@ -561,8 +630,17 @@ class DuplexPipeline:
self._barge_in_speech_frames = 0
self._barge_in_silence_frames = 0
# Signal interruption
# IMPORTANT: Signal interruption FIRST to stop audio sending
self._interrupt_event.set()
self._is_bot_speaking = False
# Send interrupt event to client IMMEDIATELY
# This must happen BEFORE canceling services, so client knows to discard in-flight audio
await self.transport.send_event({
"event": "interrupt",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
# Cancel TTS
if self.tts_service:
@@ -575,15 +653,7 @@ class DuplexPipeline:
# Interrupt conversation
await self.conversation.interrupt()
# Send interrupt event to client
await self.transport.send_event({
"event": "interrupt",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
# Reset for new user turn
self._is_bot_speaking = False
await self.conversation.start_user_turn()
self._audio_buffer = b""
self.eou_detector.reset()
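The `sentence_buffer` / `sentence_ends` logic referenced in this diff synthesizes each completed sentence as soon as its delimiter arrives instead of waiting for the full LLM reply. A standalone sketch of that splitting strategy (the delimiter set here is an assumption; the pipeline's actual `sentence_ends` may differ):
```python
# Sketch of sentence-boundary streaming: emit each sentence as soon as a
# delimiter shows up so TTS can start early. Delimiters are assumed.
from typing import Iterable, Iterator

SENTENCE_ENDS = ("。", "！", "？", ".", "!", "?")


def split_streaming_sentences(chunks: Iterable[str]) -> Iterator[str]:
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while any(end in buffer for end in SENTENCE_ENDS):
            # Cut at the earliest delimiter present in the buffer.
            cut = min(buffer.index(end) for end in SENTENCE_ENDS if end in buffer)
            yield buffer[: cut + 1].strip()
            buffer = buffer[cut + 1 :]
    if buffer.strip():
        yield buffer.strip()  # flush any trailing partial sentence


print(list(split_streaming_sentences(["Hello! How", " are you? I am", " fine"])))
# -> ['Hello!', 'How are you?', 'I am fine']
```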

View File

@@ -1,131 +0,0 @@
"""Audio processing pipeline."""
import asyncio
from typing import Optional
from loguru import logger
from core.transports import BaseTransport
from core.events import EventBus, get_event_bus
from processors.vad import VADProcessor, SileroVAD
from app.config import settings
class AudioPipeline:
"""
Audio processing pipeline.
Processes incoming audio through VAD and emits events.
"""
def __init__(self, transport: BaseTransport, session_id: str):
"""
Initialize audio pipeline.
Args:
transport: Transport instance for sending events/audio
session_id: Session identifier for event tracking
"""
self.transport = transport
self.session_id = session_id
self.event_bus = get_event_bus()
# Initialize VAD
self.vad_model = SileroVAD(
model_path=settings.vad_model_path,
sample_rate=settings.sample_rate
)
self.vad_processor = VADProcessor(
vad_model=self.vad_model,
threshold=settings.vad_threshold,
silence_threshold_ms=settings.vad_eou_threshold_ms,
min_speech_duration_ms=settings.vad_min_speech_duration_ms
)
# State
self.is_bot_speaking = False
self.interrupt_signal = asyncio.Event()
self._running = True
logger.info(f"Audio pipeline initialized for session {session_id}")
async def process_input(self, pcm_bytes: bytes) -> None:
"""
Process incoming audio chunk.
Args:
pcm_bytes: PCM audio data (16-bit, mono, 16kHz)
"""
if not self._running:
return
try:
# Process through VAD
result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
if result:
event_type, probability = result
# Emit event through event bus
await self.event_bus.publish(event_type, {
"trackId": self.session_id,
"probability": probability
})
# Send event to client
if event_type == "speaking":
logger.info(f"User speaking started (session {self.session_id})")
await self.transport.send_event({
"event": "speaking",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"startTime": self._get_timestamp_ms()
})
elif event_type == "silence":
logger.info(f"User speaking stopped (session {self.session_id})")
await self.transport.send_event({
"event": "silence",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"startTime": self._get_timestamp_ms(),
"duration": 0 # TODO: Calculate actual duration
})
elif event_type == "eou":
logger.info(f"EOU detected (session {self.session_id})")
await self.transport.send_event({
"event": "eou",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
except Exception as e:
logger.error(f"Pipeline processing error: {e}", exc_info=True)
async def process_text_input(self, text: str) -> None:
"""
Process text input (chat command).
Args:
text: Text input
"""
logger.info(f"Processing text input: {text[:50]}...")
# TODO: Implement text processing (LLM integration, etc.)
# For now, just log it
async def interrupt(self) -> None:
"""Interrupt current audio playback."""
if self.is_bot_speaking:
self.interrupt_signal.set()
logger.info(f"Pipeline interrupted for session {self.session_id}")
async def cleanup(self) -> None:
"""Cleanup pipeline resources."""
logger.info(f"Cleaning up pipeline for session {self.session_id}")
self._running = False
self.interrupt_signal.set()
def _get_timestamp_ms(self) -> int:
"""Get current timestamp in milliseconds."""
import time
return int(time.time() * 1000)

View File

@@ -6,7 +6,7 @@ from typing import Optional, Dict, Any
from loguru import logger
from core.transports import BaseTransport
from core.pipeline import AudioPipeline
from core.duplex_pipeline import DuplexPipeline
from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand
from app.config import settings
@@ -16,7 +16,7 @@ class Session:
Manages a single call session.
Handles command routing, audio processing, and session lifecycle.
Supports both basic audio pipeline and full duplex voice conversation.
Uses full duplex voice conversation pipeline.
"""
def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None):
@@ -30,20 +30,14 @@ class Session:
"""
self.id = session_id
self.transport = transport
# Determine pipeline mode
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
if self.use_duplex:
from core.duplex_pipeline import DuplexPipeline
self.pipeline = DuplexPipeline(
transport=transport,
session_id=session_id,
system_prompt=settings.duplex_system_prompt,
greeting=settings.duplex_greeting
)
else:
self.pipeline = AudioPipeline(transport, session_id)
self.pipeline = DuplexPipeline(
transport=transport,
session_id=session_id,
system_prompt=settings.duplex_system_prompt,
greeting=settings.duplex_greeting
)
# Session state
self.created_at = None
@@ -129,10 +123,7 @@ class Session:
audio_bytes: PCM audio data
"""
try:
if self.use_duplex:
await self.pipeline.process_audio(audio_bytes)
else:
await self.pipeline.process_input(audio_bytes)
await self.pipeline.process_audio(audio_bytes)
except Exception as e:
logger.error(f"Session {self.id} handle_audio error: {e}", exc_info=True)
@@ -148,8 +139,8 @@ class Session:
"timestamp": self._get_timestamp_ms()
})
# Start duplex pipeline if enabled
if self.use_duplex and not self._pipeline_started:
# Start duplex pipeline
if not self._pipeline_started:
try:
await self.pipeline.start()
self._pipeline_started = True
@@ -228,10 +219,7 @@ class Session:
logger.info(f"Session {self.id} graceful interrupt")
else:
logger.info(f"Session {self.id} immediate interrupt")
if self.use_duplex:
await self.pipeline.interrupt()
else:
await self.pipeline.interrupt()
await self.pipeline.interrupt()
async def _handle_pause(self) -> None:
"""Handle pause command."""
@@ -267,11 +255,7 @@ class Session:
async def _handle_chat(self, command: ChatCommand) -> None:
"""Handle chat command."""
logger.info(f"Session {self.id} chat: {command.text[:50]}...")
# Process text input through pipeline
if self.use_duplex:
await self.pipeline.process_text(command.text)
else:
await self.pipeline.process_text_input(command.text)
await self.pipeline.process_text(command.text)
async def _send_error(self, sender: str, error_message: str) -> None:
"""

Binary file not shown.

Binary file not shown.

Binary file not shown.

docs/proejct_todo.md (new file, 187 lines)
View File

@@ -0,0 +1,187 @@
# OmniSense: 12-Week Sprint Board + Tech Stack (Python Backend) — TODO
## Scope
- [ ] Build a realtime AI SaaS (OmniSense) focused on web-first audio + video with WebSocket + WebRTC endpoints
- [ ] Deliver assistant builder, tool execution, observability, evals, optional telephony later
- [ ] Keep scope aligned to 2-person team, self-hosted services
---
## Sprint Board (12 weeks, 2-week sprints)
Team assumption: 2 engineers. Scope prioritized to web-first audio + video, with BYO-SFU adapters.
### Sprint 1 (Weeks 1-2) — Realtime Core MVP (WebSocket + WebRTC Audio)
- Deliverables
- [ ] WebSocket transport: audio in/out streaming (1:1)
- [ ] WebRTC transport: audio in/out streaming (1:1)
- [ ] Adapter contract wired into runtime (transport-agnostic session core)
- [ ] ASR → LLM → TTS pipeline, streaming both directions
- [ ] Basic session state (start/stop, silence timeout)
- [ ] Transcript persistence
- Acceptance criteria
- [ ] < 1.5s median round-trip for short responses
- [ ] Stable streaming for 10+ minute session
### Sprint 2 (Weeks 3-4) — Video + Realtime UX
- Deliverables
- [ ] WebRTC video capture + streaming (assistant can “see” frames)
- [ ] WebSocket video streaming for local/dev mode
- [ ] Low-latency UI: push-to-talk, live captions, speaking indicator
- [ ] Recording + transcript storage (web sessions)
- Acceptance criteria
- [ ] Video < 2.5s end-to-end latency for analysis
- [ ] Audio quality acceptable (no clipping, jitter handling)
### Sprint 3 (Weeks 5-6) — Assistant Builder v1
- Deliverables
- [ ] Assistant schema + versioning
- [ ] UI: Model/Voice/Transcriber/Tools/Video/Transport tabs
- [ ] “Test/Chat/Talk to Assistant” (web)
- Acceptance criteria
- [ ] Create/publish assistant and run a live web session
- [ ] All config changes tracked by version
### Sprint 4 (Weeks 7-8) — Tooling + Structured Outputs
- Deliverables
- [ ] Tool registry + custom HTTP tools
- [ ] Tool auth secrets management
- [ ] Structured outputs (JSON extraction)
- Acceptance criteria
- [ ] Tool calls executed with retries/timeouts
- [ ] Structured JSON stored per call/session
### Sprint 5 (Weeks 9-10) — Observability + QA + Dev Platform
- Deliverables
- [ ] Session logs + chat logs + media logs
- [ ] Evals engine + test suites
- [ ] Basic analytics dashboard
- [ ] Public WebSocket API spec + message schema
- [ ] JS/TS SDK (connect, send audio/video, receive transcripts)
- Acceptance criteria
- [ ] Reproducible test suite runs
- [ ] Log filters by assistant/time/status
- [ ] SDK demo app runs end-to-end
### Sprint 6 (Weeks 11-12) — SaaS Hardening
- Deliverables
- [ ] Org/RBAC + API keys + rate limits
- [ ] Usage metering + credits
- [ ] Stripe billing integration
- [ ] Self-hosted DB ops (migrations, backup/restore, monitoring)
- Acceptance criteria
- [ ] Metered usage per org
- [ ] Credits decrement correctly
- [ ] Optional telephony spike documented (defer build)
- [ ] Enterprise adapter guide published (BYO-SFU)
---
## Tech Stack by Service (Self-Hosted, Web-First)
### 1) Transport Gateway (Realtime)
- [ ] WebRTC (browser) + WebSocket (lightweight/dev) protocols
- [ ] BYO-SFU adapter (enterprise) + LiveKit optional adapter + WS transport server
- [ ] Python core (FastAPI + asyncio) + Node.js mediasoup adapters when needed
- [ ] Media: Opus/VP8, jitter buffer, VAD, echo cancellation
- [ ] Storage: S3-compatible (MinIO) for recordings
### 2) ASR Service
- [ ] Whisper (self-hosted) baseline
- [ ] gRPC/WebSocket streaming transport
- [ ] Python native service
- [ ] Optional cloud provider fallback (later)
### 3) TTS Service
- [ ] Piper or Coqui TTS (self-hosted)
- [ ] gRPC/WebSocket streaming transport
- [ ] Python native service
- [ ] Redis cache for common phrases
### 4) LLM Orchestrator
- [ ] Self-hosted (vLLM + open model)
- [ ] Python (FastAPI + asyncio)
- [ ] Streaming, tool calling, JSON mode
- [ ] Safety filters + prompt templates
### 5) Assistant Config Service
- [ ] PostgreSQL
- [ ] Python (SQLAlchemy or SQLModel)
- [ ] Versioning, publish/rollback
### 6) Session Service
- [ ] PostgreSQL + Redis
- [ ] Python
- [ ] State machine, timeouts, events
### 7) Tool Execution Layer
- [ ] PostgreSQL
- [ ] Python
- [ ] Auth secret vault, retry policies, tool schemas
### 8) Observability + Logs
- [ ] Postgres (metadata), ClickHouse (logs/metrics)
- [ ] OpenSearch for search
- [ ] Prometheus + Grafana metrics
- [ ] OpenTelemetry tracing
### 9) Billing + Usage Metering
- [ ] Stripe billing
- [ ] PostgreSQL
- [ ] NATS JetStream (events) + Redis counters
### 10) Web App (Dashboard)
- [ ] React + Next.js
- [ ] Tailwind or Radix UI
- [ ] WebRTC client + WS client; adapter-based RTC integration
- [ ] ECharts/Recharts
### 11) Auth + RBAC
- [ ] Keycloak (self-hosted) or custom JWT
- [ ] Org/user/role tables in Postgres
### 12) Public WebSocket API + SDK
- [ ] WS API: versioned schema, binary audio frames + JSON control messages
- [ ] SDKs: JS/TS first, optional Python/Go clients
- [ ] Docs: quickstart, auth flow, session lifecycle, examples
---
## Infrastructure (Self-Hosted)
- [ ] Docker Compose → k3s (later)
- [ ] Redis Streams or NATS
- [ ] MinIO object store
- [ ] GitHub Actions + Helm or kustomize
- [ ] Self-hosted Postgres + pgbackrest backups
- [ ] Vault for secrets
---
## Suggested MVP Sequence
- [ ] WebRTC demo + ASR/LLM/TTS streaming
- [ ] Assistant schema + versioning (web-first)
- [ ] Video capture + multimodal analysis
- [ ] Tool execution + structured outputs
- [ ] Logs + evals + public WS API + SDK
- [ ] Telephony (optional, later)
---
## Public WebSocket API (Minimum Spec)
- [ ] Auth: API key or JWT in initial `hello` message
- [ ] Core messages: `session.start`, `session.stop`, `audio.append`, `audio.commit`, `video.append`, `transcript.delta`, `assistant.response`, `tool.call`, `tool.result`, `error` (illustrative shapes sketched after this list)
- [ ] Binary payloads: PCM/Opus frames with metadata in control channel
- [ ] Versioning: `v1` schema with backward compatibility rules
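To make the minimum spec concrete, one illustrative shape for the control-channel messages. The v1 schema above is still a TODO, so every field name beyond the listed message types is hypothetical:
```python
# Hypothetical v1 control messages; only the message *types* come from the
# checklist above, all field names are placeholders.
hello = {"type": "hello", "version": "v1", "auth": {"api_key": "<API_KEY>"}}

session_start = {
    "type": "session.start",
    "assistant_id": "<assistant-id>",                   # placeholder
    "audio": {"codec": "pcm16", "sample_rate": 16000},  # placeholder
}

# Binary PCM/Opus frames travel as raw WebSocket binary payloads; bookkeeping
# such as frame counts rides in the JSON control channel, e.g.:
audio_commit = {"type": "audio.commit", "frames_sent": 250}
```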
---
## Self-Hosted DB Ops Checklist
- [ ] Postgres in Docker/k3s with persistent volumes
- [ ] Migrations: `alembic` or `atlas`
- [ ] Backups: `pgbackrest` nightly + on-demand
- [ ] Monitoring: postgres_exporter + alerts
---
## RTC Adapter Contract (BYO-SFU First)
- [ ] Keep RTC pluggable; LiveKit optional, not core dependency
- [ ] Define adapter interface (TypeScript sketch)

View File

@@ -4,10 +4,12 @@ Microphone client for testing duplex voice conversation.
This client captures audio from the microphone, sends it to the server,
and plays back the AI's voice response through the speakers.
It also displays the LLM's text responses in the console.
Usage:
python examples/mic_client.py --url ws://localhost:8000/ws
python examples/mic_client.py --url ws://localhost:8000/ws --chat "Hello!"
python examples/mic_client.py --url ws://localhost:8000/ws --verbose
Requirements:
pip install sounddevice soundfile websockets numpy
@@ -17,6 +19,7 @@ import argparse
import asyncio
import json
import sys
import time
import threading
import queue
from pathlib import Path
@@ -92,6 +95,17 @@ class MicrophoneClient:
# State
self.is_recording = True
self.is_playing = True
# TTFB tracking (Time to First Byte)
self.request_start_time = None
self.first_audio_received = False
# Interrupt handling - discard audio until next trackStart
self._discard_audio = False
self._audio_sequence = 0 # Track audio sequence to detect stale chunks
# Verbose mode for streaming LLM responses
self.verbose = False
async def connect(self) -> None:
"""Connect to WebSocket server."""
@@ -117,6 +131,10 @@ class MicrophoneClient:
async def send_chat(self, text: str) -> None:
"""Send chat message (text input)."""
# Reset TTFB tracking for new request
self.request_start_time = time.time()
self.first_audio_received = False
await self.send_command({
"command": "chat",
"text": text
@@ -236,9 +254,21 @@ class MicrophoneClient:
# Audio data received
self.bytes_received += len(message)
# Check if we should discard this audio (after interrupt)
if self._discard_audio:
duration_ms = len(message) / (self.sample_rate * 2) * 1000
print(f"← Audio: {duration_ms:.0f}ms (DISCARDED - waiting for new track)")
continue
if self.is_playing:
self._add_audio_to_buffer(message)
# Calculate and display TTFB for first audio packet
if not self.first_audio_received and self.request_start_time:
client_ttfb_ms = (time.time() - self.request_start_time) * 1000
self.first_audio_received = True
print(f"← [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
# Show progress (less verbose)
with self.audio_output_lock:
buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
@@ -285,20 +315,47 @@ class MicrophoneClient:
# Interim result - show with indicator (overwrite same line)
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [listening] {display_text}".ljust(80), end="\r")
elif event_type == "ttfb":
# Server-side TTFB event
latency_ms = event.get("latencyMs", 0)
print(f"← [TTFB] Server reported latency: {latency_ms}ms")
elif event_type == "llmResponse":
# LLM text response
text = event.get("text", "")
is_final = event.get("isFinal", False)
if is_final:
# Print final LLM response
print(f"← AI: {text}")
elif self.verbose:
# Show streaming chunks only in verbose mode
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [streaming] {display_text}")
elif event_type == "trackStart":
print("← Bot started speaking")
# IMPORTANT: Accept audio again after trackStart
self._discard_audio = False
self._audio_sequence += 1
# Reset TTFB tracking for voice responses (when no chat was sent)
if self.request_start_time is None:
self.request_start_time = time.time()
self.first_audio_received = False
# Clear any old audio in buffer
with self.audio_output_lock:
self.audio_output_buffer = b""
elif event_type == "trackEnd":
print("← Bot finished speaking")
# Reset TTFB tracking after response completes
self.request_start_time = None
self.first_audio_received = False
elif event_type == "interrupt":
print("← Bot interrupted!")
# IMPORTANT: Clear audio buffer immediately on interrupt
# IMPORTANT: Discard all audio until next trackStart
self._discard_audio = True
# Clear audio buffer immediately
with self.audio_output_lock:
buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
self.audio_output_buffer = b""
print(f" (cleared {buffer_ms:.0f}ms of buffered audio)")
print(f" (cleared {buffer_ms:.0f}ms, discarding audio until new track)")
elif event_type == "error":
print(f"← Error: {event.get('error')}")
elif event_type == "hangup":
@@ -511,6 +568,11 @@ async def main():
action="store_true",
help="Disable interactive mode"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Show streaming LLM response chunks"
)
args = parser.parse_args()
@@ -524,6 +586,7 @@ async def main():
input_device=args.input_device,
output_device=args.output_device
)
client.verbose = args.verbose
await client.run(
chat_message=args.chat,

View File

@@ -12,6 +12,7 @@ import argparse
import asyncio
import json
import sys
import time
import wave
import io
@@ -67,6 +68,13 @@ class SimpleVoiceClient:
# Stats
self.bytes_received = 0
# TTFB tracking (Time to First Byte)
self.request_start_time = None
self.first_audio_received = False
# Interrupt handling - discard audio until next trackStart
self._discard_audio = False
async def connect(self):
"""Connect to server."""
@@ -84,6 +92,10 @@ class SimpleVoiceClient:
async def send_chat(self, text: str):
"""Send chat message."""
# Reset TTFB tracking for new request
self.request_start_time = time.time()
self.first_audio_received = False
await self.ws.send(json.dumps({"command": "chat", "text": text}))
print(f"-> chat: {text}")
@@ -120,6 +132,18 @@ class SimpleVoiceClient:
# Audio data
self.bytes_received += len(msg)
duration_ms = len(msg) / (self.sample_rate * 2) * 1000
# Check if we should discard this audio (after interrupt)
if self._discard_audio:
print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms) [DISCARDED]")
continue
# Calculate and display TTFB for first audio packet
if not self.first_audio_received and self.request_start_time:
client_ttfb_ms = (time.time() - self.request_start_time) * 1000
self.first_audio_received = True
print(f"<- [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms)")
# Play immediately in executor to not block
@@ -138,6 +162,18 @@ class SimpleVoiceClient:
print(f"<- You said: {text}")
else:
print(f"<- [listening] {text}", end="\r")
elif etype == "ttfb":
# Server-side TTFB event
latency_ms = event.get("latencyMs", 0)
print(f"<- [TTFB] Server reported latency: {latency_ms}ms")
elif etype == "trackStart":
# New track starting - accept audio again
self._discard_audio = False
print(f"<- {etype}")
elif etype == "interrupt":
# Interrupt - discard audio until next trackStart
self._discard_audio = True
print(f"<- {etype} (discarding audio until new track)")
elif etype == "hangup":
print(f"<- {etype}")
self.running = False

examples/wav_client.py (new file, 504 lines)
View File

@@ -0,0 +1,504 @@
#!/usr/bin/env python3
"""
WAV file client for testing duplex voice conversation.
This client reads audio from a WAV file, sends it to the server,
and saves the AI's voice response to an output WAV file.
Usage:
python examples/wav_client.py --input input.wav --output response.wav
python examples/wav_client.py --input input.wav --output response.wav --url ws://localhost:8000/ws
python examples/wav_client.py --input input.wav --output response.wav --wait-time 10
python wav_client.py --input ../data/audio_examples/two_utterances.wav -o response.wav
Requirements:
pip install soundfile websockets numpy
"""
import argparse
import asyncio
import json
import sys
import time
import wave
from pathlib import Path
try:
import numpy as np
except ImportError:
print("Please install numpy: pip install numpy")
sys.exit(1)
try:
import soundfile as sf
except ImportError:
print("Please install soundfile: pip install soundfile")
sys.exit(1)
try:
import websockets
except ImportError:
print("Please install websockets: pip install websockets")
sys.exit(1)
class WavFileClient:
"""
WAV file client for voice conversation testing.
Features:
- Read audio from WAV file
- Send audio to WebSocket server
- Receive and save response audio
- Event logging
"""
def __init__(
self,
url: str,
input_file: str,
output_file: str,
sample_rate: int = 16000,
chunk_duration_ms: int = 20,
wait_time: float = 15.0,
verbose: bool = False
):
"""
Initialize WAV file client.
Args:
url: WebSocket server URL
input_file: Input WAV file path
output_file: Output WAV file path
sample_rate: Audio sample rate (Hz)
chunk_duration_ms: Audio chunk duration (ms) for sending
wait_time: Time to wait for response after sending (seconds)
verbose: Enable verbose output
"""
self.url = url
self.input_file = Path(input_file)
self.output_file = Path(output_file)
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_samples = int(sample_rate * chunk_duration_ms / 1000)
self.wait_time = wait_time
self.verbose = verbose
# WebSocket connection
self.ws = None
self.running = False
# Audio buffers
self.received_audio = bytearray()
# Statistics
self.bytes_sent = 0
self.bytes_received = 0
# TTFB tracking (per response)
self.send_start_time = None
self.response_start_time = None # set on each trackStart
self.waiting_for_first_audio = False
self.ttfb_ms = None # last TTFB for summary
self.ttfb_list = [] # TTFB for each response
# State tracking
self.track_started = False
self.track_ended = False
self.send_completed = False
# Events log
self.events_log = []
def log_event(self, direction: str, message: str):
"""Log an event with timestamp."""
timestamp = time.time()
self.events_log.append({
"timestamp": timestamp,
"direction": direction,
"message": message
})
# Handle encoding errors on Windows
try:
print(f"{direction} {message}")
except UnicodeEncodeError:
# Replace problematic characters for console output
safe_message = message.encode('ascii', errors='replace').decode('ascii')
print(f"{direction} {safe_message}")
async def connect(self) -> None:
"""Connect to WebSocket server."""
self.log_event("", f"Connecting to {self.url}...")
self.ws = await websockets.connect(self.url)
self.running = True
self.log_event("", "Connected!")
# Send invite command
await self.send_command({
"command": "invite",
"option": {
"codec": "pcm",
"sampleRate": self.sample_rate
}
})
async def send_command(self, cmd: dict) -> None:
"""Send JSON command to server."""
if self.ws:
await self.ws.send(json.dumps(cmd))
self.log_event("", f"Command: {cmd.get('command', 'unknown')}")
async def send_hangup(self, reason: str = "Session complete") -> None:
"""Send hangup command."""
await self.send_command({
"command": "hangup",
"reason": reason
})
def load_wav_file(self) -> tuple[np.ndarray, int]:
"""
Load and prepare WAV file for sending.
Returns:
Tuple of (audio_data as int16 numpy array, original sample rate)
"""
if not self.input_file.exists():
raise FileNotFoundError(f"Input file not found: {self.input_file}")
# Load audio file
audio_data, file_sample_rate = sf.read(self.input_file)
self.log_event("", f"Loaded: {self.input_file}")
self.log_event("", f" Original sample rate: {file_sample_rate} Hz")
self.log_event("", f" Duration: {len(audio_data) / file_sample_rate:.2f}s")
# Convert stereo to mono if needed
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1)
self.log_event("", " Converted stereo to mono")
# Resample if needed
if file_sample_rate != self.sample_rate:
# Simple resampling using numpy
duration = len(audio_data) / file_sample_rate
num_samples = int(duration * self.sample_rate)
indices = np.linspace(0, len(audio_data) - 1, num_samples)
audio_data = np.interp(indices, np.arange(len(audio_data)), audio_data)
self.log_event("", f" Resampled to {self.sample_rate} Hz")
# Convert to int16
if audio_data.dtype != np.int16:
# Normalize to [-1, 1] if needed
max_val = np.max(np.abs(audio_data))
if max_val > 1.0:
audio_data = audio_data / max_val
audio_data = (audio_data * 32767).astype(np.int16)
self.log_event("", f" Prepared: {len(audio_data)} samples ({len(audio_data)/self.sample_rate:.2f}s)")
return audio_data, file_sample_rate
async def audio_sender(self, audio_data: np.ndarray) -> None:
"""Send audio data to server in chunks."""
total_samples = len(audio_data)
chunk_size = self.chunk_samples
sent_samples = 0
self.send_start_time = time.time()
self.log_event("", f"Starting audio transmission ({total_samples} samples)...")
while sent_samples < total_samples and self.running:
# Get next chunk
end_sample = min(sent_samples + chunk_size, total_samples)
chunk = audio_data[sent_samples:end_sample]
chunk_bytes = chunk.tobytes()
# Send to server
if self.ws:
await self.ws.send(chunk_bytes)
self.bytes_sent += len(chunk_bytes)
sent_samples = end_sample
# Progress logging (every 500ms worth of audio)
if self.verbose and sent_samples % (self.sample_rate // 2) == 0:
progress = (sent_samples / total_samples) * 100
print(f" Sending: {progress:.0f}%", end="\r")
# Delay to simulate real-time streaming
# Server expects audio at real-time pace for VAD/ASR to work properly
await asyncio.sleep(self.chunk_duration_ms / 1000)
self.send_completed = True
elapsed = time.time() - self.send_start_time
self.log_event("", f"Audio transmission complete ({elapsed:.2f}s, {self.bytes_sent/1024:.1f} KB)")
async def receiver(self) -> None:
"""Receive messages from server."""
try:
while self.running:
try:
message = await asyncio.wait_for(self.ws.recv(), timeout=0.1)
if isinstance(message, bytes):
# Audio data received
self.bytes_received += len(message)
self.received_audio.extend(message)
# Calculate TTFB on first audio of each response
if self.waiting_for_first_audio and self.response_start_time is not None:
ttfb_ms = (time.time() - self.response_start_time) * 1000
self.ttfb_ms = ttfb_ms
self.ttfb_list.append(ttfb_ms)
self.waiting_for_first_audio = False
self.log_event("", f"[TTFB] First audio latency: {ttfb_ms:.0f}ms")
# Log progress
duration_ms = len(message) / (self.sample_rate * 2) * 1000
total_ms = len(self.received_audio) / (self.sample_rate * 2) * 1000
if self.verbose:
print(f"← Audio: +{duration_ms:.0f}ms (total: {total_ms:.0f}ms)", end="\r")
else:
# JSON event
event = json.loads(message)
await self._handle_event(event)
except asyncio.TimeoutError:
continue
except websockets.ConnectionClosed:
self.log_event("", "Connection closed")
self.running = False
break
except asyncio.CancelledError:
pass
except Exception as e:
self.log_event("!", f"Receiver error: {e}")
self.running = False
async def _handle_event(self, event: dict) -> None:
"""Handle incoming event."""
event_type = event.get("event", "unknown")
if event_type == "answer":
self.log_event("", "Session ready!")
elif event_type == "speaking":
self.log_event("", "Speech detected")
elif event_type == "silence":
self.log_event("", "Silence detected")
elif event_type == "transcript":
# ASR transcript (interim = asrDelta-style, final = asrFinal-style)
text = event.get("text", "")
is_final = event.get("isFinal", False)
if is_final:
# Clear interim line and print final
print(" " * 80, end="\r")
self.log_event("", f"→ You: {text}")
else:
# Interim result - show with indicator (overwrite same line, as in mic_client)
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [listening] {display_text}".ljust(80), end="\r")
elif event_type == "ttfb":
latency_ms = event.get("latencyMs", 0)
self.log_event("", f"[TTFB] Server latency: {latency_ms}ms")
elif event_type == "llmResponse":
text = event.get("text", "")
is_final = event.get("isFinal", False)
if is_final:
self.log_event("", f"LLM Response (final): {text[:100]}{'...' if len(text) > 100 else ''}")
elif self.verbose:
# Show streaming chunks only in verbose mode
self.log_event("", f"LLM: {text}")
elif event_type == "trackStart":
self.track_started = True
self.response_start_time = time.time()
self.waiting_for_first_audio = True
self.log_event("", "Bot started speaking")
elif event_type == "trackEnd":
self.track_ended = True
self.log_event("", "Bot finished speaking")
elif event_type == "interrupt":
self.log_event("", "Bot interrupted!")
elif event_type == "error":
self.log_event("!", f"Error: {event.get('error')}")
elif event_type == "hangup":
self.log_event("", f"Hangup: {event.get('reason')}")
self.running = False
else:
self.log_event("", f"Event: {event_type}")
def save_output_wav(self) -> None:
"""Save received audio to output WAV file."""
if not self.received_audio:
self.log_event("!", "No audio received to save")
return
# Convert bytes to numpy array
audio_data = np.frombuffer(bytes(self.received_audio), dtype=np.int16)
# Ensure output directory exists
self.output_file.parent.mkdir(parents=True, exist_ok=True)
# Save using wave module for compatibility
with wave.open(str(self.output_file), 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(self.sample_rate)
wav_file.writeframes(audio_data.tobytes())
duration = len(audio_data) / self.sample_rate
self.log_event("", f"Saved output: {self.output_file}")
self.log_event("", f" Duration: {duration:.2f}s ({len(audio_data)} samples)")
self.log_event("", f" Size: {len(self.received_audio)/1024:.1f} KB")
async def run(self) -> None:
"""Run the WAV file test."""
try:
# Load input WAV file
audio_data, _ = self.load_wav_file()
# Connect to server
await self.connect()
# Wait for answer
await asyncio.sleep(0.5)
# Start receiver task
receiver_task = asyncio.create_task(self.receiver())
# Send audio
await self.audio_sender(audio_data)
# Wait for response
self.log_event("", f"Waiting {self.wait_time}s for response...")
wait_start = time.time()
while self.running and (time.time() - wait_start) < self.wait_time:
# Check if track has ended (response complete)
if self.track_ended and self.send_completed:
# Give a little extra time for any remaining audio
await asyncio.sleep(1.0)
break
await asyncio.sleep(0.1)
# Cleanup
self.running = False
receiver_task.cancel()
try:
await receiver_task
except asyncio.CancelledError:
pass
# Save output
self.save_output_wav()
# Print summary
self._print_summary()
except FileNotFoundError as e:
print(f"Error: {e}")
sys.exit(1)
except ConnectionRefusedError:
print(f"Error: Could not connect to {self.url}")
print("Make sure the server is running.")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
await self.close()
def _print_summary(self):
"""Print session summary."""
print("\n" + "=" * 50)
print("Session Summary")
print("=" * 50)
print(f" Input file: {self.input_file}")
print(f" Output file: {self.output_file}")
print(f" Bytes sent: {self.bytes_sent / 1024:.1f} KB")
print(f" Bytes received: {self.bytes_received / 1024:.1f} KB")
if self.ttfb_list:
if len(self.ttfb_list) == 1:
print(f" TTFB: {self.ttfb_list[0]:.0f} ms")
else:
print(f" TTFB (per response): {', '.join(f'{t:.0f}ms' for t in self.ttfb_list)}")
if self.received_audio:
duration = len(self.received_audio) / (self.sample_rate * 2)
print(f" Response duration: {duration:.2f}s")
print("=" * 50)
async def close(self) -> None:
"""Close the connection."""
self.running = False
if self.ws:
try:
await self.ws.close()
except:
pass
async def main():
parser = argparse.ArgumentParser(
description="WAV file client for testing duplex voice conversation"
)
parser.add_argument(
"--input", "-i",
required=True,
help="Input WAV file path"
)
parser.add_argument(
"--output", "-o",
required=True,
help="Output WAV file path for response"
)
parser.add_argument(
"--url",
default="ws://localhost:8000/ws",
help="WebSocket server URL (default: ws://localhost:8000/ws)"
)
parser.add_argument(
"--sample-rate",
type=int,
default=16000,
help="Target sample rate for audio (default: 16000)"
)
parser.add_argument(
"--chunk-duration",
type=int,
default=20,
help="Chunk duration in ms for sending (default: 20)"
)
parser.add_argument(
"--wait-time", "-w",
type=float,
default=15.0,
help="Time to wait for response after sending (default: 15.0)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose output"
)
args = parser.parse_args()
client = WavFileClient(
url=args.url,
input_file=args.input,
output_file=args.output,
sample_rate=args.sample_rate,
chunk_duration_ms=args.chunk_duration,
wait_time=args.wait_time,
verbose=args.verbose
)
await client.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nInterrupted by user")

View File

@@ -179,6 +179,13 @@ class DTMFEvent(BaseEvent):
digit: str = Field(..., description="DTMF digit (0-9, *, #, A-D)")
class HeartBeatEvent(BaseModel):
"""Server-to-client heartbeat to keep connection alive."""
event: str = Field(default="heartBeat", description="Event type")
timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp in milliseconds")
# Event type mapping
EVENT_TYPES = {
"incoming": IncomingEvent,
@@ -198,6 +205,7 @@ EVENT_TYPES = {
"metrics": MetricsEvent,
"addHistory": AddHistoryEvent,
"dtmf": DTMFEvent,
"heartBeat": HeartBeatEvent,
}

View File

@@ -6,7 +6,6 @@ from typing import Tuple, Optional
import numpy as np
from loguru import logger
from processors.eou import EouDetector
# Try to import onnxruntime (optional for VAD functionality)
try:
@@ -82,8 +81,20 @@ class SileroVAD:
Tuple of (label, probability) where label is "Speech" or "Silence"
"""
if self.session is None or not ONNX_AVAILABLE:
# If model not loaded or onnxruntime not available, assume speech
return "Speech", 1.0
# Fallback energy-based VAD when model isn't available.
# Map RMS energy to a pseudo-probability so the existing threshold works.
if not pcm_bytes:
return "Silence", 0.0
audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)
if audio_int16.size == 0:
return "Silence", 0.0
audio_float = audio_int16.astype(np.float32) / 32768.0
rms = float(np.sqrt(np.mean(audio_float * audio_float)))
# Typical speech RMS is ~0.02-0.05 at 16-bit normalized scale.
# Normalize so threshold=0.5 roughly corresponds to ~0.025 RMS.
probability = min(1.0, rms / 0.05)
label = "Speech" if probability >= 0.5 else "Silence"
return label, probability
# Convert bytes to numpy array of int16
audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)
@@ -148,25 +159,19 @@ class VADProcessor:
Tracks speech/silence state and emits events on transitions.
"""
def __init__(self, vad_model: SileroVAD, threshold: float = 0.5,
silence_threshold_ms: int = 1000, min_speech_duration_ms: int = 250):
def __init__(self, vad_model: SileroVAD, threshold: float = 0.5):
"""
Initialize VAD processor.
Args:
vad_model: Silero VAD model instance
threshold: Speech detection threshold
silence_threshold_ms: EOU silence threshold in ms (longer = one EOU across short pauses)
min_speech_duration_ms: EOU min speech duration in ms (ignore very short noises)
"""
self.vad = vad_model
self.threshold = threshold
self._eou_silence_ms = silence_threshold_ms
self._eou_min_speech_ms = min_speech_duration_ms
self.is_speaking = False
self.speech_start_time: Optional[float] = None
self.silence_start_time: Optional[float] = None
self.eou_detector = EouDetector(silence_threshold_ms, min_speech_duration_ms)
def process(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Optional[Tuple[str, float]]:
"""
@@ -184,10 +189,6 @@ class VADProcessor:
# Check if this is speech based on threshold
is_speech = probability >= self.threshold
# Check EOU
if self.eou_detector.process("Speech" if is_speech else "Silence"):
return ("eou", probability)
# State transition: Silence -> Speech
if is_speech and not self.is_speaking:
self.is_speaking = True
@@ -210,4 +211,3 @@ class VADProcessor:
self.is_speaking = False
self.speech_start_time = None
self.silence_start_time = None
self.eou_detector = EouDetector(self._eou_silence_ms, self._eou_min_speech_ms)
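A quick worked example of the energy-fallback mapping added above (`probability = min(1.0, rms / 0.05)`), showing why the default threshold of 0.5 corresponds to roughly 0.025 RMS:
```python
# Reimplements the fallback mapping in isolation to show the numbers;
# the tone/silence inputs are synthetic test signals, not project data.
import numpy as np


def energy_probability(pcm_bytes: bytes) -> float:
    audio = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32768.0
    if audio.size == 0:
        return 0.0
    rms = float(np.sqrt(np.mean(audio * audio)))
    return min(1.0, rms / 0.05)


t = np.arange(320) / 16000.0                       # one 20 ms chunk at 16 kHz
tone = (0.08 * np.sin(2 * np.pi * 440 * t) * 32768).astype(np.int16)
silence = np.zeros(320, dtype=np.int16)

# A sine at 8% of full scale has RMS ~ 0.08 / sqrt(2) ~ 0.057, so the mapping
# clips to 1.0; true silence maps to 0.0. Threshold 0.5 sits at RMS = 0.025.
print(energy_probability(tone.tobytes()))      # 1.0 -> "Speech"
print(energy_probability(silence.tobytes()))   # 0.0 -> "Silence"
```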

scripts/README.md (new file, 1 line)
View File

@@ -0,0 +1 @@
# Development Script

View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
Generate test audio file with utterances using SiliconFlow TTS API.
Creates a 16kHz mono WAV file with real speech segments separated by
configurable silence (for VAD/testing).
Usage:
python generate_test_audio.py [OPTIONS]
Options:
-o, --output PATH Output WAV path (default: data/audio_examples/two_utterances_16k.wav)
-u, --utterance TEXT Utterance text; repeat for multiple (ignored if -j is set)
-j, --json PATH JSON file: array of strings or {"utterances": [...]}
--silence-ms MS Silence in ms between utterances (default: 500)
--lead-silence-ms MS Silence in ms at start (default: 200)
--trail-silence-ms MS Silence in ms at end (default: 300)
Examples:
# Default utterances and output
python generate_test_audio.py
# Custom output path
python generate_test_audio.py -o out.wav
# Utterances from command line
python generate_test_audio.py -u "Hello" -u "World" -o test.wav
# Utterances from a JSON file
python generate_test_audio.py -j utterances.json -o test.wav
# Custom silence (1s between utterances)
python generate_test_audio.py -u "One" -u "Two" --silence-ms 1000 -o test.wav
Requires SILICONFLOW_API_KEY in .env.
"""
import wave
import struct
import argparse
import asyncio
import aiohttp
import json
import os
from pathlib import Path
from dotenv import load_dotenv
# Load .env file from project root
project_root = Path(__file__).parent.parent.parent
load_dotenv(project_root / ".env")
# SiliconFlow TTS Configuration
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/audio/speech"
SILICONFLOW_MODEL = "FunAudioLLM/CosyVoice2-0.5B"
# Available voices
VOICES = {
"alex": "FunAudioLLM/CosyVoice2-0.5B:alex",
"anna": "FunAudioLLM/CosyVoice2-0.5B:anna",
"bella": "FunAudioLLM/CosyVoice2-0.5B:bella",
"benjamin": "FunAudioLLM/CosyVoice2-0.5B:benjamin",
"charles": "FunAudioLLM/CosyVoice2-0.5B:charles",
"claire": "FunAudioLLM/CosyVoice2-0.5B:claire",
"david": "FunAudioLLM/CosyVoice2-0.5B:david",
"diana": "FunAudioLLM/CosyVoice2-0.5B:diana",
}
def generate_silence(duration_ms: int, sample_rate: int = 16000) -> bytes:
"""Generate silence as PCM bytes."""
num_samples = int(sample_rate * (duration_ms / 1000.0))
return b'\x00\x00' * num_samples
async def synthesize_speech(
text: str,
api_key: str,
voice: str = "anna",
sample_rate: int = 16000,
speed: float = 1.0
) -> bytes:
"""
Synthesize speech using SiliconFlow TTS API.
Args:
text: Text to synthesize
api_key: SiliconFlow API key
voice: Voice name (alex, anna, bella, benjamin, charles, claire, david, diana)
sample_rate: Output sample rate (8000, 16000, 24000, 32000, 44100)
speed: Speech speed (0.25 to 4.0)
Returns:
PCM audio bytes (16-bit signed, little-endian)
"""
# Resolve voice name
full_voice = VOICES.get(voice, voice)
payload = {
"model": SILICONFLOW_MODEL,
"input": text,
"voice": full_voice,
"response_format": "pcm",
"sample_rate": sample_rate,
"stream": False,
"speed": speed
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
async with session.post(SILICONFLOW_API_URL, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise RuntimeError(f"SiliconFlow TTS error: {response.status} - {error_text}")
return await response.read()
async def generate_test_audio(
output_path: str,
utterances: list[str],
silence_ms: int = 500,
lead_silence_ms: int = 200,
trail_silence_ms: int = 300,
voice: str = "anna",
sample_rate: int = 16000,
speed: float = 1.0
):
"""
Generate test audio with multiple utterances separated by silence.
Args:
output_path: Path to save the WAV file
utterances: List of text strings for each utterance
silence_ms: Silence duration between utterances (milliseconds)
lead_silence_ms: Silence at the beginning (milliseconds)
trail_silence_ms: Silence at the end (milliseconds)
voice: TTS voice to use
sample_rate: Audio sample rate
speed: TTS speech speed
"""
api_key = os.getenv("SILICONFLOW_API_KEY")
if not api_key:
raise ValueError(
"SILICONFLOW_API_KEY not found in environment.\n"
"Please set it in your .env file:\n"
" SILICONFLOW_API_KEY=your-api-key-here"
)
print(f"Using SiliconFlow TTS API")
print(f" Voice: {voice}")
print(f" Sample rate: {sample_rate}Hz")
print(f" Speed: {speed}x")
print()
segments = []
# Lead-in silence
if lead_silence_ms > 0:
segments.append(generate_silence(lead_silence_ms, sample_rate))
print(f" [silence: {lead_silence_ms}ms]")
# Generate each utterance with silence between
for i, text in enumerate(utterances):
print(f" Synthesizing utterance {i + 1}: \"{text}\"")
audio = await synthesize_speech(
text=text,
api_key=api_key,
voice=voice,
sample_rate=sample_rate,
speed=speed
)
segments.append(audio)
# Add silence between utterances (not after the last one)
if i < len(utterances) - 1:
segments.append(generate_silence(silence_ms, sample_rate))
print(f" [silence: {silence_ms}ms]")
# Trail silence
if trail_silence_ms > 0:
segments.append(generate_silence(trail_silence_ms, sample_rate))
print(f" [silence: {trail_silence_ms}ms]")
# Concatenate all segments
audio_data = b''.join(segments)
# Write WAV file
with wave.open(output_path, 'wb') as wf:
wf.setnchannels(1) # Mono
wf.setsampwidth(2) # 16-bit
wf.setframerate(sample_rate)
wf.writeframes(audio_data)
duration_sec = len(audio_data) / (sample_rate * 2)
print()
print(f"Generated: {output_path}")
print(f" Duration: {duration_sec:.2f}s")
print(f" Sample rate: {sample_rate}Hz")
print(f" Format: 16-bit mono PCM WAV")
print(f" Size: {len(audio_data):,} bytes")
def load_utterances_from_json(path: Path) -> list[str]:
"""
Load utterances from a JSON file.
Accepts either:
- A JSON array: ["utterance 1", "utterance 2"]
- A JSON object with "utterances" key: {"utterances": ["a", "b"]}
"""
with open(path, encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [str(s) for s in data]
if isinstance(data, dict) and "utterances" in data:
return [str(s) for s in data["utterances"]]
raise ValueError(
f"JSON file must be an array of strings or an object with 'utterances' key. "
f"Got: {type(data).__name__}"
)
def parse_args():
"""Parse command-line arguments."""
script_dir = Path(__file__).parent
default_output = script_dir.parent / "data" / "audio_examples" / "two_utterances_16k.wav"
parser = argparse.ArgumentParser(description="Generate test audio with SiliconFlow TTS (utterances + silence).")
parser.add_argument(
"-o", "--output",
type=Path,
default=default_output,
help=f"Output WAV file path (default: {default_output})"
)
parser.add_argument(
"-u", "--utterance",
action="append",
dest="utterances",
metavar="TEXT",
help="Utterance text (repeat for multiple). Ignored if --json is set."
)
parser.add_argument(
"-j", "--json",
type=Path,
metavar="PATH",
help="JSON file with utterances: array of strings or object with 'utterances' key"
)
parser.add_argument(
"--silence-ms",
type=int,
default=500,
metavar="MS",
help="Silence in ms between utterances (default: 500)"
)
parser.add_argument(
"--lead-silence-ms",
type=int,
default=200,
metavar="MS",
help="Silence in ms at start of file (default: 200)"
)
parser.add_argument(
"--trail-silence-ms",
type=int,
default=300,
metavar="MS",
help="Silence in ms at end of file (default: 300)"
)
return parser.parse_args()
async def main():
"""Main entry point."""
args = parse_args()
output_path = args.output
output_path.parent.mkdir(parents=True, exist_ok=True)
# Resolve utterances: JSON file > -u args > defaults
if args.json is not None:
if not args.json.is_file():
raise FileNotFoundError(f"Utterances JSON file not found: {args.json}")
utterances = load_utterances_from_json(args.json)
if not utterances:
raise ValueError(f"JSON file has no utterances: {args.json}")
elif args.utterances:
utterances = args.utterances
else:
utterances = [
"Hello, how are you doing today?",
"I'm doing great, thank you for asking!"
]
await generate_test_audio(
output_path=str(output_path),
utterances=utterances,
silence_ms=args.silence_ms,
lead_silence_ms=args.lead_silence_ms,
trail_silence_ms=args.trail_silence_ms,
voice="anna",
sample_rate=16000,
speed=1.0
)
if __name__ == "__main__":
asyncio.run(main())