diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index 31e118d..2c7c7b6 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -48,6 +48,10 @@ class DuplexPipeline: ↓ Barge-in Detection → Interrupt """ + + _SENTENCE_END_CHARS = frozenset({"。", "!", "?", ".", "!", "?", "\n"}) + _SENTENCE_TRAILING_CHARS = frozenset({"。", "!", "?", ".", "!", "?", "…", "~", "~", "\n"}) + _SENTENCE_CLOSERS = frozenset({'"', "'", "”", "’", ")", "]", "}", ")", "】", "」", "』", "》"}) def __init__( self, @@ -499,8 +503,9 @@ class DuplexPipeline: # Sentence buffer for streaming TTS sentence_buffer = "" - sentence_ends = {',', '。', '!', '?', '\n'} + pending_punctuation = "" first_audio_sent = False + spoken_sentence_count = 0 # Stream LLM response and TTS sentence by sentence async for text_chunk in self.llm_service.generate_stream(messages): @@ -521,33 +526,43 @@ class DuplexPipeline: }) # Check for sentence completion - synthesize immediately for low latency - while any(end in sentence_buffer for end in sentence_ends): - # Find first sentence end - min_idx = len(sentence_buffer) - for end in sentence_ends: - idx = sentence_buffer.find(end) - if idx != -1 and idx < min_idx: - min_idx = idx - - if min_idx < len(sentence_buffer): - sentence = sentence_buffer[:min_idx + 1].strip() - sentence_buffer = sentence_buffer[min_idx + 1:] - - if sentence and not self._interrupt_event.is_set(): - # Send track start on first audio - if not first_audio_sent: - await self.transport.send_event({ - **ev( - "output.audio.start", - trackId=self.session_id, - ) - }) - first_audio_sent = True - - # Synthesize and send this sentence immediately - await self._speak_sentence(sentence) - else: + while True: + split_result = self._extract_tts_sentence(sentence_buffer, force=False) + if not split_result: break + sentence, sentence_buffer = split_result + if not sentence: + continue + + sentence = f"{pending_punctuation}{sentence}".strip() + pending_punctuation = "" + if not sentence: + continue + + # Avoid synthesizing punctuation-only fragments (e.g. standalone "!") + if not self._has_spoken_content(sentence): + pending_punctuation = sentence + continue + + if not self._interrupt_event.is_set(): + # Send track start on first audio + if not first_audio_sent: + await self.transport.send_event({ + **ev( + "output.audio.start", + trackId=self.session_id, + ) + }) + first_audio_sent = True + + # Keep very short fade-in for non-first sentence to preserve consonant attack. + fade_in_ms = 2 if spoken_sentence_count == 0 else 1 + await self._speak_sentence( + sentence, + fade_in_ms=fade_in_ms, + fade_out_ms=8, + ) + spoken_sentence_count += 1 # Send final LLM response event if full_response and not self._interrupt_event.is_set(): @@ -560,7 +575,8 @@ class DuplexPipeline: }) # Speak any remaining text - if sentence_buffer.strip() and not self._interrupt_event.is_set(): + remaining_text = f"{pending_punctuation}{sentence_buffer}".strip() + if remaining_text and self._has_spoken_content(remaining_text) and not self._interrupt_event.is_set(): if not first_audio_sent: await self.transport.send_event({ **ev( @@ -569,7 +585,12 @@ class DuplexPipeline: ) }) first_audio_sent = True - await self._speak_sentence(sentence_buffer.strip()) + fade_in_ms = 2 if spoken_sentence_count == 0 else 1 + await self._speak_sentence( + remaining_text, + fade_in_ms=fade_in_ms, + fade_out_ms=8, + ) # Send track end if first_audio_sent: @@ -598,12 +619,53 @@ class DuplexPipeline: self._barge_in_speech_frames = 0 self._barge_in_silence_frames = 0 - async def _speak_sentence(self, text: str) -> None: + def _extract_tts_sentence(self, text_buffer: str, force: bool = False) -> Optional[tuple[str, str]]: + """ + Extract one TTS sentence from the buffer. + + Consecutive sentence terminators are grouped together to avoid creating + punctuation-only fragments such as a standalone "!" after "?". By + default, trailing terminator at buffer end is held for more context. + """ + if not text_buffer: + return None + + split_idx = -1 + for idx, char in enumerate(text_buffer): + if char in self._SENTENCE_END_CHARS: + split_idx = idx + break + + if split_idx == -1: + return None + + end_idx = split_idx + 1 + while end_idx < len(text_buffer) and text_buffer[end_idx] in self._SENTENCE_TRAILING_CHARS: + end_idx += 1 + + # Include trailing quote/bracket closers in the same segment. + while end_idx < len(text_buffer) and text_buffer[end_idx] in self._SENTENCE_CLOSERS: + end_idx += 1 + + if not force and end_idx >= len(text_buffer): + return None + + sentence = text_buffer[:end_idx].strip() + remainder = text_buffer[end_idx:] + return sentence, remainder + + def _has_spoken_content(self, text: str) -> bool: + """Check whether text contains pronounceable content (not punctuation-only).""" + return any(char.isalnum() for char in text) + + async def _speak_sentence(self, text: str, fade_in_ms: int = 2, fade_out_ms: int = 8) -> None: """ Synthesize and send a single sentence. Args: text: Sentence to speak + fade_in_ms: Fade-in duration for sentence start chunks + fade_out_ms: Fade-out duration for sentence end chunks """ if not text.strip() or self._interrupt_event.is_set(): return @@ -640,7 +702,8 @@ class DuplexPipeline: sample_rate=chunk.sample_rate, fade_in=is_first_chunk, fade_out=bool(chunk.is_final), - fade_ms=8, + fade_in_ms=fade_in_ms, + fade_out_ms=fade_out_ms, ) is_first_chunk = False @@ -656,7 +719,8 @@ class DuplexPipeline: sample_rate: int, fade_in: bool = False, fade_out: bool = False, - fade_ms: int = 8, + fade_in_ms: int = 2, + fade_out_ms: int = 8, ) -> bytes: """Apply short edge fades to reduce click/pop at sentence boundaries.""" if not pcm_bytes or (not fade_in and not fade_out): @@ -667,13 +731,14 @@ class DuplexPipeline: if samples.size == 0: return pcm_bytes - fade_samples = int(sample_rate * (fade_ms / 1000.0)) - fade_samples = max(1, min(fade_samples, samples.size)) - if fade_in: - samples[:fade_samples] *= np.linspace(0.0, 1.0, fade_samples, endpoint=True) + fade_in_samples = int(sample_rate * (fade_in_ms / 1000.0)) + fade_in_samples = max(1, min(fade_in_samples, samples.size)) + samples[:fade_in_samples] *= np.linspace(0.0, 1.0, fade_in_samples, endpoint=True) if fade_out: - samples[-fade_samples:] *= np.linspace(1.0, 0.0, fade_samples, endpoint=True) + fade_out_samples = int(sample_rate * (fade_out_ms / 1000.0)) + fade_out_samples = max(1, min(fade_out_samples, samples.size)) + samples[-fade_out_samples:] *= np.linspace(1.0, 0.0, fade_out_samples, endpoint=True) return np.clip(samples, -32768, 32767).astype(" { + const now = audioCtx ? audioCtx.currentTime : 0; + playbackTime = now; + playbackSources.forEach((node) => { try { - s.stop(); + if (audioCtx && node.gainNode && node.source) { + node.gainNode.gain.cancelScheduledValues(now); + node.gainNode.gain.setValueAtTime(node.gainNode.gain.value || 1, now); + node.gainNode.gain.linearRampToValueAtTime(0, now + playbackStopRampSec); + node.source.stop(now + playbackStopRampSec + 0.002); + } else if (node.source) { + node.source.stop(); + } } catch (err) {} }); playbackSources = []; @@ -527,14 +536,18 @@ const buffer = audioCtx.createBuffer(1, float32.length, targetSampleRate); buffer.copyToChannel(float32, 0); const source = audioCtx.createBufferSource(); + const gainNode = audioCtx.createGain(); source.buffer = buffer; - source.connect(playbackDest); + source.connect(gainNode); + gainNode.connect(playbackDest); const startTime = Math.max(audioCtx.currentTime + 0.02, playbackTime); + gainNode.gain.setValueAtTime(1, startTime); source.start(startTime); playbackTime = startTime + buffer.duration; - playbackSources.push(source); + const playbackNode = { source, gainNode }; + playbackSources.push(playbackNode); source.onended = () => { - playbackSources = playbackSources.filter((s) => s !== source); + playbackSources = playbackSources.filter((s) => s !== playbackNode); }; } diff --git a/engine/services/siliconflow_tts.py b/engine/services/siliconflow_tts.py index cf3b74c..15d71ef 100644 --- a/engine/services/siliconflow_tts.py +++ b/engine/services/siliconflow_tts.py @@ -134,6 +134,7 @@ class SiliconFlowTTSService(BaseTTSService): # Stream audio chunks chunk_size = self.sample_rate * 2 // 10 # 100ms chunks buffer = b"" + pending_chunk = None async for chunk in response.content.iter_any(): if self._cancel_event.is_set(): @@ -146,14 +147,34 @@ class SiliconFlowTTSService(BaseTTSService): while len(buffer) >= chunk_size: audio_chunk = buffer[:chunk_size] buffer = buffer[chunk_size:] - + + # Keep one full chunk buffered so we can always tag the true + # last full chunk as final when stream length is an exact multiple. + if pending_chunk is not None: + yield TTSChunk( + audio=pending_chunk, + sample_rate=self.sample_rate, + is_final=False + ) + pending_chunk = audio_chunk + + # Flush pending chunk(s) and remaining tail. + if pending_chunk is not None: + if buffer: yield TTSChunk( - audio=audio_chunk, + audio=pending_chunk, sample_rate=self.sample_rate, is_final=False ) - - # Yield remaining buffer + pending_chunk = None + else: + yield TTSChunk( + audio=pending_chunk, + sample_rate=self.sample_rate, + is_final=True + ) + pending_chunk = None + if buffer: yield TTSChunk( audio=buffer, @@ -182,7 +203,7 @@ class StreamingTTSAdapter: """ # Sentence delimiters - SENTENCE_ENDS = {'。', '!', '?', '\n'} + SENTENCE_ENDS = {'。', '!', '?', '.', '!', '?', '\n'} def __init__(self, tts_service: BaseTTSService, transport, session_id: str): self.tts_service = tts_service @@ -205,15 +226,24 @@ class StreamingTTSAdapter: self._buffer += text_chunk # Check for sentence completion - for i, char in enumerate(self._buffer): - if char in self.SENTENCE_ENDS: - # Found sentence end, synthesize up to this point - sentence = self._buffer[:i+1].strip() - self._buffer = self._buffer[i+1:] - - if sentence: - await self._speak_sentence(sentence) + while True: + split_idx = -1 + for i, char in enumerate(self._buffer): + if char in self.SENTENCE_ENDS: + split_idx = i + break + if split_idx < 0: break + + end_idx = split_idx + 1 + while end_idx < len(self._buffer) and self._buffer[end_idx] in self.SENTENCE_ENDS: + end_idx += 1 + + sentence = self._buffer[:end_idx].strip() + self._buffer = self._buffer[end_idx:] + + if sentence and any(ch.isalnum() for ch in sentence): + await self._speak_sentence(sentence) async def flush(self) -> None: """Flush remaining buffer."""