diff --git a/config/voice-fastgpt-state-xfyunSuperTTS.json b/config/voice-fastgpt-state-xfyunSuperTTS.json
new file mode 100644
index 0000000..98878a3
--- /dev/null
+++ b/config/voice-fastgpt-state-xfyunSuperTTS.json
@@ -0,0 +1,101 @@
+{
+ "server": {
+ "host": "0.0.0.0",
+ "port": 8000,
+ "cors_origins": ["*"]
+ },
+ "audio": {
+ "sample_rate_hz": 16000,
+ "channels": 1,
+ "frame_ms": 20
+ },
+ "session": {
+ "inactivity_timeout_sec": 60
+ },
+ "turn": {
+ "vad": {
+ "confidence": 0.8,
+ "start_secs": 0.4,
+ "stop_secs": 0.2,
+ "min_volume": 0.8
+ },
+ "interruption_min_chars": 3,
+ "interruption_use_interim": true,
+ "interruption_short_replies": [
+ "是",
+ "是的",
+ "对",
+ "对的",
+ "嗯",
+ "好",
+ "好的",
+ "行",
+ "可以",
+ "没问题",
+ "不是",
+ "不",
+ "不行",
+ "不用",
+ "不要",
+ "没有",
+ "否",
+ "你好",
+ "在吗"
+ ],
+ "user_speech_timeout_sec": 0.2
+ },
+ "agent": {
+ "system_prompt": "FastGPT app owns the system prompt when send_system_prompt is false.",
+ "greeting": "您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。",
+ "greeting_mode": "fixed",
+ "response_state": {
+ "enabled": true,
+ "tag": "state",
+ "event_type": "response.state",
+ "max_prefix_chars": 256
+ }
+ },
+ "services": {
+ "stt": {
+ "provider": "xfyun",
+ "app_id": "416ce125",
+ "api_key": "c65342fe603126c3610031d8429bb36d",
+ "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
+ "base_url": "wss://iat-api.xfyun.cn/v2/iat",
+ "language": "zh_cn",
+ "domain": "iat",
+ "accent": "mandarin",
+ "encoding": "raw",
+ "frame_size": 1280,
+ "timeout_sec": 10.0
+ },
+ "llm": {
+ "provider": "fastgpt",
+ "api_key": "fastgpt-zlLjYtWZWN0uhQHs3ZOFHG4KLGMIdr2CkbZLCSfqGm5vcdx5xIZbp",
+ "base_url": "http://localhost:3030",
+ "model": "my-voice-app",
+ "app_id": "691eddaa53e3f8d9f25f1370",
+ "chat_id": null,
+ "variables": {},
+ "detail": false,
+ "timeout_sec": 60.0,
+ "send_system_prompt": false
+ },
+ "tts": {
+ "provider": "xfyun_super",
+ "app_id": "416ce125",
+ "api_key": "c65342fe603126c3610031d8429bb36d",
+ "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
+ "base_url": "wss://cbm01.cn-huabei-1.xf-yun.com/v1/private/mcd9m97e6",
+ "voice": "x5_lingxiaoxuan_flow",
+ "aue": "raw",
+ "speed": 50,
+ "volume": 50,
+ "pitch": 50,
+ "oral_level": "mid",
+ "source_sample_rate_hz": 24000,
+ "text_aggregation_mode": "token",
+ "timeout_sec": 30.0
+ }
+ }
+}
diff --git a/config/voice-fastgpt-xfyunSuperTTS.json b/config/voice-fastgpt-xfyunSuperTTS.json
new file mode 100644
index 0000000..cdca5f4
--- /dev/null
+++ b/config/voice-fastgpt-xfyunSuperTTS.json
@@ -0,0 +1,101 @@
+{
+ "server": {
+ "host": "0.0.0.0",
+ "port": 8000,
+ "cors_origins": ["*"]
+ },
+ "audio": {
+ "sample_rate_hz": 16000,
+ "channels": 1,
+ "frame_ms": 20
+ },
+ "session": {
+ "inactivity_timeout_sec": 60
+ },
+ "turn": {
+ "vad": {
+ "confidence": 0.8,
+ "start_secs": 0.4,
+ "stop_secs": 0.2,
+ "min_volume": 0.8
+ },
+ "interruption_min_chars": 3,
+ "interruption_use_interim": true,
+ "interruption_short_replies": [
+ "是",
+ "是的",
+ "对",
+ "对的",
+ "嗯",
+ "好",
+ "好的",
+ "行",
+ "可以",
+ "没问题",
+ "不是",
+ "不",
+ "不行",
+ "不用",
+ "不要",
+ "没有",
+ "否",
+ "你好",
+ "在吗"
+ ],
+ "user_speech_timeout_sec": 0.2
+ },
+ "agent": {
+ "system_prompt": "FastGPT app owns the system prompt when send_system_prompt is false.",
+ "greeting": "您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。",
+ "greeting_mode": "fixed",
+ "response_state": {
+ "enabled": true,
+ "tag": "state",
+ "event_type": "response.state",
+ "max_prefix_chars": 256
+ }
+ },
+ "services": {
+ "stt": {
+ "provider": "xfyun",
+ "app_id": "416ce125",
+ "api_key": "c65342fe603126c3610031d8429bb36d",
+ "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
+ "base_url": "wss://iat-api.xfyun.cn/v2/iat",
+ "language": "zh_cn",
+ "domain": "iat",
+ "accent": "mandarin",
+ "encoding": "raw",
+ "frame_size": 1280,
+ "timeout_sec": 10.0
+ },
+ "llm": {
+ "provider": "fastgpt",
+ "api_key": "fastgpt-v1FljAxBz3tJeS0bH7HZU4yVGclsTcfiy9yK7V9Zr9126maDHQ97Xlo8n",
+ "base_url": "http://localhost:3030",
+ "model": "my-voice-app",
+ "app_id": "6a153aed53e3f8d9f2744905",
+ "chat_id": null,
+ "variables": {},
+ "detail": false,
+ "timeout_sec": 60.0,
+ "send_system_prompt": false
+ },
+ "tts": {
+ "provider": "xfyun_super",
+ "app_id": "416ce125",
+ "api_key": "c65342fe603126c3610031d8429bb36d",
+ "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
+ "base_url": "wss://cbm01.cn-huabei-1.xf-yun.com/v1/private/mcd9m97e6",
+ "voice": "x5_lingxiaoxuan_flow",
+ "aue": "raw",
+ "speed": 50,
+ "volume": 50,
+ "pitch": 50,
+ "oral_level": "mid",
+ "source_sample_rate_hz": 24000,
+ "text_aggregation_mode": "token",
+ "timeout_sec": 30.0
+ }
+ }
+}
diff --git a/config/voice-fastgpt-xfyunTTS.json b/config/voice-fastgpt-xfyunTTS.json
new file mode 100644
index 0000000..4fcf843
--- /dev/null
+++ b/config/voice-fastgpt-xfyunTTS.json
@@ -0,0 +1,101 @@
+{
+ "server": {
+ "host": "0.0.0.0",
+ "port": 8000,
+ "cors_origins": ["*"]
+ },
+ "audio": {
+ "sample_rate_hz": 16000,
+ "channels": 1,
+ "frame_ms": 20
+ },
+ "session": {
+ "inactivity_timeout_sec": 60
+ },
+ "turn": {
+ "vad": {
+ "confidence": 0.7,
+ "start_secs": 0.35,
+ "stop_secs": 0.2,
+ "min_volume": 0.65
+ },
+ "interruption_min_chars": 3,
+ "interruption_use_interim": true,
+ "interruption_short_replies": [
+ "是",
+ "是的",
+ "对",
+ "对的",
+ "嗯",
+ "好",
+ "好的",
+ "行",
+ "可以",
+ "没问题",
+ "不是",
+ "不",
+ "不行",
+ "不用",
+ "不要",
+ "没有",
+ "否",
+ "你好",
+ "在吗"
+ ],
+ "user_speech_timeout_sec": 0.2
+ },
+ "agent": {
+ "system_prompt": "FastGPT app owns the system prompt when send_system_prompt is false.",
+ "greeting": "您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。",
+ "greeting_mode": "fixed",
+ "response_state": {
+ "enabled": true,
+ "tag": "state",
+ "event_type": "response.state",
+ "max_prefix_chars": 256
+ }
+ },
+ "services": {
+ "stt": {
+ "provider": "xfyun",
+ "app_id": "416ce125",
+ "api_key": "c65342fe603126c3610031d8429bb36d",
+ "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
+ "base_url": "wss://iat-api.xfyun.cn/v2/iat",
+ "language": "zh_cn",
+ "domain": "iat",
+ "accent": "mandarin",
+ "encoding": "raw",
+ "frame_size": 1280,
+ "timeout_sec": 10.0
+ },
+ "llm": {
+ "provider": "fastgpt",
+ "api_key": "fastgpt-v1FljAxBz3tJeS0bH7HZU4yVGclsTcfiy9yK7V9Zr9126maDHQ97Xlo8n",
+ "base_url": "http://localhost:3030",
+ "model": "my-voice-app",
+ "app_id": "6a153aed53e3f8d9f2744905",
+ "chat_id": null,
+ "variables": {},
+ "detail": false,
+ "timeout_sec": 60.0,
+ "send_system_prompt": false
+ },
+ "tts": {
+ "provider": "xfyun_super",
+ "app_id": "416ce125",
+ "api_key": "c65342fe603126c3610031d8429bb36d",
+ "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
+ "base_url": "wss://cbm01.cn-huabei-1.xf-yun.com/v1/private/mcd9m97e6",
+ "voice": "x5_lingxiaoxuan_flow",
+ "aue": "raw",
+ "speed": 50,
+ "volume": 50,
+ "pitch": 50,
+ "oral_level": "mid",
+ "source_sample_rate_hz": 24000,
+ "text_aggregation_mode": "token",
+ "timeout_sec": 30.0
+ }
+ }
+}
diff --git a/config/voice-fastgpt.example.json b/config/voice-fastgpt.example.json
deleted file mode 100644
index c7063eb..0000000
--- a/config/voice-fastgpt.example.json
+++ /dev/null
@@ -1,58 +0,0 @@
-{
- "server": {
- "host": "0.0.0.0",
- "port": 8000,
- "cors_origins": ["*"]
- },
- "audio": {
- "sample_rate_hz": 16000,
- "channels": 1,
- "frame_ms": 20
- },
- "session": {
- "inactivity_timeout_sec": 60
- },
- "turn": {
- "vad": {
- "confidence": 0.7,
- "start_secs": 0.2,
- "stop_secs": 0.6,
- "min_volume": 0.6
- },
- "interruption_min_chars": 3,
- "interruption_use_interim": true,
- "user_speech_timeout_sec": 1.0
- },
- "agent": {
- "system_prompt": "FastGPT app owns the system prompt when send_system_prompt is false.",
- "greeting": "你好",
- "greeting_mode": "generated"
- },
- "services": {
- "stt": {
- "provider": "openai",
- "api_key": "",
- "base_url": null,
- "model": "gpt-4o-mini-transcribe",
- "language": "zh"
- },
- "llm": {
- "provider": "fastgpt",
- "api_key": "",
- "base_url": null,
- "model": "my-voice-app",
- "chat_id": null,
- "variables": {},
- "detail": false,
- "timeout_sec": 60.0,
- "send_system_prompt": false
- },
- "tts": {
- "provider": "openai",
- "api_key": "",
- "base_url": null,
- "model": "gpt-4o-mini-tts",
- "voice": "alloy"
- }
- }
-}
diff --git a/config/voice-xfyun.json b/config/voice-xfyun.json
index 5c302bd..6143b60 100644
--- a/config/voice-xfyun.json
+++ b/config/voice-xfyun.json
@@ -45,7 +45,13 @@
"agent": {
"system_prompt": "# 角色 你是一个高度集成、安全第一的交警AI接警员。正在收集事故人员伤亡情况,时间,地点,事故原因,事故车辆数量,收集完成之后和用户说再见",
"greeting": "您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。",
- "greeting_mode": "fixed"
+ "greeting_mode": "fixed",
+ "response_state": {
+ "enabled": true,
+ "tag": "state",
+ "event_type": "response.state",
+ "max_prefix_chars": 256
+ }
},
"services": {
"stt": {
diff --git a/config/voice.json b/config/voice.json
index 64f64af..10891ad 100644
--- a/config/voice.json
+++ b/config/voice.json
@@ -47,7 +47,13 @@
"agent": {
"system_prompt": "You are a helpful, friendly voice assistant. Keep responses concise and natural for spoken conversation.",
"greeting": "Please introduce yourself briefly.",
- "greeting_mode": "generated"
+ "greeting_mode": "generated",
+ "response_state": {
+ "enabled": false,
+ "tag": "state",
+ "event_type": "response.state",
+ "max_prefix_chars": 256
+ }
},
"services": {
"stt": {
diff --git a/src/voice/config.py b/src/voice/config.py
index affbce0..3cf3c3e 100644
--- a/src/voice/config.py
+++ b/src/voice/config.py
@@ -26,6 +26,9 @@ def resolve_voice_config_path() -> Path:
DEFAULT_VOICE_CONFIG = resolve_voice_config_path()
+SUPPORTED_LLM_PROVIDERS = frozenset({"openai", "fastgpt"})
+_LLM_PROVIDER_ALIASES = {"llm": "openai", "openai": "openai", "fastgpt": "fastgpt"}
+
@dataclass(frozen=True)
class ServerConfig:
@@ -93,11 +96,20 @@ class TurnConfig:
)
+@dataclass(frozen=True)
+class ResponseStateConfig:
+ enabled: bool = False
+ tag: str = "state"
+ event_type: str = "response.state"
+ max_prefix_chars: int = 256
+
+
@dataclass(frozen=True)
class AgentConfig:
system_prompt: str = "You are a helpful, friendly voice assistant."
greeting: str | None = None
greeting_mode: str = "generated"
+ response_state: ResponseStateConfig = field(default_factory=ResponseStateConfig)
@dataclass(frozen=True)
@@ -106,6 +118,7 @@ class LLMConfig:
api_key: str = ""
base_url: str | None = None
model: str = "gpt-4o-mini"
+ app_id: str | None = None
temperature: float | None = 0.7
chat_id: str | None = None
variables: dict[str, str] = field(default_factory=dict)
@@ -113,6 +126,19 @@ class LLMConfig:
timeout_sec: float = 60.0
send_system_prompt: bool = False
+ @property
+ def is_fastgpt(self) -> bool:
+ return self.provider == "fastgpt"
+
+ @property
+ def is_openai(self) -> bool:
+ return self.provider == "openai"
+
+ @property
+ def uses_local_context_history(self) -> bool:
+ """Whether the pipeline should seed and maintain local LLM context history."""
+ return not self.is_fastgpt or self.send_system_prompt
+
@dataclass(frozen=True)
class STTConfig:
@@ -147,6 +173,8 @@ class TTSConfig:
pitch: int = 50
timeout_sec: float = 30.0
source_sample_rate_hz: int | None = None
+ oral_level: str = "mid"
+ text_aggregation_mode: str | None = None
@dataclass(frozen=True)
@@ -183,14 +211,24 @@ def config_from_dict(data: dict) -> EngineConfig:
agent["greeting"] = None
if agent.get("greeting_mode") not in (None, "generated", "fixed", "off"):
raise ValueError("agent.greeting_mode must be one of: generated, fixed, off")
+ response_state = ResponseStateConfig(**_dict(agent.pop("response_state")))
+ if response_state.max_prefix_chars < 1:
+ raise ValueError("agent.response_state.max_prefix_chars must be greater than 0")
+ if not response_state.tag:
+ raise ValueError("agent.response_state.tag must not be empty")
+ if not response_state.event_type:
+ raise ValueError("agent.response_state.event_type must not be empty")
stt = _dict(services.get("stt") or services.get("asr"))
if stt.get("language") == "":
stt["language"] = None
llm = _dict(services.get("llm"))
+ llm["provider"] = _normalize_llm_provider(llm.get("provider", LLMConfig().provider))
if llm.get("chat_id") == "":
llm["chat_id"] = None
+ if llm.get("app_id") == "":
+ llm["app_id"] = None
if not isinstance(llm.get("variables"), dict):
llm["variables"] = {}
@@ -219,7 +257,7 @@ def config_from_dict(data: dict) -> EngineConfig:
)
),
),
- agent=AgentConfig(**agent),
+ agent=AgentConfig(**agent, response_state=response_state),
services=ServicesConfig(
llm=LLMConfig(**llm),
stt=STTConfig(**stt),
@@ -230,3 +268,14 @@ def config_from_dict(data: dict) -> EngineConfig:
def _dict(value: object) -> dict:
return dict(value) if isinstance(value, dict) else {}
+
+
+def _normalize_llm_provider(value: object) -> str:
+ provider = str(value or LLMConfig().provider).strip().lower()
+ normalized = _LLM_PROVIDER_ALIASES.get(provider)
+ if normalized is None:
+ supported = ", ".join(sorted(SUPPORTED_LLM_PROVIDERS | {"llm"}))
+ raise ValueError(
+ f"services.llm.provider must be one of: {supported}; got {value!r}"
+ )
+ return normalized
diff --git a/src/voice/context_sync.py b/src/voice/context_sync.py
new file mode 100644
index 0000000..3dab3c3
--- /dev/null
+++ b/src/voice/context_sync.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+from typing import Any
+
+from pipecat.frames.frames import Frame, InterruptionFrame, LLMMessagesAppendFrame
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context
+
+
+class AssistantContextSyncProcessor(FrameProcessor):
+ """Sync LLM context to urgent-streamed assistant text before text-input turns.
+
+ ``input.text`` with ``interrupt: true`` queues ``InterruptionFrame`` before
+ ``LLMMessagesAppendFrame``. This processor runs context repair after the
+ interrupt has propagated (including TTS-phase interrupts) and before the new
+ user message is appended.
+ """
+
+ def __init__(
+ self,
+ *,
+ text_stream: ProductTextStreamProcessor,
+ assistant_aggregator: Any,
+ ) -> None:
+ super().__init__()
+ self._text_stream = text_stream
+ self._assistant_aggregator = assistant_aggregator
+ self._sync_on_next_append = False
+
+ async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
+ await super().process_frame(frame, direction)
+
+ if isinstance(frame, InterruptionFrame):
+ self._sync_on_next_append = True
+ elif isinstance(frame, LLMMessagesAppendFrame) and self._sync_on_next_append:
+ self._sync_on_next_append = False
+ maybe_sync_assistant_context(self._assistant_aggregator, self._text_stream)
+
+ await self.push_frame(frame, direction)
diff --git a/src/voice/fastgpt_llm.py b/src/voice/fastgpt_llm.py
index 4055d0c..b05a0f2 100644
--- a/src/voice/fastgpt_llm.py
+++ b/src/voice/fastgpt_llm.py
@@ -7,11 +7,13 @@ from typing import Any
import httpx
from fastgpt_client import AsyncChatClient, FastGPTInteractiveEvent, aiter_stream_events
from fastgpt_client.exceptions import FastGPTError
+from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
+ InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -133,6 +135,24 @@ class FastGPTLLMSettings(LLMSettings):
detail: bool = False
+def _default_fastgpt_settings(*, model: str = "fastgpt") -> FastGPTLLMSettings:
+ return FastGPTLLMSettings(
+ model=model,
+ system_instruction=None,
+ temperature=None,
+ max_tokens=None,
+ top_p=None,
+ top_k=None,
+ frequency_penalty=None,
+ presence_penalty=None,
+ seed=None,
+ filter_incomplete_user_turns=False,
+ user_turn_completion_config=None,
+ variables={},
+ detail=False,
+ )
+
+
class FastGPTLLMService(LLMService):
"""FastGPT LLM service using chatId server-side memory and workflow variables."""
@@ -144,18 +164,20 @@ class FastGPTLLMService(LLMService):
api_key: str,
base_url: str,
chat_id: str | None = None,
+ app_id: str | None = None,
send_system_prompt: bool = False,
greeting_prompt: str | None = None,
timeout: float = 60.0,
settings: FastGPTLLMSettings | None = None,
**kwargs,
) -> None:
- default_settings = self.Settings(model="fastgpt")
+ default_settings = _default_fastgpt_settings()
if settings is not None:
default_settings.apply_update(settings)
super().__init__(settings=default_settings, **kwargs)
self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}"
+ self._app_id = (app_id or "").strip()
self._send_system_prompt = send_system_prompt
self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好"
self._client = AsyncChatClient(
@@ -165,6 +187,10 @@ class FastGPTLLMService(LLMService):
)
self._active_response = None
+ @property
+ def app_id(self) -> str:
+ return self._app_id
+
@property
def chat_id(self) -> str:
return self._chat_id
@@ -183,6 +209,63 @@ class FastGPTLLMService(LLMService):
await self._close_active_response()
await super().cancel(frame)
+ async def _handle_interruptions(self, _: InterruptionFrame) -> None:
+ await self._close_active_response()
+ await super()._handle_interruptions(_)
+
+ @staticmethod
+ def _welcome_text_from_init_payload(payload: Any) -> str:
+ if not isinstance(payload, dict):
+ return ""
+
+ for container in (payload.get("app"), payload.get("data"), payload):
+ if not isinstance(container, dict):
+ continue
+ nested_app = container.get("app")
+ if isinstance(nested_app, dict):
+ text = FastGPTLLMService._welcome_text_from_app(nested_app)
+ if text:
+ return text
+ text = FastGPTLLMService._welcome_text_from_app(container)
+ if text:
+ return text
+ return ""
+
+ @staticmethod
+ def _welcome_text_from_app(app_payload: dict[str, Any]) -> str:
+ chat_config = (
+ app_payload.get("chatConfig")
+ if isinstance(app_payload.get("chatConfig"), dict)
+ else {}
+ )
+ return _first_nonempty_text(
+ chat_config.get("welcomeText"),
+ app_payload.get("welcomeText"),
+ )
+
+ async def fetch_welcome_text(self) -> str | None:
+ """Return FastGPT app welcome text from chat init when ``app_id`` is configured."""
+ if not self._app_id:
+ return None
+
+ try:
+ response = await self._client.get_chat_init(
+ appId=self._app_id,
+ chatId=self._chat_id,
+ )
+ response.raise_for_status()
+ text = self._welcome_text_from_init_payload(response.json())
+ if text:
+ logger.info(f"FastGPT welcomeText loaded for appId={self._app_id}")
+ return text or None
+ except FastGPTError as exc:
+ logger.warning(f"FastGPT chat init failed: {exc}")
+ except httpx.HTTPError as exc:
+ logger.warning(f"FastGPT chat init HTTP error: {exc}")
+ except Exception as exc:
+ logger.warning(f"FastGPT chat init error: {exc}")
+ return None
+
async def _close_active_response(self) -> None:
response = self._active_response
self._active_response = None
@@ -216,6 +299,12 @@ class FastGPTLLMService(LLMService):
messages = self._build_fastgpt_messages(context)
variables = self._settings.variables or None
+ logger.info(
+ "FastGPT chat completion "
+ f"chatId={self._chat_id} appId={self._app_id or '-'} "
+ f"variables={sorted((variables or {}).keys())} messages={messages!r}"
+ )
+
await self.start_ttfb_metrics()
try:
diff --git a/src/voice/pipeline.py b/src/voice/pipeline.py
index 0714016..a49e8ec 100644
--- a/src/voice/pipeline.py
+++ b/src/voice/pipeline.py
@@ -32,10 +32,13 @@ from pipecat.turns.user_stop.speech_timeout_user_turn_stop_strategy import (
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from .config import EngineConfig
+from .context_sync import AssistantContextSyncProcessor
+from .fastgpt_llm import FastGPTLLMService
from .protocol import ProductWebsocketSerializer
from .services import create_llm_service, create_stt_service, create_tts_service
+from .response_state import StateTagResponseProcessor
from .text_input import ProductTextInputProcessor
-from .text_stream import ProductTextStreamProcessor, sync_streamed_assistant_context
+from .text_stream import ProductTextStreamProcessor, maybe_sync_assistant_context
from .transcript_stream import ProductTranscriptStreamProcessor
from .turn_start import InterruptionGateUserTurnStartStrategy
@@ -83,14 +86,15 @@ async def run_pipeline_with_serializer(
session_variables={"session_id": chat_id, "channel": "voice"},
greeting_prompt=config.agent.greeting,
)
- if llm_config.provider == "fastgpt":
- logger.info(f"FastGPT chatId={chat_id}")
+ if llm_config.is_fastgpt:
+ logger.info(f"LLM backend=fastgpt chatId={chat_id} appId={llm_config.app_id or '-'}")
+ else:
+ logger.info(f"LLM backend=openai model={llm_config.model}")
tts = create_tts_service(config.services.tts, config.audio)
- use_fastgpt = llm_config.provider == "fastgpt" and not llm_config.send_system_prompt
messages: list[dict[str, str]] = []
- if not use_fastgpt:
+ if llm_config.uses_local_context_history:
messages = [{"role": "system", "content": config.agent.system_prompt}]
if config.agent.greeting and config.agent.greeting_mode == "generated":
messages.append({"role": "system", "content": config.agent.greeting})
@@ -126,21 +130,31 @@ async def run_pipeline_with_serializer(
)
text_stream = ProductTextStreamProcessor()
+ context_sync = AssistantContextSyncProcessor(
+ text_stream=text_stream,
+ assistant_aggregator=assistant_aggregator,
+ )
- pipeline = Pipeline(
+ processors = [
+ transport.input(),
+ ProductTextInputProcessor(),
+ stt,
+ ProductTranscriptStreamProcessor(),
+ context_sync,
+ user_aggregator,
+ llm,
+ ]
+ if config.agent.response_state.enabled:
+ processors.append(StateTagResponseProcessor(config.agent.response_state))
+ processors.extend(
[
- transport.input(),
- ProductTextInputProcessor(),
- stt,
- ProductTranscriptStreamProcessor(),
- user_aggregator,
- llm,
text_stream,
tts,
transport.output(),
assistant_aggregator,
]
)
+ pipeline = Pipeline(processors)
task = PipelineTask(
pipeline,
@@ -160,7 +174,14 @@ async def run_pipeline_with_serializer(
if config.agent.greeting_mode == "fixed" and config.agent.greeting:
await task.queue_frames([TTSSpeakFrame(config.agent.greeting)])
elif config.agent.greeting_mode == "generated":
- await task.queue_frames([LLMRunFrame()])
+ if isinstance(llm, FastGPTLLMService):
+ welcome = await llm.fetch_welcome_text()
+ if welcome:
+ await task.queue_frames([TTSSpeakFrame(welcome)])
+ else:
+ await task.queue_frames([LLMRunFrame()])
+ else:
+ await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(_transport, _client):
@@ -192,14 +213,12 @@ async def run_pipeline_with_serializer(
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage):
logger.info(f"Assistant: {message.content}")
- if message.interrupted:
- streamed = text_stream.take_interrupted_stream_text()
- if streamed:
- sync_streamed_assistant_context(
- _aggregator,
- streamed_text=streamed,
- committed_text=message.content or "",
- )
+ maybe_sync_assistant_context(
+ _aggregator,
+ text_stream,
+ committed_text=message.content or "",
+ )
+ text_stream.take_interrupted_stream_text()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
diff --git a/src/voice/protocol.py b/src/voice/protocol.py
index 79d4473..6ee3633 100644
--- a/src/voice/protocol.py
+++ b/src/voice/protocol.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import base64
+import binascii
import json
from typing import Any
@@ -19,10 +20,15 @@ from pipecat.frames.frames import (
OutputTransportMessageUrgentFrame,
TextFrame,
TranscriptionFrame,
+ UserImageRawFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
+MAX_INPUT_IMAGE_BYTES = 8 * 1024 * 1024
+SUPPORTED_INPUT_IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/webp"}
+
+
class ProductWebsocketSerializer(FrameSerializer):
"""Stable app-facing JSON/base64 protocol adapter for Pipecat websocket transport."""
@@ -118,7 +124,7 @@ class ProductWebsocketSerializer(FrameSerializer):
return None
try:
pcm = base64.b64decode(audio)
- except ValueError as exc:
+ except (binascii.Error, ValueError) as exc:
logger.warning(f"Invalid input.audio base64: {exc}")
return None
return InputAudioRawFrame(
@@ -127,6 +133,9 @@ class ProductWebsocketSerializer(FrameSerializer):
num_channels=int(message.get("channels") or self._channels),
)
+ if message_type == "input.image":
+ return self._deserialize_input_image(message)
+
if message_type == "input.text":
text = message.get("text")
if not isinstance(text, str) or not text.strip():
@@ -147,6 +156,61 @@ class ProductWebsocketSerializer(FrameSerializer):
logger.warning(f"Unsupported product websocket message type: {message_type!r}")
return None
+ def _deserialize_input_image(self, message: dict[str, Any]) -> Frame | None:
+ encoded = message.get("image") or message.get("data")
+ if not isinstance(encoded, str):
+ logger.warning("input.image requires base64 'image' or 'data'")
+ return None
+
+ mime_type = str(message.get("mime_type") or message.get("media_type") or "image/jpeg")
+ if mime_type not in SUPPORTED_INPUT_IMAGE_MIME_TYPES:
+ logger.warning(
+ "input.image unsupported mime_type "
+ f"{mime_type!r}; expected one of {sorted(SUPPORTED_INPUT_IMAGE_MIME_TYPES)}"
+ )
+ return None
+
+ try:
+ width = int(message.get("width") or 0)
+ height = int(message.get("height") or 0)
+ except (TypeError, ValueError):
+ logger.warning("input.image width and height must be integers")
+ return None
+
+ if width <= 0 or height <= 0:
+ logger.warning("input.image requires positive integer width and height")
+ return None
+
+ if "," in encoded and encoded.lstrip().startswith("data:"):
+ encoded = encoded.split(",", 1)[1]
+
+ try:
+ image = base64.b64decode(encoded, validate=True)
+ except (binascii.Error, ValueError) as exc:
+ logger.warning(f"Invalid input.image base64: {exc}")
+ return None
+
+ if len(image) > MAX_INPUT_IMAGE_BYTES:
+ logger.warning(
+ f"input.image too large: {len(image)} bytes; "
+ f"max is {MAX_INPUT_IMAGE_BYTES} bytes"
+ )
+ return None
+
+ text = message.get("text")
+ if text is not None and not isinstance(text, str):
+ logger.warning("input.image text must be a string when provided")
+ return None
+
+ return UserImageRawFrame(
+ image=image,
+ size=(width, height),
+ format=mime_type,
+ user_id=str(message.get("user_id") or "product-user"),
+ text=text or "Answer using this camera image.",
+ append_to_context=bool(message.get("append_to_context", True)),
+ )
+
def _event(self, event_type: str, **payload: Any) -> str:
self._sequence += 1
return json.dumps(
diff --git a/src/voice/response_state.py b/src/voice/response_state.py
new file mode 100644
index 0000000..5983061
--- /dev/null
+++ b/src/voice/response_state.py
@@ -0,0 +1,136 @@
+from __future__ import annotations
+
+from pipecat.frames.frames import (
+ CancelFrame,
+ Frame,
+ InterruptionFrame,
+ LLMFullResponseEndFrame,
+ LLMFullResponseStartFrame,
+ LLMTextFrame,
+ OutputTransportMessageUrgentFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+from .config import ResponseStateConfig
+
+
+class StateTagResponseProcessor(FrameProcessor):
+ """Extract a leading state tag from LLM text before text streaming and TTS.
+
+ Expected model output:
+
+
Chat cleared.
"; @@ -169,6 +182,209 @@ function truncateLogValue(value, maxLength = 160) { return `${text.slice(0, maxLength - 1)}…`; } +function formatLogTime(date = new Date()) { + return date.toLocaleTimeString([], { + hour12: false, + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }); +} + +function formatLogBytes(byteCount) { + if (byteCount >= 1048576) { + return `${(byteCount / 1048576).toFixed(2)} MB`; + } + if (byteCount >= 1024) { + return `${(byteCount / 1024).toFixed(1)} KB`; + } + return `${byteCount} bytes`; +} + +function wsLogGroupLabel(groupKey) { + if (groupKey === WS_LOG_GROUP_KEYS.AUDIO_DELTA) { + return "response.audio.delta"; + } + if (groupKey === WS_LOG_GROUP_KEYS.AUDIO_SEND) { + return "input.audio binary"; + } + return "grouped events"; +} + +function ensureWsLogReady() { + if (els.wsLog.querySelector(".ws-log__empty")) { + els.wsLog.innerHTML = ""; + } +} + +function scrollWsLogToBottom() { + els.wsLog.scrollTop = els.wsLog.scrollHeight; +} + +function trimWsLog() { + while (els.wsLog.children.length > MAX_WS_LOG_LINES) { + const first = els.wsLog.firstElementChild; + if (state.wsLogGroup?.element === first) { + state.wsLogGroup = null; + } + first.remove(); + } +} + +function finalizeWsLogGroup() { + state.wsLogGroup = null; +} + +function createWsLogEntry(direction, detail, kind, timeText = formatLogTime()) { + const entry = document.createElement("div"); + entry.className = `ws-log__entry ws-log__entry--${kind}`; + + const time = document.createElement("span"); + time.className = "ws-log__time"; + time.textContent = timeText; + + const dir = document.createElement("span"); + dir.className = "ws-log__direction"; + dir.textContent = + direction === "send" + ? "SEND" + : direction === "recv" + ? "RECV" + : direction.toUpperCase(); + + const body = document.createElement("span"); + body.className = "ws-log__detail"; + body.textContent = detail; + + entry.append(time, dir, body); + return entry; +} + +function updateWsLogGroupSummary(group) { + group.summaryEl.textContent = `${wsLogGroupLabel(group.key)} ×${group.count} (${formatLogBytes(group.totalBytes)})`; +} + +function appendWsLogGroupChildDom(group, item) { + const entry = createWsLogEntry( + group.direction, + item.detail, + group.kind, + item.time, + ); + entry.classList.add("ws-log__entry--child"); + group.childrenEl.appendChild(entry); + + const childEntries = group.childrenEl.querySelectorAll(".ws-log__entry"); + if (childEntries.length > MAX_GROUP_CHILDREN_RENDER) { + const omit = group.childrenEl.querySelector(".ws-log__group-omit"); + if (!omit) { + const omitted = document.createElement("div"); + omitted.className = "ws-log__group-omit"; + omitted.textContent = "… earlier events omitted"; + group.childrenEl.insertBefore(omitted, group.childrenEl.firstElementChild); + } + childEntries[0].remove(); + } +} + +function renderWsLogGroupChildren(group) { + group.childrenEl.innerHTML = ""; + const items = group.items; + const start = Math.max(0, items.length - MAX_GROUP_CHILDREN_RENDER); + if (start > 0) { + const omitted = document.createElement("div"); + omitted.className = "ws-log__group-omit"; + omitted.textContent = `… ${start} earlier events omitted`; + group.childrenEl.appendChild(omitted); + } + for (let i = start; i < items.length; i += 1) { + appendWsLogGroupChildDom(group, items[i]); + } +} + +function toggleWsLogGroup(group) { + group.collapsed = !group.collapsed; + group.childrenEl.hidden = group.collapsed; + group.chevronEl.textContent = group.collapsed ? "▶" : "▼"; + group.headerEl.setAttribute("aria-expanded", group.collapsed ? "false" : "true"); + + if (!group.collapsed && group.childrenEl.childElementCount === 0) { + renderWsLogGroupChildren(group); + } +} + +function appendWsLogGroupItem(groupKey, direction, kind, itemDetail, byteCount = 0) { + ensureWsLogReady(); + + let group = state.wsLogGroup; + if (!group || group.key !== groupKey) { + finalizeWsLogGroup(); + + const groupEl = document.createElement("div"); + groupEl.className = `ws-log__group ws-log__group--${kind}`; + + const header = document.createElement("button"); + header.type = "button"; + header.className = "ws-log__group-header"; + header.setAttribute("aria-expanded", "false"); + + const time = document.createElement("span"); + time.className = "ws-log__time"; + time.textContent = formatLogTime(); + + const dir = document.createElement("span"); + dir.className = "ws-log__direction"; + dir.textContent = direction === "send" ? "SEND" : "RECV"; + + const chevron = document.createElement("span"); + chevron.className = "ws-log__group-chevron"; + chevron.textContent = "▶"; + chevron.setAttribute("aria-hidden", "true"); + + const summary = document.createElement("span"); + summary.className = "ws-log__group-summary"; + + header.append(time, dir, chevron, summary); + + const children = document.createElement("div"); + children.className = "ws-log__group-children"; + children.hidden = true; + + groupEl.append(header, children); + els.wsLog.appendChild(groupEl); + + group = { + key: groupKey, + direction, + kind, + element: groupEl, + headerEl: header, + chevronEl: chevron, + summaryEl: summary, + childrenEl: children, + collapsed: true, + count: 0, + totalBytes: 0, + items: [], + }; + state.wsLogGroup = group; + header.addEventListener("click", () => toggleWsLogGroup(group)); + } + + group.count += 1; + group.totalBytes += byteCount; + const item = { time: formatLogTime(), detail: itemDetail }; + group.items.push(item); + updateWsLogGroupSummary(group); + + if (!group.collapsed) { + appendWsLogGroupChildDom(group, item); + } + + trimWsLog(); + scrollWsLogToBottom(); +} + function compactWsPayload(payload) { if (!payload || typeof payload !== "object") return String(payload); const compact = { ...payload }; @@ -191,85 +407,27 @@ function compactWsPayload(payload) { } function addWsLog(direction, detail, kind = direction) { - if (els.wsLog.querySelector(".ws-log__empty")) { - els.wsLog.innerHTML = ""; - } - - const entry = document.createElement("div"); - entry.className = `ws-log__entry ws-log__entry--${kind}`; - - const time = document.createElement("span"); - time.className = "ws-log__time"; - time.textContent = new Date().toLocaleTimeString([], { - hour12: false, - hour: "2-digit", - minute: "2-digit", - second: "2-digit", - }); - - const dir = document.createElement("span"); - dir.className = "ws-log__direction"; - dir.textContent = - direction === "send" - ? "SEND" - : direction === "recv" - ? "RECV" - : direction.toUpperCase(); - - const body = document.createElement("span"); - body.className = "ws-log__detail"; - body.textContent = detail; - - entry.append(time, dir, body); - els.wsLog.appendChild(entry); - - while (els.wsLog.children.length > MAX_WS_LOG_LINES) { - els.wsLog.firstElementChild.remove(); - } - els.wsLog.scrollTop = els.wsLog.scrollHeight; -} - -function flushAudioDeltaLog() { - if (state.audioDeltaLogCount === 0) return; - addWsLog( - "recv", - `response.audio.delta x${state.audioDeltaLogCount} (${state.audioDeltaLogBytes} bytes)`, - ); - state.audioDeltaLogCount = 0; - state.audioDeltaLogBytes = 0; - state.lastAudioDeltaLogAt = performance.now(); -} - -function flushAudioSendLog() { - if (state.audioSendLogCount === 0) return; - addWsLog( - "send", - `input.audio binary x${state.audioSendLogCount} (${state.audioSendLogBytes} bytes)`, - ); - state.audioSendLogCount = 0; - state.audioSendLogBytes = 0; - state.lastAudioSendLogAt = performance.now(); -} - -function flushPendingWsLogs() { - flushAudioDeltaLog(); - flushAudioSendLog(); + finalizeWsLogGroup(); + ensureWsLogReady(); + els.wsLog.appendChild(createWsLogEntry(direction, detail, kind)); + trimWsLog(); + scrollWsLogToBottom(); } function logWsPayload(direction, payload) { - if (direction === "send") { - flushAudioSendLog(); - } else { - flushAudioDeltaLog(); - } - if (direction === "recv" && payload?.type === "response.audio.delta") { - state.audioDeltaLogCount += 1; - state.audioDeltaLogBytes += payload.bytes || payload.audio?.length || 0; - const now = performance.now(); - if (now - state.lastAudioDeltaLogAt >= AUDIO_DELTA_LOG_INTERVAL_MS) { - flushAudioDeltaLog(); - } + const bytes = payload.bytes || 0; + const detail = + payload.seq != null + ? `seq=${payload.seq} (${bytes} bytes)` + : `(${bytes} bytes)`; + appendWsLogGroupItem( + WS_LOG_GROUP_KEYS.AUDIO_DELTA, + "recv", + "recv", + detail, + bytes, + ); return; } @@ -277,12 +435,13 @@ function logWsPayload(direction, payload) { } function logBinarySend(byteLength) { - state.audioSendLogCount += 1; - state.audioSendLogBytes += byteLength; - const now = performance.now(); - if (now - state.lastAudioSendLogAt >= AUDIO_DELTA_LOG_INTERVAL_MS) { - flushAudioSendLog(); - } + appendWsLogGroupItem( + WS_LOG_GROUP_KEYS.AUDIO_SEND, + "send", + "send", + `(${byteLength} bytes)`, + byteLength, + ); } function wsSend(data) { @@ -292,8 +451,6 @@ function wsSend(data) { try { logWsPayload("send", JSON.parse(data)); } catch (_) { - flushAudioSendLog(); - flushAudioDeltaLog(); addWsLog("send", truncateLogValue(data)); } } else { @@ -313,10 +470,7 @@ function wsSend(data) { } function clearWsLog() { - state.audioDeltaLogCount = 0; - state.audioDeltaLogBytes = 0; - state.audioSendLogCount = 0; - state.audioSendLogBytes = 0; + state.wsLogGroup = null; els.wsLog.innerHTML = '