Merge pull request #4430 from pipecat-ai/filipi/flux_audio

Implementing dynamic watchdog timeout for Deepgram Flux STT
This commit is contained in:
Filipi da Silva Fuchter
2026-05-06 11:40:06 -03:00
committed by GitHub
5 changed files with 29 additions and 3 deletions

1
changelog/4430.added.md Normal file
View File

@@ -0,0 +1 @@
- Added a `watchdog_min_timeout` parameter to `DeepgramFluxSTTService` and `DeepgramFluxSageMakerSTTService` (default `0.5` seconds) to control the minimum time without new audio being sent before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.

View File

@@ -0,0 +1 @@
- The watchdog silence threshold of `DeepgramFluxSTTService` and `DeepgramFluxSageMakerSTTService` is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at a lower frequency.

View File

@@ -162,6 +162,7 @@ class DeepgramFluxSTTBase(STTService):
mip_opt_out: bool | None = None,
tag: list | None = None,
should_interrupt: bool = True,
watchdog_min_timeout: float = 0.5,
settings: Settings,
**kwargs,
):
@@ -173,6 +174,9 @@ class DeepgramFluxSTTBase(STTService):
tag: Tags to label requests for identification during usage reporting.
should_interrupt: Whether to interrupt the bot when Flux detects that
the user is speaking.
watchdog_min_timeout: Minimum idle time in seconds without new audio before
the watchdog sends silence to prevent dangling turns. The actual threshold is
``max(chunk_duration * 2, watchdog_min_timeout)``. Defaults to 0.5.
settings: Fully resolved settings instance (built by concrete subclass).
**kwargs: Additional arguments passed to the parent STTService (e.g.
``sample_rate``, ``reconnect_on_error``).
@@ -183,6 +187,7 @@ class DeepgramFluxSTTBase(STTService):
self._mip_opt_out = mip_opt_out
self._tag = tag or []
self._should_interrupt = should_interrupt
self._watchdog_min_timeout = watchdog_min_timeout
# Connection readiness: Flux sends a "Connected" message when ready
self._connection_established_event = asyncio.Event()
@@ -191,6 +196,7 @@ class DeepgramFluxSTTBase(STTService):
self._last_stt_time: float | None = None
self._watchdog_task: asyncio.Task | None = None
self._user_is_speaking = False
self._last_audio_chunk_duration: float = 0.0
# Flux event handlers
self._register_event_handler("on_start_of_turn")
@@ -291,9 +297,17 @@ class DeepgramFluxSTTBase(STTService):
"""
while self._transport_is_active():
now = time.monotonic()
# More than 500 ms without sending new audio to Flux
if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
logger.warning("Sending silence to Flux to prevent dangling task")
# Send silence if we go longer than the configured minimum timeout or twice
# the last chunk duration (whichever is larger) without sending new audio to Flux.
threshold = max(self._last_audio_chunk_duration * 2, self._watchdog_min_timeout)
if (
self._user_is_speaking
and self._last_stt_time
and now - self._last_stt_time > threshold
):
logger.warning(
f"No audio received for {threshold * 1000:.0f} ms. Sending silence to Flux to prevent a dangling task"
)
try:
await self._send_silence()
except Exception as e:

View File

@@ -89,6 +89,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
mip_opt_out: bool | None = None,
tag: list | None = None,
should_interrupt: bool = True,
watchdog_min_timeout: float = 0.5,
settings: Settings | None = None,
**kwargs,
):
@@ -105,6 +106,8 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
tag: Tags to label requests for identification during usage reporting.
should_interrupt: Whether to interrupt the bot when Flux detects that
the user is speaking. Defaults to True.
watchdog_min_timeout: Minimum silence duration in seconds before the watchdog
sends silence to prevent dangling turns. Defaults to 0.5.
settings: Runtime-updatable settings.
**kwargs: Additional arguments passed to the parent STTService.
"""
@@ -129,6 +132,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
mip_opt_out=mip_opt_out,
tag=tag,
should_interrupt=should_interrupt,
watchdog_min_timeout=watchdog_min_timeout,
settings=default_settings,
sample_rate=sample_rate,
**kwargs,
@@ -245,6 +249,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase):
if self._client and self._client.is_active:
try:
self._last_stt_time = time.monotonic()
self._last_audio_chunk_duration = len(audio) / (self.sample_rate * 2)
await self._client.send_audio_chunk(audio)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -115,6 +115,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
tag: list | None = None,
params: InputParams | None = None,
should_interrupt: bool = True,
watchdog_min_timeout: float = 0.5,
settings: Settings | None = None,
**kwargs,
):
@@ -140,6 +141,8 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
Use ``settings=DeepgramFluxSTTService.Settings(...)`` instead.
should_interrupt: Determine whether the bot should be interrupted when Flux detects that the user is speaking.
watchdog_min_timeout: Minimum silence duration in seconds before the watchdog
sends silence to prevent dangling turns. Defaults to 0.5.
settings: Runtime-updatable settings. When provided alongside deprecated
parameters, ``settings`` values take precedence.
**kwargs: Additional arguments passed to the parent classes.
@@ -224,6 +227,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
mip_opt_out=mip_opt_out,
tag=tag,
should_interrupt=should_interrupt,
watchdog_min_timeout=watchdog_min_timeout,
settings=default_settings,
sample_rate=sample_rate,
**kwargs,
@@ -390,6 +394,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
try:
self._last_stt_time = time.monotonic()
self._last_audio_chunk_duration = len(audio) / (self.sample_rate * 2)
await self.send_with_retry(audio, self._report_error)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")