from __future__ import annotations

from loguru import logger

from pipecat.frames.frames import (
    Frame,
    InputTransportMessageFrame,
    LLMMessagesAppendFrame,
    UserImageRawFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class ProductTextInputProcessor(FrameProcessor):
    """Converts product text-input transport messages and marks image input as user activity.

    Routing performed by :meth:`process_frame`:

    - ``UserImageRawFrame``: forwarded unchanged in its original direction,
      bracketed by broadcast ``UserStartedSpeakingFrame`` /
      ``UserStoppedSpeakingFrame``.
    - ``InputTransportMessageFrame`` whose ``message`` is a dict with
      ``"type": "input.text"``: the stripped ``"text"`` value is pushed
      downstream as a user message in an ``LLMMessagesAppendFrame`` with
      ``run_llm=True``, bracketed the same way. Unless the message carries a
      falsy ``"interrupt"`` (default ``True``), an interruption is broadcast
      first. The original transport frame is consumed (not forwarded); a
      message whose text is missing or blank is dropped without emitting
      anything.
    - Any other frame is forwarded unchanged in its original direction.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Route ``frame`` according to its type (see the class docstring)."""
        await super().process_frame(frame, direction)

        if isinstance(frame, UserImageRawFrame):
            await self._forward_image(frame, direction)
            return

        if not isinstance(frame, InputTransportMessageFrame):
            await self.push_frame(frame, direction)
            return

        message = frame.message
        if not isinstance(message, dict) or message.get("type") != "input.text":
            # Not a product text-input message: pass it through untouched.
            await self.push_frame(frame, direction)
            return

        await self._handle_text_input(message)

    async def _forward_image(self, frame: UserImageRawFrame, direction: FrameDirection) -> None:
        """Forward an image frame bracketed as a user speaking turn."""
        await self.broadcast_frame(UserStartedSpeakingFrame)
        try:
            await self.push_frame(frame, direction)
        finally:
            # Always close the turn so a failure while forwarding cannot leave
            # the pipeline with an unmatched UserStartedSpeakingFrame.
            await self.broadcast_frame(UserStoppedSpeakingFrame)

    async def _handle_text_input(self, message: dict) -> None:
        """Turn an ``input.text`` message into a user LLM message, optionally interrupting first."""
        # ``or ""`` maps a missing/None/falsy "text" to an empty string before stripping.
        text = str(message.get("text") or "").strip()
        if not text:
            # Blank input: the transport message is consumed and nothing is emitted.
            return

        await self.broadcast_frame(UserStartedSpeakingFrame)
        try:
            # Interrupting is opt-out: only a falsy "interrupt" value skips it.
            if message.get("interrupt", True):
                logger.info("Text input interrupting current response")
                await self.broadcast_interruption()
            await self.push_frame(
                LLMMessagesAppendFrame(
                    messages=[{"role": "user", "content": text}],
                    run_llm=True,
                ),
                FrameDirection.DOWNSTREAM,
            )
        finally:
            # Keep the started/stopped pair balanced even if interrupting or
            # pushing the LLM message raises (including task cancellation).
            await self.broadcast_frame(UserStoppedSpeakingFrame)