Creating a base class DeepgramFluxSTTBase so we can reuse Deepgram Flux logic.
This commit is contained in:
634
src/pipecat/services/deepgram/flux/base.py
Normal file
634
src/pipecat/services/deepgram/flux/base.py
Normal file
@@ -0,0 +1,634 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Deepgram Flux STT base class shared across transports (WebSocket, SageMaker, etc.)."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
|
||||
class FluxMessageType(str, Enum):
    """Top-level ``type`` values of messages received from Deepgram Flux.

    Each member's value is the exact string carried in the ``type`` field
    of an incoming message, so a raw string can be converted with
    ``FluxMessageType(raw_type)``.
    """

    # Server signalled that the connection is ready.
    RECEIVE_CONNECTED = "Connected"
    # Server reported a fatal error.
    RECEIVE_FATAL_ERROR = "Error"
    # Turn-level event; the concrete event is a FluxEventType.
    TURN_INFO = "TurnInfo"
    # Outcome of a Configure control message.
    CONFIGURE_SUCCESS = "ConfigureSuccess"
    CONFIGURE_FAILURE = "ConfigureFailure"
|
||||
|
||||
|
||||
class FluxEventType(str, Enum):
    """Values of the ``event`` field carried by Flux ``TurnInfo`` messages.

    Each member's value is the exact event string, so a raw string can be
    converted with ``FluxEventType(raw_event)``.
    """

    # A new speaking turn began.
    START_OF_TURN = "StartOfTurn"
    # Speech resumed within the current turn.
    TURN_RESUMED = "TurnResumed"
    # The turn is complete; carries the final transcript.
    END_OF_TURN = "EndOfTurn"
    # The turn is likely complete (eager threshold reached).
    EAGER_END_OF_TURN = "EagerEndOfTurn"
    # Incremental transcript update during an ongoing turn.
    UPDATE = "Update"
|
||||
|
||||
|
||||
@dataclass
class DeepgramFluxSTTSettings(STTSettings):
    """Settings for DeepgramFluxSTTService.

    Every field declared here defaults to the ``NOT_GIVEN`` sentinel
    (presumably meaning "not specified by the caller" — confirm against
    ``pipecat.services.settings``).

    Parameters:
        eager_eot_threshold: EagerEndOfTurn/TurnResumed threshold. Off by default.
            Lower values = more aggressive (faster response, more LLM calls).
            Higher values = more conservative (slower response, fewer LLM calls).
        eot_threshold: End-of-turn confidence required to finish a turn (default 0.7).
        eot_timeout_ms: Time in ms after speech to finish a turn regardless of EOT
            confidence (default 5000).
        keyterm: Keyterms to boost recognition accuracy for specialized terminology.
        min_confidence: Minimum confidence required to create a TranscriptionFrame.
    """

    # Connection / Configure-able turn-detection thresholds.
    eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    # One query parameter / Configure entry is produced per keyterm.
    keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    # Checked on EndOfTurn before a TranscriptionFrame is pushed.
    min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
|
||||
|
||||
class DeepgramFluxSTTBase(STTService):
    """Base class for Deepgram Flux STT services across transports.

    Contains all shared Flux protocol logic (message handling, turn detection,
    metrics, settings). Concrete subclasses implement the transport layer by
    providing the abstract methods declared on this class:
    ``_transport_send_audio``, ``_transport_send_json``,
    ``_transport_is_active``, ``_connect`` and ``_disconnect``.
    """

    Settings = DeepgramFluxSTTSettings
    _settings: Settings
    # Settings that can be pushed to an active connection via a Configure message.
    _CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}
|
||||
|
||||
def __init__(
    self,
    *,
    encoding: str = "linear16",
    mip_opt_out: Optional[bool] = None,
    tag: Optional[list] = None,
    should_interrupt: bool = True,
    settings: Settings,
    **kwargs,
):
    """Set up shared Flux state; the transport itself is owned by the subclass.

    Args:
        encoding: Audio encoding format. Must be "linear16".
        mip_opt_out: Opt out of the Deepgram Model Improvement Program.
        tag: Tags to label requests for identification during usage reporting.
        should_interrupt: Whether to interrupt the bot when Flux detects that
            the user is speaking.
        settings: Fully resolved settings instance (built by concrete subclass).
        **kwargs: Additional arguments passed to the parent STTService (e.g.
            ``sample_rate``, ``reconnect_on_error``).
    """
    super().__init__(settings=settings, **kwargs)

    # Init-only connection configuration (used when building the query string).
    self._encoding = encoding
    self._mip_opt_out = mip_opt_out
    self._tag = tag or []
    self._should_interrupt = should_interrupt

    # Set once Flux sends its "Connected" message.
    self._connection_established_event = asyncio.Event()

    # Watchdog bookkeeping — see _watchdog_task_handler.
    self._last_stt_time: Optional[float] = None
    self._watchdog_task: Optional[asyncio.Task] = None
    self._user_is_speaking = False

    # Flux turn events that applications can subscribe to.
    for event_name in (
        "on_start_of_turn",
        "on_turn_resumed",
        "on_end_of_turn",
        "on_eager_end_of_turn",
        "on_update",
    ):
        self._register_event_handler(event_name)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Abstract transport interface — implemented by each concrete subclass
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@abstractmethod
|
||||
async def _transport_send_audio(self, audio: bytes):
|
||||
"""Send raw audio bytes over the transport."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def _transport_send_json(self, message: dict):
|
||||
"""Serialize and send a JSON control message over the transport."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _transport_is_active(self) -> bool:
|
||||
"""Return True if the transport connection is currently active."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def _connect(self):
|
||||
"""Establish the transport connection."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def _disconnect(self):
|
||||
"""Tear down the transport connection."""
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Connection helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _build_query_string(self) -> str:
|
||||
"""Build query string from current settings and init-only connection config."""
|
||||
params = [
|
||||
f"model={self._settings.model}",
|
||||
f"sample_rate={self.sample_rate}",
|
||||
f"encoding={self._encoding}",
|
||||
]
|
||||
|
||||
if self._settings.eager_eot_threshold is not None:
|
||||
params.append(f"eager_eot_threshold={self._settings.eager_eot_threshold}")
|
||||
|
||||
if self._settings.eot_threshold is not None:
|
||||
params.append(f"eot_threshold={self._settings.eot_threshold}")
|
||||
|
||||
if self._settings.eot_timeout_ms is not None:
|
||||
params.append(f"eot_timeout_ms={self._settings.eot_timeout_ms}")
|
||||
|
||||
if self._mip_opt_out is not None:
|
||||
params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")
|
||||
|
||||
# Add keyterm parameters (can have multiple)
|
||||
for keyterm in self._settings.keyterm:
|
||||
params.append(urlencode({"keyterm": keyterm}))
|
||||
|
||||
# Add tag parameters (can have multiple)
|
||||
for tag_value in self._tag:
|
||||
params.append(urlencode({"tag": tag_value}))
|
||||
|
||||
return "&".join(params)
|
||||
|
||||
async def _send_silence(self, duration_secs: float = 0.5):
|
||||
"""Send a block of silence of the specified duration (default 500 ms)."""
|
||||
sample_width = 2 # bytes per sample for 16-bit PCM
|
||||
num_channels = 1 # mono
|
||||
num_samples = int(self.sample_rate * duration_secs)
|
||||
silence = b"\x00" * (num_samples * sample_width * num_channels)
|
||||
await self._transport_send_audio(silence)
|
||||
|
||||
async def _watchdog_task_handler(self):
    """Prevent dangling turns by sending silence when audio stops flowing.

    If we stop sending audio to Flux after receiving a StartOfTurn,
    we never receive the UserStoppedSpeaking event unless we resume
    sending audio.

    Runs for as long as ``_transport_is_active()`` returns true, polling
    every 100 ms. While a user turn is open and more than 500 ms have
    elapsed since ``_last_stt_time``, a block of silence is sent.
    """
    while self._transport_is_active():
        now = time.monotonic()
        # More than 500 ms without sending new audio to Flux
        if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
            logger.warning("Sending silence to Flux to prevent dangling task")
            try:
                await self._send_silence()
            except Exception as e:
                # Best effort: a failed send is logged and the loop keeps running.
                logger.warning(f"Failed to send silence: {e}")
            # Restart the 500 ms window so silence is not re-sent on every poll.
            # NOTE(review): indentation was lost in the reviewed copy; this
            # assignment is assumed to sit inside the `if` — confirm.
            self._last_stt_time = time.monotonic()
        # check every 100ms
        await asyncio.sleep(0.1)
