from __future__ import annotations from loguru import logger from pipecat.frames.frames import Frame, InputTransportMessageFrame, LLMMessagesAppendFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class ProductTextInputProcessor(FrameProcessor): """Converts product text-input transport messages into LLM turns.""" async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if not isinstance(frame, InputTransportMessageFrame): await self.push_frame(frame, direction) return message = frame.message if not isinstance(message, dict) or message.get("type") != "input.text": await self.push_frame(frame, direction) return text = str(message.get("text") or "").strip() if not text: return if message.get("interrupt", True): logger.info("Text input interrupting current response") await self.broadcast_interruption() await self.push_frame( LLMMessagesAppendFrame( messages=[{"role": "user", "content": text}], run_llm=True, ), FrameDirection.DOWNSTREAM, )