From 52b33e5106055d63612ed981a7b546dcbd788850 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 30 Oct 2025 16:09:07 -0300 Subject: [PATCH] New event handlers for the DeepgramFluxSTTService. --- CHANGELOG.md | 3 +++ .../foundational/07c-interruptible-deepgram-flux.py | 4 ++++ src/pipecat/services/deepgram/flux/stt.py | 11 +++++++++++ 3 files changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24484687f..c0ba85a6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- New event handlers for the `DeepgramFluxSTTService`: `on_start_of_turn`, + `on_turn_resumed`, `on_end_of_turn`, `on_eager_end_of_turn`, `on_update`. + - Added `generation_config` parameter support to `CartesiaTTSService` and `CartesiaHttpTTSService` for Cartesia Sonic-3 models. Includes a new `GenerationConfig` class with `volume` (0.5-2.0), `speed` (0.6-1.5), diff --git a/examples/foundational/07c-interruptible-deepgram-flux.py b/examples/foundational/07c-interruptible-deepgram-flux.py index 1331ab2a3..75a022a5c 100644 --- a/examples/foundational/07c-interruptible-deepgram-flux.py +++ b/examples/foundational/07c-interruptible-deepgram-flux.py @@ -101,6 +101,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Client disconnected") await task.cancel() + @stt.event_handler("on_update") + async def on_deepgram_flux_update(stt, transcript): + logger.debug(f"On deeggram flux update: {transcript}") + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) await runner.run(task) diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index f0b1a5baa..78f615a4d 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -156,6 +156,12 @@ class DeepgramFluxSTTService(WebsocketSTTService): self._language = Language.EN self._websocket_url = None self._receive_task = None + # Flux event handlers + self._register_event_handler("on_start_of_turn") + self._register_event_handler("on_turn_resumed") + self._register_event_handler("on_end_of_turn") + self._register_event_handler("on_eager_end_of_turn") + self._register_event_handler("on_update") async def _connect(self): """Connect to WebSocket and start background tasks. @@ -523,6 +529,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM) await self.start_metrics() + await self._call_event_handler("on_start_of_turn", transcript) if transcript: logger.trace(f"Start of turn transcript: {transcript}") @@ -537,6 +544,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): event: The event type string for logging purposes. """ logger.trace(f"Received event TurnResumed: {event}") + await self._call_event_handler("on_turn_resumed") async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]): """Handle EndOfTurn events from Deepgram Flux. @@ -571,6 +579,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): await self.stop_processing_metrics() await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM) await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM) + await self._call_event_handler("on_end_of_turn", transcript) async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]): """Handle EagerEndOfTurn events from Deepgram Flux. @@ -615,6 +624,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): result=data, ) ) + await self._call_event_handler("on_eager_end_of_turn", transcript) async def _handle_update(self, transcript: str): """Handle Update events from Deepgram Flux. @@ -638,3 +648,4 @@ class DeepgramFluxSTTService(WebsocketSTTService): # both the "user started speaking" event and the first transcript simultaneously, # making this timing measurement meaningless in this context. # await self.stop_ttfb_metrics() + await self._call_event_handler("on_update", transcript)