From b14ef64665df68236a66f1ab9944963d0d822fd4 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Tue, 26 May 2026 10:56:38 +0800 Subject: [PATCH] Add configuration files for FastGPT and Xfyun voice services, enhancing LLM service capabilities. Update LLMConfig to include chat_id, variables, detail, and timeout settings. Refactor create_llm_service to support FastGPT integration and adjust pipeline to handle chat_id and greeting prompts. Implement context synchronization for interrupted assistant turns in text streaming. --- config/voice-fastgpt.example.json | 58 ++++++ config/voice-xfyun.json | 86 +++++++++ src/voice/config.py | 13 +- src/voice/fastgpt_llm.py | 300 ++++++++++++++++++++++++++++++ src/voice/pipeline.py | 40 +++- src/voice/services.py | 25 ++- src/voice/text_stream.py | 74 +++++++- 7 files changed, 583 insertions(+), 13 deletions(-) create mode 100644 config/voice-fastgpt.example.json create mode 100644 config/voice-xfyun.json create mode 100644 src/voice/fastgpt_llm.py diff --git a/config/voice-fastgpt.example.json b/config/voice-fastgpt.example.json new file mode 100644 index 0000000..c7063eb --- /dev/null +++ b/config/voice-fastgpt.example.json @@ -0,0 +1,58 @@ +{ + "server": { + "host": "0.0.0.0", + "port": 8000, + "cors_origins": ["*"] + }, + "audio": { + "sample_rate_hz": 16000, + "channels": 1, + "frame_ms": 20 + }, + "session": { + "inactivity_timeout_sec": 60 + }, + "turn": { + "vad": { + "confidence": 0.7, + "start_secs": 0.2, + "stop_secs": 0.6, + "min_volume": 0.6 + }, + "interruption_min_chars": 3, + "interruption_use_interim": true, + "user_speech_timeout_sec": 1.0 + }, + "agent": { + "system_prompt": "FastGPT app owns the system prompt when send_system_prompt is false.", + "greeting": "你好", + "greeting_mode": "generated" + }, + "services": { + "stt": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini-transcribe", + "language": "zh" + }, + "llm": { + "provider": "fastgpt", + "api_key": "", + "base_url": null, + "model": "my-voice-app", + "chat_id": null, + "variables": {}, + "detail": false, + "timeout_sec": 60.0, + "send_system_prompt": false + }, + "tts": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini-tts", + "voice": "alloy" + } + } +} diff --git a/config/voice-xfyun.json b/config/voice-xfyun.json new file mode 100644 index 0000000..5c302bd --- /dev/null +++ b/config/voice-xfyun.json @@ -0,0 +1,86 @@ +{ + "server": { + "host": "0.0.0.0", + "port": 8000, + "cors_origins": ["*"] + }, + "audio": { + "sample_rate_hz": 16000, + "channels": 1, + "frame_ms": 20 + }, + "session": { + "inactivity_timeout_sec": 60 + }, + "turn": { + "vad": { + "confidence": 0.7, + "start_secs": 0.35, + "stop_secs": 0.2, + "min_volume": 0.65 + }, + "interruption_min_chars": 3, + "interruption_use_interim": true, + "interruption_short_replies": [ + "是", + "是的", + "对", + "对的", + "嗯", + "好", + "好的", + "行", + "可以", + "没问题", + "不是", + "不", + "不行", + "不用", + "不要", + "没有", + "否" + ], + "user_speech_timeout_sec": 0.2 + }, + "agent": { + "system_prompt": "# 角色 你是一个高度集成、安全第一的交警AI接警员。正在收集事故人员伤亡情况,时间,地点,事故原因,事故车辆数量,收集完成之后和用户说再见", + "greeting": "您好,这里是无锡交警,我将为您远程处理交通事故。请将人员撤离至路侧安全区域,开启危险报警双闪灯、放置三角警告牌、做好安全防护,谨防二次事故伤害。若您已经准备好了,请点击继续办理,如需人工服务,请说转人工。", + "greeting_mode": "fixed" + }, + "services": { + "stt": { + "provider": "xfyun", + "app_id": "416ce125", + "api_key": "c65342fe603126c3610031d8429bb36d", + "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4", + "base_url": "wss://iat-api.xfyun.cn/v2/iat", + "language": "zh_cn", + "domain": "iat", + "accent": "mandarin", + "encoding": "raw", + "frame_size": 1280, + "timeout_sec": 10.0 + }, + "llm": { + "provider": "openai", + "api_key": "sk-230701ff1b6143ecbf322b3170606016", + "base_url": "https://api.deepseek.com/v1", + "model": "deepseek-chat", + "temperature": 0.7 + }, + "tts": { + "provider": "xfyun", + "app_id": "416ce125", + "api_key": "c65342fe603126c3610031d8429bb36d", + "api_secret": "MzkyYmI5OWEyODQzN2FiN2VhN2UzYzU4", + "base_url": "wss://tts-api.xfyun.cn/v2/tts", + "voice": "x4_xiaoyan", + "aue": "raw", + "tte": "UTF8", + "speed": 50, + "volume": 50, + "pitch": 50, + "source_sample_rate_hz": 16000 + } + } +} diff --git a/src/voice/config.py b/src/voice/config.py index 0211f65..affbce0 100644 --- a/src/voice/config.py +++ b/src/voice/config.py @@ -107,6 +107,11 @@ class LLMConfig: base_url: str | None = None model: str = "gpt-4o-mini" temperature: float | None = 0.7 + chat_id: str | None = None + variables: dict[str, str] = field(default_factory=dict) + detail: bool = False + timeout_sec: float = 60.0 + send_system_prompt: bool = False @dataclass(frozen=True) @@ -183,6 +188,12 @@ def config_from_dict(data: dict) -> EngineConfig: if stt.get("language") == "": stt["language"] = None + llm = _dict(services.get("llm")) + if llm.get("chat_id") == "": + llm["chat_id"] = None + if not isinstance(llm.get("variables"), dict): + llm["variables"] = {} + turn = _dict(data.get("turn")) vad = _dict(turn.get("vad")) @@ -210,7 +221,7 @@ def config_from_dict(data: dict) -> EngineConfig: ), agent=AgentConfig(**agent), services=ServicesConfig( - llm=LLMConfig(**_dict(services.get("llm"))), + llm=LLMConfig(**llm), stt=STTConfig(**stt), tts=TTSConfig(**_dict(services.get("tts"))), ), diff --git a/src/voice/fastgpt_llm.py b/src/voice/fastgpt_llm.py new file mode 100644 index 0000000..4055d0c --- /dev/null +++ b/src/voice/fastgpt_llm.py @@ -0,0 +1,300 @@ +from __future__ import annotations + +import uuid +from dataclasses import dataclass, field +from typing import Any + +import httpx +from fastgpt_client import AsyncChatClient, FastGPTInteractiveEvent, aiter_stream_events +from fastgpt_client.exceptions import FastGPTError + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + Frame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMTextFrame, + OutputTransportMessageFrame, +) +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.llm_service import LLMService +from pipecat.services.settings import LLMSettings + + +def _extract_text_from_event(kind: str, payload: Any) -> str: + if not isinstance(payload, dict): + return "" + + if kind in {"answer", "fastAnswer"}: + text = payload.get("text") + if isinstance(text, str) and text: + return text + + choices = payload.get("choices") if isinstance(payload.get("choices"), list) else [] + if not choices: + return str(payload.get("text") or "") + + first_choice = choices[0] if isinstance(choices[0], dict) else {} + delta = first_choice.get("delta") if isinstance(first_choice.get("delta"), dict) else {} + content = delta.get("content") + if isinstance(content, str) and content: + return content + + message = first_choice.get("message") if isinstance(first_choice.get("message"), dict) else {} + message_content = message.get("content") + if isinstance(message_content, str) and message_content: + return message_content + + return "" + + +def _message_text(message: dict[str, Any]) -> str: + content = message.get("content") + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + parts: list[str] = [] + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + text = part.get("text") + if isinstance(text, str) and text.strip(): + parts.append(text.strip()) + return " ".join(parts) + return "" + + +def _first_nonempty_text(*values: Any) -> str: + for value in values: + if isinstance(value, str): + text = value.strip() + if text: + return text + return "" + + +def _interactive_spoken_prompt(event: FastGPTInteractiveEvent) -> str: + payload = event.data if isinstance(event.data, dict) else {} + params = payload.get("params") if isinstance(payload.get("params"), dict) else {} + + prompt = _first_nonempty_text( + payload.get("opener"), + params.get("opener"), + payload.get("prompt"), + params.get("prompt"), + payload.get("text"), + params.get("text"), + payload.get("title"), + params.get("title"), + payload.get("description"), + params.get("description"), + ) + if prompt: + return prompt + + if event.interaction_type == "userSelect": + raw_options = ( + params.get("userSelectOptions") + if isinstance(params.get("userSelectOptions"), list) + else [] + ) + labels: list[str] = [] + for index, raw in enumerate(raw_options, start=1): + if isinstance(raw, str) and raw.strip(): + labels.append(f"{index}. {raw.strip()}") + elif isinstance(raw, dict): + label = _first_nonempty_text(raw.get("label"), raw.get("value")) + if label: + labels.append(f"{index}. {label}") + if labels: + return "请选择:" + ",".join(labels) + return "请选择一个选项。" + + if event.interaction_type == "userInput": + input_form = params.get("inputForm") if isinstance(params.get("inputForm"), list) else [] + labels = [ + _first_nonempty_text(field.get("label"), field.get("name")) + for field in input_form + if isinstance(field, dict) + ] + labels = [label for label in labels if label] + if labels: + return "请提供以下信息:" + ",".join(labels) + return "请补充所需信息。" + + return "请继续。" + + +@dataclass +class FastGPTLLMSettings(LLMSettings): + variables: dict[str, Any] = field(default_factory=dict) + detail: bool = False + + +class FastGPTLLMService(LLMService): + """FastGPT LLM service using chatId server-side memory and workflow variables.""" + + Settings = FastGPTLLMSettings + + def __init__( + self, + *, + api_key: str, + base_url: str, + chat_id: str | None = None, + send_system_prompt: bool = False, + greeting_prompt: str | None = None, + timeout: float = 60.0, + settings: FastGPTLLMSettings | None = None, + **kwargs, + ) -> None: + default_settings = self.Settings(model="fastgpt") + if settings is not None: + default_settings.apply_update(settings) + super().__init__(settings=default_settings, **kwargs) + + self._chat_id = chat_id or f"voice_{uuid.uuid4().hex[:16]}" + self._send_system_prompt = send_system_prompt + self._greeting_prompt = (greeting_prompt or "你好").strip() or "你好" + self._client = AsyncChatClient( + api_key=api_key, + base_url=base_url, + timeout=timeout, + ) + self._active_response = None + + @property + def chat_id(self) -> str: + return self._chat_id + + def set_variables(self, variables: dict[str, Any]) -> None: + merged = dict(self._settings.variables) + merged.update(variables) + self._settings.variables = merged + + async def stop(self, frame: EndFrame) -> None: + await self._close_active_response() + await self._client.close() + await super().stop(frame) + + async def cancel(self, frame: CancelFrame) -> None: + await self._close_active_response() + await super().cancel(frame) + + async def _close_active_response(self) -> None: + response = self._active_response + self._active_response = None + if response is not None: + await response.aclose() + + def _build_fastgpt_messages(self, context: LLMContext) -> list[dict[str, str]]: + raw_messages = context.get_messages() + messages: list[dict[str, str]] = [] + + if self._send_system_prompt: + for message in raw_messages: + if not isinstance(message, dict) or message.get("role") != "system": + continue + text = _message_text(message) + if text: + messages.append({"role": "system", "content": text}) + + for message in reversed(raw_messages): + if not isinstance(message, dict) or message.get("role") != "user": + continue + text = _message_text(message) + if text: + messages.append({"role": "user", "content": text}) + return messages + + messages.append({"role": "user", "content": self._greeting_prompt}) + return messages + + async def _process_context(self, context: LLMContext) -> None: + messages = self._build_fastgpt_messages(context) + variables = self._settings.variables or None + + await self.start_ttfb_metrics() + + try: + response = await self._client.create_chat_completion( + messages=messages, + stream=True, + chatId=self._chat_id, + variables=variables, + detail=self._settings.detail, + ) + except FastGPTError as exc: + await self.push_error(error_msg=f"FastGPT request failed: {exc}", exception=exc) + return + except httpx.HTTPError as exc: + await self.push_error(error_msg=f"FastGPT HTTP error: {exc}", exception=exc) + return + + self._active_response = response + + try: + async for event in aiter_stream_events(response): + if event.kind in {"data", "answer", "fastAnswer"}: + text = _extract_text_from_event(event.kind, event.data) + if text: + await self.stop_ttfb_metrics() + await self.push_frame(LLMTextFrame(text)) + continue + + if event.kind == "interactive" and isinstance(event, FastGPTInteractiveEvent): + await self._handle_interactive(event) + break + + if event.kind == "error": + payload = event.data if isinstance(event.data, dict) else {} + message = _first_nonempty_text( + payload.get("message"), + payload.get("error"), + ) or "FastGPT stream error" + await self.push_error(error_msg=message) + break + + if event.kind == "done": + break + finally: + self._active_response = None + await response.aclose() + + async def _handle_interactive(self, event: FastGPTInteractiveEvent) -> None: + prompt = _interactive_spoken_prompt(event) + if prompt: + await self.stop_ttfb_metrics() + await self.push_frame(LLMTextFrame(prompt)) + + await self.push_frame( + OutputTransportMessageFrame( + message={ + "type": "response.interactive", + "interaction_type": event.interaction_type, + "data": event.data, + } + ), + FrameDirection.DOWNSTREAM, + ) + + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: + await super().process_frame(frame, direction) + + if isinstance(frame, LLMContextFrame): + try: + await self.push_frame(LLMFullResponseStartFrame()) + await self.start_processing_metrics() + await self._process_context(frame.context) + except httpx.TimeoutException as exc: + await self._call_event_handler("on_completion_timeout") + await self.push_error(error_msg="FastGPT completion timeout", exception=exc) + except Exception as exc: + await self.push_error(error_msg=f"FastGPT completion error: {exc}", exception=exc) + finally: + await self.stop_processing_metrics() + await self.push_frame(LLMFullResponseEndFrame()) + else: + await self.push_frame(frame, direction) diff --git a/src/voice/pipeline.py b/src/voice/pipeline.py index 4546713..0714016 100644 --- a/src/voice/pipeline.py +++ b/src/voice/pipeline.py @@ -1,5 +1,7 @@ from __future__ import annotations +import uuid + from loguru import logger from pipecat.audio.vad.silero import SileroVADAnalyzer @@ -33,7 +35,7 @@ from .config import EngineConfig from .protocol import ProductWebsocketSerializer from .services import create_llm_service, create_stt_service, create_tts_service from .text_input import ProductTextInputProcessor -from .text_stream import ProductTextStreamProcessor +from .text_stream import ProductTextStreamProcessor, sync_streamed_assistant_context from .transcript_stream import ProductTranscriptStreamProcessor from .turn_start import InterruptionGateUserTurnStartStrategy @@ -72,12 +74,26 @@ async def run_pipeline_with_serializer( ) stt = create_stt_service(config.services.stt, config.audio) - llm = create_llm_service(config.services.llm) + + llm_config = config.services.llm + chat_id = llm_config.chat_id or f"voice_{uuid.uuid4().hex[:16]}" + llm = create_llm_service( + llm_config, + chat_id=chat_id, + session_variables={"session_id": chat_id, "channel": "voice"}, + greeting_prompt=config.agent.greeting, + ) + if llm_config.provider == "fastgpt": + logger.info(f"FastGPT chatId={chat_id}") + tts = create_tts_service(config.services.tts, config.audio) - messages = [{"role": "system", "content": config.agent.system_prompt}] - if config.agent.greeting and config.agent.greeting_mode == "generated": - messages.append({"role": "system", "content": config.agent.greeting}) + use_fastgpt = llm_config.provider == "fastgpt" and not llm_config.send_system_prompt + messages: list[dict[str, str]] = [] + if not use_fastgpt: + messages = [{"role": "system", "content": config.agent.system_prompt}] + if config.agent.greeting and config.agent.greeting_mode == "generated": + messages.append({"role": "system", "content": config.agent.greeting}) context = LLMContext(messages) @@ -109,6 +125,8 @@ async def run_pipeline_with_serializer( ), ) + text_stream = ProductTextStreamProcessor() + pipeline = Pipeline( [ transport.input(), @@ -117,10 +135,10 @@ async def run_pipeline_with_serializer( ProductTranscriptStreamProcessor(), user_aggregator, llm, - ProductTextStreamProcessor(), - assistant_aggregator, + text_stream, tts, transport.output(), + assistant_aggregator, ] ) @@ -174,6 +192,14 @@ async def run_pipeline_with_serializer( @assistant_aggregator.event_handler("on_assistant_turn_stopped") async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage): logger.info(f"Assistant: {message.content}") + if message.interrupted: + streamed = text_stream.take_interrupted_stream_text() + if streamed: + sync_streamed_assistant_context( + _aggregator, + streamed_text=streamed, + committed_text=message.content or "", + ) runner = PipelineRunner(handle_sigint=False) await runner.run(task) diff --git a/src/voice/services.py b/src/voice/services.py index b322be6..4272003 100644 --- a/src/voice/services.py +++ b/src/voice/services.py @@ -13,6 +13,7 @@ from pipecat.services.openai.tts import VALID_VOICES, OpenAITTSService from pipecat.transcriptions.language import Language from .config import AudioConfig, LLMConfig, STTConfig, TTSConfig +from .fastgpt_llm import FastGPTLLMService, FastGPTLLMSettings from .xfyun_asr import DEFAULT_XFYUN_ASR_URL, XfyunASRService from .xfyun_tts import DEFAULT_XFYUN_TTS_URL, XfyunTTSService @@ -46,7 +47,29 @@ def create_stt_service(config: STTConfig, audio: AudioConfig | None = None): ) -def create_llm_service(config: LLMConfig): +def create_llm_service( + config: LLMConfig, + *, + chat_id: str | None = None, + session_variables: dict | None = None, + greeting_prompt: str | None = None, +): + if config.provider == "fastgpt": + variables = {**config.variables, **(session_variables or {})} + return FastGPTLLMService( + api_key=config.api_key, + base_url=config.base_url or "http://localhost:3000", + chat_id=chat_id or config.chat_id, + send_system_prompt=config.send_system_prompt, + greeting_prompt=greeting_prompt, + timeout=config.timeout_sec, + settings=FastGPTLLMSettings( + model=config.model or "fastgpt", + variables=variables, + detail=config.detail, + ), + ) + _require_provider(config.provider, "openai", "llm") return OpenAILLMService( api_key=config.api_key or None, diff --git a/src/voice/text_stream.py b/src/voice/text_stream.py index 253a9fd..2490959 100644 --- a/src/voice/text_stream.py +++ b/src/voice/text_stream.py @@ -1,6 +1,9 @@ from __future__ import annotations +from typing import Any, Protocol + from pipecat.frames.frames import ( + CancelFrame, Frame, InterruptionFrame, LLMFullResponseEndFrame, @@ -12,34 +15,95 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +class _AssistantContextSync(Protocol): + @property + def context(self) -> Any: ... + + +def sync_streamed_assistant_context( + aggregator: _AssistantContextSync, + *, + streamed_text: str, + committed_text: str, +) -> None: + """Align LLM context with UI text after an interrupted assistant turn. + + The assistant aggregator only commits TTS-spoken text on interrupt. Replace + or append the streamed LLM text so the next turn sees what the user saw. + """ + streamed = streamed_text.strip() + if not streamed or streamed == committed_text.strip(): + return + + committed = committed_text.strip() + + def _apply(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: + updated = list(messages) + if committed and updated: + last = updated[-1] + if isinstance(last, dict) and last.get("role") == "assistant": + content = last.get("content") + if isinstance(content, str) and content.strip() == committed: + updated[-1] = {"role": "assistant", "content": streamed} + return updated + updated.append({"role": "assistant", "content": streamed}) + return updated + + aggregator.context.transform_messages(_apply) + + class ProductTextStreamProcessor(FrameProcessor): - """Mirrors LLM text frames as streaming protocol events.""" + """Mirrors LLM text frames as streaming protocol events. + + Placed between the LLM service and the TTS service, this processor + observes the LLM's text frames as they're emitted and forwards them + downstream as ``OutputTransportMessageUrgentFrame``s that the product + serializer turns into ``response.text.{started,delta,final}`` events. + + Urgent frames bypass TTS serialization and transport audio queues so text + reaches the client at least as quickly as synthesized audio. + + ``TTSSpeakFrame`` (used by the fixed-greeting code path, which bypasses + the LLM entirely) is also handled: the processor synthesizes a single + started/delta/final sequence for its fixed text. + """ def __init__(self) -> None: super().__init__() self._aggregation: list[str] = [] self._turn_active = False + self._interrupted_stream_text: str | None = None + + def take_interrupted_stream_text(self) -> str | None: + text = self._interrupted_stream_text + self._interrupted_stream_text = None + return text async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: await super().process_frame(frame, direction) if isinstance(frame, LLMFullResponseStartFrame): + await self.push_frame(frame, direction) await self._start_turn() elif isinstance(frame, LLMTextFrame): + await self.push_frame(frame, direction) if frame.text: await self._delta(frame.text) elif isinstance(frame, LLMFullResponseEndFrame): + await self.push_frame(frame, direction) await self._end_turn(interrupted=False) - elif isinstance(frame, InterruptionFrame): + elif isinstance(frame, (InterruptionFrame, CancelFrame)): + await self.push_frame(frame, direction) await self._end_turn(interrupted=True) elif isinstance(frame, TTSSpeakFrame): text = frame.text or "" + await self.push_frame(frame, direction) await self._start_turn() if text: await self._delta(text) await self._end_turn(interrupted=False) - - await self.push_frame(frame, direction) + else: + await self.push_frame(frame, direction) async def _start_turn(self) -> None: if self._turn_active: @@ -58,6 +122,8 @@ class ProductTextStreamProcessor(FrameProcessor): if not self._turn_active: return full_text = "".join(self._aggregation) + if interrupted and full_text: + self._interrupted_stream_text = full_text self._turn_active = False self._aggregation = [] await self._emit(