41 lines
1.5 KiB
Python
41 lines
1.5 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from pipecat.frames.frames import Frame, InterruptionFrame, LLMMessagesAppendFrame
|
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
|
|
|
from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context
|
|
|
|
|
|
class AssistantContextSyncProcessor(FrameProcessor):
|
|
"""Sync LLM context to urgent-streamed assistant text before text-input turns.
|
|
|
|
``input.text`` with ``interrupt: true`` queues ``InterruptionFrame`` before
|
|
``LLMMessagesAppendFrame``. This processor runs context repair after the
|
|
interrupt has propagated (including TTS-phase interrupts) and before the new
|
|
user message is appended.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
text_stream: ProductTextStreamProcessor,
|
|
assistant_aggregator: Any,
|
|
) -> None:
|
|
super().__init__()
|
|
self._text_stream = text_stream
|
|
self._assistant_aggregator = assistant_aggregator
|
|
self._sync_on_next_append = False
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
|
|
await super().process_frame(frame, direction)
|
|
|
|
if isinstance(frame, InterruptionFrame):
|
|
self._sync_on_next_append = True
|
|
elif isinstance(frame, LLMMessagesAppendFrame) and self._sync_on_next_append:
|
|
self._sync_on_next_append = False
|
|
maybe_sync_assistant_context(self._assistant_aggregator, self._text_stream)
|
|
|
|
await self.push_frame(frame, direction)
|