This commit is contained in:
Xin Wang
2026-05-28 14:29:30 +08:00
4 changed files with 236 additions and 1 deletions

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any
@@ -19,12 +21,15 @@ from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMTextFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import LLMSettings
from .state_info import FastGPTStateFlushRequestFrame
def _extract_text_from_event(kind: str, payload: Any) -> str:
if not isinstance(payload, dict):
@@ -369,10 +374,130 @@ class FastGPTLLMService(LLMService):
FrameDirection.DOWNSTREAM,
)
async def _process_state_flush_request(self, frame: FastGPTStateFlushRequestFrame) -> None:
try:
await self._run_state_transaction(frame)
except Exception as exc:
logger.error(
"FastGPT set_info failed "
f"request_id={frame.request_id} key={frame.key!r}: {exc}"
)
await self._push_state_ack(
request_id=frame.request_id,
ok=False,
error=str(exc) or "FastGPT state update failed",
retryable=True,
)
return
await self._push_state_ack(request_id=frame.request_id, ok=True)
async def _run_state_transaction(self, frame: FastGPTStateFlushRequestFrame) -> None:
task = asyncio.create_task(self._set_fastgpt_state(frame))
try:
await asyncio.shield(task)
except asyncio.CancelledError:
logger.info(
"Waiting for in-flight FastGPT set_info to finish after cancellation "
f"request_id={frame.request_id}"
)
await task
async def _set_fastgpt_state(self, frame: FastGPTStateFlushRequestFrame) -> None:
current_state = await self._read_fastgpt_state()
await self._delete_last_two_chat_records()
current_state[frame.key] = frame.value
logger.info(
"Writing FastGPT state "
f"chatId={self._chat_id} request_id={frame.request_id} key={frame.key!r}"
)
response = await self._client.create_chat_completion(
messages=[{"role": "user", "content": ""}],
chatId=self._chat_id,
stream=False,
detail=True,
variables={"state": current_state},
)
response.raise_for_status()
await self._delete_last_two_chat_records()
async def _read_fastgpt_state(self) -> dict[str, Any]:
response = await self._client.create_chat_completion(
messages=[{"role": "user", "content": ""}],
chatId=self._chat_id,
stream=False,
detail=True,
)
response.raise_for_status()
data = response.json()
state = data.get("newVariables", {}).get("state", {})
if isinstance(state, str):
state = json.loads(state) if state else {}
if state is None:
return {}
if not isinstance(state, dict):
raise ValueError("FastGPT newVariables.state must be an object or JSON object string")
return dict(state)
async def _delete_last_two_chat_records(self) -> None:
if not self._app_id:
raise ValueError("FastGPT app_id is required to clean synthetic chat records")
response = await self._client.get_chat_records(
appId=self._app_id,
chatId=self._chat_id,
offset=0,
pageSize=10,
)
response.raise_for_status()
data = response.json()
records = data.get("data", {}).get("list", [])
if len(records) < 2:
logger.warning(f"Less than 2 FastGPT records found for chatId={self._chat_id}")
return
data_ids = [record["dataId"] for record in records[-2:]]
logger.info(f"Deleting FastGPT synthetic records chatId={self._chat_id} dataIds={data_ids}")
for data_id in data_ids:
delete_response = await self._client.delete_chat_record(
appId=self._app_id,
chatId=self._chat_id,
contentId=data_id,
)
delete_response.raise_for_status()
async def _push_state_ack(
self,
*,
request_id: str,
ok: bool,
error: str | None = None,
retryable: bool | None = None,
) -> None:
payload: dict[str, Any] = {
"type": "session.set_info.ack",
"request_id": request_id,
"ok": ok,
}
if error is not None:
payload["error"] = error
if retryable is not None:
payload["retryable"] = retryable
await self.push_frame(
OutputTransportMessageUrgentFrame(message=payload),
FrameDirection.DOWNSTREAM,
)
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
if isinstance(frame, FastGPTStateFlushRequestFrame):
await self._process_state_flush_request(frame)
elif isinstance(frame, LLMContextFrame):
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()

View File

@@ -39,6 +39,7 @@ from .fastgpt_llm import FastGPTLLMService
from .product_protocol import ProductWebsocketSerializer
from .response_state import StateTagResponseProcessor
from .services import create_llm_service, create_stt_service, create_tts_service
from .state_info import SetInfoProcessor
from .text_input import ProductTextInputProcessor
from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context
from .transcript_stream import ProductTranscriptStreamProcessor
@@ -156,6 +157,7 @@ async def run_pipeline_with_serializer(
processors = [
transport.input(),
SetInfoProcessor(enabled=llm_config.is_fastgpt),
ProductTextInputProcessor(),
stt,
ProductTranscriptStreamProcessor(),

View File

@@ -153,6 +153,16 @@ class ProductWebsocketSerializer(FrameSerializer):
}
)
if message_type == "session.set_info":
return InputTransportMessageFrame(
message={
"type": "session.set_info",
"request_id": message.get("request_id"),
"key": message.get("key"),
"value": message.get("value"),
}
)
if message_type == "transport.message":
payload = message.get("message")
return InputTransportMessageFrame(message=payload if isinstance(payload, dict) else message)

98
engine/state_info.py Normal file
View File

@@ -0,0 +1,98 @@
from __future__ import annotations
import uuid
from dataclasses import dataclass
from typing import Any
from loguru import logger
from pipecat.frames.frames import (
ControlFrame,
Frame,
InputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
UninterruptibleFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class FastGPTStateFlushRequestFrame(ControlFrame, UninterruptibleFrame):
"""Queued FastGPT state update request.
This frame carries one pending set_info operation. It is not a state cache:
the FastGPT service still reads the latest state before applying it.
"""
request_id: str
key: str
value: Any
class SetInfoProcessor(FrameProcessor):
"""Converts product set_info messages into queued FastGPT state writes."""
def __init__(self, *, enabled: bool = True):
super().__init__()
self._enabled = enabled
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if not isinstance(frame, InputTransportMessageFrame):
await self.push_frame(frame, direction)
return
message = frame.message
if not isinstance(message, dict) or message.get("type") != "session.set_info":
await self.push_frame(frame, direction)
return
request_id = str(message.get("request_id") or uuid.uuid4().hex)
if not self._enabled:
await self._push_ack(
request_id=request_id,
ok=False,
error="session.set_info requires FastGPT LLM backend",
retryable=False,
)
return
key = message.get("key")
if not isinstance(key, str) or not key.strip():
await self._push_ack(
request_id=request_id,
ok=False,
error="session.set_info requires non-empty string key",
retryable=False,
)
return
logger.info(f"Queueing FastGPT set_info request request_id={request_id} key={key!r}")
await self.push_frame(
FastGPTStateFlushRequestFrame(
request_id=request_id,
key=key.strip(),
value=message.get("value"),
),
FrameDirection.DOWNSTREAM,
)
async def _push_ack(
self,
*,
request_id: str,
ok: bool,
error: str | None = None,
retryable: bool | None = None,
) -> None:
payload: dict[str, Any] = {
"type": "session.set_info.ack",
"request_id": request_id,
"ok": ok,
}
if error is not None:
payload["error"] = error
if retryable is not None:
payload["retryable"] = retryable
await self.push_frame(OutputTransportMessageUrgentFrame(message=payload))