|
||||
|
||||
async def _send_close_stream(self) -> None:
|
||||
"""Sends a CloseStream control message to Deepgram Flux.
|
||||
|
||||
This signals to the server that no more audio data will be sent.
|
||||
"""
|
||||
try:
|
||||
if self._transport_is_active():
|
||||
logger.debug("Sending CloseStream message to Deepgram Flux")
|
||||
await self._transport_send_json({"type": "CloseStream"})
|
||||
except Exception as e:
|
||||
await self.push_error(error_msg=f"Error sending CloseStream: {e}", exception=e)
|
||||
|
||||
async def _send_configure(self, fields: set[str]):
    """Push changed settings to Flux mid-stream with a Configure message.

    Only the fields named in ``fields`` are included. Keyterms are sent
    under the top-level ``keyterms`` key; the turn-detection values are
    grouped under ``thresholds``, which is omitted when none of them
    changed.

    Args:
        fields: Set of changed field names to include in the message.
    """
    settings = self._settings
    payload: dict[str, Any] = {"type": "Configure"}

    if "keyterm" in fields:
        payload["keyterms"] = settings.keyterm

    threshold_updates = {
        name: getattr(settings, name)
        for name in ("eot_threshold", "eager_eot_threshold", "eot_timeout_ms")
        if name in fields
    }
    if threshold_updates:
        payload["thresholds"] = threshold_updates

    logger.debug(f"{self}: sending Configure message: {payload}")
    await self._transport_send_json(payload)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
    """Report that this service supports metrics generation.

    Returns:
        Always True.
    """
    return True
|
||||
|
||||
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
    """Apply a settings delta and forward what Flux can accept live.

    Fields listed in ``_CONFIGURE_FIELDS`` (keyterm, eot_threshold,
    eager_eot_threshold, eot_timeout_ms) are sent to Deepgram in a
    Configure message when the transport is active. Any other changed
    field is stored but cannot be applied to the active connection, so it
    is passed to ``_warn_unhandled_updated_settings``.

    Args:
        delta: The settings changes to apply.

    Returns:
        The mapping of changed fields returned by the parent implementation.
    """
    changed = await super()._update_settings(delta)

    if changed:
        changed_names = changed.keys()

        to_configure = changed_names & self._CONFIGURE_FIELDS
        if to_configure and self._transport_is_active():
            await self._send_configure(to_configure)

        self._warn_unhandled_updated_settings(changed_names - self._CONFIGURE_FIELDS)

    return changed
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def start(self, frame: StartFrame):
    """Start the Deepgram Flux STT service.

    Runs the parent start logic first, then opens the transport
    connection via ``_connect``.

    Args:
        frame: The start frame containing initialization parameters and metadata.
    """
    await super().start(frame)
    await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
    """Stop the Deepgram Flux STT service.

    Runs the parent stop logic first, then tears down the transport
    connection via ``_disconnect``.

    Args:
        frame: The end frame.
    """
    await super().stop(frame)
    await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
    """Cancel the Deepgram Flux STT service.

    Runs the parent cancel logic first, then tears down the transport
    connection via ``_disconnect``.

    Args:
        frame: The cancel frame.
    """
    await super().cancel(frame)
    await self._disconnect()
|
||||
|
||||
async def start_metrics(self):
    """Start processing metrics collection (TTFB metrics are not started)."""
    # TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
    # Ideally, TTFB should measure the time from when a user starts speaking
    # until we receive the first transcript. However, Deepgram Flux delivers
    # both the "user started speaking" event and the first transcript simultaneously,
    # making this timing measurement meaningless in this context.
    # await self.start_ttfb_metrics()
    await self.start_processing_metrics()
|
||||
|
||||
@traced_stt
async def _handle_transcription(
    self, transcript: str, is_final: bool, language: Optional[Language] = None
):
    """Handle a transcription result with tracing.

    The body is intentionally empty: all work happens in the
    ``traced_stt`` decorator (presumably it records the call arguments in
    a trace span — confirm against ``service_decorators``).

    Args:
        transcript: The transcribed text.
        is_final: Whether the transcript is final.
        language: Language of the transcript, if known.
    """
    pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Message handling
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _validate_message(self, data: Dict[str, Any]) -> bool:
|
||||
"""Validate basic message structure from Deepgram Flux.
|
||||
|
||||
Ensures the received message has the expected structure before processing.
|
||||
|
||||
Args:
|
||||
data: The parsed JSON message data to validate.
|
||||
|
||||
Returns:
|
||||
True if the message structure is valid, False otherwise.
|
||||
"""
|
||||
if not isinstance(data, dict):
|
||||
logger.warning("Message is not a dictionary")
|
||||
return False
|
||||
|
||||
if "type" not in data:
|
||||
logger.warning("Message missing 'type' field")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def _handle_message(self, data: Dict[str, Any]):
    """Dispatch a parsed Deepgram Flux message to its handler.

    Messages that fail ``_validate_message`` are dropped, and message
    types that are not members of ``FluxMessageType`` are logged at debug
    level and ignored. A ``ConfigureFailure`` is both logged and reported
    through ``push_error``.

    Args:
        data: The parsed JSON message data.
    """
    if not self._validate_message(data):
        return

    raw_type = data.get("type")
    try:
        kind = FluxMessageType(raw_type)
    except ValueError:
        logger.debug(f"Unhandled message type: {raw_type or 'unknown'}")
        return

    if kind == FluxMessageType.RECEIVE_CONNECTED:
        await self._handle_connection_established()
    elif kind == FluxMessageType.RECEIVE_FATAL_ERROR:
        await self._handle_fatal_error(data)
    elif kind == FluxMessageType.TURN_INFO:
        await self._handle_turn_info(data)
    elif kind == FluxMessageType.CONFIGURE_SUCCESS:
        logger.info(f"{self}: Configure accepted: {data}")
    elif kind == FluxMessageType.CONFIGURE_FAILURE:
        error_code = data.get("error_code", "unknown")
        description = data.get("description", "no description")
        error_msg = f"Configure rejected: [{error_code}] {description}"
        logger.warning(f"{self}: {error_msg}")
        await self.push_error(error_msg=error_msg)
|
||||
|
||||
async def _handle_connection_established(self):
    """Handle successful connection establishment to Deepgram Flux.

    Called when Flux sends its ``Connected`` message. Logs the event and
    sets ``_connection_established_event`` so that anything waiting on the
    event can proceed.
    """
    logger.info("Connected to Flux - ready to stream audio")
    # Notify connection is established
    self._connection_established_event.set()
|
||||
|
||||
async def _handle_fatal_error(self, data: Dict[str, Any]):
    """Handle fatal error messages from Deepgram Flux.

    Fatal errors indicate unrecoverable issues with the connection or
    configuration that require intervention. These errors will cause
    the connection to be terminated.

    Args:
        data: The error message data containing error details. The text is
            read from its ``error`` key, defaulting to "Unknown error".

    Raises:
        Exception: Always raises to trigger error handling in the transport layer.
    """
    error_msg = data.get("error", "Unknown error")
    deepgram_error = f"Fatal error: {error_msg}"
    logger.error(deepgram_error)
    # Error will be handled by the transport's receive loop error handler
    raise Exception(deepgram_error)
|
||||
|
||||
async def _handle_turn_info(self, data: Dict[str, Any]):
    """Dispatch a TurnInfo message to the handler for its event type.

    The ``event`` field selects the handler and the ``transcript`` field
    (empty string when absent) is passed along. Events that are not
    members of ``FluxEventType`` are logged at debug level and ignored.

    Args:
        data: The TurnInfo message data containing event type, transcript and some extra metadata.
    """
    raw_event = data.get("event")
    text = data.get("transcript", "")

    try:
        turn_event = FluxEventType(raw_event)
    except ValueError:
        logger.debug(f"Unhandled TurnInfo event: {raw_event}")
        return

    if turn_event == FluxEventType.START_OF_TURN:
        await self._handle_start_of_turn(text)
    elif turn_event == FluxEventType.TURN_RESUMED:
        await self._handle_turn_resumed(raw_event)
    elif turn_event == FluxEventType.END_OF_TURN:
        await self._handle_end_of_turn(text, data)
    elif turn_event == FluxEventType.EAGER_END_OF_TURN:
        await self._handle_eager_end_of_turn(text, data)
    elif turn_event == FluxEventType.UPDATE:
        await self._handle_update(text)
|
||||
|
||||
async def _handle_start_of_turn(self, transcript: str):
    """Handle StartOfTurn events from Deepgram Flux.

    StartOfTurn events are fired when Deepgram Flux detects the beginning
    of a new speaking turn.

    The service will, in order:

    - Mark the user as speaking (enables the silence watchdog)
    - Broadcast a UserStartedSpeakingFrame
    - Broadcast an interruption, only if ``should_interrupt`` was enabled
    - Start metrics collection
    - Invoke the ``on_start_of_turn`` event handler with the transcript

    Args:
        transcript: maybe the first few words of the turn.
    """
    logger.debug("User started speaking")
    self._user_is_speaking = True
    await self.broadcast_frame(UserStartedSpeakingFrame)
    if self._should_interrupt:
        await self.broadcast_interruption()
    await self.start_metrics()
    await self._call_event_handler("on_start_of_turn", transcript)
    if transcript:
        logger.trace(f"Start of turn transcript: {transcript}")
