diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py
index b24e11d..16abd68 100644
--- a/engine/core/duplex_pipeline.py
+++ b/engine/core/duplex_pipeline.py
@@ -638,6 +638,8 @@ class DuplexPipeline:
         """
         if not text.strip() or self._interrupt_event.is_set():
             return
+
+        logger.info(f"[TTS] split sentence: {text!r}")
 
         try:
             is_first_chunk = True
diff --git a/engine/services/siliconflow_tts.py b/engine/services/siliconflow_tts.py
index c2744c9..6d2dd8c 100644
--- a/engine/services/siliconflow_tts.py
+++ b/engine/services/siliconflow_tts.py
@@ -193,3 +193,119 @@ class SiliconFlowTTSService(BaseTTSService):
     async def cancel(self) -> None:
         """Cancel ongoing synthesis."""
         self._cancel_event.set()
+
+
+class StreamingTTSAdapter:
+    """
+    Adapter for streaming LLM text to TTS with sentence-level chunking.
+
+    This reduces latency by starting TTS as soon as a complete sentence
+    is received from the LLM, rather than waiting for the full response.
+    """
+
+    # Sentence delimiters
+    SENTENCE_ENDS = {',', '。', '!', '?', '.', '!', '?', '\n'}
+
+    def __init__(self, tts_service: BaseTTSService, transport, session_id: str):
+        self.tts_service = tts_service
+        self.transport = transport
+        self.session_id = session_id
+        self._buffer = ""
+        self._cancel_event = asyncio.Event()
+        self._is_speaking = False
+
+    def _is_non_sentence_period(self, text: str, idx: int) -> bool:
+        """Check whether '.' should NOT be treated as a sentence delimiter."""
+        if text[idx] != ".":
+            return False
+
+        # Decimal/version segment: 1.2, v1.2.3
+        if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit():
+            return True
+
+        # Number abbreviations: No.1 / No. 1
+        left_start = idx - 1
+        while left_start >= 0 and text[left_start].isalpha():
+            left_start -= 1
+        left_token = text[left_start + 1:idx].lower()
+        if left_token == "no":
+            j = idx + 1
+            while j < len(text) and text[j].isspace():
+                j += 1
+            if j < len(text) and text[j].isdigit():
+                return True
+
+        return False
+
+    async def process_text_chunk(self, text_chunk: str) -> None:
+        """
+        Process a text chunk from LLM and trigger TTS when sentence is complete.
+
+        Args:
+            text_chunk: Text chunk from LLM streaming
+        """
+        if self._cancel_event.is_set():
+            return
+
+        self._buffer += text_chunk
+
+        # Check for sentence completion
+        while True:
+            split_idx = -1
+            for i, char in enumerate(self._buffer):
+                if char == "." and self._is_non_sentence_period(self._buffer, i):
+                    continue
+                if char in self.SENTENCE_ENDS:
+                    split_idx = i
+                    break
+            if split_idx < 0:
+                break
+
+            end_idx = split_idx + 1
+            while end_idx < len(self._buffer) and self._buffer[end_idx] in self.SENTENCE_ENDS:
+                end_idx += 1
+
+            sentence = self._buffer[:end_idx].strip()
+            self._buffer = self._buffer[end_idx:]
+
+            if sentence and any(ch.isalnum() for ch in sentence):
+                await self._speak_sentence(sentence)
+
+    async def flush(self) -> None:
+        """Flush remaining buffer."""
+        if self._buffer.strip() and not self._cancel_event.is_set():
+            await self._speak_sentence(self._buffer.strip())
+        self._buffer = ""
+
+    async def _speak_sentence(self, text: str) -> None:
+        """Synthesize and send a sentence."""
+        if not text or self._cancel_event.is_set():
+            return
+
+        self._is_speaking = True
+
+        try:
+            async for chunk in self.tts_service.synthesize_stream(text):
+                if self._cancel_event.is_set():
+                    break
+                await self.transport.send_audio(chunk.audio)
+                await asyncio.sleep(0.01)  # Prevent flooding
+        except Exception as e:
+            logger.error(f"TTS speak error: {e}")
+        finally:
+            self._is_speaking = False
+
+    def cancel(self) -> None:
+        """Cancel ongoing speech."""
+        self._cancel_event.set()
+        self._buffer = ""
+
+    def reset(self) -> None:
+        """Reset for new turn."""
+        self._cancel_event.clear()
+        self._buffer = ""
+        self._is_speaking = False
+
+    @property
+    def is_speaking(self) -> bool:
+        return self._is_speaking