From 1dcfb98e6339f515f4d6d0dd01068cd91281d016 Mon Sep 17 00:00:00 2001
From: Xin Wang
Date: Thu, 28 May 2026 14:29:00 +0800
Subject: [PATCH] Add set_info input event

---
 engine/fastgpt_llm.py      | 166 ++++++++++++++++++++++++++++++++++++-
 engine/pipeline.py         |   2 +
 engine/product_protocol.py |  10 +++
 engine/state_info.py       | 105 ++++++++++++++++++++++++++++
 4 files changed, 282 insertions(+), 1 deletion(-)
 create mode 100644 engine/state_info.py

diff --git a/engine/fastgpt_llm.py b/engine/fastgpt_llm.py
index b05a0f2..d3c0466 100644
--- a/engine/fastgpt_llm.py
+++ b/engine/fastgpt_llm.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import asyncio
+import json
 import uuid
 from dataclasses import dataclass, field
 from typing import Any
@@ -19,12 +21,15 @@ from pipecat.frames.frames import (
     LLMFullResponseStartFrame,
     LLMTextFrame,
     OutputTransportMessageFrame,
+    OutputTransportMessageUrgentFrame,
 )
 from pipecat.processors.aggregators.llm_context import LLMContext
 from pipecat.processors.frame_processor import FrameDirection
 from pipecat.services.llm_service import LLMService
 from pipecat.services.settings import LLMSettings
 
+from .state_info import FastGPTStateFlushRequestFrame
+
 
 def _extract_text_from_event(kind: str, payload: Any) -> str:
     if not isinstance(payload, dict):
@@ -369,10 +374,169 @@ class FastGPTLLMService(LLMService):
             FrameDirection.DOWNSTREAM,
         )
 
