Refactor duplicate stream tts adapter

This commit is contained in:
Xin Wang
2026-02-09 16:28:59 +08:00
parent 11016c04da
commit 29d8361ca9
5 changed files with 196 additions and 197 deletions

View File

@@ -30,6 +30,7 @@ from services.base import BaseASRService, BaseLLMService, BaseTTSService
from services.llm import MockLLMService, OpenAILLMService
from services.siliconflow_asr import SiliconFlowASRService
from services.siliconflow_tts import SiliconFlowTTSService
from services.streaming_text import extract_tts_sentence, has_spoken_content
from services.tts import EdgeTTSService, MockTTSService
@@ -529,7 +530,15 @@ class DuplexPipeline:
# Check for sentence completion - synthesize immediately for low latency
while True:
split_result = self._extract_tts_sentence(sentence_buffer, force=False)
split_result = extract_tts_sentence(
sentence_buffer,
end_chars=self._SENTENCE_END_CHARS,
trailing_chars=self._SENTENCE_TRAILING_CHARS,
closers=self._SENTENCE_CLOSERS,
min_split_spoken_chars=self._MIN_SPLIT_SPOKEN_CHARS,
hold_trailing_at_buffer_end=True,
force=False,
)
if not split_result:
break
sentence, sentence_buffer = split_result
@@ -542,7 +551,7 @@ class DuplexPipeline:
continue
# Avoid synthesizing punctuation-only fragments (e.g. standalone "!")
if not self._has_spoken_content(sentence):
if not has_spoken_content(sentence):
pending_punctuation = sentence
continue
@@ -576,7 +585,7 @@ class DuplexPipeline:
# Speak any remaining text
remaining_text = f"{pending_punctuation}{sentence_buffer}".strip()
if remaining_text and self._has_spoken_content(remaining_text) and not self._interrupt_event.is_set():
if remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set():
if not first_audio_sent:
await self.transport.send_event({
**ev(
@@ -618,84 +627,6 @@ class DuplexPipeline:
self._barge_in_speech_frames = 0
self._barge_in_silence_frames = 0
def _extract_tts_sentence(self, text_buffer: str, force: bool = False) -> Optional[tuple[str, str]]:
"""
Extract one TTS sentence from the buffer.
Consecutive sentence terminators are grouped together to avoid creating
punctuation-only fragments such as a standalone "!" after "?". By
default, trailing terminator at buffer end is held for more context.
"""
if not text_buffer:
return None
search_start = 0
while True:
split_idx = -1
for idx in range(search_start, len(text_buffer)):
char = text_buffer[idx]
if char == "." and self._is_non_sentence_period(text_buffer, idx):
continue
if char in self._SENTENCE_END_CHARS:
split_idx = idx
break
if split_idx == -1:
return None
end_idx = split_idx + 1
while end_idx < len(text_buffer) and text_buffer[end_idx] in self._SENTENCE_TRAILING_CHARS:
end_idx += 1
# Include trailing quote/bracket closers in the same segment.
while end_idx < len(text_buffer) and text_buffer[end_idx] in self._SENTENCE_CLOSERS:
end_idx += 1
if not force and end_idx >= len(text_buffer):
return None
sentence = text_buffer[:end_idx].strip()
spoken_chars = sum(1 for ch in sentence if ch.isalnum())
# Keep short utterances (e.g. "好。", "OK.") merged with following text.
if (
not force
and 0 < spoken_chars < self._MIN_SPLIT_SPOKEN_CHARS
and end_idx < len(text_buffer)
):
search_start = end_idx
continue
remainder = text_buffer[end_idx:]
return sentence, remainder
def _has_spoken_content(self, text: str) -> bool:
"""Check whether text contains pronounceable content (not punctuation-only)."""
return any(char.isalnum() for char in text)
def _is_non_sentence_period(self, text: str, idx: int) -> bool:
"""Check whether '.' should NOT be treated as a sentence delimiter."""
if text[idx] != ".":
return False
# Decimal/version segment: 1.2, v1.2.3
if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit():
return True
# Number abbreviations: No.1 / No. 1
left_start = idx - 1
while left_start >= 0 and text[left_start].isalpha():
left_start -= 1
left_token = text[left_start + 1:idx].lower()
if left_token == "no":
j = idx + 1
while j < len(text) and text[j].isspace():
j += 1
if j < len(text) and text[j].isdigit():
return True
return False
async def _speak_sentence(self, text: str, fade_in_ms: int = 0, fade_out_ms: int = 8) -> None:
"""
Synthesize and send a single sentence.

View File

@@ -17,6 +17,7 @@ from services.tts import EdgeTTSService, MockTTSService
from services.asr import BufferedASRService, MockASRService
from services.siliconflow_asr import SiliconFlowASRService
from services.siliconflow_tts import SiliconFlowTTSService
from services.streaming_tts_adapter import StreamingTTSAdapter
from services.realtime import RealtimeService, RealtimeConfig, RealtimePipeline
__all__ = [
@@ -40,6 +41,7 @@ __all__ = [
"SiliconFlowASRService",
# TTS (SiliconFlow)
"SiliconFlowTTSService",
"StreamingTTSAdapter",
# Realtime
"RealtimeService",
"RealtimeConfig",

View File

@@ -13,6 +13,7 @@ from typing import AsyncIterator, Optional
from loguru import logger
from services.base import BaseTTSService, TTSChunk, ServiceState
from services.streaming_tts_adapter import StreamingTTSAdapter # backward-compatible re-export
class SiliconFlowTTSService(BaseTTSService):
@@ -192,119 +193,3 @@ class SiliconFlowTTSService(BaseTTSService):
async def cancel(self) -> None:
"""Cancel ongoing synthesis."""
self._cancel_event.set()
class StreamingTTSAdapter:
"""
Adapter for streaming LLM text to TTS with sentence-level chunking.
This reduces latency by starting TTS as soon as a complete sentence
is received from the LLM, rather than waiting for the full response.
"""
# Sentence delimiters
SENTENCE_ENDS = {'', '', '', '.', '!', '?', '\n'}
def __init__(self, tts_service: BaseTTSService, transport, session_id: str):
self.tts_service = tts_service
self.transport = transport
self.session_id = session_id
self._buffer = ""
self._cancel_event = asyncio.Event()
self._is_speaking = False
def _is_non_sentence_period(self, text: str, idx: int) -> bool:
"""Check whether '.' should NOT be treated as a sentence delimiter."""
if text[idx] != ".":
return False
# Decimal/version segment: 1.2, v1.2.3
if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit():
return True
# Number abbreviations: No.1 / No. 1
left_start = idx - 1
while left_start >= 0 and text[left_start].isalpha():
left_start -= 1
left_token = text[left_start + 1:idx].lower()
if left_token == "no":
j = idx + 1
while j < len(text) and text[j].isspace():
j += 1
if j < len(text) and text[j].isdigit():
return True
return False
async def process_text_chunk(self, text_chunk: str) -> None:
"""
Process a text chunk from LLM and trigger TTS when sentence is complete.
Args:
text_chunk: Text chunk from LLM streaming
"""
if self._cancel_event.is_set():
return
self._buffer += text_chunk
# Check for sentence completion
while True:
split_idx = -1
for i, char in enumerate(self._buffer):
if char == "." and self._is_non_sentence_period(self._buffer, i):
continue
if char in self.SENTENCE_ENDS:
split_idx = i
break
if split_idx < 0:
break
end_idx = split_idx + 1
while end_idx < len(self._buffer) and self._buffer[end_idx] in self.SENTENCE_ENDS:
end_idx += 1
sentence = self._buffer[:end_idx].strip()
self._buffer = self._buffer[end_idx:]
if sentence and any(ch.isalnum() for ch in sentence):
await self._speak_sentence(sentence)
async def flush(self) -> None:
"""Flush remaining buffer."""
if self._buffer.strip() and not self._cancel_event.is_set():
await self._speak_sentence(self._buffer.strip())
self._buffer = ""
async def _speak_sentence(self, text: str) -> None:
"""Synthesize and send a sentence."""
if not text or self._cancel_event.is_set():
return
self._is_speaking = True
try:
async for chunk in self.tts_service.synthesize_stream(text):
if self._cancel_event.is_set():
break
await self.transport.send_audio(chunk.audio)
await asyncio.sleep(0.01) # Prevent flooding
except Exception as e:
logger.error(f"TTS speak error: {e}")
finally:
self._is_speaking = False
def cancel(self) -> None:
"""Cancel ongoing speech."""
self._cancel_event.set()
self._buffer = ""
def reset(self) -> None:
"""Reset for new turn."""
self._cancel_event.clear()
self._buffer = ""
self._is_speaking = False
@property
def is_speaking(self) -> bool:
return self._is_speaking

View File

@@ -0,0 +1,86 @@
"""Shared text chunking helpers for streaming TTS."""
from typing import Optional
def is_non_sentence_period(text: str, idx: int) -> bool:
"""Check whether '.' should NOT be treated as a sentence delimiter."""
if idx < 0 or idx >= len(text) or text[idx] != ".":
return False
# Decimal/version segment: 1.2, v1.2.3
if idx > 0 and idx < len(text) - 1 and text[idx - 1].isdigit() and text[idx + 1].isdigit():
return True
# Number abbreviations: No.1 / No. 1
left_start = idx - 1
while left_start >= 0 and text[left_start].isalpha():
left_start -= 1
left_token = text[left_start + 1:idx].lower()
if left_token == "no":
j = idx + 1
while j < len(text) and text[j].isspace():
j += 1
if j < len(text) and text[j].isdigit():
return True
return False
def has_spoken_content(text: str) -> bool:
"""Check whether text contains pronounceable content (not punctuation-only)."""
return any(char.isalnum() for char in text)
def extract_tts_sentence(
text_buffer: str,
*,
end_chars: frozenset[str],
trailing_chars: frozenset[str],
closers: frozenset[str],
min_split_spoken_chars: int = 0,
hold_trailing_at_buffer_end: bool = False,
force: bool = False,
) -> Optional[tuple[str, str]]:
"""Extract one TTS sentence from text buffer."""
if not text_buffer:
return None
search_start = 0
while True:
split_idx = -1
for idx in range(search_start, len(text_buffer)):
char = text_buffer[idx]
if char == "." and is_non_sentence_period(text_buffer, idx):
continue
if char in end_chars:
split_idx = idx
break
if split_idx == -1:
return None
end_idx = split_idx + 1
while end_idx < len(text_buffer) and text_buffer[end_idx] in trailing_chars:
end_idx += 1
while end_idx < len(text_buffer) and text_buffer[end_idx] in closers:
end_idx += 1
if hold_trailing_at_buffer_end and not force and end_idx >= len(text_buffer):
return None
sentence = text_buffer[:end_idx].strip()
spoken_chars = sum(1 for ch in sentence if ch.isalnum())
if (
not force
and min_split_spoken_chars > 0
and 0 < spoken_chars < min_split_spoken_chars
and end_idx < len(text_buffer)
):
search_start = end_idx
continue
remainder = text_buffer[end_idx:]
return sentence, remainder

View File

@@ -0,0 +1,95 @@
"""Backend-agnostic streaming adapter from LLM text to TTS audio."""
import asyncio
from loguru import logger
from services.base import BaseTTSService
from services.streaming_text import extract_tts_sentence, has_spoken_content
class StreamingTTSAdapter:
"""
Adapter for streaming LLM text to TTS with sentence-level chunking.
This reduces latency by starting TTS as soon as a complete sentence
is received from the LLM, rather than waiting for the full response.
"""
SENTENCE_ENDS = {"", "", "", ".", "!", "?", "\n"}
SENTENCE_CLOSERS = frozenset()
def __init__(self, tts_service: BaseTTSService, transport, session_id: str):
self.tts_service = tts_service
self.transport = transport
self.session_id = session_id
self._buffer = ""
self._cancel_event = asyncio.Event()
self._is_speaking = False
async def process_text_chunk(self, text_chunk: str) -> None:
"""
Process a text chunk from LLM and trigger TTS when sentence is complete.
Args:
text_chunk: Text chunk from LLM streaming
"""
if self._cancel_event.is_set():
return
self._buffer += text_chunk
# Check for sentence completion
while True:
split_result = extract_tts_sentence(
self._buffer,
end_chars=frozenset(self.SENTENCE_ENDS),
trailing_chars=frozenset(self.SENTENCE_ENDS),
closers=self.SENTENCE_CLOSERS,
force=False,
)
if not split_result:
break
sentence, self._buffer = split_result
if sentence and has_spoken_content(sentence):
await self._speak_sentence(sentence)
async def flush(self) -> None:
"""Flush remaining buffer."""
if self._buffer.strip() and not self._cancel_event.is_set():
await self._speak_sentence(self._buffer.strip())
self._buffer = ""
async def _speak_sentence(self, text: str) -> None:
"""Synthesize and send a sentence."""
if not text or self._cancel_event.is_set():
return
self._is_speaking = True
try:
async for chunk in self.tts_service.synthesize_stream(text):
if self._cancel_event.is_set():
break
await self.transport.send_audio(chunk.audio)
await asyncio.sleep(0.01) # Prevent flooding
except Exception as e:
logger.error(f"TTS speak error: {e}")
finally:
self._is_speaking = False
def cancel(self) -> None:
"""Cancel ongoing speech."""
self._cancel_event.set()
self._buffer = ""
def reset(self) -> None:
"""Reset for new turn."""
self._cancel_event.clear()
self._buffer = ""
self._is_speaking = False
@property
def is_speaking(self) -> bool:
return self._is_speaking