"""ASR (Automatic Speech Recognition) Service implementations. Provides speech-to-text capabilities with streaming support. """ import os import asyncio import json from typing import AsyncIterator, Optional from loguru import logger from services.base import BaseASRService, ASRResult, ServiceState # Try to import websockets for streaming ASR try: import websockets WEBSOCKETS_AVAILABLE = True except ImportError: WEBSOCKETS_AVAILABLE = False class BufferedASRService(BaseASRService): """ Buffered ASR service that accumulates audio and provides a simple text accumulator for use with EOU detection. This is a lightweight implementation that works with the existing VAD + EOU pattern without requiring external ASR. """ def __init__( self, sample_rate: int = 16000, language: str = "en" ): super().__init__(sample_rate=sample_rate, language=language) self._audio_buffer: bytes = b"" self._current_text: str = "" self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue() async def connect(self) -> None: """No connection needed for buffered ASR.""" self.state = ServiceState.CONNECTED logger.info("Buffered ASR service connected") async def disconnect(self) -> None: """Clear buffers on disconnect.""" self._audio_buffer = b"" self._current_text = "" self.state = ServiceState.DISCONNECTED logger.info("Buffered ASR service disconnected") async def send_audio(self, audio: bytes) -> None: """Buffer audio for later processing.""" self._audio_buffer += audio async def receive_transcripts(self) -> AsyncIterator[ASRResult]: """Yield transcription results.""" while True: try: result = await asyncio.wait_for( self._transcript_queue.get(), timeout=0.1 ) yield result except asyncio.TimeoutError: continue except asyncio.CancelledError: break def set_text(self, text: str) -> None: """ Set the current transcript text directly. This allows external integration (e.g., Whisper, other ASR) to provide transcripts. """ self._current_text = text result = ASRResult(text=text, is_final=False) asyncio.create_task(self._transcript_queue.put(result)) def get_and_clear_text(self) -> str: """Get accumulated text and clear buffer.""" text = self._current_text self._current_text = "" self._audio_buffer = b"" return text def get_audio_buffer(self) -> bytes: """Get accumulated audio buffer.""" return self._audio_buffer def clear_audio_buffer(self) -> None: """Clear audio buffer.""" self._audio_buffer = b"" class MockASRService(BaseASRService): """ Mock ASR service for testing without actual recognition. """ def __init__(self, sample_rate: int = 16000, language: str = "en"): super().__init__(sample_rate=sample_rate, language=language) self._transcript_queue: asyncio.Queue[ASRResult] = asyncio.Queue() self._mock_texts = [ "Hello, how are you?", "That's interesting.", "Tell me more about that.", "I understand.", ] self._text_index = 0 async def connect(self) -> None: self.state = ServiceState.CONNECTED logger.info("Mock ASR service connected") async def disconnect(self) -> None: self.state = ServiceState.DISCONNECTED logger.info("Mock ASR service disconnected") async def send_audio(self, audio: bytes) -> None: """Mock audio processing - generates fake transcripts periodically.""" pass def trigger_transcript(self) -> None: """Manually trigger a transcript (for testing).""" text = self._mock_texts[self._text_index % len(self._mock_texts)] self._text_index += 1 result = ASRResult(text=text, is_final=True, confidence=0.95) asyncio.create_task(self._transcript_queue.put(result)) async def receive_transcripts(self) -> AsyncIterator[ASRResult]: """Yield transcription results.""" while True: try: result = await asyncio.wait_for( self._transcript_queue.get(), timeout=0.1 ) yield result except asyncio.TimeoutError: continue except asyncio.CancelledError: break