Upgrading Deepgram to version 6.

This commit is contained in:
filipi87
2026-03-02 11:22:33 -03:00
parent ad74d19c6b
commit 8b09f7bbb4
7 changed files with 589 additions and 424 deletions

View File

@@ -0,0 +1 @@
- ⚠️ Updated `DeepgramSTTService` to use `deepgram-sdk` v6. The `LiveOptions` class was removed from the SDK and is now provided by pipecat directly; import it from `pipecat.services.deepgram.stt` instead of `deepgram`.

1
changelog/3848.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `DeepgramSTTService` keepalive ping timeout disconnections. The deepgram-sdk v6 removed automatic keepalive; pipecat now sends explicit `KeepAlive` messages every 5 seconds, within the recommended 3-5 second interval and before Deepgram's 10-second inactivity timeout.

View File

@@ -63,7 +63,7 @@ cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
camb = [ "camb-sdk>=1.5.4" ]
cerebras = []
daily = [ "daily-python~=0.23.0" ]
deepgram = [ "deepgram-sdk~=4.7.0", "pipecat-ai[websockets-base]" ]
deepgram = [ "deepgram-sdk~=6.0.1", "pipecat-ai[websockets-base]" ]
deepseek = []
elevenlabs = [ "pipecat-ai[websockets-base]" ]
fal = [ "fal-client~=0.5.9" ]

View File

