Files
py-active-call/services/asr.py

148 lines
4.7 KiB
Python

"""ASR (Automatic Speech Recognition) Service implementations.
Provides speech-to-text capabilities with streaming support.
"""
import os
import asyncio
import json
from typing import AsyncIterator, Optional
from loguru import logger
from services.base import BaseASRService, ASRResult, ServiceState
# Optional dependency for streaming ASR: WEBSOCKETS_AVAILABLE records
# whether the `websockets` package could be imported.
try:
    import websockets
except ImportError:
    WEBSOCKETS_AVAILABLE = False
else:
    WEBSOCKETS_AVAILABLE = True
class BufferedASRService(BaseASRService):
    """Buffered ASR service fed by externally supplied transcripts.

    Accumulates audio and provides a simple text accumulator for use with
    EOU detection. This is a lightweight implementation that works with the
    existing VAD + EOU pattern without requiring external ASR: nothing in
    this class performs recognition itself; transcript text only enters
    through :meth:`set_text`.
    """

    def __init__(
        self,
        sample_rate: int = 16000,
        language: str = "en"
    ):
        """Create the service with empty audio/text buffers.

        Args:
            sample_rate: Forwarded unchanged to ``BaseASRService``.
            language: Forwarded unchanged to ``BaseASRService``.
        """
        super().__init__(sample_rate=sample_rate, language=language)
        # A bytearray is extended in place; growing an immutable ``bytes``
        # with ``+=`` would re-copy the whole buffer on every audio chunk.
        self._audio_buffer: bytearray = bytearray()
        self._current_text: str = ""
        self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue()

    async def connect(self) -> None:
        """Mark the service as connected; there is nothing to connect to."""
        self.state = ServiceState.CONNECTED
        logger.info("Buffered ASR service connected")

    async def disconnect(self) -> None:
        """Clear the audio and text buffers and mark the service disconnected.

        Results already sitting in the transcript queue are left untouched.
        """
        self._audio_buffer.clear()
        self._current_text = ""
        self.state = ServiceState.DISCONNECTED
        logger.info("Buffered ASR service disconnected")

    async def send_audio(self, audio: bytes) -> None:
        """Append ``audio`` to the internal buffer for later retrieval.

        Args:
            audio: Raw audio chunk to accumulate.
        """
        self._audio_buffer += audio

    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
        """Yield transcription results in the order they were queued.

        The stream has no natural end: it waits for the next result
        indefinitely and finishes only when the consuming task is cancelled.

        Yields:
            Each ``ASRResult`` placed on the queue by :meth:`set_text`.
        """
        while True:
            try:
                # Wait directly on the queue. A timeout-and-retry loop adds
                # nothing here because there is no exit condition to poll.
                result = await self._transcript_queue.get()
                yield result
            except asyncio.CancelledError:
                # NOTE(review): cancellation is absorbed, so the consumer
                # sees the stream end instead of CancelledError. Kept for
                # backward compatibility with existing callers.
                break

    def set_text(self, text: str) -> None:
        """Set the current transcript text and publish it as an interim result.

        This allows external integration (e.g., Whisper, other ASR) to
        provide transcripts.

        The result is enqueued synchronously with ``put_nowait``. The queue
        is unbounded, so this cannot fail with ``QueueFull``, and unlike
        scheduling a task it needs no running event loop and cannot be lost
        to an unreferenced task. ``asyncio.Queue`` is not thread-safe, so
        call this from the event-loop thread.

        Args:
            text: Transcript text; stored as the current text and queued as
                an ``ASRResult`` with ``is_final=False``.
        """
        self._current_text = text
        self._transcript_queue.put_nowait(ASRResult(text=text, is_final=False))

    def get_and_clear_text(self) -> str:
        """Return the current text, then reset both the text and audio buffers.

        Returns:
            The text most recently stored by :meth:`set_text`, or ``""`` if
            none has been stored since the last clear.
        """
        text = self._current_text
        self._current_text = ""
        self._audio_buffer.clear()
        return text

    def get_audio_buffer(self) -> bytes:
        """Return a ``bytes`` copy of the audio accumulated so far."""
        return bytes(self._audio_buffer)

    def clear_audio_buffer(self) -> None:
        """Discard all accumulated audio."""
        self._audio_buffer.clear()
class MockASRService(BaseASRService):
    """Mock ASR service for testing without actual recognition.

    Incoming audio is ignored. Transcripts are produced only when
    :meth:`trigger_transcript` is called, cycling through a fixed list of
    canned phrases.
    """

    def __init__(self, sample_rate: int = 16000, language: str = "en"):
        """Create the mock with an empty transcript queue.

        Args:
            sample_rate: Forwarded unchanged to ``BaseASRService``.
            language: Forwarded unchanged to ``BaseASRService``.
        """
        super().__init__(sample_rate=sample_rate, language=language)
        self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue()
        # Canned phrases handed out in order by trigger_transcript().
        self._mock_texts = [
            "Hello, how are you?",
            "That's interesting.",
            "Tell me more about that.",
            "I understand.",
        ]
        # Count of transcripts triggered so far; taken modulo the list length.
        self._text_index = 0

    async def connect(self) -> None:
        """Mark the service as connected."""
        self.state = ServiceState.CONNECTED
        logger.info("Mock ASR service connected")

    async def disconnect(self) -> None:
        """Mark the service as disconnected."""
        self.state = ServiceState.DISCONNECTED
        logger.info("Mock ASR service disconnected")

    async def send_audio(self, audio: bytes) -> None:
        """Accept and discard ``audio``.

        No transcript is generated here; use :meth:`trigger_transcript`
        to produce one.
        """
        pass

    def trigger_transcript(self) -> None:
        """Manually queue the next canned transcript (for testing).

        Phrases are taken in order and wrap around at the end of the list.
        Each is queued as a final result with ``confidence=0.95``.

        The result is enqueued synchronously with ``put_nowait``. The queue
        is unbounded, so this cannot fail with ``QueueFull``, and unlike
        scheduling a task it needs no running event loop and cannot be lost
        to an unreferenced task. ``asyncio.Queue`` is not thread-safe, so
        call this from the event-loop thread.
        """
        text = self._mock_texts[self._text_index % len(self._mock_texts)]
        self._text_index += 1
        result = ASRResult(text=text, is_final=True, confidence=0.95)
        self._transcript_queue.put_nowait(result)

    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
        """Yield transcription results in the order they were queued.

        The stream has no natural end: it waits for the next result
        indefinitely and finishes only when the consuming task is cancelled.

        Yields:
            Each ``ASRResult`` queued by :meth:`trigger_transcript`.
        """
        while True:
            try:
                # Wait directly on the queue. A timeout-and-retry loop adds
                # nothing here because there is no exit condition to poll.
                result = await self._transcript_queue.get()
                yield result
            except asyncio.CancelledError:
                # NOTE(review): cancellation is absorbed, so the consumer
                # sees the stream end instead of CancelledError. Kept for
                # backward compatibility with existing callers.
                break