Adds tests for AggregatedFrameSequencer, WordCompletionTracker, and word_timestamp_utils (including CJK language scenarios). Updates existing Cartesia TTS and TTS frame ordering tests to cover the new behaviours.
1603 lines
74 KiB
Python
1603 lines
74 KiB
Python
#
|
||
# Copyright (c) 2024-2026, Daily
|
||
#
|
||
# SPDX-License-Identifier: BSD 2-Clause License
|
||
#
|
||
|
||
import unittest
|
||
|
||
from pipecat.utils.context.word_completion_tracker import WordCompletionTracker
|
||
|
||
|
||
class TestWordCompletionTrackerBasic(unittest.TestCase):
|
||
def test_not_complete_before_any_words(self):
|
||
tracker = WordCompletionTracker("Hello world")
|
||
self.assertFalse(tracker.is_complete)
|
||
|
||
def test_complete_after_all_words(self):
|
||
tracker = WordCompletionTracker("Hello world")
|
||
tracker.add_word_and_check_complete("Hello")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_add_word_and_check_complete_return_value(self):
|
||
"""add_word_and_check_complete returns True exactly when the tracker becomes complete."""
|
||
tracker = WordCompletionTracker("Hi there")
|
||
self.assertFalse(tracker.add_word_and_check_complete("Hi"))
|
||
self.assertTrue(tracker.add_word_and_check_complete("there"))
|
||
|
||
def test_single_word(self):
|
||
tracker = WordCompletionTracker("Hello")
|
||
self.assertTrue(tracker.add_word_and_check_complete("Hello"))
|
||
|
||
def test_complete_stays_true_after_extra_words(self):
|
||
"""Extra words after completion force-complete (no-op remaining) and stay complete."""
|
||
tracker = WordCompletionTracker("Hi")
|
||
tracker.add_word_and_check_complete("Hi")
|
||
result = tracker.add_word_and_check_complete("extra")
|
||
self.assertTrue(result)
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
|
||
class TestWordCompletionTrackerNormalization(unittest.TestCase):
|
||
def test_punctuation_ignored_in_expected(self):
|
||
"""Punctuation in the source text is stripped before comparison."""
|
||
tracker = WordCompletionTracker("Hello, world!")
|
||
tracker.add_word_and_check_complete("Hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_punctuation_ignored_in_words(self):
|
||
"""Punctuation attached to TTS word tokens is also stripped."""
|
||
tracker = WordCompletionTracker("Hello world")
|
||
tracker.add_word_and_check_complete("Hello,")
|
||
tracker.add_word_and_check_complete("world!")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_case_insensitive(self):
|
||
tracker = WordCompletionTracker("HELLO WORLD")
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_spaces_ignored(self):
|
||
"""Spaces in expected or received text do not count towards char totals."""
|
||
tracker = WordCompletionTracker("a b c")
|
||
# Normalized expected: "abc" (3 chars)
|
||
tracker.add_word_and_check_complete("a")
|
||
tracker.add_word_and_check_complete("b")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("c")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_numbers_kept(self):
|
||
"""Digits are treated as regular alphanumeric characters."""
|
||
tracker = WordCompletionTracker("Room 42")
|
||
tracker.add_word_and_check_complete("Room")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("42")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_mixed_punctuation_and_numbers(self):
|
||
tracker = WordCompletionTracker("It costs $9.99!")
|
||
# Normalized expected: "itcosts999"
|
||
tracker.add_word_and_check_complete("It")
|
||
tracker.add_word_and_check_complete("costs")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("$9.99!")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_special_characters_only_in_expected(self):
|
||
"""If the expected text normalizes to empty, the tracker is immediately complete."""
|
||
tracker = WordCompletionTracker("...")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_emoji_only_expected_accepts_emoji_word_without_warning(self):
|
||
"""Emoji-only frame is already complete (normalizes to ''), but a TTS word event
|
||
for the emoji must still be accepted gracefully and return True without a warning."""
|
||
tracker = WordCompletionTracker("😊")
|
||
self.assertTrue(tracker.is_complete)
|
||
result = tracker.add_word_and_check_complete("😊")
|
||
self.assertTrue(result)
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_special_characters_only_in_word(self):
|
||
"""A word token that normalizes to empty and is absent from the expected text
|
||
triggers force-complete; one that is present in the expected text is consumed
|
||
normally and contributes nothing to the alnum count."""
|
||
# "---" is not in "hello" → force-complete
|
||
tracker = WordCompletionTracker("hello")
|
||
tracker.add_word_and_check_complete("---")
|
||
self.assertTrue(tracker.is_complete)
|
||
self.assertEqual(tracker.get_overflow_word(), "---")
|
||
|
||
# "..." IS in "hello..." so it belongs here and contributes 0 alnum chars
|
||
tracker2 = WordCompletionTracker("hello...")
|
||
tracker2.add_word_and_check_complete("...") # belongs, but no alnum chars → incomplete
|
||
self.assertFalse(tracker2.is_complete)
|
||
tracker2.add_word_and_check_complete("hello")
|
||
self.assertTrue(tracker2.is_complete)
|
||
|
||
def test_ssml_tags_stripped_in_word(self):
|
||
"""SSML tags like <spell>...</spell> in TTS word tokens are stripped before comparison."""
|
||
tracker = WordCompletionTracker("1234-5678")
|
||
# Cartesia returns something like "<spell>1234-5678</spell>" as a word token
|
||
self.assertTrue(tracker.add_word_and_check_complete("<spell>1234-5678</spell>"))
|
||
|
||
def test_ssml_tags_do_not_inflate_char_count(self):
|
||
"""Tag names must not count as alphanumeric chars, preventing premature completion."""
|
||
tracker = WordCompletionTracker("ab")
|
||
# Without tag stripping "spell" (5 chars) would push received count over threshold.
|
||
tracker.add_word_and_check_complete("<spell>a</spell>")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("b")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_ssml_tags_in_expected_text(self):
|
||
"""Tags in the expected text are also stripped."""
|
||
tracker = WordCompletionTracker("<speak>hello</speak>")
|
||
self.assertTrue(tracker.add_word_and_check_complete("hello"))
|
||
|
||
def test_curly_apostrophe_in_llm_text_matches_straight_apostrophe_in_tts_word(self):
|
||
"""LLM curly apostrophe must not trigger the safeguard when TTS uses straight."""
|
||
llm = "you’re welcome" # LLM: RIGHT SINGLE QUOTATION MARK
|
||
tracker = WordCompletionTracker(llm, llm_text=llm)
|
||
tracker.add_word_and_check_complete("you’re") # TTS: straight apostrophe
|
||
self.assertIsNotNone(tracker.get_llm_consumed())
|
||
tracker.add_word_and_check_complete("welcome")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_curly_apostrophe_in_tts_word_matches_straight_apostrophe_in_llm_text(self):
|
||
"""TTS curly apostrophe must not trigger the safeguard when LLM uses straight."""
|
||
llm = "you’re welcome" # LLM: straight apostrophe
|
||
tracker = WordCompletionTracker(llm, llm_text=llm)
|
||
tracker.add_word_and_check_complete("you’re") # TTS: RIGHT SINGLE QUOTATION MARK
|
||
self.assertIsNotNone(tracker.get_llm_consumed())
|
||
tracker.add_word_and_check_complete("welcome")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
|
||
class TestWordCompletionTrackerReset(unittest.TestCase):
|
||
def test_reset_clears_progress(self):
|
||
tracker = WordCompletionTracker("Hello world")
|
||
tracker.add_word_and_check_complete("Hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertTrue(tracker.is_complete)
|
||
tracker.reset()
|
||
self.assertFalse(tracker.is_complete)
|
||
|
||
def test_reset_allows_reuse(self):
|
||
tracker = WordCompletionTracker("Hello world")
|
||
tracker.add_word_and_check_complete("Hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
tracker.reset()
|
||
tracker.add_word_and_check_complete("Hello")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_reset_preserves_expected_text(self):
|
||
"""reset() only clears received chars; the expected text is unchanged."""
|
||
tracker = WordCompletionTracker("Hi")
|
||
tracker.add_word_and_check_complete("Hi")
|
||
tracker.reset()
|
||
# Re-adding the same word should complete again
|
||
self.assertTrue(tracker.add_word_and_check_complete("Hi"))
|
||
|
||
def test_reset_clears_raw_pos_cursor(self):
|
||
"""reset() resets the llm_text cursor so raw_consumed is correct after replay."""
|
||
raw = "<card>4111</card>"
|
||
tracker = WordCompletionTracker("4111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111</card>")
|
||
tracker.reset()
|
||
tracker.add_word_and_check_complete("4111")
|
||
# Cursor restarts from position 0 after reset.
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111</card>")
|
||
|
||
def test_reset_clears_expected_raw_pos_cursor(self):
|
||
"""reset() resets the expected_raw cursor so force-complete uses full text again."""
|
||
tracker = WordCompletionTracker("number is")
|
||
tracker.add_word_and_check_complete("number")
|
||
# Partially advanced: remaining = " is"
|
||
tracker.add_word_and_check_complete("4111") # force-complete
|
||
self.assertEqual(tracker.get_word_for_frame(), "is")
|
||
|
||
tracker.reset()
|
||
# After reset the cursor is back at 0, so force-complete sees the full text.
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_word_for_frame(), "number is")
|
||
|
||
|
||
class TestWordCompletionTrackerEdgeCases(unittest.TestCase):
|
||
def test_empty_expected_text(self):
|
||
"""An empty expected string is complete from the start."""
|
||
tracker = WordCompletionTracker("")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_empty_word_adds_nothing(self):
|
||
tracker = WordCompletionTracker("hello")
|
||
tracker.add_word_and_check_complete("")
|
||
self.assertFalse(tracker.is_complete)
|
||
|
||
def test_partial_word_completion(self):
|
||
"""Chars accumulate; completion happens mid-add_word_and_check_complete if count is reached."""
|
||
tracker = WordCompletionTracker("ab")
|
||
# "ab" normalizes to 2 chars; one word covering both at once
|
||
self.assertTrue(tracker.add_word_and_check_complete("ab"))
|
||
|
||
def test_word_with_extra_chars_completes(self):
|
||
"""A single verbose token can satisfy a longer expected text."""
|
||
tracker = WordCompletionTracker("Hi")
|
||
self.assertTrue(tracker.add_word_and_check_complete("Hieveryone"))
|
||
|
||
|
||
class TestWordCompletionTrackerRealisticSentences(unittest.TestCase):
|
||
# Sentence as it would appear in an AggregatedTextFrame from the LLM.
|
||
SENTENCE = "You're welcome! If you have any more questions or need further examples, feel free to ask. Have a great day! 😊"
|
||
|
||
# Words as a TTS word-timestamp service typically returns them:
|
||
# punctuation attached to the adjacent word, emoji absent (unspeakable).
|
||
TTS_WORDS = [
|
||
"You're",
|
||
"welcome!",
|
||
"If",
|
||
"you",
|
||
"have",
|
||
"any",
|
||
"more",
|
||
"questions",
|
||
"or",
|
||
"need",
|
||
"further",
|
||
"examples,",
|
||
"feel",
|
||
"free",
|
||
"to",
|
||
"ask.",
|
||
"Have",
|
||
"a",
|
||
"great",
|
||
"day!",
|
||
]
|
||
|
||
def test_completes_after_all_tts_words(self):
|
||
"""Feeding every TTS word in order must complete the tracker."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
results = [tracker.add_word_and_check_complete(w) for w in self.TTS_WORDS]
|
||
self.assertTrue(results[-1], "tracker should be complete after the last word")
|
||
|
||
def test_not_complete_before_last_word(self):
|
||
"""Tracker must not report complete before the final word is added."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in self.TTS_WORDS[:-1]:
|
||
self.assertFalse(tracker.is_complete, f"should not be complete after '{word}'")
|
||
|
||
def test_last_word_triggers_completion(self):
|
||
"""Only the last add_word_and_check_complete call should return True."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
intermediate = [tracker.add_word_and_check_complete(w) for w in self.TTS_WORDS[:-1]]
|
||
self.assertFalse(any(intermediate))
|
||
self.assertTrue(tracker.add_word_and_check_complete(self.TTS_WORDS[-1]))
|
||
|
||
def test_emoji_in_expected_does_not_block_completion(self):
|
||
"""The 😊 at the end normalizes to '' so it adds no required chars."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in self.TTS_WORDS:
|
||
tracker.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_chunked_delivery_two_words_per_call(self):
|
||
"""Some TTS providers return multiple words in one timestamp event."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
pairs = [
|
||
self.TTS_WORDS[i] + " " + self.TTS_WORDS[i + 1]
|
||
for i in range(0, len(self.TTS_WORDS) - 1, 2)
|
||
]
|
||
# If word count is odd the last word is left alone.
|
||
if len(self.TTS_WORDS) % 2:
|
||
pairs.append(self.TTS_WORDS[-1])
|
||
for chunk in pairs:
|
||
tracker.add_word_and_check_complete(chunk)
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_reset_and_replay(self):
|
||
"""After reset the same tracker can complete the same sentence again."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in self.TTS_WORDS:
|
||
tracker.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
tracker.reset()
|
||
self.assertFalse(tracker.is_complete)
|
||
|
||
for word in self.TTS_WORDS:
|
||
tracker.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_mid_sentence_is_not_complete(self):
|
||
"""Spot-check several points through the sentence."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
checkpoints = {4, 9, 14} # after 5th, 10th, 15th word (0-indexed)
|
||
for i, word in enumerate(self.TTS_WORDS[:-1]):
|
||
tracker.add_word_and_check_complete(word)
|
||
if i in checkpoints:
|
||
self.assertFalse(
|
||
tracker.is_complete,
|
||
f"should not be complete after {i + 1} words",
|
||
)
|
||
|
||
def test_short_sentence_word_by_word(self):
|
||
"""Smaller realistic sentence: each word added individually."""
|
||
sentence = "Of course! Here's a simple Python example."
|
||
words = ["Of", "course!", "Here's", "a", "simple", "Python", "example."]
|
||
tracker = WordCompletionTracker(sentence)
|
||
for word in words[:-1]:
|
||
self.assertFalse(tracker.add_word_and_check_complete(word))
|
||
self.assertTrue(tracker.add_word_and_check_complete(words[-1]))
|
||
|
||
def test_sentence_with_numbers(self):
|
||
"""Numbers are alphanumeric and must be counted in both directions."""
|
||
sentence = "There are 3 options available for you."
|
||
words = ["There", "are", "3", "options", "available", "for", "you."]
|
||
tracker = WordCompletionTracker(sentence)
|
||
for word in words[:-1]:
|
||
self.assertFalse(tracker.add_word_and_check_complete(word))
|
||
self.assertTrue(tracker.add_word_and_check_complete(words[-1]))
|
||
|
||
def test_credit_card_sentence_with_raw_consumed(self):
|
||
"""Test that get_raw_consumed returns 'code:' after completing the sentence and all words."""
|
||
sentence = "Here is a sample credit card number and a simple Python code:"
|
||
words = [
|
||
"Here",
|
||
"is",
|
||
"a",
|
||
"sample",
|
||
"credit",
|
||
"card",
|
||
"number",
|
||
"and",
|
||
"a",
|
||
"simple",
|
||
"Python",
|
||
"code:",
|
||
]
|
||
|
||
# Provide llm_text parameter so get_llm_consumed() works
|
||
tracker = WordCompletionTracker(sentence, llm_text=sentence)
|
||
|
||
# Add all words except the last one
|
||
for word in words[:-1]:
|
||
result = tracker.add_word_and_check_complete(word)
|
||
self.assertFalse(result, f"Should not be complete after adding '{word}'")
|
||
|
||
# Add the final word - should complete the tracker
|
||
result = tracker.add_word_and_check_complete(words[-1])
|
||
self.assertTrue(result, "Should be complete after adding the last word")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
# get_raw_consumed should return "code:" for the last word
|
||
self.assertEqual(tracker.get_llm_consumed(), "code:")
|
||
|
||
def test_maori_culture_sentence(self):
|
||
"""Test completion with Māori culture sentence - last word should be 'culture.'"""
|
||
sentence = (
|
||
"The indigenous Māori people are a significant part of the population and culture."
|
||
)
|
||
words = [
|
||
"The",
|
||
"indigenous",
|
||
"Māori",
|
||
"people",
|
||
"are",
|
||
"a",
|
||
"significant",
|
||
"part",
|
||
"of",
|
||
"the",
|
||
"population",
|
||
"and",
|
||
"culture.",
|
||
]
|
||
tracker = WordCompletionTracker(sentence, llm_text=sentence)
|
||
|
||
# Add all words except the last one - should not complete
|
||
for word in words[:-1]:
|
||
result = tracker.add_word_and_check_complete(word)
|
||
self.assertFalse(result, f"Should not be complete after adding '{word}'")
|
||
|
||
# Add the final word "culture." - should complete the tracker
|
||
result = tracker.add_word_and_check_complete(words[-1])
|
||
self.assertTrue(result, "Should be complete after adding 'culture.'")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
# get_raw_consumed should return "culture." for the last word
|
||
self.assertEqual(tracker.get_word_for_frame(), "culture.")
|
||
self.assertEqual(tracker.get_llm_consumed(), "culture.")
|
||
|
||
def test_geography_sentence_frame_word_and_raw_consumed_validation(self):
|
||
"""Test geography sentence word by word, validating _frame_word and _raw_consumed match expected values.
|
||
|
||
Sentence: 'Here are some key facts: **Geography:** - It consists mainly of two large islands:
|
||
the North Island and the South Island, as well as many smaller islands.'
|
||
|
||
This test validates that the received word, _frame_word, and _raw_consumed are exactly
|
||
what we expect for each word addition, especially in special cases with punctuation.
|
||
"""
|
||
sentence = "Here are some key facts: **Geography:** - It consists mainly of two large islands: the North Island and the South Island, as well as many smaller islands."
|
||
llm_text = f"<geography>{sentence}</geography>"
|
||
|
||
words = [
|
||
"Here",
|
||
"are",
|
||
"some",
|
||
"key",
|
||
"facts:",
|
||
"**Geography:**",
|
||
"-",
|
||
"It",
|
||
"consists",
|
||
"mainly",
|
||
"of",
|
||
"two",
|
||
"large",
|
||
"islands:",
|
||
"the",
|
||
"North",
|
||
"Island",
|
||
"and",
|
||
"the",
|
||
"South",
|
||
"Island,",
|
||
"as",
|
||
"well",
|
||
"as",
|
||
"many",
|
||
"smaller",
|
||
"islands.",
|
||
]
|
||
|
||
# Expected _frame_word for each word (should match the input word exactly)
|
||
expected_frame_words = [
|
||
"Here",
|
||
"are",
|
||
"some",
|
||
"key",
|
||
"facts:",
|
||
"**Geography:**",
|
||
"-",
|
||
"It",
|
||
"consists",
|
||
"mainly",
|
||
"of",
|
||
"two",
|
||
"large",
|
||
"islands:",
|
||
"the",
|
||
"North",
|
||
"Island",
|
||
"and",
|
||
"the",
|
||
"South",
|
||
"Island,",
|
||
"as",
|
||
"well",
|
||
"as",
|
||
"many",
|
||
"smaller",
|
||
"islands.",
|
||
]
|
||
|
||
# Expected _llm_consumed for each word (spans from llm_text)
|
||
expected_raw_consumed = [
|
||
"<geography>Here",
|
||
"are",
|
||
"some",
|
||
"key",
|
||
"facts:",
|
||
"**Geography:**",
|
||
"-",
|
||
"It",
|
||
"consists",
|
||
"mainly",
|
||
"of",
|
||
"two",
|
||
"large",
|
||
"islands:",
|
||
"the",
|
||
"North",
|
||
"Island",
|
||
"and",
|
||
"the",
|
||
"South",
|
||
"Island,",
|
||
"as",
|
||
"well",
|
||
"as",
|
||
"many",
|
||
"smaller",
|
||
"islands.</geography>",
|
||
]
|
||
|
||
tracker = WordCompletionTracker(sentence, llm_text=llm_text)
|
||
|
||
for i, word in enumerate(words):
|
||
is_complete = tracker.add_word_and_check_complete(word)
|
||
|
||
# Test 1: Validate _frame_word matches expected
|
||
actual_frame_word = tracker.get_word_for_frame()
|
||
expected_frame_word = expected_frame_words[i]
|
||
self.assertEqual(
|
||
actual_frame_word,
|
||
expected_frame_word,
|
||
f"Word {i + 1} '{word}': expected _frame_word '{expected_frame_word}', got '{actual_frame_word}'",
|
||
)
|
||
|
||
# Test 2: Validate _raw_consumed matches expected
|
||
actual_raw_consumed = tracker.get_llm_consumed()
|
||
expected_raw = expected_raw_consumed[i]
|
||
self.assertEqual(
|
||
actual_raw_consumed,
|
||
expected_raw,
|
||
f"Word {i + 1} '{word}': expected _raw_consumed '{expected_raw}', got '{actual_raw_consumed}'",
|
||
)
|
||
|
||
# Test 3: Validate completion status
|
||
if i == len(words) - 1:
|
||
self.assertTrue(is_complete, f"Should be complete after final word '{word}'")
|
||
else:
|
||
self.assertFalse(
|
||
is_complete, f"Should not be complete after word '{word}' (position {i + 1})"
|
||
)
|
||
|
||
# Test special punctuation cases individually
|
||
special_cases = [
|
||
("**Geography:**", "**Geography:**"),
|
||
("facts:", "facts:"),
|
||
("islands:", "islands:"),
|
||
("Island,", "Island,"),
|
||
("islands.", "islands."),
|
||
]
|
||
|
||
for word, expected_frame in special_cases:
|
||
tracker_special = WordCompletionTracker(word, llm_text=f"<test>{word}</test>")
|
||
tracker_special.add_word_and_check_complete(word)
|
||
|
||
actual_frame = tracker_special.get_word_for_frame()
|
||
self.assertEqual(
|
||
actual_frame,
|
||
expected_frame,
|
||
f"Special case '{word}': expected _frame_word '{expected_frame}', got '{actual_frame}'",
|
||
)
|
||
|
||
actual_raw = tracker_special.get_llm_consumed()
|
||
expected_raw_special = f"<test>{word}</test>"
|
||
self.assertEqual(
|
||
actual_raw,
|
||
expected_raw_special,
|
||
f"Special case '{word}': expected _raw_consumed '{expected_raw_special}', got '{actual_raw}'",
|
||
)
|
||
|
||
|
||
class TestWordCompletionTrackerWordBelongsHere(unittest.TestCase):
|
||
def test_belongs_when_word_is_prefix_of_remaining(self):
|
||
"""word_belongs_here returns True when word alnum chars match the start of remaining."""
|
||
tracker = WordCompletionTracker("4111 1111 1111 1111")
|
||
self.assertTrue(tracker.word_belongs_here("4111"))
|
||
|
||
def test_does_not_belong_when_word_mismatches_remaining(self):
|
||
"""word_belongs_here returns False when the word is clearly from a different slot."""
|
||
tracker = WordCompletionTracker("number is")
|
||
# '4' does not match 'n' (start of "numberis")
|
||
self.assertFalse(tracker.word_belongs_here("4111"))
|
||
|
||
def test_punctuation_present_in_expected_belongs(self):
|
||
"""Punctuation that appears in the remaining expected text belongs here."""
|
||
tracker = WordCompletionTracker("hello, world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
# remaining expected = ", world" — comma is present, so it belongs
|
||
self.assertTrue(tracker.word_belongs_here(","))
|
||
|
||
def test_punctuation_absent_from_expected_does_not_belong(self):
|
||
"""Punctuation absent from the remaining expected text does not belong here."""
|
||
tracker = WordCompletionTracker("hello")
|
||
# "..." and "," are not in "hello", so they don't belong
|
||
self.assertFalse(tracker.word_belongs_here("..."))
|
||
self.assertFalse(tracker.word_belongs_here(","))
|
||
|
||
def test_empty_word_always_belongs(self):
|
||
"""An empty word token always belongs (empty string is a substring of any text)."""
|
||
tracker = WordCompletionTracker("hello")
|
||
self.assertTrue(tracker.word_belongs_here(""))
|
||
|
||
def test_emoji_in_expected_belongs(self):
|
||
"""An emoji present in the remaining expected text belongs to this frame."""
|
||
tracker = WordCompletionTracker("Hello 😊")
|
||
tracker.add_word_and_check_complete("Hello")
|
||
self.assertTrue(tracker.word_belongs_here("😊"))
|
||
|
||
def test_emoji_absent_from_expected_does_not_belong(self):
|
||
"""An emoji not in the remaining expected text is routed to the next frame."""
|
||
tracker = WordCompletionTracker("Hello world")
|
||
tracker.add_word_and_check_complete("Hello")
|
||
# 😊 is not in "Hello world", so it doesn't belong here
|
||
self.assertFalse(tracker.word_belongs_here("😊"))
|
||
|
||
def test_belongs_when_partial_prefix_matches(self):
|
||
"""Partial token (fewer chars than remaining) still passes if it is a prefix."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
# remaining = "helloworld", word = "hel" — prefix matches
|
||
self.assertTrue(tracker.word_belongs_here("hel"))
|
||
|
||
def test_does_not_belong_when_no_remaining(self):
|
||
"""word_belongs_here returns False when the tracker is already complete."""
|
||
tracker = WordCompletionTracker("hi")
|
||
tracker.add_word_and_check_complete("hi")
|
||
self.assertFalse(tracker.word_belongs_here("extra"))
|
||
|
||
def test_belongs_advances_correctly_after_partial_consumption(self):
|
||
"""Remaining expected text shifts as words are consumed."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
# remaining is now "world"
|
||
self.assertTrue(tracker.word_belongs_here("world"))
|
||
self.assertFalse(tracker.word_belongs_here("hello"))
|
||
|
||
|
||
class TestWordCompletionTrackerOverflow(unittest.TestCase):
|
||
"""Words that span the boundary of two AggregatedTextFrames."""
|
||
|
||
def test_word_straddles_frame_boundary(self):
|
||
"""A word token that spans two frames splits at the alnum boundary."""
|
||
tracker = WordCompletionTracker("hello")
|
||
result = tracker.add_word_and_check_complete("helloworld")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker.get_word_for_frame(), "hello")
|
||
self.assertEqual(tracker.get_overflow_word(), "world")
|
||
|
||
def test_no_overflow_when_word_fits_exactly(self):
|
||
"""A word that exactly fills remaining slots produces no overflow."""
|
||
tracker = WordCompletionTracker("hello")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertIsNone(tracker.get_overflow_word())
|
||
self.assertEqual(tracker.get_word_for_frame(), "hello")
|
||
|
||
def test_overflow_split_preserves_non_alnum_suffix(self):
|
||
"""The raw overflow word retains non-alnum chars after the split point."""
|
||
tracker = WordCompletionTracker("1111")
|
||
# "1111And" — 4 alnum chars for this frame, "And" as raw overflow
|
||
result = tracker.add_word_and_check_complete("1111And")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker.get_word_for_frame(), "1111")
|
||
self.assertEqual(tracker.get_overflow_word(), "And")
|
||
|
||
def test_overflow_with_digits_splits_at_correct_position(self):
|
||
"""Split position is computed by alnum count, not byte offset."""
|
||
tracker = WordCompletionTracker("4111") # 4 alnum chars
|
||
tracker.add_word_and_check_complete("41111111")
|
||
self.assertEqual(tracker.get_word_for_frame(), "4111")
|
||
self.assertEqual(tracker.get_overflow_word(), "1111")
|
||
|
||
def test_overflow_flows_into_next_tracker(self):
|
||
"""Overflow word fed into the next tracker completes it correctly."""
|
||
tracker1 = WordCompletionTracker("hello")
|
||
tracker2 = WordCompletionTracker("world")
|
||
|
||
tracker1.add_word_and_check_complete("helloworld")
|
||
overflow = tracker1.get_overflow_word()
|
||
self.assertEqual(overflow, "world")
|
||
|
||
result = tracker2.add_word_and_check_complete(overflow)
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker2.get_word_for_frame(), "world")
|
||
self.assertIsNone(tracker2.get_overflow_word())
|
||
|
||
def test_overflow_with_digits_flows_into_next_tracker(self):
|
||
"""Realistic card-number overflow: last token spans two frame slots."""
|
||
tracker1 = WordCompletionTracker("4111 1111 1111 1111")
|
||
tracker2 = WordCompletionTracker("And your")
|
||
|
||
for word in ["4111", "1111", "1111"]:
|
||
tracker1.add_word_and_check_complete(word)
|
||
|
||
# "1111And" straddles the frame boundary
|
||
result = tracker1.add_word_and_check_complete("1111And")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker1.get_word_for_frame(), "1111")
|
||
self.assertEqual(tracker1.get_overflow_word(), "And")
|
||
|
||
# Feed overflow into tracker2
|
||
result = tracker2.add_word_and_check_complete(tracker1.get_overflow_word())
|
||
self.assertFalse(result)
|
||
self.assertEqual(tracker2.get_word_for_frame(), "And")
|
||
|
||
result = tracker2.add_word_and_check_complete("your")
|
||
self.assertTrue(result)
|
||
|
||
def test_no_overflow_state_on_normal_word(self):
|
||
"""After a normal (non-straddling) word, overflow getters return None."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertIsNone(tracker.get_overflow_word())
|
||
|
||
|
||
class TestWordCompletionTrackerMissingWord(unittest.TestCase):
|
||
"""Force-complete: TTS provider drops a word-timestamp event."""
|
||
|
||
def test_force_complete_when_word_does_not_belong(self):
|
||
"""When word doesn't belong, the slot is force-completed and word becomes overflow."""
|
||
tracker = WordCompletionTracker("number is")
|
||
result = tracker.add_word_and_check_complete("4111")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker.get_overflow_word(), "4111")
|
||
|
||
def test_force_complete_frame_word_is_full_remaining_expected(self):
|
||
"""Force-complete with no prior progress: frame_word is the entire expected text."""
|
||
tracker = WordCompletionTracker("number is")
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_word_for_frame(), "number is")
|
||
|
||
def test_force_complete_frame_word_is_partial_remaining_expected(self):
|
||
"""Force-complete after partial progress: frame_word is only the unspoken suffix."""
|
||
tracker = WordCompletionTracker("number is")
|
||
tracker.add_word_and_check_complete("number") # consumes "number" (6 chars)
|
||
# Now remaining = " is"; "4111" doesn't belong
|
||
result = tracker.add_word_and_check_complete("4111")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker.get_word_for_frame(), "is")
|
||
self.assertEqual(tracker.get_overflow_word(), "4111")
|
||
|
||
def test_force_complete_overflow_routes_to_next_tracker(self):
|
||
"""Overflow from a force-completed slot feeds the next tracker correctly."""
|
||
tracker1 = WordCompletionTracker("number is")
|
||
tracker2 = WordCompletionTracker("4111 1111")
|
||
|
||
tracker1.add_word_and_check_complete("4111") # force-completes tracker1
|
||
overflow = tracker1.get_overflow_word()
|
||
self.assertEqual(overflow, "4111")
|
||
|
||
self.assertFalse(tracker2.add_word_and_check_complete(overflow))
|
||
self.assertTrue(tracker2.add_word_and_check_complete("1111"))
|
||
|
||
def test_force_complete_after_several_normal_words(self):
|
||
"""Partial consumption followed by a mismatched word force-completes correctly."""
|
||
tracker = WordCompletionTracker("Your credit card number is")
|
||
for word in ["Your", "credit", "card"]:
|
||
tracker.add_word_and_check_complete(word)
|
||
# "4111" doesn't belong (remaining starts with "number...")
|
||
result = tracker.add_word_and_check_complete("4111")
|
||
self.assertTrue(result)
|
||
# frame_word should be the unspoken suffix including leading space
|
||
self.assertEqual(tracker.get_word_for_frame(), "number is")
|
||
self.assertEqual(tracker.get_overflow_word(), "4111")
|
||
|
||
def test_force_complete_marks_tracker_complete(self):
|
||
"""After force-complete, is_complete is True regardless of how many chars were spoken."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertFalse(tracker.is_complete)
|
||
tracker.add_word_and_check_complete("4111") # force-complete
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_force_complete_does_not_affect_overflow_getters_on_normal_words(self):
|
||
"""overflow state is fresh per call; a normal word clears any prior force-complete state."""
|
||
tracker1 = WordCompletionTracker("ab")
|
||
tracker2 = WordCompletionTracker("cd")
|
||
|
||
# Force-complete tracker1 with a wrong word
|
||
tracker1.add_word_and_check_complete("xyz")
|
||
self.assertIsNotNone(tracker1.get_overflow_word())
|
||
|
||
# tracker2 receives "cd" normally — no overflow
|
||
tracker2.add_word_and_check_complete("cd")
|
||
self.assertIsNone(tracker2.get_overflow_word())
|
||
|
||
|
||
class TestWordCompletionTrackerLLMText(unittest.TestCase):
|
||
"""llm_text cursor tracking: each word maps back to its span in the original LLM text."""
|
||
|
||
def test_llm_text_none_when_no_llm_text_provided(self):
|
||
"""get_raw_consumed returns None when llm_text was not given."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertIsNone(tracker.get_llm_consumed())
|
||
|
||
def test_llm_text_single_word_no_tags(self):
|
||
"""Simple case: llm_text matches tts_text, no tags."""
|
||
tracker = WordCompletionTracker("hello world", llm_text="hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertEqual(tracker.get_llm_consumed(), "hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_llm_consumed(), "world")
|
||
|
||
def test_llm_text_opening_tag_included_in_first_word(self):
|
||
"""The opening tag preceding content is consumed with the first word."""
|
||
raw = "<card>4111 1111 1111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111 1111 1111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111")
|
||
|
||
def test_llm_text_tag_chars_not_counted_as_alnum(self):
|
||
"""Tag chars (c,a,r,d) inside <card> must not burn the alnum budget."""
|
||
# If tag chars were counted, "4111" would only consume "<card" (4 alnum budget
|
||
# spent on c,a,r,d) and the result would be wrong.
|
||
raw = "<card>4111</card>"
|
||
tracker = WordCompletionTracker("4111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
# The full "<card>4111</card>" should be consumed (last word → consume all).
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111</card>")
|
||
|
||
def test_llm_text_four_words_with_card_tags(self):
|
||
"""Full card-number scenario: each word maps to its correct raw span."""
|
||
raw = "<card>4111 1111 1111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111 1111 1111", llm_text=raw)
|
||
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111")
|
||
|
||
tracker.add_word_and_check_complete("1111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "1111")
|
||
|
||
tracker.add_word_and_check_complete("1111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "1111")
|
||
|
||
tracker.add_word_and_check_complete("1111")
|
||
# Last word: consume all remaining raw text including closing tag.
|
||
self.assertEqual(tracker.get_llm_consumed(), "1111</card>")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_llm_text_closing_tag_consumed_with_last_word(self):
|
||
"""Closing tag is swept into the last word's raw_consumed, not lost."""
|
||
raw = "<card>hello</card>"
|
||
tracker = WordCompletionTracker("hello", llm_text=raw)
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>hello</card>")
|
||
|
||
def test_llm_text_mid_frame_word_does_not_consume_closing_tag(self):
|
||
"""Non-final words stop before the closing tag; only the last sweeps it up."""
|
||
raw = "<card>hello world</card>"
|
||
tracker = WordCompletionTracker("hello world", llm_text=raw)
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_llm_consumed(), "world</card>")
|
||
|
||
def test_llm_text_force_complete_consumes_all_remaining(self):
|
||
"""When force-complete fires, all remaining llm_text is consumed at once."""
|
||
raw = "<card>4111 1111 1111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111 1111 1111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111") # advances cursor to pos 10
|
||
# "WRONG" doesn't belong → force-complete
|
||
tracker.add_word_and_check_complete("WRONG")
|
||
self.assertEqual(tracker.get_llm_consumed(), "1111 1111 1111</card>")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_llm_text_overflow_word_last_raw_consumed_sweeps_tag(self):
|
||
"""When a straddling word completes the frame, closing tag is consumed with it."""
|
||
raw = "<card>4111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111")
|
||
|
||
# "1111And" straddles into the next frame
|
||
result = tracker.add_word_and_check_complete("1111And")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker.get_word_for_frame(), "1111")
|
||
self.assertEqual(tracker.get_overflow_word(), "And")
|
||
# is_complete → consume all remaining raw including </card>
|
||
self.assertEqual(tracker.get_llm_consumed(), "1111</card>")
|
||
|
||
def test_llm_text_with_ssml_tags_in_expected(self):
|
||
"""expected_text with SSML tags: raw cursor still advances by content alnum count."""
|
||
# Cartesia might receive "<spell>4111 1111</spell>" as expected_text
|
||
# while <llm_text is "<card>4111 1111</card>"
|
||
raw = "<card>4111 1111</card>"
|
||
tracker = WordCompletionTracker("<spell>4111 1111</spell>", llm_text=raw)
|
||
# normalized expected = "41111111" (8 chars); both texts share the same alnum count.
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "<card>4111")
|
||
tracker.add_word_and_check_complete("1111")
|
||
self.assertEqual(tracker.get_llm_consumed(), "1111</card>")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
|
||
class TestWordCompletionTrackerMultiFrameSimulation(unittest.TestCase):
|
||
"""End-to-end simulations of multiple AggregatedTextFrame slots."""
|
||
|
||
def test_two_plain_frames_sequential_words(self):
|
||
"""Normal two-frame flow: all words arrive in order with no drops or overflow."""
|
||
tracker1 = WordCompletionTracker("Your credit card")
|
||
tracker2 = WordCompletionTracker("number is 42")
|
||
|
||
for word in ["Your", "credit", "card"]:
|
||
tracker1.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker1.is_complete)
|
||
|
||
for word in ["number", "is"]:
|
||
self.assertFalse(tracker2.add_word_and_check_complete(word))
|
||
self.assertTrue(tracker2.add_word_and_check_complete("42"))
|
||
|
||
def test_credit_card_full_flow_with_llm_text(self):
|
||
"""Full card-number scenario with llm_text tracking across two frames."""
|
||
tracker1 = WordCompletionTracker("Your credit card number is")
|
||
tracker2 = WordCompletionTracker(
|
||
"4111 1111 1111 1111",
|
||
llm_text="<card>4111 1111 1111 1111</card>",
|
||
)
|
||
|
||
for word in ["Your", "credit", "card", "number", "is"]:
|
||
tracker1.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker1.is_complete)
|
||
|
||
tracker2.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker2.get_llm_consumed(), "<card>4111")
|
||
|
||
tracker2.add_word_and_check_complete("1111")
|
||
self.assertEqual(tracker2.get_llm_consumed(), "1111")
|
||
|
||
tracker2.add_word_and_check_complete("1111")
|
||
self.assertEqual(tracker2.get_llm_consumed(), "1111")
|
||
|
||
result = tracker2.add_word_and_check_complete("1111")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker2.get_llm_consumed(), "1111</card>")
|
||
|
||
def test_missing_word_force_complete_then_next_frame(self):
|
||
"""Dropped word-timestamp: force-complete slot 1, route word to slot 2."""
|
||
tracker1 = WordCompletionTracker("Your credit card number is")
|
||
tracker2 = WordCompletionTracker(
|
||
"4111 1111 1111 1111",
|
||
llm_text="<card>4111 1111 1111 1111</card>",
|
||
)
|
||
|
||
# "Your", "credit", "card" arrive; then "number" and "is" are dropped.
|
||
for word in ["Your", "credit", "card"]:
|
||
tracker1.add_word_and_check_complete(word)
|
||
|
||
# "4111" arrives but belongs to tracker2 — force-completes tracker1.
|
||
result = tracker1.add_word_and_check_complete("4111")
|
||
self.assertTrue(result)
|
||
# frame_word carries the unspoken remainder so a TTSTextFrame can be emitted.
|
||
self.assertEqual(tracker1.get_word_for_frame(), "number is")
|
||
overflow = tracker1.get_overflow_word()
|
||
self.assertEqual(overflow, "4111")
|
||
|
||
# Route overflow into tracker2.
|
||
tracker2.add_word_and_check_complete(overflow)
|
||
self.assertEqual(tracker2.get_llm_consumed(), "<card>4111")
|
||
|
||
tracker2.add_word_and_check_complete("1111")
|
||
tracker2.add_word_and_check_complete("1111")
|
||
result = tracker2.add_word_and_check_complete("1111")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker2.get_llm_consumed(), "1111</card>")
|
||
|
||
def test_overflow_word_spans_two_frames(self):
|
||
"""A word token that straddles two frame boundaries splits and routes correctly."""
|
||
tracker1 = WordCompletionTracker("4111 1111 1111 1111")
|
||
tracker2 = WordCompletionTracker("And your")
|
||
|
||
for word in ["4111", "1111", "1111"]:
|
||
tracker1.add_word_and_check_complete(word)
|
||
|
||
# "1111And" spans the frame boundary.
|
||
result = tracker1.add_word_and_check_complete("1111And")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker1.get_word_for_frame(), "1111")
|
||
self.assertEqual(tracker1.get_overflow_word(), "And")
|
||
|
||
# Feed overflow into tracker2.
|
||
result = tracker2.add_word_and_check_complete(tracker1.get_overflow_word())
|
||
self.assertFalse(result)
|
||
self.assertEqual(tracker2.get_word_for_frame(), "And")
|
||
|
||
self.assertTrue(tracker2.add_word_and_check_complete("your"))
|
||
|
||
def test_overflow_with_llm_text_across_frames(self):
|
||
"""Raw text cursor is correct when the last straddling word completes the frame."""
|
||
tracker1 = WordCompletionTracker(
|
||
"4111 1111",
|
||
llm_text="<card>4111 1111</card>",
|
||
)
|
||
tracker2 = WordCompletionTracker("And")
|
||
|
||
tracker1.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker1.get_llm_consumed(), "<card>4111")
|
||
|
||
# "1111And" completes tracker1; "And" overflows to tracker2.
|
||
result = tracker1.add_word_and_check_complete("1111And")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker1.get_llm_consumed(), "1111</card>")
|
||
self.assertEqual(tracker1.get_overflow_word(), "And")
|
||
|
||
result = tracker2.add_word_and_check_complete(tracker1.get_overflow_word())
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker2.get_word_for_frame(), "And")
|
||
|
||
def test_multiple_missing_words_single_force_complete(self):
|
||
"""Even if several consecutive words are dropped, one force-complete handles them all."""
|
||
# Frame expects "one two three"; TTS skips straight to "four" (next frame's word).
|
||
tracker1 = WordCompletionTracker("one two three")
|
||
tracker2 = WordCompletionTracker("four five")
|
||
|
||
# No words from tracker1 ever arrive; "four" is the first word seen.
|
||
result = tracker1.add_word_and_check_complete("four")
|
||
self.assertTrue(result)
|
||
self.assertEqual(tracker1.get_word_for_frame(), "one two three")
|
||
self.assertEqual(tracker1.get_overflow_word(), "four")
|
||
|
||
self.assertFalse(tracker2.add_word_and_check_complete("four"))
|
||
self.assertTrue(tracker2.add_word_and_check_complete("five"))
|
||
|
||
|
||
class TestWordCompletionTrackerEmojiInSentence(unittest.TestCase):
|
||
"""Word-by-word validation of frame_word and raw_consumed for sentences
|
||
containing emojis and special characters.
|
||
|
||
Covers:
|
||
- Emoji in the middle of a sentence whose llm_text has surrounding XML tags.
|
||
- Emoji adjacent to currency symbols and punctuation.
|
||
- Multiple emojis scattered through a sentence with no XML tags.
|
||
- Emoji that does NOT appear in llm_text (safeguard must return None).
|
||
"""
|
||
|
||
def _assert_word(self, tracker, word, expected_frame_word, expected_raw_consumed, idx):
|
||
msg = f"word {idx + 1} {repr(word)}"
|
||
self.assertEqual(tracker.get_word_for_frame(), expected_frame_word, f"{msg}: frame_word")
|
||
self.assertEqual(tracker.get_llm_consumed(), expected_raw_consumed, f"{msg}: raw_consumed")
|
||
|
||
def test_emoji_in_middle_with_raw_tags(self):
|
||
"""Sentence: 'Great job! 🎉 Well done.' wrapped in <praise> tags.
|
||
|
||
The emoji word gets its own raw_consumed span ('🎉') because the
|
||
chars_for_frame==0 branch finds it at the correct offset in llm_text.
|
||
Words that follow the emoji pick up immediately after it.
|
||
"""
|
||
sentence = "Great job! 🎉 Well done."
|
||
llm_text = f"<praise>{sentence}</praise>"
|
||
words = ["Great", "job!", "🎉", "Well", "done."]
|
||
|
||
expected_frame_words = ["Great", "job!", "🎉", "Well", "done."]
|
||
expected_raw_consumed = [
|
||
"<praise>Great", # opening tag consumed with first alnum word
|
||
"job!", # " job!" stripped
|
||
"🎉", # emoji found directly in llm_text
|
||
"Well", # " Well" stripped; emoji already consumed
|
||
"done.</praise>", # last word sweeps closing tag
|
||
]
|
||
|
||
tracker = WordCompletionTracker(sentence, llm_text=llm_text)
|
||
for i, word in enumerate(words):
|
||
is_complete = tracker.add_word_and_check_complete(word)
|
||
self._assert_word(tracker, word, expected_frame_words[i], expected_raw_consumed[i], i)
|
||
if i == len(words) - 1:
|
||
self.assertTrue(is_complete, f"should be complete after final word {repr(word)}")
|
||
else:
|
||
self.assertFalse(is_complete, f"should not be complete after {repr(word)}")
|
||
|
||
def test_emoji_with_currency_and_raw_tags(self):
|
||
"""Sentence: 'Pay $50 😊 today!' wrapped in <promo> tags.
|
||
|
||
Validates that currency symbols ($) are treated as non-alnum punctuation
|
||
and that the emoji token is correctly assigned its own raw span.
|
||
"""
|
||
sentence = "Pay $50 😊 today!"
|
||
llm_text = f"<promo>{sentence}</promo>"
|
||
words = ["Pay", "$50", "😊", "today!"]
|
||
|
||
expected_frame_words = ["Pay", "$50", "😊", "today!"]
|
||
expected_raw_consumed = [
|
||
"<promo>Pay", # opening tag consumed with first word
|
||
"$50", # " $50" stripped
|
||
"😊", # emoji found directly in llm_text
|
||
"today!</promo>", # last word sweeps closing tag
|
||
]
|
||
|
||
tracker = WordCompletionTracker(sentence, llm_text=llm_text)
|
||
for i, word in enumerate(words):
|
||
is_complete = tracker.add_word_and_check_complete(word)
|
||
self._assert_word(tracker, word, expected_frame_words[i], expected_raw_consumed[i], i)
|
||
if i == len(words) - 1:
|
||
self.assertTrue(is_complete, f"should be complete after final word {repr(word)}")
|
||
else:
|
||
self.assertFalse(is_complete, f"should not be complete after {repr(word)}")
|
||
|
||
def test_multiple_emojis_no_tags(self):
|
||
"""Sentence: 'Hello 😊 world 🎉 there!' with llm_text equal to the sentence.
|
||
|
||
Two emojis at different positions in the sentence; each gets its own
|
||
raw_consumed span and neither disrupts the alnum cursor for the words
|
||
that follow.
|
||
"""
|
||
sentence = "Hello 😊 world 🎉 there!"
|
||
words = ["Hello", "😊", "world", "🎉", "there!"]
|
||
|
||
expected_frame_words = ["Hello", "😊", "world", "🎉", "there!"]
|
||
expected_raw_consumed = [
|
||
"Hello", # no leading tag
|
||
"😊", # emoji found directly
|
||
"world", # " world" stripped
|
||
"🎉", # second emoji found directly
|
||
"there!", # " there!" stripped; last word
|
||
]
|
||
|
||
tracker = WordCompletionTracker(sentence, llm_text=sentence)
|
||
for i, word in enumerate(words):
|
||
is_complete = tracker.add_word_and_check_complete(word)
|
||
self._assert_word(tracker, word, expected_frame_words[i], expected_raw_consumed[i], i)
|
||
if i == len(words) - 1:
|
||
self.assertTrue(is_complete, f"should be complete after final word {repr(word)}")
|
||
else:
|
||
self.assertFalse(is_complete, f"should not be complete after {repr(word)}")
|
||
|
||
def test_emoji_absent_from_llm_text_returns_none(self):
|
||
"""Sentence: 'See you soon 😊' but llm_text omits the emoji.
|
||
|
||
The emoji is the last token the TTS returns for this frame. The alnum
|
||
content ('seeyousoon') is already complete after 'soon', so llm_text is
|
||
exhausted. The safeguard must set raw_consumed to None because the swept
|
||
span is empty and does not contain '😊'.
|
||
"""
|
||
sentence = "See you soon 😊"
|
||
llm_text = "<note>See you soon</note>"
|
||
words = ["See", "you", "soon", "😊"]
|
||
|
||
expected_frame_words = ["See", "you", "soon", "😊"]
|
||
expected_raw_consumed = [
|
||
"<note>See", # opening tag consumed with first word
|
||
"you", # " you" stripped
|
||
"soon</note>", # last alnum word sweeps closing tag
|
||
None, # safeguard: emoji not in exhausted llm_text
|
||
]
|
||
|
||
tracker = WordCompletionTracker(sentence, llm_text=llm_text)
|
||
for i, word in enumerate(words):
|
||
tracker.add_word_and_check_complete(word)
|
||
self._assert_word(tracker, word, expected_frame_words[i], expected_raw_consumed[i], i)
|
||
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
|
||
class TestWordCompletionTrackerRemainingText(unittest.TestCase):
|
||
"""Tests for get_remaining_tts_text() and get_remaining_llm_text().
|
||
|
||
These methods expose the unspoken suffix of tts_text / llm_text so that
|
||
_force_complete_spoken_slots() can emit a TTSTextFrame for any words that the
|
||
TTS provider dropped without sending a word-timestamp event.
|
||
"""
|
||
|
||
# ------------------------------------------------------------------ #
|
||
# get_remaining_text #
|
||
# ------------------------------------------------------------------ #
|
||
|
||
def test_remaining_text_before_any_words(self):
|
||
"""Before any words arrive, the full expected text is remaining."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "hello world")
|
||
|
||
def test_remaining_text_after_partial_consumption(self):
|
||
"""After one word is consumed, only the unspoken suffix is returned."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "world")
|
||
|
||
def test_remaining_text_after_completion(self):
|
||
"""After full completion get_remaining_text returns an empty string."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "")
|
||
|
||
def test_remaining_text_strips_whitespace(self):
|
||
"""Leading/trailing whitespace in the remaining portion is stripped."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
# The tts_text suffix before stripping is " world".
|
||
result = tracker.get_remaining_tts_text()
|
||
self.assertEqual(result, "world")
|
||
self.assertFalse(result.startswith(" "))
|
||
|
||
def test_remaining_text_preserves_punctuation(self):
|
||
"""Punctuation inside the remaining text is not removed."""
|
||
tracker = WordCompletionTracker("Hello, world!")
|
||
tracker.add_word_and_check_complete("Hello,")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "world!")
|
||
|
||
def test_remaining_text_after_force_complete(self):
|
||
"""Force-completing the slot exhausts expected_raw_pos; remaining becomes empty."""
|
||
tracker = WordCompletionTracker("number is")
|
||
tracker.add_word_and_check_complete("4111") # force-complete
|
||
self.assertTrue(tracker.is_complete)
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "number is")
|
||
|
||
def test_remaining_text_resets_after_reset(self):
|
||
"""reset() resets expected_raw_pos; get_remaining_text returns the full text again."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "")
|
||
tracker.reset()
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "hello world")
|
||
|
||
# ------------------------------------------------------------------ #
|
||
# get_remaining_llm_text #
|
||
# ------------------------------------------------------------------ #
|
||
|
||
def test_remaining_llm_text_none_when_no_llm_text(self):
|
||
"""Without llm_text, get_remaining_llm_text always returns None."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
self.assertIsNone(tracker.get_remaining_llm_text())
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertIsNone(tracker.get_remaining_llm_text())
|
||
|
||
def test_remaining_llm_text_before_any_words(self):
|
||
"""Before any words, the entire llm_text is remaining."""
|
||
raw = "<card>4111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111", llm_text=raw)
|
||
self.assertEqual(tracker.get_remaining_llm_text(), raw)
|
||
|
||
def test_remaining_llm_text_after_partial_consumption(self):
|
||
"""After one word, the raw cursor is past its span; only the tail is remaining."""
|
||
raw = "<card>4111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
# raw_pos moves past "<card>4111" (10 chars); remaining = " 1111</card>" → strip → "1111</card>"
|
||
self.assertEqual(tracker.get_remaining_llm_text(), "1111</card>")
|
||
|
||
def test_remaining_llm_text_none_after_completion(self):
|
||
"""After full completion all llm_text is consumed; remaining is None."""
|
||
raw = "<card>4111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
tracker.add_word_and_check_complete("1111")
|
||
self.assertIsNone(tracker.get_remaining_llm_text())
|
||
|
||
def test_remaining_llm_text_resets_after_reset(self):
|
||
"""reset() resets the raw cursor; get_remaining_llm_text returns the full llm_text again."""
|
||
raw = "<card>hello world</card>"
|
||
tracker = WordCompletionTracker("hello world", llm_text=raw)
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertIsNone(tracker.get_remaining_llm_text())
|
||
tracker.reset()
|
||
self.assertEqual(tracker.get_remaining_llm_text(), raw)
|
||
|
||
def test_remaining_llm_text_strips_whitespace(self):
|
||
"""Leading/trailing whitespace in the remaining raw portion is stripped."""
|
||
raw = "<card>hello world</card>"
|
||
tracker = WordCompletionTracker("hello world", llm_text=raw)
|
||
tracker.add_word_and_check_complete("hello")
|
||
# raw_pos is past "<card>hello" (11 chars); raw suffix = " world</card>" → strip
|
||
result = tracker.get_remaining_llm_text()
|
||
self.assertIsNotNone(result)
|
||
self.assertFalse(result.startswith(" "))
|
||
self.assertEqual(result, "world</card>")
|
||
|
||
def test_remaining_llm_text_after_force_complete(self):
|
||
"""Force-complete sweeps all llm_text; remaining is None afterwards."""
|
||
raw = "<card>4111 1111 1111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111 1111 1111", llm_text=raw)
|
||
tracker.add_word_and_check_complete("4111")
|
||
tracker.add_word_and_check_complete("WRONG") # force-complete
|
||
self.assertTrue(tracker.is_complete)
|
||
self.assertIsNone(tracker.get_remaining_llm_text())
|
||
|
||
|
||
class TestWordCompletionTrackerAccumulatedText(unittest.TestCase):
|
||
"""Tests for get_accumulated_tts_text() and get_accumulated_llm_text().
|
||
|
||
These methods return the prefix of tts_text / llm_text that has already been
|
||
consumed by word-timestamp events — the complement of get_remaining_tts_text()
|
||
and get_remaining_llm_text() which return the unspoken suffix.
|
||
"""
|
||
|
||
# ------------------------------------------------------------------ #
|
||
# get_accumulated_tts_text #
|
||
# ------------------------------------------------------------------ #
|
||
|
||
def test_accumulated_tts_text_before_any_words(self):
|
||
"""Before any words arrive, nothing has been consumed."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "")
|
||
|
||
def test_accumulated_tts_text_after_partial_consumption(self):
|
||
"""After one word the accumulated prefix covers exactly that word."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "hello")
|
||
|
||
def test_accumulated_tts_text_after_completion(self):
|
||
"""After all words, the entire tts_text is accumulated."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "hello world")
|
||
|
||
def test_accumulated_tts_text_includes_trailing_punctuation(self):
|
||
"""The TTS cursor consumes trailing punctuation, so it appears in the accumulated span."""
|
||
tracker = WordCompletionTracker("Hello, world!")
|
||
tracker.add_word_and_check_complete("Hello,")
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "Hello,")
|
||
|
||
def test_accumulated_tts_text_complements_remaining(self):
|
||
"""accumulated + stripped-whitespace + remaining reconstructs tts_text."""
|
||
tts_text = "hello world"
|
||
tracker = WordCompletionTracker(tts_text)
|
||
tracker.add_word_and_check_complete("hello")
|
||
accumulated = tracker.get_accumulated_tts_text()
|
||
remaining = tracker.get_remaining_tts_text()
|
||
# The space between tokens is stripped from remaining, so re-join with one.
|
||
self.assertEqual(accumulated + " " + remaining, tts_text)
|
||
|
||
def test_accumulated_tts_text_after_force_complete(self):
|
||
"""Force-complete does not advance the TTS cursor; accumulated stays at the
|
||
point reached before the mismatched word arrived."""
|
||
tracker = WordCompletionTracker("number is")
|
||
tracker.add_word_and_check_complete("number")
|
||
tracker.add_word_and_check_complete("4111") # force-complete
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "number")
|
||
|
||
def test_accumulated_tts_text_resets_after_reset(self):
|
||
"""reset() resets the TTS cursor; accumulated returns empty string again."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "hello world")
|
||
tracker.reset()
|
||
self.assertEqual(tracker.get_accumulated_tts_text(), "")
|
||
|
||
# ------------------------------------------------------------------ #
|
||
# get_accumulated_llm_text #
|
||
# ------------------------------------------------------------------ #
|
||
|
||
def test_accumulated_llm_text_none_when_no_llm_text(self):
|
||
"""Without llm_text, get_accumulated_llm_text always returns None."""
|
||
tracker = WordCompletionTracker("hello world")
|
||
self.assertIsNone(tracker.get_accumulated_llm_text())
|
||
tracker.add_word_and_check_complete("hello")
|
||
self.assertIsNone(tracker.get_accumulated_llm_text())
|
||
|
||
def test_accumulated_llm_text_before_any_words(self):
|
||
"""Before any words, nothing has been consumed from llm_text."""
|
||
tracker = WordCompletionTracker("4111 1111", llm_text="<card>4111 1111</card>")
|
||
self.assertEqual(tracker.get_accumulated_llm_text(), "")
|
||
|
||
def test_accumulated_llm_text_after_partial_consumption(self):
|
||
"""After one word, the consumed prefix includes the opening tag and first word."""
|
||
tracker = WordCompletionTracker("4111 1111", llm_text="<card>4111 1111</card>")
|
||
tracker.add_word_and_check_complete("4111")
|
||
self.assertEqual(tracker.get_accumulated_llm_text(), "<card>4111")
|
||
|
||
def test_accumulated_llm_text_after_completion(self):
|
||
"""After all words, the entire llm_text is accumulated (closing tag included)."""
|
||
tracker = WordCompletionTracker("4111 1111", llm_text="<card>4111 1111</card>")
|
||
tracker.add_word_and_check_complete("4111")
|
||
tracker.add_word_and_check_complete("1111")
|
||
self.assertEqual(tracker.get_accumulated_llm_text(), "<card>4111 1111</card>")
|
||
|
||
def test_accumulated_llm_text_complements_remaining(self):
|
||
"""accumulated + stripped-whitespace + remaining reconstructs llm_text."""
|
||
llm = "<card>4111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111", llm_text=llm)
|
||
tracker.add_word_and_check_complete("4111")
|
||
accumulated = tracker.get_accumulated_llm_text()
|
||
remaining = tracker.get_remaining_llm_text()
|
||
self.assertEqual(accumulated.rstrip() + " " + remaining, llm)
|
||
|
||
def test_accumulated_llm_text_after_force_complete(self):
|
||
"""Force-complete sweeps all remaining llm_text; accumulated covers the full string."""
|
||
llm = "<card>4111 1111 1111 1111</card>"
|
||
tracker = WordCompletionTracker("4111 1111 1111 1111", llm_text=llm)
|
||
tracker.add_word_and_check_complete("4111")
|
||
tracker.add_word_and_check_complete("WRONG") # force-complete sweeps remaining llm_text
|
||
self.assertEqual(tracker.get_accumulated_llm_text(), llm)
|
||
|
||
def test_accumulated_llm_text_resets_after_reset(self):
|
||
"""reset() resets the LLM cursor; accumulated returns empty string again."""
|
||
tracker = WordCompletionTracker("hello world", llm_text="<card>hello world</card>")
|
||
tracker.add_word_and_check_complete("hello")
|
||
tracker.add_word_and_check_complete("world")
|
||
self.assertEqual(tracker.get_accumulated_llm_text(), "<card>hello world</card>")
|
||
tracker.reset()
|
||
self.assertEqual(tracker.get_accumulated_llm_text(), "")
|
||
|
||
|
||
class TestWordCompletionTrackerUnicodeSymbolSubstitution(unittest.TestCase):
|
||
"""Guards against the regression where ElevenLabs maps Unicode symbols such
|
||
as '→' to ASCII punctuation like '-' in word-timestamp events.
|
||
|
||
The literal-substring check in _symbol_word_belongs_here failed to find '-'
|
||
inside '→ Santiago…', which caused premature force-completion of the whole
|
||
frame after 'Paulo' was consumed. The symbol-substitution fallback (check
|
||
whether the next non-space char in the TTS text is itself a non-alnum symbol)
|
||
must accept the substituted '-' and keep the tracker running normally.
|
||
"""
|
||
|
||
# The exact sentence that revealed the bug.
|
||
SENTENCE = "- Example route: São Paulo → Santiago (Chile) → Auckland (New Zealand)."
|
||
|
||
# Words as ElevenLabs emits them: both '→' chars are reported as '-'.
|
||
ELEVENLABS_WORDS = [
|
||
"-",
|
||
"Example",
|
||
"route:",
|
||
"São",
|
||
"Paulo",
|
||
"-", # ElevenLabs substitution for first →
|
||
"Santiago",
|
||
"(Chile)",
|
||
"-", # ElevenLabs substitution for second →
|
||
"Auckland",
|
||
"(New",
|
||
"Zealand).",
|
||
]
|
||
|
||
def test_arrow_as_dash_belongs_here_before_first_arrow(self):
|
||
"""After consuming up to 'Paulo', word_belongs_here('-') must return True.
|
||
|
||
The next character in the TTS text is '→'. ElevenLabs sends '-' instead;
|
||
the symbol-substitution fallback must accept it without force-completing.
|
||
"""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in ["-", "Example", "route:", "São", "Paulo"]:
|
||
tracker.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker.word_belongs_here("-"))
|
||
|
||
def test_first_arrow_substitution_does_not_force_complete(self):
|
||
"""Processing '-' in place of the first '→' must not complete the tracker.
|
||
|
||
Without the fix, word_belongs_here('-') returned False here and the
|
||
force-complete path fired, marking the entire frame done prematurely.
|
||
"""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in ["-", "Example", "route:", "São", "Paulo"]:
|
||
tracker.add_word_and_check_complete(word)
|
||
result = tracker.add_word_and_check_complete("-")
|
||
self.assertFalse(result, "tracker must not be complete after the first → substitution")
|
||
self.assertFalse(tracker.is_complete)
|
||
|
||
def test_arrow_as_dash_belongs_here_before_second_arrow(self):
|
||
"""After consuming through '(Chile)', word_belongs_here('-') must return True again."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in ["-", "Example", "route:", "São", "Paulo", "-", "Santiago", "(Chile)"]:
|
||
tracker.add_word_and_check_complete(word)
|
||
self.assertTrue(tracker.word_belongs_here("-"))
|
||
|
||
def test_completes_after_all_elevenlabs_words(self):
|
||
"""Feeding the full ElevenLabs word-timestamp stream must complete the tracker."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
results = [tracker.add_word_and_check_complete(w) for w in self.ELEVENLABS_WORDS]
|
||
self.assertTrue(results[-1], "tracker should be complete after the last word")
|
||
self.assertTrue(tracker.is_complete)
|
||
|
||
def test_only_last_word_triggers_completion(self):
|
||
"""No intermediate word should complete the tracker."""
|
||
tracker = WordCompletionTracker(self.SENTENCE)
|
||
for word in self.ELEVENLABS_WORDS[:-1]:
|
||
result = tracker.add_word_and_check_complete(word)
|
||
self.assertFalse(result, f"should not be complete after '{word}'")
|
||
self.assertTrue(tracker.add_word_and_check_complete(self.ELEVENLABS_WORDS[-1]))
|
||
|
||
|
||
class TestWordCompletionTrackerCJK(unittest.TestCase):
|
||
"""Completion tracking for CJK scripts: Korean, Japanese, and Chinese.
|
||
|
||
Korean words are fed one at a time (Cartesia returns each Hangul word as a
|
||
separate timestamp event). Japanese and Chinese characters are fed as
|
||
combined groups (Cartesia merges all chars in one timestamp message into a
|
||
single token before calling add_word_timestamps).
|
||
"""
|
||
|
||
# --- Korean ---
|
||
|
||
def test_korean_word_by_word_completion(self):
|
||
"""Korean words fed one at a time complete the tracker correctly."""
|
||
sentence = "저는 여러분의 AI 어시스턴트입니다."
|
||
words = ["저는", "여러분의", "AI", "어시스턴트입니다."]
|
||
tracker = WordCompletionTracker(sentence)
|
||
for word in words[:-1]:
|
||
self.assertFalse(tracker.add_word_and_check_complete(word))
|
||
self.assertTrue(tracker.add_word_and_check_complete(words[-1]))
|
||
|
||
def test_korean_normalized_char_count_matches_raw_alnum(self):
|
||
"""Each Hangul syllable must normalize to exactly one char.
|
||
|
||
The NFKD decomposition would expand each syllable into 2-3 conjoining
|
||
jamo, making the normalized length much larger than the raw alnum count
|
||
and breaking the _advance_by_alnums cursor. NFD-per-char normalization
|
||
must keep each syllable as a single character.
|
||
"""
|
||
samples = ["저는여러분의", "안녕하세요", "어시스턴트"]
|
||
for text in samples:
|
||
raw_count = sum(1 for c in text if c.isalnum())
|
||
norm_count = len(WordCompletionTracker._normalize(text))
|
||
self.assertEqual(
|
||
norm_count,
|
||
raw_count,
|
||
f"_normalize({text!r}): got {norm_count} chars, want {raw_count}",
|
||
)
|
||
|
||
def test_korean_force_complete_remaining_text_is_correct(self):
|
||
"""After the first Korean word the TTS cursor is at the right position.
|
||
|
||
get_remaining_tts_text() must return the unspoken suffix verbatim so
|
||
force_complete can emit a correct TTSTextFrame for it.
|
||
"""
|
||
sentence = "저는 여러분의 AI 어시스턴트입니다."
|
||
tracker = WordCompletionTracker(sentence)
|
||
tracker.add_word_and_check_complete("저는")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "여러분의 AI 어시스턴트입니다.")
|
||
|
||
def test_korean_mixed_with_latin(self):
|
||
"""Latin tokens (e.g. 'AI') embedded in Korean text are handled correctly."""
|
||
sentence = "AI 어시스턴트입니다."
|
||
tracker = WordCompletionTracker(sentence)
|
||
self.assertFalse(tracker.add_word_and_check_complete("AI"))
|
||
self.assertTrue(tracker.add_word_and_check_complete("어시스턴트입니다."))
|
||
|
||
def test_korean_word_belongs_here(self):
|
||
"""word_belongs_here distinguishes Korean words based on remaining content."""
|
||
tracker = WordCompletionTracker("저는 여러분의")
|
||
self.assertTrue(tracker.word_belongs_here("저는"))
|
||
# "여러분의" starts with chars that follow "저는", so before consuming
|
||
# "저는" the next word doesn't belong here yet.
|
||
self.assertFalse(tracker.word_belongs_here("여러분의"))
|
||
|
||
tracker.add_word_and_check_complete("저는")
|
||
self.assertTrue(tracker.word_belongs_here("여러분의"))
|
||
|
||
# --- Japanese ---
|
||
|
||
def test_japanese_single_combined_group_completes(self):
|
||
"""A single Cartesia-style combined group (all chars merged) completes the frame."""
|
||
# Cartesia merges ["こ","ん","に","ち","は","。"] into "こんにちは。"
|
||
tracker = WordCompletionTracker("こんにちは。")
|
||
self.assertTrue(tracker.add_word_and_check_complete("こんにちは。"))
|
||
|
||
def test_japanese_multiple_groups_for_one_frame(self):
|
||
"""Two Cartesia timestamp groups for one Japanese frame complete in sequence."""
|
||
sentence = "こんにちは、私はあなたの"
|
||
tracker = WordCompletionTracker(sentence)
|
||
self.assertFalse(tracker.add_word_and_check_complete("こんにちは、私"))
|
||
self.assertTrue(tracker.add_word_and_check_complete("はあなたの"))
|
||
|
||
def test_japanese_force_complete_remaining_text(self):
|
||
"""After the first Japanese group the cursor sits at the right position."""
|
||
sentence = "こんにちは、私はあなたの"
|
||
tracker = WordCompletionTracker(sentence)
|
||
tracker.add_word_and_check_complete("こんにちは、私")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "はあなたの")
|
||
|
||
def test_japanese_punctuation_not_counted_toward_completion(self):
|
||
"""Japanese punctuation (。、) is not alphanumeric and must not block completion."""
|
||
tracker = WordCompletionTracker("こんにちは。")
|
||
# "こんにちは" covers all 5 alnum chars; "。" adds 0.
|
||
self.assertTrue(tracker.add_word_and_check_complete("こんにちは。"))
|
||
|
||
# --- Chinese ---
|
||
|
||
def test_chinese_single_combined_group_completes(self):
|
||
"""A single Cartesia-style combined Chinese group completes the frame."""
|
||
# Cartesia merges ["你","好",",","我","是"] into "你好,我是"
|
||
tracker = WordCompletionTracker("你好,我是")
|
||
self.assertTrue(tracker.add_word_and_check_complete("你好,我是"))
|
||
|
||
def test_chinese_multiple_groups_for_one_frame(self):
|
||
"""Two Cartesia timestamp groups for one Chinese frame complete in sequence."""
|
||
sentence = "你好,我是你的智能"
|
||
tracker = WordCompletionTracker(sentence)
|
||
self.assertFalse(tracker.add_word_and_check_complete("你好,我是"))
|
||
self.assertTrue(tracker.add_word_and_check_complete("你的智能"))
|
||
|
||
def test_chinese_force_complete_remaining_text(self):
|
||
"""After the first Chinese group the cursor sits at the right position."""
|
||
sentence = "你好,我是你的智能"
|
||
tracker = WordCompletionTracker(sentence)
|
||
tracker.add_word_and_check_complete("你好,我是")
|
||
self.assertEqual(tracker.get_remaining_tts_text(), "你的智能")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
unittest.main()
|