diff --git a/examples/foundational/99-content-moderation.py b/examples/foundational/99-content-moderation.py index cd5431c65..210840b8f 100644 --- a/examples/foundational/99-content-moderation.py +++ b/examples/foundational/99-content-moderation.py @@ -10,7 +10,7 @@ from dotenv import load_dotenv from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import TextFrame +from pipecat.frames.frames import EndFrame, EndTaskFrame, TextFrame, TTSSpeakFrame from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -19,6 +19,7 @@ from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAIL from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.filters.null_filter import NullFilter from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.user_idle_processor import UserIdleProcessor from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService @@ -31,6 +32,35 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) +class KickParticipantProcessor(FrameProcessor): + """This processor will kick the participant if they say something inappropriate. + + This is a simple example of how to use the LLM to moderate + the conversation. In this case we are using the OpenAI LLM to determine if + the user is saying something inappropriate. + + """ + + def __init__(self): + """Initialize the processor.""" + super().__init__() + + async def process_frame(self, frame, direction): + await super().process_frame(frame, direction) + + if isinstance(frame, TextFrame) and frame.text == "YES": + await self.push_frame( + TTSSpeakFrame( + "You are being kicked from the call because of content moderation. Have a nice day!" + ) + ) + + # Signal that the task should end after processing this frame + await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + + await self.push_frame(frame, direction) + + async def run_bot(webrtc_connection: SmallWebRTCConnection): logger.info(f"Starting bot") @@ -101,7 +131,7 @@ community guidelines. # This a filter that will wake up the notifier if the given predicate # (wake_check_filter) returns true. - completness_check = WakeNotifierFilter(notifier, types=(TextFrame,), filter=wake_check_filter) + completeness_check = WakeNotifierFilter(notifier, types=(TextFrame,), filter=wake_check_filter) # This processor keeps the last context and will let it through once the # notifier is woken up. We start with the gate open because we send an @@ -116,6 +146,8 @@ community guidelines. # moderated, this will wake up the notifier if that happens. user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0) + kick_participant = KickParticipantProcessor() + # The ParallePipeline input are the user transcripts. We have two # contexts. The first one will be used to determine if the user is # moderated and if so the notifier will be woken up. The second @@ -129,7 +161,8 @@ community guidelines. [ moderator_context_aggregator.user(), moderator_llm, - completness_check, + kick_participant, + completeness_check, NullFilter(), ], [context_aggregator.user(), gated_context_aggregator, llm],