diff --git a/changelog/4430.added.md b/changelog/4430.added.md new file mode 100644 index 000000000..2a433636b --- /dev/null +++ b/changelog/4430.added.md @@ -0,0 +1 @@ +- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use. diff --git a/changelog/4430.changed.md b/changelog/4430.changed.md new file mode 100644 index 000000000..dbad5d2c5 --- /dev/null +++ b/changelog/4430.changed.md @@ -0,0 +1 @@ +- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency. diff --git a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index d2a8a24cd..b9db15ccf 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -162,6 +162,7 @@ class DeepgramFluxSTTBase(STTService): mip_opt_out: bool | None = None, tag: list | None = None, should_interrupt: bool = True, + watchdog_min_timeout: float = 0.5, settings: Settings, **kwargs, ): @@ -173,6 +174,9 @@ class DeepgramFluxSTTBase(STTService): tag: Tags to label requests for identification during usage reporting. should_interrupt: Whether to interrupt the bot when Flux detects that the user is speaking. + watchdog_min_timeout: minimum idle timeout before sending silence to + prevent dangling turns. The actual threshold is + ``max(chunk_duration * 2, watchdog_min_timeout)``. Defaults to 0.5. settings: Fully resolved settings instance (built by concrete subclass). **kwargs: Additional arguments passed to the parent STTService (e.g. ``sample_rate``, ``reconnect_on_error``). @@ -183,6 +187,7 @@ class DeepgramFluxSTTBase(STTService): self._mip_opt_out = mip_opt_out self._tag = tag or [] self._should_interrupt = should_interrupt + self._watchdog_min_timeout = watchdog_min_timeout # Connection readiness: Flux sends a "Connected" message when ready self._connection_established_event = asyncio.Event() @@ -191,6 +196,7 @@ class DeepgramFluxSTTBase(STTService): self._last_stt_time: float | None = None self._watchdog_task: asyncio.Task | None = None self._user_is_speaking = False + self._last_audio_chunk_duration: float = 0.0 # Flux event handlers self._register_event_handler("on_start_of_turn") @@ -291,9 +297,17 @@ class DeepgramFluxSTTBase(STTService): """ while self._transport_is_active(): now = time.monotonic() - # More than 500 ms without sending new audio to Flux - if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5: - logger.warning("Sending silence to Flux to prevent dangling task") + # Send silence if we go more than 500 ms or twice the chunk size + # without sending new audio to Flux. + threshold = max(self._last_audio_chunk_duration * 2, self._watchdog_min_timeout) + if ( + self._user_is_speaking + and self._last_stt_time + and now - self._last_stt_time > threshold + ): + logger.warning( + f"No audio received for {threshold * 1000:.0f} ms. Sending silence to Flux to prevent a dangling task" + ) try: await self._send_silence() except Exception as e: diff --git a/src/pipecat/services/deepgram/flux/sagemaker/stt.py b/src/pipecat/services/deepgram/flux/sagemaker/stt.py index 0f7a6fded..9fdcf37be 100644 --- a/src/pipecat/services/deepgram/flux/sagemaker/stt.py +++ b/src/pipecat/services/deepgram/flux/sagemaker/stt.py @@ -89,6 +89,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): mip_opt_out: bool | None = None, tag: list | None = None, should_interrupt: bool = True, + watchdog_min_timeout: float = 0.5, settings: Settings | None = None, **kwargs, ): @@ -105,6 +106,8 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): tag: Tags to label requests for identification during usage reporting. should_interrupt: Whether to interrupt the bot when Flux detects that the user is speaking. Defaults to True. + watchdog_min_timeout: Minimum silence duration in seconds before the watchdog + sends silence to prevent dangling turns. Defaults to 0.5. settings: Runtime-updatable settings. **kwargs: Additional arguments passed to the parent STTService. """ @@ -129,6 +132,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): mip_opt_out=mip_opt_out, tag=tag, should_interrupt=should_interrupt, + watchdog_min_timeout=watchdog_min_timeout, settings=default_settings, sample_rate=sample_rate, **kwargs, @@ -245,6 +249,7 @@ class DeepgramFluxSageMakerSTTService(DeepgramFluxSTTBase): if self._client and self._client.is_active: try: self._last_stt_time = time.monotonic() + self._last_audio_chunk_duration = len(audio) / (self.sample_rate * 2) await self._client.send_audio_chunk(audio) except Exception as e: yield ErrorFrame(error=f"Unknown error occurred: {e}") diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index f89b37f0b..f4b5241d0 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -115,6 +115,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): tag: list | None = None, params: InputParams | None = None, should_interrupt: bool = True, + watchdog_min_timeout: float = 0.5, settings: Settings | None = None, **kwargs, ): @@ -140,6 +141,8 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): Use ``settings=DeepgramFluxSTTService.Settings(...)`` instead. should_interrupt: Determine whether the bot should be interrupted when Flux detects that the user is speaking. + watchdog_min_timeout: Minimum silence duration in seconds before the watchdog + sends silence to prevent dangling turns. Defaults to 0.5. settings: Runtime-updatable settings. When provided alongside deprecated parameters, ``settings`` values take precedence. **kwargs: Additional arguments passed to the parent classes. @@ -224,6 +227,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): mip_opt_out=mip_opt_out, tag=tag, should_interrupt=should_interrupt, + watchdog_min_timeout=watchdog_min_timeout, settings=default_settings, sample_rate=sample_rate, **kwargs, @@ -390,6 +394,7 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): try: self._last_stt_time = time.monotonic() + self._last_audio_chunk_duration = len(audio) / (self.sample_rate * 2) await self.send_with_retry(audio, self._report_error) except Exception as e: yield ErrorFrame(error=f"Unknown error occurred: {e}")