From 913194844e430a3056bd1272e87f450d42706499 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 10 Nov 2025 12:28:40 -0500 Subject: [PATCH] Make the mechanism of adding spaces when concatenating TTS (or speech-to-speech LLM) output text explicit and deterministic, rather than heuristic-based. This fixes a bug where spaces were sometimes missing from assistant messages in context. --- CHANGELOG.md | 5 ++ src/pipecat/frames/frames.py | 8 +++ .../aggregators/llm_response_universal.py | 14 ++++- .../processors/frameworks/langchain.py | 4 +- .../processors/transcript_processor.py | 12 ++++- .../services/google/gemini_live/llm.py | 11 +++- src/pipecat/services/openai/realtime/llm.py | 10 +++- src/pipecat/utils/string.py | 21 ++------ tests/test_context_aggregators.py | 51 ++++++++++++++----- tests/test_transcript_processor.py | 21 +++++--- 10 files changed, 113 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a2d000b0..afa2a5651 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `LivekitFrameSerializer` has been removed. Use `LiveKitTransport` instead. +### Fixed + +- Fixed a bug related to `LLMAssistantAggregator` where spaces were sometimes + missing from assistant messages in context. + ## [0.0.93] - 2025-11-07 ### Added diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 3e92b8480..6f48f79f7 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -330,10 +330,18 @@ class TextFrame(DataFrame): text: str skip_tts: bool = field(init=False) + # Whether any necessary inter-frame (leading/trailing) spaces are already + # included in the text. + # NOTE: Ideally this would be available at init time with a default value, + # but that would impact how subclasses can be initialized (it would require + # mandatory fields of theirs to have defaults to preserve + # non-default-before-default argument order) + includes_inter_frame_spaces: bool = field(init=False) def __post_init__(self): super().__post_init__() self.skip_tts = False + self.includes_inter_frame_spaces = False def __str__(self): pts = format_pts(self.pts) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 882428a6e..d4d9ad7da 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -92,6 +92,14 @@ class LLMContextAggregator(FrameProcessor): self._aggregation: List[str] = [] + # Whether to add spaces between text parts. + # (Currently only used by LLMAssistantAggregator, but could be expanded + # to LLMUserAggregator in the future if needed; that would require + # additional work since LLMUserAggregator currently trims spaces from + # incoming frames before determining whether it "really" received any + # text). + self._add_spaces = True + @property def messages(self) -> List[LLMContextMessage]: """Get messages from the LLM context. @@ -183,7 +191,7 @@ class LLMContextAggregator(FrameProcessor): Returns: The concatenated aggregation string. """ - return concatenate_aggregated_text(self._aggregation) + return concatenate_aggregated_text(self._aggregation, self._add_spaces) class LLMUserAggregator(LLMContextAggregator): @@ -813,6 +821,10 @@ class LLMAssistantAggregator(LLMContextAggregator): if len(frame.text) == 0: return + # Track whether we need to add spaces between text parts + # Assumption: we can just keep track of the latest frame's value + self._add_spaces = not frame.includes_inter_frame_spaces + self._aggregation.append(frame.text) def _context_updated_task_finished(self, task: asyncio.Task): diff --git a/src/pipecat/processors/frameworks/langchain.py b/src/pipecat/processors/frameworks/langchain.py index 97a6ce343..b8a472a3d 100644 --- a/src/pipecat/processors/frameworks/langchain.py +++ b/src/pipecat/processors/frameworks/langchain.py @@ -107,7 +107,9 @@ class LangchainProcessor(FrameProcessor): {self._transcript_key: text}, config={"configurable": {"session_id": self._participant_id}}, ): - await self.push_frame(TextFrame(self.__get_token_value(token))) + frame = TextFrame(self.__get_token_value(token)) + frame.includes_inter_frame_spaces = True + await self.push_frame(frame) except GeneratorExit: logger.warning(f"{self} generator was closed prematurely") except Exception as e: diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index 13b2bb97f..0b6f1b5bb 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -101,6 +101,12 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): self._current_text_parts: List[str] = [] self._aggregation_start_time: Optional[str] = None + # Whether to add spaces between text parts. + # (The use of this could be expanded to the UserTranscriptProcessor in + # the future if needed; currently the UserTranscriptProcessor assumes + # that user transcription frames do not need aggregation). + self._add_spaces = True + async def _emit_aggregated_text(self): """Aggregates and emits text fragments as a transcript message. @@ -141,7 +147,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): Result: "Hello there how are you" """ if self._current_text_parts and self._aggregation_start_time: - content = concatenate_aggregated_text(self._current_text_parts) + content = concatenate_aggregated_text(self._current_text_parts, self._add_spaces) if content: logger.trace(f"Emitting aggregated assistant message: {content}") message = TranscriptionMessage( @@ -185,6 +191,10 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): if not self._aggregation_start_time: self._aggregation_start_time = time_now_iso8601() + # Track whether we need to add spaces between text parts + # Assumption: we can just keep track of the latest frame's value + self._add_spaces = not frame.includes_inter_frame_spaces + self._current_text_parts.append(frame.text) # Push frame. diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 0f3304529..9c92076ab 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1453,7 +1453,10 @@ class GeminiLiveLLMService(LLMService): self._bot_text_buffer += text self._search_result_buffer += text # Also accumulate for grounding - await self.push_frame(LLMTextFrame(text=text)) + frame = LLMTextFrame(text=text) + # Gemini Live text already includes any necessary inter-chunk spaces + frame.includes_inter_frame_spaces = True + await self.push_frame(frame) # Check for grounding metadata in server content if msg.server_content and msg.server_content.grounding_metadata: @@ -1645,7 +1648,11 @@ class GeminiLiveLLMService(LLMService): await self.push_frame(TTSStartedFrame()) await self.push_frame(LLMFullResponseStartFrame()) - await self.push_frame(TTSTextFrame(text=text)) + frame = TTSTextFrame(text=text) + # Gemini Live text already includes any necessary inter-chunk spaces + frame.includes_inter_frame_spaces = True + + await self.push_frame(frame) async def _handle_msg_grounding_metadata(self, message: LiveServerMessage): """Handle dedicated grounding metadata messages.""" diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index 0129a94f9..e38836b90 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -679,13 +679,19 @@ class OpenAIRealtimeLLMService(LLMService): # We receive text deltas (as opposed to audio transcript deltas) when # the output modality is "text" if evt.delta: - await self.push_frame(LLMTextFrame(evt.delta)) + frame = LLMTextFrame(evt.delta) + # OpenAI Realtime text already includes any necessary inter-chunk spaces + frame.includes_inter_frame_spaces = True + await self.push_frame(frame) async def _handle_evt_audio_transcript_delta(self, evt): # We receive audio transcript deltas (as opposed to text deltas) when # the output modality is "audio" (the default) if evt.delta: - await self.push_frame(TTSTextFrame(evt.delta)) + frame = TTSTextFrame(evt.delta) + # OpenAI Realtime text already includes any necessary inter-chunk spaces + frame.includes_inter_frame_spaces = True + await self.push_frame(frame) async def _handle_evt_function_call_arguments_done(self, evt): """Handle completion of function call arguments. diff --git a/src/pipecat/utils/string.py b/src/pipecat/utils/string.py index 25ce6afd5..177c0e8a5 100644 --- a/src/pipecat/utils/string.py +++ b/src/pipecat/utils/string.py @@ -198,7 +198,7 @@ def parse_start_end_tags( return (None, current_tag_index) -def concatenate_aggregated_text(text_parts: List[str]) -> str: +def concatenate_aggregated_text(text_parts: List[str], add_spaces: bool) -> str: """Concatenate a list of text parts into a single string. This function joins the provided list of text parts into a single string, @@ -209,25 +209,14 @@ def concatenate_aggregated_text(text_parts: List[str]) -> str: Args: text_parts: A list of strings representing parts of text to concatenate. + add_spaces: Whether to add spaces between text parts during concatenation. Returns: A single concatenated string. """ - # Check specifically for space characters, previously isspace() was used - # but that includes all whitespace characters (e.g. \n), not just spaces. - has_leading_spaces = any(part and part[0] == " " for part in text_parts[1:]) - has_trailing_spaces = any(part and part[-1] == " " for part in text_parts[:-1]) - - # If there are embedded spaces in the fragments, use direct concatenation - contains_spacing_between_fragments = has_leading_spaces or has_trailing_spaces - - # Apply corresponding joining method - if contains_spacing_between_fragments: - # Fragments already have spacing - just concatenate - result = "".join(text_parts) - else: - # Word-by-word fragments - join with spaces - result = " ".join(text_parts) + # Concatenate text parts with or without spaces based on the flag + separator = " " if add_spaces else "" + result = separator.join(text_parts) # Clean up any excessive whitespace result = result.strip() diff --git a/tests/test_context_aggregators.py b/tests/test_context_aggregators.py index 6196032a3..12be482c1 100644 --- a/tests/test_context_aggregators.py +++ b/tests/test_context_aggregators.py @@ -35,6 +35,7 @@ from pipecat.frames.frames import ( ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.task import PipelineParams +from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response import ( LLMAssistantAggregatorParams, LLMUserAggregatorParams, @@ -651,12 +652,20 @@ class BaseTestAssistantContextAggregator: aggregator = self.AGGREGATOR_CLASS( context, params=self.create_assistant_aggregator_params(expect_stripped_words=False) ) + + # The newer LLMAssistantAggregator expects TextFrames to declare + # when they include inter-frame spaces. + def make_text_frame(text: str) -> TextFrame: + frame = TextFrame(text=text) + frame.includes_inter_frame_spaces = True + return frame + frames_to_send = [ LLMFullResponseStartFrame(), - TextFrame(text="Hello "), - TextFrame(text="Pipecat. "), - TextFrame(text="How are "), - TextFrame(text="you?"), + make_text_frame("Hello "), + make_text_frame("Pipecat. "), + make_text_frame("How are "), + make_text_frame("you?"), LLMFullResponseEndFrame(), ] expected_down_frames = [*self.EXPECTED_CONTEXT_FRAMES] @@ -697,14 +706,22 @@ class BaseTestAssistantContextAggregator: aggregator = self.AGGREGATOR_CLASS( context, params=self.create_assistant_aggregator_params(expect_stripped_words=False) ) + + # The newer LLMAssistantAggregator expects TextFrames to declare + # when they include inter-frame spaces. + def make_text_frame(text: str) -> TextFrame: + frame = TextFrame(text=text) + frame.includes_inter_frame_spaces = True + return frame + frames_to_send = [ LLMFullResponseStartFrame(), - TextFrame(text="Hello "), - TextFrame(text="Pipecat."), + make_text_frame("Hello "), + make_text_frame("Pipecat."), LLMFullResponseEndFrame(), LLMFullResponseStartFrame(), - TextFrame(text="How are "), - TextFrame(text="you?"), + make_text_frame(text="How are "), + make_text_frame(text="you?"), LLMFullResponseEndFrame(), ] expected_down_frames = [*self.EXPECTED_CONTEXT_FRAMES, *self.EXPECTED_CONTEXT_FRAMES] @@ -724,16 +741,24 @@ class BaseTestAssistantContextAggregator: aggregator = self.AGGREGATOR_CLASS( context, params=self.create_assistant_aggregator_params(expect_stripped_words=False) ) + + # The newer LLMAssistantAggregator expects TextFrames to declare + # when they include inter-frame spaces. + def make_text_frame(text: str) -> TextFrame: + frame = TextFrame(text=text) + frame.includes_inter_frame_spaces = True + return frame + frames_to_send = [ LLMFullResponseStartFrame(), - TextFrame(text="Hello "), - TextFrame(text="Pipecat."), + make_text_frame("Hello "), + make_text_frame("Pipecat."), LLMFullResponseEndFrame(), SleepFrame(AGGREGATION_SLEEP), InterruptionFrame(), LLMFullResponseStartFrame(), - TextFrame(text="How are "), - TextFrame(text="you?"), + make_text_frame("How are "), + make_text_frame("you?"), LLMFullResponseEndFrame(), ] expected_down_frames = [ @@ -969,7 +994,7 @@ class TestOpenAIAssistantContextAggregator( class TestLLMAssistantAggregator( BaseTestAssistantContextAggregator, unittest.IsolatedAsyncioTestCase ): - CONTEXT_CLASS = OpenAILLMContext + CONTEXT_CLASS = LLMContext AGGREGATOR_CLASS = LLMAssistantAggregator EXPECTED_CONTEXT_FRAMES = [LLMContextFrame, LLMContextAssistantTimestampFrame] diff --git a/tests/test_transcript_processor.py b/tests/test_transcript_processor.py index b433951ce..19366086c 100644 --- a/tests/test_transcript_processor.py +++ b/tests/test_transcript_processor.py @@ -438,17 +438,22 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): received_updates.append(frame) # Test the specific pattern shared + def make_tts_text_frame(text: str) -> TTSTextFrame: + frame = TTSTextFrame(text=text) + frame.includes_inter_frame_spaces = True + return frame + frames_to_send = [ BotStartedSpeakingFrame(), SleepFrame(), - TTSTextFrame(text="Hello"), - TTSTextFrame(text=" there"), - TTSTextFrame(text="!"), - TTSTextFrame(text=" How"), - TTSTextFrame(text="'s"), - TTSTextFrame(text=" it"), - TTSTextFrame(text=" going"), - TTSTextFrame(text="?"), + make_tts_text_frame("Hello"), + make_tts_text_frame(" there"), + make_tts_text_frame("!"), + make_tts_text_frame(" How"), + make_tts_text_frame("'s"), + make_tts_text_frame(" it"), + make_tts_text_frame(" going"), + make_tts_text_frame("?"), BotStoppedSpeakingFrame(), ]