+    async def _process_state_flush_request(self, frame: FastGPTStateFlushRequestFrame) -> None:
+        """Apply one queued set_info request and push an ack with the outcome.
+
+        Any ``Exception`` from the update is logged and reported to the client
+        as a retryable negative ack instead of being raised.
+        """
+        try:
+            await self._run_state_transaction(frame)
+        except Exception as exc:
+            logger.error(
+                "FastGPT set_info failed "
+                f"request_id={frame.request_id} key={frame.key!r}: {exc}"
+            )
+            await self._push_state_ack(
+                request_id=frame.request_id,
+                ok=False,
+                error=str(exc) or "FastGPT state update failed",
+                retryable=True,
+            )
+            return
+
+        await self._push_state_ack(request_id=frame.request_id, ok=True)
+
+    async def _run_state_transaction(self, frame: FastGPTStateFlushRequestFrame) -> None:
+        """Run the state write so a cancellation cannot abandon it midway.
+
+        The write runs in its own task behind ``asyncio.shield``. If this
+        coroutine is cancelled, the in-flight write is awaited to completion and
+        the cancellation is then re-raised so the cancelled caller really stops.
+        """
+        task = asyncio.create_task(self._set_fastgpt_state(frame))
+        try:
+            await asyncio.shield(task)
+        except asyncio.CancelledError:
+            logger.info(
+                "Waiting for in-flight FastGPT set_info to finish after cancellation "
+                f"request_id={frame.request_id}"
+            )
+            try:
+                await task
+            except Exception as exc:
+                # No ack can follow a cancellation, so at least record the failure.
+                logger.error(
+                    "FastGPT set_info failed "
+                    f"request_id={frame.request_id} key={frame.key!r}: {exc}"
+                )
+            # Swallowing CancelledError here would keep the cancelled caller alive.
+            raise
+
+    async def _set_fastgpt_state(self, frame: FastGPTStateFlushRequestFrame) -> None:
+        """Merge ``frame.key``/``frame.value`` into the FastGPT ``state`` variable.
+
+        Both the read and the write are issued as empty-content chat completions;
+        the last two chat records are deleted after each call.
+        """
+        current_state = await self._read_fastgpt_state()
+        await self._delete_last_two_chat_records()
+
+        current_state[frame.key] = frame.value
+        logger.info(
+            "Writing FastGPT state "
+            f"chatId={self._chat_id} request_id={frame.request_id} key={frame.key!r}"
+        )
+
+        response = await self._client.create_chat_completion(
+            messages=[{"role": "user", "content": ""}],
+            chatId=self._chat_id,
+            stream=False,
+            detail=True,
+            variables={"state": current_state},
+        )
+        response.raise_for_status()
+        await self._delete_last_two_chat_records()
+
+    async def _read_fastgpt_state(self) -> dict[str, Any]:
+        """Return a copy of the current FastGPT ``state`` variable.
+
+        Raises:
+            ValueError: If ``newVariables.state`` is neither an object nor a JSON
+                object string.
+        """
+        response = await self._client.create_chat_completion(
+            messages=[{"role": "user", "content": ""}],
+            chatId=self._chat_id,
+            stream=False,
+            detail=True,
+        )
+        response.raise_for_status()
+        data = response.json()
+
+        # "newVariables" may be present but null, so a .get() default is not enough.
+        state = (data.get("newVariables") or {}).get("state")
+        if isinstance(state, str):
+            state = json.loads(state) if state else {}
+        if state is None:
+            return {}
+        if not isinstance(state, dict):
+            raise ValueError("FastGPT newVariables.state must be an object or JSON object string")
+        return dict(state)
+
+    async def _delete_last_two_chat_records(self) -> None:
+        """Delete the last two entries of the first page of chat records.
+
+        Raises:
+            ValueError: If no FastGPT app id is configured.
+        """
+        if not self._app_id:
+            raise ValueError("FastGPT app_id is required to clean synthetic chat records")
+
+        response = await self._client.get_chat_records(
+            appId=self._app_id,
+            chatId=self._chat_id,
+            offset=0,
+            pageSize=10,
+        )
+        response.raise_for_status()
+        data = response.json()
+
+        # "data" and "list" may be present but null; treat both as no records.
+        records = (data.get("data") or {}).get("list") or []
+        if len(records) < 2:
+            logger.warning(f"Less than 2 FastGPT records found for chatId={self._chat_id}")
+            return
+
+        data_ids = [record["dataId"] for record in records[-2:]]
+        logger.info(f"Deleting FastGPT synthetic records chatId={self._chat_id} dataIds={data_ids}")
+        for data_id in data_ids:
+            delete_response = await self._client.delete_chat_record(
+                appId=self._app_id,
+                chatId=self._chat_id,
+                contentId=data_id,
+            )
+            delete_response.raise_for_status()
+
+    async def _push_state_ack(
+        self,
+        *,
+        request_id: str,
+        ok: bool,
+        error: str | None = None,
+        retryable: bool | None = None,
+    ) -> None:
+        """Push a ``session.set_info.ack`` message downstream as an urgent frame."""
+        payload: dict[str, Any] = {
+            "type": "session.set_info.ack",
+            "request_id": request_id,
+            "ok": ok,
+        }
+        if error is not None:
+            payload["error"] = error
+        if retryable is not None:
+            payload["retryable"] = retryable
+        await self.push_frame(
+            OutputTransportMessageUrgentFrame(message=payload),
+            FrameDirection.DOWNSTREAM,
+        )
+
     async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
         await super().process_frame(frame, direction)
 
-        if isinstance(frame, LLMContextFrame):
+        if isinstance(frame, FastGPTStateFlushRequestFrame):
+            await self._process_state_flush_request(frame)
+        elif isinstance(frame, LLMContextFrame):
             try:
                 await self.push_frame(LLMFullResponseStartFrame())
                 await self.start_processing_metrics()
diff --git a/engine/pipeline.py b/engine/pipeline.py
index ec2519f..aa62c49 100644
--- a/engine/pipeline.py
+++ b/engine/pipeline.py
@@ -39,6 +39,7 @@ from .fastgpt_llm import FastGPTLLMService
 from .product_protocol import ProductWebsocketSerializer
 from .response_state import StateTagResponseProcessor
 from .services import create_llm_service, create_stt_service, create_tts_service
+from .state_info import SetInfoProcessor
 from .text_input import ProductTextInputProcessor
 from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context
 from .transcript_stream import ProductTranscriptStreamProcessor
