From 6bb4e8295f71e7551b93e29d8fd980df9c0d918e Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 17 Apr 2026 10:30:45 -0400 Subject: [PATCH] Add multilingual support for Deepgram Flux STT Enables the flux-general-multi model with one or more language_hints. Hints are sent as repeatable URL params at connect time and via a Configure control message when updated mid-stream (detect-then-lock). TranscriptionFrame.language now reflects the language Flux detected for each turn via the TurnInfo `languages` field. --- .../update-settings/stt/stt-deepgram-flux.py | 18 ++- src/pipecat/services/deepgram/flux/base.py | 113 ++++++++++++++++-- .../services/deepgram/flux/sagemaker/stt.py | 4 +- src/pipecat/services/deepgram/flux/stt.py | 20 +++- 4 files changed, 141 insertions(+), 14 deletions(-) diff --git a/examples/update-settings/stt/stt-deepgram-flux.py b/examples/update-settings/stt/stt-deepgram-flux.py index 24bb8b7ad..82e60ce67 100644 --- a/examples/update-settings/stt/stt-deepgram-flux.py +++ b/examples/update-settings/stt/stt-deepgram-flux.py @@ -23,6 +23,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.flux.base import DeepgramFluxSTTSettings from pipecat.services.deepgram.flux.stt import DeepgramFluxSTTService from pipecat.services.openai.llm import OpenAILLMService from pipecat.transcriptions.language import Language @@ -51,7 +52,16 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - stt = DeepgramFluxSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + # Start with the multilingual model and broad hints so Flux can auto-detect + # across English and Spanish. TranscriptionFrame.language will reflect + # whichever language Flux detected for each turn. + stt = DeepgramFluxSTTService( + api_key=os.getenv("DEEPGRAM_API_KEY"), + settings=DeepgramFluxSTTService.Settings( + model="flux-general-multi", + language_hints=[Language.EN, Language.ES], + ), + ) tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), @@ -115,10 +125,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) ) + # Detect-then-lock: narrow the hints to a single language mid-stream. + # Sent as a Configure message — no reconnect needed. await asyncio.sleep(10) - logger.info("Updating Deepgram Flux STT settings: language=es") + logger.info("Updating Deepgram Flux STT settings: language_hints=[es]") await task.queue_frame( - STTUpdateSettingsFrame(delta=DeepgramFluxSTTService.Settings(language=Language.ES)) + STTUpdateSettingsFrame(delta=DeepgramFluxSTTSettings(language_hints=[Language.ES])) ) @transport.event_handler("on_client_disconnected") diff --git a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index baefbd060..089b5525a 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -27,11 +27,59 @@ from pipecat.frames.frames import ( ) from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven from pipecat.services.stt_service import STTService -from pipecat.transcriptions.language import Language +from pipecat.transcriptions.language import Language, resolve_language from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_stt +def language_to_deepgram_flux_language(language: Language) -> str | None: + """Convert a Pipecat Language to a Deepgram Flux language code. + + Only honored by the ``flux-general-multi`` model. Locale variants + (e.g. ``Language.EN_GB``) fall back to the base code. + """ + LANGUAGE_MAP = { + Language.DE: "de", + Language.EN: "en", + Language.ES: "es", + Language.FR: "fr", + Language.HI: "hi", + Language.IT: "it", + Language.JA: "ja", + Language.NL: "nl", + Language.PT: "pt", + Language.RU: "ru", + } + return resolve_language(language, LANGUAGE_MAP, use_base_code=True) + + +def _prepare_language_hints(hints: list[Language] | None) -> list[str]: + """Convert a list of Pipecat Languages to Deepgram Flux codes. + + Drops entries that can't be mapped and deduplicates while preserving order. + """ + if not hints: + return [] + seen: set[str] = set() + prepared: list[str] = [] + for hint in hints: + code = language_to_deepgram_flux_language(hint) + if code is None or code in seen: + continue + seen.add(code) + prepared.append(code) + return prepared + + +def _code_to_pipecat_language(code: str) -> Language | None: + """Convert a Deepgram-returned language code to a Pipecat Language.""" + try: + return Language(code) + except ValueError: + logger.debug(f"Unmapped Deepgram Flux detected language code: {code}") + return None + + class FluxMessageType(StrEnum): """Deepgram Flux WebSocket message types. @@ -73,6 +121,10 @@ class DeepgramFluxSTTSettings(STTSettings): confidence (default 5000). keyterm: Keyterms to boost recognition accuracy for specialized terminology. min_confidence: Minimum confidence required to create a TranscriptionFrame. + language_hints: Languages to bias transcription toward. Only honored by the + ``flux-general-multi`` model. An empty list clears any active hints; + ``None``/``NOT_GIVEN`` means no hints (auto-detect). Can be updated + mid-stream via ``STTUpdateSettingsFrame``. """ eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) @@ -80,6 +132,7 @@ class DeepgramFluxSTTSettings(STTSettings): eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN) min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + language_hints: list[Language] | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) class DeepgramFluxSTTBase(STTService): @@ -93,7 +146,14 @@ class DeepgramFluxSTTBase(STTService): Settings = DeepgramFluxSTTSettings _settings: Settings - _CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"} + _CONFIGURE_FIELDS = { + "keyterm", + "eot_threshold", + "eager_eot_threshold", + "eot_timeout_ms", + "language_hints", + } + _MULTILINGUAL_MODEL = "flux-general-multi" def __init__( self, @@ -200,6 +260,18 @@ class DeepgramFluxSTTBase(STTService): for tag_value in self._tag: params.append(urlencode({"tag": tag_value})) + # Add language_hint parameters (only valid on flux-general-multi) + hints = self._settings.language_hints + if hints and not isinstance(hints, _NotGiven): + if self._settings.model == self._MULTILINGUAL_MODEL: + for code in _prepare_language_hints(hints): + params.append(urlencode({"language_hint": code})) + else: + logger.warning( + f"language_hints only supported on {self._MULTILINGUAL_MODEL}; " + f"ignoring hints for model {self._settings.model!r}" + ) + return "&".join(params) async def _send_silence(self, duration_secs: float = 0.5): @@ -266,6 +338,21 @@ class DeepgramFluxSTTBase(STTService): if thresholds: message["thresholds"] = thresholds + if "language_hints" in fields: + if self._settings.model != self._MULTILINGUAL_MODEL: + logger.warning( + f"language_hints only supported on {self._MULTILINGUAL_MODEL}; " + f"skipping Configure update for model {self._settings.model!r}" + ) + else: + hints = self._settings.language_hints + # Empty list clears hints; NOT_GIVEN/None also treated as clear + # since we only reach this branch when the user set the field. + if hints is None or isinstance(hints, _NotGiven): + message["language_hints"] = [] + else: + message["language_hints"] = _prepare_language_hints(hints) + logger.debug(f"{self}: sending Configure message: {message}") await self._transport_send_json(message) @@ -281,8 +368,9 @@ class DeepgramFluxSTTBase(STTService): """Apply a settings delta. Configure-able fields (keyterm, eot_threshold, eager_eot_threshold, - eot_timeout_ms) are sent to Deepgram via a Configure message. - Other fields are stored but cannot be applied to the active connection. + eot_timeout_ms, language_hints) are sent to Deepgram via a Configure + message. Other fields are stored but cannot be applied to the active + connection. """ changed = await super()._update_settings(delta) @@ -520,6 +608,16 @@ class DeepgramFluxSTTBase(STTService): return None return sum(confidences) / len(confidences) + def _primary_detected_language(self, data: dict[str, Any]) -> Language | None: + """Extract the primary detected language from a TurnInfo payload. + + Only populated by ``flux-general-multi``; returns ``None`` otherwise. + """ + codes = data.get("languages") or [] + if not codes: + return None + return _code_to_pipecat_language(codes[0]) + async def _handle_end_of_turn(self, transcript: str, data: dict[str, Any]): """Handle EndOfTurn events from Deepgram Flux. @@ -543,6 +641,7 @@ class DeepgramFluxSTTBase(STTService): # Compute the average confidence average_confidence = self._calculate_average_confidence(data) + detected_language = self._primary_detected_language(data) if not self._settings.min_confidence or average_confidence > self._settings.min_confidence: # EndOfTurn means Flux has determined the turn is complete, @@ -552,7 +651,7 @@ class DeepgramFluxSTTBase(STTService): transcript, self._user_id, time_now_iso8601(), - self._settings.language, + detected_language, result=data, finalized=True, ) @@ -562,7 +661,7 @@ class DeepgramFluxSTTBase(STTService): f"Transcription confidence below min_confidence threshold: {average_confidence}" ) - await self._handle_transcription(transcript, True, self._settings.language) + await self._handle_transcription(transcript, True, detected_language) await self.stop_processing_metrics() await self.broadcast_frame(UserStoppedSpeakingFrame) await self._call_event_handler("on_end_of_turn", transcript) @@ -606,7 +705,7 @@ class DeepgramFluxSTTBase(STTService): transcript, self._user_id, time_now_iso8601(), - self._settings.language, + self._primary_detected_language(data), result=data, ) ) diff --git a/src/pipecat/services/deepgram/flux/sagemaker/stt.py b/src/pipecat/services/deepgram/flux/sagemaker/stt.py index da61b169a..94738fc9c 100644 --- a/src/pipecat/services/deepgram/flux/sagemaker/stt.py +++ b/src/pipecat/services/deepgram/flux/sagemaker/stt.py @@ -23,7 +23,6 @@ from pipecat.services.deepgram.flux.base import ( DeepgramFluxSTTBase, DeepgramFluxSTTSettings, ) -from pipecat.transcriptions.language import Language @dataclass @@ -112,12 +111,13 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): # Initialize default settings default_settings = self.Settings( model="flux-general-en", - language=Language.EN, + language=None, eager_eot_threshold=None, eot_threshold=None, eot_timeout_ms=None, keyterm=[], min_confidence=None, + language_hints=None, ) # Apply settings delta diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 5b0b16472..6874b1b69 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -24,7 +24,6 @@ from pipecat.services.deepgram.flux.base import ( FluxMessageType, ) from pipecat.services.websocket_service import WebsocketService -from pipecat.transcriptions.language import Language try: from websockets.asyncio.client import connect as websocket_connect @@ -50,6 +49,12 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): Supports configurable models, VAD events, and various audio processing options including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance. + For multilingual use, set ``model="flux-general-multi"`` and pass + ``language_hints`` to bias detection toward specific languages. Hints can + be updated mid-stream via ``STTUpdateSettingsFrame`` (e.g. to implement a + detect-then-lock flow). ``TranscriptionFrame.language`` reflects whichever + language Flux detected for each turn. + Event handlers available (in addition to base events): - on_start_of_turn(service, transcript): Deepgram detected start of speech @@ -156,6 +161,16 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): tag=["production", "voice-agent"], ), ) + + Multilingual usage with language hints:: + + stt = DeepgramFluxSTTService( + api_key="your-api-key", + settings=DeepgramFluxSTTService.Settings( + model="flux-general-multi", + language_hints=[Language.EN, Language.ES], + ), + ) """ # Note: For DeepgramFluxSTTService, differently from other processes, we need to create # the _receive_task inside _connect_websocket, because the websocket should only be @@ -171,12 +186,13 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): # 1. Initialize default_settings with hardcoded defaults default_settings = self.Settings( model="flux-general-en", - language=Language.EN, + language=None, eager_eot_threshold=None, eot_threshold=None, eot_timeout_ms=None, keyterm=[], min_confidence=None, + language_hints=None, ) # 2. Apply direct init arg overrides (deprecated)