Make the mechanism of adding spaces when concatenating TTS (or speech-to-speech LLM) output text explicit and deterministic, rather than heuristic-based.

This fixes a bug where spaces were sometimes missing from assistant messages in context.
This commit is contained in:
Paul Kompfner
2025-11-10 12:28:40 -05:00
parent c2ce143e6c
commit 913194844e
10 changed files with 113 additions and 44 deletions

View File

@@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `LivekitFrameSerializer` has been removed. Use `LiveKitTransport` instead.
### Fixed
- Fixed a bug related to `LLMAssistantAggregator` where spaces were sometimes
missing from assistant messages in context.
## [0.0.93] - 2025-11-07
### Added

View File

@@ -330,10 +330,18 @@ class TextFrame(DataFrame):
text: str
skip_tts: bool = field(init=False)
# Whether any necessary inter-frame (leading/trailing) spaces are already
# included in the text.
# NOTE: Ideally this would be available at init time with a default value,
# but that would impact how subclasses can be initialized (it would require
# mandatory fields of theirs to have defaults to preserve
# non-default-before-default argument order)
includes_inter_frame_spaces: bool = field(init=False)
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
self.includes_inter_frame_spaces = False
def __str__(self):
pts = format_pts(self.pts)

View File

@@ -92,6 +92,14 @@ class LLMContextAggregator(FrameProcessor):
self._aggregation: List[str] = []
# Whether to add spaces between text parts.
# (Currently only used by LLMAssistantAggregator, but could be expanded
# to LLMUserAggregator in the future if needed; that would require
# additional work since LLMUserAggregator currently trims spaces from
# incoming frames before determining whether it "really" received any
# text).
self._add_spaces = True
@property
def messages(self) -> List[LLMContextMessage]:
"""Get messages from the LLM context.
@@ -183,7 +191,7 @@ class LLMContextAggregator(FrameProcessor):
Returns:
The concatenated aggregation string.
"""
return concatenate_aggregated_text(self._aggregation)
return concatenate_aggregated_text(self._aggregation, self._add_spaces)
class LLMUserAggregator(LLMContextAggregator):
@@ -813,6 +821,10 @@ class LLMAssistantAggregator(LLMContextAggregator):
if len(frame.text) == 0:
return
# Track whether we need to add spaces between text parts.
# Assumption: all frames within a single aggregation agree on this flag,
# so remembering only the latest frame's value is sufficient.
self._add_spaces = not frame.includes_inter_frame_spaces
self._aggregation.append(frame.text)
def _context_updated_task_finished(self, task: asyncio.Task):

View File

@@ -107,7 +107,9 @@ class LangchainProcessor(FrameProcessor):
{self._transcript_key: text},
config={"configurable": {"session_id": self._participant_id}},
):
await self.push_frame(TextFrame(self.__get_token_value(token)))
frame = TextFrame(self.__get_token_value(token))
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:

View File

@@ -101,6 +101,12 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
self._current_text_parts: List[str] = []
self._aggregation_start_time: Optional[str] = None
# Whether to add spaces between text parts.
# (The use of this could be expanded to the UserTranscriptProcessor in
# the future if needed; currently the UserTranscriptProcessor assumes
# that user transcription frames do not need aggregation).
self._add_spaces = True
async def _emit_aggregated_text(self):
"""Aggregates and emits text fragments as a transcript message.
@@ -141,7 +147,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
Result: "Hello there how are you"
"""
if self._current_text_parts and self._aggregation_start_time:
content = concatenate_aggregated_text(self._current_text_parts)
content = concatenate_aggregated_text(self._current_text_parts, self._add_spaces)
if content:
logger.trace(f"Emitting aggregated assistant message: {content}")
message = TranscriptionMessage(
@@ -185,6 +191,10 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
if not self._aggregation_start_time:
self._aggregation_start_time = time_now_iso8601()
# Track whether we need to add spaces between text parts.
# Assumption: all frames within a single aggregation agree on this flag,
# so remembering only the latest frame's value is sufficient.
self._add_spaces = not frame.includes_inter_frame_spaces
self._current_text_parts.append(frame.text)
# Push frame.

View File

@@ -1453,7 +1453,10 @@ class GeminiLiveLLMService(LLMService):
self._bot_text_buffer += text
self._search_result_buffer += text # Also accumulate for grounding
await self.push_frame(LLMTextFrame(text=text))
frame = LLMTextFrame(text=text)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
# Check for grounding metadata in server content
if msg.server_content and msg.server_content.grounding_metadata:
@@ -1645,7 +1648,11 @@ class GeminiLiveLLMService(LLMService):
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(TTSTextFrame(text=text))
frame = TTSTextFrame(text=text)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
async def _handle_msg_grounding_metadata(self, message: LiveServerMessage):
"""Handle dedicated grounding metadata messages."""

View File

@@ -679,13 +679,19 @@ class OpenAIRealtimeLLMService(LLMService):
# We receive text deltas (as opposed to audio transcript deltas) when
# the output modality is "text"
if evt.delta:
await self.push_frame(LLMTextFrame(evt.delta))
frame = LLMTextFrame(evt.delta)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
async def _handle_evt_audio_transcript_delta(self, evt):
# We receive audio transcript deltas (as opposed to text deltas) when
# the output modality is "audio" (the default)
if evt.delta:
await self.push_frame(TTSTextFrame(evt.delta))
frame = TTSTextFrame(evt.delta)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
async def _handle_evt_function_call_arguments_done(self, evt):
"""Handle completion of function call arguments.

View File

@@ -198,7 +198,7 @@ def parse_start_end_tags(
return (None, current_tag_index)
def concatenate_aggregated_text(text_parts: List[str]) -> str:
def concatenate_aggregated_text(text_parts: List[str], add_spaces: bool) -> str:
"""Concatenate a list of text parts into a single string.
This function joins the provided list of text parts into a single string,
@@ -209,25 +209,14 @@ def concatenate_aggregated_text(text_parts: List[str]) -> str:
Args:
text_parts: A list of strings representing parts of text to concatenate.
add_spaces: Whether to add spaces between text parts during concatenation.
Returns:
A single concatenated string.
"""
# Check specifically for space characters, previously isspace() was used
# but that includes all whitespace characters (e.g. \n), not just spaces.
has_leading_spaces = any(part and part[0] == " " for part in text_parts[1:])
has_trailing_spaces = any(part and part[-1] == " " for part in text_parts[:-1])
# If there are embedded spaces in the fragments, use direct concatenation
contains_spacing_between_fragments = has_leading_spaces or has_trailing_spaces
# Apply corresponding joining method
if contains_spacing_between_fragments:
# Fragments already have spacing - just concatenate
result = "".join(text_parts)
else:
# Word-by-word fragments - join with spaces
result = " ".join(text_parts)
# Concatenate text parts with or without spaces based on the flag
separator = " " if add_spaces else ""
result = separator.join(text_parts)
# Clean up any excessive whitespace
result = result.strip()

View File

@@ -35,6 +35,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
@@ -651,12 +652,20 @@ class BaseTestAssistantContextAggregator:
aggregator = self.AGGREGATOR_CLASS(
context, params=self.create_assistant_aggregator_params(expect_stripped_words=False)
)
# The newer LLMAssistantAggregator expects TextFrames to declare
# when they include inter-frame spaces.
def make_text_frame(text: str) -> TextFrame:
frame = TextFrame(text=text)
frame.includes_inter_frame_spaces = True
return frame
frames_to_send = [
LLMFullResponseStartFrame(),
TextFrame(text="Hello "),
TextFrame(text="Pipecat. "),
TextFrame(text="How are "),
TextFrame(text="you?"),
make_text_frame("Hello "),
make_text_frame("Pipecat. "),
make_text_frame("How are "),
make_text_frame("you?"),
LLMFullResponseEndFrame(),
]
expected_down_frames = [*self.EXPECTED_CONTEXT_FRAMES]
@@ -697,14 +706,22 @@ class BaseTestAssistantContextAggregator:
aggregator = self.AGGREGATOR_CLASS(
context, params=self.create_assistant_aggregator_params(expect_stripped_words=False)
)
# The newer LLMAssistantAggregator expects TextFrames to declare
# when they include inter-frame spaces.
def make_text_frame(text: str) -> TextFrame:
frame = TextFrame(text=text)
frame.includes_inter_frame_spaces = True
return frame
frames_to_send = [
LLMFullResponseStartFrame(),
TextFrame(text="Hello "),
TextFrame(text="Pipecat."),
make_text_frame("Hello "),
make_text_frame("Pipecat."),
LLMFullResponseEndFrame(),
LLMFullResponseStartFrame(),
TextFrame(text="How are "),
TextFrame(text="you?"),
make_text_frame(text="How are "),
make_text_frame(text="you?"),
LLMFullResponseEndFrame(),
]
expected_down_frames = [*self.EXPECTED_CONTEXT_FRAMES, *self.EXPECTED_CONTEXT_FRAMES]
@@ -724,16 +741,24 @@ class BaseTestAssistantContextAggregator:
aggregator = self.AGGREGATOR_CLASS(
context, params=self.create_assistant_aggregator_params(expect_stripped_words=False)
)
# The newer LLMAssistantAggregator expects TextFrames to declare
# when they include inter-frame spaces.
def make_text_frame(text: str) -> TextFrame:
frame = TextFrame(text=text)
frame.includes_inter_frame_spaces = True
return frame
frames_to_send = [
LLMFullResponseStartFrame(),
TextFrame(text="Hello "),
TextFrame(text="Pipecat."),
make_text_frame("Hello "),
make_text_frame("Pipecat."),
LLMFullResponseEndFrame(),
SleepFrame(AGGREGATION_SLEEP),
InterruptionFrame(),
LLMFullResponseStartFrame(),
TextFrame(text="How are "),
TextFrame(text="you?"),
make_text_frame("How are "),
make_text_frame("you?"),
LLMFullResponseEndFrame(),
]
expected_down_frames = [
@@ -969,7 +994,7 @@ class TestOpenAIAssistantContextAggregator(
class TestLLMAssistantAggregator(
BaseTestAssistantContextAggregator, unittest.IsolatedAsyncioTestCase
):
CONTEXT_CLASS = OpenAILLMContext
CONTEXT_CLASS = LLMContext
AGGREGATOR_CLASS = LLMAssistantAggregator
EXPECTED_CONTEXT_FRAMES = [LLMContextFrame, LLMContextAssistantTimestampFrame]

View File

@@ -438,17 +438,22 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase):
received_updates.append(frame)
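# Test the specific pattern shared: word and punctuation fragments where
# any needed space is already the fragment's leading character.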
def make_tts_text_frame(text: str) -> TTSTextFrame:
frame = TTSTextFrame(text=text)
frame.includes_inter_frame_spaces = True
return frame
frames_to_send = [
BotStartedSpeakingFrame(),
SleepFrame(),
TTSTextFrame(text="Hello"),
TTSTextFrame(text=" there"),
TTSTextFrame(text="!"),
TTSTextFrame(text=" How"),
TTSTextFrame(text="'s"),
TTSTextFrame(text=" it"),
TTSTextFrame(text=" going"),
TTSTextFrame(text="?"),
make_tts_text_frame("Hello"),
make_tts_text_frame(" there"),
make_tts_text_frame("!"),
make_tts_text_frame(" How"),
make_tts_text_frame("'s"),
make_tts_text_frame(" it"),
make_tts_text_frame(" going"),
make_tts_text_frame("?"),
BotStoppedSpeakingFrame(),
]