Files
engine-v5-pipecat-core/engine/text_input.py
2026-06-02 13:45:38 +08:00

55 lines
1.7 KiB
Python

from __future__ import annotations
from loguru import logger
from pipecat.frames.frames import (
Frame,
InputTransportMessageFrame,
LLMMessagesAppendFrame,
UserImageRawFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class ProductTextInputProcessor(FrameProcessor):
    """Turn product ``input.text`` transport messages into LLM user turns.

    Image frames are forwarded unchanged but bracketed by broadcast
    ``UserStartedSpeakingFrame`` / ``UserStoppedSpeakingFrame`` so that they
    are marked as user activity. Every other frame passes straight through.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Handle one frame travelling through the pipeline.

        Args:
            frame: The incoming frame.
            direction: The direction the frame was travelling in; reused when
                the frame is forwarded as-is.
        """
        await super().process_frame(frame, direction)

        # Image input: forward the frame itself, wrapped in start/stop markers.
        if isinstance(frame, UserImageRawFrame):
            await self.broadcast_frame(UserStartedSpeakingFrame)
            await self.push_frame(frame, direction)
            await self.broadcast_frame(UserStoppedSpeakingFrame)
            return

        # Only transport messages carry a payload worth inspecting; anything
        # that is not a dict tagged "input.text" is forwarded untouched.
        payload = (
            frame.message if isinstance(frame, InputTransportMessageFrame) else None
        )
        if not isinstance(payload, dict) or payload.get("type") != "input.text":
            await self.push_frame(frame, direction)
            return

        # Missing/None/empty text and whitespace-only text all collapse to "";
        # in that case the message is consumed and nothing is forwarded.
        raw_text = payload.get("text")
        user_text = str(raw_text or "").strip()
        if not user_text:
            return

        await self.broadcast_frame(UserStartedSpeakingFrame)

        # Interruption is opt-out: it happens unless the message carries a
        # falsy "interrupt" value.
        should_interrupt = payload.get("interrupt", True)
        if should_interrupt:
            logger.info("Text input interrupting current response")
            await self.broadcast_interruption()

        # The text is always sent downstream as a user message, regardless of
        # the direction the transport message arrived from.
        user_turn = LLMMessagesAppendFrame(
            messages=[{"role": "user", "content": user_text}],
            run_llm=True,
        )
        await self.push_frame(user_turn, FrameDirection.DOWNSTREAM)

        await self.broadcast_frame(UserStoppedSpeakingFrame)