Add multilingual support for Deepgram Flux STT

Enables the flux-general-multi model with one or more language_hints.
Hints are sent as repeatable URL params at connect time and via a
Configure control message when updated mid-stream (detect-then-lock).
TranscriptionFrame.language now reflects the language Flux detected
for each turn via the TurnInfo `languages` field.
This commit is contained in:
Mark Backman
2026-04-17 10:30:45 -04:00
parent 4c19f5584c
commit 6bb4e8295f
4 changed files with 141 additions and 14 deletions

View File

@@ -23,6 +23,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.flux.base import DeepgramFluxSTTSettings
from pipecat.services.deepgram.flux.stt import DeepgramFluxSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transcriptions.language import Language
@@ -51,7 +52,16 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramFluxSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Start with the multilingual model and broad hints so Flux can auto-detect
# across English and Spanish. TranscriptionFrame.language will reflect
# whichever language Flux detected for each turn.
stt = DeepgramFluxSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
settings=DeepgramFluxSTTService.Settings(
model="flux-general-multi",
language_hints=[Language.EN, Language.ES],
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -115,10 +125,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
)
# Detect-then-lock: narrow the hints to a single language mid-stream.
# Sent as a Configure message — no reconnect needed.
await asyncio.sleep(10)
logger.info("Updating Deepgram Flux STT settings: language=es")
logger.info("Updating Deepgram Flux STT settings: language_hints=[es]")
await task.queue_frame(
STTUpdateSettingsFrame(delta=DeepgramFluxSTTService.Settings(language=Language.ES))
STTUpdateSettingsFrame(delta=DeepgramFluxSTTSettings(language_hints=[Language.ES]))
)
@transport.event_handler("on_client_disconnected")

View File

@@ -27,11 +27,59 @@ from pipecat.frames.frames import (
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
def language_to_deepgram_flux_language(language: Language) -> str | None:
"""Convert a Pipecat Language to a Deepgram Flux language code.
Only honored by the ``flux-general-multi`` model. Locale variants
(e.g. ``Language.EN_GB``) fall back to the base code.
"""
LANGUAGE_MAP = {
Language.DE: "de",
Language.EN: "en",
Language.ES: "es",
Language.FR: "fr",
Language.HI: "hi",
Language.IT: "it",
Language.JA: "ja",
Language.NL: "nl",
Language.PT: "pt",
Language.RU: "ru",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
def _prepare_language_hints(hints: list[Language] | None) -> list[str]:
"""Convert a list of Pipecat Languages to Deepgram Flux codes.
Drops entries that can't be mapped and deduplicates while preserving order.
"""
if not hints:
return []
seen: set[str] = set()
prepared: list[str] = []
for hint in hints:
code = language_to_deepgram_flux_language(hint)
if code is None or code in seen:
continue
seen.add(code)
prepared.append(code)
return prepared
def _code_to_pipecat_language(code: str) -> Language | None:
"""Convert a Deepgram-returned language code to a Pipecat Language."""
try:
return Language(code)
except ValueError:
logger.debug(f"Unmapped Deepgram Flux detected language code: {code}")
return None
class FluxMessageType(StrEnum):
"""Deepgram Flux WebSocket message types.
@@ -73,6 +121,10 @@ class DeepgramFluxSTTSettings(STTSettings):
confidence (default 5000).
keyterm: Keyterms to boost recognition accuracy for specialized terminology.
min_confidence: Minimum confidence required to create a TranscriptionFrame.
language_hints: Languages to bias transcription toward. Only honored by the
``flux-general-multi`` model. An empty list clears any active hints;
``None``/``NOT_GIVEN`` means no hints (auto-detect). Can be updated
mid-stream via ``STTUpdateSettingsFrame``.
"""
eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
@@ -80,6 +132,7 @@ class DeepgramFluxSTTSettings(STTSettings):
eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
language_hints: list[Language] | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
class DeepgramFluxSTTBase(STTService):
@@ -93,7 +146,14 @@ class DeepgramFluxSTTBase(STTService):
Settings = DeepgramFluxSTTSettings
_settings: Settings
_CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}
_CONFIGURE_FIELDS = {
"keyterm",
"eot_threshold",
"eager_eot_threshold",
"eot_timeout_ms",
"language_hints",
}
_MULTILINGUAL_MODEL = "flux-general-multi"
def __init__(
self,
@@ -200,6 +260,18 @@ class DeepgramFluxSTTBase(STTService):
for tag_value in self._tag:
params.append(urlencode({"tag": tag_value}))
# Add language_hint parameters (only valid on flux-general-multi)
hints = self._settings.language_hints
if hints and not isinstance(hints, _NotGiven):
if self._settings.model == self._MULTILINGUAL_MODEL:
for code in _prepare_language_hints(hints):
params.append(urlencode({"language_hint": code}))
else:
logger.warning(
f"language_hints only supported on {self._MULTILINGUAL_MODEL}; "
f"ignoring hints for model {self._settings.model!r}"
)
return "&".join(params)
async def _send_silence(self, duration_secs: float = 0.5):
@@ -266,6 +338,21 @@ class DeepgramFluxSTTBase(STTService):
if thresholds:
message["thresholds"] = thresholds
if "language_hints" in fields:
if self._settings.model != self._MULTILINGUAL_MODEL:
logger.warning(
f"language_hints only supported on {self._MULTILINGUAL_MODEL}; "
f"skipping Configure update for model {self._settings.model!r}"
)
else:
hints = self._settings.language_hints
# Empty list clears hints; NOT_GIVEN/None also treated as clear
# since we only reach this branch when the user set the field.
if hints is None or isinstance(hints, _NotGiven):
message["language_hints"] = []
else:
message["language_hints"] = _prepare_language_hints(hints)
logger.debug(f"{self}: sending Configure message: {message}")
await self._transport_send_json(message)
@@ -281,8 +368,9 @@ class DeepgramFluxSTTBase(STTService):
"""Apply a settings delta.
Configure-able fields (keyterm, eot_threshold, eager_eot_threshold,
eot_timeout_ms) are sent to Deepgram via a Configure message.
Other fields are stored but cannot be applied to the active connection.
eot_timeout_ms, language_hints) are sent to Deepgram via a Configure
message. Other fields are stored but cannot be applied to the active
connection.
"""
changed = await super()._update_settings(delta)
@@ -520,6 +608,16 @@ class DeepgramFluxSTTBase(STTService):
return None
return sum(confidences) / len(confidences)
def _primary_detected_language(self, data: dict[str, Any]) -> Language | None:
"""Extract the primary detected language from a TurnInfo payload.
Only populated by ``flux-general-multi``; returns ``None`` otherwise.
"""
codes = data.get("languages") or []
if not codes:
return None
return _code_to_pipecat_language(codes[0])
async def _handle_end_of_turn(self, transcript: str, data: dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
@@ -543,6 +641,7 @@ class DeepgramFluxSTTBase(STTService):
# Compute the average confidence
average_confidence = self._calculate_average_confidence(data)
detected_language = self._primary_detected_language(data)
if not self._settings.min_confidence or average_confidence > self._settings.min_confidence:
# EndOfTurn means Flux has determined the turn is complete,
@@ -552,7 +651,7 @@ class DeepgramFluxSTTBase(STTService):
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
detected_language,
result=data,
finalized=True,
)
@@ -562,7 +661,7 @@ class DeepgramFluxSTTBase(STTService):
f"Transcription confidence below min_confidence threshold: {average_confidence}"
)
await self._handle_transcription(transcript, True, self._settings.language)
await self._handle_transcription(transcript, True, detected_language)
await self.stop_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._call_event_handler("on_end_of_turn", transcript)
@@ -606,7 +705,7 @@ class DeepgramFluxSTTBase(STTService):
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
self._primary_detected_language(data),
result=data,
)
)

