"""SiliconFlow TTS Service with streaming support.

Uses SiliconFlow's CosyVoice2 or MOSS-TTSD models for low-latency
text-to-speech synthesis with streaming.

API Docs: https://docs.siliconflow.cn/cn/api-reference/audio/create-speech
"""

import asyncio
import os
from typing import AsyncIterator, List, Optional

import aiohttp
from loguru import logger

from services.base import BaseTTSService, TTSChunk, ServiceState
from services.streaming_tts_adapter import StreamingTTSAdapter  # backward-compatible re-export


class SiliconFlowTTSService(BaseTTSService):
    """SiliconFlow TTS service with streaming support.

    Supports CosyVoice2-0.5B and MOSS-TTSD-v0.5 models. Audio is requested as
    raw PCM and re-chunked into fixed-size ``TTSChunk`` objects.
    """

    # Short voice aliases mapped to the fully-qualified voice identifiers
    # that are sent in the request payload.
    VOICES = {
        "alex": "FunAudioLLM/CosyVoice2-0.5B:alex",
        "anna": "FunAudioLLM/CosyVoice2-0.5B:anna",
        "bella": "FunAudioLLM/CosyVoice2-0.5B:bella",
        "benjamin": "FunAudioLLM/CosyVoice2-0.5B:benjamin",
        "charles": "FunAudioLLM/CosyVoice2-0.5B:charles",
        "claire": "FunAudioLLM/CosyVoice2-0.5B:claire",
        "david": "FunAudioLLM/CosyVoice2-0.5B:david",
        "diana": "FunAudioLLM/CosyVoice2-0.5B:diana",
    }

    # NOTE(review): the original chunk-size arithmetic (sample_rate * 2) implies
    # 16-bit mono PCM output; confirm against the API if other formats are used.
    _BYTES_PER_SAMPLE = 2
    # Each emitted chunk covers 1/10 of a second (100 ms) of audio.
    _CHUNKS_PER_SECOND = 10

    def __init__(
        self,
        api_key: Optional[str] = None,
        voice: str = "anna",
        model: str = "FunAudioLLM/CosyVoice2-0.5B",
        sample_rate: int = 16000,
        speed: float = 1.0
    ):
        """Initialize SiliconFlow TTS service.

        Args:
            api_key: SiliconFlow API key (defaults to SILICONFLOW_API_KEY env var)
            voice: Voice name (alex, anna, bella, benjamin, charles, claire,
                david, diana). Any other value is passed through unchanged as
                a full voice identifier.
            model: Model name
            sample_rate: Output sample rate (8000, 16000, 24000, 32000, 44100)
            speed: Speech speed (0.25 to 4.0)
        """
        # Resolve a short alias to its full identifier; unknown names pass through.
        full_voice = self.VOICES.get(voice, voice)

        super().__init__(voice=full_voice, sample_rate=sample_rate, speed=speed)

        self.api_key = api_key or os.getenv("SILICONFLOW_API_KEY")
        self.model = model
        self.api_url = "https://api.siliconflow.cn/v1/audio/speech"

        self._session: Optional[aiohttp.ClientSession] = None
        self._cancel_event = asyncio.Event()

    async def connect(self) -> None:
        """Initialize the HTTP session.

        If a session from an earlier ``connect()`` call is still held, it is
        closed first so that repeated calls do not leak sessions.

        Raises:
            ValueError: If no API key was given and SILICONFLOW_API_KEY is unset.
        """
        if not self.api_key:
            raise ValueError("SiliconFlow API key not provided. Set SILICONFLOW_API_KEY env var.")

        # Release any previous session before replacing it.
        if self._session is not None:
            await self._session.close()
            self._session = None

        self._session = aiohttp.ClientSession(
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
        )
        self.state = ServiceState.CONNECTED
        logger.info(f"SiliconFlow TTS service ready: voice={self.voice}, model={self.model}")

    async def disconnect(self) -> None:
        """Close the HTTP session (if any) and mark the service disconnected."""
        if self._session:
            await self._session.close()
            self._session = None
        self.state = ServiceState.DISCONNECTED
        logger.info("SiliconFlow TTS service disconnected")

    async def synthesize(self, text: str) -> bytes:
        """Synthesize complete audio for text.

        Args:
            text: Text to synthesize

        Returns:
            The concatenated audio of every chunk produced by
            ``synthesize_stream`` (empty bytes if nothing was produced).
        """
        # Collect parts and join once instead of repeatedly re-allocating bytes.
        parts: List[bytes] = []
        async for chunk in self.synthesize_stream(text):
            parts.append(chunk.audio)
        return b"".join(parts)

    def _chunk_size(self) -> int:
        """Return the byte size of one ~100 ms chunk, aligned to whole samples."""
        width = self._BYTES_PER_SAMPLE
        raw = self.sample_rate * width // self._CHUNKS_PER_SECOND
        # Round down to a sample boundary so a sample is never split across
        # chunks, and never go below one sample: a zero size would make the
        # re-chunking loop spin forever.
        return max(width, raw - raw % width)

    async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]:
        """Synthesize audio in streaming mode.

        Args:
            text: Text to synthesize

        Yields:
            TTSChunk objects with PCM audio. The last chunk yielded for a
            completed stream has ``is_final=True``. Nothing is yielded for
            blank text, for a non-200 response (which is logged), or after
            ``cancel()`` has been observed.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        if not self._session:
            raise RuntimeError("TTS service not connected")

        if not text.strip():
            return

        self._cancel_event.clear()

        payload = {
            "model": self.model,
            "input": text,
            "voice": self.voice,
            "response_format": "pcm",
            "sample_rate": self.sample_rate,
            "stream": True,
            "speed": self.speed
        }

        try:
            async with self._session.post(self.api_url, json=payload) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"SiliconFlow TTS error: {response.status} - {error_text}")
                    return

                chunk_size = self._chunk_size()
                # bytearray allows cheap in-place removal of consumed bytes.
                buffer = bytearray()
                # One full chunk is always held back so that the true last
                # chunk can be tagged final even when the stream length is an
                # exact multiple of chunk_size.
                pending: Optional[bytes] = None

                async for data in response.content.iter_any():
                    if self._cancel_event.is_set():
                        logger.info("TTS synthesis cancelled")
                        return

                    buffer += data

                    while len(buffer) >= chunk_size:
                        if pending is not None:
                            yield TTSChunk(
                                audio=pending,
                                sample_rate=self.sample_rate,
                                is_final=False
                            )
                        pending = bytes(buffer[:chunk_size])
                        del buffer[:chunk_size]

                # Flush the held-back chunk, then any partial tail. The held
                # chunk is final only when no tail follows it.
                tail = bytes(buffer)
                if pending is not None:
                    yield TTSChunk(
                        audio=pending,
                        sample_rate=self.sample_rate,
                        is_final=not tail
                    )
                if tail:
                    yield TTSChunk(
                        audio=tail,
                        sample_rate=self.sample_rate,
                        is_final=True
                    )

        except asyncio.CancelledError:
            logger.info("TTS synthesis cancelled via asyncio")
            raise
        except Exception as e:
            logger.error(f"TTS synthesis error: {e}")
            raise

    async def cancel(self) -> None:
        """Cancel ongoing synthesis.

        Sets the cancel flag; a running ``synthesize_stream`` checks it each
        time a new piece of response data arrives and then stops yielding.
        """
        self._cancel_event.set()