Files
AI-VideoAssistant/engine/runtime/ports/asr.py
Xin Wang da38157638 Add ASR interim results support in Assistant model and API
- Introduced `asr_interim_enabled` field in the Assistant model to control interim ASR results.
- Updated AssistantBase and AssistantUpdate schemas to include the new field.
- Modified the database schema to add the `asr_interim_enabled` column.
- Enhanced runtime metadata to reflect interim ASR settings.
- Updated API endpoints and tests to validate the new functionality.
- Adjusted documentation to include details about interim ASR results configuration.
2026-03-06 12:58:54 +08:00

86 lines
2.4 KiB
Python

"""ASR extension port contracts."""
from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable, Literal, Optional, Protocol
from providers.common.base import ASRResult
# Async callback type: invoked with a ``str`` and a ``bool`` and returns an
# awaitable that resolves to ``None``.
# NOTE(review): presumably (transcript text, is-final flag) — confirm against
# the providers that invoke it.
TranscriptCallback = Callable[[str, bool], Awaitable[None]]
# Closed set of mode names; used below to type the ``mode`` attribute of ports
# and of ``ASRServiceSpec``.
ASRMode = Literal["offline", "streaming"]
@dataclass(frozen=True)
class ASRServiceSpec:
    """Resolved runtime configuration for ASR service creation.

    The dataclass is frozen, so attributes cannot be reassigned after
    construction. ``provider`` and ``sample_rate`` are required; every other
    field has a default. Field order is part of the positional constructor
    interface and must not be changed.
    """

    # Required fields.
    provider: str
    sample_rate: int  # NOTE(review): presumably Hz — confirm with providers

    # Optional mode selection; ``None`` leaves it unspecified here.
    mode: Optional[ASRMode] = None
    language: str = "auto"

    # Optional provider connection settings.
    api_key: Optional[str] = None
    api_url: Optional[str] = None
    model: Optional[str] = None

    # Interim (partial) result settings; disabled by default.
    # NOTE(review): the two ``*_ms`` values are presumably milliseconds, as
    # the suffix suggests — how they are applied is provider-specific.
    enable_interim: bool = False
    interim_interval_ms: int = 500
    min_audio_for_interim_ms: int = 300

    # Optional async callback for transcripts; see ``TranscriptCallback``.
    on_transcript: Optional[TranscriptCallback] = None
class ASRPort(Protocol):
    """Port for speech recognition providers.

    Structural interface: any object exposing these members satisfies the
    port. The method bodies here are stubs and carry no behaviour.
    """

    # Mode name declared by the implementation ("offline" or "streaming").
    mode: ASRMode

    async def connect(self) -> None:
        """Establish connection to ASR provider."""
        ...

    async def disconnect(self) -> None:
        """Release ASR resources."""
        ...

    async def send_audio(self, audio: bytes) -> None:
        """Push one PCM audio chunk for recognition."""
        ...

    # NOTE(review): declared ``async def`` with no ``yield``, so static type
    # checkers read this as a coroutine that *returns* an AsyncIterator. If
    # implementations are async generators (``async def`` + ``yield``), the
    # protocol member should be a plain ``def`` — confirm against providers
    # before changing the signature.
    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
        """Stream partial/final recognition results."""
        ...
class OfflineASRPort(ASRPort, Protocol):
    """Port for offline/buffered ASR providers.

    Extends ``ASRPort`` with interim-loop control and buffer/text accessors.
    The method bodies here are stubs and carry no behaviour.
    """

    # Narrows the base ``mode`` attribute to the single value "offline".
    mode: Literal["offline"]

    async def start_interim_transcription(self) -> None:
        """Start interim transcription loop."""
        ...

    async def stop_interim_transcription(self) -> None:
        """Stop interim transcription loop."""
        ...

    def clear_buffer(self) -> None:
        """Clear provider-side ASR buffer."""
        ...

    async def get_final_transcription(self) -> str:
        """Return final transcription for the current utterance."""
        ...

    def get_and_clear_text(self) -> str:
        """Return buffered text and clear internal state."""
        ...
class StreamingASRPort(ASRPort, Protocol):
    """Port for streaming ASR providers.

    Extends ``ASRPort`` with utterance lifecycle hooks. The method bodies
    here are stubs and carry no behaviour.
    """

    # Narrows the base ``mode`` attribute to the single value "streaming".
    mode: Literal["streaming"]

    async def begin_utterance(self) -> None:
        """Start a new utterance stream."""
        ...

    async def end_utterance(self) -> None:
        """Signal end of current utterance stream."""
        ...

    async def wait_for_final_transcription(self, timeout_ms: int = 800) -> str:
        """Wait for final transcript after utterance end.

        ``timeout_ms`` defaults to 800. What is returned when the wait
        expires is defined by the implementation, not by this protocol.
        """
        ...

    def clear_utterance(self) -> None:
        """Reset utterance-local state."""
        ...