Files
engine-v5-pipecat-core/engine/context_sync.py

41 lines
1.5 KiB
Python

from __future__ import annotations
from typing import Any
from pipecat.frames.frames import Frame, InterruptionFrame, LLMMessagesAppendFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context
class AssistantContextSyncProcessor(FrameProcessor):
    """Repair the assistant context ahead of an interrupting text-input turn.

    An ``input.text`` message carrying ``interrupt: true`` queues an
    ``InterruptionFrame`` ahead of its ``LLMMessagesAppendFrame``. This
    processor remembers that an interruption went by and, when the next
    append frame arrives, syncs the LLM context to the urgent-streamed
    assistant text. The repair therefore happens after the interrupt has
    propagated (TTS-phase interrupts included) and before the new user
    message is appended.
    """

    def __init__(
        self,
        *,
        text_stream: ProductTextStreamProcessor,
        assistant_aggregator: Any,
    ) -> None:
        """Store the collaborators handed to the context-sync helper.

        Args:
            text_stream: Text stream processor passed as the second argument
                to ``maybe_sync_assistant_context``.
            assistant_aggregator: Assistant-side aggregator passed as the
                first argument to ``maybe_sync_assistant_context``.
        """
        super().__init__()
        # Armed by an InterruptionFrame; consumed by the next append frame.
        self._sync_on_next_append = False
        self._assistant_aggregator = assistant_aggregator
        self._text_stream = text_stream

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Track interruptions, sync context before the next append, forward the frame.

        Args:
            frame: The frame passing through this processor.
            direction: The direction the frame is travelling; it is forwarded
                unchanged in the same direction.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, InterruptionFrame):
            # Defer the repair until the follow-up append frame shows up.
            self._sync_on_next_append = True
        else:
            sync_is_pending = self._sync_on_next_append
            if sync_is_pending and isinstance(frame, LLMMessagesAppendFrame):
                # Disarm first so the sync runs at most once per interruption.
                self._sync_on_next_append = False
                maybe_sync_assistant_context(
                    self._assistant_aggregator,
                    self._text_stream,
                )

        # Every frame continues along the pipeline, whatever its type.
        await self.push_frame(frame, direction)