|
||||
|
||||
async def _handle_turn_resumed(self, event: str):
    """Handle TurnResumed events from Deepgram Flux.

    TurnResumed events indicate that speech has resumed after a brief pause
    within the same turn. The event is logged at trace level and the
    ``on_turn_resumed`` event handler is invoked (with no extra arguments);
    no frames are pushed.

    Args:
        event: The event type string for logging purposes.
    """
    logger.trace(f"Received event TurnResumed: {event}")
    await self._call_event_handler("on_turn_resumed")
|
||||
|
||||
def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
|
||||
"""Calculate the average confidence from transcript data.
|
||||
|
||||
Return None if the data is missing or invalid.
|
||||
"""
|
||||
# Example: Assume transcript_data has a list of words with confidence
|
||||
words = transcript_data.get("words")
|
||||
if not words or not isinstance(words, list):
|
||||
return None
|
||||
confidences = [
|
||||
w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
|
||||
]
|
||||
if not confidences:
|
||||
return None
|
||||
return sum(confidences) / len(confidences)
|
||||
|
||||
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
    """Handle EndOfTurn events from Deepgram Flux.

    EndOfTurn events are fired when Deepgram Flux determines that a speaking
    turn has concluded, either due to sufficient silence or end-of-turn
    confidence thresholds being met. This provides the final transcript
    for the completed turn.

    The service will, in order:

    - Mark the user as no longer speaking
    - Push a finalized TranscriptionFrame, unless ``min_confidence`` is set
      and the average word confidence does not exceed it (a warning is
      logged instead)
    - Trigger transcription handling with tracing
    - Stop processing metrics collection
    - Broadcast a UserStoppedSpeakingFrame to signal turn completion
    - Invoke the ``on_end_of_turn`` event handler with the transcript

    Args:
        transcript: The final transcript text for the completed turn.
        data: The TurnInfo message data containing event type, transcript and some extra metadata.
    """
    logger.debug("User stopped speaking")
    self._user_is_speaking = False

    # Compute the average confidence (None when the message carries no
    # usable per-word confidences).
    average_confidence = self._calculate_average_confidence(data)
    min_confidence = self._settings.min_confidence

    # When the confidence cannot be computed there is no evidence that the
    # transcript is below the threshold, so it is let through — the same
    # outcome as when no threshold is configured. Comparing None against
    # the threshold would raise TypeError.
    passes_threshold = (
        not min_confidence
        or average_confidence is None
        or average_confidence > min_confidence
    )

    if passes_threshold:
        # EndOfTurn means Flux has determined the turn is complete,
        # so this TranscriptionFrame is always finalized
        await self.push_frame(
            TranscriptionFrame(
                transcript,
                self._user_id,
                time_now_iso8601(),
                self._settings.language,
                result=data,
                finalized=True,
            )
        )
    else:
        logger.warning(
            f"Transcription confidence below min_confidence threshold: {average_confidence}"
        )

    await self._handle_transcription(transcript, True, self._settings.language)
    await self.stop_processing_metrics()
    await self.broadcast_frame(UserStoppedSpeakingFrame)
    await self._call_event_handler("on_end_of_turn", transcript)
|
||||
|
||||
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
    """Handle EagerEndOfTurn events from Deepgram Flux.

    EagerEndOfTurn events are fired when the end-of-turn confidence reaches the
    EagerEndOfTurn threshold but hasn't yet reached the full end-of-turn threshold.
    These provide interim transcripts that can be used for faster response
    generation while still allowing the user to continue speaking.

    EagerEndOfTurn events enable more responsive conversational AI by allowing
    the LLM to start processing likely final transcripts before the turn
    is definitively ended.

    The transcript is pushed as an InterimTranscriptionFrame and the
    ``on_eager_end_of_turn`` event handler is invoked with it.

    Args:
        transcript: The interim transcript text that triggered the EagerEndOfTurn event.
        data: The TurnInfo message data containing event type, transcript and some extra metadata.
    """
    logger.trace(f"EagerEndOfTurn - {transcript}")
    # Deepgram's EagerEndOfTurn feature enables lower-latency voice agents by sending
    # medium-confidence transcripts before EndOfTurn certainty, allowing LLM processing to
    # begin early.
    #
    # However, if speech resumes or the transcripts differ from the final EndOfTurn, the
    # EagerEndOfTurn response should be cancelled to avoid incorrect or partial responses.
    #
    # Pipecat doesn't yet provide built-in Gate/control mechanisms to:
    # 1. Start LLM/TTS processing early on EagerEndOfTurn events
    # 2. Cancel in-flight processing when TurnResumed occurs
    #
    # By pushing EagerEndOfTurn transcripts as InterimTranscriptionFrame, we enable
    # developers to implement custom EagerEndOfTurn handling in their applications while
    # maintaining compatibility with existing interim transcription workflows.
    #
    # TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
    # that can start response generation on EagerEndOfTurn and cancel or confirm it.
    await self.push_frame(
        InterimTranscriptionFrame(
            transcript,
            self._user_id,
            time_now_iso8601(),
            self._settings.language,
            result=data,
        )
    )
    await self._call_event_handler("on_eager_end_of_turn", transcript)
|
||||
|
||||
async def _handle_update(self, transcript: str):
    """Handle Update events from Deepgram Flux.

    Update events provide incremental transcript updates during an ongoing
    turn. These events allow for real-time display of transcription progress
    and can be used to provide visual feedback to users about what's being
    recognized.

    No frame is pushed; a non-empty transcript is logged at trace level and
    forwarded to the ``on_update`` event handler.

    Args:
        transcript: The current partial transcript text for the ongoing turn.
    """
    if transcript:
        logger.trace(f"Update event: {transcript}")
        # TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
        # Ideally, TTFB should measure the time from when a user starts speaking
        # until we receive the first transcript. However, Deepgram Flux delivers
        # both the "user started speaking" event and the first transcript simultaneously,
        # making this timing measurement meaningless in this context.
        # await self.stop_ttfb_metrics()
        # NOTE(review): indentation was lost in the reviewed copy; the handler
        # call is assumed to sit inside the `if transcript:` guard — confirm.
        await self._call_event_handler("on_update", transcript)
