diff --git a/engine/xfyun_asr.py b/engine/xfyun_asr.py index 3a21ca0..6b5b2d8 100644 --- a/engine/xfyun_asr.py +++ b/engine/xfyun_asr.py @@ -25,6 +25,7 @@ from pipecat.frames.frames import ( VADUserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.settings import STTSettings from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 @@ -54,7 +55,11 @@ class XfyunASRService(STTService): open_timeout: float = 10.0, **kwargs, ) -> None: - super().__init__(sample_rate=sample_rate, **kwargs) + super().__init__( + sample_rate=sample_rate, + settings=STTSettings(model=None, language=language), + **kwargs, + ) self._app_id = app_id or os.environ.get("XFYUN_APP_ID", "") self._api_key = api_key or os.environ.get("XFYUN_API_KEY", "") self._api_secret = api_secret or os.environ.get("XFYUN_API_SECRET", "") diff --git a/engine/xfyun_tts.py b/engine/xfyun_tts.py index 1cb513e..bbaca36 100644 --- a/engine/xfyun_tts.py +++ b/engine/xfyun_tts.py @@ -5,19 +5,38 @@ import hashlib import hmac import json import os +import re +import unicodedata from collections.abc import AsyncGenerator, AsyncIterator from datetime import datetime, timezone from email.utils import format_datetime from typing import Any from urllib.parse import urlencode, urlparse +from loguru import logger + from pipecat.frames.frames import ErrorFrame, Frame +from pipecat.services.settings import TTSSettings from pipecat.services.tts_service import TTSService from websockets.asyncio.client import connect DEFAULT_XFYUN_TTS_URL = "wss://tts-api.xfyun.cn/v2/tts" +# Strip characters Xfyun's online TTS cannot synthesize. The engine silently +# rejects (or returns empty audio for) text containing emoji and other +# non-BMP symbols, which surfaces as "request finished without audio data". +_EMOJI_AND_SYMBOL_RE = re.compile( + "[" + "\U0001F300-\U0001FAFF" # misc pictographs, emoji, symbols, transport, etc. + "\U00002600-\U000027BF" # misc symbols and dingbats + "\U0001F1E6-\U0001F1FF" # regional indicators (flags) + "\uFE00-\uFE0F" # variation selectors + "\u200D" # zero-width joiner + "]", + flags=re.UNICODE, +) + class XfyunTTSService(TTSService): """iFlytek/Xfyun online TTS service for Pipecat. @@ -46,7 +65,11 @@ class XfyunTTSService(TTSService): timeout: float = 30.0, **kwargs, ) -> None: - super().__init__(sample_rate=sample_rate, **kwargs) + super().__init__( + sample_rate=sample_rate, + settings=TTSSettings(model=None, voice=voice, language=None), + **kwargs, + ) self._app_id = app_id or os.environ.get("XFYUN_APP_ID", "") self._api_key = api_key or os.environ.get("XFYUN_API_KEY", "") self._api_secret = api_secret or os.environ.get("XFYUN_API_SECRET", "") @@ -59,6 +82,7 @@ class XfyunTTSService(TTSService): self._volume = volume self._pitch = pitch self._timeout = timeout + self._last_failure_detail: str | None = None async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]: if not text: @@ -68,7 +92,21 @@ class XfyunTTSService(TTSService): yield ErrorFrame(error="Xfyun TTS requires app_id, api_key, and api_secret") return - if len(text.encode("utf-8")) >= 8000: + sanitized = _sanitize_text_for_tts(text) + if not sanitized: + logger.debug( + f"{self}: skipping Xfyun TTS, text became empty after sanitization " + f"(original={text!r})" + ) + return + + if sanitized != text: + logger.debug( + f"{self}: sanitized Xfyun TTS text " + f"(original={text!r}, sanitized={sanitized!r})" + ) + + if len(sanitized.encode("utf-8")) >= 8000: yield ErrorFrame(error="Xfyun TTS text must be less than 8000 UTF-8 bytes") return @@ -77,11 +115,11 @@ class XfyunTTSService(TTSService): return try: - await self.start_tts_usage_metrics(text) + await self.start_tts_usage_metrics(sanitized) first_frame = True async for frame in self._stream_audio_frames_from_iterator( - self._iter_audio_chunks(text), + self._iter_audio_chunks(sanitized), in_sample_rate=self._source_sample_rate, context_id=context_id, ): @@ -91,7 +129,13 @@ class XfyunTTSService(TTSService): yield frame if first_frame: - yield ErrorFrame(error="Xfyun TTS request finished without audio data") + detail = self._last_failure_detail or "no audio frames received" + yield ErrorFrame( + error=( + f"Xfyun TTS request finished without audio data ({detail}); " + f"text={sanitized!r}" + ) + ) except Exception as exc: yield ErrorFrame(error=f"Xfyun TTS request failed: {exc}") @@ -99,28 +143,52 @@ class XfyunTTSService(TTSService): request = self._build_request_frame(text) auth_url = _build_auth_url(self._url, self._api_key, self._api_secret) + self._last_failure_detail = None + frames_received = 0 + audio_bytes_received = 0 + last_status: int | None = None + last_sid: str | None = None + saw_status_2 = False + async with connect(auth_url, max_size=None, open_timeout=self._timeout) as websocket: await websocket.send(json.dumps(request, ensure_ascii=False)) - async for message in websocket: - payload = json.loads(message) + async for raw_message in websocket: + frames_received += 1 + payload = json.loads(raw_message) code = payload.get("code", -1) + sid = payload.get("sid") + if sid: + last_sid = sid if code != 0: - message = payload.get("message", "unknown error") - sid = payload.get("sid") - raise RuntimeError(f"code={code}, sid={sid}, message={message}") + err_msg = payload.get("message", "unknown error") + raise RuntimeError(f"code={code}, sid={sid}, message={err_msg}") data = payload.get("data") if not isinstance(data, dict): continue + last_status = data.get("status", last_status) + audio_b64 = data.get("audio") if audio_b64: - yield base64.b64decode(audio_b64) + audio_bytes = base64.b64decode(audio_b64) + audio_bytes_received += len(audio_bytes) + yield audio_bytes if data.get("status") == 2: + saw_status_2 = True break + if audio_bytes_received == 0: + self._last_failure_detail = ( + f"frames={frames_received}, audio_bytes=0, " + f"last_status={last_status}, saw_status_2={saw_status_2}, sid={last_sid}" + ) + logger.warning( + f"{self}: Xfyun TTS produced no audio ({self._last_failure_detail})" + ) + def _build_request_frame(self, text: str) -> dict[str, Any]: business: dict[str, Any] = { "aue": self._encoding, @@ -142,6 +210,31 @@ class XfyunTTSService(TTSService): } +def _sanitize_text_for_tts(text: str) -> str: + """Strip characters Xfyun's online TTS cannot synthesize. + + The Xfyun ``/v2/tts`` engine silently drops or rejects emoji, pictographs, + dingbats, regional-indicator flags, variation selectors, and zero-width + joiners. When such characters appear in the input the synthesis can + finish without any audio data ("Xfyun TTS request finished without audio + data"). We also drop control characters (other than common whitespace) + and "Symbol, Other" codepoints, then collapse runs of whitespace. + """ + if not text: + return text + + cleaned = _EMOJI_AND_SYMBOL_RE.sub("", text) + filtered: list[str] = [] + for ch in cleaned: + category = unicodedata.category(ch) + if category == "So": + continue + if category.startswith("C") and ch not in ("\n", "\r", "\t"): + continue + filtered.append(ch) + return re.sub(r"\s+", " ", "".join(filtered)).strip() + + def _build_auth_url(url: str, api_key: str, api_secret: str) -> str: parsed = urlparse(url) host = parsed.netloc