Add unit tests for ThoughtTranscriptProcessor

This commit is contained in:
Paul Kompfner
2025-12-08 10:14:31 -05:00
parent ef703e9d16
commit 8ccc2cbf31

View File

@@ -16,6 +16,10 @@ from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
InterruptionFrame,
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
ThoughtTranscriptionMessage,
TranscriptionFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
@@ -485,3 +489,309 @@ class TestUserTranscriptProcessor(unittest.IsolatedAsyncioTestCase):
self.assertEqual(message.role, "assistant")
# Should be properly joined without extra spaces
self.assertEqual(message.content, "Hello there! How's it going?")
class TestThoughtTranscription(unittest.IsolatedAsyncioTestCase):
"""Tests for thought transcription in AssistantTranscriptProcessor"""
async def test_basic_thought_transcription(self):
"""Test basic thought frame processing"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
# Create frames for a simple thought
frames_to_send = [
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="Let me think about this..."),
LLMThoughtEndFrame(),
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
TranscriptionUpdateFrame,
LLMThoughtEndFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify update was received
self.assertEqual(len(received_updates), 1)
message = received_updates[0].messages[0]
self.assertIsInstance(message, ThoughtTranscriptionMessage)
self.assertEqual(message.content, "Let me think about this...")
self.assertIsNotNone(message.timestamp)
async def test_thought_aggregation(self):
"""Test that thought text frames are properly aggregated"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
# Create frames simulating chunked thought text
frames_to_send = [
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="The user "),
LLMThoughtTextFrame(text="is asking "),
LLMThoughtTextFrame(text="about electric "),
LLMThoughtTextFrame(text="cars."),
LLMThoughtEndFrame(),
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
LLMThoughtTextFrame,
LLMThoughtTextFrame,
LLMThoughtTextFrame,
TranscriptionUpdateFrame,
LLMThoughtEndFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify aggregation
self.assertEqual(len(received_updates), 1)
message = received_updates[0].messages[0]
self.assertIsInstance(message, ThoughtTranscriptionMessage)
self.assertEqual(message.content, "The user is asking about electric cars.")
async def test_thought_with_interruption(self):
"""Test that thoughts are properly captured when interrupted"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
frames_to_send = [
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="I need to consider "),
LLMThoughtTextFrame(text="multiple factors"),
SleepFrame(),
InterruptionFrame(), # User interrupts
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
LLMThoughtTextFrame,
InterruptionFrame,
TranscriptionUpdateFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify thought was captured on interruption
self.assertEqual(len(received_updates), 1)
message = received_updates[0].messages[0]
self.assertIsInstance(message, ThoughtTranscriptionMessage)
self.assertEqual(message.content, "I need to consider multiple factors")
async def test_thought_with_cancel(self):
"""Test that thoughts are properly captured when cancelled"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
frames_to_send = [
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="Starting analysis"),
SleepFrame(),
CancelFrame(),
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
CancelFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
send_end_frame=False,
)
# Verify thought was captured on cancellation
self.assertEqual(len(received_updates), 1)
message = received_updates[0].messages[0]
self.assertIsInstance(message, ThoughtTranscriptionMessage)
self.assertEqual(message.content, "Starting analysis")
async def test_thought_with_end_frame(self):
"""Test that thoughts are captured when pipeline ends normally"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
frames_to_send = [
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="Final thought"),
# Pipeline ends here; run_test will automatically send EndFrame
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
TranscriptionUpdateFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify thought was captured on EndFrame
self.assertEqual(len(received_updates), 1)
message = received_updates[0].messages[0]
self.assertIsInstance(message, ThoughtTranscriptionMessage)
self.assertEqual(message.content, "Final thought")
async def test_multiple_thoughts(self):
"""Test multiple separate thoughts in sequence"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
frames_to_send = [
# First thought
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="First consideration"),
LLMThoughtEndFrame(),
# Second thought
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text="Second consideration"),
LLMThoughtEndFrame(),
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
TranscriptionUpdateFrame,
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
TranscriptionUpdateFrame,
LLMThoughtEndFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify both thoughts were captured
self.assertEqual(len(received_updates), 2)
first_message = received_updates[0].messages[0]
self.assertIsInstance(first_message, ThoughtTranscriptionMessage)
self.assertEqual(first_message.content, "First consideration")
second_message = received_updates[1].messages[0]
self.assertIsInstance(second_message, ThoughtTranscriptionMessage)
self.assertEqual(second_message.content, "Second consideration")
# Verify timestamps are different
self.assertNotEqual(first_message.timestamp, second_message.timestamp)
async def test_empty_thought_handling(self):
"""Test that empty thoughts are not emitted"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
frames_to_send = [
LLMThoughtStartFrame(),
LLMThoughtTextFrame(text=""), # Empty
LLMThoughtTextFrame(text=" "), # Just whitespace
LLMThoughtEndFrame(),
]
expected_down_frames = [
LLMThoughtStartFrame,
LLMThoughtTextFrame,
LLMThoughtTextFrame,
LLMThoughtEndFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify no updates emitted for empty content
self.assertEqual(len(received_updates), 0)
async def test_thought_without_start_frame(self):
"""Test that thought text without start frame is ignored"""
processor = AssistantTranscriptProcessor()
received_updates: List[TranscriptionUpdateFrame] = []
@processor.event_handler("on_transcript_update")
async def handle_update(proc, frame: TranscriptionUpdateFrame):
received_updates.append(frame)
# Send thought text without start frame
frames_to_send = [
LLMThoughtTextFrame(text="This should be ignored"),
LLMThoughtEndFrame(),
]
expected_down_frames = [
LLMThoughtTextFrame,
LLMThoughtEndFrame,
]
await run_test(
processor,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
# Verify no updates since thought wasn't properly started
self.assertEqual(len(received_updates), 0)