|
||||
@@ -4,35 +4,28 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Deepgram Flux speech-to-text service implementation."""
|
||||
"""Deepgram Flux speech-to-text service implementation (WebSocket transport)."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, AsyncGenerator, Dict, Optional
|
||||
from urllib.parse import urlencode
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
|
||||
from pipecat.services.stt_service import WebsocketSTTService
|
||||
from pipecat.services.deepgram.flux.base import (
|
||||
DeepgramFluxSTTBase,
|
||||
DeepgramFluxSTTSettings,
|
||||
FluxEventType,
|
||||
FluxMessageType,
|
||||
)
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
try:
|
||||
from websockets.asyncio.client import connect as websocket_connect
|
||||
@@ -42,65 +35,23 @@ except ModuleNotFoundError as e:
|
||||
logger.error("In order to use Deepgram Flux, you need to `pip install pipecat-ai[deepgram]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class FluxMessageType(str, Enum):
|
||||
"""Deepgram Flux WebSocket message types.
|
||||
|
||||
These are the top-level message types that can be received from the
|
||||
Deepgram Flux WebSocket connection.
|
||||
"""
|
||||
|
||||
RECEIVE_CONNECTED = "Connected"
|
||||
RECEIVE_FATAL_ERROR = "Error"
|
||||
TURN_INFO = "TurnInfo"
|
||||
CONFIGURE_SUCCESS = "ConfigureSuccess"
|
||||
CONFIGURE_FAILURE = "ConfigureFailure"
|
||||
# Re-export for backward compatibility
|
||||
__all__ = [
|
||||
"DeepgramFluxSTTService",
|
||||
"DeepgramFluxSTTSettings",
|
||||
"FluxEventType",
|
||||
"FluxMessageType",
|
||||
]
|
||||
|
||||
|
||||
class FluxEventType(str, Enum):
|
||||
"""Deepgram Flux TurnInfo event types.
|
||||
|
||||
These events are contained within TurnInfo messages and indicate
|
||||
different stages of speech processing and turn detection.
|
||||
"""
|
||||
|
||||
START_OF_TURN = "StartOfTurn"
|
||||
TURN_RESUMED = "TurnResumed"
|
||||
END_OF_TURN = "EndOfTurn"
|
||||
EAGER_END_OF_TURN = "EagerEndOfTurn"
|
||||
UPDATE = "Update"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeepgramFluxSTTSettings(STTSettings):
|
||||
"""Settings for DeepgramFluxSTTService.
|
||||
|
||||
Parameters:
|
||||
eager_eot_threshold: EagerEndOfTurn/TurnResumed threshold. Off by default.
|
||||
Lower values = more aggressive (faster response, more LLM calls).
|
||||
Higher values = more conservative (slower response, fewer LLM calls).
|
||||
eot_threshold: End-of-turn confidence required to finish a turn (default 0.7).
|
||||
eot_timeout_ms: Time in ms after speech to finish a turn regardless of EOT
|
||||
confidence (default 5000).
|
||||
keyterm: Keyterms to boost recognition accuracy for specialized terminology.
|
||||
min_confidence: Minimum confidence required to create a TranscriptionFrame.
|
||||
"""
|
||||
|
||||
eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
|
||||
|
||||
|
||||
class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
|
||||
"""Deepgram Flux speech-to-text service.
|
||||
|
||||
Provides real-time speech recognition using Deepgram's WebSocket API with Flux capabilities.
|
||||
Supports configurable models, VAD events, and various audio processing options
|
||||
including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance.
|
||||
|
||||
Event handlers available (in addition to WebsocketSTTService events):
|
||||
Event handlers available (in addition to base events):
|
||||
|
||||
- on_speech_started(service): Deepgram detected start of speech
|
||||
- on_utterance_end(service): Deepgram detected end of utterance
|
||||
@@ -117,7 +68,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
|
||||
Settings = DeepgramFluxSTTSettings
|
||||
_settings: Settings
|
||||
_CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Configuration parameters for Deepgram Flux API.
|
||||
@@ -189,7 +139,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
should_interrupt: Determine whether the bot should be interrupted when Flux detects that the user is speaking.
|
||||
settings: Runtime-updatable settings. When provided alongside deprecated
|
||||
parameters, ``settings`` values take precedence.
|
||||
**kwargs: Additional arguments passed to the parent WebsocketSTTService class.
|
||||
**kwargs: Additional arguments passed to the parent classes.
|
||||
|
||||
Examples:
|
||||
Basic usage with default parameters::
|
||||
@@ -254,34 +204,39 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
if settings is not None:
|
||||
default_settings.apply_update(settings)
|
||||
|
||||
super().__init__(
|
||||
sample_rate=sample_rate,
|
||||
reconnect_on_error=False,
|
||||
DeepgramFluxSTTBase.__init__(
|
||||
self,
|
||||
encoding=flux_encoding,
|
||||
mip_opt_out=mip_opt_out,
|
||||
tag=tag,
|
||||
should_interrupt=should_interrupt,
|
||||
settings=default_settings,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self, reconnect_on_error=False)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._should_interrupt = should_interrupt
|
||||
self._encoding = flux_encoding
|
||||
self._mip_opt_out = mip_opt_out
|
||||
self._tag = tag or []
|
||||
self._websocket_url = None
|
||||
self._receive_task = None
|
||||
|
||||
# Flux event handlers
|
||||
self._register_event_handler("on_start_of_turn")
|
||||
self._register_event_handler("on_turn_resumed")
|
||||
self._register_event_handler("on_end_of_turn")
|
||||
self._register_event_handler("on_eager_end_of_turn")
|
||||
self._register_event_handler("on_update")
|
||||
self._connection_established_event = asyncio.Event()
|
||||
# Watchdog task to prevent dangling tasks
|
||||
# If we stop sending audio to Flux after we have received that the User has started speaking
|
||||
# we never receive the user stopped speaking event unless we resume sending audio to it.
|
||||
self._last_stt_time = None
|
||||
self._watchdog_task = None
|
||||
self._user_is_speaking = False
|
||||
# ------------------------------------------------------------------
|
||||
# Transport interface implementation
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _transport_send_audio(self, audio: bytes):
    """Send one chunk of raw audio bytes over the WebSocket.

    Args:
        audio: The audio bytes to transmit unchanged.
    """
    websocket = self._websocket
    await websocket.send(audio)
|
||||
|
||||
async def _transport_send_json(self, message: dict):
    """Serialize ``message`` to JSON text and send it over the WebSocket.

    Args:
        message: The control message to encode with ``json.dumps``.
    """
    payload = json.dumps(message)
    await self._websocket.send(payload)
|
||||
|
||||
def _transport_is_active(self) -> bool:
|
||||
return self._websocket is not None and self._websocket.state is State.OPEN
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Connection management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _connect(self):
|
||||
"""Connect to WebSocket and start background tasks.
|
||||
@@ -290,7 +245,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
the background task for receiving transcription results.
|
||||
"""
|
||||
await super()._connect()
|
||||
|
||||
self._websocket_url = f"{self._url}?{self._build_query_string()}"
|
||||
await self._connect_websocket()
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -309,25 +264,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
# Reset state only after everything is cleaned up
|
||||
self._websocket = None
|
||||
|
||||
async def _send_silence(self, duration_secs: float = 0.5):
|
||||
"""Send a block of silence of the specified duration (default 500 ms)."""
|
||||
sample_width = 2 # bytes per sample for 16-bit PCM
|
||||
num_channels = 1 # mono
|
||||
num_samples = int(self.sample_rate * duration_secs)
|
||||
silence = b"\x00" * (num_samples * sample_width * num_channels)
|
||||
await self._websocket.send(silence)
|
||||
|
||||
async def _watchdog_task_handler(self):
    """Send silence to Flux when audio stops flowing during a user turn.

    Runs while the WebSocket is open, polling every 100 ms. If the user is
    marked as speaking and more than 500 ms have passed since
    ``self._last_stt_time``, a block of silence is sent so that Flux keeps
    receiving audio and can still emit the end-of-turn event.

    A failure while sending the silence is logged instead of propagated, so a
    send that races with the connection closing does not crash the watchdog
    task; the loop condition then ends the task on the next iteration.
    """
    while self._websocket and self._websocket.state is State.OPEN:
        now = time.monotonic()
        # More than 500 ms without sending new audio to Flux
        if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
            logger.warning("Sending silence to Flux to prevent dangling task")
            try:
                await self._send_silence()
            except Exception as e:
                # Best-effort keep-alive: report and keep the watchdog running.
                logger.warning(f"Failed to send silence: {e}")
            self._last_stt_time = time.monotonic()
        # check every 100ms
        await asyncio.sleep(0.1)
|
||||
|
||||
async def _connect_websocket(self):
|
||||
"""Establish WebSocket connection to API.
|
||||
|
||||
@@ -400,134 +336,9 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _send_close_stream(self) -> None:
    """Send a ``CloseStream`` control message over the WebSocket, if one is set.

    This tells the server that no more audio data will be sent. Any exception
    raised while sending is reported through ``push_error`` rather than raised.
    """
    try:
        websocket = self._websocket
        if not websocket:
            return
        logger.debug("Sending CloseStream message to Deepgram Flux")
        close_message = {"type": "CloseStream"}
        await websocket.send(json.dumps(close_message))
    except Exception as e:
        await self.push_error(error_msg=f"Error sending closeStream: {e}", exception=e)
|
||||
|
||||
async def _send_configure(self, fields: set[str]):
    """Send a ``Configure`` control message carrying only the changed fields.

    ``keyterm`` is sent under the top-level ``keyterms`` key; the threshold
    settings are grouped under ``thresholds``, which is omitted when none of
    them changed. Values are read from the current ``self._settings``.

    Args:
        fields: Set of changed field names to include in the message.
    """
    settings = self._settings
    message: dict[str, Any] = {"type": "Configure"}

    if "keyterm" in fields:
        message["keyterms"] = settings.keyterm

    threshold_names = ("eot_threshold", "eager_eot_threshold", "eot_timeout_ms")
    thresholds: dict[str, Any] = {
        name: getattr(settings, name) for name in threshold_names if name in fields
    }
    if thresholds:
        message["thresholds"] = thresholds

    logger.debug(f"{self}: sending Configure message: {message}")
    await self._websocket.send(json.dumps(message))
|
||||
|
||||
def can_generate_metrics(self) -> bool:
    """Indicate that this service generates processing metrics.

    Returns:
        Always True.
    """
    return True
|
||||
|
||||
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
    """Apply a settings delta and forward live-configurable changes to Deepgram.

    Changed fields listed in ``_CONFIGURE_FIELDS`` are sent in a Configure
    message when the WebSocket is open. All other changed fields are passed to
    ``_warn_unhandled_updated_settings``.

    Args:
        delta: The settings delta to apply.

    Returns:
        The mapping of changed fields returned by the parent implementation.
    """
    changed = await super()._update_settings(delta)

    if changed:
        changed_names = changed.keys()
        live_fields = changed_names & self._CONFIGURE_FIELDS
        if live_fields and self._websocket and self._websocket.state is State.OPEN:
            await self._send_configure(live_fields)

        self._warn_unhandled_updated_settings(changed_names - self._CONFIGURE_FIELDS)

    return changed
|
||||
|
||||
def _build_query_string(self) -> str:
|
||||
"""Build query string from current settings and init-only connection config."""
|
||||
params = [
|
||||
f"model={self._settings.model}",
|
||||
f"sample_rate={self.sample_rate}",
|
||||
f"encoding={self._encoding}",
|
||||
]
|
||||
|
||||
if self._settings.eager_eot_threshold is not None:
|
||||
params.append(f"eager_eot_threshold={self._settings.eager_eot_threshold}")
|
||||
|
||||
if self._settings.eot_threshold is not None:
|
||||
params.append(f"eot_threshold={self._settings.eot_threshold}")
|
||||
|
||||
if self._settings.eot_timeout_ms is not None:
|
||||
params.append(f"eot_timeout_ms={self._settings.eot_timeout_ms}")
|
||||
|
||||
if self._mip_opt_out is not None:
|
||||
params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")
|
||||
|
||||
# Add keyterm parameters (can have multiple)
|
||||
for keyterm in self._settings.keyterm:
|
||||
params.append(urlencode({"keyterm": keyterm}))
|
||||
|
||||
# Add tag parameters (can have multiple)
|
||||
for tag_value in self._tag:
|
||||
params.append(urlencode({"tag": tag_value}))
|
||||
|
||||
return "&".join(params)
|
||||
|
||||
async def start(self, frame: StartFrame):
    """Start the Deepgram Flux STT service.

    Runs the parent start logic, builds the WebSocket URL from the base URL
    and the current query string, then connects.

    Args:
        frame: The start frame containing initialization parameters and metadata.
    """
    await super().start(frame)
    query_string = self._build_query_string()
    self._websocket_url = f"{self._url}?{query_string}"
    await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
    """Stop the Deepgram Flux STT service.

    Runs the parent stop logic first, then disconnects.

    Args:
        frame: The end frame.
    """
    await super().stop(frame)
    await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
    """Cancel the Deepgram Flux STT service.

    Runs the parent cancel logic first, then disconnects.

    Args:
        frame: The cancel frame.
    """
    await super().cancel(frame)
    await self._disconnect()
|
||||
# ------------------------------------------------------------------
|
||||
# Audio sending and receiving
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Send audio data to Deepgram Flux for transcription.
|
||||
@@ -559,23 +370,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
|
||||
yield None
|
||||
|
||||
async def start_metrics(self):
    """Start processing metrics collection (TTFB metrics are not started)."""
    # TTFB (Time To First Byte) is deliberately left disabled for Deepgram Flux.
    # It should measure the time from the user starting to speak until the
    # first transcript arrives, but Flux delivers the "user started speaking"
    # event and the first transcript simultaneously, so the measurement would
    # be meaningless here.
    # await self.start_ttfb_metrics()
    await self.start_processing_metrics()
|
||||
|
||||
@traced_stt
async def _handle_transcription(
    self, transcript: str, is_final: bool, language: Optional[Language] = None
):
    """Tracing hook for a transcription result.

    The body is intentionally empty; the call exists so the ``traced_stt``
    decorator can wrap it.

    Args:
        transcript: The transcribed text.
        is_final: Whether the transcript is final.
        language: The language of the transcript, if known.
    """
|
||||
|
||||
def _get_websocket(self):
|
||||
"""Get the current WebSocket connection.
|
||||
|
||||
@@ -592,27 +386,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
return self._websocket
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
def _validate_message(self, data: Dict[str, Any]) -> bool:
|
||||
"""Validate basic message structure from Deepgram Flux.
|
||||
|
||||
Ensures the received message has the expected structure before processing.
|
||||
|
||||
Args:
|
||||
data: The parsed JSON message data to validate.
|
||||
|
||||
Returns:
|
||||
True if the message structure is valid, False otherwise.
|
||||
"""
|
||||
if not isinstance(data, dict):
|
||||
logger.warning("Message is not a dictionary")
|
||||
return False
|
||||
|
||||
if "type" not in data:
|
||||
logger.warning("Message missing 'type' field")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Receive and process messages from WebSocket.
|
||||
|
||||
@@ -637,269 +410,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
else:
|
||||
logger.warning(f"Received non-string message: {type(message)}")
|
||||
|
||||
async def _handle_message(self, data: Dict[str, Any]):
    """Dispatch a parsed WebSocket message to the handler for its type.

    Messages that fail ``_validate_message`` are dropped. Message types that
    are not members of ``FluxMessageType`` are logged at debug level and
    ignored. A ConfigureFailure is logged and reported through ``push_error``.

    Args:
        data: The parsed JSON message data from the WebSocket.
    """
    if not self._validate_message(data):
        return

    raw_type = data.get("type")
    try:
        kind = FluxMessageType(raw_type)
    except ValueError:
        logger.debug(f"Unhandled message type: {raw_type or 'unknown'}")
        return

    if kind is FluxMessageType.RECEIVE_CONNECTED:
        await self._handle_connection_established()
    elif kind is FluxMessageType.RECEIVE_FATAL_ERROR:
        await self._handle_fatal_error(data)
    elif kind is FluxMessageType.TURN_INFO:
        await self._handle_turn_info(data)
    elif kind is FluxMessageType.CONFIGURE_SUCCESS:
        logger.info(f"{self}: Configure accepted: {data}")
    elif kind is FluxMessageType.CONFIGURE_FAILURE:
        error_code = data.get("error_code", "unknown")
        description = data.get("description", "no description")
        error_msg = f"Configure rejected: [{error_code}] {description}"
        logger.warning(f"{self}: {error_msg}")
        await self.push_error(error_msg=error_msg)
|
||||
|
||||
async def _handle_connection_established(self):
    """Handle the ``Connected`` message from Deepgram Flux.

    Logs that the service is ready to stream audio and sets
    ``_connection_established_event`` so code waiting on it can proceed.
    """
    logger.info("Connected to Flux - ready to stream audio")
    # Wake up anything waiting for the connection to become ready.
    self._connection_established_event.set()
|
||||
|
||||
async def _handle_fatal_error(self, data: Dict[str, Any]):
    """Log a fatal error message from Deepgram Flux and raise it.

    Args:
        data: The error message data; its ``error`` key holds the details
            (``"Unknown error"`` is used when the key is absent).

    Raises:
        Exception: Always, carrying the formatted error text, so the caller's
            error handling is triggered.
    """
    details = data.get("error", "Unknown error")
    deepgram_error = f"Fatal error: {details}"
    logger.error(deepgram_error)
    # Error will be handled inside WebsocketService->_receive_task_handler
    raise Exception(deepgram_error)
|
||||
|
||||
async def _handle_turn_info(self, data: Dict[str, Any]):
    """Dispatch a TurnInfo message to the handler for its ``event`` field.

    Events that are not members of ``FluxEventType`` are logged at debug level
    and ignored. A missing ``transcript`` is treated as an empty string.

    Args:
        data: The TurnInfo message data containing event type, transcript and
            some extra metadata.
    """
    event = data.get("event")
    transcript = data.get("transcript", "")

    try:
        kind = FluxEventType(event)
    except ValueError:
        logger.debug(f"Unhandled TurnInfo event: {event}")
        return

    if kind is FluxEventType.START_OF_TURN:
        await self._handle_start_of_turn(transcript)
    elif kind is FluxEventType.TURN_RESUMED:
        await self._handle_turn_resumed(event)
    elif kind is FluxEventType.END_OF_TURN:
        await self._handle_end_of_turn(transcript, data)
    elif kind is FluxEventType.EAGER_END_OF_TURN:
        await self._handle_eager_end_of_turn(transcript, data)
    elif kind is FluxEventType.UPDATE:
        await self._handle_update(transcript)
|
||||
|
||||
async def _handle_start_of_turn(self, transcript: str):
    """Handle StartOfTurn events from Deepgram Flux.

    In order, the service:

    - Marks the user as speaking (used by the silence watchdog)
    - Broadcasts a UserStartedSpeakingFrame
    - Broadcasts an interruption, only when ``should_interrupt`` is enabled
    - Starts metrics collection
    - Calls the ``on_start_of_turn`` event handler with the transcript

    Args:
        transcript: Possibly the first few words of the turn; may be empty.
    """
    logger.debug("User started speaking")
    self._user_is_speaking = True

    await self.broadcast_frame(UserStartedSpeakingFrame)
    if self._should_interrupt:
        await self.broadcast_interruption()

    await self.start_metrics()
    await self._call_event_handler("on_start_of_turn", transcript)

    if transcript:
        logger.trace(f"Start of turn transcript: {transcript}")
|
||||
|
||||
async def _handle_turn_resumed(self, event: str):
    """Handle TurnResumed events from Deepgram Flux.

    Logs the event at trace level and calls the ``on_turn_resumed`` event
    handler; no frames are pushed.

    Args:
        event: The event type string, used only for logging.
    """
    logger.trace(f"Received event TurnResumed: {event}")
    await self._call_event_handler("on_turn_resumed")
|
||||
|
||||
def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
|
||||
"""Calculate the average confidence from transcript data.
|
||||
|
||||
Return None if the data is missing or invalid.
|
||||
"""
|
||||
# Example: Assume transcript_data has a list of words with confidence
|
||||
words = transcript_data.get("words")
|
||||
if not words or not isinstance(words, list):
|
||||
return None
|
||||
confidences = [
|
||||
w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
|
||||
]
|
||||
if not confidences:
|
||||
return None
|
||||
return sum(confidences) / len(confidences)
|
||||
|
||||
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
    """Handle EndOfTurn events from Deepgram Flux.

    In order, the service:

    - Marks the user as no longer speaking
    - Pushes a finalized TranscriptionFrame, unless ``min_confidence`` is set
      and the average word confidence is not above it (a warning is logged
      instead)
    - Calls the traced transcription hook
    - Stops processing metrics collection
    - Broadcasts a UserStoppedSpeakingFrame
    - Calls the ``on_end_of_turn`` event handler with the transcript

    When no average confidence can be computed (no usable per-word
    confidences), the transcript is not filtered: there is no evidence that it
    is below the threshold, and comparing None with a number would raise and
    prevent the end of the turn from being signalled.

    Args:
        transcript: The final transcript text for the completed turn.
        data: The TurnInfo message data containing event type, transcript and
            some extra metadata.
    """
    logger.debug("User stopped speaking")
    self._user_is_speaking = False

    # Compute the average confidence (None when it cannot be determined)
    average_confidence = self._calculate_average_confidence(data)
    min_confidence = self._settings.min_confidence

    passes_confidence_filter = (
        not min_confidence
        or average_confidence is None
        or average_confidence > min_confidence
    )

    if passes_confidence_filter:
        # EndOfTurn means Flux has determined the turn is complete,
        # so this TranscriptionFrame is always finalized
        await self.push_frame(
            TranscriptionFrame(
                transcript,
                self._user_id,
                time_now_iso8601(),
                self._settings.language,
                result=data,
                finalized=True,
            )
        )
    else:
        logger.warning(
            f"Transcription confidence below min_confidence threshold: {average_confidence}"
        )

    await self._handle_transcription(transcript, True, self._settings.language)
    await self.stop_processing_metrics()
    await self.broadcast_frame(UserStoppedSpeakingFrame)
    await self._call_event_handler("on_end_of_turn", transcript)
|
||||
|
||||
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
    """Handle EagerEndOfTurn events from Deepgram Flux.

    EagerEndOfTurn fires when end-of-turn confidence reaches the eager
    threshold but not yet the full end-of-turn threshold. The transcript is
    pushed as an InterimTranscriptionFrame and the ``on_eager_end_of_turn``
    event handler is called with it.

    Args:
        transcript: The interim transcript text that triggered the event.
        data: The TurnInfo message data containing event type, transcript and
            some extra metadata.
    """
    logger.trace(f"EagerEndOfTurn - {transcript}")
    # EagerEndOfTurn lets voice agents cut latency: Deepgram sends a
    # medium-confidence transcript before EndOfTurn certainty so LLM
    # processing can begin early. If speech resumes, or the final EndOfTurn
    # transcript differs, that early response should be cancelled to avoid
    # incorrect or partial replies.
    #
    # Pipecat doesn't yet provide built-in Gate/control mechanisms to:
    # 1. Start LLM/TTS processing early on EagerEndOfTurn events
    # 2. Cancel in-flight processing when TurnResumed occurs
    #
    # Pushing the transcript as an InterimTranscriptionFrame lets applications
    # implement their own EagerEndOfTurn handling while staying compatible
    # with existing interim transcription workflows.
    #
    # TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
    # that can start response generation on EagerEndOfTurn and cancel or confirm it.
    interim_frame = InterimTranscriptionFrame(
        transcript,
        self._user_id,
        time_now_iso8601(),
        self._settings.language,
        result=data,
    )
    await self.push_frame(interim_frame)
    await self._call_event_handler("on_eager_end_of_turn", transcript)
|
||||
|
||||
async def _handle_update(self, transcript: str):
    """Handle Update events from Deepgram Flux.

    Update events carry the current partial transcript of an ongoing turn.
    For a non-empty transcript the service logs it at trace level and calls
    the ``on_update`` event handler; empty updates are ignored. No frames are
    pushed and no metrics are touched.

    Args:
        transcript: The current partial transcript text for the ongoing turn.
    """
    if not transcript:
        return

    logger.trace(f"Update event: {transcript}")
    # TTFB (Time To First Byte) is deliberately left disabled for Deepgram Flux.
    # It should measure the time from the user starting to speak until the
    # first transcript arrives, but Flux delivers the "user started speaking"
    # event and the first transcript simultaneously, so the measurement would
    # be meaningless here.
    # await self.stop_ttfb_metrics()
    await self._call_event_handler("on_update", transcript)
|
||||
async def _report_error(self, error):
    """Report a connection error to event handlers and push it as a frame.

    Args:
        error: The error frame; its ``error`` attribute is passed to the
            ``on_connection_error`` event handler before the frame is pushed.
    """
    error_detail = error.error
    await self._call_event_handler("on_connection_error", error_detail)
    await self.push_error_frame(error)
|
||||
|
||||
@@ -4,45 +4,26 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Deepgram Flux speech-to-text service for AWS SageMaker.
|
||||
|
||||
This module provides a Pipecat STT service that connects to Deepgram Flux models
|
||||
deployed on AWS SageMaker endpoints. Uses HTTP/2 bidirectional streaming for
|
||||
low-latency real-time transcription with advanced turn detection (StartOfTurn,
|
||||
EndOfTurn, EagerEndOfTurn, TurnResumed).
|
||||
"""
|
||||
"""Deepgram Flux speech-to-text service for AWS SageMaker (HTTP/2 BiDi transport)."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, AsyncGenerator, Dict, Optional
|
||||
from urllib.parse import urlencode
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.services.aws.sagemaker.bidi_client import SageMakerBidiClient
|
||||
from pipecat.services.deepgram.flux.stt import (
|
||||
from pipecat.services.deepgram.flux.base import (
|
||||
DeepgramFluxSTTBase,
|
||||
DeepgramFluxSTTSettings,
|
||||
FluxEventType,
|
||||
FluxMessageType,
|
||||
)
|
||||
from pipecat.services.settings import STTSettings
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -55,7 +36,7 @@ class DeepgramFluxSageMakerSTTSettings(DeepgramFluxSTTSettings):
|
||||
pass
|
||||
|
||||
|
||||
class DeepgramFluxSageMakerSTTService(STTService):
|
||||
class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
|
||||
"""Deepgram Flux speech-to-text service for AWS SageMaker.
|
||||
|
||||
Provides real-time speech recognition using Deepgram Flux models deployed on
|
||||
@@ -98,7 +79,6 @@ class DeepgramFluxSageMakerSTTService(STTService):
|
||||
|
||||
Settings = DeepgramFluxSageMakerSTTSettings
|
||||
_settings: Settings
|
||||
_CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -145,67 +125,37 @@ class DeepgramFluxSageMakerSTTService(STTService):
|
||||
default_settings.apply_update(settings)
|
||||
|
||||
super().__init__(
|
||||
sample_rate=sample_rate,
|
||||
encoding=encoding,
|
||||
mip_opt_out=mip_opt_out,
|
||||
tag=tag,
|
||||
should_interrupt=should_interrupt,
|
||||
settings=default_settings,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
self._endpoint_name = endpoint_name
|
||||
self._region = region
|
||||
self._encoding = encoding
|
||||
self._mip_opt_out = mip_opt_out
|
||||
self._tag = tag or []
|
||||
self._should_interrupt = should_interrupt
|
||||
|
||||
self._client: Optional[SageMakerBidiClient] = None
|
||||
self._response_task: Optional[asyncio.Task] = None
|
||||
self._watchdog_task: Optional[asyncio.Task] = None
|
||||
|
||||
# Watchdog state
|
||||
self._last_stt_time: Optional[float] = None
|
||||
self._user_is_speaking = False
|
||||
# ------------------------------------------------------------------
|
||||
# Transport interface implementation
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
# Connection readiness: Flux sends a "Connected" message when ready
|
||||
self._connection_established_event = asyncio.Event()
|
||||
async def _transport_send_audio(self, audio: bytes):
    """Send one chunk of raw audio bytes through the SageMaker BiDi client.

    Args:
        audio: The audio bytes to transmit unchanged.
    """
    client = self._client
    await client.send_audio_chunk(audio)
|
||||
|
||||
# Flux event handlers
|
||||
self._register_event_handler("on_start_of_turn")
|
||||
self._register_event_handler("on_turn_resumed")
|
||||
self._register_event_handler("on_end_of_turn")
|
||||
self._register_event_handler("on_eager_end_of_turn")
|
||||
self._register_event_handler("on_update")
|
||||
async def _transport_send_json(self, message: dict):
    """Send a control message through the SageMaker BiDi client.

    Args:
        message: The message dictionary handed to the client's ``send_json``.
    """
    client = self._client
    await client.send_json(message)
|
||||
|
||||
def _build_query_string(self) -> str:
|
||||
"""Build query string from current settings and init-only connection config."""
|
||||
params = []
|
||||
def _transport_is_active(self) -> bool:
|
||||
return self._client is not None and self._client.is_active
|
||||
|
||||
s = self._settings
|
||||
|
||||
params.append(f"model={s.model}")
|
||||
params.append(f"sample_rate={self.sample_rate}")
|
||||
params.append(f"encoding={self._encoding}")
|
||||
|
||||
if s.eager_eot_threshold is not None:
|
||||
params.append(f"eager_eot_threshold={s.eager_eot_threshold}")
|
||||
|
||||
if s.eot_threshold is not None:
|
||||
params.append(f"eot_threshold={s.eot_threshold}")
|
||||
|
||||
if s.eot_timeout_ms is not None:
|
||||
params.append(f"eot_timeout_ms={s.eot_timeout_ms}")
|
||||
|
||||
if self._mip_opt_out is not None:
|
||||
params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")
|
||||
|
||||
# Add keyterm parameters (can have multiple)
|
||||
for keyterm in s.keyterm:
|
||||
params.append(urlencode({"keyterm": keyterm}))
|
||||
|
||||
# Add tag parameters (can have multiple)
|
||||
for tag_value in self._tag:
|
||||
params.append(urlencode({"tag": tag_value}))
|
||||
|
||||
return "&".join(params)
|
||||
# ------------------------------------------------------------------
|
||||
# Connection management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _connect(self):
|
||||
"""Connect to the SageMaker endpoint and start the BiDi session.
|
||||
@@ -215,15 +165,13 @@ class DeepgramFluxSageMakerSTTService(STTService):
|
||||
"""
|
||||
logger.debug("Connecting to Deepgram Flux on SageMaker...")
|
||||
|
||||
query_string = self._build_query_string()
|
||||
|
||||
self._connection_established_event.clear()
|
||||
|
||||
self._client = SageMakerBidiClient(
|
||||
endpoint_name=self._endpoint_name,
|
||||
region=self._region,
|
||||
model_invocation_path="v2/listen",
|
||||
model_query_string=query_string,
|
||||
model_query_string=self._build_query_string(),
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -270,124 +218,9 @@ class DeepgramFluxSageMakerSTTService(STTService):
|
||||
logger.debug("Disconnected from Deepgram Flux on SageMaker")
|
||||
await self._call_event_handler("on_disconnected")
|
||||
|
||||
async def _send_silence(self, duration_secs: float = 0.5):
|
||||
"""Send a block of silence of the specified duration (default 500 ms)."""
|
||||
sample_width = 2 # bytes per sample for 16-bit PCM
|
||||
num_channels = 1 # mono
|
||||
num_samples = int(self.sample_rate * duration_secs)
|
||||
silence = b"\x00" * (num_samples * sample_width * num_channels)
|
||||
await self._client.send_audio_chunk(silence)
|
||||
|
||||
async def _watchdog_task_handler(self):
    """Send silence to Flux when audio stops flowing during a user turn.

    If audio stops being sent after a StartOfTurn, the UserStoppedSpeaking
    event never arrives unless audio resumes. While the client is active this
    loop polls every 100 ms and, when the user is marked as speaking and more
    than 500 ms have passed since ``self._last_stt_time``, sends a block of
    silence. A failed send is logged and does not stop the loop.
    """
    while self._client and self._client.is_active:
        now = time.monotonic()
        last_sent = self._last_stt_time
        # More than 500 ms without sending new audio to Flux
        stalled = self._user_is_speaking and last_sent and now - last_sent > 0.5
        if stalled:
            logger.warning("Sending silence to Flux to prevent dangling task")
            try:
                await self._send_silence()
            except Exception as e:
                logger.warning(f"Failed to send silence: {e}")
            self._last_stt_time = time.monotonic()
        # check every 100ms
        await asyncio.sleep(0.1)
|
||||
|
||||
async def _send_close_stream(self) -> None:
    """Send a ``CloseStream`` control message if the BiDi client is active.

    This tells the server that no more audio data will be sent. Any exception
    raised while sending is reported through ``push_error`` rather than raised.
    """
    try:
        client = self._client
        if not (client and client.is_active):
            return
        logger.debug("Sending CloseStream message to Deepgram Flux on SageMaker")
        await client.send_json({"type": "CloseStream"})
    except Exception as e:
        await self.push_error(error_msg=f"Error sending CloseStream: {e}", exception=e)
|
||||
|
||||
async def _send_configure(self, fields: set[str]):
    """Push changed settings to Flux mid-stream via a Configure message.

    Only the settings named in ``fields`` are included. ``keyterm`` is sent
    under the top-level ``keyterms`` key; the end-of-turn settings are
    grouped under ``thresholds``.

    Args:
        fields: Names of the settings that changed and should be sent.
    """
    payload: dict[str, Any] = {"type": "Configure"}

    if "keyterm" in fields:
        payload["keyterms"] = self._settings.keyterm

    # Collect whichever end-of-turn settings changed, in a fixed order.
    threshold_updates: dict[str, Any] = {
        name: getattr(self._settings, name)
        for name in ("eot_threshold", "eager_eot_threshold", "eot_timeout_ms")
        if name in fields
    }
    if threshold_updates:
        payload["thresholds"] = threshold_updates

    logger.debug(f"{self}: sending Configure message: {payload}")
    await self._client.send_json(payload)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
    """Report whether this service produces processing metrics.

    Returns:
        Always True for the Deepgram Flux SageMaker service.
    """
    return True
|
||||
|
||||
async def _update_settings(self, delta: STTSettings) -> dict[str, Any]:
    """Apply a settings delta and forward what can be changed live.

    Fields listed in ``_CONFIGURE_FIELDS`` are sent to Deepgram in a
    Configure message when a client is active. Every other changed field
    is stored by the parent class and reported through
    ``_warn_unhandled_updated_settings``.

    Args:
        delta: The settings changes to apply.

    Returns:
        The mapping of changed fields returned by the parent implementation.
    """
    applied = await super()._update_settings(delta)

    if applied:
        changed_names = applied.keys()
        live_fields = changed_names & self._CONFIGURE_FIELDS
        if live_fields and self._client and self._client.is_active:
            await self._send_configure(live_fields)

        self._warn_unhandled_updated_settings(changed_names - self._CONFIGURE_FIELDS)

    return applied
|
||||
|
||||
async def start(self, frame: StartFrame):
    """Run the parent start logic, then open the Flux connection.

    Args:
        frame: The start frame containing initialization parameters.
    """
    await super().start(frame)
    await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
    """Run the parent stop logic, then close the Flux connection.

    Args:
        frame: The end frame.
    """
    await super().stop(frame)
    await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
    """Run the parent cancel logic, then close the Flux connection.

    Args:
        frame: The cancel frame.
    """
    await super().cancel(frame)
    await self._disconnect()
|
||||
# ------------------------------------------------------------------
|
||||
# Audio sending and response receiving
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Send audio data to Deepgram Flux for transcription.
|
||||
@@ -409,42 +242,6 @@ class DeepgramFluxSageMakerSTTService(STTService):
|
||||
yield ErrorFrame(error=f"Unknown error occurred: {e}")
|
||||
yield None
|
||||
|
||||
async def start_metrics(self):
    """Begin processing-metrics collection (TTFB is intentionally not started).

    TTFB would ideally measure the gap between the user starting to speak
    and the first transcript. Flux delivers the "user started speaking"
    event together with the first transcript, so that measurement would be
    meaningless here and TTFB metrics stay disabled.
    """
    await self.start_processing_metrics()
|
||||
|
||||
@traced_stt
async def _handle_transcription(
    self, transcript: str, is_final: bool, language: Optional[Language] = None
):
    """Tracing hook for transcription results.

    The body is intentionally empty: the method exists so that the
    ``traced_stt`` decorator is invoked with the transcription details.

    Args:
        transcript: The transcribed text.
        is_final: Whether the transcript is final.
        language: The language of the transcript, if known.
    """
|
||||
|
||||
def _validate_message(self, data: Dict[str, Any]) -> bool:
|
||||
"""Validate basic message structure from Deepgram Flux.
|
||||
|
||||
Args:
|
||||
data: The parsed JSON message data to validate.
|
||||
|
||||
Returns:
|
||||
True if the message structure is valid, False otherwise.
|
||||
"""
|
||||
if not isinstance(data, dict):
|
||||
logger.warning("Message is not a dictionary")
|
||||
return False
|
||||
|
||||
if "type" not in data:
|
||||
logger.warning("Message missing 'type' field")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def _process_responses(self):
|
||||
"""Process streaming responses from Deepgram Flux on SageMaker."""
|
||||
try:
|
||||
@@ -470,269 +267,3 @@ class DeepgramFluxSageMakerSTTService(STTService):
|
||||
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
|
||||
finally:
|
||||
logger.debug("Response processor stopped")
|
||||
|
||||
async def _handle_message(self, data: Dict[str, Any]):
    """Dispatch a parsed Flux message to the handler for its type.

    Messages that fail validation are dropped silently (validation logs
    its own warning). Messages whose ``type`` is not a known
    ``FluxMessageType`` are logged at debug level and ignored.

    Args:
        data: The parsed JSON message data.
    """
    if not self._validate_message(data):
        return

    raw_type = data.get("type")
    try:
        kind = FluxMessageType(raw_type)
    except ValueError:
        logger.debug(f"Unhandled message type: {raw_type or 'unknown'}")
        return

    if kind == FluxMessageType.RECEIVE_CONNECTED:
        await self._handle_connection_established()
    elif kind == FluxMessageType.RECEIVE_FATAL_ERROR:
        await self._handle_fatal_error(data)
    elif kind == FluxMessageType.TURN_INFO:
        await self._handle_turn_info(data)
    elif kind == FluxMessageType.CONFIGURE_SUCCESS:
        logger.info(f"{self}: Configure accepted: {data}")
    elif kind == FluxMessageType.CONFIGURE_FAILURE:
        # A rejected Configure is reported both in the log and as an error.
        code = data.get("error_code", "unknown")
        detail = data.get("description", "no description")
        rejection = f"Configure rejected: [{code}] {detail}"
        logger.warning(f"{self}: {rejection}")
        await self.push_error(error_msg=rejection)
|
||||
|
||||
async def _handle_connection_established(self):
    """React to the ``Connected`` message from Deepgram Flux.

    Logs that the connection is ready for audio and sets
    ``_connection_established_event`` so that anything waiting on the
    connection can proceed.
    """
    logger.info("Connected to Flux - ready to stream audio")
    # Signal waiters that the connection is up.
    self._connection_established_event.set()
|
||||
|
||||
async def _handle_fatal_error(self, data: Dict[str, Any]):
    """Log a fatal Flux error and escalate it as an exception.

    The error text is read from the message's ``error`` field, falling
    back to "Unknown error" when that field is absent.

    Args:
        data: The error message data containing error details.

    Raises:
        Exception: Always, carrying the formatted error text, so that the
            caller's error handling takes over.
    """
    reason = data.get("error", "Unknown error")
    fatal_text = f"Fatal error: {reason}"
    logger.error(fatal_text)
    # Raising hands the error to whoever is driving the receive loop.
    raise Exception(fatal_text)
|
||||
|
||||
async def _handle_turn_info(self, data: Dict[str, Any]):
    """Dispatch a TurnInfo message to the handler for its event.

    The ``event`` field selects the handler and the ``transcript`` field
    (empty string when absent) is passed along. Events that are not a
    known ``FluxEventType`` are logged at debug level and ignored.

    Args:
        data: The TurnInfo message data containing event type, transcript
            and some extra metadata.
    """
    raw_event = data.get("event")
    text = data.get("transcript", "")

    try:
        turn_event = FluxEventType(raw_event)
    except ValueError:
        logger.debug(f"Unhandled TurnInfo event: {raw_event}")
        return

    if turn_event == FluxEventType.START_OF_TURN:
        await self._handle_start_of_turn(text)
    elif turn_event == FluxEventType.TURN_RESUMED:
        await self._handle_turn_resumed(raw_event)
    elif turn_event == FluxEventType.END_OF_TURN:
        await self._handle_end_of_turn(text, data)
    elif turn_event == FluxEventType.EAGER_END_OF_TURN:
        await self._handle_eager_end_of_turn(text, data)
    elif turn_event == FluxEventType.UPDATE:
        await self._handle_update(text)
|
||||
|
||||
async def _handle_start_of_turn(self, transcript: str):
    """Handle a StartOfTurn event from Deepgram Flux.

    In order, the service:
    - marks the user as speaking,
    - broadcasts a UserStartedSpeakingFrame,
    - broadcasts an interruption, but only when ``_should_interrupt`` is set,
    - starts metrics collection,
    - fires the ``on_start_of_turn`` event handler with the transcript.

    Args:
        transcript: Possibly the first few words of the turn; may be empty.
    """
    logger.debug("User started speaking")
    self._user_is_speaking = True

    await self.broadcast_frame(UserStartedSpeakingFrame)
    if self._should_interrupt:
        await self.broadcast_interruption()

    await self.start_metrics()
    await self._call_event_handler("on_start_of_turn", transcript)

    if transcript:
        logger.trace(f"Start of turn transcript: {transcript}")
|
||||
|
||||
async def _handle_turn_resumed(self, event: str):
    """Handle a TurnResumed event from Deepgram Flux.

    The event is only traced and forwarded to the ``on_turn_resumed``
    event handler; no frames are pushed.

    Args:
        event: The event type string, used for logging only.
    """
    logger.trace(f"Received event TurnResumed: {event}")
    await self._call_event_handler("on_turn_resumed")
|
||||
|
||||
def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
|
||||
"""Calculate the average confidence from transcript data.
|
||||
|
||||
Return None if the data is missing or invalid.
|
||||
"""
|
||||
# Example: Assume transcript_data has a list of words with confidence
|
||||
words = transcript_data.get("words")
|
||||
if not words or not isinstance(words, list):
|
||||
return None
|
||||
confidences = [
|
||||
w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
|
||||
]
|
||||
if not confidences:
|
||||
return None
|
||||
return sum(confidences) / len(confidences)
|
||||
|
||||
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
    """Handle an EndOfTurn event from Deepgram Flux.

    In order, the service:
    - marks the user as no longer speaking,
    - pushes a finalized TranscriptionFrame, unless ``min_confidence`` is
      set and the average word confidence is not above it,
    - calls the traced transcription hook,
    - stops processing metrics,
    - broadcasts a UserStoppedSpeakingFrame,
    - fires the ``on_end_of_turn`` event handler.

    When no average confidence can be computed (the payload has no usable
    word confidences) the transcript is pushed, matching what happens when
    no ``min_confidence`` is configured. The previous comparison of None
    against the threshold raised TypeError and left the turn unclosed.

    Args:
        transcript: The final transcript text for the completed turn.
        data: The TurnInfo message data containing event type, transcript
            and some extra metadata.
    """
    logger.debug("User stopped speaking")
    self._user_is_speaking = False

    average_confidence = self._calculate_average_confidence(data)
    min_confidence = self._settings.min_confidence

    # Filter only when a threshold is configured AND a confidence exists;
    # an unknown confidence is not evidence of a bad transcript.
    confident_enough = (
        not min_confidence
        or average_confidence is None
        or average_confidence > min_confidence
    )

    if confident_enough:
        # EndOfTurn means Flux has determined the turn is complete,
        # so this TranscriptionFrame is always finalized.
        await self.push_frame(
            TranscriptionFrame(
                transcript,
                self._user_id,
                time_now_iso8601(),
                self._settings.language,
                result=data,
                finalized=True,
            )
        )
    else:
        logger.warning(
            f"Transcription confidence below min_confidence threshold: {average_confidence}"
        )

    # The turn is closed regardless of whether the transcript was pushed.
    await self._handle_transcription(transcript, True, self._settings.language)
    await self.stop_processing_metrics()
    await self.broadcast_frame(UserStoppedSpeakingFrame)
    await self._call_event_handler("on_end_of_turn", transcript)
|
||||
|
||||
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
    """Handle an EagerEndOfTurn event from Deepgram Flux.

    The transcript is pushed as an InterimTranscriptionFrame and the
    ``on_eager_end_of_turn`` event handler is fired with it.

    Args:
        transcript: The interim transcript text that triggered the event.
        data: The TurnInfo message data containing event type, transcript
            and some extra metadata.
    """
    logger.trace(f"EagerEndOfTurn - {transcript}")

    # Why an interim frame: EagerEndOfTurn delivers a medium-confidence
    # transcript before EndOfTurn is certain, so that LLM processing can
    # begin early. If speech resumes, or the final EndOfTurn transcript
    # differs, any response started from it should be cancelled.
    #
    # Pipecat doesn't yet provide built-in gate/control mechanisms to:
    #   1. Start LLM/TTS processing early on EagerEndOfTurn events
    #   2. Cancel in-flight processing when TurnResumed occurs
    #
    # Pushing the transcript as an InterimTranscriptionFrame lets
    # applications implement their own EagerEndOfTurn handling while
    # staying compatible with existing interim transcription workflows.
    #
    # TODO: Implement proper EagerEndOfTurn support with a cancellable
    # processing pipeline that can start response generation on
    # EagerEndOfTurn and then cancel or confirm it.
    interim_frame = InterimTranscriptionFrame(
        transcript,
        self._user_id,
        time_now_iso8601(),
        self._settings.language,
        result=data,
    )
    await self.push_frame(interim_frame)
    await self._call_event_handler("on_eager_end_of_turn", transcript)
|
||||
|
||||
async def _handle_update(self, transcript: str):
    """Handle an Update event from Deepgram Flux.

    Update events carry the current partial transcript of the ongoing
    turn. A non-empty transcript is traced and forwarded to the
    ``on_update`` event handler; an empty one is ignored.

    TTFB metrics are not stopped here because they are never started for
    Flux: it delivers the "user started speaking" event together with the
    first transcript, which makes that measurement meaningless.

    Args:
        transcript: The current partial transcript text for the ongoing turn.
    """
    if not transcript:
        return

    logger.trace(f"Update event: {transcript}")
    await self._call_event_handler("on_update", transcript)
|
||||
|
||||
Reference in New Issue
Block a user