From b71b6fa487c4f8daf0cdf75fb97caaf82bfcc775 Mon Sep 17 00:00:00 2001 From: James Hush Date: Sun, 25 May 2025 15:17:05 +0800 Subject: [PATCH] Send message to client --- examples/foundational/18b-gstreamer.py | 35 +++++++++++++++----------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/examples/foundational/18b-gstreamer.py b/examples/foundational/18b-gstreamer.py index 21925987f..b8723c419 100644 --- a/examples/foundational/18b-gstreamer.py +++ b/examples/foundational/18b-gstreamer.py @@ -26,6 +26,12 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frameworks.rtvi import ( + RTVIConfig, + RTVIObserver, + RTVIProcessor, + RTVIServerMessageFrame, +) from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource from pipecat.services.moondream.vision import MoondreamService from pipecat.transports.base_input import BaseInputTransport @@ -38,8 +44,9 @@ load_dotenv(override=True) class AlertProcessor(FrameProcessor): - def __init__(self): + def __init__(self, connection: SmallWebRTCConnection): super().__init__() + self._connection = connection async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -47,11 +54,8 @@ class AlertProcessor(FrameProcessor): if isinstance(frame, TextFrame): logger.info(f"Alert Processor received text: {frame.text}") text = frame.text.strip().upper() - if text == "YES": - # SEND AN EMAIL HERE - logger.info("Alert: YES") - else: - logger.info("Alert: NO") + message_frame = RTVIServerMessageFrame(data=text) + await self.push_frame(message_frame) await self.push_frame(frame, direction) @@ -102,16 +106,19 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, args: argparse.Names ), ) + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + # If you run into weird description, try with use_cpu=True moondream = MoondreamService() ir = UserImageRequester() va = VisionImageFrameAggregator() - alert = AlertProcessor() + alert = AlertProcessor(connection=webrtc_connection) pipeline = Pipeline( [ gst, # GStreamer file source + rtvi, ir, # debug, va, @@ -124,6 +131,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, args: argparse.Names task = PipelineTask( pipeline, observers=[ + RTVIObserver(rtvi), DebugLogObserver( frame_types={ # TextFrame: None, @@ -135,18 +143,15 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, args: argparse.Names ], ) + @rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi): + logger.info(f"Bot ready: {rtvi}") + await rtvi.set_bot_ready() + @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected: {client}") - await task.queue_frames( - [ - TextFrame( - "Are there people in the bottom right corner of the image? Only answer with YES or NO." - ) - ] - ) - runner = PipelineRunner(handle_sigint=False) await runner.run(task)