Compare commits
1 Commits
vp-moq-vib
...
mb/impmrov
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40ed9a7e25 |
@@ -47,6 +47,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
`maybe_capture_participant_screen()` for `SmallWebRTCTransport` in the runner
|
||||
utils.
|
||||
|
||||
- Improved `SimpleTextAggregator` and `TTSService` to properly handle buffered
|
||||
sentences at the end of LLM responses. Previously, when an LLM response ended,
|
||||
any complete sentences remaining in the aggregator's buffer would be sent to
|
||||
TTS as one large chunk. Now these sentences are flushed individually, providing
|
||||
better interruption points. Added `flush_next_sentence()` method to
|
||||
`SimpleTextAggregator` to extract buffered sentences without adding new text.
|
||||
|
||||
- Added Hindi support for Rime TTS services.
|
||||
|
||||
- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API
|
||||
|
||||
@@ -352,17 +352,26 @@ class TTSService(AIService):
|
||||
# pause to avoid audio overlapping.
|
||||
await self._maybe_pause_frame_processing()
|
||||
|
||||
sentence = self._text_aggregator.text
|
||||
# Flush any remaining complete sentences from the aggregator.
|
||||
# This ensures all buffered sentences are sent to TTS individually.
|
||||
sentence = await self._text_aggregator.flush_next_sentence()
|
||||
includes_inter_frame_spaces = self._aggregated_text_includes_inter_frame_spaces
|
||||
while sentence:
|
||||
await self._push_tts_frames(
|
||||
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
|
||||
)
|
||||
sentence = await self._text_aggregator.flush_next_sentence()
|
||||
|
||||
# Send any remaining incomplete text
|
||||
remaining_text = self._text_aggregator.text
|
||||
if remaining_text:
|
||||
await self._push_tts_frames(
|
||||
remaining_text, includes_inter_frame_spaces=includes_inter_frame_spaces
|
||||
)
|
||||
|
||||
# Reset aggregator state
|
||||
await self._text_aggregator.reset()
|
||||
self._processing_text = False
|
||||
self._aggregated_text_includes_inter_frame_spaces = False
|
||||
|
||||
await self._push_tts_frames(
|
||||
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
|
||||
)
|
||||
if isinstance(frame, LLMFullResponseEndFrame):
|
||||
if self._push_text_frames:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -41,30 +41,57 @@ class SimpleTextAggregator(BaseTextAggregator):
|
||||
"""
|
||||
return self._text
|
||||
|
||||
async def aggregate(self, text: str) -> Optional[str]:
|
||||
"""Aggregate text and return completed sentences.
|
||||
def _extract_next_sentence(self) -> Optional[str]:
|
||||
"""Extract the next complete sentence from the buffer.
|
||||
|
||||
Adds the new text to the buffer and checks for end-of-sentence markers.
|
||||
When a sentence boundary is found, returns the completed sentence and
|
||||
removes it from the buffer.
|
||||
Returns:
|
||||
The first complete sentence if a sentence boundary is found,
|
||||
or None if the buffer is empty or contains only incomplete text.
|
||||
"""
|
||||
eos_end_marker = match_endofsentence(self._text)
|
||||
if eos_end_marker:
|
||||
# Extract the first complete sentence
|
||||
sentence = self._text[:eos_end_marker]
|
||||
# Remove it from buffer
|
||||
self._text = self._text[eos_end_marker:]
|
||||
return sentence
|
||||
|
||||
return None
|
||||
|
||||
async def aggregate(self, text: str) -> Optional[str]:
|
||||
"""Aggregate text and return the first completed sentence.
|
||||
|
||||
Adds the new text to the buffer and checks for sentence boundaries.
|
||||
When a sentence boundary is found, returns the first completed sentence
|
||||
and removes it from the buffer. Subsequent calls (even with empty strings)
|
||||
will return additional complete sentences if they exist in the buffer.
|
||||
|
||||
This handles varying input patterns from different LLM providers:
|
||||
- Word-by-word tokens (e.g., 'Hello', '!', ' I', ' am', ' Doug.')
|
||||
- Chunks with one or more sentences (e.g., 'Hello! I am Doug. Nice to meet you!')
|
||||
|
||||
Args:
|
||||
text: New text to add to the aggregation buffer.
|
||||
|
||||
Returns:
|
||||
A complete sentence if an end-of-sentence marker is found,
|
||||
The first complete sentence if a sentence boundary is found,
|
||||
or None if more text is needed to complete a sentence.
|
||||
"""
|
||||
result: Optional[str] = None
|
||||
|
||||
self._text += text
|
||||
return self._extract_next_sentence()
|
||||
|
||||
eos_end_marker = match_endofsentence(self._text)
|
||||
if eos_end_marker:
|
||||
result = self._text[:eos_end_marker]
|
||||
self._text = self._text[eos_end_marker:]
|
||||
async def flush_next_sentence(self) -> Optional[str]:
|
||||
"""Retrieve the next complete sentence from the buffer without adding new text.
|
||||
|
||||
return result
|
||||
This method extracts the next complete sentence from the internal buffer
|
||||
without requiring new input text. It's useful for draining multiple
|
||||
complete sentences that were received in a single chunk.
|
||||
|
||||
Returns:
|
||||
The next complete sentence if one exists in the buffer, or None if
|
||||
the buffer is empty or contains only incomplete text.
|
||||
"""
|
||||
return self._extract_next_sentence()
|
||||
|
||||
async def handle_interruption(self):
|
||||
"""Handle interruptions by clearing the text buffer.
|
||||
|
||||
@@ -19,11 +19,39 @@ class TestSimpleTextAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
await self.aggregator.reset()
|
||||
assert self.aggregator.text == ""
|
||||
|
||||
async def test_simple_sentence(self):
|
||||
assert await self.aggregator.aggregate("Hello ") == None
|
||||
assert await self.aggregator.aggregate("Pipecat!") == "Hello Pipecat!"
|
||||
async def test_word_by_word(self):
|
||||
"""Test word-by-word token aggregation (e.g., OpenAI)."""
|
||||
assert await self.aggregator.aggregate("Hello") == None
|
||||
assert await self.aggregator.aggregate("!") == "Hello!"
|
||||
assert await self.aggregator.aggregate(" I") == None
|
||||
assert await self.aggregator.aggregate(" am") == None
|
||||
assert await self.aggregator.aggregate(" Doug.") == " I am Doug."
|
||||
assert self.aggregator.text == ""
|
||||
|
||||
async def test_multiple_sentences(self):
|
||||
assert await self.aggregator.aggregate("Hello Pipecat! How are ") == "Hello Pipecat!"
|
||||
assert await self.aggregator.aggregate("you?") == " How are you?"
|
||||
async def test_chunks_with_partial_sentences(self):
|
||||
"""Test chunks with partial sentences."""
|
||||
assert await self.aggregator.aggregate("Hey!") == "Hey!"
|
||||
assert await self.aggregator.aggregate(" Nice to meet you! So") == " Nice to meet you!"
|
||||
assert self.aggregator.text == " So"
|
||||
assert await self.aggregator.aggregate(" what") == None
|
||||
assert await self.aggregator.aggregate("'d you like?") == " So what'd you like?"
|
||||
|
||||
async def test_multi_sentence_chunk(self):
|
||||
"""Test chunks with multiple complete sentences."""
|
||||
result = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!")
|
||||
assert result == "Hello!"
|
||||
# Drain remaining sentences
|
||||
assert await self.aggregator.flush_next_sentence() == " I am Doug."
|
||||
assert await self.aggregator.flush_next_sentence() == " Nice to meet you!"
|
||||
assert await self.aggregator.flush_next_sentence() == None
|
||||
assert self.aggregator.text == ""
|
||||
|
||||
async def test_flush_next_sentence_with_incomplete(self):
|
||||
"""Test flush_next_sentence with incomplete sentence in buffer."""
|
||||
assert await self.aggregator.aggregate("Hello! I am") == "Hello!"
|
||||
assert await self.aggregator.flush_next_sentence() == None
|
||||
assert self.aggregator.text == " I am"
|
||||
|
||||
async def test_flush_next_sentence_empty_buffer(self):
|
||||
"""Test flush_next_sentence with empty buffer."""
|
||||
assert await self.aggregator.flush_next_sentence() == None
|
||||
|
||||
Reference in New Issue
Block a user