From 1ad8e28025dbe04272608cc00636f2ff76d90bd8 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 13 Mar 2025 14:21:36 -0400 Subject: [PATCH 1/8] Update TranscriptProcessor to more robustly handle different TTSTextFrame outputs --- .../processors/transcript_processor.py | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index 614fe176c..6a7793335 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -90,9 +90,50 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): self._aggregation_start_time: Optional[str] = None async def _emit_aggregated_text(self): - """Emit aggregated text as a transcript message.""" + """Emit aggregated text as a transcript message. + + This method intelligently joins text fragments to create natural spacing, + handling both word-by-word and pre-spaced text fragments appropriately. + + The implementation handles two common patterns from TTS services: + + 1. Word-by-word fragments without spacing: + ``` + TTSTextFrame: ['Hello.'] + TTSTextFrame: ['How'] + TTSTextFrame: ['can'] + TTSTextFrame: ['I'] + TTSTextFrame: ['assist'] + TTSTextFrame: ['you'] + TTSTextFrame: ['today?'] + ``` + Result: "Hello. How can I assist you today?" + + 2. Pre-spaced fragments: + ``` + TTSTextFrame: ['Hello'] + TTSTextFrame: [' there'] + TTSTextFrame: ['!'] + TTSTextFrame: [' How'] + TTSTextFrame: ["'s"] + TTSTextFrame: [' it'] + TTSTextFrame: [' going'] + TTSTextFrame: ['?'] + ``` + Result: "Hello there! How's it going?" 
+ """ if self._current_text_parts and self._aggregation_start_time: - content = " ".join(self._current_text_parts).strip() + # Build content with intelligent spacing + content = "" + for i, part in enumerate(self._current_text_parts): + # Add a space only when the current part doesn't start with + # whitespace or punctuation/special characters + if i > 0 and not part.startswith((" ", ".", ",", "!", "?", ";", ":", "'", '"')): + content += " " + content += part + + content = content.strip() + if content: logger.debug(f"Emitting aggregated assistant message: {content}") message = TranscriptionMessage( From 5b6b700214b308319730263dd6854a6e99b22efe Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 13 Mar 2025 14:22:13 -0400 Subject: [PATCH 2/8] OpenAIRealtimeBetaLLMService outputs a TTSTextFrame --- src/pipecat/services/openai_realtime_beta/openai.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index 00f8cd840..321c66826 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -43,6 +43,7 @@ from pipecat.frames.frames import ( TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, + TTSTextFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -471,6 +472,7 @@ class OpenAIRealtimeBetaLLMService(LLMService): async def _handle_evt_audio_transcript_delta(self, evt): if evt.delta: await self.push_frame(LLMTextFrame(evt.delta)) + await self.push_frame(TTSTextFrame(evt.delta)) async def _handle_evt_speech_started(self, evt): await self._truncate_current_audio_response() From 571c10403f9684bdcac175272c51d7e692f98378 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 13 Mar 2025 14:23:00 -0400 Subject: [PATCH 3/8] tests: Add additional coverage to test_transcript_processor --- tests/test_transcript_processor.py | 298 ++++++++++++++++++++++++++++- 1 file changed, 297 insertions(+), 1 
deletion(-) diff --git a/tests/test_transcript_processor.py b/tests/test_transcript_processor.py index 1c6db277f..5f80b3ca6 100644 --- a/tests/test_transcript_processor.py +++ b/tests/test_transcript_processor.py @@ -275,7 +275,7 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): # First update should be interrupted message first_message = received_updates[0].messages[0] self.assertEqual(first_message.role, "assistant") - self.assertEqual(first_message.content, "Hello world !") + self.assertEqual(first_message.content, "Hello world!") self.assertIsNotNone(first_message.timestamp) # Second update should be new response @@ -426,3 +426,299 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): self.assertEqual(received_updates[0].content, "User message") self.assertEqual(received_updates[1].role, "assistant") self.assertEqual(received_updates[1].content, "Assistant message") + + async def test_text_fragments_with_spaces(self): + """Test aggregating text fragments with various spacing patterns""" + processor = AssistantTranscriptProcessor() + + # Track received updates + received_updates = [] + + @processor.event_handler("on_transcript_update") + async def handle_update(proc, frame: TranscriptionUpdateFrame): + received_updates.append(frame) + + # Test the specific pattern shared + frames_to_send = [ + BotStartedSpeakingFrame(), + SleepFrame(sleep=0.1), + TTSTextFrame(text="Hello"), + TTSTextFrame(text=" there"), + TTSTextFrame(text="!"), + TTSTextFrame(text=" How"), + TTSTextFrame(text="'s"), + TTSTextFrame(text=" it"), + TTSTextFrame(text=" going"), + TTSTextFrame(text="?"), + BotStoppedSpeakingFrame(), + ] + + expected_down_frames = [ + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TranscriptionUpdateFrame, + ] + + # Run test + received_frames, _ = await run_test( + processor, + 
frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + # Verify result + self.assertEqual(len(received_updates), 1) + message = received_updates[0].messages[0] + self.assertEqual(message.role, "assistant") + # Should be properly joined without extra spaces + self.assertEqual(message.content, "Hello there! How's it going?") + + async def test_mixed_spacing_styles(self): + """Test handling mixed word-by-word and pre-spaced fragments""" + processor = AssistantTranscriptProcessor() + + received_updates = [] + + @processor.event_handler("on_transcript_update") + async def handle_update(proc, frame: TranscriptionUpdateFrame): + received_updates.append(frame) + + # Mix of spacing styles within the same utterance + frames_to_send = [ + BotStartedSpeakingFrame(), + SleepFrame(sleep=0.1), + # Word-by-word style + TTSTextFrame(text="First"), + TTSTextFrame(text="style."), + # Pre-spaced style + TTSTextFrame(text=" Second"), + TTSTextFrame(text=" style"), + TTSTextFrame(text="!"), + BotStoppedSpeakingFrame(), + ] + + expected_down_frames = [ + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TranscriptionUpdateFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + self.assertEqual(len(received_updates), 1) + message = received_updates[0].messages[0] + self.assertEqual(message.content, "First style. 
Second style!") + + async def test_punctuation_handling(self): + """Test handling of various punctuation patterns""" + processor = AssistantTranscriptProcessor() + + received_updates = [] + + @processor.event_handler("on_transcript_update") + async def handle_update(proc, frame: TranscriptionUpdateFrame): + received_updates.append(frame) + + # Test various punctuation types + frames_to_send = [ + BotStartedSpeakingFrame(), + SleepFrame(sleep=0.1), + TTSTextFrame(text="Commas"), + TTSTextFrame(text=","), + TTSTextFrame(text="colons"), + TTSTextFrame(text=":"), + TTSTextFrame(text="semicolons"), + TTSTextFrame(text=";"), + TTSTextFrame(text="quotes"), + TTSTextFrame(text="'"), + TTSTextFrame(text="and"), + TTSTextFrame(text='"'), + TTSTextFrame(text="double quotes"), + TTSTextFrame(text="!"), + BotStoppedSpeakingFrame(), + ] + + expected_down_frames = [ + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TranscriptionUpdateFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + self.assertEqual(len(received_updates), 1) + message = received_updates[0].messages[0] + self.assertEqual( + message.content, "Commas, colons: semicolons; quotes' and\" double quotes!" 
+ ) + + async def test_complex_mixed_case(self): + """Test a complex mix of patterns to ensure robustness""" + processor = AssistantTranscriptProcessor() + + received_updates = [] + + @processor.event_handler("on_transcript_update") + async def handle_update(proc, frame: TranscriptionUpdateFrame): + received_updates.append(frame) + + # Complex mixed case with various patterns + frames_to_send = [ + BotStartedSpeakingFrame(), + SleepFrame(sleep=0.1), + # Pre-spaced fragments + TTSTextFrame(text="Hello"), + TTSTextFrame(text=" there"), + TTSTextFrame(text="!"), + # Sentence boundary + TTSTextFrame(text=" I'm"), + TTSTextFrame(text=" testing"), + TTSTextFrame(text=" spacing"), + TTSTextFrame(text="."), + # Word-by-word fragments + TTSTextFrame(text="Does"), + TTSTextFrame(text="this"), + TTSTextFrame(text="work"), + TTSTextFrame(text="correctly"), + TTSTextFrame(text="?"), + # Mixed punctuation and spacing + TTSTextFrame(text=" Let's"), + TTSTextFrame(text=" see:"), + TTSTextFrame(text="commas"), + TTSTextFrame(text=","), + TTSTextFrame(text=" semicolons"), + TTSTextFrame(text=";"), + TTSTextFrame(text=" and"), + TTSTextFrame(text=" quotes"), + TTSTextFrame(text="'"), + TTSTextFrame(text="!"), + BotStoppedSpeakingFrame(), + ] + + expected_down_frames = [ + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TranscriptionUpdateFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + self.assertEqual(len(received_updates), 1) + message = received_updates[0].messages[0] + expected = "Hello there! I'm testing spacing. Does this work correctly? 
Let's see: commas, semicolons; and quotes'!" + self.assertEqual(message.content, expected) + + async def test_multiple_consecutive_punctuation(self): + """Test handling of multiple consecutive punctuation marks""" + processor = AssistantTranscriptProcessor() + + received_updates = [] + + @processor.event_handler("on_transcript_update") + async def handle_update(proc, frame: TranscriptionUpdateFrame): + received_updates.append(frame) + + frames_to_send = [ + BotStartedSpeakingFrame(), + SleepFrame(sleep=0.1), + TTSTextFrame(text="Wow"), + TTSTextFrame(text="!"), + TTSTextFrame(text="!"), + TTSTextFrame(text="!"), + TTSTextFrame(text=" That's"), + TTSTextFrame(text=" amazing"), + TTSTextFrame(text="..."), + TTSTextFrame(text=" Don't"), + TTSTextFrame(text=" you"), + TTSTextFrame(text=" think"), + TTSTextFrame(text="?"), + TTSTextFrame(text="?"), + BotStoppedSpeakingFrame(), + ] + + expected_down_frames = [ + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TTSTextFrame, + TranscriptionUpdateFrame, + ] + + await run_test( + processor, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + + self.assertEqual(len(received_updates), 1) + message = received_updates[0].messages[0] + self.assertEqual(message.content, "Wow!!! That's amazing... 
Don't you think??") From 6e6905405b48f1548370650a0109419e96cbaebd Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 13 Mar 2025 14:25:02 -0400 Subject: [PATCH 4/8] Update CHANGELOG --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e47f1010a..b24b74bc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Updated `TranscriptProcessor` to support text output from + `OpenAIRealtimeBetaLLMService`. + +- `OpenAIRealtimeBetaLLMService` now pushes a `TTSTextFrame`. + - Updated the default mode for `CartesiaTTSService` and `CartesiaHttpTTSService` to `sonic-2`. From d5776c27f4cbeb8a576c00e6c32545f994f29a55 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 13 Mar 2025 14:31:57 -0400 Subject: [PATCH 5/8] Update 19-openai-realtime-beta --- examples/foundational/19-openai-realtime-beta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/foundational/19-openai-realtime-beta.py b/examples/foundational/19-openai-realtime-beta.py index 504f8fbf1..8796e0141 100644 --- a/examples/foundational/19-openai-realtime-beta.py +++ b/examples/foundational/19-openai-realtime-beta.py @@ -147,8 +147,8 @@ Remember, your responses should be short. 
Just one or two sentences, usually.""" transport.input(), # Transport user input context_aggregator.user(), llm, # LLM - context_aggregator.assistant(), transport.output(), # Transport bot output + context_aggregator.assistant(), ] ) From 3f002f8ffb837f55d0f08a52cf2a28644e75a766 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 13 Mar 2025 14:34:05 -0400 Subject: [PATCH 6/8] Remove unnecessary TranscriptProcessor examples --- CHANGELOG.md | 4 + ...penai.py => 28-transcription-processor.py} | 0 .../28b-transcript-processor-anthropic.py | 177 --------------- .../28c-transcription-processor-gemini.py | 210 ------------------ 4 files changed, 4 insertions(+), 387 deletions(-) rename examples/foundational/{28a-transcription-processor-openai.py => 28-transcription-processor.py} (100%) delete mode 100644 examples/foundational/28b-transcript-processor-anthropic.py delete mode 100644 examples/foundational/28c-transcription-processor-gemini.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b24b74bc5..596a3e1e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -123,6 +123,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added a Pipecat Cloud deployment example to the `examples` directory. +- Removed foundational examples 28b and 28c as the TranscriptProcessor no + longer has an LLM dependency. Renamed foundational example 28a to + `28-transcription-processor.py`. 
+ ## [0.0.58] - 2025-02-26 ### Added diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28-transcription-processor.py similarity index 100% rename from examples/foundational/28a-transcription-processor-openai.py rename to examples/foundational/28-transcription-processor.py diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py deleted file mode 100644 index c9f2672e0..000000000 --- a/examples/foundational/28b-transcript-processor-anthropic.py +++ /dev/null @@ -1,177 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import os -import sys -from typing import List, Optional - -import aiohttp -from dotenv import load_dotenv -from loguru import logger -from runner import configure - -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.transcript_processor import TranscriptProcessor -from pipecat.services.anthropic import AnthropicLLMService -from pipecat.services.cartesia import CartesiaTTSService -from pipecat.services.deepgram import DeepgramSTTService -from pipecat.transports.services.daily import DailyParams, DailyTransport - -load_dotenv(override=True) - -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") - - -class TranscriptHandler: - """Handles real-time transcript processing and output. - - Maintains a list of conversation messages and outputs them either to a log - or to a file as they are received. Each message includes its timestamp and role. 
- - Attributes: - messages: List of all processed transcript messages - output_file: Optional path to file where transcript is saved. If None, outputs to log only. - """ - - def __init__(self, output_file: Optional[str] = None): - """Initialize handler with optional file output. - - Args: - output_file: Path to output file. If None, outputs to log only. - """ - self.messages: List[TranscriptionMessage] = [] - self.output_file: Optional[str] = output_file - logger.debug( - f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}" - ) - - async def save_message(self, message: TranscriptionMessage): - """Save a single transcript message. - - Outputs the message to the log and optionally to a file. - - Args: - message: The message to save - """ - timestamp = f"[{message.timestamp}] " if message.timestamp else "" - line = f"{timestamp}{message.role}: {message.content}" - - # Always log the message - logger.info(f"Transcript: {line}") - - # Optionally write to file - if self.output_file: - try: - with open(self.output_file, "a", encoding="utf-8") as f: - f.write(line + "\n") - except Exception as e: - logger.error(f"Error saving transcript message to file: {e}") - - async def on_transcript_update( - self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame - ): - """Handle new transcript messages. 
- - Args: - processor: The TranscriptProcessor that emitted the update - frame: TranscriptionUpdateFrame containing new messages - """ - logger.debug(f"Received transcript update with {len(frame.messages)} new messages") - - for msg in frame.messages: - self.messages.append(msg) - await self.save_message(msg) - - -async def main(): - async with aiohttp.ClientSession() as session: - (room_url, token) = await configure(session) - - transport = DailyTransport( - room_url, - None, - "Respond bot", - DailyParams( - audio_out_enabled=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, - ), - ) - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - - llm = AnthropicLLMService( - api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20241022" - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative, helpful, and brief way.", - }, - {"role": "user", "content": "Say hello."}, - ] - - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) - - # Create transcript processor and handler - transcript = TranscriptProcessor() - transcript_handler = TranscriptHandler() # Output to log only - # transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log - - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - transcript.user(), # User transcripts - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - transcript.assistant(), # Assistant transcripts - context_aggregator.assistant(), # Assistant spoken responses - ] - ) - - task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - # Kick off the conversation. 
- await task.queue_frames([context_aggregator.user().get_context_frame()]) - - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - await transcript_handler.on_transcript_update(processor, frame) - - @transport.event_handler("on_participant_left") - async def on_participant_left(transport, participant, reason): - # Stop the pipeline immediately when the participant leaves - await task.cancel() - - runner = PipelineRunner() - - await runner.run(task) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/foundational/28c-transcription-processor-gemini.py b/examples/foundational/28c-transcription-processor-gemini.py deleted file mode 100644 index 558edc76d..000000000 --- a/examples/foundational/28c-transcription-processor-gemini.py +++ /dev/null @@ -1,210 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import os -import sqlite3 -import sys -from typing import List, Optional - -import aiohttp -from dotenv import load_dotenv -from loguru import logger -from runner import configure - -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.transcript_processor import TranscriptProcessor -from pipecat.services.cartesia import CartesiaTTSService -from pipecat.services.deepgram import DeepgramSTTService -from pipecat.services.google import GoogleLLMService -from pipecat.services.openai import OpenAILLMContext -from pipecat.transports.services.daily import DailyParams, DailyTransport - -load_dotenv(override=True) - -logger.remove(0) 
-logger.add(sys.stderr, level="DEBUG") - - -class TranscriptHandler: - """Handles real-time transcript processing and output. - - Maintains a list of conversation messages and outputs them either to a log - or to a file as they are received. Each message includes its timestamp and role. - - Attributes: - messages: List of all processed transcript messages - output_file: Optional path to file where transcript is saved. If None, outputs to log only. - """ - - def __init__(self, output_file: Optional[str] = None, output_db: Optional[str] = None): - """Initialize handler with optional file or database output. - - Args: - output_file: Path to output file. If None, outputs to log only. - """ - self.messages: List[TranscriptionMessage] = [] - self.output_file: Optional[str] = output_file - self.output_db: Optional[str] = output_db - - if self.output_db: - self.con = sqlite3.connect("example.db") - self.db = self.con.cursor() - - table = self.db.execute("SELECT name FROM sqlite_master WHERE name='messages'") - if not (table.fetchone()): - self.db.execute( - "CREATE TABLE messages(role TEXT, content TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP )" - ) - logger.debug( - f"TranscriptHandler initialized; output file: {output_file}, output DB: {output_db}" - ) - - async def save_message(self, message: TranscriptionMessage): - """Save a single transcript message. - - Outputs the message to the log and optionally to a SQLite database or file. 
- - Args: - message: The message to save - """ - timestamp = f"[{message.timestamp}] " if message.timestamp else "" - line = f"{timestamp}{message.role}: {message.content}" - - # Always log the message - logger.info(f"Transcript: {line}") - - # Optionally write to file - if self.output_file: - try: - with open(self.output_file, "a", encoding="utf-8") as f: - f.write(line + "\n") - except Exception as e: - logger.error(f"Error saving transcript message to file: {e}") - - # and/or to a SQLite database - if self.output_db: - self.db.execute( - "INSERT INTO messages VALUES (?, ?, ?)", - (message.role, message.content, message.timestamp), - ) - self.con.commit() - - async def on_transcript_update( - self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame - ): - """Handle new transcript messages. - - Args: - processor: The TranscriptProcessor that emitted the update - frame: TranscriptionUpdateFrame containing new messages - """ - logger.debug(f"Received transcript update with {len(frame.messages)} new messages") - - for msg in frame.messages: - self.messages.append(msg) - await self.save_message(msg) - - -async def main(): - async with aiohttp.ClientSession() as session: - (room_url, token) = await configure(session) - - transport = DailyTransport( - room_url, - None, - "Respond bot", - DailyParams( - audio_out_enabled=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, - ), - ) - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - - llm = GoogleLLMService( - model="models/gemini-2.0-flash-exp", - # model="gemini-exp-1114", - api_key=os.getenv("GOOGLE_API_KEY"), - ) - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. 
Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way.", - }, - {"role": "user", "content": "Say hello."}, - ] - - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) - - # Create transcript processor and handler - transcript = TranscriptProcessor() - # Select a TranscriptHandler output method - # Uncomment out only one of the following lines: - transcript_handler = TranscriptHandler() # Output to log only - # transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log - # transcript_handler = TranscriptHandler(output_db="example.db") # Output to SQLite DB and log - - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, # STT - transcript.user(), # User transcripts - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - transcript.assistant(), # Assistant transcripts - context_aggregator.assistant(), # Assistant spoken responses - ] - ) - - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - enable_usage_metrics=True, - ), - ) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - # Kick off the conversation. 
- await task.queue_frames([context_aggregator.user().get_context_frame()]) - - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - await transcript_handler.on_transcript_update(processor, frame) - - @transport.event_handler("on_participant_left") - async def on_participant_left(transport, participant, reason): - # Stop the pipeline immediately when the participant leaves - await task.cancel() - - runner = PipelineRunner() - - await runner.run(task) - - -if __name__ == "__main__": - asyncio.run(main()) From acd0660f66ccecaabf3089480045faba0c37a586 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 14 Mar 2025 14:00:38 -0400 Subject: [PATCH 7/8] Update GeminiMultimodalLiveLLMService to work with the TranscriptProcessor --- CHANGELOG.md | 3 ++- src/pipecat/services/gemini_multimodal_live/gemini.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 596a3e1e0..7376603c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,7 +93,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Updated `TranscriptProcessor` to support text output from `OpenAIRealtimeBetaLLMService`. -- `OpenAIRealtimeBetaLLMService` now pushes a `TTSTextFrame`. +- `OpenAIRealtimeBetaLLMService` and `GeminiMultimodalLiveLLMService` now push + a `TTSTextFrame`. - Updated the default mode for `CartesiaTTSService` and `CartesiaHttpTTSService` to `sonic-2`. 
diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index ef49df329..5fe8a792b 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -38,6 +38,7 @@ from pipecat.frames.frames import ( TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, + TTSTextFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -312,6 +313,7 @@ class GeminiMultimodalLiveLLMService(LLMService): # context.add_message({"role": "assistant", "content": [{"type": "text", "text": text}]}) await self.push_frame(LLMFullResponseStartFrame()) await self.push_frame(LLMTextFrame(text=text)) + await self.push_frame(TTSTextFrame(text=text)) await self.push_frame(LLMFullResponseEndFrame()) async def _transcribe_audio(self, audio, context): From 6885d07e880341d1a5ae46054ae8d64609c3e9eb Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 17 Mar 2025 16:30:46 -0400 Subject: [PATCH 8/8] Simplify the TranscriptProcessor _emit_aggregated_text logic --- .../processors/transcript_processor.py | 86 +++--- tests/test_transcript_processor.py | 248 +----------------- 2 files changed, 50 insertions(+), 284 deletions(-) diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index 6a7793335..3eaff66ca 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -90,52 +90,62 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): self._aggregation_start_time: Optional[str] = None async def _emit_aggregated_text(self): - """Emit aggregated text as a transcript message. + """Aggregates and emits text fragments as a transcript message. - This method intelligently joins text fragments to create natural spacing, - handling both word-by-word and pre-spaced text fragments appropriately. 
+ This method uses a heuristic to automatically detect whether text fragments + use pre-spacing (spaces at the beginning of fragments) or not, and applies + the appropriate joining strategy. It handles fragments from different TTS + services with different formatting patterns. - The implementation handles two common patterns from TTS services: + Examples: + Pre-spaced fragments (concatenated): + ``` + TTSTextFrame: ["Hello"] + TTSTextFrame: [" there"] + TTSTextFrame: ["!"] + TTSTextFrame: [" How"] + TTSTextFrame: ["'s"] + TTSTextFrame: [" it"] + TTSTextFrame: [" going"] + TTSTextFrame: ["?"] + ``` + Result: "Hello there! How's it going?" - 1. Word-by-word fragments without spacing: - ``` - TTSTextFrame: ['Hello.'] - TTSTextFrame: ['How'] - TTSTextFrame: ['can'] - TTSTextFrame: ['I'] - TTSTextFrame: ['assist'] - TTSTextFrame: ['you'] - TTSTextFrame: ['today?'] - ``` - Result: "Hello. How can I assist you today?" - - 2. Pre-spaced fragments: - ``` - TTSTextFrame: ['Hello'] - TTSTextFrame: [' there'] - TTSTextFrame: ['!'] - TTSTextFrame: [' How'] - TTSTextFrame: ["'s"] - TTSTextFrame: [' it'] - TTSTextFrame: [' going'] - TTSTextFrame: ['?'] - ``` - Result: "Hello there! How's it going?" + Word-by-word fragments (joined with spaces): + ``` + TTSTextFrame: ["Hello"] + TTSTextFrame: ["there!"] + TTSTextFrame: ["How"] + TTSTextFrame: ["is"] + TTSTextFrame: ["it"] + TTSTextFrame: ["going?"] + ``` + Result: "Hello there! How is it going?" 
""" if self._current_text_parts and self._aggregation_start_time: - # Build content with intelligent spacing - content = "" - for i, part in enumerate(self._current_text_parts): - # Add a space only when the current part doesn't start with - # whitespace or punctuation/special characters - if i > 0 and not part.startswith((" ", ".", ",", "!", "?", ";", ":", "'", '"')): - content += " " - content += part + # Heuristic to detect pre-spaced fragments + uses_prespacing = False + if len(self._current_text_parts) > 1: + # Check if any fragment after the first one starts with whitespace + has_spaced_parts = any( + part and part[0].isspace() for part in self._current_text_parts[1:] + ) + if has_spaced_parts: + uses_prespacing = True + # Apply appropriate joining method + if uses_prespacing: + # Pre-spaced fragments - just concatenate + content = "".join(self._current_text_parts) + else: + # Word-by-word fragments - join with spaces + content = " ".join(self._current_text_parts) + + # Clean up any excessive whitespace content = content.strip() if content: - logger.debug(f"Emitting aggregated assistant message: {content}") + logger.trace(f"Emitting aggregated assistant message: {content}") message = TranscriptionMessage( role="assistant", content=content, @@ -143,7 +153,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): ) await self._emit_update([message]) else: - logger.debug("No content to emit after stripping whitespace") + logger.trace("No content to emit after stripping whitespace") # Reset aggregation state self._current_text_parts = [] diff --git a/tests/test_transcript_processor.py b/tests/test_transcript_processor.py index 5f80b3ca6..d13246b2c 100644 --- a/tests/test_transcript_processor.py +++ b/tests/test_transcript_processor.py @@ -235,8 +235,7 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): BotStartedSpeakingFrame(), SleepFrame(sleep=0.1), TTSTextFrame(text="Hello"), - TTSTextFrame(text="world"), - TTSTextFrame(text="!"), + 
TTSTextFrame(text="world!"), SleepFrame(sleep=0.1), StartInterruptionFrame(), # User interrupts here BotStartedSpeakingFrame(), @@ -251,8 +250,7 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): expected_down_frames = [ BotStartedSpeakingFrame, TTSTextFrame, # "Hello" - TTSTextFrame, # "world" - TTSTextFrame, # "!" + TTSTextFrame, # "world!" TranscriptionUpdateFrame, # First message (emitted due to interruption) StartInterruptionFrame, # Interruption frame comes after the update BotStartedSpeakingFrame, @@ -480,245 +478,3 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): self.assertEqual(message.role, "assistant") # Should be properly joined without extra spaces self.assertEqual(message.content, "Hello there! How's it going?") - - async def test_mixed_spacing_styles(self): - """Test handling mixed word-by-word and pre-spaced fragments""" - processor = AssistantTranscriptProcessor() - - received_updates = [] - - @processor.event_handler("on_transcript_update") - async def handle_update(proc, frame: TranscriptionUpdateFrame): - received_updates.append(frame) - - # Mix of spacing styles within the same utterance - frames_to_send = [ - BotStartedSpeakingFrame(), - SleepFrame(sleep=0.1), - # Word-by-word style - TTSTextFrame(text="First"), - TTSTextFrame(text="style."), - # Pre-spaced style - TTSTextFrame(text=" Second"), - TTSTextFrame(text=" style"), - TTSTextFrame(text="!"), - BotStoppedSpeakingFrame(), - ] - - expected_down_frames = [ - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TranscriptionUpdateFrame, - ] - - await run_test( - processor, - frames_to_send=frames_to_send, - expected_down_frames=expected_down_frames, - ) - - self.assertEqual(len(received_updates), 1) - message = received_updates[0].messages[0] - self.assertEqual(message.content, "First style. 
Second style!") - - async def test_punctuation_handling(self): - """Test handling of various punctuation patterns""" - processor = AssistantTranscriptProcessor() - - received_updates = [] - - @processor.event_handler("on_transcript_update") - async def handle_update(proc, frame: TranscriptionUpdateFrame): - received_updates.append(frame) - - # Test various punctuation types - frames_to_send = [ - BotStartedSpeakingFrame(), - SleepFrame(sleep=0.1), - TTSTextFrame(text="Commas"), - TTSTextFrame(text=","), - TTSTextFrame(text="colons"), - TTSTextFrame(text=":"), - TTSTextFrame(text="semicolons"), - TTSTextFrame(text=";"), - TTSTextFrame(text="quotes"), - TTSTextFrame(text="'"), - TTSTextFrame(text="and"), - TTSTextFrame(text='"'), - TTSTextFrame(text="double quotes"), - TTSTextFrame(text="!"), - BotStoppedSpeakingFrame(), - ] - - expected_down_frames = [ - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TranscriptionUpdateFrame, - ] - - await run_test( - processor, - frames_to_send=frames_to_send, - expected_down_frames=expected_down_frames, - ) - - self.assertEqual(len(received_updates), 1) - message = received_updates[0].messages[0] - self.assertEqual( - message.content, "Commas, colons: semicolons; quotes' and\" double quotes!" 
- ) - - async def test_complex_mixed_case(self): - """Test a complex mix of patterns to ensure robustness""" - processor = AssistantTranscriptProcessor() - - received_updates = [] - - @processor.event_handler("on_transcript_update") - async def handle_update(proc, frame: TranscriptionUpdateFrame): - received_updates.append(frame) - - # Complex mixed case with various patterns - frames_to_send = [ - BotStartedSpeakingFrame(), - SleepFrame(sleep=0.1), - # Pre-spaced fragments - TTSTextFrame(text="Hello"), - TTSTextFrame(text=" there"), - TTSTextFrame(text="!"), - # Sentence boundary - TTSTextFrame(text=" I'm"), - TTSTextFrame(text=" testing"), - TTSTextFrame(text=" spacing"), - TTSTextFrame(text="."), - # Word-by-word fragments - TTSTextFrame(text="Does"), - TTSTextFrame(text="this"), - TTSTextFrame(text="work"), - TTSTextFrame(text="correctly"), - TTSTextFrame(text="?"), - # Mixed punctuation and spacing - TTSTextFrame(text=" Let's"), - TTSTextFrame(text=" see:"), - TTSTextFrame(text="commas"), - TTSTextFrame(text=","), - TTSTextFrame(text=" semicolons"), - TTSTextFrame(text=";"), - TTSTextFrame(text=" and"), - TTSTextFrame(text=" quotes"), - TTSTextFrame(text="'"), - TTSTextFrame(text="!"), - BotStoppedSpeakingFrame(), - ] - - expected_down_frames = [ - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TranscriptionUpdateFrame, - ] - - await run_test( - processor, - frames_to_send=frames_to_send, - expected_down_frames=expected_down_frames, - ) - - self.assertEqual(len(received_updates), 1) - message = received_updates[0].messages[0] - expected = "Hello there! I'm testing spacing. Does this work correctly? 
Let's see: commas, semicolons; and quotes'!" - self.assertEqual(message.content, expected) - - async def test_multiple_consecutive_punctuation(self): - """Test handling of multiple consecutive punctuation marks""" - processor = AssistantTranscriptProcessor() - - received_updates = [] - - @processor.event_handler("on_transcript_update") - async def handle_update(proc, frame: TranscriptionUpdateFrame): - received_updates.append(frame) - - frames_to_send = [ - BotStartedSpeakingFrame(), - SleepFrame(sleep=0.1), - TTSTextFrame(text="Wow"), - TTSTextFrame(text="!"), - TTSTextFrame(text="!"), - TTSTextFrame(text="!"), - TTSTextFrame(text=" That's"), - TTSTextFrame(text=" amazing"), - TTSTextFrame(text="..."), - TTSTextFrame(text=" Don't"), - TTSTextFrame(text=" you"), - TTSTextFrame(text=" think"), - TTSTextFrame(text="?"), - TTSTextFrame(text="?"), - BotStoppedSpeakingFrame(), - ] - - expected_down_frames = [ - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TTSTextFrame, - TranscriptionUpdateFrame, - ] - - await run_test( - processor, - frames_to_send=frames_to_send, - expected_down_frames=expected_down_frames, - ) - - self.assertEqual(len(received_updates), 1) - message = received_updates[0].messages[0] - self.assertEqual(message.content, "Wow!!! That's amazing... Don't you think??")