remove pipeline because it is just a vad integration

This commit is contained in:
Xin Wang
2026-02-04 15:01:05 +08:00
parent 0835f6a617
commit 77d54d284f
3 changed files with 14 additions and 163 deletions

View File

@@ -2,7 +2,6 @@
from core.events import EventBus, get_event_bus from core.events import EventBus, get_event_bus
from core.transports import BaseTransport, SocketTransport, WebRtcTransport from core.transports import BaseTransport, SocketTransport, WebRtcTransport
from core.pipeline import AudioPipeline
from core.session import Session from core.session import Session
from core.conversation import ConversationManager, ConversationState, ConversationTurn from core.conversation import ConversationManager, ConversationState, ConversationTurn
from core.duplex_pipeline import DuplexPipeline from core.duplex_pipeline import DuplexPipeline
@@ -13,7 +12,6 @@ __all__ = [
"BaseTransport", "BaseTransport",
"SocketTransport", "SocketTransport",
"WebRtcTransport", "WebRtcTransport",
"AudioPipeline",
"Session", "Session",
"ConversationManager", "ConversationManager",
"ConversationState", "ConversationState",

View File

@@ -1,131 +0,0 @@
"""Audio processing pipeline."""
import asyncio
from typing import Optional
from loguru import logger
from core.transports import BaseTransport
from core.events import EventBus, get_event_bus
from processors.vad import VADProcessor, SileroVAD
from app.config import settings
class AudioPipeline:
"""
Audio processing pipeline.
Processes incoming audio through VAD and emits events.
"""
def __init__(self, transport: BaseTransport, session_id: str):
"""
Initialize audio pipeline.
Args:
transport: Transport instance for sending events/audio
session_id: Session identifier for event tracking
"""
self.transport = transport
self.session_id = session_id
self.event_bus = get_event_bus()
# Initialize VAD
self.vad_model = SileroVAD(
model_path=settings.vad_model_path,
sample_rate=settings.sample_rate
)
self.vad_processor = VADProcessor(
vad_model=self.vad_model,
threshold=settings.vad_threshold,
silence_threshold_ms=settings.vad_eou_threshold_ms,
min_speech_duration_ms=settings.vad_min_speech_duration_ms
)
# State
self.is_bot_speaking = False
self.interrupt_signal = asyncio.Event()
self._running = True
logger.info(f"Audio pipeline initialized for session {session_id}")
async def process_input(self, pcm_bytes: bytes) -> None:
"""
Process incoming audio chunk.
Args:
pcm_bytes: PCM audio data (16-bit, mono, 16kHz)
"""
if not self._running:
return
try:
# Process through VAD
result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
if result:
event_type, probability = result
# Emit event through event bus
await self.event_bus.publish(event_type, {
"trackId": self.session_id,
"probability": probability
})
# Send event to client
if event_type == "speaking":
logger.info(f"User speaking started (session {self.session_id})")
await self.transport.send_event({
"event": "speaking",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"startTime": self._get_timestamp_ms()
})
elif event_type == "silence":
logger.info(f"User speaking stopped (session {self.session_id})")
await self.transport.send_event({
"event": "silence",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms(),
"startTime": self._get_timestamp_ms(),
"duration": 0 # TODO: Calculate actual duration
})
elif event_type == "eou":
logger.info(f"EOU detected (session {self.session_id})")
await self.transport.send_event({
"event": "eou",
"trackId": self.session_id,
"timestamp": self._get_timestamp_ms()
})
except Exception as e:
logger.error(f"Pipeline processing error: {e}", exc_info=True)
async def process_text_input(self, text: str) -> None:
"""
Process text input (chat command).
Args:
text: Text input
"""
logger.info(f"Processing text input: {text[:50]}...")
# TODO: Implement text processing (LLM integration, etc.)
# For now, just log it
async def interrupt(self) -> None:
"""Interrupt current audio playback."""
if self.is_bot_speaking:
self.interrupt_signal.set()
logger.info(f"Pipeline interrupted for session {self.session_id}")
async def cleanup(self) -> None:
"""Cleanup pipeline resources."""
logger.info(f"Cleaning up pipeline for session {self.session_id}")
self._running = False
self.interrupt_signal.set()
def _get_timestamp_ms(self) -> int:
"""Get current timestamp in milliseconds."""
import time
return int(time.time() * 1000)

View File

@@ -6,7 +6,7 @@ from typing import Optional, Dict, Any
from loguru import logger from loguru import logger
from core.transports import BaseTransport from core.transports import BaseTransport
from core.pipeline import AudioPipeline from core.duplex_pipeline import DuplexPipeline
from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand
from app.config import settings from app.config import settings
@@ -16,7 +16,7 @@ class Session:
Manages a single call session. Manages a single call session.
Handles command routing, audio processing, and session lifecycle. Handles command routing, audio processing, and session lifecycle.
Supports both basic audio pipeline and full duplex voice conversation. Uses full duplex voice conversation pipeline.
""" """
def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None): def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None):
@@ -30,20 +30,14 @@ class Session:
""" """
self.id = session_id self.id = session_id
self.transport = transport self.transport = transport
# Determine pipeline mode
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
if self.use_duplex: self.pipeline = DuplexPipeline(
from core.duplex_pipeline import DuplexPipeline transport=transport,
self.pipeline = DuplexPipeline( session_id=session_id,
transport=transport, system_prompt=settings.duplex_system_prompt,
session_id=session_id, greeting=settings.duplex_greeting
system_prompt=settings.duplex_system_prompt, )
greeting=settings.duplex_greeting
)
else:
self.pipeline = AudioPipeline(transport, session_id)
# Session state # Session state
self.created_at = None self.created_at = None
@@ -129,10 +123,7 @@ class Session:
audio_bytes: PCM audio data audio_bytes: PCM audio data
""" """
try: try:
if self.use_duplex: await self.pipeline.process_audio(audio_bytes)
await self.pipeline.process_audio(audio_bytes)
else:
await self.pipeline.process_input(audio_bytes)
except Exception as e: except Exception as e:
logger.error(f"Session {self.id} handle_audio error: {e}", exc_info=True) logger.error(f"Session {self.id} handle_audio error: {e}", exc_info=True)
@@ -148,8 +139,8 @@ class Session:
"timestamp": self._get_timestamp_ms() "timestamp": self._get_timestamp_ms()
}) })
# Start duplex pipeline if enabled # Start duplex pipeline
if self.use_duplex and not self._pipeline_started: if not self._pipeline_started:
try: try:
await self.pipeline.start() await self.pipeline.start()
self._pipeline_started = True self._pipeline_started = True
@@ -228,10 +219,7 @@ class Session:
logger.info(f"Session {self.id} graceful interrupt") logger.info(f"Session {self.id} graceful interrupt")
else: else:
logger.info(f"Session {self.id} immediate interrupt") logger.info(f"Session {self.id} immediate interrupt")
if self.use_duplex: await self.pipeline.interrupt()
await self.pipeline.interrupt()
else:
await self.pipeline.interrupt()
async def _handle_pause(self) -> None: async def _handle_pause(self) -> None:
"""Handle pause command.""" """Handle pause command."""
@@ -267,11 +255,7 @@ class Session:
async def _handle_chat(self, command: ChatCommand) -> None: async def _handle_chat(self, command: ChatCommand) -> None:
"""Handle chat command.""" """Handle chat command."""
logger.info(f"Session {self.id} chat: {command.text[:50]}...") logger.info(f"Session {self.id} chat: {command.text[:50]}...")
# Process text input through pipeline await self.pipeline.process_text(command.text)
if self.use_duplex:
await self.pipeline.process_text(command.text)
else:
await self.pipeline.process_text_input(command.text)
async def _send_error(self, sender: str, error_message: str) -> None: async def _send_error(self, sender: str, error_message: str) -> None:
""" """