From 0a163201ea49801abcd0d856cb5d4b02f77deaa5 Mon Sep 17 00:00:00 2001 From: James Hush Date: Thu, 25 Sep 2025 14:01:19 +0800 Subject: [PATCH] feat: Add sentence aggregation and Whisker debugger to transcript processor - Enhance TranscriptHandler to aggregate transcript fragments into complete sentences using match_endofsentence() - Add Whisker debugger integration for real-time pipeline visualization - Implement sentence buffering for both user and assistant messages - Add finalize_partial_sentences() method to handle incomplete sentences on disconnect - Improves transcript readability by reducing fragmented output Changes: - Import match_endofsentence utility for sentence boundary detection - Add pipecat_whisker.WhiskerObserver for debugging capabilities - Modify on_transcript_update() to accumulate and aggregate messages - Create _save_sentence() helper method for complete sentence handling - Update client disconnect handler to preserve partial sentences --- .../28-transcription-processor.py | 57 ++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/examples/foundational/28-transcription-processor.py b/examples/foundational/28-transcription-processor.py index 6aa3168ac..56f0980ec 100644 --- a/examples/foundational/28-transcription-processor.py +++ b/examples/foundational/28-transcription-processor.py @@ -29,6 +29,10 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.utils.string import match_endofsentence + +logger.info("Loading Whisker debugger...") +from pipecat_whisker import WhiskerObserver load_dotenv(override=True) @@ -52,6 +56,8 @@ class TranscriptHandler: """ self.messages: List[TranscriptionMessage] = [] self.output_file: Optional[str] = output_file + self._current_user_sentence = "" + self._current_assistant_sentence = "" logger.debug( f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}" ) @@ -78,11 +84,29 @@ class TranscriptHandler: except Exception as e: logger.error(f"Error saving transcript message to file: {e}") + async def _save_sentence(self, role: str, content: str, timestamp: Optional[str] = None): + """Save a complete sentence as a transcript message. + + Args: + role: The role (user/assistant) + content: The complete sentence content + timestamp: Optional timestamp + """ + # Cast role to the appropriate literal type + message_role = "user" if role == "user" else "assistant" + sentence_message = TranscriptionMessage( + role=message_role, content=content.strip(), timestamp=timestamp + ) + self.messages.append(sentence_message) + await self.save_message(sentence_message) + async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame ): """Handle new transcript messages. + Aggregates messages into complete sentences before saving them using match_endofsentence. + Args: processor: The TranscriptProcessor that emitted the update frame: TranscriptionUpdateFrame containing new messages @@ -90,8 +114,31 @@ class TranscriptHandler: logger.debug(f"Received transcript update with {len(frame.messages)} new messages") for msg in frame.messages: - self.messages.append(msg) - await self.save_message(msg) + # Accumulate text for the appropriate role + if msg.role == "user": + self._current_user_sentence += msg.content + " " + # Check if we have a complete sentence + if match_endofsentence(self._current_user_sentence): + await self._save_sentence("user", self._current_user_sentence, msg.timestamp) + self._current_user_sentence = "" + elif msg.role == "assistant": + self._current_assistant_sentence += msg.content + " " + # Check if we have a complete sentence + if match_endofsentence(self._current_assistant_sentence): + await self._save_sentence( + "assistant", self._current_assistant_sentence, msg.timestamp + ) + self._current_assistant_sentence = "" + + async def finalize_partial_sentences(self): + """Save any remaining partial sentences when the conversation ends.""" + if self._current_user_sentence.strip(): + await self._save_sentence("user", self._current_user_sentence) + self._current_user_sentence = "" + + if self._current_assistant_sentence.strip(): + await self._save_sentence("assistant", self._current_assistant_sentence) + self._current_assistant_sentence = "" # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -160,12 +207,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ] ) + # Create Whisker debugger observer + whisker = WhiskerObserver(pipeline) + task = PipelineTask( pipeline, params=PipelineParams( enable_metrics=True, enable_usage_metrics=True, ), + observers=[whisker], idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) @@ -183,6 +234,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): logger.info(f"Client disconnected") + # Finalize any partial sentences before canceling + await transcript_handler.finalize_partial_sentences() await task.cancel() runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)