From 8ccc2cbf3168e7aea7d351409cb12591b008dfbc Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 8 Dec 2025 10:14:31 -0500 Subject: [PATCH] Add unit tests for `ThoughtTranscriptProcessor` --- tests/test_transcript_processor.py | 310 +++++++++++++++++++++++++++++ 1 file changed, 310 insertions(+) diff --git a/tests/test_transcript_processor.py b/tests/test_transcript_processor.py index d86e42101..a5e0702a0 100644 --- a/tests/test_transcript_processor.py +++ b/tests/test_transcript_processor.py @@ -16,6 +16,10 @@ from pipecat.frames.frames import ( BotStoppedSpeakingFrame, CancelFrame, InterruptionFrame, + LLMThoughtEndFrame, + LLMThoughtStartFrame, + LLMThoughtTextFrame, + ThoughtTranscriptionMessage, TranscriptionFrame, TranscriptionMessage, TranscriptionUpdateFrame, @@ -485,3 +489,309 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase): self.assertEqual(message.role, "assistant") # Should be properly joined without extra spaces self.assertEqual(message.content, "Hello there! 
class TestThoughtTranscription(unittest.IsolatedAsyncioTestCase):
    """Tests for thought transcription in AssistantTranscriptProcessor.

    Every test pushes LLM thought frames through an
    ``AssistantTranscriptProcessor`` with ``run_test`` and then checks two
    things: the frame types observed downstream, and the
    ``TranscriptionUpdateFrame`` objects delivered to an
    ``on_transcript_update`` event handler.
    """

    def _make_processor(self):
        """Create a processor whose transcript updates are recorded.

        Returns:
            A ``(processor, received_updates)`` pair. ``received_updates`` is
            the list the ``on_transcript_update`` handler appends each
            ``TranscriptionUpdateFrame`` to, in arrival order.
        """
        processor = AssistantTranscriptProcessor()
        received_updates: List[TranscriptionUpdateFrame] = []

        @processor.event_handler("on_transcript_update")
        async def handle_update(proc, frame: TranscriptionUpdateFrame):
            received_updates.append(frame)

        return processor, received_updates

    def _assert_thought(self, update, expected_content):
        """Check that the first message of ``update`` is the expected thought.

        Args:
            update: A recorded ``TranscriptionUpdateFrame``.
            expected_content: The text the thought message must carry.

        Returns:
            The first message of ``update`` so callers can make further
            assertions on it (for example on its timestamp).
        """
        message = update.messages[0]
        self.assertIsInstance(message, ThoughtTranscriptionMessage)
        self.assertEqual(message.content, expected_content)
        return message

    async def test_basic_thought_transcription(self):
        """Test basic thought frame processing"""
        processor, received_updates = self._make_processor()

        # Create frames for a simple thought
        frames_to_send = [
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="Let me think about this..."),
            LLMThoughtEndFrame(),
        ]

        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            TranscriptionUpdateFrame,
            LLMThoughtEndFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify update was received
        self.assertEqual(len(received_updates), 1)
        message = self._assert_thought(received_updates[0], "Let me think about this...")
        self.assertIsNotNone(message.timestamp)

    async def test_thought_aggregation(self):
        """Test that thought text frames are properly aggregated"""
        processor, received_updates = self._make_processor()

        # Create frames simulating chunked thought text
        frames_to_send = [
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="The user "),
            LLMThoughtTextFrame(text="is asking "),
            LLMThoughtTextFrame(text="about electric "),
            LLMThoughtTextFrame(text="cars."),
            LLMThoughtEndFrame(),
        ]

        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            LLMThoughtTextFrame,
            LLMThoughtTextFrame,
            LLMThoughtTextFrame,
            TranscriptionUpdateFrame,
            LLMThoughtEndFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify aggregation: the four chunks arrive as one message
        self.assertEqual(len(received_updates), 1)
        self._assert_thought(received_updates[0], "The user is asking about electric cars.")

    async def test_thought_with_interruption(self):
        """Test that thoughts are properly captured when interrupted"""
        processor, received_updates = self._make_processor()

        frames_to_send = [
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="I need to consider "),
            LLMThoughtTextFrame(text="multiple factors"),
            SleepFrame(),
            InterruptionFrame(),  # User interrupts
        ]

        # No LLMThoughtEndFrame is sent; the update is expected after the
        # InterruptionFrame instead.
        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            LLMThoughtTextFrame,
            InterruptionFrame,
            TranscriptionUpdateFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify thought was captured on interruption
        self.assertEqual(len(received_updates), 1)
        self._assert_thought(received_updates[0], "I need to consider multiple factors")

    async def test_thought_with_cancel(self):
        """Test that thoughts are properly captured when cancelled"""
        processor, received_updates = self._make_processor()

        frames_to_send = [
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="Starting analysis"),
            SleepFrame(),
            CancelFrame(),
        ]

        # The update is checked only through the event handler below; it is
        # not listed among the expected downstream frames.
        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            CancelFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
            send_end_frame=False,
        )

        # Verify thought was captured on cancellation
        self.assertEqual(len(received_updates), 1)
        self._assert_thought(received_updates[0], "Starting analysis")

    async def test_thought_with_end_frame(self):
        """Test that thoughts are captured when pipeline ends normally"""
        processor, received_updates = self._make_processor()

        frames_to_send = [
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="Final thought"),
            # Pipeline ends here; run_test will automatically send EndFrame
        ]

        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            TranscriptionUpdateFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify thought was captured on EndFrame
        self.assertEqual(len(received_updates), 1)
        self._assert_thought(received_updates[0], "Final thought")

    async def test_multiple_thoughts(self):
        """Test multiple separate thoughts in sequence"""
        processor, received_updates = self._make_processor()

        frames_to_send = [
            # First thought
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="First consideration"),
            LLMThoughtEndFrame(),
            # Pause between the thoughts so they are not stamped back to back;
            # the timestamp inequality asserted below depends on the two
            # thoughts getting distinct timestamps.
            # NOTE(review): timestamp resolution is not visible from this
            # file -- confirm the pause is long enough.
            SleepFrame(),
            # Second thought
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text="Second consideration"),
            LLMThoughtEndFrame(),
        ]

        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            TranscriptionUpdateFrame,
            LLMThoughtEndFrame,
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            TranscriptionUpdateFrame,
            LLMThoughtEndFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify both thoughts were captured, each in its own update
        self.assertEqual(len(received_updates), 2)

        first_message = self._assert_thought(received_updates[0], "First consideration")
        second_message = self._assert_thought(received_updates[1], "Second consideration")

        # Verify timestamps are different
        self.assertNotEqual(first_message.timestamp, second_message.timestamp)

    async def test_empty_thought_handling(self):
        """Test that empty thoughts are not emitted"""
        processor, received_updates = self._make_processor()

        frames_to_send = [
            LLMThoughtStartFrame(),
            LLMThoughtTextFrame(text=""),  # Empty
            LLMThoughtTextFrame(text="   "),  # Just whitespace
            LLMThoughtEndFrame(),
        ]

        # No TranscriptionUpdateFrame is expected between the text frames and
        # the end frame.
        expected_down_frames = [
            LLMThoughtStartFrame,
            LLMThoughtTextFrame,
            LLMThoughtTextFrame,
            LLMThoughtEndFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify no updates emitted for empty content
        self.assertEqual(len(received_updates), 0)

    async def test_thought_without_start_frame(self):
        """Test that thought text without start frame is ignored"""
        processor, received_updates = self._make_processor()

        # Send thought text without start frame
        frames_to_send = [
            LLMThoughtTextFrame(text="This should be ignored"),
            LLMThoughtEndFrame(),
        ]

        # The frames themselves still pass through; only the transcript
        # update is expected to be absent.
        expected_down_frames = [
            LLMThoughtTextFrame,
            LLMThoughtEndFrame,
        ]

        await run_test(
            processor,
            frames_to_send=frames_to_send,
            expected_down_frames=expected_down_frames,
        )

        # Verify no updates since thought wasn't properly started
        self.assertEqual(len(received_updates), 0)