Add configuration files for FastGPT and Xfyun voice services, enhancing LLM service capabilities. Update LLMConfig to include chat_id, variables, detail, and timeout settings. Refactor create_llm_service to support FastGPT integration and adjust pipeline to handle chat_id and greeting prompts. Implement context synchronization for interrupted assistant turns in text streaming.

This commit is contained in:
Xin Wang
2026-05-26 10:56:38 +08:00
parent e4e47f637e
commit b14ef64665
7 changed files with 583 additions and 13 deletions

View File

@@ -0,0 +1,58 @@
{
"server": {
"host": "0.0.0.0",
"port": 8000,
"cors_origins": ["*"]
},
"audio": {
"sample_rate_hz": 16000,
"channels": 1,
"frame_ms": 20
},
"session": {
"inactivity_timeout_sec": 60
},
"turn": {
"vad": {
"confidence": 0.7,
"start_secs": 0.2,
"stop_secs": 0.6,
"min_volume": 0.6
},
"interruption_min_chars": 3,
"interruption_use_interim": true,
"user_speech_timeout_sec": 1.0
},
"agent": {
"system_prompt": "FastGPT app owns the system prompt when send_system_prompt is false.",
"greeting": "你好",
"greeting_mode": "generated"
},
"services": {
"stt": {
"provider": "openai",
"api_key": "",
"base_url": null,
"model": "gpt-4o-mini-transcribe",
"language": "zh"
},
"llm": {
"provider": "fastgpt",
"api_key": "",
"base_url": null,
"model": "my-voice-app",
"chat_id": null,
"variables": {},
"detail": false,
"timeout_sec": 60.0,
"send_system_prompt": false
},
"tts": {
"provider": "openai",
"api_key": "",
"base_url": null,
"model": "gpt-4o-mini-tts",
"voice": "alloy"
}
}
}

86
config/voice-xfyun.json Normal file
View File

@@ -0,0 +1,86 @@
{
"server": {
"host": "0.0.0.0",
"port": 8000,
"cors_origins": ["*"]
},
"audio": {
"sample_rate_hz": 16000,
"channels": 1,
"frame_ms": 20
},
"session": {
"inactivity_timeout_sec": 60
},
"turn": {
"vad": {
"confidence": 0.7,
"start_secs": 0.35,
"stop_secs": 0.2,
"min_volume": 0.65
},
"interruption_min_chars": 3,
"interruption_use_interim": true,
"interruption_short_replies": [
"是",
"是的",
"对",
"对的",
"嗯",
"好",
"好的",
"行",
"可以",
"没问题",
"不是",
"不",
"不行",
"不用",
"不要",
"没有",
"否"
],
"user_speech_timeout_sec": 0.2
},
"agent": {
"system_prompt": "# 角色 你是一个高度集成、安全第一的交警AI接警员。正在收集事故人员伤亡情况时间地点事故原因事故车辆数量收集完成之后和用户说再见",
"greeting": "您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。",
"greeting_mode": "fixed"
},
"services": {
"stt": {
"provider": "xfyun",
"app_id": "416ce125",
"api_key": "c65342fe603126c3610031d8429bb36d",
"api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
"base_url": "wss://iat-api.xfyun.cn/v2/iat",
"language": "zh_cn",
"domain": "iat",
"accent": "mandarin",
"encoding": "raw",
"frame_size": 1280,
"timeout_sec": 10.0
},
"llm": {
"provider": "openai",
"api_key": "sk-230701ff1b6143ecbf322b3170606016",
"base_url": "https://api.deepseek.com/v1",
"model": "deepseek-chat",
"temperature": 0.7
},
"tts": {
"provider": "xfyun",
"app_id": "416ce125",
"api_key": "c65342fe603126c3610031d8429bb36d",
"api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4",
"base_url": "wss://tts-api.xfyun.cn/v2/tts",
"voice": "x4_xiaoyan",
"aue": "raw",
"tte": "UTF8",
"speed": 50,
"volume": 50,
"pitch": 50,
"source_sample_rate_hz": 16000
}
}
}

