From 77d54d284fafce58e65914e726c2a4ca9d492f3a Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Wed, 4 Feb 2026 15:01:05 +0800 Subject: [PATCH] remove pipeline because it is just a vad integration --- core/__init__.py | 2 - core/pipeline.py | 131 ----------------------------------------------- core/session.py | 44 +++++----------- 3 files changed, 14 insertions(+), 163 deletions(-) delete mode 100644 core/pipeline.py diff --git a/core/__init__.py b/core/__init__.py index 9a8736f..0110686 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -2,7 +2,6 @@ from core.events import EventBus, get_event_bus from core.transports import BaseTransport, SocketTransport, WebRtcTransport -from core.pipeline import AudioPipeline from core.session import Session from core.conversation import ConversationManager, ConversationState, ConversationTurn from core.duplex_pipeline import DuplexPipeline @@ -13,7 +12,6 @@ __all__ = [ "BaseTransport", "SocketTransport", "WebRtcTransport", - "AudioPipeline", "Session", "ConversationManager", "ConversationState", diff --git a/core/pipeline.py b/core/pipeline.py deleted file mode 100644 index 2760e4e..0000000 --- a/core/pipeline.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Audio processing pipeline.""" - -import asyncio -from typing import Optional -from loguru import logger - -from core.transports import BaseTransport -from core.events import EventBus, get_event_bus -from processors.vad import VADProcessor, SileroVAD -from app.config import settings - - -class AudioPipeline: - """ - Audio processing pipeline. - - Processes incoming audio through VAD and emits events. - """ - - def __init__(self, transport: BaseTransport, session_id: str): - """ - Initialize audio pipeline. - - Args: - transport: Transport instance for sending events/audio - session_id: Session identifier for event tracking - """ - self.transport = transport - self.session_id = session_id - self.event_bus = get_event_bus() - - # Initialize VAD - self.vad_model = SileroVAD( - model_path=settings.vad_model_path, - sample_rate=settings.sample_rate - ) - self.vad_processor = VADProcessor( - vad_model=self.vad_model, - threshold=settings.vad_threshold, - silence_threshold_ms=settings.vad_eou_threshold_ms, - min_speech_duration_ms=settings.vad_min_speech_duration_ms - ) - - # State - self.is_bot_speaking = False - self.interrupt_signal = asyncio.Event() - self._running = True - - logger.info(f"Audio pipeline initialized for session {session_id}") - - async def process_input(self, pcm_bytes: bytes) -> None: - """ - Process incoming audio chunk. - - Args: - pcm_bytes: PCM audio data (16-bit, mono, 16kHz) - """ - if not self._running: - return - - try: - # Process through VAD - result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms) - - if result: - event_type, probability = result - - # Emit event through event bus - await self.event_bus.publish(event_type, { - "trackId": self.session_id, - "probability": probability - }) - - # Send event to client - if event_type == "speaking": - logger.info(f"User speaking started (session {self.session_id})") - await self.transport.send_event({ - "event": "speaking", - "trackId": self.session_id, - "timestamp": self._get_timestamp_ms(), - "startTime": self._get_timestamp_ms() - }) - - elif event_type == "silence": - logger.info(f"User speaking stopped (session {self.session_id})") - await self.transport.send_event({ - "event": "silence", - "trackId": self.session_id, - "timestamp": self._get_timestamp_ms(), - "startTime": self._get_timestamp_ms(), - "duration": 0 # TODO: Calculate actual duration - }) - - elif event_type == "eou": - logger.info(f"EOU detected (session {self.session_id})") - await self.transport.send_event({ - "event": "eou", - "trackId": self.session_id, - "timestamp": self._get_timestamp_ms() - }) - - except Exception as e: - logger.error(f"Pipeline processing error: {e}", exc_info=True) - - async def process_text_input(self, text: str) -> None: - """ - Process text input (chat command). - - Args: - text: Text input - """ - logger.info(f"Processing text input: {text[:50]}...") - # TODO: Implement text processing (LLM integration, etc.) - # For now, just log it - - async def interrupt(self) -> None: - """Interrupt current audio playback.""" - if self.is_bot_speaking: - self.interrupt_signal.set() - logger.info(f"Pipeline interrupted for session {self.session_id}") - - async def cleanup(self) -> None: - """Cleanup pipeline resources.""" - logger.info(f"Cleaning up pipeline for session {self.session_id}") - self._running = False - self.interrupt_signal.set() - - def _get_timestamp_ms(self) -> int: - """Get current timestamp in milliseconds.""" - import time - return int(time.time() * 1000) diff --git a/core/session.py b/core/session.py index 8f5fc9b..54bf0d4 100644 --- a/core/session.py +++ b/core/session.py @@ -6,7 +6,7 @@ from typing import Optional, Dict, Any from loguru import logger from core.transports import BaseTransport -from core.pipeline import AudioPipeline +from core.duplex_pipeline import DuplexPipeline from models.commands import parse_command, TTSCommand, ChatCommand, InterruptCommand, HangupCommand from app.config import settings @@ -16,7 +16,7 @@ class Session: Manages a single call session. Handles command routing, audio processing, and session lifecycle. - Supports both basic audio pipeline and full duplex voice conversation. + Uses full duplex voice conversation pipeline. """ def __init__(self, session_id: str, transport: BaseTransport, use_duplex: bool = None): @@ -30,20 +30,14 @@ class Session: """ self.id = session_id self.transport = transport - - # Determine pipeline mode self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled - - if self.use_duplex: - from core.duplex_pipeline import DuplexPipeline - self.pipeline = DuplexPipeline( - transport=transport, - session_id=session_id, - system_prompt=settings.duplex_system_prompt, - greeting=settings.duplex_greeting - ) - else: - self.pipeline = AudioPipeline(transport, session_id) + + self.pipeline = DuplexPipeline( + transport=transport, + session_id=session_id, + system_prompt=settings.duplex_system_prompt, + greeting=settings.duplex_greeting + ) # Session state self.created_at = None @@ -129,10 +123,7 @@ class Session: audio_bytes: PCM audio data """ try: - if self.use_duplex: - await self.pipeline.process_audio(audio_bytes) - else: - await self.pipeline.process_input(audio_bytes) + await self.pipeline.process_audio(audio_bytes) except Exception as e: logger.error(f"Session {self.id} handle_audio error: {e}", exc_info=True) @@ -148,8 +139,8 @@ class Session: "timestamp": self._get_timestamp_ms() }) - # Start duplex pipeline if enabled - if self.use_duplex and not self._pipeline_started: + # Start duplex pipeline + if not self._pipeline_started: try: await self.pipeline.start() self._pipeline_started = True @@ -228,10 +219,7 @@ class Session: logger.info(f"Session {self.id} graceful interrupt") else: logger.info(f"Session {self.id} immediate interrupt") - if self.use_duplex: - await self.pipeline.interrupt() - else: - await self.pipeline.interrupt() + await self.pipeline.interrupt() async def _handle_pause(self) -> None: """Handle pause command.""" @@ -267,11 +255,7 @@ class Session: async def _handle_chat(self, command: ChatCommand) -> None: """Handle chat command.""" logger.info(f"Session {self.id} chat: {command.text[:50]}...") - # Process text input through pipeline - if self.use_duplex: - await self.pipeline.process_text(command.text) - else: - await self.pipeline.process_text_input(command.text) + await self.pipeline.process_text(command.text) async def _send_error(self, sender: str, error_message: str) -> None: """