"""SiliconFlow ASR (Automatic Speech Recognition) Service.

Uses the SiliconFlow API for speech-to-text transcription.
API: https://docs.siliconflow.cn/cn/api-reference/audio/create-audio-transcriptions
"""

import asyncio
import io
import time
import wave
from typing import AsyncIterator, Optional, Callable, Awaitable

from loguru import logger

try:
    import aiohttp
    AIOHTTP_AVAILABLE = True
except ImportError:
    AIOHTTP_AVAILABLE = False
    logger.warning("aiohttp not available - SiliconFlowASRService will not work")

from services.base import BaseASRService, ASRResult, ServiceState


class SiliconFlowASRService(BaseASRService):
    """
    SiliconFlow ASR service for speech-to-text transcription.

    Features:
    - Buffers incoming audio chunks
    - Provides interim transcriptions periodically (for streaming to client)
    - Final transcription on EOU

    API Details:
    - Endpoint: POST https://api.siliconflow.cn/v1/audio/transcriptions
    - Models: FunAudioLLM/SenseVoiceSmall (default), TeleAI/TeleSpeechASR
    - Input: Audio file (multipart/form-data)
    - Output: {"text": "transcribed text"}
    """

    # Supported models (alias -> full model name)
    MODELS = {
        "sensevoice": "FunAudioLLM/SenseVoiceSmall",
        "telespeech": "TeleAI/TeleSpeechASR",
    }

    API_URL = "https://api.siliconflow.cn/v1/audio/transcriptions"

    # Incremented every time the buffers are cleared. A transcription request
    # records the value before awaiting the API so that a late interim result
    # for an already-cleared buffer can be recognised and dropped. Declared at
    # class level so it is defined even on instances created without __init__.
    _buffer_epoch: int = 0

    def __init__(
        self,
        api_key: str,
        model: str = "FunAudioLLM/SenseVoiceSmall",
        sample_rate: int = 16000,
        language: str = "auto",
        interim_interval_ms: int = 500,  # How often to send interim results
        min_audio_for_interim_ms: int = 300,  # Min audio before first interim
        on_transcript: Optional[Callable[[str, bool], Awaitable[None]]] = None
    ):
        """
        Initialize SiliconFlow ASR service.

        Args:
            api_key: SiliconFlow API key
            model: ASR model name or alias (see ``MODELS``); unknown values
                are passed through to the API unchanged
            sample_rate: Audio sample rate (16000 recommended)
            language: Language code (auto for automatic detection)
            interim_interval_ms: How often to generate interim transcriptions
            min_audio_for_interim_ms: Minimum audio duration before first interim
            on_transcript: Callback for transcription results (text, is_final)

        Raises:
            RuntimeError: If aiohttp could not be imported.
        """
        super().__init__(sample_rate=sample_rate, language=language)

        if not AIOHTTP_AVAILABLE:
            raise RuntimeError("aiohttp is required for SiliconFlowASRService")

        self.api_key = api_key
        self.model = self.MODELS.get(model.lower(), model)
        self.interim_interval_ms = interim_interval_ms
        self.min_audio_for_interim_ms = min_audio_for_interim_ms
        self.on_transcript = on_transcript

        # Session
        self._session: Optional[aiohttp.ClientSession] = None

        # Audio buffer
        self._audio_buffer: bytes = b""
        self._current_text: str = ""
        self._last_interim_time: float = 0
        self._buffer_epoch = 0

        # Transcript queue for async iteration
        self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue()

        # Background task for interim results
        self._interim_task: Optional[asyncio.Task] = None
        self._running = False

        logger.info(f"SiliconFlowASRService initialized with model: {self.model}")

    async def connect(self) -> None:
        """Open the HTTP session and mark the service as connected.

        Calling this while a session is already open closes the old session
        first so its connections are not leaked.
        """
        if self._session is not None and not self._session.closed:
            await self._session.close()

        self._session = aiohttp.ClientSession(
            headers={
                "Authorization": f"Bearer {self.api_key}"
            }
        )
        self._running = True
        self.state = ServiceState.CONNECTED
        logger.info("SiliconFlowASRService connected")

    async def disconnect(self) -> None:
        """Stop the interim task, close the HTTP session and clear buffers."""
        self._running = False

        await self.stop_interim_transcription()

        if self._session:
            await self._session.close()
            self._session = None

        self._reset_buffers()
        self.state = ServiceState.DISCONNECTED
        logger.info("SiliconFlowASRService disconnected")

    async def send_audio(self, audio: bytes) -> None:
        """
        Buffer incoming audio data.

        Args:
            audio: PCM audio data (16-bit, mono)
        """
        self._audio_buffer += audio

    def _reset_buffers(self) -> None:
        """Drop buffered audio/text and invalidate in-flight interim requests."""
        self._audio_buffer = b""
        self._current_text = ""
        self._buffer_epoch += 1

    def _pcm_to_wav(self, pcm: bytes) -> bytes:
        """Wrap raw 16-bit mono PCM in an in-memory WAV container."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(self.sample_rate)
            wav_file.writeframes(pcm)
        return wav_buffer.getvalue()

    async def transcribe_buffer(self, is_final: bool = False) -> Optional[str]:
        """
        Transcribe current audio buffer.

        On a non-empty result the text is stored as the current text, passed
        to ``on_transcript`` (if set) and put on the transcript queue.

        Args:
            is_final: Whether this is the final transcription

        Returns:
            Transcribed text, or None if there is no session, not enough
            audio, the API call failed, the result was empty, or an interim
            result arrived after the buffer had already been cleared.
        """
        if not self._session:
            logger.warning("ASR session not connected")
            return None

        # Snapshot the buffer state: other coroutines may append to or clear
        # the buffer while we are awaiting the HTTP response below.
        audio = self._audio_buffer
        epoch = self._buffer_epoch

        # Check minimum audio duration (2 bytes per 16-bit mono sample)
        audio_duration_ms = len(audio) / (self.sample_rate * 2) * 1000

        if not is_final and audio_duration_ms < self.min_audio_for_interim_ms:
            return None

        if audio_duration_ms < 100:  # Less than 100ms - too short
            return None

        try:
            # Send to API as a multipart upload
            form_data = aiohttp.FormData()
            form_data.add_field(
                'file',
                self._pcm_to_wav(audio),
                filename='audio.wav',
                content_type='audio/wav'
            )
            form_data.add_field('model', self.model)

            async with self._session.post(self.API_URL, data=form_data) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"ASR API error {response.status}: {error_text}")
                    return None
                result = await response.json()

            # The buffer was cleared while this interim request was in
            # flight: the text belongs to audio that is already gone, so
            # publishing it would resurrect stale text after a final result.
            if not is_final and epoch != self._buffer_epoch:
                return None

            # `or ""` guards against an explicit null in the response body.
            text = (result.get("text") or "").strip()
            if not text:
                return None

            self._current_text = text

            # Notify via callback
            if self.on_transcript:
                await self.on_transcript(text, is_final)

            # Queue result
            await self._transcript_queue.put(
                ASRResult(text=text, is_final=is_final)
            )

            logger.debug(f"ASR {'final' if is_final else 'interim'}: {text[:50]}...")
            return text

        except Exception as e:
            # Best effort: a failed request must not break the audio pipeline.
            logger.error(f"ASR transcription error: {e}")
            return None

    async def get_final_transcription(self) -> str:
        """
        Get final transcription and clear buffer.

        Call this when EOU is detected.

        Returns:
            Final transcribed text; falls back to the last stored text when
            the final request yields nothing.
        """
        # Transcribe full buffer as final
        text = await self.transcribe_buffer(is_final=True)

        # Clear buffer
        result = text or self._current_text
        self._reset_buffers()

        return result

    def get_and_clear_text(self) -> str:
        """
        Get accumulated text and clear buffer.

        Compatible with BufferedASRService interface.
        """
        text = self._current_text
        self._reset_buffers()
        return text

    def get_audio_buffer(self) -> bytes:
        """Get current audio buffer."""
        return self._audio_buffer

    def get_audio_duration_ms(self) -> float:
        """Get current audio buffer duration in milliseconds."""
        return len(self._audio_buffer) / (self.sample_rate * 2) * 1000

    def clear_buffer(self) -> None:
        """Clear audio and text buffers."""
        self._reset_buffers()

    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
        """
        Async iterator for transcription results.

        Polls the transcript queue with a short timeout so the loop notices
        when the service stops running.

        Yields:
            ASRResult with text and is_final flag
        """
        while self._running:
            try:
                result = await asyncio.wait_for(
                    self._transcript_queue.get(),
                    timeout=0.1
                )
                yield result
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break

    async def start_interim_transcription(self) -> None:
        """
        Start background task for interim transcriptions.

        This periodically transcribes buffered audio for
        real-time feedback to the user. Does nothing if the task is
        already running.
        """
        if self._interim_task and not self._interim_task.done():
            return

        self._interim_task = asyncio.create_task(self._interim_loop())

    async def stop_interim_transcription(self) -> None:
        """Stop interim transcription task."""
        if self._interim_task:
            self._interim_task.cancel()
            try:
                await self._interim_task
            except asyncio.CancelledError:
                pass
            self._interim_task = None

    async def _interim_loop(self) -> None:
        """Background loop for interim transcriptions."""
        while self._running:
            try:
                await asyncio.sleep(self.interim_interval_ms / 1000)

                # Monotonic clock: wall-clock adjustments must not distort
                # the interval measurement.
                current_time = time.monotonic()
                time_since_last = (current_time - self._last_interim_time) * 1000

                if time_since_last >= self.interim_interval_ms:
                    audio_duration = self.get_audio_duration_ms()

                    if audio_duration >= self.min_audio_for_interim_ms:
                        await self.transcribe_buffer(is_final=False)
                        self._last_interim_time = current_time

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Interim transcription error: {e}")