a basic duplex agent is built

Xin Wang
2026-01-29 16:36:46 +08:00
parent ac0c76e6e8
commit d6d0ade33e
6 changed files with 432 additions and 15 deletions

View File

@@ -48,6 +48,12 @@ class Settings(BaseSettings):
     siliconflow_api_key: Optional[str] = Field(default=None, description="SiliconFlow API key")
     siliconflow_tts_model: str = Field(default="FunAudioLLM/CosyVoice2-0.5B", description="SiliconFlow TTS model")
 
+    # ASR Configuration
+    asr_provider: str = Field(default="siliconflow", description="ASR provider (siliconflow, buffered)")
+    siliconflow_asr_model: str = Field(default="FunAudioLLM/SenseVoiceSmall", description="SiliconFlow ASR model")
+    asr_interim_interval_ms: int = Field(default=500, description="Interval for interim ASR results in ms")
+    asr_min_audio_ms: int = Field(default=300, description="Minimum audio duration before first ASR result")
+
     # Duplex Pipeline Configuration
     duplex_enabled: bool = Field(default=True, description="Enable duplex voice pipeline")
     duplex_greeting: Optional[str] = Field(default=None, description="Optional greeting message")
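
Note: a minimal sketch of overriding the new ASR settings via environment variables. This assumes the default pydantic BaseSettings behavior (case-insensitive mapping from field names to env vars, no env_prefix on the real Settings class); the variable names below are that assumption, not something shown in this commit.

import os

# Hypothetical overrides; export these before the app constructs Settings().
os.environ["ASR_PROVIDER"] = "siliconflow"     # or "buffered" for the no-op fallback
os.environ["SILICONFLOW_ASR_MODEL"] = "FunAudioLLM/SenseVoiceSmall"
os.environ["ASR_INTERIM_INTERVAL_MS"] = "400"  # interim results every 400 ms
os.environ["ASR_MIN_AUDIO_MS"] = "300"         # buffer 300 ms before the first interim

from app.config import Settings
settings = Settings()
assert settings.asr_provider == "siliconflow"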

View File

@@ -25,6 +25,7 @@ from services.llm import OpenAILLMService, MockLLMService
 from services.tts import EdgeTTSService, MockTTSService
 from services.asr import BufferedASRService
 from services.siliconflow_tts import SiliconFlowTTSService
+from services.siliconflow_asr import SiliconFlowASRService
 from app.config import settings
@@ -90,7 +91,10 @@ class DuplexPipeline:
         # Initialize services
         self.llm_service = llm_service
         self.tts_service = tts_service
-        self.asr_service = asr_service or BufferedASRService()
+        self.asr_service = asr_service  # Will be initialized in start()
+
+        # Track last sent transcript to avoid duplicates
+        self._last_sent_transcript = ""
 
         # Conversation manager
         self.conversation = ConversationManager(
@@ -148,6 +152,23 @@ class DuplexPipeline:
         await self.tts_service.connect()
 
         # Connect ASR service
+        if not self.asr_service:
+            if settings.asr_provider == "siliconflow" and settings.siliconflow_api_key:
+                self.asr_service = SiliconFlowASRService(
+                    api_key=settings.siliconflow_api_key,
+                    model=settings.siliconflow_asr_model,
+                    sample_rate=settings.sample_rate,
+                    interim_interval_ms=settings.asr_interim_interval_ms,
+                    min_audio_for_interim_ms=settings.asr_min_audio_ms,
+                    on_transcript=self._on_transcript_callback
+                )
+                logger.info("Using SiliconFlow ASR service")
+            else:
+                self.asr_service = BufferedASRService(
+                    sample_rate=settings.sample_rate
+                )
+                logger.info("Using Buffered ASR service (no real transcription)")
         await self.asr_service.connect()
 
         logger.info("DuplexPipeline services connected")
@@ -205,7 +226,10 @@ class DuplexPipeline:
             self._audio_buffer += pcm_bytes
             await self.asr_service.send_audio(pcm_bytes)
 
-        # 4. Check for End of Utterance
+        # For SiliconFlow ASR, interim transcription is triggered periodically;
+        # the service handles timing internally via start_interim_transcription()
+
+        # 4. Check for End of Utterance - this triggers LLM response
         if self.eou_detector.process(vad_status):
             await self._on_end_of_utterance()
@@ -237,12 +261,47 @@
         """Interrupt current bot speech (manual interrupt command)."""
         await self._handle_barge_in()
 
+    async def _on_transcript_callback(self, text: str, is_final: bool) -> None:
+        """
+        Callback for ASR transcription results.
+        Streams transcription to client for display.
+
+        Args:
+            text: Transcribed text
+            is_final: Whether this is the final transcription
+        """
+        # Avoid sending duplicate transcripts
+        if text == self._last_sent_transcript and not is_final:
+            return
+        self._last_sent_transcript = text
+
+        # Send transcript event to client
+        await self.transport.send_event({
+            "event": "transcript",
+            "trackId": self.session_id,
+            "text": text,
+            "isFinal": is_final,
+            "timestamp": self._get_timestamp_ms()
+        })
+        logger.debug(f"Sent transcript ({'final' if is_final else 'interim'}): {text[:50]}...")
+
     async def _on_speech_start(self) -> None:
         """Handle user starting to speak."""
         if self.conversation.state == ConversationState.IDLE:
             await self.conversation.start_user_turn()
             self._audio_buffer = b""
+            self._last_sent_transcript = ""
             self.eou_detector.reset()
+
+            # Clear ASR buffer and start interim transcriptions
+            if hasattr(self.asr_service, 'clear_buffer'):
+                self.asr_service.clear_buffer()
+            if hasattr(self.asr_service, 'start_interim_transcription'):
+                await self.asr_service.start_interim_transcription()
+
             logger.debug("User speech started")
 
     async def _on_end_of_utterance(self) -> None:
@@ -250,25 +309,36 @@
         if self.conversation.state != ConversationState.LISTENING:
             return
 
-        # Get transcribed text (if using ASR that provides it)
+        # Stop interim transcriptions
+        if hasattr(self.asr_service, 'stop_interim_transcription'):
+            await self.asr_service.stop_interim_transcription()
+
+        # Get final transcription from ASR service
         user_text = ""
-        if hasattr(self.asr_service, 'get_and_clear_text'):
+        if hasattr(self.asr_service, 'get_final_transcription'):
+            # SiliconFlow ASR - get final transcription
+            user_text = await self.asr_service.get_final_transcription()
+        elif hasattr(self.asr_service, 'get_and_clear_text'):
+            # Buffered ASR - get accumulated text
             user_text = self.asr_service.get_and_clear_text()
 
-        # If no ASR text, we could use the audio buffer for external ASR
-        # For now, just use placeholder if no ASR text
-        if not user_text:
-            # In a real implementation, you'd send audio_buffer to ASR here
-            # For demo purposes, use mock text
-            user_text = "[User speech detected]"
-            logger.warning("No ASR text available - using placeholder")
+        # Skip if no meaningful text
+        if not user_text or not user_text.strip():
+            logger.debug("EOU detected but no transcription - skipping")
+            # Reset for next utterance
+            self._audio_buffer = b""
+            self._last_sent_transcript = ""
+            await self.conversation.start_user_turn()
+            return
 
-        logger.info(f"EOU detected - user said: {user_text[:50]}...")
+        logger.info(f"EOU detected - user said: {user_text[:100]}...")
 
         # Clear buffers
         self._audio_buffer = b""
+        self._last_sent_transcript = ""
 
-        # Process the turn
+        # Process the turn - trigger LLM response
         await self.conversation.end_user_turn(user_text)
         self._current_turn_task = asyncio.create_task(self._handle_turn(user_text))
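
Note: the pipeline binds to its ASR backend purely through hasattr checks. A minimal sketch of the interface those checks imply, written as a hypothetical typing.Protocol (not part of this commit) that a custom ASR service could satisfy:

from typing import Protocol, runtime_checkable

@runtime_checkable
class DuplexASRProtocol(Protocol):
    """Hypothetical summary of the duck-typed ASR surface used by DuplexPipeline."""

    async def connect(self) -> None: ...
    async def send_audio(self, audio: bytes) -> None: ...

    # Optional hooks the pipeline probes with hasattr():
    def clear_buffer(self) -> None: ...                       # on speech start
    async def start_interim_transcription(self) -> None: ...  # on speech start
    async def stop_interim_transcription(self) -> None: ...   # on EOU
    async def get_final_transcription(self) -> str: ...       # preferred on EOU
    def get_and_clear_text(self) -> str: ...                  # fallback on EOU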

View File

@@ -269,6 +269,15 @@ class MicrophoneClient:
             print("← User speech detected")
         elif event_type == "silence":
             print("← User silence detected")
+        elif event_type == "transcript":
+            # Display user speech transcription
+            text = event.get("text", "")
+            is_final = event.get("isFinal", False)
+            if is_final:
+                print(f"← You said: {text}")
+            else:
+                # Interim result - show with indicator
+                print(f"← [listening] {text}", end="\r")
         elif event_type == "trackStart":
             print("← Bot started speaking")
             # Clear any old audio in buffer
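
For reference, the transcript event both clients parse, as assembled by _on_transcript_callback in the pipeline above (keys match the pipeline; the values here are illustrative only):

# Illustrative payload of the "transcript" event.
transcript_event = {
    "event": "transcript",
    "trackId": "session-1234",   # the duplex session_id
    "text": "hello there",       # interim or final hypothesis
    "isFinal": False,            # True once EOU finalizes the utterance
    "timestamp": 1738140000000,  # milliseconds, per _get_timestamp_ms()
}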

View File

@@ -129,11 +129,21 @@ class SimpleVoiceClient:
                 # JSON event
                 event = json.loads(msg)
                 etype = event.get("event", "?")
-                print(f"<- {etype}")
-                if etype == "hangup":
+                if etype == "transcript":
+                    # User speech transcription
+                    text = event.get("text", "")
+                    is_final = event.get("isFinal", False)
+                    if is_final:
+                        print(f"<- You said: {text}")
+                    else:
+                        print(f"<- [listening] {text}", end="\r")
+                elif etype == "hangup":
+                    print(f"<- {etype}")
                     self.running = False
                     break
+                else:
+                    print(f"<- {etype}")
             except asyncio.TimeoutError:
                 continue

View File

@@ -15,6 +15,8 @@ from services.base import (
 from services.llm import OpenAILLMService, MockLLMService
 from services.tts import EdgeTTSService, MockTTSService
 from services.asr import BufferedASRService, MockASRService
+from services.siliconflow_asr import SiliconFlowASRService
+from services.siliconflow_tts import SiliconFlowTTSService
 from services.realtime import RealtimeService, RealtimeConfig, RealtimePipeline
 
 __all__ = [
@@ -35,6 +37,9 @@ __all__ = [
     # ASR
     "BufferedASRService",
     "MockASRService",
+    "SiliconFlowASRService",
+
+    # TTS (SiliconFlow)
+    "SiliconFlowTTSService",
 
     # Realtime
     "RealtimeService",
     "RealtimeConfig",

services/siliconflow_asr.py (new file, 317 lines)
View File

@@ -0,0 +1,317 @@
"""SiliconFlow ASR (Automatic Speech Recognition) Service.

Uses the SiliconFlow API for speech-to-text transcription.
API: https://docs.siliconflow.cn/cn/api-reference/audio/create-audio-transcriptions
"""
import asyncio
import io
import time
import wave
from typing import AsyncIterator, Optional, Callable, Awaitable

from loguru import logger

try:
    import aiohttp
    AIOHTTP_AVAILABLE = True
except ImportError:
    AIOHTTP_AVAILABLE = False
    logger.warning("aiohttp not available - SiliconFlowASRService will not work")

from services.base import BaseASRService, ASRResult, ServiceState


class SiliconFlowASRService(BaseASRService):
    """
    SiliconFlow ASR service for speech-to-text transcription.

    Features:
    - Buffers incoming audio chunks
    - Provides interim transcriptions periodically (for streaming to client)
    - Final transcription on EOU

    API Details:
    - Endpoint: POST https://api.siliconflow.cn/v1/audio/transcriptions
    - Models: FunAudioLLM/SenseVoiceSmall (default), TeleAI/TeleSpeechASR
    - Input: Audio file (multipart/form-data)
    - Output: {"text": "transcribed text"}
    """

    # Supported models
    MODELS = {
        "sensevoice": "FunAudioLLM/SenseVoiceSmall",
        "telespeech": "TeleAI/TeleSpeechASR",
    }

    API_URL = "https://api.siliconflow.cn/v1/audio/transcriptions"

    def __init__(
        self,
        api_key: str,
        model: str = "FunAudioLLM/SenseVoiceSmall",
        sample_rate: int = 16000,
        language: str = "auto",
        interim_interval_ms: int = 500,       # How often to send interim results
        min_audio_for_interim_ms: int = 300,  # Min audio before first interim
        on_transcript: Optional[Callable[[str, bool], Awaitable[None]]] = None
    ):
        """
        Initialize SiliconFlow ASR service.

        Args:
            api_key: SiliconFlow API key
            model: ASR model name or alias
            sample_rate: Audio sample rate (16000 recommended)
            language: Language code ("auto" for automatic detection)
            interim_interval_ms: How often to generate interim transcriptions
            min_audio_for_interim_ms: Minimum audio duration before first interim
            on_transcript: Callback for transcription results (text, is_final)
        """
        super().__init__(sample_rate=sample_rate, language=language)

        if not AIOHTTP_AVAILABLE:
            raise RuntimeError("aiohttp is required for SiliconFlowASRService")

        self.api_key = api_key
        self.model = self.MODELS.get(model.lower(), model)
        self.interim_interval_ms = interim_interval_ms
        self.min_audio_for_interim_ms = min_audio_for_interim_ms
        self.on_transcript = on_transcript

        # Session
        self._session: Optional[aiohttp.ClientSession] = None

        # Audio buffer
        self._audio_buffer: bytes = b""
        self._current_text: str = ""
        self._last_interim_time: float = 0

        # Transcript queue for async iteration
        self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue()

        # Background task for interim results
        self._interim_task: Optional[asyncio.Task] = None
        self._running = False

        logger.info(f"SiliconFlowASRService initialized with model: {self.model}")

    async def connect(self) -> None:
        """Connect to the service."""
        self._session = aiohttp.ClientSession(
            headers={
                "Authorization": f"Bearer {self.api_key}"
            }
        )
        self._running = True
        self.state = ServiceState.CONNECTED
        logger.info("SiliconFlowASRService connected")

    async def disconnect(self) -> None:
        """Disconnect and clean up."""
        self._running = False

        if self._interim_task:
            self._interim_task.cancel()
            try:
                await self._interim_task
            except asyncio.CancelledError:
                pass
            self._interim_task = None

        if self._session:
            await self._session.close()
            self._session = None

        self._audio_buffer = b""
        self._current_text = ""
        self.state = ServiceState.DISCONNECTED
        logger.info("SiliconFlowASRService disconnected")

    async def send_audio(self, audio: bytes) -> None:
        """
        Buffer incoming audio data.

        Args:
            audio: PCM audio data (16-bit, mono)
        """
        self._audio_buffer += audio

    async def transcribe_buffer(self, is_final: bool = False) -> Optional[str]:
        """
        Transcribe the current audio buffer.

        Args:
            is_final: Whether this is the final transcription

        Returns:
            Transcribed text, or None if there is not enough audio
        """
        if not self._session:
            logger.warning("ASR session not connected")
            return None

        # Check minimum audio duration
        audio_duration_ms = len(self._audio_buffer) / (self.sample_rate * 2) * 1000
        if not is_final and audio_duration_ms < self.min_audio_for_interim_ms:
            return None
        if audio_duration_ms < 100:  # Less than 100ms - too short
            return None

        try:
            # Convert PCM to WAV in memory
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(self.sample_rate)
                wav_file.writeframes(self._audio_buffer)
            wav_buffer.seek(0)
            wav_data = wav_buffer.read()

            # Send to API
            form_data = aiohttp.FormData()
            form_data.add_field(
                'file',
                wav_data,
                filename='audio.wav',
                content_type='audio/wav'
            )
            form_data.add_field('model', self.model)

            async with self._session.post(self.API_URL, data=form_data) as response:
                if response.status == 200:
                    result = await response.json()
                    text = result.get("text", "").strip()
                    if text:
                        self._current_text = text

                        # Notify via callback
                        if self.on_transcript:
                            await self.on_transcript(text, is_final)

                        # Queue result
                        await self._transcript_queue.put(
                            ASRResult(text=text, is_final=is_final)
                        )

                        logger.debug(f"ASR {'final' if is_final else 'interim'}: {text[:50]}...")
                    return text
                else:
                    error_text = await response.text()
                    logger.error(f"ASR API error {response.status}: {error_text}")
                    return None

        except Exception as e:
            logger.error(f"ASR transcription error: {e}")
            return None

    async def get_final_transcription(self) -> str:
        """
        Get the final transcription and clear the buffer.
        Call this when EOU is detected.

        Returns:
            Final transcribed text
        """
        # Transcribe full buffer as final
        text = await self.transcribe_buffer(is_final=True)

        # Clear buffer
        result = text or self._current_text
        self._audio_buffer = b""
        self._current_text = ""
        return result

    def get_and_clear_text(self) -> str:
        """
        Get accumulated text and clear the buffer.
        Compatible with the BufferedASRService interface.
        """
        text = self._current_text
        self._current_text = ""
        self._audio_buffer = b""
        return text

    def get_audio_buffer(self) -> bytes:
        """Get the current audio buffer."""
        return self._audio_buffer

    def get_audio_duration_ms(self) -> float:
        """Get the current audio buffer duration in milliseconds."""
        return len(self._audio_buffer) / (self.sample_rate * 2) * 1000

    def clear_buffer(self) -> None:
        """Clear audio and text buffers."""
        self._audio_buffer = b""
        self._current_text = ""

    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
        """
        Async iterator for transcription results.

        Yields:
            ASRResult with text and is_final flag
        """
        while self._running:
            try:
                result = await asyncio.wait_for(
                    self._transcript_queue.get(),
                    timeout=0.1
                )
                yield result
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break

    async def start_interim_transcription(self) -> None:
        """
        Start the background task for interim transcriptions.

        This periodically transcribes buffered audio for
        real-time feedback to the user.
        """
        if self._interim_task and not self._interim_task.done():
            return
        self._interim_task = asyncio.create_task(self._interim_loop())

    async def stop_interim_transcription(self) -> None:
        """Stop the interim transcription task."""
        if self._interim_task:
            self._interim_task.cancel()
            try:
                await self._interim_task
            except asyncio.CancelledError:
                pass
            self._interim_task = None

    async def _interim_loop(self) -> None:
        """Background loop for interim transcriptions."""
        while self._running:
            try:
                await asyncio.sleep(self.interim_interval_ms / 1000)

                # Check if we have enough new audio
                current_time = time.time()
                time_since_last = (current_time - self._last_interim_time) * 1000
                if time_since_last >= self.interim_interval_ms:
                    audio_duration = self.get_audio_duration_ms()
                    if audio_duration >= self.min_audio_for_interim_ms:
                        await self.transcribe_buffer(is_final=False)
                        self._last_interim_time = current_time
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Interim transcription error: {e}")