Files
ai-video-fullstack/backend/services/pipecat/pipeline.py
Xin Wang e25dfd4003 Add support for Xfyun ASR and TTS services in the backend
- Introduce new Xfyun ASR and TTS services, enabling integration with iFlytek's voice recognition and synthesis capabilities.
- Update AssistantConfig model to include interface types for STT and TTS.
- Enhance credential testing to validate Xfyun credentials.
- Modify service factory to create Xfyun services based on configuration.
- Update README with new configuration details for Xfyun integration.
- Add new frontend components for visualizing audio streams and managing user interactions.
2026-06-11 10:51:08 +08:00

182 lines
6.2 KiB
Python

"""管线核心:给定一个 transport + 配置,跑完整的语音闭环。
关键设计:**transport 由调用方传入**,管线本身不关心是 WebRTC 还是 WS。
这就是"同时支持多种输出"的落点——加输出方式不用动这里。
对应 dograh 的 pipeline_builder.py + run_pipeline.py(已砍掉 workflow 引擎/DB/录音/指标)。
"""
from loguru import logger
from models import AssistantConfig
from services.pipecat.service_factory import create_services
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
EndFrame,
InputTextRawFrame,
InputTransportMessageFrame,
LLMMessagesAppendFrame,
OutputTransportMessageUrgentFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.turns.user_start import (
TranscriptionUserTurnStartStrategy,
VADUserTurnStartStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from pipecat.workers.runner import WorkerRunner
def _text_input(message) -> tuple[str, bool] | None:
"""解析现有 user-text 与 RTVI send-text 两种前端文字消息。"""
if not isinstance(message, dict):
return None
if message.get("type") == "user-text":
text = str(message.get("text") or "").strip()
return (text, True) if text else None
if message.get("type") == "send-text":
data = message.get("data")
if not isinstance(data, dict):
return None
text = str(data.get("content") or "").strip()
options = data.get("options")
run_immediately = not isinstance(options, dict) or options.get(
"run_immediately", True
)
return (text, bool(run_immediately)) if text else None
return None
class TextInputProcessor(FrameProcessor):
    """Convert transport text messages into frames for cascaded and realtime LLMs.

    Frames are forwarded untouched, in their original direction, unless they
    are an ``InputTransportMessageFrame`` whose message ``_text_input``
    recognizes. For a recognized message the original frame is consumed and,
    in order:

    1. An interruption is broadcast, only when the text should run immediately.
    2. An ``LLMMessagesAppendFrame`` is pushed with the text as a ``user``
       message. Its ``run_llm`` flag mirrors ``run_immediately``.
    3. An ``InputTextRawFrame`` is also pushed, again only when running
       immediately.
    4. The ``on_text_input`` event fires with the text.

    NOTE(review): presumably the append frame serves the cascaded LLM path
    and ``InputTextRawFrame`` serves realtime LLM services. Confirm against
    the services produced by ``create_services``.
    """

    def __init__(self):
        super().__init__()
        # Allows subscribers via @processor.event_handler("on_text_input").
        self._register_event_handler("on_text_input")

    async def process_frame(self, frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        parsed = (
            _text_input(frame.message)
            if isinstance(frame, InputTransportMessageFrame)
            else None
        )
        if parsed is None:
            # Not a text message this processor handles: pass it through.
            await self.push_frame(frame, direction)
            return

        text, run_now = parsed
        if run_now:
            # Interrupt before injecting text that should be answered now.
            await self.broadcast_interruption()

        append_frame = LLMMessagesAppendFrame(
            messages=[{"role": "user", "content": text}],
            run_llm=run_now,
        )
        await self.push_frame(append_frame)

        if run_now:
            await self.push_frame(InputTextRawFrame(text=text))

        await self._call_event_handler("on_text_input", text)


async def run_pipeline(transport, cfg: AssistantConfig) -> None:
    """Build the voice pipeline on ``transport`` and run it until it finishes.

    The transport is supplied by the caller, so this function does not care
    whether it is WebRTC, WebSocket, telephony, etc.

    Stage order: transport input -> text input -> STT -> user context
    aggregator -> LLM -> TTS -> transport output -> assistant context
    aggregator.

    Side effects while running:

    * Completed user turns, completed assistant turns and typed user text are
      sent to the client as ``{"type": "transcript", "role", "content",
      "timestamp"}`` messages via ``OutputTransportMessageUrgentFrame``. Empty
      content is skipped.
    * ``cfg.greeting``, if set, is spoken when a client connects.
    * An ``EndFrame`` is queued when the client disconnects.

    Args:
        transport: Any pipecat transport (WebRTC / WS / telephony, ...) that
            exposes ``.input()``, ``.output()`` and ``event_handler``.
        cfg: Assistant configuration (passed inline with the request).
    """
    logger.info(f"启动管线: assistant={cfg.name} mode={cfg.runtimeMode}")

    stt, llm, tts = create_services(cfg)
    context = LLMContext(messages=[{"role": "system", "content": cfg.prompt}])

    # A user turn may be opened by VAD-detected speech or by an incoming
    # transcription. Both strategies honour the same interrupt setting.
    vad = SileroVADAnalyzer()
    interrupt = cfg.enableInterrupt
    start_strategies = [
        VADUserTurnStartStrategy(enable_interruptions=interrupt),
        TranscriptionUserTurnStartStrategy(enable_interruptions=interrupt),
    ]
    user_params = LLMUserAggregatorParams(
        vad_analyzer=vad,
        user_turn_strategies=UserTurnStrategies(start=start_strategies),
    )
    user_agg, assistant_agg = LLMContextAggregatorPair(context, user_params=user_params)

    text_in = TextInputProcessor()
    stages = [
        transport.input(),
        text_in,
        stt,
        user_agg,
        llm,
        tts,
        transport.output(),
        # NOTE(review): the assistant aggregator sits after the output
        # transport, presumably so it records only what was actually sent.
        assistant_agg,
    ]
    worker = PipelineWorker(
        Pipeline(stages),
        params=PipelineParams(enable_metrics=False),
        enable_rtvi=False,
    )

    async def queue_transcript(role: str, content: str, timestamp: str) -> None:
        """Push one transcript entry to the client; empty content is dropped."""
        if not content:
            return
        payload = {
            "type": "transcript",
            "role": role,
            "content": content,
            "timestamp": timestamp,
        }
        await worker.queue_frame(OutputTransportMessageUrgentFrame(message=payload))

    @user_agg.event_handler("on_user_turn_stopped")
    async def on_user_turn_stopped(_agg, _strategy, turn):
        await queue_transcript("user", turn.content, turn.timestamp)

    @assistant_agg.event_handler("on_assistant_turn_stopped")
    async def on_assistant_turn_stopped(_agg, turn):
        await queue_transcript("assistant", turn.content, turn.timestamp)

    @text_in.event_handler("on_text_input")
    async def on_text_input(_processor, typed):
        # Typed text bypasses the user aggregator's turn events, so it is
        # timestamped here.
        await queue_transcript("user", typed, time_now_iso8601())

    @transport.event_handler("on_client_connected")
    async def on_client_connected(_transport, _client):
        greeting = cfg.greeting
        if greeting:
            await worker.queue_frame(TTSSpeakFrame(greeting))

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(_transport, _client):
        logger.info("对端断开,结束管线")
        # Queue an EndFrame so the pipeline run ends.
        await worker.queue_frame(EndFrame())

    runner = WorkerRunner(handle_sigint=False)
    await runner.add_workers(worker)
    await runner.run()
    logger.info("管线已结束")