From d6d0ade33e70b4af10c8546f5aa7021b50779714 Mon Sep 17 00:00:00 2001
From: Xin Wang
Date: Thu, 29 Jan 2026 16:36:46 +0800
Subject: [PATCH] Build a basic duplex agent with SiliconFlow ASR

---
 app/config.py               |   6 +
 core/duplex_pipeline.py     |  96 +++++++++--
 examples/mic_client.py      |   9 +
 examples/simple_client.py   |  14 +-
 services/__init__.py        |   5 +
 services/siliconflow_asr.py | 317 ++++++++++++++++++++++++++++++++++++
 6 files changed, 432 insertions(+), 15 deletions(-)
 create mode 100644 services/siliconflow_asr.py

diff --git a/app/config.py b/app/config.py
index fbbb21f..c728c45 100644
--- a/app/config.py
+++ b/app/config.py
@@ -48,6 +48,12 @@ class Settings(BaseSettings):
     siliconflow_api_key: Optional[str] = Field(default=None, description="SiliconFlow API key")
     siliconflow_tts_model: str = Field(default="FunAudioLLM/CosyVoice2-0.5B", description="SiliconFlow TTS model")
 
+    # ASR Configuration
+    asr_provider: str = Field(default="siliconflow", description="ASR provider (siliconflow, buffered)")
+    siliconflow_asr_model: str = Field(default="FunAudioLLM/SenseVoiceSmall", description="SiliconFlow ASR model")
+    asr_interim_interval_ms: int = Field(default=500, description="Interval between interim ASR results in ms")
+    asr_min_audio_ms: int = Field(default=300, description="Minimum audio duration before the first ASR result")
+
     # Duplex Pipeline Configuration
     duplex_enabled: bool = Field(default=True, description="Enable duplex voice pipeline")
     duplex_greeting: Optional[str] = Field(default=None, description="Optional greeting message")
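[Illustrative, not part of the patch: with pydantic BaseSettings, the fields
added above can be supplied as environment variables (or .env entries) named
after the fields; the API key below is a placeholder:

    ASR_PROVIDER=siliconflow
    SILICONFLOW_API_KEY=<your-api-key>
    SILICONFLOW_ASR_MODEL=FunAudioLLM/SenseVoiceSmall
    ASR_INTERIM_INTERVAL_MS=500
    ASR_MIN_AUDIO_MS=300
]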
diff --git a/core/duplex_pipeline.py b/core/duplex_pipeline.py
index 25aef1a..db2ed81 100644
--- a/core/duplex_pipeline.py
+++ b/core/duplex_pipeline.py
@@ -25,6 +25,7 @@
 from services.llm import OpenAILLMService, MockLLMService
 from services.tts import EdgeTTSService, MockTTSService
 from services.asr import BufferedASRService
 from services.siliconflow_tts import SiliconFlowTTSService
+from services.siliconflow_asr import SiliconFlowASRService
 from app.config import settings
 
@@ -90,7 +91,10 @@ class DuplexPipeline:
         # Initialize services
         self.llm_service = llm_service
         self.tts_service = tts_service
-        self.asr_service = asr_service or BufferedASRService()
+        self.asr_service = asr_service  # Will be initialized in start()
+
+        # Track the last sent transcript to avoid duplicates
+        self._last_sent_transcript = ""
 
         # Conversation manager
         self.conversation = ConversationManager(
@@ -148,6 +152,23 @@
         await self.tts_service.connect()
 
         # Connect ASR service
+        if not self.asr_service:
+            if settings.asr_provider == "siliconflow" and settings.siliconflow_api_key:
+                self.asr_service = SiliconFlowASRService(
+                    api_key=settings.siliconflow_api_key,
+                    model=settings.siliconflow_asr_model,
+                    sample_rate=settings.sample_rate,
+                    interim_interval_ms=settings.asr_interim_interval_ms,
+                    min_audio_for_interim_ms=settings.asr_min_audio_ms,
+                    on_transcript=self._on_transcript_callback
+                )
+                logger.info("Using SiliconFlow ASR service")
+            else:
+                self.asr_service = BufferedASRService(
+                    sample_rate=settings.sample_rate
+                )
+                logger.info("Using Buffered ASR service (no real transcription)")
+
         await self.asr_service.connect()
 
         logger.info("DuplexPipeline services connected")
@@ -204,8 +225,11 @@
         if vad_status == "Speech" or self.conversation.state == ConversationState.LISTENING:
             self._audio_buffer += pcm_bytes
             await self.asr_service.send_audio(pcm_bytes)
+
+            # For SiliconFlow ASR, interim transcription runs in the background;
+            # the service handles its own timing via start_interim_transcription().
 
-        # 4. Check for End of Utterance
+        # 4. Check for End of Utterance - this triggers the LLM response
         if self.eou_detector.process(vad_status):
             await self._on_end_of_utterance()
 
@@ -237,12 +261,47 @@
         """Interrupt current bot speech (manual interrupt command)."""
         await self._handle_barge_in()
 
+    async def _on_transcript_callback(self, text: str, is_final: bool) -> None:
+        """
+        Callback for ASR transcription results.
+
+        Streams the transcription to the client for display.
+
+        Args:
+            text: Transcribed text
+            is_final: Whether this is the final transcription
+        """
+        # Avoid sending duplicate interim transcripts
+        if text == self._last_sent_transcript and not is_final:
+            return
+
+        self._last_sent_transcript = text
+
+        # Send a transcript event to the client
+        await self.transport.send_event({
+            "event": "transcript",
+            "trackId": self.session_id,
+            "text": text,
+            "isFinal": is_final,
+            "timestamp": self._get_timestamp_ms()
+        })
+
+        logger.debug(f"Sent transcript ({'final' if is_final else 'interim'}): {text[:50]}...")
+
     async def _on_speech_start(self) -> None:
         """Handle user starting to speak."""
         if self.conversation.state == ConversationState.IDLE:
             await self.conversation.start_user_turn()
         self._audio_buffer = b""
+        self._last_sent_transcript = ""
         self.eou_detector.reset()
+
+        # Clear the ASR buffer and start interim transcriptions
+        if hasattr(self.asr_service, 'clear_buffer'):
+            self.asr_service.clear_buffer()
+        if hasattr(self.asr_service, 'start_interim_transcription'):
+            await self.asr_service.start_interim_transcription()
+
         logger.debug("User speech started")
 
@@ -250,25 +309,36 @@
         if self.conversation.state != ConversationState.LISTENING:
             return
 
-        # Get transcribed text (if using ASR that provides it)
+        # Stop interim transcriptions
+        if hasattr(self.asr_service, 'stop_interim_transcription'):
+            await self.asr_service.stop_interim_transcription()
+
+        # Get the final transcription from the ASR service
         user_text = ""
-        if hasattr(self.asr_service, 'get_and_clear_text'):
+
+        if hasattr(self.asr_service, 'get_final_transcription'):
+            # SiliconFlow ASR - get the final transcription
+            user_text = await self.asr_service.get_final_transcription()
+        elif hasattr(self.asr_service, 'get_and_clear_text'):
+            # Buffered ASR - get the accumulated text
             user_text = self.asr_service.get_and_clear_text()
 
-        # If no ASR text, we could use the audio buffer for external ASR
-        # For now, just use placeholder if no ASR text
-        if not user_text:
-            # In a real implementation, you'd send audio_buffer to ASR here
-            # For demo purposes, use mock text
-            user_text = "[User speech detected]"
-            logger.warning("No ASR text available - using placeholder")
+        # Skip the turn if there is no meaningful text
+        if not user_text or not user_text.strip():
+            logger.debug("EOU detected but no transcription - skipping")
+            # Reset for the next utterance
+            self._audio_buffer = b""
+            self._last_sent_transcript = ""
+            await self.conversation.start_user_turn()
+            return
 
-        logger.info(f"EOU detected - user said: {user_text[:50]}...")
+        logger.info(f"EOU detected - user said: {user_text[:100]}...")
 
         # Clear buffers
         self._audio_buffer = b""
+        self._last_sent_transcript = ""
 
-        # Process the turn
+        # Process the turn - trigger the LLM response
         await self.conversation.end_user_turn(user_text)
         self._current_turn_task = asyncio.create_task(self._handle_turn(user_text))
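[Illustrative, not part of the patch: the JSON transcript event as emitted by
_on_transcript_callback above and received by the clients below. Clients key
off "text" and "isFinal"; the "trackId" and "timestamp" values here are made
up:

    {
        "event": "transcript",
        "trackId": "a1b2c3d4",
        "text": "turn on the kitchen lights",
        "isFinal": false,
        "timestamp": 1769678206123
    }
]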
diff --git a/examples/mic_client.py b/examples/mic_client.py
index cd6b80d..e3941ff 100644
--- a/examples/mic_client.py
+++ b/examples/mic_client.py
@@ -269,6 +269,15 @@ class MicrophoneClient:
             print("← User speech detected")
         elif event_type == "silence":
             print("← User silence detected")
+        elif event_type == "transcript":
+            # Display the user speech transcription
+            text = event.get("text", "")
+            is_final = event.get("isFinal", False)
+            if is_final:
+                print(f"← You said: {text}")
+            else:
+                # Interim result - overwrite the line in place
+                print(f"← [listening] {text}", end="\r")
         elif event_type == "trackStart":
             print("← Bot started speaking")
             # Clear any old audio in buffer
diff --git a/examples/simple_client.py b/examples/simple_client.py
index b587bba..fec5700 100644
--- a/examples/simple_client.py
+++ b/examples/simple_client.py
@@ -129,11 +129,21 @@ class SimpleVoiceClient:
                 # JSON event
                 event = json.loads(msg)
                 etype = event.get("event", "?")
-                print(f"<- {etype}")
-                if etype == "hangup":
+                if etype == "transcript":
+                    # User speech transcription
+                    text = event.get("text", "")
+                    is_final = event.get("isFinal", False)
+                    if is_final:
+                        print(f"<- You said: {text}")
+                    else:
+                        print(f"<- [listening] {text}", end="\r")
+                elif etype == "hangup":
+                    print(f"<- {etype}")
                     self.running = False
                     break
+                else:
+                    print(f"<- {etype}")
             except asyncio.TimeoutError:
                 continue
diff --git a/services/__init__.py b/services/__init__.py
index 54664f4..5a53e70 100644
--- a/services/__init__.py
+++ b/services/__init__.py
@@ -15,6 +15,8 @@ from services.base import (
 from services.llm import OpenAILLMService, MockLLMService
 from services.tts import EdgeTTSService, MockTTSService
 from services.asr import BufferedASRService, MockASRService
+from services.siliconflow_asr import SiliconFlowASRService
+from services.siliconflow_tts import SiliconFlowTTSService
 from services.realtime import RealtimeService, RealtimeConfig, RealtimePipeline
 
@@ -35,6 +37,9 @@ __all__ = [
     # ASR
     "BufferedASRService",
     "MockASRService",
+    "SiliconFlowASRService",
+    # TTS (SiliconFlow)
+    "SiliconFlowTTSService",
     # Realtime
     "RealtimeService",
     "RealtimeConfig",
diff --git a/services/siliconflow_asr.py b/services/siliconflow_asr.py
new file mode 100644
index 0000000..6d67ad7
--- /dev/null
+++ b/services/siliconflow_asr.py
@@ -0,0 +1,317 @@
+"""SiliconFlow ASR (Automatic Speech Recognition) Service.
+
+Uses the SiliconFlow API for speech-to-text transcription.
+API: https://docs.siliconflow.cn/cn/api-reference/audio/create-audio-transcriptions
+"""
+
+import asyncio
+import io
+import time
+import wave
+from typing import AsyncIterator, Optional, Callable, Awaitable
+
+from loguru import logger
+
+try:
+    import aiohttp
+    AIOHTTP_AVAILABLE = True
+except ImportError:
+    AIOHTTP_AVAILABLE = False
+    logger.warning("aiohttp not available - SiliconFlowASRService will not work")
+
+from services.base import BaseASRService, ASRResult, ServiceState
+
+
+class SiliconFlowASRService(BaseASRService):
+    """
+    SiliconFlow ASR service for speech-to-text transcription.
+
+    Features:
+    - Buffers incoming audio chunks
+    - Provides interim transcriptions periodically (for streaming to the client)
+    - Produces a final transcription on end of utterance (EOU)
+
+    API details:
+    - Endpoint: POST https://api.siliconflow.cn/v1/audio/transcriptions
+    - Models: FunAudioLLM/SenseVoiceSmall (default), TeleAI/TeleSpeechASR
+    - Input: audio file (multipart/form-data)
+    - Output: {"text": "transcribed text"}
+    """
+
+    # Supported model aliases
+    MODELS = {
+        "sensevoice": "FunAudioLLM/SenseVoiceSmall",
+        "telespeech": "TeleAI/TeleSpeechASR",
+    }
+
+    API_URL = "https://api.siliconflow.cn/v1/audio/transcriptions"
+
+    def __init__(
+        self,
+        api_key: str,
+        model: str = "FunAudioLLM/SenseVoiceSmall",
+        sample_rate: int = 16000,
+        language: str = "auto",
+        interim_interval_ms: int = 500,  # How often to send interim results
+        min_audio_for_interim_ms: int = 300,  # Min audio before first interim
+        on_transcript: Optional[Callable[[str, bool], Awaitable[None]]] = None
+    ):
+        """
+        Initialize the SiliconFlow ASR service.
+
+        Args:
+            api_key: SiliconFlow API key
+            model: ASR model name or alias
+            sample_rate: Audio sample rate (16000 recommended)
+            language: Language code ("auto" for automatic detection; currently
+                stored but not sent to the API)
+            interim_interval_ms: How often to generate interim transcriptions
+            min_audio_for_interim_ms: Minimum audio duration before the first interim
+            on_transcript: Callback for transcription results (text, is_final)
+        """
+        super().__init__(sample_rate=sample_rate, language=language)
+
+        if not AIOHTTP_AVAILABLE:
+            raise RuntimeError("aiohttp is required for SiliconFlowASRService")
+
+        self.api_key = api_key
+        self.model = self.MODELS.get(model.lower(), model)
+        self.interim_interval_ms = interim_interval_ms
+        self.min_audio_for_interim_ms = min_audio_for_interim_ms
+        self.on_transcript = on_transcript
+
+        # HTTP session
+        self._session: Optional[aiohttp.ClientSession] = None
+
+        # Audio buffer
+        self._audio_buffer: bytes = b""
+        self._current_text: str = ""
+        self._last_interim_time: float = 0.0
+
+        # Transcript queue for async iteration
+        self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue()
+
+        # Background task for interim results
+        self._interim_task: Optional[asyncio.Task] = None
+        self._running = False
+
+        logger.info(f"SiliconFlowASRService initialized with model: {self.model}")
+
+    async def connect(self) -> None:
+        """Create the HTTP session and mark the service connected."""
+        self._session = aiohttp.ClientSession(
+            headers={
+                "Authorization": f"Bearer {self.api_key}"
+            }
+        )
+        self._running = True
+        self.state = ServiceState.CONNECTED
+        logger.info("SiliconFlowASRService connected")
+
+    async def disconnect(self) -> None:
+        """Disconnect and clean up."""
+        self._running = False
+
+        if self._interim_task:
+            self._interim_task.cancel()
+            try:
+                await self._interim_task
+            except asyncio.CancelledError:
+                pass
+            self._interim_task = None
+
+        if self._session:
+            await self._session.close()
+            self._session = None
+
+        self._audio_buffer = b""
+        self._current_text = ""
+        self.state = ServiceState.DISCONNECTED
+        logger.info("SiliconFlowASRService disconnected")
+
+    async def send_audio(self, audio: bytes) -> None:
+        """
+        Buffer incoming audio data.
+
+        Args:
+            audio: PCM audio data (16-bit, mono)
+        """
+        self._audio_buffer += audio
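+
+    # Note: each interim pass below re-uploads the entire buffered utterance
+    # as an in-memory WAV; with a ~500 ms interval this is a simple but
+    # bandwidth-heavy way to approximate streaming ASR on top of a batch
+    # transcription endpoint.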
+
+    async def transcribe_buffer(self, is_final: bool = False) -> Optional[str]:
+        """
+        Transcribe the current audio buffer.
+
+        Args:
+            is_final: Whether this is the final transcription
+
+        Returns:
+            Transcribed text, or None if there is not enough audio
+        """
+        if not self._session:
+            logger.warning("ASR session not connected")
+            return None
+
+        # Check minimum audio duration (16-bit mono: 2 bytes per sample)
+        audio_duration_ms = len(self._audio_buffer) / (self.sample_rate * 2) * 1000
+
+        if not is_final and audio_duration_ms < self.min_audio_for_interim_ms:
+            return None
+
+        if audio_duration_ms < 100:  # Less than 100 ms - too short
+            return None
+
+        try:
+            # Convert raw PCM to WAV in memory
+            wav_buffer = io.BytesIO()
+            with wave.open(wav_buffer, 'wb') as wav_file:
+                wav_file.setnchannels(1)
+                wav_file.setsampwidth(2)  # 16-bit
+                wav_file.setframerate(self.sample_rate)
+                wav_file.writeframes(self._audio_buffer)
+
+            wav_buffer.seek(0)
+            wav_data = wav_buffer.read()
+
+            # Send to the API
+            form_data = aiohttp.FormData()
+            form_data.add_field(
+                'file',
+                wav_data,
+                filename='audio.wav',
+                content_type='audio/wav'
+            )
+            form_data.add_field('model', self.model)
+
+            async with self._session.post(self.API_URL, data=form_data) as response:
+                if response.status == 200:
+                    result = await response.json()
+                    text = result.get("text", "").strip()
+
+                    if text:
+                        self._current_text = text
+
+                        # Notify via callback
+                        if self.on_transcript:
+                            await self.on_transcript(text, is_final)
+
+                        # Queue the result for receive_transcripts()
+                        await self._transcript_queue.put(
+                            ASRResult(text=text, is_final=is_final)
+                        )
+
+                        logger.debug(f"ASR {'final' if is_final else 'interim'}: {text[:50]}...")
+                    return text
+                else:
+                    error_text = await response.text()
+                    logger.error(f"ASR API error {response.status}: {error_text}")
+                    return None
+
+        except Exception as e:
+            logger.error(f"ASR transcription error: {e}")
+            return None
+
+    async def get_final_transcription(self) -> str:
+        """
+        Get the final transcription and clear the buffer.
+
+        Call this when end of utterance (EOU) is detected.
+
+        Returns:
+            Final transcribed text
+        """
+        # Transcribe the full buffer as final
+        text = await self.transcribe_buffer(is_final=True)
+
+        # Fall back to the last interim text if the final request failed
+        result = text or self._current_text
+        self._audio_buffer = b""
+        self._current_text = ""
+
+        return result
+
+    def get_and_clear_text(self) -> str:
+        """
+        Get the accumulated text and clear the buffer.
+
+        Compatible with the BufferedASRService interface.
+        """
+        text = self._current_text
+        self._current_text = ""
+        self._audio_buffer = b""
+        return text
+
+    def get_audio_buffer(self) -> bytes:
+        """Get the current audio buffer."""
+        return self._audio_buffer
+
+    def get_audio_duration_ms(self) -> float:
+        """Get the current audio buffer duration in milliseconds."""
+        return len(self._audio_buffer) / (self.sample_rate * 2) * 1000
+
+    def clear_buffer(self) -> None:
+        """Clear the audio and text buffers."""
+        self._audio_buffer = b""
+        self._current_text = ""
+
+    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
+        """
+        Async iterator over transcription results.
+
+        Yields:
+            ASRResult with text and is_final flag
+        """
+        while self._running:
+            try:
+                result = await asyncio.wait_for(
+                    self._transcript_queue.get(),
+                    timeout=0.1
+                )
+                yield result
+            except asyncio.TimeoutError:
+                continue
+            except asyncio.CancelledError:
+                break
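+
+    # Illustrative usage of the iterator (an alternative to the on_transcript
+    # callback for driving client updates):
+    #
+    #     async for result in asr.receive_transcripts():
+    #         print(result.text, result.is_final)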
+
+    async def start_interim_transcription(self) -> None:
+        """
+        Start the background task for interim transcriptions.
+
+        This periodically transcribes buffered audio for
+        real-time feedback to the user.
+        """
+        if self._interim_task and not self._interim_task.done():
+            return
+
+        self._interim_task = asyncio.create_task(self._interim_loop())
+
+    async def stop_interim_transcription(self) -> None:
+        """Stop the interim transcription task."""
+        if self._interim_task:
+            self._interim_task.cancel()
+            try:
+                await self._interim_task
+            except asyncio.CancelledError:
+                pass
+            self._interim_task = None
+
+    async def _interim_loop(self) -> None:
+        """Background loop for interim transcriptions."""
+        while self._running:
+            try:
+                await asyncio.sleep(self.interim_interval_ms / 1000)
+
+                # Throttle: only transcribe if enough time has passed since
+                # the last interim request
+                current_time = time.time()
+                time_since_last = (current_time - self._last_interim_time) * 1000
+
+                if time_since_last >= self.interim_interval_ms:
+                    audio_duration = self.get_audio_duration_ms()
+
+                    if audio_duration >= self.min_audio_for_interim_ms:
+                        await self.transcribe_buffer(is_final=False)
+                        self._last_interim_time = current_time
+
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                logger.error(f"Interim transcription error: {e}")
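
[Not part of the patch: a minimal smoke-test sketch for the new service. It
assumes the repository root is on PYTHONPATH, SILICONFLOW_API_KEY is set in
the environment, and sample.wav is a 16 kHz mono 16-bit recording;
print_transcript is an illustrative callback, not something this patch adds:

    import asyncio
    import os
    import wave

    from services.siliconflow_asr import SiliconFlowASRService


    async def print_transcript(text: str, is_final: bool) -> None:
        # The service invokes this for interim and final results alike.
        print(("FINAL: " if is_final else "interim: ") + text)


    async def main() -> None:
        asr = SiliconFlowASRService(
            api_key=os.environ["SILICONFLOW_API_KEY"],
            sample_rate=16000,
            on_transcript=print_transcript,
        )
        await asr.connect()
        try:
            with wave.open("sample.wav", "rb") as wav:
                pcm = wav.readframes(wav.getnframes())  # expects 16-bit mono PCM
            # Feed ~100 ms chunks, as the duplex pipeline would.
            chunk = 16000 * 2 // 10
            for i in range(0, len(pcm), chunk):
                await asr.send_audio(pcm[i:i + chunk])
            # One final transcription of the whole buffered utterance.
            print("Result:", await asr.get_final_transcription())
        finally:
            await asr.disconnect()


    if __name__ == "__main__":
        asyncio.run(main())

Interim results would additionally require start_interim_transcription() to
be running while the audio is being fed.]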