From 0c779b4c3d1f64e6ee114aa76880b8766642a4af Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 6 May 2026 11:01:58 -0300 Subject: [PATCH 1/4] Implementing dynamic watchdog timeout for Deepgram Flux STT --- src/pipecat/services/deepgram/flux/base.py | 9 ++++++--- src/pipecat/services/deepgram/flux/sagemaker/stt.py | 1 + src/pipecat/services/deepgram/flux/stt.py | 1 + 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index d2a8a24cd..b69b60749 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -191,6 +191,7 @@ class DeepgramFluxSTTBase(STTService): self._last_stt_time: float | None = None self._watchdog_task: asyncio.Task | None = None self._user_is_speaking = False + self._last_audio_chunk_duration: float = 0.0 # Flux event handlers self._register_event_handler("on_start_of_turn") @@ -291,9 +292,11 @@ class DeepgramFluxSTTBase(STTService): """ while self._transport_is_active(): now = time.monotonic() - # More than 500 ms without sending new audio to Flux - if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5: - logger.warning("Sending silence to Flux to prevent dangling task") + # Send silence if we go more than 500 ms or twice the chunk size + # without sending new audio to Flux. + threshold = max(self._last_audio_chunk_duration * 2, 0.5) + if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > threshold: + logger.warning(f"No audio received for {threshold * 1000:.0f} ms. Sending silence to Flux to prevent a dangling task") try: await self._send_silence() except Exception as e: diff --git a/src/pipecat/services/deepgram/flux/sagemaker/stt.py b/src/pipecat/services/deepgram/flux/sagemaker/stt.py index 0f7a6fded..177e44063 100644 --- a/src/pipecat/services/deepgram/flux/sagemaker/stt.py +++ b/src/pipecat/services/deepgram/flux/sagemaker/stt.py @@ -245,6 +245,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): if self._client and self._client.is_active: try: self._last_stt_time = time.monotonic() + self._last_audio_chunk_duration = len(audio) / (self.sample_rate * 2) await self._client.send_audio_chunk(audio) except Exception as e: yield ErrorFrame(error=f"Unknown error occurred: {e}") diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index f89b37f0b..2499201cc 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -390,6 +390,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): try: self._last_stt_time = time.monotonic() + self._last_audio_chunk_duration = len(audio) / (self.sample_rate * 2) await self.send_with_retry(audio, self._report_error) except Exception as e: yield ErrorFrame(error=f"Unknown error occurred: {e}") From 1cb77b422aa14e4639343c8be44eec2682645711 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 6 May 2026 11:22:37 -0300 Subject: [PATCH 2/4] Created a watchdog_min_timeout to allow to change the default value. --- src/pipecat/services/deepgram/flux/base.py | 18 +++++++++++++++--- .../services/deepgram/flux/sagemaker/stt.py | 4 ++++ src/pipecat/services/deepgram/flux/stt.py | 4 ++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index b69b60749..b2ae5ae2e 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -162,6 +162,7 @@ class DeepgramFluxSTTBase(STTService): mip_opt_out: bool | None = None, tag: list | None = None, should_interrupt: bool = True, + watchdog_min_timeout: float = 0.5, settings: Settings, **kwargs, ): @@ -173,6 +174,10 @@ class DeepgramFluxSTTBase(STTService): tag: Tags to label requests for identification during usage reporting. should_interrupt: Whether to interrupt the bot when Flux detects that the user is speaking. + watchdog_min_timeout: Minimum silence duration in seconds before the + watchdog sends a silence packet to prevent dangling turns. The + actual threshold is ``max(chunk_duration * 2, watchdog_min_timeout)``. + Defaults to 0.5. settings: Fully resolved settings instance (built by concrete subclass). **kwargs: Additional arguments passed to the parent STTService (e.g. ``sample_rate``, ``reconnect_on_error``). @@ -183,6 +188,7 @@ class DeepgramFluxSTTBase(STTService): self._mip_opt_out = mip_opt_out self._tag = tag or [] self._should_interrupt = should_interrupt + self._watchdog_min_timeout = watchdog_min_timeout # Connection readiness: Flux sends a "Connected" message when ready self._connection_established_event = asyncio.Event() @@ -294,9 +300,15 @@ class DeepgramFluxSTTBase(STTService): now = time.monotonic() # Send silence if we go more than 500 ms or twice the chunk size # without sending new audio to Flux. - threshold = max(self._last_audio_chunk_duration * 2, 0.5) - if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > threshold: - logger.warning(f"No audio received for {threshold * 1000:.0f} ms. Sending silence to Flux to prevent a dangling task") + threshold = max(self._last_audio_chunk_duration * 2, self._watchdog_min_timeout) + if ( + self._user_is_speaking + and self._last_stt_time + and now - self._last_stt_time > threshold + ): + logger.warning( + f"No audio received for {threshold * 1000:.0f} ms. Sending silence to Flux to prevent a dangling task" + ) try: await self._send_silence() except Exception as e: diff --git a/src/pipecat/services/deepgram/flux/sagemaker/stt.py b/src/pipecat/services/deepgram/flux/sagemaker/stt.py index 177e44063..9fdcf37be 100644 --- a/src/pipecat/services/deepgram/flux/sagemaker/stt.py +++ b/src/pipecat/services/deepgram/flux/sagemaker/stt.py @@ -89,6 +89,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): mip_opt_out: bool | None = None, tag: list | None = None, should_interrupt: bool = True, + watchdog_min_timeout: float = 0.5, settings: Settings | None = None, **kwargs, ): @@ -105,6 +106,8 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): tag: Tags to label requests for identification during usage reporting. should_interrupt: Whether to interrupt the bot when Flux detects that the user is speaking. Defaults to True. + watchdog_min_timeout: Minimum silence duration in seconds before the watchdog + sends silence to prevent dangling turns. Defaults to 0.5. settings: Runtime-updatable settings. **kwargs: Additional arguments passed to the parent STTService. """ @@ -129,6 +132,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): mip_opt_out=mip_opt_out, tag=tag, should_interrupt=should_interrupt, + watchdog_min_timeout=watchdog_min_timeout, settings=default_settings, sample_rate=sample_rate, **kwargs, diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 2499201cc..f4b5241d0 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -115,6 +115,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): tag: list | None = None, params: InputParams | None = None, should_interrupt: bool = True, + watchdog_min_timeout: float = 0.5, settings: Settings | None = None, **kwargs, ): @@ -140,6 +141,8 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): Use ``settings=DeepgramFluxSTTService.Settings(...)`` instead. should_interrupt: Determine whether the bot should be interrupted when Flux detects that the user is speaking. + watchdog_min_timeout: Minimum silence duration in seconds before the watchdog + sends silence to prevent dangling turns. Defaults to 0.5. settings: Runtime-updatable settings. When provided alongside deprecated parameters, ``settings`` values take precedence. **kwargs: Additional arguments passed to the parent classes. @@ -224,6 +227,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): mip_opt_out=mip_opt_out, tag=tag, should_interrupt=should_interrupt, + watchdog_min_timeout=watchdog_min_timeout, settings=default_settings, sample_rate=sample_rate, **kwargs, From 5daf267c1110ab9a6e57f3c38f72b2fb6b3419ce Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 6 May 2026 11:26:14 -0300 Subject: [PATCH 3/4] Adding changelogs. --- changelog/4430.added.md | 1 + changelog/4430.changed.md | 1 + 2 files changed, 2 insertions(+) create mode 100644 changelog/4430.added.md create mode 100644 changelog/4430.changed.md diff --git a/changelog/4430.added.md b/changelog/4430.added.md new file mode 100644 index 000000000..2a433636b --- /dev/null +++ b/changelog/4430.added.md @@ -0,0 +1 @@ +- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use. diff --git a/changelog/4430.changed.md b/changelog/4430.changed.md new file mode 100644 index 000000000..dbad5d2c5 --- /dev/null +++ b/changelog/4430.changed.md @@ -0,0 +1 @@ +- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency. From 03e5ebb266fc1975ffdffa20b2a7134cebacac1f Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 6 May 2026 11:37:18 -0300 Subject: [PATCH 4/4] Improving watchdog_min_timeout description. --- src/pipecat/services/deepgram/flux/base.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index b2ae5ae2e..b9db15ccf 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -174,10 +174,9 @@ class DeepgramFluxSTTBase(STTService): tag: Tags to label requests for identification during usage reporting. should_interrupt: Whether to interrupt the bot when Flux detects that the user is speaking. - watchdog_min_timeout: Minimum silence duration in seconds before the - watchdog sends a silence packet to prevent dangling turns. The - actual threshold is ``max(chunk_duration * 2, watchdog_min_timeout)``. - Defaults to 0.5. + watchdog_min_timeout: minimum idle timeout before sending silence to + prevent dangling turns. The actual threshold is + ``max(chunk_duration * 2, watchdog_min_timeout)``. Defaults to 0.5. settings: Fully resolved settings instance (built by concrete subclass). **kwargs: Additional arguments passed to the parent STTService (e.g. ``sample_rate``, ``reconnect_on_error``).