View File

@@ -23,7 +23,6 @@ from pipecat.services.deepgram.flux.base import (
DeepgramFluxSTTBase,
DeepgramFluxSTTSettings,
)
from pipecat.transcriptions.language import Language
@dataclass
@@ -112,12 +111,13 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
# Initialize default settings
default_settings = self.Settings(
model="flux-general-en",
language=Language.EN,
language=None,
eager_eot_threshold=None,
eot_threshold=None,
eot_timeout_ms=None,
keyterm=[],
min_confidence=None,
language_hints=None,
)
# Apply settings delta

View File

@@ -24,7 +24,6 @@ from pipecat.services.deepgram.flux.base import (
FluxMessageType,
)
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
try:
from websockets.asyncio.client import connect as websocket_connect
@@ -50,6 +49,12 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
Supports configurable models, VAD events, and various audio processing options
including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance.
For multilingual use, set ``model="flux-general-multi"`` and pass
``language_hints`` to bias detection toward specific languages. Hints can
be updated mid-stream via ``STTUpdateSettingsFrame`` (e.g. to implement a
detect-then-lock flow). ``TranscriptionFrame.language`` reflects whichever
language Flux detected for each turn.
Event handlers available (in addition to base events):
- on_start_of_turn(service, transcript): Deepgram detected start of speech
@@ -156,6 +161,16 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
tag=["production", "voice-agent"],
),
)
Multilingual usage with language hints::
stt = DeepgramFluxSTTService(
api_key="your-api-key",
settings=DeepgramFluxSTTService.Settings(
model="flux-general-multi",
language_hints=[Language.EN, Language.ES],
),
)
"""
# Note: For DeepgramFluxSTTService, differently from other processes, we need to create
# the _receive_task inside _connect_websocket, because the websocket should only be
@@ -171,12 +186,13 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model="flux-general-en",
language=Language.EN,
language=None,
eager_eot_threshold=None,
eot_threshold=None,
eot_timeout_ms=None,
keyterm=[],
min_confidence=None,
language_hints=None,
)
# 2. Apply direct init arg overrides (deprecated)