@@ -6,9 +6,9 @@
"""Deepgram speech-to-text service implementation."""
import inspect
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Dict, Mapping, Optional, Type
import asyncio
from dataclasses import dataclass, field, fields
from typing import Any, AsyncGenerator, Dict, Optional
from loguru import logger
@@ -33,14 +33,12 @@ from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from deepgram import (
AsyncListenWebSocketClient,
DeepgramClient,
DeepgramClientOptions,
ErrorResponse,
LiveOptions,
LiveResultResponse,
LiveTranscriptionEvents,
from deepgram import AsyncDeepgramClient
from deepgram.core.events import EventType
from deepgram.listen.v1.types import (
ListenV1Results,
ListenV1SpeechStarted,
ListenV1UtteranceEnd,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -48,166 +46,184 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
@dataclass
class _DeepgramSTTSettingsBase(STTSettings):
"""Base settings for Deepgram STT services that use ``LiveOptions``.
class LiveOptions:
"""Deepgram live transcription options.
Shared by ``DeepgramSTTSettings`` and ``DeepgramSageMakerSTTSettings``.
Not intended for other Deepgram services that don't use ``LiveOptions``.
Wraps the Deepgram SDK's ``LiveOptions`` in a single ``live_options``
field and provides delta-merge semantics: when used as a delta (e.g.
via ``STTUpdateSettingsFrame``), only the non-None fields of
``live_options`` are merged into the stored options rather than
replacing them wholesale.
``model`` and ``language`` are kept in sync bidirectionally between
the top-level settings fields and the nested ``live_options``.
Parameters:
live_options: Deepgram ``LiveOptions`` for STT configuration.
In delta mode only its non-None fields are merged into the
stored options.
Compatibility wrapper that mirrors the ``LiveOptions`` class removed in
deepgram-sdk v6. Pass this to :class:`DeepgramSTTService` via the
``live_options`` constructor argument.
"""
live_options: LiveOptions | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
# Valid LiveOptions __init__ parameter names (cached at class level).
_live_options_params: set[str] | None = field(default=None, init=False, repr=False)
@classmethod
def _get_live_options_params(cls) -> set[str]:
"""Return the set of valid ``LiveOptions.__init__`` parameter names."""
if cls._live_options_params is None:
cls._live_options_params = set(inspect.signature(LiveOptions.__init__).parameters) - {
"self"
}
return cls._live_options_params
def _merge_live_options_delta(self, delta: LiveOptions) -> Dict[str, Any]:
"""Merge a ``LiveOptions`` delta into the stored ``live_options``.
Non-None fields from *delta* overwrite corresponding fields in the
stored ``LiveOptions``. ``model`` and ``language`` are synced to
the top-level settings fields when they change.
def __init__(
self,
*,
callback: Optional[str] = None,
callback_method: Optional[str] = None,
channels: Optional[int] = None,
detect_entities: Optional[bool] = None,
diarize: Optional[bool] = None,
dictation: Optional[bool] = None,
encoding: Optional[str] = None,
endpointing: Optional[Any] = None,
extra: Optional[Any] = None,
interim_results: Optional[bool] = None,
keyterm: Optional[Any] = None,
keywords: Optional[Any] = None,
language: Optional[str] = None,
mip_opt_out: Optional[bool] = None,
model: Optional[str] = None,
multichannel: Optional[bool] = None,
numerals: Optional[bool] = None,
profanity_filter: Optional[bool] = None,
punctuate: Optional[bool] = None,
redact: Optional[Any] = None,
replace: Optional[Any] = None,
sample_rate: Optional[int] = None,
search: Optional[Any] = None,
smart_format: Optional[bool] = None,
tag: Optional[Any] = None,
utterance_end_ms: Optional[int] = None,
vad_events: Optional[bool] = None,
version: Optional[str] = None,
**kwargs,
):
"""Initialize live transcription options.
Args:
delta: A ``LiveOptions`` whose non-None fields are the desired
overrides.
Returns:
Dict mapping each changed key to its **previous** value (same
contract as ``apply_update``).
callback: Callback URL for async transcription delivery.
callback_method: HTTP method to use for the callback (``"GET"`` or ``"POST"``).
channels: Number of audio channels.
detect_entities: Enable named entity detection.
diarize: Enable speaker diarization.
dictation: Enable dictation mode (converts commands to punctuation).
encoding: Audio encoding (e.g. ``"linear16"``).
endpointing: Endpointing sensitivity in ms, or ``False`` to disable.
extra: Additional key-value metadata to attach to the transcription (str or list).
interim_results: Whether to emit interim transcriptions.
keyterm: Keyterms to boost (str or list of str).
keywords: Keywords to boost (str or list of str).
language: BCP-47 language tag (e.g. ``"en-US"``).
mip_opt_out: Opt out of model improvement program.
model: Deepgram model name (e.g. ``"nova-3-general"``).
multichannel: Enable per-channel transcription for multi-channel audio.
numerals: Convert spoken numbers to numerals.
profanity_filter: Filter profanity from transcripts.
punctuate: Add punctuation to transcripts.
redact: Redact sensitive information (str or list of redaction types).
replace: Word replacement rules (str or list).
sample_rate: Audio sample rate in Hz.
search: Search terms to highlight (str or list of str).
smart_format: Apply smart formatting to transcripts.
tag: Custom billing tag (str or list of str).
utterance_end_ms: Silence duration in ms before an utterance-end event.
vad_events: Enable Deepgram VAD speech-started / utterance-end events.
version: Model version (e.g. ``"latest"``).
**kwargs: Any additional Deepgram query parameters.
"""
old_dict = self.live_options.to_dict() # type: ignore[union-attr]
delta_dict = delta.to_dict()
self.callback = callback
self.callback_method = callback_method
self.channels = channels
self.detect_entities = detect_entities
self.diarize = diarize
self.dictation = dictation
self.encoding = encoding
self.endpointing = endpointing
self.extra = extra
self.interim_results = interim_results
self.keyterm = keyterm
self.keywords = keywords
self.language = language
self.mip_opt_out = mip_opt_out
self.model = model
self.multichannel = multichannel
self.numerals = numerals
self.profanity_filter = profanity_filter
self.punctuate = punctuate
self.redact = redact
self.replace = replace
self.sample_rate = sample_rate
self.search = search
self.smart_format = smart_format
self.tag = tag
self.utterance_end_ms = utterance_end_ms
self.vad_events = vad_events
self.version = version
self._extra = kwargs
# Deepgram SDK bug: model initialised to the *string* "None".
if delta_dict.get("model") == "None":
del delta_dict["model"]
def __getattr__(self, name: str):
    """Resolve attributes that were supplied as extra keyword arguments.

    Python only calls ``__getattr__`` after normal attribute lookup has
    failed, so attributes assigned directly on the instance never reach
    this method.

    Args:
        name: Name of the attribute being looked up.

    Returns:
        The value stored under ``name`` in the ``_extra`` mapping.

    Raises:
        AttributeError: If ``name`` is not a key of ``_extra``.
    """
    # Read ``_extra`` through ``__dict__``: accessing ``self._extra`` before
    # it has been assigned would re-enter this method and recurse.
    extra = self.__dict__.get("_extra", {})
    try:
        return extra[name]
    except KeyError:
        # ``from None`` suppresses the internal KeyError so callers see a
        # plain AttributeError instead of a chained traceback.
        raise AttributeError(f"'LiveOptions' object has no attribute '{name}'") from None
if not delta_dict:
return {}
merged = {**old_dict, **delta_dict}
self.live_options = LiveOptions(**merged)
# Track what changed.
changed: Dict[str, Any] = {}
for key in delta_dict:
old_val = old_dict.get(key, NOT_GIVEN)
if old_val != delta_dict[key]:
changed[key] = old_val
# Sync model/language from live_options delta to top-level fields.
if "model" in delta_dict and delta_dict["model"] != self.model:
changed.setdefault("model", self.model)
self.model = delta_dict["model"]
if "language" in delta_dict and delta_dict["language"] != self.language:
changed.setdefault("language", self.language)
self.language = delta_dict["language"]
return changed
def apply_update(self: _S, delta: _S) -> Dict[str, Any]:
"""Merge a delta into this store, with delta-merge for ``live_options``.
``live_options`` is merged field-by-field via
``_merge_live_options_delta`` rather than being replaced wholesale.
``model`` and ``language`` are kept in sync bidirectionally between
the top-level settings fields and ``live_options``.
"""
# Pull live_options out of the delta so super() doesn't replace it.
delta_lo = getattr(delta, "live_options", NOT_GIVEN)
if is_given(delta_lo):
delta.live_options = NOT_GIVEN # type: ignore[assignment]
# Let the base class handle model, language, extra.
changed = super().apply_update(delta)
# Sync top-level model/language changes into stored live_options.
if "model" in changed:
self.live_options.model = self.model # type: ignore[union-attr]
if "language" in changed:
self.live_options.language = self.language # type: ignore[union-attr]
# Merge live_options delta. Top-level model/language take precedence
# over conflicting values in live_options, so write them into the
# delta before merging.
if is_given(delta_lo):
if "model" in changed:
delta_lo.model = self.model
if "language" in changed:
delta_lo.language = self.language
for key, old_val in self._merge_live_options_delta(delta_lo).items():
changed.setdefault(key, old_val)
return changed
@classmethod
def from_mapping(cls: Type[_S], settings: Mapping[str, Any]) -> _S:
"""Build a delta from a plain dict, routing LiveOptions keys correctly.
Keys that are valid ``LiveOptions.__init__`` parameters (and not
top-level ``STTSettings`` fields like ``model`` / ``language``) are
collected into a ``LiveOptions`` object. ``model`` and ``language``
are routed to the top-level settings fields. Truly unknown keys go
to ``extra``.
"""
lo_params = cls._get_live_options_params()
stt_field_names = {"model", "language"}
kwargs: Dict[str, Any] = {}
lo_kwargs: Dict[str, Any] = {}
extra: Dict[str, Any] = {}
for key, value in settings.items():
canonical = cls._aliases.get(key, key)
if canonical in stt_field_names:
kwargs[canonical] = value
elif canonical in lo_params:
lo_kwargs[canonical] = value
else:
extra[key] = value
if lo_kwargs:
kwargs["live_options"] = LiveOptions(**lo_kwargs)
instance = cls(**kwargs)
instance.extra = extra
return instance
def to_dict(self) -> dict:
    """Collect every option that has a value into a plain dictionary.

    Public instance attributes are gathered first; entries of ``_extra``
    are applied afterwards, so an ``_extra`` key replaces a public
    attribute of the same name. Attributes whose name starts with an
    underscore and any option whose value is ``None`` are left out.

    Returns:
        A new dict mapping option names to their non-None values.
    """
    options = {}
    for name, value in vars(self).items():
        if name.startswith("_") or value is None:
            continue
        options[name] = value
    for name, value in self._extra.items():
        if value is not None:
            options[name] = value
    return options
@dataclass
class DeepgramSTTSettings(_DeepgramSTTSettingsBase):
"""Settings for the Deepgram STT service.
class DeepgramSTTSettings(STTSettings):
"""Settings for Deepgram STT services.
See ``_DeepgramSTTSettingsBase`` for full documentation.
``model`` and ``language`` are inherited from ``STTSettings`` /
``ServiceSettings``. Additional Deepgram connection params may
be passed in through ``extra`` (also inherited).
Parameters:
channels: Number of audio channels.
diarize: Enable speaker diarization.
encoding: Audio encoding (e.g. ``"linear16"``).
endpointing: Endpointing sensitivity in ms, or ``False`` to disable.
interim_results: Whether to emit interim transcriptions.
profanity_filter: Filter profanity from transcripts.
punctuate: Add punctuation to transcripts.
smart_format: Apply smart formatting to transcripts.
vad_events: Enable Deepgram VAD speech-started / utterance-end events.
extra: Additional Deepgram query parameters not covered by the fields above.
"""
pass
channels: int | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
diarize: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
encoding: str | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
endpointing: Any | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
interim_results: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
profanity_filter: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
punctuate: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
smart_format: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
vad_events: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
def _sync_extra_to_fields(self) -> None:
    """Move entries of ``extra`` that name a declared field onto that field.

    A key in ``extra`` that matches a declared field (other than
    ``extra``, ``model`` and ``language``, and excluding names starting
    with an underscore) is promoted to the field when the field is not
    given. The key is removed from ``extra`` in every case, even when the
    field already had a value, so the same setting never lives in two
    places.
    """
    extra = self.extra
    if not extra:
        return

    reserved = ("extra", "model", "language")
    promotable = set()
    for spec in fields(self):
        if spec.name in reserved or spec.name.startswith("_"):
            continue
        promotable.add(spec.name)

    # Snapshot the matching keys first: ``extra`` is mutated in the loop.
    for key in [k for k in extra if k in promotable]:
        value = extra.pop(key)
        if not is_given(getattr(self, key)):
            # The declared field has no value yet, so the extra value wins.
            setattr(self, key, value)
class DeepgramSTTService(STTService):
@@ -254,7 +270,7 @@ class DeepgramSTTService(STTService):
base_url: Custom Deepgram API base URL.
sample_rate: Audio sample rate. If None, uses default or live_options value.
live_options: Deepgram LiveOptions configuration. Treated as a
live_options: :class:`LiveOptions` configuration. Treated as a
delta from a set of sensible defaults — only the fields you
set are overridden; all others keep their default values.
addons: Additional Deepgram features to enable.
@@ -283,25 +299,29 @@ class DeepgramSTTService(STTService):
)
base_url = url
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
settings = DeepgramSTTSettings(
model="nova-3-general",
language=Language.EN,
encoding="linear16",
channels=1,
interim_results=True,
smart_format=False,
punctuate=True,
profanity_filter=True,
vad_events=False,
diarize=False,
endpointing=None,
)
settings = DeepgramSTTSettings(
model=default_options.model,
language=default_options.language,
live_options=default_options,
)
if live_options:
settings._merge_live_options_delta(live_options)
lo_dict = live_options.to_dict()
delta = DeepgramSTTSettings.from_mapping(
{k: v for k, v in lo_dict.items() if k != "sample_rate"}
)
settings.apply_update(delta)
# Sync extra to top-level fields so self._settings is unambiguous
settings._sync_extra_to_fields()
super().__init__(
sample_rate=sample_rate,
@@ -313,7 +333,7 @@ class DeepgramSTTService(STTService):
self._addons = addons
self._should_interrupt = should_interrupt
if self._settings.live_options.vad_events:
if self._settings.vad_events:
import warnings
with warnings.catch_warnings():
@@ -325,13 +345,29 @@ class DeepgramSTTService(STTService):
stacklevel=2,
)
self._client = DeepgramClient(
api_key,
config=DeepgramClientOptions(
url=base_url,
options={"keepalive": "true"}, # verbose=logging.DEBUG
),
)
# Build client - support optional custom base URL via DeepgramClientEnvironment
if base_url:
try:
from deepgram import DeepgramClientEnvironment
ws_url = base_url if base_url.startswith("wss://") else f"wss://{base_url}"
http_url = base_url if base_url.startswith("https://") else f"https://{base_url}"
environment = DeepgramClientEnvironment(
base=http_url,
production=ws_url,
agent=ws_url,
)
self._client = AsyncDeepgramClient(api_key=api_key, environment=environment)
except Exception:
logger.warning(
f"{self}: Custom base_url configuration failed, falling back to default"
)
self._client = AsyncDeepgramClient(api_key=api_key)
else:
self._client = AsyncDeepgramClient(api_key=api_key)
self._connection = None
self._connection_task = None
if self.vad_enabled:
self._register_event_handler("on_speech_started")
@@ -344,7 +380,7 @@ class DeepgramSTTService(STTService):
Returns:
True if VAD events are enabled in the current settings.
"""
return self._settings.live_options.vad_events
return self._settings.vad_events
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -361,6 +397,10 @@ class DeepgramSTTService(STTService):
if not changed:
return changed
# Sync extra to fields after the update so self._settings stays unambiguous
if isinstance(self._settings, DeepgramSTTSettings):
self._settings._sync_extra_to_fields()
await self._disconnect()
await self._connect()
@@ -402,79 +442,126 @@ class DeepgramSTTService(STTService):
Yields:
Frame: None (transcription results come via WebSocket callbacks).
"""
await self._connection.send(audio)
if self._connection:
await self._connection.send_media(audio)
yield None
def _build_connect_kwargs(self) -> dict:
    """Build keyword arguments for ``client.listen.v1.connect()`` from current settings.

    Every value is converted to a string; booleans become lowercase
    ``"true"`` / ``"false"``. Sources are applied in this order, later
    ones overriding earlier ones on a key clash: declared settings
    fields, ``model`` and ``language``, the remaining ``extra`` entries,
    the service-level ``sample_rate``, and finally ``addons``.

    Returns:
        A dict mapping each connection parameter name to its string value.
    """

    def as_param(value) -> str:
        # str(True) is "True"; lowercase booleans so that every source
        # (fields, extra, addons) is serialised the same way.
        return str(value).lower() if isinstance(value, bool) else str(value)

    kwargs = {}
    s = self._settings

    # Declared Deepgram-specific fields
    for f in fields(s):
        if f.name in ("model", "language", "extra") or f.name.startswith("_"):
            continue
        value = getattr(s, f.name)
        if is_given(value) and value is not None:
            kwargs[f.name] = as_param(value)

    # model and language
    if is_given(s.model) and s.model is not None:
        kwargs["model"] = str(s.model)
    if is_given(s.language) and s.language is not None:
        kwargs["language"] = str(s.language)

    # Any remaining values in extra (that didn't map to declared fields)
    for key, value in s.extra.items():
        if value is not None:
            kwargs[key] = as_param(value)

    # Always inject sample_rate from service level.
    kwargs["sample_rate"] = str(self.sample_rate)

    if self._addons:
        for key, value in self._addons.items():
            kwargs[key] = as_param(value)

    return kwargs
async def _connect(self):
logger.debug("Connecting to Deepgram")
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
)
self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)
if self.vad_enabled:
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
self._on_speech_started,
)
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
self._on_utterance_end,
)
live_options = LiveOptions(
**{**self._settings.live_options.to_dict(), "sample_rate": self.sample_rate}
)
if not await self._connection.start(options=live_options, addons=self._addons):
await self.push_error(error_msg=f"Unable to connect to Deepgram")
else:
headers = {
k: v
for k, v in self._connection._socket.response.headers.items()
if k.startswith("dg-")
}
logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}')
self._connection_task = self.create_task(self._connection_handler())
async def _disconnect(self):
if await self._connection.is_connected():
logger.debug("Disconnecting from Deepgram")
# Deepgram swallows asyncio.CancelledError internally which prevents
# proper cancellation propagation. This issue was found with
# parallel pipelines where `CancelFrame` was not awaited for to
# finish in all branches and it was pushed downstream reaching the
# end of the pipeline, which caused `cleanup()` to be called while
# Deepgram disconnection was still finishing and therefore
# preventing the task cancellation that occurs during `cleanup()`.
# GH issue: https://github.com/deepgram/deepgram-python-sdk/issues/570
await self._connection.finish()
if not self._connection_task:
return
logger.debug("Disconnecting from Deepgram")
# Ask Deepgram to close the stream gracefully before cancelling the task.
if self._connection:
await self._connection.send_close_stream()
await self.cancel_task(self._connection_task)
self._connection_task = None
self._connection = None
async def _connection_handler(self):
    """Manages the full WebSocket lifecycle inside a single async with block.

    Each iteration opens a connection from the current settings,
    registers the message and error callbacks, and runs a keepalive task
    alongside ``start_listening()``. When listening ends or raises, the
    loop reconnects.

    A connection that was established and later dropped is re-opened
    immediately. When the connection could not be established at all, the
    handler waits before the next attempt, doubling the wait after each
    consecutive failure up to a cap, so a persistent failure does not
    turn into a busy reconnect loop.

    Raises:
        asyncio.CancelledError: Propagated when the task is cancelled
            (i.e. on stop/cancel), which ends the loop.
    """
    initial_delay = 1.0
    max_delay = 30.0
    delay = 0.0

    while True:
        connect_kwargs = self._build_connect_kwargs()
        established = False
        try:
            async with self._client.listen.v1.connect(**connect_kwargs) as connection:
                established = True
                self._connection = connection
                connection.on(EventType.MESSAGE, self._on_message)
                connection.on(EventType.ERROR, self._on_error)

                logger.debug(f"{self}: Websocket connection initialized")

                keepalive_task = self.create_task(
                    self._keepalive_handler(), f"{self}::keepalive"
                )
                try:
                    await connection.start_listening()
                finally:
                    # Stop the keepalive task however listening ended.
                    await self.cancel_task(keepalive_task)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.warning(f"{self}: Connection lost, will retry: {e}")
        finally:
            # Senders check ``self._connection`` before use; clear it so
            # nothing writes to a closed socket.
            self._connection = None

        if established:
            # The link worked at some point: reconnect right away.
            delay = 0.0
            continue

        # Never connected: back off before retrying.
        delay = min(max_delay, delay * 2) if delay else initial_delay
        await asyncio.sleep(delay)
async def _keepalive_handler(self):
    """Send a KeepAlive message every five seconds while a connection exists.

    Deepgram closes inactive connections after 10 seconds (NET-0001 error).
    Sending every 5 seconds stays within the recommended 3-5 second interval.

    A failed send is logged as a warning and the loop carries on; the
    coroutine only ends when its task is cancelled.
    """
    while True:
        await asyncio.sleep(5)

        connection = self._connection
        if not connection:
            # Nothing to ping right now; check again after the next sleep.
            continue

        try:
            await connection.send_keep_alive()
            logger.trace(f"{self}: Sent keepalive")
        except Exception as e:
            logger.warning(f"{self}: Keepalive failed: {e}")
async def _start_metrics(self):
"""Start processing metrics collection for this utterance."""
await self.start_processing_metrics()
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
async def _on_error(self, error):
logger.warning(f"{self} connection error, will retry: {error}")
await self.push_error(error_msg=f"{error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,
# we just forget about the previous connection and create a new one.
await self._connect()
# Reconnection is handled automatically by the retry loop in
# _connection_handler once start_listening() exits after the error.
async def _on_speech_started(self, *args, **kwargs):
async def _on_speech_started(self, message):
await self._start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)
await self._call_event_handler("on_speech_started", message)
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
await self.push_interruption_task_frame_and_wait()
async def _on_utterance_end(self, *args, **kwargs):
await self._call_event_handler("on_utterance_end", *args, **kwargs)
async def _on_utterance_end(self, message):
await self._call_event_handler("on_utterance_end", message)
await self.broadcast_frame(UserStoppedSpeakingFrame)
@traced_stt
@@ -484,45 +571,51 @@ class DeepgramSTTService(STTService):
"""Handle a transcription result with tracing."""
pass
async def _on_message(self, *args, **kwargs):
result: LiveResultResponse = kwargs["result"]
if len(result.channel.alternatives) == 0:
return
is_final = result.is_final
transcript = result.channel.alternatives[0].transcript
language = None
if result.channel.alternatives[0].languages:
language = result.channel.alternatives[0].languages[0]
language = Language(language)
if len(transcript) > 0:
if is_final:
# Check if this response is from a finalize() call.
# Only mark as finalized when both we requested it AND Deepgram confirms it.
from_finalize = getattr(result, "from_finalize", False)
if from_finalize:
self.confirm_finalize()
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=result,
async def _on_message(self, message):
if isinstance(message, ListenV1SpeechStarted):
if self.vad_enabled:
await self._on_speech_started(message)
elif isinstance(message, ListenV1UtteranceEnd):
if self.vad_enabled:
await self._on_utterance_end(message)
elif isinstance(message, ListenV1Results):
if not message.channel or len(message.channel.alternatives) == 0:
return
is_final = message.is_final
transcript = message.channel.alternatives[0].transcript
language = None
if message.channel.alternatives[0].languages:
language = message.channel.alternatives[0].languages[0]
language = Language(language)
if len(transcript) > 0:
if is_final:
# Check if this response is from a finalize() call.
# Only mark as finalized when both we requested it AND Deepgram confirms it.
from_finalize = getattr(message, "from_finalize", False) or False
if from_finalize:
self.confirm_finalize()
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=message,
)
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# For interim transcriptions, just push the frame without tracing
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=result,
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# For interim transcriptions, just push the frame without tracing
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=message,
)
)
)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Deepgram-specific handling.
@@ -539,6 +632,7 @@ class DeepgramSTTService(STTService):
elif isinstance(frame, VADUserStoppedSpeakingFrame):
# https://developers.deepgram.com/docs/finalize
# Mark that we're awaiting a from_finalize response
self.request_finalize()
await self._connection.finalize()
logger.trace(f"Triggered finalize event on: {frame.name=}, {direction=}")
if self._connection:
self.request_finalize()
await self._connection.send_finalize()
logger.trace(f"Triggered finalize event on: {frame.name=}, {direction=}")

View File

@@ -32,7 +32,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.sagemaker.bidi_client import SageMakerBidiClient
from pipecat.services.deepgram.stt import _DeepgramSTTSettingsBase
from pipecat.services.deepgram.stt import DeepgramSTTSettings
from pipecat.services.settings import STTSettings
from pipecat.services.stt_latency import DEEPGRAM_SAGEMAKER_TTFS_P99
from pipecat.services.stt_service import STTService
@@ -41,7 +41,7 @@ from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from deepgram import LiveOptions
from pipecat.services.deepgram.stt import LiveOptions
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -51,10 +51,10 @@ except ModuleNotFoundError as e:
@dataclass
class DeepgramSageMakerSTTSettings(_DeepgramSTTSettingsBase):
class DeepgramSageMakerSTTSettings(DeepgramSTTSettings):
"""Settings for the Deepgram SageMaker STT service.
See ``_DeepgramSTTSettingsBase`` for full documentation.
See ``DeepgramSTTSettings`` for full documentation.
"""
pass
@@ -117,22 +117,21 @@ class DeepgramSageMakerSTTService(STTService):
"""
sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
settings = DeepgramSageMakerSTTSettings(
model="nova-3",
language=Language.EN,
encoding="linear16",
channels=1,
interim_results=True,
punctuate=True,
)
settings = DeepgramSageMakerSTTSettings(
model=default_options.model,
language=default_options.language,
live_options=default_options,
)
if live_options:
settings._merge_live_options_delta(live_options)
lo_dict = live_options.to_dict()
delta = DeepgramSageMakerSTTSettings.from_mapping(
{k: v for k, v in lo_dict.items() if k != "sample_rate"}
)
settings.apply_update(delta)
super().__init__(
sample_rate=sample_rate,
@@ -224,9 +223,8 @@ class DeepgramSageMakerSTTService(STTService):
"""
logger.debug("Connecting to Deepgram on SageMaker...")
live_options = LiveOptions(
**{**self._settings.live_options.to_dict(), "sample_rate": self.sample_rate}
)
# Reconstruct a LiveOptions from the flat settings to build the query string.
live_options = LiveOptions(**self._settings.given_fields())
# Build query string from live_options, converting booleans to strings
query_params = {}
@@ -237,6 +235,7 @@ class DeepgramSageMakerSTTService(STTService):
query_params[key] = str(value).lower()
else:
query_params[key] = str(value)
query_params["sample_rate"] = str(self.sample_rate)
query_string = "&".join(f"{k}={v}" for k, v in query_params.items())

View File

@@ -6,10 +6,11 @@
"""Tests for the typed settings infrastructure in pipecat.services.settings."""
import pytest
from deepgram import LiveOptions
from unittest.mock import patch
from pipecat.services.deepgram.stt import DeepgramSTTSettings
import pytest
from pipecat.services.deepgram.stt import DeepgramSTTService, DeepgramSTTSettings
from pipecat.services.deepgram.stt_sagemaker import DeepgramSageMakerSTTSettings
from pipecat.services.settings import (
NOT_GIVEN,
@@ -317,14 +318,16 @@ class TestRoundtrip:
# ---------------------------------------------------------------------------
# DeepgramSTTSettings: live_options delta merge
# DeepgramSTTSettings: flat field apply_update
# ---------------------------------------------------------------------------
class TestDeepgramSTTSettingsApplyUpdate:
def _make_store(self, **lo_kwargs) -> DeepgramSTTSettings:
def _make_store(self, **kwargs) -> DeepgramSTTSettings:
"""Helper to build a store-mode DeepgramSTTSettings."""
defaults = dict(
model="nova-3-general",
language="en",
encoding="linear16",
channels=1,
interim_results=True,
@@ -333,52 +336,25 @@ class TestDeepgramSTTSettingsApplyUpdate:
profanity_filter=True,
vad_events=False,
)
defaults.update(lo_kwargs)
s = DeepgramSTTSettings(
model="nova-3-general",
language="en",
live_options=LiveOptions(**defaults),
)
return s
defaults.update(kwargs)
return DeepgramSTTSettings(**defaults)
def test_apply_update_merges_live_options_as_delta(self):
"""Only the given fields in the delta LiveOptions are merged."""
def test_apply_update_merges_flat_fields_as_delta(self):
"""Only the given fields in the delta are merged."""
current = self._make_store()
assert current.live_options.punctuate is True
assert current.punctuate is True
delta = DeepgramSTTSettings(live_options=LiveOptions(punctuate=False))
delta = DeepgramSTTSettings(punctuate=False)
changed = current.apply_update(delta)
assert current.live_options.punctuate is False
assert current.punctuate is False
assert "punctuate" in changed
# Other fields are untouched
assert current.live_options.encoding == "linear16"
assert current.live_options.channels == 1
assert current.encoding == "linear16"
assert current.channels == 1
def test_apply_update_syncs_model_from_live_options_to_top_level(self):
"""model inside live_options delta should sync to top-level model."""
current = self._make_store()
assert current.model == "nova-3-general"
delta = DeepgramSTTSettings(live_options=LiveOptions(model="nova-2"))
changed = current.apply_update(delta)
assert current.model == "nova-2"
assert "model" in changed
def test_apply_update_syncs_language_from_live_options_to_top_level(self):
"""language inside live_options delta should sync to top-level language."""
current = self._make_store()
assert current.language == "en"
delta = DeepgramSTTSettings(live_options=LiveOptions(language="es"))
changed = current.apply_update(delta)
assert current.language == "es"
assert "language" in changed
def test_apply_update_syncs_top_level_model_into_live_options(self):
"""Top-level model change should propagate into stored live_options."""
def test_apply_update_model(self):
"""model field is updated directly."""
current = self._make_store()
assert current.model == "nova-3-general"
@@ -386,86 +362,64 @@ class TestDeepgramSTTSettingsApplyUpdate:
changed = current.apply_update(delta)
assert current.model == "nova-2"
assert current.live_options.model == "nova-2"
assert "model" in changed
def test_apply_update_language(self):
    """language field is updated directly.

    A delta carrying only ``language`` must replace the stored value and be
    reported in the mapping returned by ``apply_update``.
    """
    current = self._make_store()
    assert current.language == "en"

    delta = DeepgramSTTSettings(language="es")
    changed = current.apply_update(delta)

    assert current.language == "es"
    assert "language" in changed
def test_apply_update_no_change(self):
    """Delta with same values should report no changes."""
    current = self._make_store()

    # The delta repeats the value the store is expected to hold already.
    delta = DeepgramSTTSettings(punctuate=True)
    changed = current.apply_update(delta)

    assert changed == {}
def test_apply_update_top_level_model_takes_precedence_over_live_options(self):
"""When both top-level model and live_options.model are set, top-level wins."""
def test_apply_update_multiple_fields(self):
"""Multiple flat fields updated at once."""
current = self._make_store()
assert current.model == "nova-3-general"
delta = DeepgramSTTSettings(
model="nova-2",
live_options=LiveOptions(model="nova-3"),
)
delta = DeepgramSTTSettings(model="nova-2", language="fr", punctuate=False)
changed = current.apply_update(delta)
assert current.model == "nova-2"
assert current.live_options.model == "nova-2"
assert "model" in changed
def test_apply_update_top_level_language_takes_precedence_over_live_options(self):
"""When both top-level language and live_options.language are set, top-level wins."""
current = self._make_store()
assert current.language == "en"
delta = DeepgramSTTSettings(
language="fr",
live_options=LiveOptions(language="es"),
)
changed = current.apply_update(delta)
assert current.language == "fr"
assert current.live_options.language == "fr"
assert "language" in changed
assert current.punctuate is False
assert changed.keys() == {"model", "language", "punctuate"}
class TestDeepgramSTTSettingsFromMapping:
def test_known_flat_fields_mapped_directly(self):
    """Deepgram field names map directly to flat settings fields."""
    delta = DeepgramSTTSettings.from_mapping({"punctuate": False, "diarize": True})

    # Each key is expected to surface as the attribute of the same name.
    assert delta.punctuate is False
    assert delta.diarize is True
def test_model_and_language_top_level(self):
    """model and language are top-level fields."""
    delta = DeepgramSTTSettings.from_mapping({"model": "nova-2", "language": "es"})

    assert delta.model == "nova-2"
    assert delta.language == "es"
def test_unknown_keys_go_to_extra(self):
    """Keys that aren't declared fields go to extra."""
    delta = DeepgramSTTSettings.from_mapping({"unknown_param": 42})

    # The unrecognised key must be the only entry collected in ``extra``.
    assert delta.extra == {"unknown_param": 42}
def test_mixed_keys(self):
    """model + known Deepgram fields + unknown keys are routed correctly."""
    delta = DeepgramSTTSettings.from_mapping(
        {"model": "nova-2", "punctuate": False, "unknown": "val"}
    )

    assert delta.model == "nova-2"
    assert delta.punctuate is False
    # Only the key without a matching field ends up in ``extra``.
    assert delta.extra == {"unknown": "val"}
def test_roundtrip_from_mapping_apply_update(self):
@@ -473,33 +427,32 @@ class TestDeepgramSTTSettingsFromMapping:
current = DeepgramSTTSettings(
model="nova-3-general",
language="en",
live_options=LiveOptions(
encoding="linear16",
channels=1,
interim_results=True,
punctuate=True,
profanity_filter=True,
vad_events=False,
),
encoding="linear16",
channels=1,
interim_results=True,
punctuate=True,
profanity_filter=True,
vad_events=False,
)
raw = {"punctuate": False, "filler_words": True}
raw = {"punctuate": False, "diarize": True}
delta = DeepgramSTTSettings.from_mapping(raw)
changed = current.apply_update(delta)
assert current.live_options.punctuate is False
assert current.live_options.filler_words is True
assert current.punctuate is False
assert current.diarize is True
# Unchanged fields stay put
assert current.live_options.encoding == "linear16"
assert current.encoding == "linear16"
assert current.model == "nova-3-general"
assert "punctuate" in changed
def test_roundtrip_model_via_dict(self):
"""Dict update with model should change top-level and NOT create live_options."""
"""Dict update with model should change top-level model field."""
current = DeepgramSTTSettings(
model="nova-3-general",
language="en",
live_options=LiveOptions(encoding="linear16", channels=1),
encoding="linear16",
channels=1,
)
raw = {"model": "nova-2"}
@@ -507,26 +460,167 @@ class TestDeepgramSTTSettingsFromMapping:
changed = current.apply_update(delta)
assert current.model == "nova-2"
assert current.live_options.model == "nova-2"
assert "model" in changed
# ---------------------------------------------------------------------------
# DeepgramSageMakerSTTSettings: smoke test that the flat base is inherited
# ---------------------------------------------------------------------------
class TestDeepgramSageMakerSTTSettings:
    """Smoke tests for ``DeepgramSageMakerSTTSettings`` delta handling."""

    def test_inherits_flat_settings_behavior(self):
        """Smoke test: SageMaker settings inherit the flat base correctly.

        Builds a store with flat fields, applies a delta that flips only
        ``punctuate``, and checks the change is reported while ``encoding``
        is left alone.
        """
        store = DeepgramSageMakerSTTSettings(
            model="nova-3",
            language="en",
            encoding="linear16",
            channels=1,
            punctuate=True,
        )

        delta = DeepgramSageMakerSTTSettings(punctuate=False)
        changed = store.apply_update(delta)

        assert store.punctuate is False
        assert store.encoding == "linear16"
        assert "punctuate" in changed
# ---------------------------------------------------------------------------
# DeepgramSTTService: settings initialization with extra syncing
# ---------------------------------------------------------------------------
class TestDeepgramSTTSettingsExtraSync:
    """Cover how ``extra`` entries and declared settings fields interact.

    The cases exercise ``DeepgramSTTService`` construction from
    ``LiveOptions``, ``DeepgramSTTSettings.from_mapping``,
    ``_sync_extra_to_fields`` and ``_build_connect_kwargs``.
    """

    def _make_service(self, **kwargs):
        """Build a ``DeepgramSTTService`` with ``AsyncDeepgramClient`` patched.

        Args:
            **kwargs: Extra keyword arguments forwarded to the constructor.

        Returns:
            The service, created with ``api_key="test-key"`` and
            ``sample_rate=16000``.
        """
        client_patch = patch("pipecat.services.deepgram.stt.AsyncDeepgramClient")
        with client_patch:
            service = DeepgramSTTService(api_key="test-key", sample_rate=16000, **kwargs)
        return service

    def test_extra_synced_to_declared_field_at_init(self):
        """An option without a matching declared field is kept in ``extra``."""
        from pipecat.services.deepgram.stt import LiveOptions

        # 'numerals' is passed as an undeclared option; the test expects it
        # to remain in ``extra`` rather than become a settings field.
        opts = LiveOptions(numerals=True)
        service = self._make_service(live_options=opts)

        assert service._settings.extra["numerals"] is True

    def test_declared_field_from_live_options(self):
        """Options that match declared fields show up on those fields."""
        from pipecat.services.deepgram.stt import LiveOptions

        opts = LiveOptions(punctuate=False, diarize=True)
        service = self._make_service(live_options=opts)

        assert service._settings.punctuate is False
        assert service._settings.diarize is True

    def test_sync_after_from_mapping_with_extra(self):
        """Syncing after ``from_mapping`` leaves the routed values as they were."""
        mapping = {
            "diarize": True,  # matches declared field
            "punctuate": False,  # matches declared field
            "numerals": True,  # doesn't match - stays in extra
        }
        delta = DeepgramSTTSettings.from_mapping(mapping)

        def expect_routed():
            # Declared keys live on fields; the unknown one lives in extra.
            assert delta.diarize is True
            assert delta.punctuate is False
            assert delta.extra["numerals"] is True

        expect_routed()

        # A further sync is expected to be a no-op for this delta.
        delta._sync_extra_to_fields()

        expect_routed()

    def test_sync_promotes_extra_to_field_when_not_given(self):
        """``_sync_extra_to_fields`` moves matching ``extra`` keys onto fields."""
        store = DeepgramSTTSettings()
        store.extra = {"diarize": True, "punctuate": False, "unknown": "value"}

        # Nothing has been promoted yet.
        for field_name in ("diarize", "punctuate"):
            assert not is_given(getattr(store, field_name))

        store._sync_extra_to_fields()

        # Matching keys become field values ...
        assert store.diarize is True
        assert store.punctuate is False
        # ... and disappear from extra ...
        for field_name in ("diarize", "punctuate"):
            assert field_name not in store.extra
        # ... while the unmatched key is retained.
        assert store.extra["unknown"] == "value"

    def test_sync_doesnt_overwrite_already_set_field(self):
        """A field that already has a value wins over a same-named ``extra`` key."""
        store = DeepgramSTTSettings(punctuate=True)
        store.extra = {"punctuate": False}

        store._sync_extra_to_fields()

        assert store.punctuate is True
        # The conflicting entry is still expected to be dropped from extra.
        assert "punctuate" not in store.extra

    def test_build_connect_kwargs_after_sync(self):
        """Declared options from ``LiveOptions`` appear in the connect kwargs."""
        from pipecat.services.deepgram.stt import LiveOptions

        opts = LiveOptions(
            model="nova-2",
            language="es",
            punctuate=True,
            diarize=False,
        )
        service = self._make_service(live_options=opts)
        connect_kwargs = service._build_connect_kwargs()

        # Booleans are expected as lowercase strings in the kwargs.
        expected = {
            "model": "nova-2",
            "language": "es",
            "punctuate": "true",
            "diarize": "false",
        }
        for key, value in expected.items():
            assert connect_kwargs[key] == value

    def test_unknown_params_stay_in_extra_and_appear_in_kwargs(self):
        """Undeclared options stay in ``extra`` and are forwarded to the kwargs."""
        from pipecat.services.deepgram.stt import LiveOptions

        # Neither option is expected to match a declared settings field.
        opts = LiveOptions(numerals=True, custom_param="test")
        service = self._make_service(live_options=opts)

        assert service._settings.extra["numerals"] is True
        assert service._settings.extra["custom_param"] == "test"

        connect_kwargs = service._build_connect_kwargs()
        assert connect_kwargs["numerals"] == "true"
        assert connect_kwargs["custom_param"] == "test"

36
uv.lock generated
View File

@@ -28,15 +28,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5f/a0/d9ef19f780f319c21ee90ecfef4431cbeeca95bec7f14071785c17b6029b/accelerate-1.10.1-py3-none-any.whl", hash = "sha256:3621cff60b9a27ce798857ece05e2b9f56fcc71631cfb31ccf71f0359c311f11", size = 374909, upload-time = "2025-08-25T13:57:04.55Z" },
]
[[package]]
name = "aenum"
version = "3.1.16"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/09/7a/61ed58e8be9e30c3fe518899cc78c284896d246d51381bab59b5db11e1f3/aenum-3.1.16.tar.gz", hash = "sha256:bfaf9589bdb418ee3a986d85750c7318d9d2839c1b1a1d6fe8fc53ec201cf140", size = 137693, upload-time = "2026-01-12T22:34:38.819Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e3/52/6ad8f63ec8da1bf40f96996d25d5b650fdd38f5975f8c813732c47388f18/aenum-3.1.16-py3-none-any.whl", hash = "sha256:9035092855a98e41b66e3d0998bd7b96280e85ceb3a04cc035636138a1943eaf", size = 165627, upload-time = "2025-04-25T03:17:58.89Z" },
]
[[package]]
name = "aic-sdk"
version = "2.0.1"
@@ -1374,33 +1365,18 @@ wheels = [
[[package]]
name = "deepgram-sdk"
version = "4.7.0"
version = "6.0.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aenum" },
{ name = "aiofiles" },
{ name = "aiohttp" },
{ name = "dataclasses-json" },
{ name = "deprecation" },
{ name = "httpx" },
{ name = "pydantic" },
{ name = "pydantic-core" },
{ name = "typing-extensions" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/17/c7/3c5918c2c74e3d56cf3d738aa174bc688c73069dc9682fc1bfaeb2058cc6/deepgram_sdk-4.7.0.tar.gz", hash = "sha256:e371396d8835d449782df472c3bd501f6cad41b3c925f66771933ff3fc4b1a13", size = 100128, upload-time = "2025-07-21T15:43:56.705Z" }
sdist = { url = "https://files.pythonhosted.org/packages/73/46/6dc45de574d766a20853452d7beccf17cb0cfeb685a0f03460f1fe49b48e/deepgram_sdk-6.0.1.tar.gz", hash = "sha256:88558a43d6173a861c8b6d6491b9ee8805679fb09fb81ef51eeb6871dad77767", size = 176743, upload-time = "2026-02-24T13:52:17.163Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/33/63/43a6e46b35eae9739e22b5cace4a22ece76d4aff74b563563b9507411484/deepgram_sdk-4.7.0-py3-none-any.whl", hash = "sha256:1a2a0890aa43cbc510e07b0f911f6841770ca0222e6fcc069bd3e2afcde1c061", size = 157911, upload-time = "2025-07-21T15:43:55.695Z" },
]
[[package]]
name = "deprecation"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "packaging" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5a/d3/8ae2869247df154b64c1884d7346d412fed0c49df84db635aab2d1c40e62/deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff", size = 173788, upload-time = "2020-04-20T14:23:38.738Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/02/c3/253a89ee03fc9b9682f1541728eb66db7db22148cd94f89ab22528cd1e1b/deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a", size = 11178, upload-time = "2020-04-20T14:23:36.581Z" },
{ url = "https://files.pythonhosted.org/packages/58/a4/53b9075816edc566694aed014d9864febedf232677b74f5d30bdde64b5de/deepgram_sdk-6.0.1-py3-none-any.whl", hash = "sha256:1b33d621b1c0b1d7a6a7b46fdc393aef4212e670521fada99764f5fb3f9d55fd", size = 490751, upload-time = "2026-02-24T13:52:15.998Z" },
]
[[package]]
@@ -4763,7 +4739,7 @@ requires-dist = [
{ name = "cartesia", marker = "extra == 'cartesia'", specifier = "~=2.0.3" },
{ name = "coremltools", marker = "extra == 'local-smart-turn'", specifier = ">=8.0" },
{ name = "daily-python", marker = "extra == 'daily'", specifier = "~=0.23.0" },
{ name = "deepgram-sdk", marker = "extra == 'deepgram'", specifier = "~=4.7.0" },
{ name = "deepgram-sdk", marker = "extra == 'deepgram'", specifier = "~=6.0.1" },
{ name = "docstring-parser", specifier = "~=0.16" },
{ name = "einops", marker = "extra == 'moondream'", specifier = "~=0.8.0" },
{ name = "fal-client", marker = "extra == 'fal'", specifier = "~=0.5.9" },