Unflatten LiveOptions back into a single live_options field on DeepgramSTTSettings and DeepgramSageMakerSTTSettings; add apply_update override with delta-merge semantics and from_mapping override for backward-compatible dict-style updates
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from deepgram import LiveOptions
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
@@ -106,10 +107,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
# NOTE: after this change, the bot will only respond if you speak Spanish
|
||||
await asyncio.sleep(10)
|
||||
logger.info("Updating Deepgram SageMaker STT settings: language=es")
|
||||
logger.info("Updating Deepgram SageMaker STT settings: language=es, punctuate=False")
|
||||
await task.queue_frame(
|
||||
STTUpdateSettingsFrame(delta=DeepgramSageMakerSTTSettings(language=Language.ES))
|
||||
STTUpdateSettingsFrame(
|
||||
delta=DeepgramSageMakerSTTSettings(
|
||||
language=Language.ES,
|
||||
live_options=LiveOptions(punctuate=False),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# Old-style dict update (for backward-compat testing):
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from deepgram import LiveOptions
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
@@ -100,10 +101,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
# NOTE: after this change, the bot will only respond if you speak Spanish
|
||||
await asyncio.sleep(10)
|
||||
logger.info("Updating Deepgram STT settings: language=es")
|
||||
logger.info("Updating Deepgram STT settings: language=es, punctuate=False")
|
||||
await task.queue_frame(
|
||||
STTUpdateSettingsFrame(delta=DeepgramSTTSettings(language=Language.ES))
|
||||
STTUpdateSettingsFrame(
|
||||
delta=DeepgramSTTSettings(
|
||||
language=Language.ES,
|
||||
live_options=LiveOptions(punctuate=False),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# Old-style dict update (for backward-compat testing):
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
"""Deepgram speech-to-text service implementation."""
|
||||
|
||||
import inspect
|
||||
from dataclasses import dataclass, field, fields
|
||||
from typing import Any, AsyncGenerator, Dict, Optional
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, AsyncGenerator, Dict, Mapping, Optional, Type
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -25,7 +25,7 @@ from pipecat.frames.frames import (
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
|
||||
from pipecat.services.settings import _S, NOT_GIVEN, STTSettings, _NotGiven, is_given
|
||||
from pipecat.services.stt_latency import DEEPGRAM_TTFS_P99
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
@@ -52,32 +52,117 @@ except ModuleNotFoundError as e:
|
||||
class DeepgramSTTSettings(STTSettings):
|
||||
"""Settings for the Deepgram STT service.
|
||||
|
||||
Some commonly used ``LiveOptions`` fields are declared as top-level
|
||||
fields here so they can be updated individually via
|
||||
``STTUpdateSettingsFrame``. Any *additional* ``LiveOptions`` fields
|
||||
(e.g. ``filler_words``, ``diarize``, ``utterance_end_ms``) can be
|
||||
passed through the ``extra`` dict — they will be forwarded to
|
||||
``LiveOptions`` when the WebSocket connection is (re)established.
|
||||
This keeps the settings class future-proof: new Deepgram features work
|
||||
without code changes on the Pipecat side.
|
||||
Wraps the Deepgram SDK's ``LiveOptions`` in a single ``live_options``
|
||||
field. All Deepgram-specific options (``filler_words``, ``diarize``,
|
||||
``utterance_end_ms``, etc.) should be passed directly via
|
||||
``LiveOptions``.
|
||||
|
||||
In **delta mode** (i.e. when carried by ``STTUpdateSettingsFrame``),
|
||||
``live_options`` is treated as a **delta** — its non-None fields are
|
||||
merged into the stored ``LiveOptions``, not replaced wholesale. For
|
||||
example, ``DeepgramSTTSettings(live_options=LiveOptions(punctuate=False))``
|
||||
changes only ``punctuate`` and leaves all other options intact.
|
||||
|
||||
Parameters:
|
||||
encoding: Audio encoding format (e.g. ``"linear16"``).
|
||||
channels: Number of audio channels.
|
||||
interim_results: Whether to return interim transcription results.
|
||||
smart_format: Whether to enable Deepgram smart formatting.
|
||||
punctuate: Whether to add punctuation to transcripts.
|
||||
profanity_filter: Whether to filter profanity from transcripts.
|
||||
vad_events: Whether to enable Deepgram VAD events (deprecated).
|
||||
live_options: Deepgram ``LiveOptions`` for STT configuration.
|
||||
In delta mode only its non-None fields are merged into the
|
||||
stored options.
|
||||
"""
|
||||
|
||||
encoding: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
channels: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
interim_results: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
smart_format: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
punctuate: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
profanity_filter: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
vad_events: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
live_options: LiveOptions | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
|
||||
# Valid LiveOptions __init__ parameter names (cached at class level).
|
||||
_live_options_params: set[str] | None = field(default=None, init=False, repr=False)
|
||||
|
||||
@classmethod
|
||||
def _get_live_options_params(cls) -> set[str]:
|
||||
"""Return the set of valid ``LiveOptions.__init__`` parameter names."""
|
||||
if cls._live_options_params is None:
|
||||
cls._live_options_params = set(inspect.signature(LiveOptions.__init__).parameters) - {
|
||||
"self"
|
||||
}
|
||||
return cls._live_options_params
|
||||
|
||||
def apply_update(self: _S, delta: _S) -> Dict[str, Any]:
|
||||
"""Merge a delta into this store, with delta-merge for ``live_options``.
|
||||
|
||||
``live_options`` is merged field-by-field (non-None fields from the
|
||||
delta overwrite corresponding fields in the stored options) rather
|
||||
than being replaced wholesale.
|
||||
|
||||
``model`` and ``language`` are kept in sync bidirectionally between
|
||||
the top-level settings fields and ``live_options``.
|
||||
"""
|
||||
# Pull live_options out of the delta so super() doesn't replace it.
|
||||
delta_lo = getattr(delta, "live_options", NOT_GIVEN)
|
||||
if is_given(delta_lo):
|
||||
delta.live_options = NOT_GIVEN # type: ignore[assignment]
|
||||
|
||||
# Let the base class handle model, language, extra.
|
||||
changed = super().apply_update(delta)
|
||||
|
||||
# Sync top-level model/language changes into stored live_options.
|
||||
if "model" in changed:
|
||||
self.live_options.model = self.model # type: ignore[union-attr]
|
||||
if "language" in changed:
|
||||
self.live_options.language = self.language # type: ignore[union-attr]
|
||||
|
||||
# Merge live_options delta.
|
||||
if is_given(delta_lo):
|
||||
old_dict = self.live_options.to_dict() # type: ignore[union-attr]
|
||||
delta_dict = delta_lo.to_dict()
|
||||
|
||||
if delta_dict:
|
||||
merged = {**old_dict, **delta_dict}
|
||||
self.live_options = LiveOptions(**merged)
|
||||
|
||||
for key in delta_dict:
|
||||
old_val = old_dict.get(key, NOT_GIVEN)
|
||||
if old_val != delta_dict[key]:
|
||||
changed[key] = old_val
|
||||
|
||||
# Sync model/language from live_options delta to top-level.
|
||||
if "model" in delta_dict and delta_dict["model"] != self.model:
|
||||
changed.setdefault("model", self.model)
|
||||
self.model = delta_dict["model"]
|
||||
if "language" in delta_dict and delta_dict["language"] != self.language:
|
||||
changed.setdefault("language", self.language)
|
||||
self.language = delta_dict["language"]
|
||||
|
||||
return changed
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls: Type[_S], settings: Mapping[str, Any]) -> _S:
|
||||
"""Build a delta from a plain dict, routing LiveOptions keys correctly.
|
||||
|
||||
Keys that are valid ``LiveOptions.__init__`` parameters (and not
|
||||
top-level ``STTSettings`` fields like ``model`` / ``language``) are
|
||||
collected into a ``LiveOptions`` object. ``model`` and ``language``
|
||||
are routed to the top-level settings fields. Truly unknown keys go
|
||||
to ``extra``.
|
||||
"""
|
||||
lo_params = cls._get_live_options_params()
|
||||
stt_field_names = {"model", "language"}
|
||||
|
||||
kwargs: Dict[str, Any] = {}
|
||||
lo_kwargs: Dict[str, Any] = {}
|
||||
extra: Dict[str, Any] = {}
|
||||
|
||||
for key, value in settings.items():
|
||||
canonical = cls._aliases.get(key, key)
|
||||
if canonical in stt_field_names:
|
||||
kwargs[canonical] = value
|
||||
elif canonical in lo_params:
|
||||
lo_kwargs[canonical] = value
|
||||
else:
|
||||
extra[key] = value
|
||||
|
||||
if lo_kwargs:
|
||||
kwargs["live_options"] = LiveOptions(**lo_kwargs)
|
||||
|
||||
instance = cls(**kwargs)
|
||||
instance.extra = extra
|
||||
return instance
|
||||
|
||||
|
||||
class DeepgramSTTService(STTService):
|
||||
@@ -124,7 +209,9 @@ class DeepgramSTTService(STTService):
|
||||
|
||||
base_url: Custom Deepgram API base URL.
|
||||
sample_rate: Audio sample rate. If None, uses default or live_options value.
|
||||
live_options: Deepgram LiveOptions for detailed configuration.
|
||||
live_options: Deepgram LiveOptions configuration. Treated as a
|
||||
delta from a set of sensible defaults — only the fields you
|
||||
set are overridden; all others keep their default values.
|
||||
addons: Additional Deepgram features to enable.
|
||||
should_interrupt: Determine whether the bot should be interrupted when Deepgram VAD events are enabled and the system detects that the user is speaking.
|
||||
|
||||
@@ -163,29 +250,26 @@ class DeepgramSTTService(STTService):
|
||||
vad_events=False,
|
||||
)
|
||||
|
||||
merged_options = default_options.to_dict()
|
||||
merged_dict = default_options.to_dict()
|
||||
if live_options:
|
||||
default_model = default_options.model
|
||||
merged_options.update(live_options.to_dict())
|
||||
# NOTE(aleix): Fixes an in deepgram-sdk where `model` is initialized
|
||||
merged_dict.update(live_options.to_dict())
|
||||
# NOTE(aleix): Fixes a bug in deepgram-sdk where `model` is initialized
|
||||
# to the string "None" instead of the value `None`.
|
||||
if "model" in merged_options and merged_options["model"] == "None":
|
||||
merged_options["model"] = default_model
|
||||
if "model" in merged_dict and merged_dict["model"] == "None":
|
||||
merged_dict["model"] = default_model
|
||||
|
||||
if "language" in merged_options and isinstance(merged_options["language"], Language):
|
||||
merged_options["language"] = merged_options["language"].value
|
||||
if "language" in merged_dict and isinstance(merged_dict["language"], Language):
|
||||
merged_dict["language"] = merged_dict["language"].value
|
||||
|
||||
settings_fields = {f.name for f in fields(DeepgramSTTSettings)}
|
||||
settings_kwargs = {}
|
||||
extra = {}
|
||||
for key, value in merged_options.items():
|
||||
if key in settings_fields:
|
||||
settings_kwargs[key] = value
|
||||
else:
|
||||
extra[key] = value
|
||||
# Extract model/language for top-level STTSettings fields; everything
|
||||
# else lives inside LiveOptions.
|
||||
model = merged_dict.pop("model", None)
|
||||
language = merged_dict.pop("language", None)
|
||||
|
||||
settings = DeepgramSTTSettings(**settings_kwargs)
|
||||
settings.extra = extra
|
||||
settings = DeepgramSTTSettings(
|
||||
model=model, language=language, live_options=LiveOptions(**merged_dict)
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
sample_rate=sample_rate,
|
||||
@@ -197,7 +281,7 @@ class DeepgramSTTService(STTService):
|
||||
self._addons = addons
|
||||
self._should_interrupt = should_interrupt
|
||||
|
||||
if self._settings.vad_events:
|
||||
if self._settings.live_options.vad_events:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
@@ -228,7 +312,7 @@ class DeepgramSTTService(STTService):
|
||||
Returns:
|
||||
True if VAD events are enabled in the current settings.
|
||||
"""
|
||||
return self._settings.vad_events
|
||||
return self._settings.live_options.vad_events
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
@@ -290,32 +374,17 @@ class DeepgramSTTService(STTService):
|
||||
yield None
|
||||
|
||||
def _build_live_options(self) -> LiveOptions:
|
||||
"""Build a ``LiveOptions`` from flat settings fields, sample rate, and extras.
|
||||
"""Build a ``LiveOptions`` from stored settings and sample rate.
|
||||
|
||||
Returns:
|
||||
A fully-populated ``LiveOptions`` ready for the Deepgram SDK.
|
||||
"""
|
||||
valid_kwargs = set(inspect.signature(LiveOptions.__init__).parameters) - {"self"}
|
||||
opts: dict[str, Any] = self._settings.live_options.to_dict()
|
||||
|
||||
# Start with extras that are valid LiveOptions kwargs.
|
||||
opts: dict[str, Any] = {k: v for k, v in self._settings.extra.items() if k in valid_kwargs}
|
||||
|
||||
# Override with flat settings fields (these take precedence).
|
||||
s = self._settings
|
||||
opts.update(
|
||||
{
|
||||
"model": s.model,
|
||||
"language": s.language,
|
||||
"encoding": s.encoding,
|
||||
"channels": s.channels,
|
||||
"interim_results": s.interim_results,
|
||||
"smart_format": s.smart_format,
|
||||
"punctuate": s.punctuate,
|
||||
"profanity_filter": s.profanity_filter,
|
||||
"vad_events": s.vad_events,
|
||||
"sample_rate": self.sample_rate,
|
||||
}
|
||||
)
|
||||
# Overlay model/language from top-level settings and sample_rate from service.
|
||||
opts["model"] = self._settings.model
|
||||
opts["language"] = self._settings.language
|
||||
opts["sample_rate"] = self.sample_rate
|
||||
|
||||
return LiveOptions(**opts)
|
||||
|
||||
|
||||
@@ -15,8 +15,8 @@ languages, and various Deepgram features.
|
||||
import asyncio
|
||||
import inspect
|
||||
import json
|
||||
from dataclasses import dataclass, field, fields
|
||||
from typing import Any, AsyncGenerator, Optional
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, AsyncGenerator, Dict, Mapping, Optional, Type
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -33,7 +33,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.aws.sagemaker.bidi_client import SageMakerBidiClient
|
||||
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
|
||||
from pipecat.services.settings import _S, NOT_GIVEN, STTSettings, _NotGiven, is_given
|
||||
from pipecat.services.stt_latency import DEEPGRAM_SAGEMAKER_TTFS_P99
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
@@ -54,32 +54,117 @@ except ModuleNotFoundError as e:
|
||||
class DeepgramSageMakerSTTSettings(STTSettings):
|
||||
"""Settings for the Deepgram SageMaker STT service.
|
||||
|
||||
Some commonly used ``LiveOptions`` fields are declared as top-level
|
||||
fields here so they can be updated individually via
|
||||
``STTUpdateSettingsFrame``. Any *additional* ``LiveOptions`` fields
|
||||
(e.g. ``filler_words``, ``diarize``, ``utterance_end_ms``) can be
|
||||
passed through the ``extra`` dict — they will be forwarded to
|
||||
``LiveOptions`` when the connection is (re)established. This keeps the
|
||||
settings class future-proof: new Deepgram features work without code
|
||||
changes on the Pipecat side.
|
||||
Wraps the Deepgram SDK's ``LiveOptions`` in a single ``live_options``
|
||||
field. All Deepgram-specific options (``filler_words``, ``diarize``,
|
||||
``utterance_end_ms``, etc.) should be passed directly via
|
||||
``LiveOptions``.
|
||||
|
||||
In **delta mode** (i.e. when carried by ``STTUpdateSettingsFrame``),
|
||||
``live_options`` is treated as a **delta** — its non-None fields are
|
||||
merged into the stored ``LiveOptions``, not replaced wholesale. For
|
||||
example, ``DeepgramSageMakerSTTSettings(live_options=LiveOptions(punctuate=False))``
|
||||
changes only ``punctuate`` and leaves all other options intact.
|
||||
|
||||
Parameters:
|
||||
encoding: Audio encoding format (e.g. ``"linear16"``).
|
||||
channels: Number of audio channels.
|
||||
interim_results: Whether to return interim transcription results.
|
||||
smart_format: Whether to enable Deepgram smart formatting.
|
||||
punctuate: Whether to add punctuation to transcripts.
|
||||
profanity_filter: Whether to filter profanity from transcripts.
|
||||
vad_events: Whether to enable Deepgram VAD events (deprecated).
|
||||
live_options: Deepgram ``LiveOptions`` for STT configuration.
|
||||
In delta mode only its non-None fields are merged into the
|
||||
stored options.
|
||||
"""
|
||||
|
||||
encoding: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
channels: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
interim_results: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
smart_format: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
punctuate: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
profanity_filter: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
vad_events: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
live_options: LiveOptions | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
|
||||
# Valid LiveOptions __init__ parameter names (cached at class level).
|
||||
_live_options_params: set[str] | None = field(default=None, init=False, repr=False)
|
||||
|
||||
@classmethod
|
||||
def _get_live_options_params(cls) -> set[str]:
|
||||
"""Return the set of valid ``LiveOptions.__init__`` parameter names."""
|
||||
if cls._live_options_params is None:
|
||||
cls._live_options_params = set(inspect.signature(LiveOptions.__init__).parameters) - {
|
||||
"self"
|
||||
}
|
||||
return cls._live_options_params
|
||||
|
||||
def apply_update(self: _S, delta: _S) -> Dict[str, Any]:
|
||||
"""Merge a delta into this store, with delta-merge for ``live_options``.
|
||||
|
||||
``live_options`` is merged field-by-field (non-None fields from the
|
||||
delta overwrite corresponding fields in the stored options) rather
|
||||
than being replaced wholesale.
|
||||
|
||||
``model`` and ``language`` are kept in sync bidirectionally between
|
||||
the top-level settings fields and ``live_options``.
|
||||
"""
|
||||
# Pull live_options out of the delta so super() doesn't replace it.
|
||||
delta_lo = getattr(delta, "live_options", NOT_GIVEN)
|
||||
if is_given(delta_lo):
|
||||
delta.live_options = NOT_GIVEN # type: ignore[assignment]
|
||||
|
||||
# Let the base class handle model, language, extra.
|
||||
changed = super().apply_update(delta)
|
||||
|
||||
# Sync top-level model/language changes into stored live_options.
|
||||
if "model" in changed:
|
||||
self.live_options.model = self.model # type: ignore[union-attr]
|
||||
if "language" in changed:
|
||||
self.live_options.language = self.language # type: ignore[union-attr]
|
||||
|
||||
# Merge live_options delta.
|
||||
if is_given(delta_lo):
|
||||
old_dict = self.live_options.to_dict() # type: ignore[union-attr]
|
||||
delta_dict = delta_lo.to_dict()
|
||||
|
||||
if delta_dict:
|
||||
merged = {**old_dict, **delta_dict}
|
||||
self.live_options = LiveOptions(**merged)
|
||||
|
||||
for key in delta_dict:
|
||||
old_val = old_dict.get(key, NOT_GIVEN)
|
||||
if old_val != delta_dict[key]:
|
||||
changed[key] = old_val
|
||||
|
||||
# Sync model/language from live_options delta to top-level.
|
||||
if "model" in delta_dict and delta_dict["model"] != self.model:
|
||||
changed.setdefault("model", self.model)
|
||||
self.model = delta_dict["model"]
|
||||
if "language" in delta_dict and delta_dict["language"] != self.language:
|
||||
changed.setdefault("language", self.language)
|
||||
self.language = delta_dict["language"]
|
||||
|
||||
return changed
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls: Type[_S], settings: Mapping[str, Any]) -> _S:
|
||||
"""Build a delta from a plain dict, routing LiveOptions keys correctly.
|
||||
|
||||
Keys that are valid ``LiveOptions.__init__`` parameters (and not
|
||||
top-level ``STTSettings`` fields like ``model`` / ``language``) are
|
||||
collected into a ``LiveOptions`` object. ``model`` and ``language``
|
||||
are routed to the top-level settings fields. Truly unknown keys go
|
||||
to ``extra``.
|
||||
"""
|
||||
lo_params = cls._get_live_options_params()
|
||||
stt_field_names = {"model", "language"}
|
||||
|
||||
kwargs: Dict[str, Any] = {}
|
||||
lo_kwargs: Dict[str, Any] = {}
|
||||
extra: Dict[str, Any] = {}
|
||||
|
||||
for key, value in settings.items():
|
||||
canonical = cls._aliases.get(key, key)
|
||||
if canonical in stt_field_names:
|
||||
kwargs[canonical] = value
|
||||
elif canonical in lo_params:
|
||||
lo_kwargs[canonical] = value
|
||||
else:
|
||||
extra[key] = value
|
||||
|
||||
if lo_kwargs:
|
||||
kwargs["live_options"] = LiveOptions(**lo_kwargs)
|
||||
|
||||
instance = cls(**kwargs)
|
||||
instance.extra = extra
|
||||
return instance
|
||||
|
||||
|
||||
class DeepgramSageMakerSTTService(STTService):
|
||||
@@ -130,8 +215,9 @@ class DeepgramSageMakerSTTService(STTService):
|
||||
region: AWS region where the endpoint is deployed (e.g., "us-east-2").
|
||||
sample_rate: Audio sample rate in Hz. If None, uses value from
|
||||
live_options or defaults to the value from StartFrame.
|
||||
live_options: Deepgram LiveOptions for detailed configuration. If None,
|
||||
uses sensible defaults (nova-3 model, English, interim results enabled).
|
||||
live_options: Deepgram LiveOptions configuration. Treated as a
|
||||
delta from a set of sensible defaults — only the fields you
|
||||
set are overridden; all others keep their default values.
|
||||
ttfs_p99_latency: P99 latency from speech end to final transcript in seconds.
|
||||
Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
|
||||
**kwargs: Additional arguments passed to the parent STTService.
|
||||
@@ -149,29 +235,26 @@ class DeepgramSageMakerSTTService(STTService):
|
||||
)
|
||||
|
||||
# Merge with provided options
|
||||
merged_options = default_options.to_dict()
|
||||
merged_dict = default_options.to_dict()
|
||||
if live_options:
|
||||
default_model = default_options.model
|
||||
merged_options.update(live_options.to_dict())
|
||||
merged_dict.update(live_options.to_dict())
|
||||
# Handle the "None" string bug from deepgram-sdk
|
||||
if "model" in merged_options and merged_options["model"] == "None":
|
||||
merged_options["model"] = default_model
|
||||
if "model" in merged_dict and merged_dict["model"] == "None":
|
||||
merged_dict["model"] = default_model
|
||||
|
||||
# Convert Language enum to string if needed
|
||||
if "language" in merged_options and isinstance(merged_options["language"], Language):
|
||||
merged_options["language"] = merged_options["language"].value
|
||||
if "language" in merged_dict and isinstance(merged_dict["language"], Language):
|
||||
merged_dict["language"] = merged_dict["language"].value
|
||||
|
||||
settings_fields = {f.name for f in fields(DeepgramSageMakerSTTSettings)}
|
||||
settings_kwargs = {}
|
||||
extra = {}
|
||||
for key, value in merged_options.items():
|
||||
if key in settings_fields:
|
||||
settings_kwargs[key] = value
|
||||
else:
|
||||
extra[key] = value
|
||||
# Extract model/language for top-level STTSettings fields; everything
|
||||
# else lives inside LiveOptions.
|
||||
model = merged_dict.pop("model", None)
|
||||
language = merged_dict.pop("language", None)
|
||||
|
||||
settings = DeepgramSageMakerSTTSettings(**settings_kwargs)
|
||||
settings.extra = extra
|
||||
settings = DeepgramSageMakerSTTSettings(
|
||||
model=model, language=language, live_options=LiveOptions(**merged_dict)
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
sample_rate=sample_rate,
|
||||
@@ -255,32 +338,17 @@ class DeepgramSageMakerSTTService(STTService):
|
||||
yield None
|
||||
|
||||
def _build_live_options(self) -> LiveOptions:
|
||||
"""Build a ``LiveOptions`` from flat settings fields, sample rate, and extras.
|
||||
"""Build a ``LiveOptions`` from stored settings and sample rate.
|
||||
|
||||
Returns:
|
||||
A fully-populated ``LiveOptions`` ready for the Deepgram SDK.
|
||||
"""
|
||||
valid_kwargs = set(inspect.signature(LiveOptions.__init__).parameters) - {"self"}
|
||||
opts: dict[str, Any] = self._settings.live_options.to_dict()
|
||||
|
||||
# Start with extras that are valid LiveOptions kwargs.
|
||||
opts: dict[str, Any] = {k: v for k, v in self._settings.extra.items() if k in valid_kwargs}
|
||||
|
||||
# Override with flat settings fields (these take precedence).
|
||||
s = self._settings
|
||||
opts.update(
|
||||
{
|
||||
"model": s.model,
|
||||
"language": s.language,
|
||||
"encoding": s.encoding,
|
||||
"channels": s.channels,
|
||||
"interim_results": s.interim_results,
|
||||
"smart_format": s.smart_format,
|
||||
"punctuate": s.punctuate,
|
||||
"profanity_filter": s.profanity_filter,
|
||||
"vad_events": s.vad_events,
|
||||
"sample_rate": self.sample_rate,
|
||||
}
|
||||
)
|
||||
# Overlay model/language from top-level settings and sample_rate from service.
|
||||
opts["model"] = self._settings.model
|
||||
opts["language"] = self._settings.language
|
||||
opts["sample_rate"] = self.sample_rate
|
||||
|
||||
return LiveOptions(**opts)
|
||||
|
||||
|
||||
@@ -7,7 +7,10 @@
|
||||
"""Tests for the typed settings infrastructure in pipecat.services.settings."""
|
||||
|
||||
import pytest
|
||||
from deepgram import LiveOptions
|
||||
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTSettings
|
||||
from pipecat.services.deepgram.stt_sagemaker import DeepgramSageMakerSTTSettings
|
||||
from pipecat.services.settings import (
|
||||
NOT_GIVEN,
|
||||
LLMSettings,
|
||||
@@ -311,3 +314,211 @@ class TestRoundtrip:
|
||||
assert changed["model"] == "gpt-4o"
|
||||
assert current.model == "gpt-4o-mini"
|
||||
assert current.temperature == 0.9
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DeepgramSTTSettings: live_options delta merge
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeepgramSTTSettingsApplyUpdate:
|
||||
def _make_store(self, **lo_kwargs) -> DeepgramSTTSettings:
|
||||
"""Helper to build a store-mode DeepgramSTTSettings."""
|
||||
defaults = dict(
|
||||
encoding="linear16",
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
smart_format=False,
|
||||
punctuate=True,
|
||||
profanity_filter=True,
|
||||
vad_events=False,
|
||||
)
|
||||
defaults.update(lo_kwargs)
|
||||
s = DeepgramSTTSettings(
|
||||
model="nova-3-general",
|
||||
language="en",
|
||||
live_options=LiveOptions(**defaults),
|
||||
)
|
||||
return s
|
||||
|
||||
def test_apply_update_merges_live_options_as_delta(self):
|
||||
"""Only the given fields in the delta LiveOptions are merged."""
|
||||
current = self._make_store()
|
||||
assert current.live_options.punctuate is True
|
||||
|
||||
delta = DeepgramSTTSettings(live_options=LiveOptions(punctuate=False))
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.live_options.punctuate is False
|
||||
assert "punctuate" in changed
|
||||
# Other fields are untouched
|
||||
assert current.live_options.encoding == "linear16"
|
||||
assert current.live_options.channels == 1
|
||||
|
||||
def test_apply_update_syncs_model_from_live_options_to_top_level(self):
|
||||
"""model inside live_options delta should sync to top-level model."""
|
||||
current = self._make_store()
|
||||
assert current.model == "nova-3-general"
|
||||
|
||||
delta = DeepgramSTTSettings(live_options=LiveOptions(model="nova-2"))
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.model == "nova-2"
|
||||
assert "model" in changed
|
||||
|
||||
def test_apply_update_syncs_language_from_live_options_to_top_level(self):
|
||||
"""language inside live_options delta should sync to top-level language."""
|
||||
current = self._make_store()
|
||||
assert current.language == "en"
|
||||
|
||||
delta = DeepgramSTTSettings(live_options=LiveOptions(language="es"))
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.language == "es"
|
||||
assert "language" in changed
|
||||
|
||||
def test_apply_update_syncs_top_level_model_into_live_options(self):
|
||||
"""Top-level model change should propagate into stored live_options."""
|
||||
current = self._make_store()
|
||||
assert current.model == "nova-3-general"
|
||||
|
||||
delta = DeepgramSTTSettings(model="nova-2")
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.model == "nova-2"
|
||||
assert current.live_options.model == "nova-2"
|
||||
assert "model" in changed
|
||||
|
||||
def test_apply_update_syncs_top_level_language_into_live_options(self):
|
||||
"""Top-level language change should propagate into stored live_options."""
|
||||
current = self._make_store()
|
||||
|
||||
delta = DeepgramSTTSettings(language="fr")
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.language == "fr"
|
||||
assert current.live_options.language == "fr"
|
||||
assert "language" in changed
|
||||
|
||||
def test_apply_update_no_change(self):
|
||||
"""Delta with same values should report no changes."""
|
||||
current = self._make_store()
|
||||
delta = DeepgramSTTSettings(live_options=LiveOptions(punctuate=True))
|
||||
changed = current.apply_update(delta)
|
||||
assert changed == {}
|
||||
|
||||
|
||||
class TestDeepgramSTTSettingsFromMapping:
|
||||
def test_routes_live_options_kwargs(self):
|
||||
"""LiveOptions-valid keys should be collected into live_options."""
|
||||
delta = DeepgramSTTSettings.from_mapping({"punctuate": False, "filler_words": True})
|
||||
assert is_given(delta.live_options)
|
||||
assert delta.live_options.punctuate is False
|
||||
assert delta.live_options.filler_words is True
|
||||
|
||||
def test_routes_model_and_language_to_top_level(self):
|
||||
"""model and language should be top-level fields, not in live_options."""
|
||||
delta = DeepgramSTTSettings.from_mapping({"model": "nova-2", "language": "es"})
|
||||
assert delta.model == "nova-2"
|
||||
assert delta.language == "es"
|
||||
assert not is_given(delta.live_options)
|
||||
|
||||
def test_unknown_keys_go_to_extra(self):
|
||||
"""Keys that aren't LiveOptions params or STT fields go to extra."""
|
||||
delta = DeepgramSTTSettings.from_mapping({"unknown_param": 42})
|
||||
assert delta.extra == {"unknown_param": 42}
|
||||
assert not is_given(delta.live_options)
|
||||
|
||||
def test_mixed_keys(self):
|
||||
"""model + LiveOptions keys + unknown keys are routed correctly."""
|
||||
delta = DeepgramSTTSettings.from_mapping(
|
||||
{"model": "nova-2", "punctuate": False, "unknown": "val"}
|
||||
)
|
||||
assert delta.model == "nova-2"
|
||||
assert delta.live_options.punctuate is False
|
||||
assert delta.extra == {"unknown": "val"}
|
||||
|
||||
def test_roundtrip_from_mapping_apply_update(self):
|
||||
"""Simulate dict-style update: from_mapping -> apply_update."""
|
||||
current = DeepgramSTTSettings(
|
||||
model="nova-3-general",
|
||||
language="en",
|
||||
live_options=LiveOptions(
|
||||
encoding="linear16",
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
punctuate=True,
|
||||
profanity_filter=True,
|
||||
vad_events=False,
|
||||
),
|
||||
)
|
||||
|
||||
raw = {"punctuate": False, "filler_words": True}
|
||||
delta = DeepgramSTTSettings.from_mapping(raw)
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.live_options.punctuate is False
|
||||
assert current.live_options.filler_words is True
|
||||
# Unchanged fields stay put
|
||||
assert current.live_options.encoding == "linear16"
|
||||
assert current.model == "nova-3-general"
|
||||
assert "punctuate" in changed
|
||||
|
||||
def test_roundtrip_model_via_dict(self):
|
||||
"""Dict update with model should change top-level and NOT create live_options."""
|
||||
current = DeepgramSTTSettings(
|
||||
model="nova-3-general",
|
||||
language="en",
|
||||
live_options=LiveOptions(encoding="linear16", channels=1),
|
||||
)
|
||||
|
||||
raw = {"model": "nova-2"}
|
||||
delta = DeepgramSTTSettings.from_mapping(raw)
|
||||
changed = current.apply_update(delta)
|
||||
|
||||
assert current.model == "nova-2"
|
||||
assert current.live_options.model == "nova-2"
|
||||
assert "model" in changed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DeepgramSageMakerSTTSettings: same pattern
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeepgramSageMakerSTTSettingsApplyUpdate:
|
||||
def _make_store(self, **lo_kwargs) -> DeepgramSageMakerSTTSettings:
|
||||
defaults = dict(
|
||||
encoding="linear16",
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
punctuate=True,
|
||||
)
|
||||
defaults.update(lo_kwargs)
|
||||
return DeepgramSageMakerSTTSettings(
|
||||
model="nova-3",
|
||||
language="en",
|
||||
live_options=LiveOptions(**defaults),
|
||||
)
|
||||
|
||||
def test_apply_update_merges_live_options_as_delta(self):
|
||||
current = self._make_store()
|
||||
delta = DeepgramSageMakerSTTSettings(live_options=LiveOptions(punctuate=False))
|
||||
changed = current.apply_update(delta)
|
||||
assert current.live_options.punctuate is False
|
||||
assert "punctuate" in changed
|
||||
assert current.live_options.encoding == "linear16"
|
||||
|
||||
def test_apply_update_syncs_model_from_live_options(self):
|
||||
current = self._make_store()
|
||||
delta = DeepgramSageMakerSTTSettings(live_options=LiveOptions(model="nova-2"))
|
||||
current.apply_update(delta)
|
||||
assert current.model == "nova-2"
|
||||
|
||||
def test_from_mapping_routes_correctly(self):
|
||||
delta = DeepgramSageMakerSTTSettings.from_mapping(
|
||||
{"model": "nova-2", "punctuate": False, "unknown": "val"}
|
||||
)
|
||||
assert delta.model == "nova-2"
|
||||
assert delta.live_options.punctuate is False
|
||||
assert delta.extra == {"unknown": "val"}
|
||||
|
||||
Reference in New Issue
Block a user