From 8bc24ded59b94311c1271ff64265b55ddae45e73 Mon Sep 17 00:00:00 2001
From: Xin Wang
Date: Tue, 3 Feb 2026 12:05:09 +0800
Subject: [PATCH] Fix long-run barge-in bug; add TTFB latency tracking

---
 README.md                               |  18 +++
 core/duplex_pipeline.py                 |  68 +++++++--
 docs/project_todo.md                    | 187 ++++++++++++++++++++++++
 examples/mic_client.py                  |  45 +++++-
 examples/simple_client.py               |  36 +++++
 {scripts => examples}/test_websocket.py |   0
 6 files changed, 343 insertions(+), 11 deletions(-)
 create mode 100644 docs/project_todo.md
 rename {scripts => examples}/test_websocket.py (100%)

diff --git a/README.md b/README.md
index c16edbd..6e7da04 100644
--- a/README.md
+++ b/README.md
@@ -5,3 +5,21 @@ Python Active-Call: real-time audio streaming with WebSocket and WebRTC.
 This repo contains a Python 3.11+ codebase for building low-latency voice
 pipelines (capture, stream, and process audio) using WebRTC and WebSockets.
 It is currently in an early, experimental stage.
+
+# Usage
+
+Start the server:
+
+```
+uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
+```
+
+Run a test client:
+
+```
+python examples/test_websocket.py
+```
+
+```
+python examples/mic_client.py
+```
\ No newline at end of file
diff --git a/core/duplex_pipeline.py b/core/duplex_pipeline.py
index 4714a1d..4dd0bae 100644
--- a/core/duplex_pipeline.py
+++ b/core/duplex_pipeline.py
@@ -113,6 +113,10 @@ class DuplexPipeline:
         # Interruption handling
         self._interrupt_event = asyncio.Event()
 
+        # Latency tracking - TTFB (Time to First Byte)
+        self._turn_start_time: Optional[float] = None
+        self._first_audio_sent: bool = False
+
         # Barge-in filtering - require minimum speech duration to interrupt
         self._barge_in_speech_start_time: Optional[float] = None
         self._barge_in_min_duration_ms: int = settings.barge_in_min_duration_ms if hasattr(settings, 'barge_in_min_duration_ms') else 50
@@ -396,6 +400,10 @@ class DuplexPipeline:
             user_text: User's transcribed text
         """
         try:
+            # Start latency tracking
+            self._turn_start_time = time.time()
+            self._first_audio_sent = False
+
             # Get AI response (streaming)
             messages = self.conversation.get_messages()
             full_response = ""
@@ -495,10 +503,33 @@ class DuplexPipeline:
 
         try:
             async for chunk in self.tts_service.synthesize_stream(text):
+                # Check interrupt at the start of each iteration
+                if self._interrupt_event.is_set():
+                    logger.debug("TTS sentence interrupted")
+                    break
+
+                # Track and log first audio packet latency (TTFB)
+                if not self._first_audio_sent and self._turn_start_time:
+                    ttfb_ms = (time.time() - self._turn_start_time) * 1000
+                    self._first_audio_sent = True
+                    logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
+
+                    # Send TTFB event to client
+                    await self.transport.send_event({
+                        "event": "ttfb",
+                        "trackId": self.session_id,
+                        "timestamp": self._get_timestamp_ms(),
+                        "latencyMs": round(ttfb_ms)
+                    })
+
+                # Double-check interrupt right before sending audio
                 if self._interrupt_event.is_set():
                     break
+
                 await self.transport.send_audio(chunk.audio)
                 await asyncio.sleep(0.005)  # Small delay to prevent flooding
+        except asyncio.CancelledError:
+            logger.debug("TTS sentence cancelled")
         except Exception as e:
             logger.error(f"TTS sentence error: {e}")
@@ -513,6 +544,10 @@ class DuplexPipeline:
             return
 
         try:
+            # Start latency tracking for greeting
+            speak_start_time = time.time()
+            first_audio_sent = False
+
             # Send track start event
             await self.transport.send_event({
                 "event": "trackStart",
@@ -528,6 +563,20 @@ class DuplexPipeline:
                 logger.info("TTS interrupted by barge-in")
                 break
 
+            # Track and log first audio packet latency (TTFB)
+            if not first_audio_sent:
+                ttfb_ms = (time.time() - speak_start_time) * 1000
+                first_audio_sent = True
+                logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
+
+                # Send TTFB event to client
+                await self.transport.send_event({
+                    "event": "ttfb",
+                    "trackId": self.session_id,
+                    "timestamp": self._get_timestamp_ms(),
+                    "latencyMs": round(ttfb_ms)
+                })
+
             # Send audio to client
             await self.transport.send_audio(chunk.audio)
@@ -561,8 +610,17 @@ class DuplexPipeline:
         self._barge_in_speech_frames = 0
         self._barge_in_silence_frames = 0
 
-        # Signal interruption
+        # IMPORTANT: Signal interruption FIRST to stop audio sending
         self._interrupt_event.set()
+        self._is_bot_speaking = False
+
+        # Send interrupt event to client IMMEDIATELY
+        # This must happen BEFORE canceling services, so client knows to discard in-flight audio
+        await self.transport.send_event({
+            "event": "interrupt",
+            "trackId": self.session_id,
+            "timestamp": self._get_timestamp_ms()
+        })
 
         # Cancel TTS
         if self.tts_service:
@@ -575,15 +633,7 @@ class DuplexPipeline:
         # Interrupt conversation
         await self.conversation.interrupt()
 
-        # Send interrupt event to client
-        await self.transport.send_event({
-            "event": "interrupt",
-            "trackId": self.session_id,
-            "timestamp": self._get_timestamp_ms()
-        })
-
         # Reset for new user turn
-        self._is_bot_speaking = False
         await self.conversation.start_user_turn()
         self._audio_buffer = b""
         self.eou_detector.reset()
diff --git a/docs/project_todo.md b/docs/project_todo.md
new file mode 100644
index 0000000..18a9f17
--- /dev/null
+++ b/docs/project_todo.md
@@ -0,0 +1,187 @@
+# OmniSense: 12-Week Sprint Board + Tech Stack (Python Backend) — TODO
+
+## Scope
+- [ ] Build a realtime AI SaaS (OmniSense) focused on web-first audio + video with WebSocket + WebRTC endpoints
+- [ ] Deliver assistant builder, tool execution, observability, evals, optional telephony later
+- [ ] Keep scope aligned to 2-person team, self-hosted services
+
+---
+
+## Sprint Board (12 weeks, 2-week sprints)
+Team assumption: 2 engineers. Scope prioritized to web-first audio + video, with BYO-SFU adapters.
+
+### Sprint 1 (Weeks 1–2) — Realtime Core MVP (WebSocket + WebRTC Audio)
+- Deliverables
+  - [ ] WebSocket transport: audio in/out streaming (1:1)
+  - [ ] WebRTC transport: audio in/out streaming (1:1)
+  - [ ] Adapter contract wired into runtime (transport-agnostic session core)
+  - [ ] ASR → LLM → TTS pipeline, streaming both directions
+  - [ ] Basic session state (start/stop, silence timeout)
+  - [ ] Transcript persistence
+- Acceptance criteria
+  - [ ] < 1.5s median round-trip for short responses
+  - [ ] Stable streaming for 10+ minute session
+
+### Sprint 2 (Weeks 3–4) — Video + Realtime UX
+- Deliverables
+  - [ ] WebRTC video capture + streaming (assistant can “see” frames)
+  - [ ] WebSocket video streaming for local/dev mode
+  - [ ] Low-latency UI: push-to-talk, live captions, speaking indicator
+  - [ ] Recording + transcript storage (web sessions)
+- Acceptance criteria
+  - [ ] Video < 2.5s end-to-end latency for analysis
+  - [ ] Audio quality acceptable (no clipping, jitter handling)
+
+### Sprint 3 (Weeks 5–6) — Assistant Builder v1
+- Deliverables
+  - [ ] Assistant schema + versioning
+  - [ ] UI: Model/Voice/Transcriber/Tools/Video/Transport tabs
+  - [ ] “Test/Chat/Talk to Assistant” (web)
+- Acceptance criteria
+  - [ ] Create/publish assistant and run a live web session
+  - [ ] All config changes tracked by version
+
+### Sprint 4 (Weeks 7–8) — Tooling + Structured Outputs
+- Deliverables
+  - [ ] Tool registry + custom HTTP tools
+  - [ ] Tool auth secrets management
+  - [ ] Structured outputs (JSON extraction)
+- Acceptance criteria
+  - [ ] Tool calls executed with retries/timeouts
+  - [ ] Structured JSON stored per call/session
+
+### Sprint 5 (Weeks 9–10) — Observability + QA + Dev Platform
+- Deliverables
+  - [ ] Session logs + chat logs + media logs
+  - [ ] Evals engine + test suites
+  - [ ] Basic analytics dashboard
+  - [ ] Public WebSocket API spec + message schema
+  - [ ] JS/TS SDK (connect, send audio/video, receive transcripts)
+- Acceptance criteria
+  - [ ] Reproducible test suite runs
+  - [ ] Log filters by assistant/time/status
+  - [ ] SDK demo app runs end-to-end
+
+### Sprint 6 (Weeks 11–12) — SaaS Hardening
+- Deliverables
+  - [ ] Org/RBAC + API keys + rate limits
+  - [ ] Usage metering + credits
+  - [ ] Stripe billing integration
+  - [ ] Self-hosted DB ops (migrations, backup/restore, monitoring)
+- Acceptance criteria
+  - [ ] Metered usage per org
+  - [ ] Credits decrement correctly
+  - [ ] Optional telephony spike documented (defer build)
+  - [ ] Enterprise adapter guide published (BYO-SFU)
+
+---
+
+## Tech Stack by Service (Self-Hosted, Web-First)
+
+### 1) Transport Gateway (Realtime)
+- [ ] WebRTC (browser) + WebSocket (lightweight/dev) protocols
+- [ ] BYO-SFU adapter (enterprise) + LiveKit optional adapter + WS transport server
+- [ ] Python core (FastAPI + asyncio) + Node.js mediasoup adapters when needed
+- [ ] Media: Opus/VP8, jitter buffer, VAD, echo cancellation
+- [ ] Storage: S3-compatible (MinIO) for recordings
+
+### 2) ASR Service
+- [ ] Whisper (self-hosted) baseline
+- [ ] gRPC/WebSocket streaming transport
+- [ ] Python native service
+- [ ] Optional cloud provider fallback (later)
+
+### 3) TTS Service
+- [ ] Piper or Coqui TTS (self-hosted)
+- [ ] gRPC/WebSocket streaming transport
+- [ ] Python native service
+- [ ] Redis cache for common phrases
+
+### 4) LLM Orchestrator
+- [ ] Self-hosted (vLLM + open model)
+- [ ] Python (FastAPI + asyncio)
+- [ ] Streaming, tool calling, JSON mode
+- [ ] Safety filters + prompt templates
+
+### 5) Assistant Config Service
+- [ ] PostgreSQL
+- [ ] Python (SQLAlchemy or SQLModel)
+- [ ] Versioning, publish/rollback
+
+### 6) Session Service
+- [ ] PostgreSQL + Redis
+- [ ] Python
+- [ ] State machine, timeouts, events
+
+### 7) Tool Execution Layer
+- [ ] PostgreSQL
+- [ ] Python
+- [ ] Auth secret vault, retry policies, tool schemas
+
+### 8) Observability + Logs
+- [ ] Postgres (metadata), ClickHouse (logs/metrics)
+- [ ] OpenSearch for search
+- [ ] Prometheus + Grafana metrics
+- [ ] OpenTelemetry tracing
+
+### 9) Billing + Usage Metering
+- [ ] Stripe billing
+- [ ] PostgreSQL
+- [ ] NATS JetStream (events) + Redis counters
+
+### 10) Web App (Dashboard)
+- [ ] React + Next.js
+- [ ] Tailwind or Radix UI
+- [ ] WebRTC client + WS client; adapter-based RTC integration
+- [ ] ECharts/Recharts
+
+### 11) Auth + RBAC
+- [ ] Keycloak (self-hosted) or custom JWT
+- [ ] Org/user/role tables in Postgres
+
+### 12) Public WebSocket API + SDK
+- [ ] WS API: versioned schema, binary audio frames + JSON control messages
+- [ ] SDKs: JS/TS first, optional Python/Go clients
+- [ ] Docs: quickstart, auth flow, session lifecycle, examples
+
+---
+
+## Infrastructure (Self-Hosted)
+- [ ] Docker Compose → k3s (later)
+- [ ] Redis Streams or NATS
+- [ ] MinIO object store
+- [ ] GitHub Actions + Helm or kustomize
+- [ ] Self-hosted Postgres + pgbackrest backups
+- [ ] Vault for secrets
+
+---
+
+## Suggested MVP Sequence
+- [ ] WebRTC demo + ASR/LLM/TTS streaming
+- [ ] Assistant schema + versioning (web-first)
+- [ ] Video capture + multimodal analysis
+- [ ] Tool execution + structured outputs
+- [ ] Logs + evals + public WS API + SDK
+- [ ] Telephony (optional, later)
+
+---
+
+## Public WebSocket API (Minimum Spec)
+- [ ] Auth: API key or JWT in initial `hello` message
+- [ ] Core messages: `session.start`, `session.stop`, `audio.append`, `audio.commit`, `video.append`, `transcript.delta`, `assistant.response`, `tool.call`, `tool.result`, `error`
+- [ ] Binary payloads: PCM/Opus frames with metadata in control channel
+- [ ] Versioning: `v1` schema with backward compatibility rules
+
+An illustrative client sketch for this spec appears at the end of this doc.
+
+---
+
+## Self-Hosted DB Ops Checklist
+- [ ] Postgres in Docker/k3s with persistent volumes
+- [ ] Migrations: `alembic` or `atlas`
+- [ ] Backups: `pgbackrest` nightly + on-demand
+- [ ] Monitoring: postgres_exporter + alerts
+
+---
+
+## RTC Adapter Contract (BYO-SFU First)
+- [ ] Keep RTC pluggable; LiveKit optional, not core dependency
+- [ ] Define adapter interface (TypeScript sketch; a first server-side cut is sketched below)
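+
+A minimal Python sketch of the server-side half of that contract. This is an
+assumption-laden starting point, not a settled API: the runtime in
+`core/duplex_pipeline.py` already calls `send_audio()` and `send_event()` on
+its transport, while the `close()` method and the `TransportAdapter` name are
+placeholders.
+
+```python
+from typing import Any, Protocol
+
+
+class TransportAdapter(Protocol):
+    """What the session runtime codes against (WS, WebRTC, or BYO-SFU)."""
+
+    async def send_audio(self, pcm: bytes) -> None:
+        """Push one chunk of raw audio (e.g., 16-bit PCM) to the client."""
+        ...
+
+    async def send_event(self, event: dict[str, Any]) -> None:
+        """Push one JSON control event (trackStart, ttfb, interrupt, ...)."""
+        ...
+
+    async def close(self) -> None:
+        """Tear down the underlying connection (placeholder; shape TBD)."""
+        ...
+```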
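+
+And the illustrative client sketch for the Public WebSocket API spec above.
+Message names follow the `v1` list; the URL, the `apiKey`/`assistantId`
+fields, and the frame size are placeholder assumptions, not a committed
+schema:
+
+```python
+import asyncio
+import json
+
+import websockets  # pip install websockets
+
+
+async def demo() -> None:
+    async with websockets.connect("ws://localhost:8000/v1/ws") as ws:
+        # Auth rides in the initial hello message (API key or JWT)
+        await ws.send(json.dumps({"type": "hello", "apiKey": "sk_placeholder"}))
+        await ws.send(json.dumps({"type": "session.start", "assistantId": "demo"}))
+
+        # Binary frames carry audio; metadata stays in the control channel
+        await ws.send(json.dumps({"type": "audio.append"}))
+        await ws.send(b"\x00\x00" * 160)  # one 10 ms frame of silent 16 kHz PCM
+        await ws.send(json.dumps({"type": "audio.commit"}))
+
+        async for msg in ws:
+            if isinstance(msg, bytes):
+                print(f"<- audio: {len(msg)} bytes")
+            else:
+                print(f"<- event: {json.loads(msg)}")
+
+
+asyncio.run(demo())
+```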
"command": "chat", "text": text @@ -236,9 +249,21 @@ class MicrophoneClient: # Audio data received self.bytes_received += len(message) + # Check if we should discard this audio (after interrupt) + if self._discard_audio: + duration_ms = len(message) / (self.sample_rate * 2) * 1000 + print(f"← Audio: {duration_ms:.0f}ms (DISCARDED - waiting for new track)") + continue + if self.is_playing: self._add_audio_to_buffer(message) + # Calculate and display TTFB for first audio packet + if not self.first_audio_received and self.request_start_time: + client_ttfb_ms = (time.time() - self.request_start_time) * 1000 + self.first_audio_received = True + print(f"← [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms") + # Show progress (less verbose) with self.audio_output_lock: buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000 @@ -285,20 +310,36 @@ class MicrophoneClient: # Interim result - show with indicator (overwrite same line) display_text = text[:60] + "..." if len(text) > 60 else text print(f" [listening] {display_text}".ljust(80), end="\r") + elif event_type == "ttfb": + # Server-side TTFB event + latency_ms = event.get("latencyMs", 0) + print(f"← [TTFB] Server reported latency: {latency_ms}ms") elif event_type == "trackStart": print("← Bot started speaking") + # IMPORTANT: Accept audio again after trackStart + self._discard_audio = False + self._audio_sequence += 1 + # Reset TTFB tracking for voice responses (when no chat was sent) + if self.request_start_time is None: + self.request_start_time = time.time() + self.first_audio_received = False # Clear any old audio in buffer with self.audio_output_lock: self.audio_output_buffer = b"" elif event_type == "trackEnd": print("← Bot finished speaking") + # Reset TTFB tracking after response completes + self.request_start_time = None + self.first_audio_received = False elif event_type == "interrupt": print("← Bot interrupted!") - # IMPORTANT: Clear audio buffer immediately on interrupt + # IMPORTANT: Discard all audio until next trackStart + self._discard_audio = True + # Clear audio buffer immediately with self.audio_output_lock: buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000 self.audio_output_buffer = b"" - print(f" (cleared {buffer_ms:.0f}ms of buffered audio)") + print(f" (cleared {buffer_ms:.0f}ms, discarding audio until new track)") elif event_type == "error": print(f"← Error: {event.get('error')}") elif event_type == "hangup": diff --git a/examples/simple_client.py b/examples/simple_client.py index fec5700..4280f93 100644 --- a/examples/simple_client.py +++ b/examples/simple_client.py @@ -12,6 +12,7 @@ import argparse import asyncio import json import sys +import time import wave import io @@ -67,6 +68,13 @@ class SimpleVoiceClient: # Stats self.bytes_received = 0 + + # TTFB tracking (Time to First Byte) + self.request_start_time = None + self.first_audio_received = False + + # Interrupt handling - discard audio until next trackStart + self._discard_audio = False async def connect(self): """Connect to server.""" @@ -84,6 +92,10 @@ class SimpleVoiceClient: async def send_chat(self, text: str): """Send chat message.""" + # Reset TTFB tracking for new request + self.request_start_time = time.time() + self.first_audio_received = False + await self.ws.send(json.dumps({"command": "chat", "text": text})) print(f"-> chat: {text}") @@ -120,6 +132,18 @@ class SimpleVoiceClient: # Audio data self.bytes_received += len(msg) duration_ms = len(msg) / (self.sample_rate * 2) * 1000 + + # Check if we 
+                if self._discard_audio:
+                    print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms) [DISCARDED]")
+                    continue
+
+                # Calculate and display TTFB for first audio packet
+                if not self.first_audio_received and self.request_start_time:
+                    client_ttfb_ms = (time.time() - self.request_start_time) * 1000
+                    self.first_audio_received = True
+                    print(f"<- [TTFB] Client first audio latency: {client_ttfb_ms:.0f}ms")
+
                 print(f"<- audio: {len(msg)} bytes ({duration_ms:.0f}ms)")
 
                 # Play immediately in executor to not block
@@ -138,6 +162,18 @@ class SimpleVoiceClient:
                     print(f"<- You said: {text}")
                 else:
                     print(f"<- [listening] {text}", end="\r")
+            elif etype == "ttfb":
+                # Server-side TTFB event
+                latency_ms = event.get("latencyMs", 0)
+                print(f"<- [TTFB] Server reported latency: {latency_ms}ms")
+            elif etype == "trackStart":
+                # New track starting - accept audio again
+                self._discard_audio = False
+                print(f"<- {etype}")
+            elif etype == "interrupt":
+                # Interrupt - discard audio until next trackStart
+                self._discard_audio = True
+                print(f"<- {etype} (discarding audio until new track)")
             elif etype == "hangup":
                 print(f"<- {etype}")
                 self.running = False
diff --git a/scripts/test_websocket.py b/examples/test_websocket.py
similarity index 100%
rename from scripts/test_websocket.py
rename to examples/test_websocket.py