Compare commits

...

1 Commits

Author SHA1 Message Date
Mark Backman
40ed9a7e25 Improve SimpleTextAggregator to handle multi-sentence chunks 2025-11-18 11:35:15 -05:00
4 changed files with 96 additions and 25 deletions

View File

@@ -47,6 +47,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`maybe_capture_participant_screen()` for `SmallWebRTCTransport` in the runner
utils.
- Improved `SimpleTextAggregator` and `TTSService` to properly handle buffered
sentences at the end of LLM responses. Previously, when an LLM response ended,
any complete sentences remaining in the aggregator's buffer would be sent to
TTS as one large chunk. Now these sentences are flushed individually, providing
better interruption points. Added `flush_next_sentence()` method to
`SimpleTextAggregator` to extract buffered sentences without adding new text.
- Added Hindi support for Rime TTS services.
- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API

View File

@@ -352,17 +352,26 @@ class TTSService(AIService):
# pause to avoid audio overlapping.
await self._maybe_pause_frame_processing()
sentence = self._text_aggregator.text
# Flush any remaining complete sentences from the aggregator.
# This ensures all buffered sentences are sent to TTS individually.
sentence = await self._text_aggregator.flush_next_sentence()
includes_inter_frame_spaces = self._aggregated_text_includes_inter_frame_spaces
while sentence:
await self._push_tts_frames(
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
)
sentence = await self._text_aggregator.flush_next_sentence()
# Send any remaining incomplete text
remaining_text = self._text_aggregator.text
if remaining_text:
await self._push_tts_frames(
remaining_text, includes_inter_frame_spaces=includes_inter_frame_spaces
)
# Reset aggregator state
await self._text_aggregator.reset()
self._processing_text = False
self._aggregated_text_includes_inter_frame_spaces = False
await self._push_tts_frames(
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)

View File

@@ -41,30 +41,57 @@ class SimpleTextAggregator(BaseTextAggregator):
"""
return self._text
async def aggregate(self, text: str) -> Optional[str]:
"""Aggregate text and return completed sentences.
def _extract_next_sentence(self) -> Optional[str]:
"""Extract the next complete sentence from the buffer.
Adds the new text to the buffer and checks for end-of-sentence markers.
When a sentence boundary is found, returns the completed sentence and
removes it from the buffer.
Returns:
The first complete sentence if a sentence boundary is found,
or None if the buffer is empty or contains only incomplete text.
"""
eos_end_marker = match_endofsentence(self._text)
if eos_end_marker:
# Extract the first complete sentence
sentence = self._text[:eos_end_marker]
# Remove it from buffer
self._text = self._text[eos_end_marker:]
return sentence
return None
async def aggregate(self, text: str) -> Optional[str]:
"""Aggregate text and return the first completed sentence.
Adds the new text to the buffer and checks for sentence boundaries.
When a sentence boundary is found, returns the first completed sentence
and removes it from the buffer. Subsequent calls (even with empty strings)
will return additional complete sentences if they exist in the buffer.
This handles varying input patterns from different LLM providers:
- Word-by-word tokens (e.g., 'Hello', '!', ' I', ' am', ' Doug.')
- Chunks with one or more sentences (e.g., 'Hello! I am Doug. Nice to meet you!')
Args:
text: New text to add to the aggregation buffer.
Returns:
A complete sentence if an end-of-sentence marker is found,
The first complete sentence if a sentence boundary is found,
or None if more text is needed to complete a sentence.
"""
result: Optional[str] = None
self._text += text
return self._extract_next_sentence()
eos_end_marker = match_endofsentence(self._text)
if eos_end_marker:
result = self._text[:eos_end_marker]
self._text = self._text[eos_end_marker:]
async def flush_next_sentence(self) -> Optional[str]:
"""Retrieve the next complete sentence from the buffer without adding new text.
return result
This method extracts the next complete sentence from the internal buffer
without requiring new input text. It's useful for draining multiple
complete sentences that were received in a single chunk.
Returns:
The next complete sentence if one exists in the buffer, or None if
the buffer is empty or contains only incomplete text.
"""
return self._extract_next_sentence()
async def handle_interruption(self):
"""Handle interruptions by clearing the text buffer.

View File

@@ -19,11 +19,39 @@ class TestSimpleTextAggregator(unittest.IsolatedAsyncioTestCase):
await self.aggregator.reset()
assert self.aggregator.text == ""
async def test_simple_sentence(self):
assert await self.aggregator.aggregate("Hello ") == None
assert await self.aggregator.aggregate("Pipecat!") == "Hello Pipecat!"
async def test_word_by_word(self):
"""Test word-by-word token aggregation (e.g., OpenAI)."""
assert await self.aggregator.aggregate("Hello") == None
assert await self.aggregator.aggregate("!") == "Hello!"
assert await self.aggregator.aggregate(" I") == None
assert await self.aggregator.aggregate(" am") == None
assert await self.aggregator.aggregate(" Doug.") == " I am Doug."
assert self.aggregator.text == ""
async def test_multiple_sentences(self):
assert await self.aggregator.aggregate("Hello Pipecat! How are ") == "Hello Pipecat!"
assert await self.aggregator.aggregate("you?") == " How are you?"
async def test_chunks_with_partial_sentences(self):
"""Test chunks with partial sentences."""
assert await self.aggregator.aggregate("Hey!") == "Hey!"
assert await self.aggregator.aggregate(" Nice to meet you! So") == " Nice to meet you!"
assert self.aggregator.text == " So"
assert await self.aggregator.aggregate(" what") == None
assert await self.aggregator.aggregate("'d you like?") == " So what'd you like?"
async def test_multi_sentence_chunk(self):
"""Test chunks with multiple complete sentences."""
result = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!")
assert result == "Hello!"
# Drain remaining sentences
assert await self.aggregator.flush_next_sentence() == " I am Doug."
assert await self.aggregator.flush_next_sentence() == " Nice to meet you!"
assert await self.aggregator.flush_next_sentence() == None
assert self.aggregator.text == ""
async def test_flush_next_sentence_with_incomplete(self):
"""Test flush_next_sentence with incomplete sentence in buffer."""
assert await self.aggregator.aggregate("Hello! I am") == "Hello!"
assert await self.aggregator.flush_next_sentence() == None
assert self.aggregator.text == " I am"
async def test_flush_next_sentence_empty_buffer(self):
"""Test flush_next_sentence with empty buffer."""
assert await self.aggregator.flush_next_sentence() == None