Merge pull request #4158 from pipecat-ai/filipi/flux_refactor

Creating a base class, DeepgramFluxSTTBase, to reuse Deepgram Flux logic
This commit is contained in:
Filipi da Silva Fuchter
2026-03-26 17:33:35 -04:00
committed by GitHub
5 changed files with 657 additions and 861 deletions

View File

@@ -23,7 +23,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.llm import AWSBedrockLLMService, AWSBedrockLLMSettings
from pipecat.services.deepgram.sagemaker.flux_stt import DeepgramFluxSageMakerSTTService
from pipecat.services.deepgram.flux.sagemaker.stt import DeepgramFluxSageMakerSTTService
from pipecat.services.deepgram.sagemaker.tts import DeepgramSageMakerTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

View File

@@ -4,19 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Deepgram Flux speech-to-text service for AWS SageMaker.
This module provides a Pipecat STT service that connects to Deepgram Flux models
deployed on AWS SageMaker endpoints. Uses HTTP/2 bidirectional streaming for
low-latency real-time transcription with advanced turn detection (StartOfTurn,
EndOfTurn, EagerEndOfTurn, TurnResumed).
"""
"""Deepgram Flux STT base class shared across transports (WebSocket, SageMaker, etc.)."""
import asyncio
import json
import time
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, Optional
from abc import abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
from urllib.parse import urlencode
from loguru import logger
@@ -24,151 +19,119 @@ from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.sagemaker.bidi_client import SageMakerBidiClient
from pipecat.services.deepgram.flux.stt import (
DeepgramFluxSTTSettings,
FluxEventType,
FluxMessageType,
)
from pipecat.services.settings import STTSettings
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
class FluxMessageType(str, Enum):
"""Deepgram Flux WebSocket message types.
These are the top-level message types that can be received from the
Deepgram Flux WebSocket connection.
"""
RECEIVE_CONNECTED = "Connected"
RECEIVE_FATAL_ERROR = "Error"
TURN_INFO = "TurnInfo"
CONFIGURE_SUCCESS = "ConfigureSuccess"
CONFIGURE_FAILURE = "ConfigureFailure"
class FluxEventType(str, Enum):
"""Deepgram Flux TurnInfo event types.
These events are contained within TurnInfo messages and indicate
different stages of speech processing and turn detection.
"""
START_OF_TURN = "StartOfTurn"
TURN_RESUMED = "TurnResumed"
END_OF_TURN = "EndOfTurn"
EAGER_END_OF_TURN = "EagerEndOfTurn"
UPDATE = "Update"
@dataclass
class DeepgramFluxSageMakerSTTSettings(DeepgramFluxSTTSettings):
"""Settings for the Deepgram Flux SageMaker STT service.
class DeepgramFluxSTTSettings(STTSettings):
"""Settings for DeepgramFluxSTTService.
Inherits all fields from :class:`DeepgramFluxSTTSettings`.
Parameters:
eager_eot_threshold: EagerEndOfTurn/TurnResumed threshold. Off by default.
Lower values = more aggressive (faster response, more LLM calls).
Higher values = more conservative (slower response, fewer LLM calls).
eot_threshold: End-of-turn confidence required to finish a turn (default 0.7).
eot_timeout_ms: Time in ms after speech to finish a turn regardless of EOT
confidence (default 5000).
keyterm: Keyterms to boost recognition accuracy for specialized terminology.
min_confidence: Minimum confidence required to create a TranscriptionFrame.
"""
pass
eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
class DeepgramFluxSageMakerSTTService(STTService):
"""Deepgram Flux speech-to-text service for AWS SageMaker.
class DeepgramFluxSTTBase(STTService):
"""Base class for Deepgram Flux STT services across transports.
Provides real-time speech recognition using Deepgram Flux models deployed on
AWS SageMaker endpoints. Uses HTTP/2 bidirectional streaming for low-latency
transcription with advanced turn detection (StartOfTurn, EndOfTurn,
EagerEndOfTurn, TurnResumed).
Unlike the Nova-based SageMaker STT service, Flux handles turn detection
natively, so no external VAD is needed for turn boundaries. Use
``ExternalUserTurnStrategies`` in your pipeline.
Requirements:
- AWS credentials configured (via environment variables, AWS CLI, or instance metadata)
- A deployed SageMaker endpoint with Deepgram Flux model
Event handlers available:
- on_connected: Called when the SageMaker session is established
- on_disconnected: Called when the session is closed
- on_connection_error: Called on connection failure
- on_start_of_turn: Deepgram Flux detected start of speech
- on_end_of_turn: Deepgram Flux detected end of turn
- on_eager_end_of_turn: Deepgram Flux predicted end of turn
- on_turn_resumed: User resumed speaking after EagerEndOfTurn
- on_update: Interim transcript update during a turn
Example::
stt = DeepgramFluxSageMakerSTTService(
endpoint_name="my-deepgram-flux-endpoint",
region="us-east-2",
settings=DeepgramFluxSageMakerSTTService.Settings(
model="flux-general-en",
eot_threshold=0.7,
eager_eot_threshold=0.5,
),
)
Contains all shared Flux protocol logic (message handling, turn detection,
metrics, settings). Concrete subclasses implement the transport layer by
providing three abstract primitives: ``_transport_send_audio``,
``_transport_send_json``, and ``_transport_is_active``.
"""
Settings = DeepgramFluxSageMakerSTTSettings
Settings = DeepgramFluxSTTSettings
_settings: Settings
_CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}
def __init__(
self,
*,
endpoint_name: str,
region: str,
encoding: str = "linear16",
sample_rate: Optional[int] = None,
mip_opt_out: Optional[bool] = None,
tag: Optional[list] = None,
should_interrupt: bool = True,
settings: Optional[Settings] = None,
settings: Settings,
**kwargs,
):
"""Initialize the Deepgram Flux SageMaker STT service.
"""Initialize the Deepgram Flux STT base service.
Args:
endpoint_name: Name of the SageMaker endpoint with Deepgram Flux model
deployed (e.g., "my-deepgram-flux-endpoint").
region: AWS region where the endpoint is deployed (e.g., "us-east-2").
encoding: Audio encoding format. Defaults to "linear16".
sample_rate: Audio sample rate in Hz. If None, uses the pipeline
sample rate.
mip_opt_out: Opt out of Deepgram model improvement program.
encoding: Audio encoding format. Must be "linear16".
mip_opt_out: Opt out of the Deepgram Model Improvement Program.
tag: Tags to label requests for identification during usage reporting.
should_interrupt: Whether to interrupt the bot when Flux detects that
the user is speaking. Defaults to True.
settings: Runtime-updatable settings.
**kwargs: Additional arguments passed to the parent STTService.
the user is speaking.
settings: Fully resolved settings instance (built by concrete subclass).
**kwargs: Additional arguments passed to the parent STTService (e.g.
``sample_rate``, ``reconnect_on_error``).
"""
# Initialize default settings
default_settings = self.Settings(
model="flux-general-en",
language=Language.EN,
eager_eot_threshold=None,
eot_threshold=None,
eot_timeout_ms=None,
keyterm=[],
min_confidence=None,
)
super().__init__(settings=settings, **kwargs)
# Apply settings delta
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
sample_rate=sample_rate,
settings=default_settings,
**kwargs,
)
self._endpoint_name = endpoint_name
self._region = region
self._encoding = encoding
self._mip_opt_out = mip_opt_out
self._tag = tag or []
self._should_interrupt = should_interrupt
self._client: Optional[SageMakerBidiClient] = None
self._response_task: Optional[asyncio.Task] = None
self._watchdog_task: Optional[asyncio.Task] = None
# Watchdog state
self._last_stt_time: Optional[float] = None
self._user_is_speaking = False
# Connection readiness: Flux sends a "Connected" message when ready
self._connection_established_event = asyncio.Event()
# Watchdog state — see _watchdog_task_handler for details
self._last_stt_time: Optional[float] = None
self._watchdog_task: Optional[asyncio.Task] = None
self._user_is_speaking = False
# Flux event handlers
self._register_event_handler("on_start_of_turn")
self._register_event_handler("on_turn_resumed")
@@ -176,37 +139,115 @@ class DeepgramFluxSageMakerSTTService(STTService):
self._register_event_handler("on_eager_end_of_turn")
self._register_event_handler("on_update")
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
# ------------------------------------------------------------------
# Abstract transport interface — implemented by each concrete subclass
# ------------------------------------------------------------------
Returns:
True, as Deepgram Flux SageMaker service supports metrics generation.
@abstractmethod
async def _transport_send_audio(self, audio: bytes):
"""Send raw audio bytes over the transport."""
pass
@abstractmethod
async def _transport_send_json(self, message: dict):
"""Serialize and send a JSON control message over the transport."""
pass
@abstractmethod
def _transport_is_active(self) -> bool:
"""Return True if the transport connection is currently active."""
pass
@abstractmethod
async def _connect(self):
"""Establish the transport connection."""
pass
@abstractmethod
async def _disconnect(self):
"""Tear down the transport connection."""
pass
# ------------------------------------------------------------------
# Connection helpers
# ------------------------------------------------------------------
def _build_query_string(self) -> str:
"""Build query string from current settings and init-only connection config."""
params = [
f"model={self._settings.model}",
f"sample_rate={self.sample_rate}",
f"encoding={self._encoding}",
]
if self._settings.eager_eot_threshold is not None:
params.append(f"eager_eot_threshold={self._settings.eager_eot_threshold}")
if self._settings.eot_threshold is not None:
params.append(f"eot_threshold={self._settings.eot_threshold}")
if self._settings.eot_timeout_ms is not None:
params.append(f"eot_timeout_ms={self._settings.eot_timeout_ms}")
if self._mip_opt_out is not None:
params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")
# Add keyterm parameters (can have multiple)
for keyterm in self._settings.keyterm:
params.append(urlencode({"keyterm": keyterm}))
# Add tag parameters (can have multiple)
for tag_value in self._tag:
params.append(urlencode({"tag": tag_value}))
return "&".join(params)
async def _send_silence(self, duration_secs: float = 0.5):
"""Send a block of silence of the specified duration (default 500 ms)."""
sample_width = 2 # bytes per sample for 16-bit PCM
num_channels = 1 # mono
num_samples = int(self.sample_rate * duration_secs)
silence = b"\x00" * (num_samples * sample_width * num_channels)
await self._transport_send_audio(silence)
async def _watchdog_task_handler(self):
"""Prevent dangling turns by sending silence when audio stops flowing.
If we stop sending audio to Flux after receiving a StartOfTurn,
we never receive the UserStoppedSpeaking event unless we resume
sending audio.
"""
return True
while self._transport_is_active():
now = time.monotonic()
# More than 500 ms without sending new audio to Flux
if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
logger.warning("Sending silence to Flux to prevent dangling task")
try:
await self._send_silence()
except Exception as e:
logger.warning(f"Failed to send silence: {e}")
self._last_stt_time = time.monotonic()
# check every 100ms
await asyncio.sleep(0.1)
async def _update_settings(self, delta: STTSettings) -> dict[str, Any]:
"""Apply a settings delta.
async def _send_close_stream(self) -> None:
"""Sends a CloseStream control message to Deepgram Flux.
Configure-able fields (keyterm, eot_threshold, eager_eot_threshold,
eot_timeout_ms) are sent to Deepgram via a Configure message.
Other fields are stored but cannot be applied to the active connection.
This signals to the server that no more audio data will be sent.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
configure_fields = changed.keys() & self._CONFIGURE_FIELDS
if configure_fields and self._client and self._client.is_active:
await self._send_configure(configure_fields)
self._warn_unhandled_updated_settings(changed.keys() - self._CONFIGURE_FIELDS)
return changed
try:
if self._transport_is_active():
logger.debug("Sending CloseStream message to Deepgram Flux")
await self._transport_send_json({"type": "CloseStream"})
except Exception as e:
await self.push_error(error_msg=f"Error sending CloseStream: {e}", exception=e)
async def _send_configure(self, fields: set[str]):
"""Send a Configure control message to update settings mid-stream.
Builds a Configure JSON message containing only the fields that changed
and sends it over the existing connection.
Args:
fields: Set of changed field names to include in the message.
"""
@@ -226,19 +267,51 @@ class DeepgramFluxSageMakerSTTService(STTService):
message["thresholds"] = thresholds
logger.debug(f"{self}: sending Configure message: {message}")
await self._client.send_json(message)
await self._transport_send_json(message)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Deepgram Flux service supports metrics generation.
"""
return True
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
"""Apply a settings delta.
Configure-able fields (keyterm, eot_threshold, eager_eot_threshold,
eot_timeout_ms) are sent to Deepgram via a Configure message.
Other fields are stored but cannot be applied to the active connection.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
configure_fields = changed.keys() & self._CONFIGURE_FIELDS
if configure_fields and self._transport_is_active():
await self._send_configure(configure_fields)
self._warn_unhandled_updated_settings(changed.keys() - self._CONFIGURE_FIELDS)
return changed
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self, frame: StartFrame):
"""Start the Deepgram Flux SageMaker STT service.
"""Start the Deepgram Flux STT service.
Args:
frame: The start frame containing initialization parameters.
frame: The start frame containing initialization parameters and metadata.
"""
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram Flux SageMaker STT service.
"""Stop the Deepgram Flux STT service.
Args:
frame: The end frame.
@@ -247,7 +320,7 @@ class DeepgramFluxSageMakerSTTService(STTService):
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram Flux SageMaker STT service.
"""Cancel the Deepgram Flux STT service.
Args:
frame: The cancel frame.
@@ -255,180 +328,32 @@ class DeepgramFluxSageMakerSTTService(STTService):
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Deepgram Flux for transcription.
async def start_metrics(self):
"""Start TTFB and processing metrics collection."""
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
# Ideally, TTFB should measure the time from when a user starts speaking
# until we receive the first transcript. However, Deepgram Flux delivers
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.start_ttfb_metrics()
await self.start_processing_metrics()
Args:
audio: Raw audio bytes to transcribe.
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
Yields:
Frame: None (transcription results come via BiDi stream callbacks).
"""
if not self._connection_established_event.is_set():
yield None
return
if self._client and self._client.is_active:
try:
self._last_stt_time = time.monotonic()
await self._client.send_audio_chunk(audio)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield None
def _build_query_string(self) -> str:
"""Build query string from current settings and init-only connection config."""
params = []
s = self._settings
params.append(f"model={s.model}")
params.append(f"sample_rate={self.sample_rate}")
params.append(f"encoding={self._encoding}")
if s.eager_eot_threshold is not None:
params.append(f"eager_eot_threshold={s.eager_eot_threshold}")
if s.eot_threshold is not None:
params.append(f"eot_threshold={s.eot_threshold}")
if s.eot_timeout_ms is not None:
params.append(f"eot_timeout_ms={s.eot_timeout_ms}")
if self._mip_opt_out is not None:
params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")
# Add keyterm parameters (can have multiple)
for keyterm in s.keyterm:
params.append(urlencode({"keyterm": keyterm}))
# Add tag parameters (can have multiple)
for tag_value in self._tag:
params.append(urlencode({"tag": tag_value}))
return "&".join(params)
async def _connect(self):
"""Connect to the SageMaker endpoint and start the BiDi session.
Starts the HTTP/2 session and waits for the Flux ``Connected`` message
before returning, ensuring audio is not sent before the model is ready.
"""
logger.debug("Connecting to Deepgram Flux on SageMaker...")
query_string = self._build_query_string()
self._connection_established_event.clear()
self._client = SageMakerBidiClient(
endpoint_name=self._endpoint_name,
region=self._region,
model_invocation_path="v2/listen",
model_query_string=query_string,
)
try:
await self._client.start_session()
# Start response processor first so we can receive the Connected message
self._response_task = self.create_task(self._process_responses())
# Wait for Flux to confirm the connection is ready
logger.debug("SageMaker session started, waiting for Flux connection confirmation...")
await self._connection_established_event.wait()
# Note: Flux does not support KeepAlive messages (only CloseStream and
# Configure are valid). The watchdog task handles keeping the connection
# alive by sending silence when needed.
self._watchdog_task = self.create_task(self._watchdog_task_handler())
logger.debug("Connected to Deepgram Flux on SageMaker")
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self._call_event_handler("on_connection_error", str(e))
async def _disconnect(self):
"""Disconnect from the SageMaker endpoint."""
self._connection_established_event.clear()
if self._client and self._client.is_active:
logger.debug("Disconnecting from Deepgram Flux on SageMaker...")
try:
await self._client.send_json({"type": "CloseStream"})
except Exception as e:
logger.warning(f"Failed to send CloseStream message: {e}")
if self._watchdog_task and not self._watchdog_task.done():
await self.cancel_task(self._watchdog_task)
self._watchdog_task = None
self._last_stt_time = None
if self._response_task and not self._response_task.done():
await self.cancel_task(self._response_task)
await self._client.close_session()
logger.debug("Disconnected from Deepgram Flux on SageMaker")
await self._call_event_handler("on_disconnected")
async def _send_silence(self, duration_secs: float = 0.5):
"""Send a block of silence of the specified duration (default 500 ms)."""
sample_width = 2 # bytes per sample for 16-bit PCM
num_channels = 1 # mono
num_samples = int(self.sample_rate * duration_secs)
silence = b"\x00" * (num_samples * sample_width * num_channels)
await self._client.send_audio_chunk(silence)
async def _watchdog_task_handler(self):
"""Prevent dangling turns by sending silence when audio stops flowing.
If we stop sending audio to Flux after receiving a StartOfTurn,
we never receive the UserStoppedSpeaking event unless we resume
sending audio.
"""
while self._client and self._client.is_active:
now = time.monotonic()
if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
logger.warning("Sending silence to Flux to prevent dangling task")
try:
await self._send_silence()
except Exception as e:
logger.warning(f"Failed to send silence: {e}")
self._last_stt_time = time.monotonic()
await asyncio.sleep(0.1)
async def _process_responses(self):
"""Process streaming responses from Deepgram Flux on SageMaker."""
try:
while self._client and self._client.is_active:
result = await self._client.receive_response()
if result is None:
break
if hasattr(result, "value") and hasattr(result.value, "bytes_"):
if result.value.bytes_:
response_data = result.value.bytes_.decode("utf-8")
try:
parsed = json.loads(response_data)
await self._handle_message(parsed)
except json.JSONDecodeError:
logger.warning(f"Non-JSON response: {response_data}")
except asyncio.CancelledError:
logger.debug("Response processor cancelled")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
logger.debug("Response processor stopped")
# ------------------------------------------------------------------
# Message handling
# ------------------------------------------------------------------
def _validate_message(self, data: Dict[str, Any]) -> bool:
"""Validate basic message structure from Deepgram Flux.
Ensures the received message has the expected structure before processing.
Args:
data: The parsed JSON message data to validate.
@@ -448,7 +373,8 @@ class DeepgramFluxSageMakerSTTService(STTService):
async def _handle_message(self, data: Dict[str, Any]):
"""Handle a parsed message from Deepgram Flux.
Routes messages to appropriate handlers based on their type.
Routes messages to appropriate handlers based on their type. Validates
message structure before processing.
Args:
data: The parsed JSON message data.
@@ -466,12 +392,9 @@ class DeepgramFluxSageMakerSTTService(STTService):
match flux_message_type:
case FluxMessageType.RECEIVE_CONNECTED:
logger.info("Connected to Flux on SageMaker - ready to stream audio")
self._connection_established_event.set()
await self._handle_connection_established()
case FluxMessageType.RECEIVE_FATAL_ERROR:
error_msg = data.get("error") or data.get("message") or data.get("description")
logger.error(f"Fatal error from Deepgram Flux: {error_msg} (full: {data})")
await self.push_error(error_msg=f"Fatal error: {error_msg or 'Unknown error'}")
await self._handle_fatal_error(data)
case FluxMessageType.TURN_INFO:
await self._handle_turn_info(data)
case FluxMessageType.CONFIGURE_SUCCESS:
@@ -483,11 +406,44 @@ class DeepgramFluxSageMakerSTTService(STTService):
logger.warning(f"{self}: {error_msg}")
await self.push_error(error_msg=error_msg)
async def _handle_connection_established(self):
"""Handle successful connection establishment to Deepgram Flux.
This event is fired when the connection to Deepgram Flux is successfully
established and ready to receive audio data for transcription processing.
"""
logger.info("Connected to Flux - ready to stream audio")
# Notify connection is established
self._connection_established_event.set()
async def _handle_fatal_error(self, data: Dict[str, Any]):
"""Handle fatal error messages from Deepgram Flux.
Fatal errors indicate unrecoverable issues with the connection or
configuration that require intervention. These errors will cause
the connection to be terminated.
Args:
data: The error message data containing error details.
Raises:
Exception: Always raises to trigger error handling in the transport layer.
"""
error_msg = data.get("error", "Unknown error")
deepgram_error = f"Fatal error: {error_msg}"
logger.error(deepgram_error)
# Error will be handled by the transport's receive loop error handler
raise Exception(deepgram_error)
async def _handle_turn_info(self, data: Dict[str, Any]):
"""Handle TurnInfo events from Deepgram Flux.
TurnInfo messages contain various turn-based events that indicate
the state of speech processing, including turn boundaries, interim
results, and turn finalization events.
Args:
data: The TurnInfo message data containing event type and transcript.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
event = data.get("event")
transcript = data.get("transcript", "")
@@ -513,15 +469,24 @@ class DeepgramFluxSageMakerSTTService(STTService):
async def _handle_start_of_turn(self, transcript: str):
"""Handle StartOfTurn events from Deepgram Flux.
StartOfTurn events are fired when Deepgram Flux detects the beginning
of a new speaking turn. This triggers bot interruption to stop any
ongoing speech synthesis and signals the start of user speech detection.
The service will:
- Send a BotInterruptionFrame upstream to stop bot speech
- Send a UserStartedSpeakingFrame downstream to notify other components
- Start metrics collection for measuring response times
Args:
transcript: Maybe the first few words of the turn.
transcript: maybe the first few words of the turn.
"""
logger.debug("User started speaking")
self._user_is_speaking = True
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
await self.broadcast_interruption()
await self.start_processing_metrics()
await self.start_metrics()
await self._call_event_handler("on_start_of_turn", transcript)
if transcript:
logger.trace(f"Start of turn transcript: {transcript}")
@@ -529,6 +494,10 @@ class DeepgramFluxSageMakerSTTService(STTService):
async def _handle_turn_resumed(self, event: str):
"""Handle TurnResumed events from Deepgram Flux.
TurnResumed events indicate that speech has resumed after a brief pause
within the same turn. This is primarily used for logging and debugging
purposes and doesn't trigger any significant processing changes.
Args:
event: The event type string for logging purposes.
"""
@@ -540,6 +509,7 @@ class DeepgramFluxSageMakerSTTService(STTService):
Return None if the data is missing or invalid.
"""
# Example: Assume transcript_data has a list of words with confidence
words = transcript_data.get("words")
if not words or not isinstance(words, list):
return None
@@ -553,16 +523,30 @@ class DeepgramFluxSageMakerSTTService(STTService):
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
EndOfTurn events are fired when Deepgram Flux determines that a speaking
turn has concluded, either due to sufficient silence or end-of-turn
confidence thresholds being met. This provides the final transcript
for the completed turn.
The service will:
- Create and send a final TranscriptionFrame with the complete transcript
- Trigger transcription handling with tracing for metrics
- Stop processing metrics collection
- Send a UserStoppedSpeakingFrame to signal turn completion
Args:
transcript: The final transcript text for the completed turn.
data: The TurnInfo message data.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.debug("User stopped speaking")
self._user_is_speaking = False
# Compute the average confidence
average_confidence = self._calculate_average_confidence(data)
if not self._settings.min_confidence or average_confidence > self._settings.min_confidence:
# EndOfTurn means Flux has determined the turn is complete,
# so this TranscriptionFrame is always finalized
await self.push_frame(
TranscriptionFrame(
transcript,
@@ -586,11 +570,37 @@ class DeepgramFluxSageMakerSTTService(STTService):
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EagerEndOfTurn events from Deepgram Flux.
EagerEndOfTurn events are fired when the end-of-turn confidence reaches the
EagerEndOfTurn threshold but hasn't yet reached the full end-of-turn threshold.
These provide interim transcripts that can be used for faster response
generation while still allowing the user to continue speaking.
EagerEndOfTurn events enable more responsive conversational AI by allowing
the LLM to start processing likely final transcripts before the turn
is definitively ended.
Args:
transcript: The interim transcript text.
data: The TurnInfo message data.
transcript: The interim transcript text that triggered the EagerEndOfTurn event.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.trace(f"EagerEndOfTurn - {transcript}")
# Deepgram's EagerEndOfTurn feature enables lower-latency voice agents by sending
# medium-confidence transcripts before EndOfTurn certainty, allowing LLM processing to
# begin early.
#
# However, if speech resumes or the transcripts differ from the final EndOfTurn, the
# EagerEndOfTurn response should be cancelled to avoid incorrect or partial responses.
#
# Pipecat doesn't yet provide built-in Gate/control mechanisms to:
# 1. Start LLM/TTS processing early on EagerEndOfTurn events
# 2. Cancel in-flight processing when TurnResumed occurs
#
# By pushing EagerEndOfTurn transcripts as InterimTranscriptionFrame, we enable
# developers to implement custom EagerEndOfTurn handling in their applications while
# maintaining compatibility with existing interim transcription workflows.
#
# TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
# that can start response generation on EagerEndOfTurn and cancel or confirm it.
await self.push_frame(
InterimTranscriptionFrame(
transcript,
@@ -605,16 +615,20 @@ class DeepgramFluxSageMakerSTTService(STTService):
async def _handle_update(self, transcript: str):
"""Handle Update events from Deepgram Flux.
Update events provide incremental transcript updates during an ongoing
turn. These events allow for real-time display of transcription progress
and can be used to provide visual feedback to users about what's being
recognized.
Args:
transcript: The current partial transcript text for the ongoing turn.
"""
if transcript:
logger.trace(f"Update event: {transcript}")
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
# Ideally, TTFB should measure the time from when a user starts speaking
# until we receive the first transcript. However, Deepgram Flux delivers
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.stop_ttfb_metrics()
await self._call_event_handler("on_update", transcript)
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass

View File

@@ -0,0 +1,269 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Deepgram Flux speech-to-text service for AWS SageMaker (HTTP/2 BiDi transport)."""
import asyncio
import json
import time
from dataclasses import dataclass
from typing import AsyncGenerator, Optional
from loguru import logger
from pipecat.frames.frames import (
ErrorFrame,
Frame,
)
from pipecat.services.aws.sagemaker.bidi_client import SageMakerBidiClient
from pipecat.services.deepgram.flux.base import (
DeepgramFluxSTTBase,
DeepgramFluxSTTSettings,
)
from pipecat.transcriptions.language import Language
@dataclass
class DeepgramFluxSageMakerSTTSettings(DeepgramFluxSTTSettings):
"""Settings for the Deepgram Flux SageMaker STT service.
Inherits all fields from :class:`DeepgramFluxSTTSettings`.
"""
pass
class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
"""Deepgram Flux speech-to-text service for AWS SageMaker.
Provides real-time speech recognition using Deepgram Flux models deployed on
AWS SageMaker endpoints. Uses HTTP/2 bidirectional streaming for low-latency
transcription with advanced turn detection (StartOfTurn, EndOfTurn,
EagerEndOfTurn, TurnResumed).
Unlike the Nova-based SageMaker STT service, Flux handles turn detection
natively, so no external VAD is needed for turn boundaries. Use
``ExternalUserTurnStrategies`` in your pipeline.
Requirements:
- AWS credentials configured (via environment variables, AWS CLI, or instance metadata)
- A deployed SageMaker endpoint with Deepgram Flux model
Event handlers available:
- on_connected: Called when the SageMaker session is established
- on_disconnected: Called when the session is closed
- on_connection_error: Called on connection failure
- on_start_of_turn: Deepgram Flux detected start of speech
- on_end_of_turn: Deepgram Flux detected end of turn
- on_eager_end_of_turn: Deepgram Flux predicted end of turn
- on_turn_resumed: User resumed speaking after EagerEndOfTurn
- on_update: Interim transcript update during a turn
Example::
stt = DeepgramFluxSageMakerSTTService(
endpoint_name="my-deepgram-flux-endpoint",
region="us-east-2",
settings=DeepgramFluxSageMakerSTTService.Settings(
model="flux-general-en",
eot_threshold=0.7,
eager_eot_threshold=0.5,
),
)
"""
Settings = DeepgramFluxSageMakerSTTSettings
_settings: Settings
def __init__(
self,
*,
endpoint_name: str,
region: str,
encoding: str = "linear16",
sample_rate: Optional[int] = None,
mip_opt_out: Optional[bool] = None,
tag: Optional[list] = None,
should_interrupt: bool = True,
settings: Optional[Settings] = None,
**kwargs,
):
"""Initialize the Deepgram Flux SageMaker STT service.
Args:
endpoint_name: Name of the SageMaker endpoint with Deepgram Flux model
deployed (e.g., "my-deepgram-flux-endpoint").
region: AWS region where the endpoint is deployed (e.g., "us-east-2").
encoding: Audio encoding format. Defaults to "linear16".
sample_rate: Audio sample rate in Hz. If None, uses the pipeline
sample rate.
mip_opt_out: Opt out of Deepgram model improvement program.
tag: Tags to label requests for identification during usage reporting.
should_interrupt: Whether to interrupt the bot when Flux detects that
the user is speaking. Defaults to True.
settings: Runtime-updatable settings.
**kwargs: Additional arguments passed to the parent STTService.
"""
# Initialize default settings
default_settings = self.Settings(
model="flux-general-en",
language=Language.EN,
eager_eot_threshold=None,
eot_threshold=None,
eot_timeout_ms=None,
keyterm=[],
min_confidence=None,
)
# Apply settings delta
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
encoding=encoding,
mip_opt_out=mip_opt_out,
tag=tag,
should_interrupt=should_interrupt,
settings=default_settings,
sample_rate=sample_rate,
**kwargs,
)
self._endpoint_name = endpoint_name
self._region = region
self._client: Optional[SageMakerBidiClient] = None
self._response_task: Optional[asyncio.Task] = None
# ------------------------------------------------------------------
# Transport interface implementation
# ------------------------------------------------------------------
async def _transport_send_audio(self, audio: bytes):
await self._client.send_audio_chunk(audio)
async def _transport_send_json(self, message: dict):
await self._client.send_json(message)
def _transport_is_active(self) -> bool:
return self._client is not None and self._client.is_active
# ------------------------------------------------------------------
# Connection management
# ------------------------------------------------------------------
async def _connect(self):
"""Connect to the SageMaker endpoint and start the BiDi session.
Starts the HTTP/2 session and waits for the Flux ``Connected`` message
before returning, ensuring audio is not sent before the model is ready.
"""
logger.debug("Connecting to Deepgram Flux on SageMaker...")
self._connection_established_event.clear()
self._client = SageMakerBidiClient(
endpoint_name=self._endpoint_name,
region=self._region,
model_invocation_path="v2/listen",
model_query_string=self._build_query_string(),
)
try:
await self._client.start_session()
# Start response processor first so we can receive the Connected message
self._response_task = self.create_task(self._process_responses())
# Wait for Flux to confirm the connection is ready
logger.debug("SageMaker session started, waiting for Flux connection confirmation...")
await self._connection_established_event.wait()
# Note: Flux does not support KeepAlive messages (only CloseStream and
# Configure are valid). The watchdog task handles keeping the connection
# alive by sending silence when needed.
self._watchdog_task = self.create_task(self._watchdog_task_handler())
logger.debug("Connected to Deepgram Flux on SageMaker")
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self._call_event_handler("on_connection_error", str(e))
async def _disconnect(self):
"""Disconnect from the SageMaker endpoint."""
self._connection_established_event.clear()
if self._client and self._client.is_active:
logger.debug("Disconnecting from Deepgram Flux on SageMaker...")
await self._send_close_stream()
if self._watchdog_task and not self._watchdog_task.done():
await self.cancel_task(self._watchdog_task)
self._watchdog_task = None
self._last_stt_time = None
if self._response_task and not self._response_task.done():
await self.cancel_task(self._response_task)
await self._client.close_session()
logger.debug("Disconnected from Deepgram Flux on SageMaker")
await self._call_event_handler("on_disconnected")
# ------------------------------------------------------------------
# Audio sending and response receiving
# ------------------------------------------------------------------
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Deepgram Flux for transcription.
Args:
audio: Raw audio bytes to transcribe.
Yields:
Frame: None (transcription results come via BiDi stream callbacks).
"""
if not self._connection_established_event.is_set():
return
if self._client and self._client.is_active:
try:
self._last_stt_time = time.monotonic()
await self._client.send_audio_chunk(audio)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield None
async def _process_responses(self):
"""Process streaming responses from Deepgram Flux on SageMaker."""
try:
while self._client and self._client.is_active:
result = await self._client.receive_response()
if result is None:
break
if hasattr(result, "value") and hasattr(result.value, "bytes_"):
if result.value.bytes_:
response_data = result.value.bytes_.decode("utf-8")
try:
parsed = json.loads(response_data)
await self._handle_message(parsed)
except json.JSONDecodeError:
logger.warning(f"Non-JSON response: {response_data}")
except asyncio.CancelledError:
logger.debug("Response processor cancelled")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
logger.debug("Response processor stopped")

View File

@@ -4,35 +4,28 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Deepgram Flux speech-to-text service implementation."""
"""Deepgram Flux speech-to-text service implementation (WebSocket transport)."""
import asyncio
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, AsyncGenerator, Dict, Optional
from urllib.parse import urlencode
from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.services.deepgram.flux.base import (
DeepgramFluxSTTBase,
DeepgramFluxSTTSettings,
FluxEventType,
FluxMessageType,
)
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from websockets.asyncio.client import connect as websocket_connect
@@ -42,65 +35,23 @@ except ModuleNotFoundError as e:
logger.error("In order to use Deepgram Flux, you need to `pip install pipecat-ai[deepgram]`.")
raise Exception(f"Missing module: {e}")
class FluxMessageType(str, Enum):
"""Deepgram Flux WebSocket message types.
These are the top-level message types that can be received from the
Deepgram Flux WebSocket connection.
"""
RECEIVE_CONNECTED = "Connected"
RECEIVE_FATAL_ERROR = "Error"
TURN_INFO = "TurnInfo"
CONFIGURE_SUCCESS = "ConfigureSuccess"
CONFIGURE_FAILURE = "ConfigureFailure"
# Re-export for backward compatibility
__all__ = [
"DeepgramFluxSTTService",
"DeepgramFluxSTTSettings",
"FluxEventType",
"FluxMessageType",
]
class FluxEventType(str, Enum):
"""Deepgram Flux TurnInfo event types.
These events are contained within TurnInfo messages and indicate
different stages of speech processing and turn detection.
"""
START_OF_TURN = "StartOfTurn"
TURN_RESUMED = "TurnResumed"
END_OF_TURN = "EndOfTurn"
EAGER_END_OF_TURN = "EagerEndOfTurn"
UPDATE = "Update"
@dataclass
class DeepgramFluxSTTSettings(STTSettings):
"""Settings for DeepgramFluxSTTService.
Parameters:
eager_eot_threshold: EagerEndOfTurn/TurnResumed threshold. Off by default.
Lower values = more aggressive (faster response, more LLM calls).
Higher values = more conservative (slower response, fewer LLM calls).
eot_threshold: End-of-turn confidence required to finish a turn (default 0.7).
eot_timeout_ms: Time in ms after speech to finish a turn regardless of EOT
confidence (default 5000).
keyterm: Keyterms to boost recognition accuracy for specialized terminology.
min_confidence: Minimum confidence required to create a TranscriptionFrame.
"""
eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
class DeepgramFluxSTTService(WebsocketSTTService):
class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
"""Deepgram Flux speech-to-text service.
Provides real-time speech recognition using Deepgram's WebSocket API with Flux capabilities.
Supports configurable models, VAD events, and various audio processing options
including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance.
Event handlers available (in addition to WebsocketSTTService events):
Event handlers available (in addition to base events):
- on_speech_started(service): Deepgram detected start of speech
- on_utterance_end(service): Deepgram detected end of utterance
@@ -117,7 +68,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
Settings = DeepgramFluxSTTSettings
_settings: Settings
_CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}
class InputParams(BaseModel):
"""Configuration parameters for Deepgram Flux API.
@@ -189,7 +139,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
should_interrupt: Determine whether the bot should be interrupted when Flux detects that the user is speaking.
settings: Runtime-updatable settings. When provided alongside deprecated
parameters, ``settings`` values take precedence.
**kwargs: Additional arguments passed to the parent WebsocketSTTService class.
**kwargs: Additional arguments passed to the parent classes.
Examples:
Basic usage with default parameters::
@@ -254,34 +204,39 @@ class DeepgramFluxSTTService(WebsocketSTTService):
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
sample_rate=sample_rate,
reconnect_on_error=False,
DeepgramFluxSTTBase.__init__(
self,
encoding=flux_encoding,
mip_opt_out=mip_opt_out,
tag=tag,
should_interrupt=should_interrupt,
settings=default_settings,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self, reconnect_on_error=False)
self._api_key = api_key
self._url = url
self._should_interrupt = should_interrupt
self._encoding = flux_encoding
self._mip_opt_out = mip_opt_out
self._tag = tag or []
self._websocket_url = None
self._receive_task = None
# Flux event handlers
self._register_event_handler("on_start_of_turn")
self._register_event_handler("on_turn_resumed")
self._register_event_handler("on_end_of_turn")
self._register_event_handler("on_eager_end_of_turn")
self._register_event_handler("on_update")
self._connection_established_event = asyncio.Event()
# Watchdog task to prevent dangling tasks
# If we stop sending audio to Flux after we have received that the User has started speaking
# we never receive the user stopped speaking event unless we resume sending audio to it.
self._last_stt_time = None
self._watchdog_task = None
self._user_is_speaking = False
# ------------------------------------------------------------------
# Transport interface implementation
# ------------------------------------------------------------------
async def _transport_send_audio(self, audio: bytes):
await self._websocket.send(audio)
async def _transport_send_json(self, message: dict):
await self._websocket.send(json.dumps(message))
def _transport_is_active(self) -> bool:
return self._websocket is not None and self._websocket.state is State.OPEN
# ------------------------------------------------------------------
# Connection management
# ------------------------------------------------------------------
async def _connect(self):
"""Connect to WebSocket and start background tasks.
@@ -290,7 +245,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
the background task for receiving transcription results.
"""
await super()._connect()
self._websocket_url = f"{self._url}?{self._build_query_string()}"
await self._connect_websocket()
async def _disconnect(self):
@@ -309,25 +264,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Reset state only after everything is cleaned up
self._websocket = None
async def _send_silence(self, duration_secs: float = 0.5):
"""Send a block of silence of the specified duration (default 500 ms)."""
sample_width = 2 # bytes per sample for 16-bit PCM
num_channels = 1 # mono
num_samples = int(self.sample_rate * duration_secs)
silence = b"\x00" * (num_samples * sample_width * num_channels)
await self._websocket.send(silence)
async def _watchdog_task_handler(self):
while self._websocket and self._websocket.state is State.OPEN:
now = time.monotonic()
# More than 500 ms without sending new audio to Flux
if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
logger.warning("Sending silence to Flux to prevent dangling task")
await self._send_silence()
self._last_stt_time = time.monotonic()
# check every 100ms
await asyncio.sleep(0.1)
async def _connect_websocket(self):
"""Establish WebSocket connection to API.
@@ -400,131 +336,9 @@ class DeepgramFluxSTTService(WebsocketSTTService):
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _send_close_stream(self) -> None:
"""Sends a CloseStream control message to the Deepgram Flux WebSocket API.
This signals to the server that no more audio data will be sent.
"""
try:
if self._websocket:
logger.debug("Sending CloseStream message to Deepgram Flux")
message = {"type": "CloseStream"}
await self._websocket.send(json.dumps(message))
except Exception as e:
await self.push_error(error_msg=f"Error sending closeStream: {e}", exception=e)
async def _send_configure(self, fields: set[str]):
"""Send a Configure control message to update settings mid-stream.
Builds a Configure JSON message containing only the fields that changed
and sends it over the existing WebSocket connection.
Args:
fields: Set of changed field names to include in the message.
"""
message: dict[str, Any] = {"type": "Configure"}
if "keyterm" in fields:
message["keyterms"] = self._settings.keyterm
thresholds: dict[str, Any] = {}
if "eot_threshold" in fields:
thresholds["eot_threshold"] = self._settings.eot_threshold
if "eager_eot_threshold" in fields:
thresholds["eager_eot_threshold"] = self._settings.eager_eot_threshold
if "eot_timeout_ms" in fields:
thresholds["eot_timeout_ms"] = self._settings.eot_timeout_ms
if thresholds:
message["thresholds"] = thresholds
logger.debug(f"{self}: sending Configure message: {message}")
await self._websocket.send(json.dumps(message))
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Deepgram service supports metrics generation.
"""
return True
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
"""Apply a settings delta.
Configure-able fields (keyterm, eot_threshold, eager_eot_threshold,
eot_timeout_ms) are sent to Deepgram via a Configure WebSocket message.
Other fields are stored but cannot be applied to the active connection.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
configure_fields = changed.keys() & self._CONFIGURE_FIELDS
if configure_fields and self._websocket and self._websocket.state is State.OPEN:
await self._send_configure(configure_fields)
self._warn_unhandled_updated_settings(changed.keys() - self._CONFIGURE_FIELDS)
return changed
async def start(self, frame: StartFrame):
"""Start the Deepgram Flux STT service.
Initializes the service by constructing the WebSocket URL with all configured
parameters and establishing the connection to begin transcription processing.
Args:
frame: The start frame containing initialization parameters and metadata.
"""
await super().start(frame)
url_params = [
f"model={self._settings.model}",
f"sample_rate={self.sample_rate}",
f"encoding={self._encoding}",
]
if self._settings.eager_eot_threshold is not None:
url_params.append(f"eager_eot_threshold={self._settings.eager_eot_threshold}")
if self._settings.eot_threshold is not None:
url_params.append(f"eot_threshold={self._settings.eot_threshold}")
if self._settings.eot_timeout_ms is not None:
url_params.append(f"eot_timeout_ms={self._settings.eot_timeout_ms}")
if self._mip_opt_out is not None:
url_params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")
# Add keyterm parameters (can have multiple)
for keyterm in self._settings.keyterm:
url_params.append(urlencode({"keyterm": keyterm}))
# Add tag parameters (can have multiple)
for tag_value in self._tag:
url_params.append(urlencode({"tag": tag_value}))
self._websocket_url = f"{self._url}?{'&'.join(url_params)}"
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram Flux STT service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram Flux STT service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
# ------------------------------------------------------------------
# Audio sending and receiving
# ------------------------------------------------------------------
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Deepgram Flux for transcription.
@@ -556,23 +370,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
yield None
async def start_metrics(self):
"""Start TTFB and processing metrics collection."""
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
# Ideally, TTFB should measure the time from when a user starts speaking
# until we receive the first transcript. However, Deepgram Flux delivers
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.start_ttfb_metrics()
await self.start_processing_metrics()
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
def _get_websocket(self):
"""Get the current WebSocket connection.
@@ -589,27 +386,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
return self._websocket
raise Exception("Websocket not connected")
def _validate_message(self, data: Dict[str, Any]) -> bool:
"""Validate basic message structure from Deepgram Flux.
Ensures the received message has the expected structure before processing.
Args:
data: The parsed JSON message data to validate.
Returns:
True if the message structure is valid, False otherwise.
"""
if not isinstance(data, dict):
logger.warning("Message is not a dictionary")
return False
if "type" not in data:
logger.warning("Message missing 'type' field")
return False
return True
async def _receive_messages(self):
"""Receive and process messages from WebSocket.
@@ -634,269 +410,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
else:
logger.warning(f"Received non-string message: {type(message)}")
async def _handle_message(self, data: Dict[str, Any]):
"""Handle a parsed WebSocket message from Deepgram Flux.
Routes messages to appropriate handlers based on their type. Validates
message structure before processing.
Args:
data: The parsed JSON message data from the WebSocket.
"""
if not self._validate_message(data):
return
message_type = data.get("type")
try:
flux_message_type = FluxMessageType(message_type)
except ValueError:
logger.debug(f"Unhandled message type: {message_type or 'unknown'}")
return
match flux_message_type:
case FluxMessageType.RECEIVE_CONNECTED:
await self._handle_connection_established()
case FluxMessageType.RECEIVE_FATAL_ERROR:
await self._handle_fatal_error(data)
case FluxMessageType.TURN_INFO:
await self._handle_turn_info(data)
case FluxMessageType.CONFIGURE_SUCCESS:
logger.info(f"{self}: Configure accepted: {data}")
case FluxMessageType.CONFIGURE_FAILURE:
error_code = data.get("error_code", "unknown")
description = data.get("description", "no description")
error_msg = f"Configure rejected: [{error_code}] {description}"
logger.warning(f"{self}: {error_msg}")
await self.push_error(error_msg=error_msg)
async def _handle_connection_established(self):
"""Handle successful connection establishment to Deepgram Flux.
This event is fired when the WebSocket connection to Deepgram Flux
is successfully established and ready to receive audio data for
transcription processing.
"""
logger.info("Connected to Flux - ready to stream audio")
# Notify connection is established
self._connection_established_event.set()
async def _handle_fatal_error(self, data: Dict[str, Any]):
"""Handle fatal error messages from Deepgram Flux.
Fatal errors indicate unrecoverable issues with the connection or
configuration that require intervention. These errors will cause
the connection to be terminated.
Args:
data: The error message data containing error details.
Raises:
Exception: Always raises to trigger error handling in the parent service.
"""
error_msg = data.get("error", "Unknown error")
deepgram_error = f"Fatal error: {error_msg}"
logger.error(deepgram_error)
# Error will be handled inside WebsocketService->_receive_task_handler
raise Exception(deepgram_error)
async def _handle_turn_info(self, data: Dict[str, Any]):
"""Handle TurnInfo events from Deepgram Flux.
TurnInfo messages contain various turn-based events that indicate
the state of speech processing, including turn boundaries, interim
results, and turn finalization events.
Args:
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
event = data.get("event")
transcript = data.get("transcript", "")
try:
flux_event_type = FluxEventType(event)
except ValueError:
logger.debug(f"Unhandled TurnInfo event: {event}")
return
match flux_event_type:
case FluxEventType.START_OF_TURN:
await self._handle_start_of_turn(transcript)
case FluxEventType.TURN_RESUMED:
await self._handle_turn_resumed(event)
case FluxEventType.END_OF_TURN:
await self._handle_end_of_turn(transcript, data)
case FluxEventType.EAGER_END_OF_TURN:
await self._handle_eager_end_of_turn(transcript, data)
case FluxEventType.UPDATE:
await self._handle_update(transcript)
async def _handle_start_of_turn(self, transcript: str):
"""Handle StartOfTurn events from Deepgram Flux.
StartOfTurn events are fired when Deepgram Flux detects the beginning
of a new speaking turn. This triggers bot interruption to stop any
ongoing speech synthesis and signals the start of user speech detection.
The service will:
- Send a BotInterruptionFrame upstream to stop bot speech
- Send a UserStartedSpeakingFrame downstream to notify other components
- Start metrics collection for measuring response times
Args:
transcript: maybe the first few words of the turn.
"""
logger.debug("User started speaking")
self._user_is_speaking = True
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
await self.broadcast_interruption()
await self.start_metrics()
await self._call_event_handler("on_start_of_turn", transcript)
if transcript:
logger.trace(f"Start of turn transcript: {transcript}")
async def _handle_turn_resumed(self, event: str):
"""Handle TurnResumed events from Deepgram Flux.
TurnResumed events indicate that speech has resumed after a brief pause
within the same turn. This is primarily used for logging and debugging
purposes and doesn't trigger any significant processing changes.
Args:
event: The event type string for logging purposes.
"""
logger.trace(f"Received event TurnResumed: {event}")
await self._call_event_handler("on_turn_resumed")
def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
"""Calculate the average confidence from transcript data.
Return None if the data is missing or invalid.
"""
# Example: Assume transcript_data has a list of words with confidence
words = transcript_data.get("words")
if not words or not isinstance(words, list):
return None
confidences = [
w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
]
if not confidences:
return None
return sum(confidences) / len(confidences)
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
EndOfTurn events are fired when Deepgram Flux determines that a speaking
turn has concluded, either due to sufficient silence or end-of-turn
confidence thresholds being met. This provides the final transcript
for the completed turn.
The service will:
- Create and send a final TranscriptionFrame with the complete transcript
- Trigger transcription handling with tracing for metrics
- Stop processing metrics collection
- Send a UserStoppedSpeakingFrame to signal turn completion
Args:
transcript: The final transcript text for the completed turn.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.debug("User stopped speaking")
self._user_is_speaking = False
# Compute the average confidence
average_confidence = self._calculate_average_confidence(data)
if not self._settings.min_confidence or average_confidence > self._settings.min_confidence:
# EndOfTurn means Flux has determined the turn is complete,
# so this TranscriptionFrame is always finalized
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
result=data,
finalized=True,
)
)
else:
logger.warning(
f"Transcription confidence below min_confidence threshold: {average_confidence}"
)
await self._handle_transcription(transcript, True, self._settings.language)
await self.stop_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._call_event_handler("on_end_of_turn", transcript)
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EagerEndOfTurn events from Deepgram Flux.
EagerEndOfTurn events are fired when the end-of-turn confidence reaches the
EagerEndOfTurn threshold but hasn't yet reached the full end-of-turn threshold.
These provide interim transcripts that can be used for faster response
generation while still allowing the user to continue speaking.
EagerEndOfTurn events enable more responsive conversational AI by allowing
the LLM to start processing likely final transcripts before the turn
is definitively ended.
Args:
transcript: The interim transcript text that triggered the EagerEndOfTurn event.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.trace(f"EagerEndOfTurn - {transcript}")
# Deepgram's EagerEndOfTurn feature enables lower-latency voice agents by sending
# medium-confidence transcripts before EndOfTurn certainty, allowing LLM processing to
# begin early.
#
# However, if speech resumes or the transcripts differ from the final EndOfTurn, the
# EagerEndOfTurn response should be cancelled to avoid incorrect or partial responses.
#
# Pipecat doesn't yet provide built-in Gate/control mechanisms to:
# 1. Start LLM/TTS processing early on EagerEndOfTurn events
# 2. Cancel in-flight processing when TurnResumed occurs
#
# By pushing EagerEndOfTurn transcripts as InterimTranscriptionFrame, we enable
# developers to implement custom EagerEndOfTurn handling in their applications while
# maintaining compatibility with existing interim transcription workflows.
#
# TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
# that can start response generation on EagerEndOfTurn and cancel or confirm it.
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
result=data,
)
)
await self._call_event_handler("on_eager_end_of_turn", transcript)
async def _handle_update(self, transcript: str):
"""Handle Update events from Deepgram Flux.
Update events provide incremental transcript updates during an ongoing
turn. These events allow for real-time display of transcription progress
and can be used to provide visual feedback to users about what's being
recognized.
The service stops TTFB (Time To First Byte) metrics when the first
substantial update is received, indicating successful processing start.
Args:
transcript: The current partial transcript text for the ongoing turn.
"""
if transcript:
logger.trace(f"Update event: {transcript}")
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
# Ideally, TTFB should measure the time from when a user starts speaking
# until we receive the first transcript. However, Deepgram Flux delivers
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.stop_ttfb_metrics()
await self._call_event_handler("on_update", transcript)
async def _report_error(self, error):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error_frame(error)