@@ -156,6 +157,7 @@ async def run_pipeline_with_serializer(
 
     processors = [
         transport.input(),
+        SetInfoProcessor(enabled=llm_config.is_fastgpt),
         ProductTextInputProcessor(),
         stt,
         ProductTranscriptStreamProcessor(),
diff --git a/engine/product_protocol.py b/engine/product_protocol.py
index 79c71b0..9de8af0 100644
--- a/engine/product_protocol.py
+++ b/engine/product_protocol.py
@@ -153,6 +153,16 @@ class ProductWebsocketSerializer(FrameSerializer):
                 }
             )
 
+        if message_type == "session.set_info":
+            return InputTransportMessageFrame(
+                message={
+                    "type": "session.set_info",
+                    "request_id": message.get("request_id"),
+                    "key": message.get("key"),
+                    "value": message.get("value"),
+                }
+            )
+
         if message_type == "transport.message":
             payload = message.get("message")
             return InputTransportMessageFrame(message=payload if isinstance(payload, dict) else message)
diff --git a/engine/state_info.py b/engine/state_info.py
new file mode 100644
index 0000000..da82849
--- /dev/null
+++ b/engine/state_info.py
@@ -0,0 +1,105 @@
+from __future__ import annotations
+
+import uuid
+from dataclasses import dataclass
+from typing import Any
+
+from loguru import logger
+
+from pipecat.frames.frames import (
+    ControlFrame,
+    Frame,
+    InputTransportMessageFrame,
+    OutputTransportMessageUrgentFrame,
+    UninterruptibleFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+
+@dataclass
+class FastGPTStateFlushRequestFrame(ControlFrame, UninterruptibleFrame):
+    """Queued FastGPT state update request.
+
+    This frame carries one pending set_info operation. It is not a state cache:
+    the FastGPT service still reads the latest state before applying it.
+    """
+
+    request_id: str
+    key: str
+    value: Any
+
+
+class SetInfoProcessor(FrameProcessor):
+    """Converts product set_info messages into queued FastGPT state writes."""
+
+    def __init__(self, *, enabled: bool = True):
+        super().__init__()
+        self._enabled = enabled
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        """Intercept ``session.set_info`` transport messages.
+
+        Other frames pass through unchanged. A valid request is replaced by a
+        ``FastGPTStateFlushRequestFrame`` pushed downstream; a rejected request
+        is answered with a non-retryable negative ack and dropped.
+        """
+        await super().process_frame(frame, direction)
+
+        if not isinstance(frame, InputTransportMessageFrame):
+            await self.push_frame(frame, direction)
+            return
+
+        message = frame.message
+        if not isinstance(message, dict) or message.get("type") != "session.set_info":
+            await self.push_frame(frame, direction)
+            return
+
+        request_id = str(message.get("request_id") or uuid.uuid4().hex)
+        if not self._enabled:
+            await self._push_ack(
+                request_id=request_id,
+                ok=False,
+                error="session.set_info requires FastGPT LLM backend",
+                retryable=False,
+            )
+            return
+
+        key = message.get("key")
+        if not isinstance(key, str) or not key.strip():
+            await self._push_ack(
+                request_id=request_id,
+                ok=False,
+                error="session.set_info requires non-empty string key",
+                retryable=False,
+            )
+            return
+
+        logger.info(f"Queueing FastGPT set_info request request_id={request_id} key={key!r}")
+        await self.push_frame(
+            FastGPTStateFlushRequestFrame(
+                request_id=request_id,
+                key=key.strip(),
+                value=message.get("value"),
+            ),
+            FrameDirection.DOWNSTREAM,
+        )
+
+    async def _push_ack(
+        self,
+        *,
+        request_id: str,
+        ok: bool,
+        error: str | None = None,
+        retryable: bool | None = None,
+    ) -> None:
+        """Push a ``session.set_info.ack`` message as an urgent frame."""
+        payload: dict[str, Any] = {
+            "type": "session.set_info.ack",
+            "request_id": request_id,
+            "ok": ok,
+        }
+        if error is not None:
+            payload["error"] = error
+        if retryable is not None:
+            payload["retryable"] = retryable
+        await self.push_frame(OutputTransportMessageUrgentFrame(message=payload))