View File

@@ -107,6 +107,11 @@ class LLMConfig:
base_url: str | None = None
model: str = "gpt-4o-mini"
temperature: float | None = 0.7
chat_id: str | None = None
variables: dict[str, str] = field(default_factory=dict)
detail: bool = False
timeout_sec: float = 60.0
send_system_prompt: bool = False
@dataclass(frozen=True)
@@ -183,6 +188,12 @@ def config_from_dict(data: dict) -> EngineConfig:
if stt.get("language") == "":
stt["language"] = None
llm = _dict(services.get("llm"))
if llm.get("chat_id") == "":
llm["chat_id"] = None
if not isinstance(llm.get("variables"), dict):
llm["variables"] = {}
turn = _dict(data.get("turn"))
vad = _dict(turn.get("vad"))
@@ -210,7 +221,7 @@ def config_from_dict(data: dict) -> EngineConfig:
),
agent=AgentConfig(**agent),
services=ServicesConfig(
llm=LLMConfig(**_dict(services.get("llm"))),
llm=LLMConfig(**llm),
stt=STTConfig(**stt),
tts=TTSConfig(**_dict(services.get("tts"))),
),

300
src/voice/fastgpt_llm.py Normal file
View File

@@ -0,0 +1,300 @@
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from typing import Any
import httpx
from fastgpt_client import AsyncChatClient, FastGPTInteractiveEvent, aiter_stream_events
from fastgpt_client.exceptions import FastGPTError
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
OutputTransportMessageFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import LLMSettings
def _extract_text_from_event(kind: str, payload: Any) -> str:
if not isinstance(payload, dict):
return ""
if kind in {"answer", "fastAnswer"}:
text = payload.get("text")
if isinstance(text, str) and text:
return text
choices = payload.get("choices") if isinstance(payload.get("choices"), list) else []
if not choices:
return str(payload.get("text") or "")
first_choice = choices[0] if isinstance(choices[0], dict) else {}
delta = first_choice.get("delta") if isinstance(first_choice.get("delta"), dict) else {}
content = delta.get("content")
if isinstance(content, str) and content:
return content
message = first_choice.get("message") if isinstance(first_choice.get("message"), dict) else {}
message_content = message.get("content")
if isinstance(message_content, str) and message_content:
return message_content
return ""
def _message_text(message: dict[str, Any]) -> str:
content = message.get("content")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: list[str] = []
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text = part.get("text")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
return " ".join(parts)
return ""
def _first_nonempty_text(*values: Any) -> str:
for value in values:
if isinstance(value, str):
text = value.strip()
if text:
return text
return ""
def _interactive_spoken_prompt(event: FastGPTInteractiveEvent) -> str:
payload = event.data if isinstance(event.data, dict) else {}
params = payload.get("params") if isinstance(payload.get("params"), dict) else {}
prompt = _first_nonempty_text(
payload.get("opener"),
params.get("opener"),
payload.get("prompt"),
params.get("prompt"),
payload.get("text"),
params.get("text"),
payload.get("title"),
params.get("title"),
payload.get("description"),
params.get("description"),
)
if prompt:
return prompt
if event.interaction_type == "userSelect":
raw_options = (
params.get("userSelectOptions")
if isinstance(params.get("userSelectOptions"), list)
else []
)
labels: list[str] = []
for index, raw in enumerate(raw_options, start=1):
if isinstance(raw, str) and raw.strip():
labels.append(f"{index}. {raw.strip()}")
elif isinstance(raw, dict):
label = _first_nonempty_text(raw.get("label"), raw.get("value"))
if label:
labels.append(f"{index}. {label}")
if labels:
return "请选择:" + "".join(labels)
return "请选择一个选项。"
if event.interaction_type == "userInput":
input_form = params.get("inputForm") if isinstance(params.get("inputForm"), list) else []
labels = [
_first_nonempty_text(field.get("label"), field.get("name"))
for field in input_form
if isinstance(field, dict)
]
labels = [label for label in labels if label]
if labels:
return "请提供以下信息:" + "".join(labels)
return "请补充所需信息。"
return "请继续。"
@dataclass
class FastGPTLLMSettings(LLMSettings):
variables: dict[str, Any] = field(default_factory=dict)
detail: bool = False
class FastGPTLLMService(LLMService):
"""FastGPT LLM service using chatId server-side memory and workflow variables."""
Settings = FastGPTLLMSettings
def __init__(
self,
*,
api_key: str,
base_url: str,
chat_id: str | None = None,
send_system_prompt: bool = False,
greeting_prompt: str | None = None,
timeout: float = 60.0,
settings: FastGPTLLMSettings | None = None,
**kwargs,
) -> None:
default_settings = self.Settings(model="fastgpt")
if settings is not None:
default_settings.apply_update(settings)
super().__init__(settings=default_settings, **kwargs)
self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}"
self._send_system_prompt = send_system_prompt
self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好"
self._client = AsyncChatClient(
api_key=api_key,
base_url=base_url,
timeout=timeout,
)
self._active_response = None
@property
def chat_id(self) -> str:
return self._chat_id
def set_variables(self, variables: dict[str, Any]) -> None:
merged = dict(self._settings.variables)
merged.update(variables)
self._settings.variables = merged
async def stop(self, frame: EndFrame) -> None:
await self._close_active_response()
await self._client.close()
await super().stop(frame)
async def cancel(self, frame: CancelFrame) -> None:
await self._close_active_response()
await super().cancel(frame)
async def _close_active_response(self) -> None:
response = self._active_response
self._active_response = None
if response is not None:
await response.aclose()
def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, str]]:
raw_messages = context.get_messages()
messages: list[dict[str, str]] = []
if self._send_system_prompt:
for message in raw_messages:
if not isinstance(message, dict) or message.get("role") != "system":
continue
text = _message_text(message)
if text:
messages.append({"role": "system", "content": text})
for message in reversed(raw_messages):
if not isinstance(message, dict) or message.get("role") != "user":
continue
text = _message_text(message)
if text:
messages.append({"role": "user", "content": text})
return messages
messages.append({"role": "user", "content": self._greeting_prompt})
return messages
async def _process_context(self, context: LLMContext) -> None:
messages = self._build_fastgpt_messages(context)
variables = self._settings.variables or None
await self.start_ttfb_metrics()
try:
response = await self._client.create_chat_completion(
messages=messages,
stream=True,
chatId=self._chat_id,
variables=variables,
detail=self._settings.detail,
)
except FastGPTError as exc:
await self.push_error(error_msg=f"FastGPT request failed: {exc}", exception=exc)
return
except httpx.HTTPError as exc:
await self.push_error(error_msg=f"FastGPT HTTP error: {exc}", exception=exc)
return
self._active_response = response
try:
async for event in aiter_stream_events(response):
if event.kind in {"data", "answer", "fastAnswer"}:
text = _extract_text_from_event(event.kind, event.data)
if text:
await self.stop_ttfb_metrics()
await self.push_frame(LLMTextFrame(text))
continue
if event.kind == "interactive" and isinstance(event, FastGPTInteractiveEvent):
await self._handle_interactive(event)
break
if event.kind == "error":
payload = event.data if isinstance(event.data, dict) else {}
message = _first_nonempty_text(
payload.get("message"),
payload.get("error"),
) or "FastGPT stream error"
await self.push_error(error_msg=message)
break
if event.kind == "done":
break
finally:
self._active_response = None
await response.aclose()
async def _handle_interactive(self, event: FastGPTInteractiveEvent) -> None:
prompt = _interactive_spoken_prompt(event)
if prompt:
await self.stop_ttfb_metrics()
await self.push_frame(LLMTextFrame(prompt))
await self.push_frame(
OutputTransportMessageFrame(
message={
"type": "response.interactive",
"interaction_type": event.interaction_type,
"data": event.data,
}
),
FrameDirection.DOWNSTREAM,
)
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(frame.context)
except httpx.TimeoutException as exc:
await self._call_event_handler("on_completion_timeout")
await self.push_error(error_msg="FastGPT completion timeout", exception=exc)
except Exception as exc:
await self.push_error(error_msg=f"FastGPT completion error: {exc}", exception=exc)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
else:
await self.push_frame(frame, direction)

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import uuid
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -33,7 +35,7 @@ from .config import EngineConfig
from .protocol import ProductWebsocketSerializer
from .services import create_llm_service, create_stt_service, create_tts_service
from .text_input import ProductTextInputProcessor
from .text_stream import ProductTextStreamProcessor
from .text_stream import ProductTextStreamProcessor, sync_streamed_assistant_context
from .transcript_stream import ProductTranscriptStreamProcessor
from .turn_start import InterruptionGateUserTurnStartStrategy
@@ -72,12 +74,26 @@ async def run_pipeline_with_serializer(
)
stt = create_stt_service(config.services.stt, config.audio)
llm = create_llm_service(config.services.llm)
llm_config = config.services.llm
chat_id = llm_config.chat_id or f"voice_{uuid.uuid4().hex[:16]}"
llm = create_llm_service(
llm_config,
chat_id=chat_id,
session_variables={"session_id": chat_id, "channel": "voice"},
greeting_prompt=config.agent.greeting,
)
if llm_config.provider == "fastgpt":
logger.info(f"FastGPT chatId={chat_id}")
tts = create_tts_service(config.services.tts, config.audio)
messages = [{"role": "system", "content": config.agent.system_prompt}]
if config.agent.greeting and config.agent.greeting_mode == "generated":
messages.append({"role": "system", "content": config.agent.greeting})
use_fastgpt = llm_config.provider == "fastgpt" and not llm_config.send_system_prompt
messages: list[dict[str, str]] = []
if not use_fastgpt:
messages = [{"role": "system", "content": config.agent.system_prompt}]
if config.agent.greeting and config.agent.greeting_mode == "generated":
messages.append({"role": "system", "content": config.agent.greeting})
context = LLMContext(messages)
@@ -109,6 +125,8 @@ async def run_pipeline_with_serializer(
),
)
text_stream = ProductTextStreamProcessor()
pipeline = Pipeline(
[
transport.input(),
@@ -117,10 +135,10 @@ async def run_pipeline_with_serializer(
ProductTranscriptStreamProcessor(),
user_aggregator,
llm,
ProductTextStreamProcessor(),
assistant_aggregator,
text_stream,
tts,
transport.output(),
assistant_aggregator,
]
)
@@ -174,6 +192,14 @@ async def run_pipeline_with_serializer(
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage):
logger.info(f"Assistant: {message.content}")
if message.interrupted:
streamed = text_stream.take_interrupted_stream_text()
if streamed:
sync_streamed_assistant_context(
_aggregator,
streamed_text=streamed,
committed_text=message.content or "",
)
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)

