"""Backend-agnostic streaming adapter from LLM text to TTS audio."""

import asyncio

from loguru import logger

from providers.common.base import BaseTTSService
from providers.common.streaming_text import extract_tts_sentence, has_spoken_content


class StreamingTTSAdapter:
    """
    Adapter for streaming LLM text to TTS with sentence-level chunking.

    This reduces latency by starting TTS as soon as a complete sentence is
    received from the LLM, rather than waiting for the full response.
    """

    # Characters that terminate a sentence: CJK full-width punctuation
    # ("。", "！", "？") as well as their ASCII counterparts and newline.
    SENTENCE_ENDS = {"。", "！", "？", ".", "!", "?", "\n"}
    SENTENCE_CLOSERS = frozenset()

    def __init__(self, tts_service: BaseTTSService, transport, session_id: str):
        """
        Args:
            tts_service: Backend whose ``synthesize_stream(text)`` is iterated
                asynchronously; each yielded item must expose ``.audio``.
            transport: Object whose ``send_audio(...)`` coroutine receives
                each audio payload.
            session_id: Session identifier; stored on the adapter but not
                otherwise read by this class.
        """
        self.tts_service = tts_service
        self.transport = transport
        self.session_id = session_id
        self._buffer = ""  # text received but not yet spoken
        self._cancel_event = asyncio.Event()
        self._is_speaking = False

    async def process_text_chunk(self, text_chunk: str) -> None:
        """
        Process a text chunk from LLM and trigger TTS when sentence is complete.

        Every complete sentence currently in the buffer is spoken in order;
        any trailing partial sentence stays buffered for the next chunk or
        for ``flush``. Does nothing once cancelled.

        Args:
            text_chunk: Text chunk from LLM streaming
        """
        if self._cancel_event.is_set():
            return

        self._buffer += text_chunk

        # Built once per call instead of twice per loop iteration. It is read
        # from the attribute on every call so subclass overrides still apply.
        end_chars = frozenset(self.SENTENCE_ENDS)

        # Drain complete sentences; stop as soon as a cancel is observed.
        while not self._cancel_event.is_set():
            split_result = extract_tts_sentence(
                self._buffer,
                end_chars=end_chars,
                trailing_chars=end_chars,
                closers=self.SENTENCE_CLOSERS,
                force=False,
            )
            if not split_result:
                break
            sentence, self._buffer = split_result
            if sentence and has_spoken_content(sentence):
                await self._speak_sentence(sentence)

    async def flush(self) -> None:
        """
        Speak whatever remains in the buffer, then leave the buffer empty.

        Voices a trailing fragment that never reached a sentence terminator.
        Skips speaking when cancelled or when the fragment has no spoken
        content (same filter as ``process_text_chunk``).
        """
        remainder = self._buffer.strip()
        # Clear before awaiting: text appended while we speak must survive.
        self._buffer = ""
        if (
            remainder
            and not self._cancel_event.is_set()
            and has_spoken_content(remainder)
        ):
            await self._speak_sentence(remainder)

    async def _speak_sentence(self, text: str) -> None:
        """
        Synthesize ``text`` and forward each audio chunk to the transport.

        Stops early when cancelled. Exceptions raised by synthesis or
        transport are logged with traceback and not re-raised, so the caller
        can continue with the next sentence.
        """
        if not text or self._cancel_event.is_set():
            return

        self._is_speaking = True
        try:
            async for chunk in self.tts_service.synthesize_stream(text):
                if self._cancel_event.is_set():
                    break
                await self.transport.send_audio(chunk.audio)
                await asyncio.sleep(0.01)  # Prevent flooding
        except Exception as e:
            # logger.exception keeps the traceback; level is still ERROR.
            logger.exception(f"TTS speak error: {e}")
        finally:
            self._is_speaking = False

    def cancel(self) -> None:
        """
        Cancel ongoing speech and discard buffered text.

        In-flight synthesis loops stop at their next cancel check. Further
        text is ignored until ``reset`` is called.
        """
        self._cancel_event.set()
        self._buffer = ""

    def reset(self) -> None:
        """Reset for new turn: clear the cancel flag, buffer and speaking state."""
        self._cancel_event.clear()
        self._buffer = ""
        self._is_speaking = False

    @property
    def is_speaking(self) -> bool:
        """True while ``_speak_sentence`` is streaming audio for a sentence."""
        return self._is_speaking