from __future__ import annotations

from typing import Any

from pipecat.frames.frames import Frame, InterruptionFrame, LLMMessagesAppendFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context


class AssistantContextSyncProcessor(FrameProcessor):
    """Repair the assistant context ahead of an interrupting text-input turn.

    An ``input.text`` message carrying ``interrupt: true`` queues an
    ``InterruptionFrame`` ahead of its ``LLMMessagesAppendFrame``. Seeing the
    interruption arms this processor; the next append frame then triggers
    ``maybe_sync_assistant_context`` *before* being forwarded, so the context
    is synced to the urgent-streamed assistant text after the interrupt has
    propagated (TTS-phase interrupts included) and before the new user
    message is appended.
    """

    def __init__(
        self,
        *,
        text_stream: ProductTextStreamProcessor,
        assistant_aggregator: Any,
    ) -> None:
        """Store the collaborators handed to ``maybe_sync_assistant_context``.

        Args:
            text_stream: Text stream processor passed as the second argument
                of the sync helper.
            assistant_aggregator: Aggregator passed as the first argument of
                the sync helper.
        """
        super().__init__()
        # Armed by an InterruptionFrame, cleared by the next append frame.
        self._sync_on_next_append = False
        self._assistant_aggregator = assistant_aggregator
        self._text_stream = text_stream

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Update the sync state for ``frame``, then forward it unchanged.

        Args:
            frame: Frame travelling through the pipeline.
            direction: Direction the frame is travelling; reused when the
                frame is pushed onward.
        """
        await super().process_frame(frame, direction)
        # Any context repair must happen before the frame leaves this
        # processor, so the helper runs ahead of the push.
        self._sync_context_if_pending(frame)
        await self.push_frame(frame, direction)

    def _sync_context_if_pending(self, frame: Frame) -> None:
        """Arm on an interruption; sync once on the following append frame."""
        if isinstance(frame, InterruptionFrame):
            self._sync_on_next_append = True
            return

        if not self._sync_on_next_append:
            return
        if not isinstance(frame, LLMMessagesAppendFrame):
            return

        # Disarm first so a single interruption yields at most one sync.
        self._sync_on_next_append = False
        maybe_sync_assistant_context(self._assistant_aggregator, self._text_stream)