View File

@@ -13,6 +13,7 @@ from pipecat.services.openai.tts import VALID_VOICES, OpenAITTSService
from pipecat.transcriptions.language import Language
from .config import AudioConfig, LLMConfig, STTConfig, TTSConfig
from .fastgpt_llm import FastGPTLLMService, FastGPTLLMSettings
from .xfyun_asr import DEFAULT_XFYUN_ASR_URL, XfyunASRService
from .xfyun_tts import DEFAULT_XFYUN_TTS_URL, XfyunTTSService
@@ -46,7 +47,29 @@ def create_stt_service(config: STTConfig, audio: AudioConfig | None = None):
)
def create_llm_service(config: LLMConfig):
def create_llm_service(
config: LLMConfig,
*,
chat_id: str | None = None,
session_variables: dict | None = None,
greeting_prompt: str | None = None,
):
if config.provider == "fastgpt":
variables = {**config.variables, **(session_variables or {})}
return FastGPTLLMService(
api_key=config.api_key,
base_url=config.base_url or "http://localhost:3000",
chat_id=chat_id or config.chat_id,
send_system_prompt=config.send_system_prompt,
greeting_prompt=greeting_prompt,
timeout=config.timeout_sec,
settings=FastGPTLLMSettings(
model=config.model or "fastgpt",
variables=variables,
detail=config.detail,
),
)
_require_provider(config.provider, "openai", "llm")
return OpenAILLMService(
api_key=config.api_key or None,

View File

@@ -1,6 +1,9 @@
from __future__ import annotations
from typing import Any, Protocol
from pipecat.frames.frames import (
CancelFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
@@ -12,34 +15,95 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class _AssistantContextSync(Protocol):
@property
def context(self) -> Any: ...
def sync_streamed_assistant_context(
aggregator: _AssistantContextSync,
*,
streamed_text: str,
committed_text: str,
) -> None:
"""Align LLM context with UI text after an interrupted assistant turn.
The assistant aggregator only commits TTS-spoken text on interrupt. Replace
or append the streamed LLM text so the next turn sees what the user saw.
"""
streamed = streamed_text.strip()
if not streamed or streamed == committed_text.strip():
return
committed = committed_text.strip()
def _apply(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
updated = list(messages)
if committed and updated:
last = updated[-1]
if isinstance(last, dict) and last.get("role") == "assistant":
content = last.get("content")
if isinstance(content, str) and content.strip() == committed:
updated[-1] = {"role": "assistant", "content": streamed}
return updated
updated.append({"role": "assistant", "content": streamed})
return updated
aggregator.context.transform_messages(_apply)
class ProductTextStreamProcessor(FrameProcessor):
"""Mirrors LLM text frames as streaming protocol events."""
"""Mirrors LLM text frames as streaming protocol events.
Placed between the LLM service and the TTS service, this processor
observes the LLM's text frames as they're emitted and forwards them
downstream as ``OutputTransportMessageUrgentFrame``s that the product
serializer turns into ``response.text.{started,delta,final}`` events.
Urgent frames bypass TTS serialization and transport audio queues so text
reaches the client at least as quickly as synthesized audio.
``TTSSpeakFrame`` (used by the fixed-greeting code path, which bypasses
the LLM entirely) is also handled: the processor synthesizes a single
started/delta/final sequence for its fixed text.
"""
def __init__(self) -> None:
super().__init__()
self._aggregation: list[str] = []
self._turn_active = False
self._interrupted_stream_text: str | None = None
def take_interrupted_stream_text(self) -> str | None:
text = self._interrupted_stream_text
self._interrupted_stream_text = None
return text
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseStartFrame):
await self.push_frame(frame, direction)
await self._start_turn()
elif isinstance(frame, LLMTextFrame):
await self.push_frame(frame, direction)
if frame.text:
await self._delta(frame.text)
elif isinstance(frame, LLMFullResponseEndFrame):
await self.push_frame(frame, direction)
await self._end_turn(interrupted=False)
elif isinstance(frame, InterruptionFrame):
elif isinstance(frame, (InterruptionFrame, CancelFrame)):
await self.push_frame(frame, direction)
await self._end_turn(interrupted=True)
elif isinstance(frame, TTSSpeakFrame):
text = frame.text or ""
await self.push_frame(frame, direction)
await self._start_turn()
if text:
await self._delta(text)
await self._end_turn(interrupted=False)
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
async def _start_turn(self) -> None:
if self._turn_active:
@@ -58,6 +122,8 @@ class ProductTextStreamProcessor(FrameProcessor):
if not self._turn_active:
return
full_text = "".join(self._aggregation)
if interrupted and full_text:
self._interrupted_stream_text = full_text
self._turn_active = False
self._aggregation = []
await self._emit(