fix long-run session bug: discard stale audio after barge-in interrupt; add TTFB latency tracking

Xin Wang
2026-02-03 12:05:09 +08:00
parent a2e341b433
commit 8bc24ded59
6 changed files with 343 additions and 11 deletions


@@ -5,3 +5,21 @@ Python Active-Call: real-time audio streaming with WebSocket and WebRTC.
This repo contains a Python 3.11+ codebase for building low-latency voice
pipelines (capture, stream, and process audio) using WebRTC and WebSockets.
It is currently in an early, experimental stage.
# Usage
Start the server:
```
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```
Test with the scripted WebSocket client:
```
python examples/test_websocket.py
```
Or with the live microphone client:
```
python mic_client.py
```


@@ -113,6 +113,10 @@ class DuplexPipeline:
# Interruption handling
self._interrupt_event = asyncio.Event()
# Latency tracking - TTFB (Time to First Byte)
self._turn_start_time: Optional[float] = None
self._first_audio_sent: bool = False
# Barge-in filtering - require minimum speech duration to interrupt
self._barge_in_speech_start_time: Optional[float] = None
self._barge_in_min_duration_ms: int = settings.barge_in_min_duration_ms if hasattr(settings, 'barge_in_min_duration_ms') else 50
@@ -396,6 +400,10 @@ class DuplexPipeline:
user_text: User's transcribed text user_text: User's transcribed text
""" """
try: try:
# Start latency tracking
self._turn_start_time = time.time()
self._first_audio_sent = False
# Get AI response (streaming)
messages = self.conversation.get_messages()
full_response = ""
@@ -495,10 +503,33 @@ class DuplexPipeline:
try:
async for chunk in self.tts_service.synthesize_stream(text):
# Check interrupt at the start of each iteration
if self._interrupt_event.is_set():
logger.debug("TTS sentence interrupted")
break
# Track and log first audio packet latency (TTFB)
if not self._first_audio_sent and self._turn_start_time:
ttfb_ms = (time.time() - self._turn_start_time) * 1000
self._first_audio_sent = True
logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
"event": "ttfb",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"latencyMs": round(ttfb_ms)
})
# Double-check interrupt right before sending audio
if self._interrupt_event.is_set():
break
await self.transport.send_audio(chunk.audio)
await asyncio.sleep(0.005)  # Small delay to prevent flooding
except asyncio.CancelledError:
logger.debug("TTS sentence cancelled")
except Exception as e:
logger.error(f"TTS sentence error: {e}")
@@ -513,6 +544,10 @@ class DuplexPipeline:
return
try:
# Start latency tracking for greeting
speak_start_time = time.time()
first_audio_sent = False
# Send track start event
await self.transport.send_event({
"event": "trackStart",
@@ -528,6 +563,20 @@ class DuplexPipeline:
logger.info("TTS interrupted by barge-in")
break
# Track and log first audio packet latency (TTFB)
if not first_audio_sent:
ttfb_ms = (time.time() - speak_start_time) * 1000
first_audio_sent = True
logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
"event": "ttfb",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"latencyMs": round(ttfb_ms)
})
# Send audio to client
await self.transport.send_audio(chunk.audio)
@@ -561,8 +610,17 @@ class DuplexPipeline:
self._barge_in_speech_frames = 0
self._barge_in_silence_frames = 0
# IMPORTANT: Signal interruption FIRST to stop audio sending
self._interrupt_event.set()
self._is_bot_speaking = False
# Send interrupt event to client IMMEDIATELY
# This must happen BEFORE canceling services, so client knows to discard in-flight audio
await self.transport.send_event({
"event": "interrupt",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
# Cancel TTS
if self.tts_service:
@@ -575,15 +633,7 @@ class DuplexPipeline:
# Interrupt conversation
await self.conversation.interrupt()
# Send interrupt event to client
await self.transport.send_event({
"event": "interrupt",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
# Reset for new user turn
self._is_bot_speaking = False
await self.conversation.start_user_turn()
self._audio_buffer = b""
self.eou_detector.reset()

docs/proejct_todo.md (new file, 187 lines)

@@ -0,0 +1,187 @@
# OmniSense: 12-Week Sprint Board + Tech Stack (Python Backend) — TODO
## Scope
- [ ] Build a realtime AI SaaS (OmniSense) focused on web-first audio + video with WebSocket + WebRTC endpoints
- [ ] Deliver assistant builder, tool execution, observability, evals, optional telephony later
- [ ] Keep scope aligned to 2-person team, self-hosted services
---
## Sprint Board (12 weeks, 2-week sprints)
Team assumption: 2 engineers. Scope prioritized to web-first audio + video, with BYO-SFU adapters.
### Sprint 1 (Weeks 1–2) — Realtime Core MVP (WebSocket + WebRTC Audio)
- Deliverables
- [ ] WebSocket transport: audio in/out streaming (1:1)
- [ ] WebRTC transport: audio in/out streaming (1:1)
- [ ] Adapter contract wired into runtime (transport-agnostic session core)
- [ ] ASR → LLM → TTS pipeline, streaming both directions
- [ ] Basic session state (start/stop, silence timeout)
- [ ] Transcript persistence
- Acceptance criteria
- [ ] < 1.5s median round-trip for short responses
- [ ] Stable streaming for 10+ minute session
### Sprint 2 (Weeks 3–4) — Video + Realtime UX
- Deliverables
- [ ] WebRTC video capture + streaming (assistant can “see” frames)
- [ ] WebSocket video streaming for local/dev mode
- [ ] Low-latency UI: push-to-talk, live captions, speaking indicator
- [ ] Recording + transcript storage (web sessions)
- Acceptance criteria
- [ ] Video < 2.5s end-to-end latency for analysis
- [ ] Audio quality acceptable (no clipping, jitter handling)
### Sprint 3 (Weeks 5–6) — Assistant Builder v1
- Deliverables
- [ ] Assistant schema + versioning
- [ ] UI: Model/Voice/Transcriber/Tools/Video/Transport tabs
- [ ] “Test/Chat/Talk to Assistant” (web)
- Acceptance criteria
- [ ] Create/publish assistant and run a live web session
- [ ] All config changes tracked by version
### Sprint 4 (Weeks 7–8) — Tooling + Structured Outputs
- Deliverables
- [ ] Tool registry + custom HTTP tools
- [ ] Tool auth secrets management
- [ ] Structured outputs (JSON extraction)
- Acceptance criteria
- [ ] Tool calls executed with retries/timeouts
- [ ] Structured JSON stored per call/session
### Sprint 5 (Weeks 9–10) — Observability + QA + Dev Platform
- Deliverables
- [ ] Session logs + chat logs + media logs
- [ ] Evals engine + test suites
- [ ] Basic analytics dashboard
- [ ] Public WebSocket API spec + message schema
- [ ] JS/TS SDK (connect, send audio/video, receive transcripts)
- Acceptance criteria
- [ ] Reproducible test suite runs
- [ ] Log filters by assistant/time/status
- [ ] SDK demo app runs end-to-end
### Sprint 6 (Weeks 11–12) — SaaS Hardening
- Deliverables
- [ ] Org/RBAC + API keys + rate limits
- [ ] Usage metering + credits
- [ ] Stripe billing integration
- [ ] Self-hosted DB ops (migrations, backup/restore, monitoring)
- Acceptance criteria
- [ ] Metered usage per org
- [ ] Credits decrement correctly
- [ ] Optional telephony spike documented (defer build)
- [ ] Enterprise adapter guide published (BYO-SFU)
---
## Tech Stack by Service (Self-Hosted, Web-First)
### 1) Transport Gateway (Realtime)
- [ ] WebRTC (browser) + WebSocket (lightweight/dev) protocols
- [ ] BYO-SFU adapter (enterprise) + LiveKit optional adapter + WS transport server
- [ ] Python core (FastAPI + asyncio) + Node.js mediasoup adapters when needed
- [ ] Media: Opus/VP8, jitter buffer, VAD, echo cancellation
- [ ] Storage: S3-compatible (MinIO) for recordings
### 2) ASR Service
- [ ] Whisper (self-hosted) baseline
- [ ] gRPC/WebSocket streaming transport
- [ ] Python native service
- [ ] Optional cloud provider fallback (later)
### 3) TTS Service
- [ ] Piper or Coqui TTS (self-hosted)
- [ ] gRPC/WebSocket streaming transport
- [ ] Python native service
- [ ] Redis cache for common phrases
### 4) LLM Orchestrator
- [ ] Self-hosted (vLLM + open model)
- [ ] Python (FastAPI + asyncio)
- [ ] Streaming, tool calling, JSON mode
- [ ] Safety filters + prompt templates
### 5) Assistant Config Service
- [ ] PostgreSQL
- [ ] Python (SQLAlchemy or SQLModel)
- [ ] Versioning, publish/rollback
### 6) Session Service
- [ ] PostgreSQL + Redis
- [ ] Python
- [ ] State machine, timeouts, events
### 7) Tool Execution Layer
- [ ] PostgreSQL
- [ ] Python
- [ ] Auth secret vault, retry policies, tool schemas
### 8) Observability + Logs
- [ ] Postgres (metadata), ClickHouse (logs/metrics)
- [ ] OpenSearch for search
- [ ] Prometheus + Grafana metrics
- [ ] OpenTelemetry tracing
### 9) Billing + Usage Metering
- [ ] Stripe billing
- [ ] PostgreSQL
- [ ] NATS JetStream (events) + Redis counters
### 10) Web App (Dashboard)
- [ ] React + Next.js
- [ ] Tailwind or Radix UI
- [ ] WebRTC client + WS client; adapter-based RTC integration
- [ ] ECharts/Recharts
### 11) Auth + RBAC
- [ ] Keycloak (self-hosted) or custom JWT
- [ ] Org/user/role tables in Postgres
### 12) Public WebSocket API + SDK
- [ ] WS API: versioned schema, binary audio frames + JSON control messages
- [ ] SDKs: JS/TS first, optional Python/Go clients
- [ ] Docs: quickstart, auth flow, session lifecycle, examples
---
## Infrastructure (Self-Hosted)
- [ ] Docker Compose → k3s (later)
- [ ] Redis Streams or NATS
- [ ] MinIO object store
- [ ] GitHub Actions + Helm or kustomize
- [ ] Self-hosted Postgres + pgbackrest backups
- [ ] Vault for secrets
---
## Suggested MVP Sequence
- [ ] WebRTC demo + ASR/LLM/TTS streaming
- [ ] Assistant schema + versioning (web-first)
- [ ] Video capture + multimodal analysis
- [ ] Tool execution + structured outputs
- [ ] Logs + evals + public WS API + SDK
- [ ] Telephony (optional, later)
---
## Public WebSocket API (Minimum Spec)
- [ ] Auth: API key or JWT in initial `hello` message
- [ ] Core messages: `session.start`, `session.stop`, `audio.append`, `audio.commit`, `video.append`, `transcript.delta`, `assistant.response`, `tool.call`, `tool.result`, `error`
- [ ] Binary payloads: PCM/Opus frames with metadata in control channel
- [ ] Versioning: `v1` schema with backward compatibility rules (see the client sketch below)
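To make the handshake concrete, here is a minimal client sketch for the spec above. Only the message names (`hello`, `session.start`, `audio.append`, `audio.commit`, `session.stop`, `transcript.delta`) come from the spec; the endpoint URL, port, and field names such as `type` and `apiKey` are placeholder assumptions, not a finalized schema.
```
import asyncio
import json

import websockets  # pip install websockets


async def run_session() -> None:
    # Endpoint is an assumption for local dev, not a published URL
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        # Auth: API key (or JWT) rides in the initial `hello` message
        await ws.send(json.dumps({"type": "hello", "version": "v1", "apiKey": "sk-..."}))
        await ws.send(json.dumps({"type": "session.start"}))

        # Binary payload: a raw PCM frame; metadata stays on the JSON control channel
        pcm_frame = b"\x00\x00" * 320  # 20ms of 16kHz 16-bit mono silence
        await ws.send(json.dumps({"type": "audio.append"}))
        await ws.send(pcm_frame)
        await ws.send(json.dumps({"type": "audio.commit"}))

        # Server replies interleave JSON events (transcript.delta, error, ...) and audio bytes
        msg = await ws.recv()
        print(msg if isinstance(msg, str) else f"<- {len(msg)} audio bytes")

        await ws.send(json.dumps({"type": "session.stop"}))


asyncio.run(run_session())
```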
---
## Self-Hosted DB Ops Checklist
- [ ] Postgres in Docker/k3s with persistent volumes
- [ ] Migrations: `alembic` or `atlas` (see the sketch after this list)
- [ ] Backups: `pgbackrest` nightly + on-demand
- [ ] Monitoring: postgres_exporter + alerts
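For the migrations item, a minimal programmatic hook can apply pending Alembic migrations at container start-up. This sketch assumes the standard `alembic.ini` layout produced by `alembic init`; it is not tied to any file in this repo.
```
from alembic import command
from alembic.config import Config


def migrate_to_head(ini_path: str = "alembic.ini") -> None:
    """Apply all pending migrations, e.g. before the app starts serving."""
    command.upgrade(Config(ini_path), "head")


if __name__ == "__main__":
    migrate_to_head()
```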
---
## RTC Adapter Contract (BYO-SFU First)
- [ ] Keep RTC pluggable; LiveKit optional, not core dependency
- [ ] Define adapter interface (TypeScript sketch planned; a rough Python equivalent follows)
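The TODO plans a TypeScript sketch of this interface; since the codebase is Python, here is a rough Python `Protocol` equivalent of what such a contract might look like. All method names are illustrative assumptions, not an existing interface in this repo.
```
from typing import AsyncIterator, Protocol


class RTCAdapter(Protocol):
    """Transport-agnostic contract: the session core talks only to this,
    so LiveKit, mediasoup, or a BYO-SFU can be swapped in behind it."""

    async def connect(self, room: str, token: str) -> None:
        ...

    async def publish_audio(self, pcm: bytes) -> None:
        ...

    def subscribe_audio(self) -> AsyncIterator[bytes]:
        ...

    async def close(self) -> None:
        ...
```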


@@ -17,6 +17,7 @@ import argparse
import asyncio
import json
import sys
import time
import threading
import queue
from pathlib import Path
@@ -92,6 +93,14 @@ class MicrophoneClient:
# State
self.is_recording = True
self.is_playing = True
# TTFB tracking (Time to First Byte)
self.request_start_time = None
self.first_audio_received = False
# Interrupt handling - discard audio until next trackStart
self._discard_audio = False
self._audio_sequence = 0 # Track audio sequence to detect stale chunks
async def connect(self) -> None:
"""Connect to WebSocket server."""
@@ -117,6 +126,10 @@ class MicrophoneClient:
async def send_chat(self, text: str) -> None:
"""Send chat message (text input)."""
# Reset TTFB tracking for new request
self.request_start_time = time.time()
self.first_audio_received = False
await self.send_command({
"command": "chat",
"text": text
@@ -236,9 +249,21 @@ class MicrophoneClient:
# Audio data received
self.bytes_received += len(message)
# Check if we should discard this audio (after interrupt)
if self._discard_audio:
duration_ms = len(message) / (self.sample_rate * 2) * 1000
print(f"← Audio: {duration_ms:.0f}ms (DISCARDED - waiting for new track)")
continue
if self.is_playing:
self._add_audio_to_buffer(message)
# Calculate and display TTFB for first audio packet
if not self.first_audio_received and self.request_start_time:
client_ttfb_ms = (time.time() - self.request_start_time) * 1000
self.first_audio_received = True
print(f"← [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
# Show progress (less verbose)
with self.audio_output_lock:
buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
@@ -285,20 +310,36 @@ class MicrophoneClient:
# Interim result - show with indicator (overwrite same line)
display_text = text[:60] + "..." if len(text) > 60 else text
print(f" [listening] {display_text}".ljust(80), end="\r")
elif event_type == "ttfb":
# Server-side TTFB event
latency_ms = event.get("latencyMs", 0)
print(f"← [TTFB] Server reported latency: {latency_ms}ms")
elif event_type == "trackStart":
print("← Bot started speaking")
# IMPORTANT: Accept audio again after trackStart
self._discard_audio = False
self._audio_sequence += 1
# Reset TTFB tracking for voice responses (when no chat was sent)
if self.request_start_time is None:
self.request_start_time = time.time()
self.first_audio_received = False
# Clear any old audio in buffer
with self.audio_output_lock:
self.audio_output_buffer = b""
elif event_type == "trackEnd":
print("← Bot finished speaking")
# Reset TTFB tracking after response completes
self.request_start_time = None
self.first_audio_received = False
elif event_type == "interrupt":
print("← Bot interrupted!")
# IMPORTANT: Discard all audio until next trackStart
self._discard_audio = True
# Clear audio buffer immediately
with self.audio_output_lock:
buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
self.audio_output_buffer = b""
print(f" (cleared {buffer_ms:.0f}ms, discarding audio until new track)")
elif event_type == "error":
print(f"← Error: {event.get('error')}")
elif event_type == "hangup":


@@ -12,6 +12,7 @@ import argparse
import asyncio
import json
import sys
import time
import wave
import io
@@ -67,6 +68,13 @@ class SimpleVoiceClient:
# Stats
self.bytes_received = 0
# TTFB tracking (Time to First Byte)
self.request_start_time = None
self.first_audio_received = False
# Interrupt handling - discard audio until next trackStart
self._discard_audio = False
async def connect(self):
"""Connect to server."""
@@ -84,6 +92,10 @@ class SimpleVoiceClient:
async def send_chat(self, text: str):
"""Send chat message."""
# Reset TTFB tracking for new request
self.request_start_time = time.time()
self.first_audio_received = False
await self.ws.send(json.dumps({"command": "chat", "text": text}))
print(f"-> chat: {text}")
@@ -120,6 +132,18 @@ class SimpleVoiceClient:
# Audio data
self.bytes_received += len(msg)
duration_ms = len(msg) / (self.sample_rate * 2) * 1000
# Check if we should discard this audio (after interrupt)
if self._discard_audio:
print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms) [DISCARDED]")
continue
# Calculate and display TTFB for first audio packet
if not self.first_audio_received and self.request_start_time:
client_ttfb_ms = (time.time() - self.request_start_time) * 1000
self.first_audio_received = True
print(f"<- [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms)") print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms)")
# Play immediately in executor to not block # Play immediately in executor to not block
@@ -138,6 +162,18 @@ class SimpleVoiceClient:
print(f"<- You said: {text}") print(f"<- You said: {text}")
else: else:
print(f"<- [listening] {text}", end="\r") print(f"<- [listening] {text}", end="\r")
elif etype == "ttfb":
# Server-side TTFB event
latency_ms = event.get("latencyMs", 0)
print(f"<- [TTFB] Server reported latency: {latency_ms}ms")
elif etype == "trackStart":
# New track starting - accept audio again
self._discard_audio = False
print(f"<- {etype}")
elif etype == "interrupt":
# Interrupt - discard audio until next trackStart
self._discard_audio = True
print(f"<- {etype} (discarding audio until new track)")
elif etype == "hangup":
print(f"<- {etype}")
self.running = False