Merge pull request #853 from pipecat-ai/revert-849-aleix/no-need-for-super-process-frame

Revert "no longer necessary to call super().process_frame(frame, direction)"
This commit is contained in:
Aleix Conchillo Flaqué
2024-12-12 17:21:20 -08:00
committed by GitHub
57 changed files with 212 additions and 56 deletions

View File

@@ -13,12 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian,
Galician, Hebrew, Mandarin, Serbian, Tagalog, Urdu, Xhosa).
### Changed
- It's no longer necessary to call `super().process_frame(frame, direction)` if
you subclass and implement `FrameProcessor.process_frame()`. This is all now
done internally and will avoid possible issues if you forget to add it.
### Deprecated
- `AWSTTSService` is now deprecated, use `PollyTTSService` instead.

View File

@@ -56,6 +56,8 @@ class MonthPrepender(FrameProcessor):
self.prepend_to_next_text_frame = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, MonthFrame):
self.most_recent_month = frame.month
elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame):

View File

@@ -62,6 +62,8 @@ async def main():
self.text = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self.text = frame.text
await self.push_frame(frame, direction)
@@ -73,6 +75,8 @@ async def main():
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
self.audio.extend(frame.audio)
self.frame = OutputAudioRawFrame(
@@ -86,6 +90,8 @@ async def main():
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, URLImageRawFrame):
self.frame = frame
await self.push_frame(frame, direction)

View File

