diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index 64a7012..b24e11d 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -30,6 +30,7 @@ from services.base import BaseASRService, BaseLLMService, BaseTTSService from services.llm import MockLLMService, OpenAILLMService from services.siliconflow_asr import SiliconFlowASRService from services.siliconflow_tts import SiliconFlowTTSService +from services.streaming_text import extract_tts_sentence, has_spoken_content from services.tts import EdgeTTSService, MockTTSService @@ -529,7 +530,15 @@ class DuplexPipeline: # Check for sentence completion - synthesize immediately for low latency while True: - split_result = self._extract_tts_sentence(sentence_buffer, force=False) + split_result = extract_tts_sentence( + sentence_buffer, + end_chars=self._SENTENCE_END_CHARS, + trailing_chars=self._SENTENCE_TRAILING_CHARS, + closers=self._SENTENCE_CLOSERS, + min_split_spoken_chars=self._MIN_SPLIT_SPOKEN_CHARS, + hold_trailing_at_buffer_end=True, + force=False, + ) if not split_result: break sentence, sentence_buffer = split_result @@ -542,7 +551,7 @@ class DuplexPipeline: continue # Avoid synthesizing punctuation-only fragments (e.g. standalone "!") - if not self._has_spoken_content(sentence): + if not has_spoken_content(sentence): pending_punctuation = sentence continue @@ -576,7 +585,7 @@ class DuplexPipeline: # Speak any remaining text remaining_text = f"{pending_punctuation}{sentence_buffer}".strip() - if remaining_text and self._has_spoken_content(remaining_text) and not self._interrupt_event.is_set(): + if remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set(): if not first_audio_sent: await self.transport.send_event({ **ev( @@ -618,84 +627,6 @@ class DuplexPipeline: self._barge_in_speech_frames = 0 self._barge_in_silence_frames = 0 - def _extract_tts_sentence(self, text_buffer: str, force: bool = False) -> Optional[tuple[str, str]]: - """ - Extract one TTS sentence from the buffer. - - Consecutive sentence terminators are grouped together to avoid creating - punctuation-only fragments such as a standalone "!" after "?". By - default, trailing terminator at buffer end is held for more context. - """ - if not text_buffer: - return None - - search_start = 0 - while True: - split_idx = -1 - for idx in range(search_start, len(text_buffer)): - char = text_buffer[idx] - if char == "." and self._is_non_sentence_period(text_buffer, idx): - continue - if char in self._SENTENCE_END_CHARS: - split_idx = idx - break - - if split_idx == -1: - return None - - end_idx = split_idx + 1 - while end_idx < len(text_buffer) and text_buffer[end_idx] in self._SENTENCE_TRAILING_CHARS: - end_idx += 1 - - # Include trailing quote/bracket closers in the same segment. - while end_idx < len(text_buffer) and text_buffer[end_idx] in self._SENTENCE_CLOSERS: - end_idx += 1 - - if not force and end_idx >= len(text_buffer): - return None - - sentence = text_buffer[:end_idx].strip() - spoken_chars = sum(1 for ch in sentence if ch.isalnum()) - - # Keep short utterances (e.g. "好。", "OK.") merged with following text. - if ( - not force - and 0 < spoken_chars < self._MIN_SPLIT_SPOKEN_CHARS - and end_idx < len(text_buffer) - ): - search_start = end_idx - continue - - remainder = text_buffer[end_idx:] - return sentence, remainder - - def _has_spoken_content(self, text: str) -> bool: - """Check whether text contains pronounceable content (not punctuation-only).""" - return any(char.isalnum() for char in text) - - def _is_non_sentence_period(self, text: str, idx: int) -> bool: - """Check whether '.' should NOT be treated as a sentence delimiter.""" - if text[idx] != ".": - return False - - # Decimal/version segment: 1.2, v1.2.3 - if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit(): - return True - - # Number abbreviations: No.1 / No. 1 - left_start = idx - 1 - while left_start >= 0 and text[left_start].isalpha(): - left_start -= 1 - left_token = text[left_start + 1:idx].lower() - if left_token == "no": - j = idx + 1 - while j < len(text) and text[j].isspace(): - j += 1 - if j < len(text) and text[j].isdigit(): - return True - - return False - async def _speak_sentence(self, text: str, fade_in_ms: int = 0, fade_out_ms: int = 8) -> None: """ Synthesize and send a single sentence. diff --git a/engine/services/__init__.py b/engine/services/__init__.py index 5a53e70..50301d4 100644 --- a/engine/services/__init__.py +++ b/engine/services/__init__.py @@ -17,6 +17,7 @@ from services.tts import EdgeTTSService, MockTTSService from services.asr import BufferedASRService, MockASRService from services.siliconflow_asr import SiliconFlowASRService from services.siliconflow_tts import SiliconFlowTTSService +from services.streaming_tts_adapter import StreamingTTSAdapter from services.realtime import RealtimeService, RealtimeConfig, RealtimePipeline __all__ = [ @@ -40,6 +41,7 @@ __all__ = [ "SiliconFlowASRService", # TTS (SiliconFlow) "SiliconFlowTTSService", + "StreamingTTSAdapter", # Realtime "RealtimeService", "RealtimeConfig", diff --git a/engine/services/siliconflow_tts.py b/engine/services/siliconflow_tts.py index 3974015..c2744c9 100644 --- a/engine/services/siliconflow_tts.py +++ b/engine/services/siliconflow_tts.py @@ -13,6 +13,7 @@ from typing import AsyncIterator, Optional from loguru import logger from services.base import BaseTTSService, TTSChunk, ServiceState +from services.streaming_tts_adapter import StreamingTTSAdapter # backward-compatible re-export class SiliconFlowTTSService(BaseTTSService): @@ -192,119 +193,3 @@ class SiliconFlowTTSService(BaseTTSService): async def cancel(self) -> None: """Cancel ongoing synthesis.""" self._cancel_event.set() - - -class StreamingTTSAdapter: - """ - Adapter for streaming LLM text to TTS with sentence-level chunking. - - This reduces latency by starting TTS as soon as a complete sentence - is received from the LLM, rather than waiting for the full response. - """ - - # Sentence delimiters - SENTENCE_ENDS = {'。', '!', '?', '.', '!', '?', '\n'} - - def __init__(self, tts_service: BaseTTSService, transport, session_id: str): - self.tts_service = tts_service - self.transport = transport - self.session_id = session_id - self._buffer = "" - self._cancel_event = asyncio.Event() - self._is_speaking = False - - def _is_non_sentence_period(self, text: str, idx: int) -> bool: - """Check whether '.' should NOT be treated as a sentence delimiter.""" - if text[idx] != ".": - return False - - # Decimal/version segment: 1.2, v1.2.3 - if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit(): - return True - - # Number abbreviations: No.1 / No. 1 - left_start = idx - 1 - while left_start >= 0 and text[left_start].isalpha(): - left_start -= 1 - left_token = text[left_start + 1:idx].lower() - if left_token == "no": - j = idx + 1 - while j < len(text) and text[j].isspace(): - j += 1 - if j < len(text) and text[j].isdigit(): - return True - - return False - - async def process_text_chunk(self, text_chunk: str) -> None: - """ - Process a text chunk from LLM and trigger TTS when sentence is complete. - - Args: - text_chunk: Text chunk from LLM streaming - """ - if self._cancel_event.is_set(): - return - - self._buffer += text_chunk - - # Check for sentence completion - while True: - split_idx = -1 - for i, char in enumerate(self._buffer): - if char == "." and self._is_non_sentence_period(self._buffer, i): - continue - if char in self.SENTENCE_ENDS: - split_idx = i - break - if split_idx < 0: - break - - end_idx = split_idx + 1 - while end_idx < len(self._buffer) and self._buffer[end_idx] in self.SENTENCE_ENDS: - end_idx += 1 - - sentence = self._buffer[:end_idx].strip() - self._buffer = self._buffer[end_idx:] - - if sentence and any(ch.isalnum() for ch in sentence): - await self._speak_sentence(sentence) - - async def flush(self) -> None: - """Flush remaining buffer.""" - if self._buffer.strip() and not self._cancel_event.is_set(): - await self._speak_sentence(self._buffer.strip()) - self._buffer = "" - - async def _speak_sentence(self, text: str) -> None: - """Synthesize and send a sentence.""" - if not text or self._cancel_event.is_set(): - return - - self._is_speaking = True - - try: - async for chunk in self.tts_service.synthesize_stream(text): - if self._cancel_event.is_set(): - break - await self.transport.send_audio(chunk.audio) - await asyncio.sleep(0.01) # Prevent flooding - except Exception as e: - logger.error(f"TTS speak error: {e}") - finally: - self._is_speaking = False - - def cancel(self) -> None: - """Cancel ongoing speech.""" - self._cancel_event.set() - self._buffer = "" - - def reset(self) -> None: - """Reset for new turn.""" - self._cancel_event.clear() - self._buffer = "" - self._is_speaking = False - - @property - def is_speaking(self) -> bool: - return self._is_speaking diff --git a/engine/services/streaming_text.py b/engine/services/streaming_text.py new file mode 100644 index 0000000..d5c123f --- /dev/null +++ b/engine/services/streaming_text.py @@ -0,0 +1,86 @@ +"""Shared text chunking helpers for streaming TTS.""" + +from typing import Optional + + +def is_non_sentence_period(text: str, idx: int) -> bool: + """Check whether '.' should NOT be treated as a sentence delimiter.""" + if idx < 0 or idx >= len(text) or text[idx] != ".": + return False + + # Decimal/version segment: 1.2, v1.2.3 + if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit(): + return True + + # Number abbreviations: No.1 / No. 1 + left_start = idx - 1 + while left_start >= 0 and text[left_start].isalpha(): + left_start -= 1 + left_token = text[left_start + 1:idx].lower() + if left_token == "no": + j = idx + 1 + while j < len(text) and text[j].isspace(): + j += 1 + if j < len(text) and text[j].isdigit(): + return True + + return False + + +def has_spoken_content(text: str) -> bool: + """Check whether text contains pronounceable content (not punctuation-only).""" + return any(char.isalnum() for char in text) + + +def extract_tts_sentence( + text_buffer: str, + *, + end_chars: frozenset[str], + trailing_chars: frozenset[str], + closers: frozenset[str], + min_split_spoken_chars: int = 0, + hold_trailing_at_buffer_end: bool = False, + force: bool = False, +) -> Optional[tuple[str, str]]: + """Extract one TTS sentence from text buffer.""" + if not text_buffer: + return None + + search_start = 0 + while True: + split_idx = -1 + for idx in range(search_start, len(text_buffer)): + char = text_buffer[idx] + if char == "." and is_non_sentence_period(text_buffer, idx): + continue + if char in end_chars: + split_idx = idx + break + + if split_idx == -1: + return None + + end_idx = split_idx + 1 + while end_idx < len(text_buffer) and text_buffer[end_idx] in trailing_chars: + end_idx += 1 + + while end_idx < len(text_buffer) and text_buffer[end_idx] in closers: + end_idx += 1 + + if hold_trailing_at_buffer_end and not force and end_idx >= len(text_buffer): + return None + + sentence = text_buffer[:end_idx].strip() + spoken_chars = sum(1 for ch in sentence if ch.isalnum()) + + if ( + not force + and min_split_spoken_chars > 0 + and 0 < spoken_chars < min_split_spoken_chars + and end_idx < len(text_buffer) + ): + search_start = end_idx + continue + + remainder = text_buffer[end_idx:] + return sentence, remainder diff --git a/engine/services/streaming_tts_adapter.py b/engine/services/streaming_tts_adapter.py new file mode 100644 index 0000000..d4cb745 --- /dev/null +++ b/engine/services/streaming_tts_adapter.py @@ -0,0 +1,95 @@ +"""Backend-agnostic streaming adapter from LLM text to TTS audio.""" + +import asyncio + +from loguru import logger + +from services.base import BaseTTSService +from services.streaming_text import extract_tts_sentence, has_spoken_content + + +class StreamingTTSAdapter: + """ + Adapter for streaming LLM text to TTS with sentence-level chunking. + + This reduces latency by starting TTS as soon as a complete sentence + is received from the LLM, rather than waiting for the full response. + """ + + SENTENCE_ENDS = {"。", "!", "?", ".", "!", "?", "\n"} + SENTENCE_CLOSERS = frozenset() + + def __init__(self, tts_service: BaseTTSService, transport, session_id: str): + self.tts_service = tts_service + self.transport = transport + self.session_id = session_id + self._buffer = "" + self._cancel_event = asyncio.Event() + self._is_speaking = False + + async def process_text_chunk(self, text_chunk: str) -> None: + """ + Process a text chunk from LLM and trigger TTS when sentence is complete. + + Args: + text_chunk: Text chunk from LLM streaming + """ + if self._cancel_event.is_set(): + return + + self._buffer += text_chunk + + # Check for sentence completion + while True: + split_result = extract_tts_sentence( + self._buffer, + end_chars=frozenset(self.SENTENCE_ENDS), + trailing_chars=frozenset(self.SENTENCE_ENDS), + closers=self.SENTENCE_CLOSERS, + force=False, + ) + if not split_result: + break + + sentence, self._buffer = split_result + if sentence and has_spoken_content(sentence): + await self._speak_sentence(sentence) + + async def flush(self) -> None: + """Flush remaining buffer.""" + if self._buffer.strip() and not self._cancel_event.is_set(): + await self._speak_sentence(self._buffer.strip()) + self._buffer = "" + + async def _speak_sentence(self, text: str) -> None: + """Synthesize and send a sentence.""" + if not text or self._cancel_event.is_set(): + return + + self._is_speaking = True + + try: + async for chunk in self.tts_service.synthesize_stream(text): + if self._cancel_event.is_set(): + break + await self.transport.send_audio(chunk.audio) + await asyncio.sleep(0.01) # Prevent flooding + except Exception as e: + logger.error(f"TTS speak error: {e}") + finally: + self._is_speaking = False + + def cancel(self) -> None: + """Cancel ongoing speech.""" + self._cancel_event.set() + self._buffer = "" + + def reset(self) -> None: + """Reset for new turn.""" + self._cancel_event.clear() + self._buffer = "" + self._is_speaking = False + + @property + def is_speaking(self) -> bool: + return self._is_speaking