From 404381c818784e1966dabdcb1e2ca61995eb8005 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Thu, 21 May 2026 15:42:49 +0800 Subject: [PATCH] Add xfyun asr service --- README.md | 27 +++ config/xfyun.json | 16 +- engine/config.py | 7 + engine/pipeline.py | 4 +- engine/services.py | 19 +- engine/transcript_stream.py | 30 ++++ engine/xfyun_asr.py | 335 ++++++++++++++++++++++++++++++++++++ examples/webpage/app.js | 28 ++- examples/webpage/styles.css | 4 + 9 files changed, 462 insertions(+), 8 deletions(-) create mode 100644 engine/transcript_stream.py create mode 100644 engine/xfyun_asr.py diff --git a/README.md b/README.md index fe45f64..9d88fae 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ Returned bot audio: Returned transcripts and assistant text: ```json +{"type": "input.transcript.interim", "text": "What's the"} {"type": "input.transcript.final", "text": "What's the weather?", "user_id": "...", "timestamp": "..."} {"type": "response.text.started"} {"type": "response.text.delta", "text": "It's "} @@ -131,6 +132,32 @@ the TTS in the pipeline. `response.text.final` fires when the turn ends, carrying the full concatenated assistant text and an `interrupted` flag (true when an `input.text` or barge-in cut the turn short). +### Xfyun ASR + +The STT provider can be switched to iFlytek/Xfyun's streaming voice dictation +WebSocket API. The engine sends PCM chunks as `encoding: "raw"` and emits +`input.transcript.interim` events with the current full interim transcript as +Xfyun results arrive, followed by the existing `input.transcript.final` event. 
+ +```json +"stt": { + "provider": "xfyun", + "app_id": "your_xfyun_app_id", + "api_key": "your_xfyun_api_key", + "api_secret": "your_xfyun_api_secret", + "base_url": "wss://iat-api.xfyun.cn/v2/iat", + "language": "zh_cn", + "domain": "iat", + "accent": "mandarin", + "encoding": "raw", + "frame_size": 1280, + "timeout_sec": 10.0 +} +``` + +Credentials may also be provided through `XFYUN_APP_ID`, `XFYUN_API_KEY`, and +`XFYUN_API_SECRET`. + ### Xfyun TTS The TTS provider can be switched to iFlytek/Xfyun's online TTS WebSocket API. diff --git a/config/xfyun.json b/config/xfyun.json index d5c8417..2a6694c 100644 --- a/config/xfyun.json +++ b/config/xfyun.json @@ -19,11 +19,17 @@ }, "services": { "stt": { - "provider": "openai", - "api_key": "sk-uudpgflahqqjbofhgcbwjjefgwhvwwmxgeyehcueqlemwavq", - "base_url": "https://api.siliconflow.cn/v1", - "model": "TeleAI/TeleSpeechASR", - "language": "zh" + "provider": "xfyun", + "app_id": "", + "api_key": "", + "api_secret": "", + "base_url": "wss://iat-api.xfyun.cn/v2/iat", + "language": "zh_cn", + "domain": "iat", + "accent": "mandarin", + "encoding": "raw", + "frame_size": 1280, + "timeout_sec": 10.0 }, "llm": { "provider": "openai", diff --git a/engine/config.py b/engine/config.py index b00a6ec..71d2160 100644 --- a/engine/config.py +++ b/engine/config.py @@ -47,10 +47,17 @@ class LLMConfig: @dataclass(frozen=True) class STTConfig: provider: str = "openai" + app_id: str = "" api_key: str = "" + api_secret: str = "" base_url: str | None = None model: str = "gpt-4o-mini-transcribe" language: str | None = "en" + domain: str = "iat" + accent: str = "mandarin" + encoding: str = "raw" + frame_size: int = 1280 + timeout_sec: float = 10.0 @dataclass(frozen=True) diff --git a/engine/pipeline.py b/engine/pipeline.py index b7eb525..525ac9e 100644 --- a/engine/pipeline.py +++ b/engine/pipeline.py @@ -30,6 +30,7 @@ from .product_protocol import 
ProductWebsocketSerializer from .services import create_llm_service, create_stt_service, create_tts_service from .text_input import ProductTextInputProcessor from .text_stream import ProductTextStreamProcessor +from .transcript_stream import ProductTranscriptStreamProcessor async def run_voice_pipeline(websocket, config: EngineConfig) -> None: @@ -74,7 +75,7 @@ async def run_pipeline_with_serializer( ), ) - stt = create_stt_service(config.services.stt) + stt = create_stt_service(config.services.stt, config.audio) llm = create_llm_service(config.services.llm) tts = create_tts_service(config.services.tts, config.audio) @@ -93,6 +94,7 @@ async def run_pipeline_with_serializer( transport.input(), ProductTextInputProcessor(), stt, + ProductTranscriptStreamProcessor(), user_aggregator, llm, ProductTextStreamProcessor(), diff --git a/engine/services.py b/engine/services.py index 8509c2a..046de91 100644 --- a/engine/services.py +++ b/engine/services.py @@ -13,10 +13,27 @@ from pipecat.services.openai.tts import VALID_VOICES, OpenAITTSService from pipecat.transcriptions.language import Language from .config import AudioConfig, LLMConfig, STTConfig, TTSConfig +from .xfyun_asr import DEFAULT_XFYUN_ASR_URL, XfyunASRService from .xfyun_tts import DEFAULT_XFYUN_TTS_URL, XfyunTTSService -def create_stt_service(config: STTConfig): +def create_stt_service(config: STTConfig, audio: AudioConfig | None = None): + if config.provider == "xfyun": + sample_rate = audio.sample_rate_hz if audio else 16000 + return XfyunASRService( + app_id=config.app_id, + api_key=config.api_key or "", + api_secret=config.api_secret, + url=config.base_url or DEFAULT_XFYUN_ASR_URL, + language=config.language or "zh_cn", + domain=config.domain, + accent=config.accent, + sample_rate=sample_rate, + encoding=config.encoding, + frame_size=config.frame_size, + open_timeout=config.timeout_sec, + ) + _require_provider(config.provider, "openai", "stt") return OpenAISTTService( api_key=config.api_key or None, diff 
class ProductTranscriptStreamProcessor(FrameProcessor):
    """Relay interim speech-to-text results to the product websocket client.

    Every frame is forwarded unchanged in its original direction. When the
    frame is an ``InterimTranscriptionFrame``, an urgent transport message of
    type ``input.transcript.interim`` is pushed downstream just before it.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Emit the interim-transcript event (if applicable), then pass *frame* on."""
        await super().process_frame(frame, direction)

        is_interim = isinstance(frame, InterimTranscriptionFrame)
        if is_interim:
            interim_event = {
                "type": "input.transcript.interim",
                "text": frame.text,
                "user_id": frame.user_id,
                "timestamp": frame.timestamp,
            }
            notice = OutputTransportMessageUrgentFrame(message=interim_event)
            # The event always travels downstream, whatever direction the
            # interim frame itself was moving in.
            await self.push_frame(notice, FrameDirection.DOWNSTREAM)

        await self.push_frame(frame, direction)
from websockets.protocol import State


DEFAULT_XFYUN_ASR_URL = "wss://iat-api.xfyun.cn/v2/iat"


class XfyunASRService(STTService):
    """iFlytek/Xfyun streaming voice dictation service for Pipecat.

    One WebSocket session is opened per utterance. Audio handed to
    ``run_stt`` is buffered and sent in ``frame_size``-byte chunks; a
    background task reads the server's results and pushes
    ``InterimTranscriptionFrame`` / ``TranscriptionFrame`` frames. The session
    is closed once the server reports ``status == 2`` or the service is
    stopped, cancelled, or cleaned up.
    """

    def __init__(
        self,
        *,
        app_id: str,
        api_key: str,
        api_secret: str,
        url: str | None = None,
        language: str = "zh_cn",
        domain: str = "iat",
        accent: str = "mandarin",
        sample_rate: int = 16000,
        encoding: str = "raw",
        frame_size: int = 1280,
        open_timeout: float = 10.0,
        **kwargs,
    ) -> None:
        """Store the connection settings; no network I/O happens here.

        Args:
            app_id: Xfyun application id; falls back to ``XFYUN_APP_ID``.
            api_key: Xfyun API key; falls back to ``XFYUN_API_KEY``.
            api_secret: Xfyun API secret; falls back to ``XFYUN_API_SECRET``.
            url: WebSocket endpoint; defaults to ``DEFAULT_XFYUN_ASR_URL``.
            language: Value sent as ``business.language``.
            domain: Value sent as ``business.domain``.
            accent: Value sent as ``business.accent``.
            sample_rate: Audio sample rate; validated (8000 or 16000) when an
                utterance starts.
            encoding: Value sent as ``data.encoding`` with every audio chunk.
            frame_size: Number of audio bytes sent per WebSocket message.
            open_timeout: Seconds allowed for the WebSocket handshake.
            **kwargs: Forwarded to ``STTService``.

        Raises:
            ValueError: If ``frame_size`` is not positive (a non-positive size
                would make the chunking loop spin forever).
        """
        if frame_size <= 0:
            raise ValueError("frame_size must be a positive number of bytes")

        super().__init__(sample_rate=sample_rate, **kwargs)
        self._app_id = app_id or os.environ.get("XFYUN_APP_ID", "")
        self._api_key = api_key or os.environ.get("XFYUN_API_KEY", "")
        self._api_secret = api_secret or os.environ.get("XFYUN_API_SECRET", "")
        self._url = url or DEFAULT_XFYUN_ASR_URL
        self._language = language
        self._domain = domain
        self._accent = accent
        self._encoding = encoding
        self._frame_size = frame_size
        self._open_timeout = open_timeout

        self._websocket = None
        self._receive_task = None
        self._audio_buffer = bytearray()
        self._sent_first_frame = False
        self._sent_final_frame = False
        # Recognised text per result, keyed by the server's sequence number
        # ("sn"), so replacement ranges ("rg") keep addressing the right
        # entries after earlier results were replaced.
        self._partials: dict[int, str] = {}
        self._last_text = ""

    async def cleanup(self) -> None:
        """Close any open utterance session, then run the base cleanup."""
        await self._close_utterance()
        await super().cleanup()

    async def stop(self, frame: EndFrame) -> None:
        """Close any open utterance session, then run the base stop."""
        await self._close_utterance()
        await super().stop(frame)

    async def cancel(self, frame: CancelFrame) -> None:
        """Close any open utterance session, then run the base cancel."""
        await self._close_utterance()
        await super().cancel(frame)

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Open a session on VAD start and finish it on VAD stop."""
        await super().process_frame(frame, direction)

        if isinstance(frame, VADUserStartedSpeakingFrame):
            await self._start_utterance()
        elif isinstance(frame, VADUserStoppedSpeakingFrame):
            await self._finish_utterance()

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]:
        """Buffer *audio* and stream every complete chunk to Xfyun.

        Results are delivered asynchronously by the receive task, so this
        generator only ever yields ``None``.
        """
        if not audio:
            yield None
            return

        # Audio may arrive before (or without) a VAD start frame; make sure a
        # session exists so it is not dropped.
        if not self._websocket or self._websocket.state is not State.OPEN:
            await self._start_utterance()

        self._audio_buffer.extend(audio)
        await self._flush_audio_buffer(final=False)
        yield None

    async def _start_utterance(self) -> None:
        """Open a fresh WebSocket session unless one is already open."""
        if self._websocket and self._websocket.state is State.OPEN:
            return

        if not self._app_id or not self._api_key or not self._api_secret:
            await self.push_error("Xfyun ASR requires app_id, api_key, and api_secret")
            return

        if self.sample_rate not in (8000, 16000):
            await self.push_error("Xfyun ASR sample rate must be 8000 or 16000")
            return

        self._audio_buffer.clear()
        self._partials = {}
        self._last_text = ""
        self._sent_first_frame = False
        self._sent_final_frame = False

        auth_url = _build_auth_url(self._url, self._api_key, self._api_secret)
        try:
            self._websocket = await websocket_connect(
                auth_url,
                max_size=None,
                open_timeout=self._open_timeout,
            )
        except Exception as exc:
            await self.push_error(f"Xfyun ASR connection failed: {exc}", exception=exc)
            self._websocket = None
            return

        self._receive_task = self.create_task(
            self._receive_messages(),
            name="xfyun_asr_receive",
        )

    async def _finish_utterance(self) -> None:
        """Send the remaining audio and the end-of-stream marker."""
        if not self._websocket or self._websocket.state is not State.OPEN:
            return

        await self._flush_audio_buffer(final=True)
        if not self._sent_first_frame:
            # Nothing was ever sent, so there is no result to wait for.
            await self._close_utterance()
            return

        if not self._sent_final_frame:
            await self._send_payload({"data": {"status": 2}})
            self.request_finalize()
            self._sent_final_frame = True

    async def _close_utterance(self) -> None:
        """Tear down the current session: receive task, socket, and buffers."""
        # Detach the socket and reset per-utterance state *before* awaiting
        # anything. Cancelling the receive task first would let its ``finally``
        # clear ``self._websocket`` and the socket would never be closed; and
        # resetting the flags after an await could clobber an utterance that
        # was started in the meantime.
        websocket = self._websocket
        receive_task = self._receive_task
        self._websocket = None
        self._audio_buffer.clear()
        self._sent_first_frame = False
        self._sent_final_frame = False

        # When called from inside the receive task (final result handling) the
        # task must not cancel itself; it clears its own reference on exit.
        if receive_task is not None and receive_task is not asyncio.current_task():
            await self.cancel_task(receive_task)
            if self._receive_task is receive_task:
                self._receive_task = None

        if websocket and websocket.state is State.OPEN:
            try:
                await websocket.close()
            except Exception as exc:
                # Best effort: the session is being discarded either way.
                logger.debug(f"Ignoring error while closing Xfyun ASR websocket: {exc}")

    async def _flush_audio_buffer(self, *, final: bool) -> None:
        """Send every full chunk in the buffer; with ``final`` also the remainder."""
        while len(self._audio_buffer) >= self._frame_size:
            chunk = bytes(self._audio_buffer[: self._frame_size])
            del self._audio_buffer[: self._frame_size]
            await self._send_audio_chunk(chunk, status=1)

        if final and self._audio_buffer:
            chunk = bytes(self._audio_buffer)
            self._audio_buffer.clear()
            await self._send_audio_chunk(chunk, status=1)

    async def _send_audio_chunk(self, audio: bytes, *, status: int) -> None:
        """Send one base64-encoded audio chunk.

        The first chunk of a session is always sent with ``status`` 0 together
        with the ``common`` and ``business`` sections; later chunks carry the
        given ``status``.
        """
        if not audio:
            return

        is_first = not self._sent_first_frame
        data = {
            "status": 0 if is_first else status,
            "format": f"audio/L16;rate={self.sample_rate}",
            "encoding": self._encoding,
            "audio": base64.b64encode(audio).decode("utf-8"),
        }
        if is_first:
            payload = {
                "common": {"app_id": self._app_id},
                "business": {
                    "language": self._language,
                    "domain": self._domain,
                    "accent": self._accent,
                },
                "data": data,
            }
            self._sent_first_frame = True
        else:
            payload = {"data": data}

        await self._send_payload(payload)

    async def _send_payload(self, payload: dict[str, Any]) -> None:
        """Serialize *payload* as JSON and send it if the socket is open."""
        if not self._websocket or self._websocket.state is not State.OPEN:
            return
        await self._websocket.send(json.dumps(payload, ensure_ascii=False))

    async def _receive_messages(self) -> None:
        """Read server messages for the current session until it ends."""
        websocket = self._websocket
        if not websocket:
            return

        try:
            async for message in websocket:
                await self._process_response(json.loads(message))
        except Exception as exc:
            # Only report failures of the session that is still current.
            if self._websocket is websocket:
                await self.push_error(f"Xfyun ASR receive failed: {exc}", exception=exc)
        finally:
            if self._websocket is websocket:
                self._websocket = None
            # Clear the task reference by identity so a finished task is never
            # left behind, and a newer session's task is never dropped.
            if self._receive_task is asyncio.current_task():
                self._receive_task = None

    async def _process_response(self, payload: dict[str, Any]) -> None:
        """Turn one decoded server message into transcription frames."""
        code = payload.get("code", -1)
        if code != 0:
            message = payload.get("message", "unknown error")
            sid = payload.get("sid")
            await self.push_error(f"Xfyun ASR error code={code}, sid={sid}, message={message}")
            return

        data = payload.get("data")
        if not isinstance(data, dict):
            return

        recognition = data.get("result")
        if isinstance(recognition, dict):
            text = self._apply_recognition_result(recognition)
            # Skip duplicates so clients only see real changes.
            if text and text != self._last_text:
                self._last_text = text
                await self.push_frame(
                    InterimTranscriptionFrame(
                        text,
                        self._user_id,
                        time_now_iso8601(),
                        _language_or_none(self._language),
                        result=payload,
                    )
                )

        if data.get("status") == 2:
            # status 2 marks the last message of the session.
            final_text = self._last_text
            if final_text:
                self.confirm_finalize()
                await self.push_frame(
                    TranscriptionFrame(
                        final_text,
                        self._user_id,
                        time_now_iso8601(),
                        _language_or_none(self._language),
                        result=payload,
                    )
                )
            await self._close_utterance()

    def _apply_recognition_result(self, recognition: dict[str, Any]) -> str:
        """Merge one result into the utterance and return the full text so far.

        Results are stored under their sequence number ``sn``. A result marked
        ``pgs == "rpl"`` first removes the results whose sequence numbers fall
        in its inclusive ``rg`` range. Results without a usable ``sn`` are
        appended after the highest known one.
        """
        partial = _extract_text_from_result(recognition)
        if not partial:
            return self._last_text

        sn = recognition.get("sn")
        if isinstance(sn, bool) or not isinstance(sn, int):
            sn = max(self._partials, default=0) + 1

        replace_range = recognition.get("rg")
        if recognition.get("pgs") == "rpl" and replace_range:
            try:
                start, end = replace_range
                for stale_sn in range(int(start), int(end) + 1):
                    self._partials.pop(stale_sn, None)
            except (TypeError, ValueError):
                logger.debug(f"Ignoring malformed Xfyun replacement rg={replace_range}")

        self._partials[sn] = partial
        return "".join(self._partials[key] for key in sorted(self._partials))


def _extract_text_from_result(result: dict[str, Any]) -> str:
    """Concatenate every non-empty ``w`` found under ``ws[*].cw[*]``."""
    return "".join(
        candidate["w"]
        for item in (result.get("ws") or [])
        for candidate in (item.get("cw") or [])
        if candidate.get("w")
    )


def _build_auth_url(url: str, api_key: str, api_secret: str) -> str:
    """Return *url* with Xfyun's signed ``authorization``/``date``/``host`` query.

    The signature is an HMAC-SHA256 over the host, the current GMT date and
    the request line. The returned URL uses exactly the path that was signed
    (``/v2/iat`` when *url* has none) and keeps any query string *url*
    already had.
    """
    parsed = urlparse(url)
    host = parsed.netloc
    path = parsed.path or "/v2/iat"
    date = format_datetime(datetime.now(timezone.utc), usegmt=True)
    request_line = f"GET {path} HTTP/1.1"
    signature_origin = f"host: {host}\ndate: {date}\n{request_line}"
    signature_sha = hmac.new(
        api_secret.encode("utf-8"),
        signature_origin.encode("utf-8"),
        digestmod=hashlib.sha256,
    ).digest()
    signature = base64.b64encode(signature_sha).decode("utf-8")
    authorization_origin = (
        f'api_key="{api_key}", algorithm="hmac-sha256", '
        f'headers="host date request-line", signature="{signature}"'
    )
    authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode("utf-8")
    auth_query = urlencode({"authorization": authorization, "date": date, "host": host})
    query = f"{parsed.query}&{auth_query}" if parsed.query else auth_query
    # Rebuild from the parsed parts so the requested path always matches the
    # signed one and an existing query is extended instead of getting a
    # second "?".
    return parsed._replace(path=path, query=query).geturl()


def _language_or_none(value: str) -> Language | None:
    """Map an Xfyun language code to a pipecat ``Language``, or ``None``.

    The value is tried as given and then in ``xx-YY`` form, so ``"zh_cn"`` is
    also looked up as ``"zh-CN"``.
    NOTE(review): assumes ``Language`` values are BCP-47 style tags - confirm.
    """
    candidates = [value]
    if value:
        head, sep, region = value.replace("_", "-").partition("-")
        if sep:
            candidates.append(f"{head.lower()}-{region.upper()}")

    for candidate in candidates:
        try:
            return Language(candidate)
        except ValueError:
            continue
    return None
+ } + scrollChatToBottom(); } function sendText(text) { @@ -553,6 +575,7 @@ function sendText(text) { // reply will be created when `response.text.started` arrives. wsSend(JSON.stringify(message)); stopPlaybackQueue(); + state.currentUserBubble = null; addBubble("user", value); return true; } @@ -627,6 +650,9 @@ function handleEvent(event) { case "input.transcript.final": handleUserTranscript(event.text); break; + case "input.transcript.interim": + handleUserTranscriptInterim(event.text); + break; case "transport.message": // Reserved for future structured messages; ignore silently. break; diff --git a/examples/webpage/styles.css b/examples/webpage/styles.css index 28ede32..6204693 100644 --- a/examples/webpage/styles.css +++ b/examples/webpage/styles.css @@ -239,6 +239,10 @@ body { border-bottom-right-radius: 4px; } +.bubble--user.bubble--interim { + opacity: 0.75; +} + .bubble--assistant { background: var(--assistant); color: var(--text);