Compare commits

...

1 Commits

Author SHA1 Message Date
James Hush
0a163201ea feat: Add sentence aggregation and Whisker debugger to transcript processor
- Enhance TranscriptHandler to aggregate transcript fragments into complete sentences using match_endofsentence()
- Add Whisker debugger integration for real-time pipeline visualization
- Implement sentence buffering for both user and assistant messages
- Add finalize_partial_sentences() method to handle incomplete sentences on disconnect
- Improves transcript readability by reducing fragmented output

Changes:
- Import match_endofsentence utility for sentence boundary detection
- Add pipecat_whisker.WhiskerObserver for debugging capabilities
- Modify on_transcript_update() to accumulate and aggregate messages
- Create _save_sentence() helper method for complete sentence handling
- Update client disconnect handler to preserve partial sentences
2025-09-25 14:01:19 +08:00

View File

@@ -29,6 +29,10 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.utils.string import match_endofsentence
logger.info("Loading Whisker debugger...")
from pipecat_whisker import WhiskerObserver
load_dotenv(override=True)
@@ -52,6 +56,8 @@ class TranscriptHandler:
"""
self.messages: List[TranscriptionMessage] = []
self.output_file: Optional[str] = output_file
self._current_user_sentence = ""
self._current_assistant_sentence = ""
logger.debug(
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
)
@@ -78,11 +84,29 @@ class TranscriptHandler:
except Exception as e:
logger.error(f"Error saving transcript message to file: {e}")
async def _save_sentence(self, role: str, content: str, timestamp: Optional[str] = None):
"""Save a complete sentence as a transcript message.
Args:
role: The role (user/assistant)
content: The complete sentence content
timestamp: Optional timestamp
"""
# Cast role to the appropriate literal type
message_role = "user" if role == "user" else "assistant"
sentence_message = TranscriptionMessage(
role=message_role, content=content.strip(), timestamp=timestamp
)
self.messages.append(sentence_message)
await self.save_message(sentence_message)
async def on_transcript_update(
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
):
"""Handle new transcript messages.
Aggregates messages into complete sentences before saving them using match_endofsentence.
Args:
processor: The TranscriptProcessor that emitted the update
frame: TranscriptionUpdateFrame containing new messages
@@ -90,8 +114,31 @@ class TranscriptHandler:
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
for msg in frame.messages:
self.messages.append(msg)
await self.save_message(msg)
# Accumulate text for the appropriate role
if msg.role == "user":
self._current_user_sentence += msg.content + " "
# Check if we have a complete sentence
if match_endofsentence(self._current_user_sentence):
await self._save_sentence("user", self._current_user_sentence, msg.timestamp)
self._current_user_sentence = ""
elif msg.role == "assistant":
self._current_assistant_sentence += msg.content + " "
# Check if we have a complete sentence
if match_endofsentence(self._current_assistant_sentence):
await self._save_sentence(
"assistant", self._current_assistant_sentence, msg.timestamp
)
self._current_assistant_sentence = ""
async def finalize_partial_sentences(self):
"""Save any remaining partial sentences when the conversation ends."""
if self._current_user_sentence.strip():
await self._save_sentence("user", self._current_user_sentence)
self._current_user_sentence = ""
if self._current_assistant_sentence.strip():
await self._save_sentence("assistant", self._current_assistant_sentence)
self._current_assistant_sentence = ""
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -160,12 +207,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
)
# Create Whisker debugger observer
whisker = WhiskerObserver(pipeline)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[whisker],
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@@ -183,6 +234,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
# Finalize any partial sentences before canceling
await transcript_handler.finalize_partial_sentences()
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)