@@ -47,6 +47,8 @@ class ImageSyncAggregator(FrameProcessor):
self._waiting_image_bytes = self._waiting_image.tobytes()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(
OutputImageRawFrame(

View File

@@ -82,6 +82,8 @@ class UserAudioCollector(FrameProcessor):
self._user_speaking = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
# We could gracefully handle both audio input and text/transcription input ...
# but let's leave that as an exercise to the reader. :-)
@@ -124,6 +126,7 @@ class TranscriptExtractor(FrameProcessor):
self._accumulating_transcript = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseStartFrame):
self._processing_llm_response = True
self._accumulating_transcript = True
@@ -177,6 +180,8 @@ class TanscriptionContextFixup(FrameProcessor):
self._context.messages[-1].parts[-1].text += f"\n\n{marker}\n{self._transcript}\n"
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, MagicDemoTranscriptionFrame):
self._transcript = frame.text
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(

View File

@@ -35,6 +35,8 @@ logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(

View File

@@ -39,6 +39,8 @@ logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(

View File

@@ -60,6 +60,8 @@ for file in sound_files:
class OutboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseEndFrame):
await self.push_frame(sounds["ding1.wav"])
# In case anything else downstream needs it
@@ -70,6 +72,8 @@ class OutboundSoundEffectWrapper(FrameProcessor):
class InboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
await self.push_frame(sounds["ding2.wav"])
# In case anything else downstream needs it

View File

@@ -42,6 +42,8 @@ class UserImageRequester(FrameProcessor):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM

View File

@@ -42,6 +42,8 @@ class UserImageRequester(FrameProcessor):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM

View File

@@ -42,6 +42,8 @@ class UserImageRequester(FrameProcessor):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM

View File

@@ -42,6 +42,8 @@ class UserImageRequester(FrameProcessor):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM

View File

@@ -30,6 +30,8 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")

View File

@@ -28,6 +28,8 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")

View File

@@ -31,6 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")

View File

@@ -29,6 +29,8 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")

View File

@@ -29,6 +29,8 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")

View File

@@ -64,6 +64,7 @@ class StatementJudgeContextFilter(FrameProcessor):
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
@@ -117,6 +118,7 @@ class CompletenessCheck(FrameProcessor):
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
@@ -139,6 +141,8 @@ class OutputGate(FrameProcessor):
self._gate_open = True
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
if isinstance(frame, StartFrame):

View File

@@ -101,12 +101,12 @@ HIGH PRIORITY SIGNALS:
Examples:
# Complete Wh-question
[{"role": "assistant", "content": "I can help you learn."},
[{"role": "assistant", "content": "I can help you learn."},
{"role": "user", "content": "What's the fastest way to learn Spanish"}]
Output: YES
# Complete Yes/No question despite STT error
[{"role": "assistant", "content": "I know about planets."},
[{"role": "assistant", "content": "I know about planets."},
{"role": "user", "content": "Is is Jupiter the biggest planet"}]
Output: YES
@@ -118,12 +118,12 @@ Output: YES
Examples:
# Direct instruction
[{"role": "assistant", "content": "I can explain many topics."},
[{"role": "assistant", "content": "I can explain many topics."},
{"role": "user", "content": "Tell me about black holes"}]
Output: YES
# Action demand
[{"role": "assistant", "content": "I can help with math."},
[{"role": "assistant", "content": "I can help with math."},
{"role": "user", "content": "Solve this equation x plus 5 equals 12"}]
Output: YES
@@ -134,12 +134,12 @@ Output: YES
Examples:
# Specific answer
[{"role": "assistant", "content": "What's your favorite color?"},
[{"role": "assistant", "content": "What's your favorite color?"},
{"role": "user", "content": "I really like blue"}]
Output: YES
# Option selection
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
{"role": "user", "content": "Morning"}]
Output: YES
@@ -153,17 +153,17 @@ MEDIUM PRIORITY SIGNALS:
Examples:
# Self-correction reaching completion
[{"role": "assistant", "content": "What would you like to know?"},
[{"role": "assistant", "content": "What would you like to know?"},
{"role": "user", "content": "Tell me about... no wait, explain how rainbows form"}]
Output: YES
# Topic change with complete thought
[{"role": "assistant", "content": "The weather is nice today."},
[{"role": "assistant", "content": "The weather is nice today."},
{"role": "user", "content": "Actually can you tell me who invented the telephone"}]
Output: YES
# Mid-sentence completion
[{"role": "assistant", "content": "Hello I'm ready."},
[{"role": "assistant", "content": "Hello I'm ready."},
{"role": "user", "content": "What's the capital of? France"}]
Output: YES
@@ -175,12 +175,12 @@ Output: YES
Examples:
# Acknowledgment
[{"role": "assistant", "content": "Should we talk about history?"},
[{"role": "assistant", "content": "Should we talk about history?"},
{"role": "user", "content": "Sure"}]
Output: YES
# Disagreement with completion
[{"role": "assistant", "content": "Is that what you meant?"},
[{"role": "assistant", "content": "Is that what you meant?"},
{"role": "user", "content": "No not really"}]
Output: YES
@@ -194,12 +194,12 @@ LOW PRIORITY SIGNALS:
Examples:
# Word repetition but complete
[{"role": "assistant", "content": "I can help with that."},
[{"role": "assistant", "content": "I can help with that."},
{"role": "user", "content": "What what is the time right now"}]
Output: YES
# Missing punctuation but complete
[{"role": "assistant", "content": "I can explain that."},
[{"role": "assistant", "content": "I can explain that."},
{"role": "user", "content": "Please tell me how computers work"}]
Output: YES
@@ -211,12 +211,12 @@ Output: YES
Examples:
# Filler words but complete
[{"role": "assistant", "content": "What would you like to know?"},
[{"role": "assistant", "content": "What would you like to know?"},
{"role": "user", "content": "Um uh how do airplanes fly"}]
Output: YES
# Thinking pause but incomplete
[{"role": "assistant", "content": "I can explain anything."},
[{"role": "assistant", "content": "I can explain anything."},
{"role": "user", "content": "Well um I want to know about the"}]
Output: NO
@@ -241,17 +241,17 @@ DECISION RULES:
Examples:
# Incomplete despite corrections
[{"role": "assistant", "content": "What would you like to know about?"},
[{"role": "assistant", "content": "What would you like to know about?"},
{"role": "user", "content": "Can you tell me about"}]
Output: NO
# Complete despite multiple artifacts
[{"role": "assistant", "content": "I can help you learn."},
[{"role": "assistant", "content": "I can help you learn."},
{"role": "user", "content": "How do you I mean what's the best way to learn programming"}]
Output: YES
# Trailing off incomplete
[{"role": "assistant", "content": "I can explain anything."},
[{"role": "assistant", "content": "I can explain anything."},
{"role": "user", "content": "I was wondering if you could tell me why"}]
Output: NO
"""
@@ -268,6 +268,7 @@ class StatementJudgeContextFilter(FrameProcessor):
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
@@ -319,6 +320,8 @@ class CompletenessCheck(FrameProcessor):
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("!!! Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
@@ -341,6 +344,8 @@ class OutputGate(FrameProcessor):
self._gate_open = True
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
if isinstance(frame, StartFrame):

View File

@@ -90,6 +90,8 @@ class StatementJudgeAudioContextAccumulator(FrameProcessor):
self._user_speaking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# ignore context frame
if isinstance(frame, OpenAILLMContextFrame):
return
@@ -131,6 +133,8 @@ class CompletenessCheck(FrameProcessor):
self._audio_accumulator = audio_accumulator
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text.startswith("YES"):
logger.debug("Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
@@ -155,6 +159,8 @@ class OutputGate(FrameProcessor):
self._gate_open = True
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
if isinstance(frame, StartFrame):

View File

@@ -95,6 +95,8 @@ class UserAudioCollector(FrameProcessor):
self._user_speaking = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
# We could gracefully handle both audio input and text/transcription input ...
# but let's leave that as an exercise to the reader. :-)
@@ -133,6 +135,8 @@ class InputTranscriptionContextFilter(FrameProcessor):
"""
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
# We don't want to block system frames.
await self.push_frame(frame, direction)
@@ -206,6 +210,8 @@ class InputTranscriptionFrameEmitter(FrameProcessor):
self._aggregation = ""
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self._aggregation += frame.text
elif isinstance(frame, LLMFullResponseEndFrame):
@@ -256,6 +262,8 @@ class TranscriptionContextFixup(FrameProcessor):
audio_part.text = self._transcript
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, LLMDemoTranscriptionFrame):
logger.info(f"Transcription from Gemini: {frame.text}")
self._transcript = frame.text

View File

@@ -81,6 +81,8 @@ class TalkingAnimation(FrameProcessor):
self._is_talking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
@@ -101,6 +103,8 @@ class UserImageRequester(FrameProcessor):
self.participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self.participant_id and isinstance(frame, TextFrame):
if frame.text == user_request_answer:
await self.push_frame(
@@ -117,6 +121,8 @@ class TextFilterProcessor(FrameProcessor):
self.text = text
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
if frame.text != self.text:
await self.push_frame(frame)
@@ -126,6 +132,8 @@ class TextFilterProcessor(FrameProcessor):
class ImageFilterProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if not isinstance(frame, ImageRawFrame):
await self.push_frame(frame, direction)

View File

@@ -95,6 +95,8 @@ class TalkingAnimation(FrameProcessor):
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
await super().process_frame(frame, direction)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:

View File

@@ -95,6 +95,8 @@ class TalkingAnimation(FrameProcessor):
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
await super().process_frame(frame, direction)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:

View File

@@ -54,6 +54,8 @@ class StoryImageProcessor(FrameProcessor):
self._fal_service = fal_service
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StoryImageFrame):
try:
async with timeout(7):
@@ -88,6 +90,8 @@ class StoryProcessor(FrameProcessor):
self._story = story
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserStoppedSpeakingFrame):
# Send an app message to the UI
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))

View File

@@ -51,6 +51,8 @@ class TranslationProcessor(FrameProcessor):
self._language = language
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
context = [
{
@@ -76,6 +78,8 @@ class TranslationSubtitles(FrameProcessor):
# subtitles.
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
message = {"language": self._language, "text": frame.text}
await self.push_frame(DailyTransportMessageFrame(message))

View File

@@ -28,6 +28,8 @@ class Source(FrameProcessor):
self._push_frame_func = push_frame_func
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
if isinstance(frame, SystemFrame):
@@ -49,6 +51,8 @@ class Sink(FrameProcessor):
self._push_frame_func = push_frame_func
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
@@ -116,6 +120,8 @@ class ParallelPipeline(BasePipeline):
self._down_task = loop.create_task(self._process_down_queue())
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self._start_tasks()

View File

@@ -17,6 +17,8 @@ class PipelineSource(FrameProcessor):
self._upstream_push_frame = upstream_push_frame
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self._upstream_push_frame(frame, direction)
@@ -30,6 +32,8 @@ class PipelineSink(FrameProcessor):
self._downstream_push_frame = downstream_push_frame
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
@@ -70,6 +74,8 @@ class Pipeline(BasePipeline):
await self._cleanup_processors()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if direction == FrameDirection.DOWNSTREAM:
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
elif direction == FrameDirection.UPSTREAM:

View File

@@ -31,6 +31,8 @@ class Source(FrameProcessor):
self._up_queue = upstream_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self._up_queue.put(frame)
@@ -44,6 +46,8 @@ class Sink(FrameProcessor):
self._down_queue = downstream_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
@@ -99,6 +103,8 @@ class SyncParallelPipeline(BasePipeline):
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# The last processor of each pipeline needs to be synchronous otherwise
# this element won't work. Since, we know it should be synchronous we
# push a SyncFrame. Since frames are ordered we know this frame will be

View File

@@ -45,6 +45,8 @@ class Source(FrameProcessor):
self._up_queue = up_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
match direction:
case FrameDirection.UPSTREAM:
await self._handle_upstream_frame(frame)
@@ -73,6 +75,8 @@ class Sink(FrameProcessor):
self._down_queue = down_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We really just want to know when the EndFrame reached the sink.
if isinstance(frame, EndFrame):
await self._down_queue.put(frame)

View File

@@ -56,6 +56,8 @@ class GatedAggregator(FrameProcessor):
self._accumulator: List[Tuple[Frame, FrameDirection]] = []
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)

View File

@@ -24,6 +24,8 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
self._last_context_frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.push_frame(frame)
await self._start()

View File

@@ -86,6 +86,8 @@ class LLMResponseAggregator(FrameProcessor):
# and T2 would be dropped.
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
send_aggregation = False
if isinstance(frame, self._start_frame):
@@ -238,6 +240,8 @@ class LLMFullResponseAggregator(FrameProcessor):
self._aggregation = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self._aggregation += frame.text
elif isinstance(frame, LLMFullResponseEndFrame):

View File

@@ -33,6 +33,8 @@ class SentenceAggregator(FrameProcessor):
self._aggregation = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We ignore interim description at this point.
if isinstance(frame, InterimTranscriptionFrame):
return

View File

@@ -85,6 +85,8 @@ class ResponseAggregator(FrameProcessor):
# and T2 would be dropped.
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
send_aggregation = False
if isinstance(frame, self._start_frame):

View File

@@ -31,6 +31,8 @@ class VisionImageFrameAggregator(FrameProcessor):
self._describe_text = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self._describe_text = frame.text
elif isinstance(frame, InputImageRawFrame):

View File

@@ -24,6 +24,8 @@ class AsyncGeneratorProcessor(FrameProcessor):
self._data_queue = asyncio.Queue()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, (CancelFrame, EndFrame)):

View File

@@ -68,6 +68,8 @@ class AudioBufferProcessor(FrameProcessor):
self._bot_audio_buffer = bytearray()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Include all audio from the user.
if isinstance(frame, InputAudioRawFrame):
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)

View File

@@ -39,6 +39,8 @@ class SileroVAD(FrameProcessor):
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
await self._analyze_audio(frame)
if self._audio_passthrough:

View File

@@ -26,5 +26,7 @@ class FrameFilter(FrameProcessor):
return isinstance(frame, ControlFrame) or isinstance(frame, SystemFrame)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._should_passthrough_frame(frame):
await self.push_frame(frame, direction)

View File

@@ -29,6 +29,8 @@ class FunctionFilter(FrameProcessor):
return isinstance(frame, SystemFrame) or direction != self._direction
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
passthrough = self._should_passthrough_frame(frame, direction)
allowed = await self._filter(frame)
if passthrough or allowed:

View File

@@ -26,4 +26,5 @@ class IdentityFilter(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process an incoming frame by passing it through unchanged."""
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)

View File

@@ -45,6 +45,8 @@ class WakeCheckFilter(FrameProcessor):
self._wake_patterns.append(pattern)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
try:
if isinstance(frame, TranscriptionFrame):
p = self._participant_states.get(frame.user_id)

View File

@@ -32,6 +32,8 @@ class WakeNotifierFilter(FrameProcessor):
self._filter = filter
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, self._types) and await self._filter(frame):
await self._notifier.notify()

View File

@@ -161,13 +161,6 @@ class FrameProcessor:
def get_clock(self) -> BaseClock:
return self._clock
async def pause_processing_frames(self):
self.__should_block_frames = True
async def resume_processing_frames(self):
self.__input_event.set()
self.__should_block_frames = False
async def queue_frame(
self,
frame: Frame,
@@ -182,13 +175,32 @@ class FrameProcessor:
if isinstance(frame, SystemFrame):
# We don't want to queue system frames.
await self._process_frame(frame, direction)
await self.process_frame(frame, direction)
else:
# We queue everything else.
await self.__input_queue.put((frame, direction, callback))
async def pause_processing_frames(self):
self.__should_block_frames = True
async def resume_processing_frames(self):
self.__input_event.set()
self.__should_block_frames = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
pass
if isinstance(frame, StartFrame):
self._clock = frame.clock
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._enable_usage_metrics = frame.enable_usage_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, StartInterruptionFrame):
await self._start_interruption()
await self.stop_all_metrics()
elif isinstance(frame, StopInterruptionFrame):
self._should_report_ttfb = True
elif isinstance(frame, CancelFrame):
self._cancelling = True
async def push_error(self, error: ErrorFrame):
await self.push_frame(error, FrameDirection.UPSTREAM)
@@ -216,28 +228,6 @@ class FrameProcessor:
raise Exception(f"Event handler {event_name} already registered")
self._event_handlers[event_name] = []
#
# Frame processing
#
async def _process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._clock = frame.clock
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._enable_usage_metrics = frame.enable_usage_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, StartInterruptionFrame):
await self._start_interruption()
await self.stop_all_metrics()
elif isinstance(frame, StopInterruptionFrame):
self._should_report_ttfb = True
elif isinstance(frame, CancelFrame):
self._cancelling = True
# Call subclass.
await self.process_frame(frame, direction)
#
# Handle interruptions
#
@@ -299,7 +289,7 @@ class FrameProcessor:
(frame, direction, callback) = await self.__input_queue.get()
# Process the frame.
await self._process_frame(frame, direction)
await self.process_frame(frame, direction)
# If this frame has an associated callback, call it now.
if callback:

View File

@@ -36,6 +36,8 @@ class LangchainProcessor(FrameProcessor):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMMessagesFrame):
# Messages are accumulated on the context as a list of messages.
# The last one by the human is the one we want to send to the LLM.

View File

@@ -380,6 +380,8 @@ class RTVISpeakingProcessor(RTVIFrameProcessor):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame)):
@@ -413,6 +415,8 @@ class RTVIUserTranscriptionProcessor(RTVIFrameProcessor):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame)):
@@ -442,6 +446,8 @@ class RTVIUserLLMTextProcessor(RTVIFrameProcessor):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
@@ -467,6 +473,8 @@ class RTVIBotTranscriptionProcessor(RTVIFrameProcessor):
self._aggregation = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
@@ -488,6 +496,8 @@ class RTVIBotLLMProcessor(RTVIFrameProcessor):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, LLMFullResponseStartFrame):
@@ -504,6 +514,8 @@ class RTVIBotTTSProcessor(RTVIFrameProcessor):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, TTSStartedFrame):
@@ -520,6 +532,8 @@ class RTVIMetricsProcessor(RTVIFrameProcessor):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, MetricsFrame):
@@ -628,6 +642,8 @@ class RTVIProcessor(FrameProcessor):
await self._push_transport_message(message, exclude_none=False)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be

View File

@@ -66,6 +66,8 @@ class GStreamerPipelineSource(FrameProcessor):
bus.connect("message", self._on_gstreamer_message)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be

View File

@@ -35,6 +35,8 @@ class IdleFrameProcessor(FrameProcessor):
self._create_idle_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
# If we are not waiting for any specific frame set the event, otherwise

View File

@@ -27,6 +27,8 @@ class StatelessTextTransformer(FrameProcessor):
self._transform_fn = transform_fn
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
result = self._transform_fn(frame.text)
if isinstance(result, Coroutine):

View File

@@ -43,6 +43,8 @@ class UserIdleProcessor(FrameProcessor):
await self._idle_task
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Check for end frames before processing
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()

View File

@@ -110,6 +110,8 @@ class AIService(FrameProcessor):
logger.warning(f"Unknown setting for {self.name} service: {key}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.start(frame)
elif isinstance(frame, CancelFrame):

View File

@@ -92,6 +92,7 @@ class SimliVideoService(FrameProcessor):
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.push_frame(frame, direction)
await self._start_connection()

View File

@@ -79,6 +79,8 @@ class BaseInputTransport(FrameProcessor):
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Specific system frames
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be

View File

@@ -120,6 +120,8 @@ class BaseOutputTransport(FrameProcessor):
#
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
#
# System frames (like StartInterruptionFrame) are pushed
# immediately. Other frames require order so they are put in the sink

View File

@@ -13,6 +13,8 @@ class TestFrameProcessor(FrameProcessor):
super().__init__()
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if not self.test_frames[
0
]: # then we've run out of required frames but the generator is still going?

View File

@@ -42,6 +42,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
return self.name
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseStartFrame):
self.start_collecting = True
elif isinstance(frame, TextFrame) and self.start_collecting: