Merge pull request #2944 from pipecat-ai/filipi/flux_handlers

New event handlers for the DeepgramFluxSTTService.
This commit is contained in:
Filipi da Silva Fuchter
2025-10-30 16:40:41 -03:00
committed by GitHub
3 changed files with 18 additions and 0 deletions

View File

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- New event handlers for the `DeepgramFluxSTTService`: `on_start_of_turn`,
`on_turn_resumed`, `on_end_of_turn`, `on_eager_end_of_turn`, `on_update`.
- Added `generation_config` parameter support to `CartesiaTTSService` and
`CartesiaHttpTTSService` for Cartesia Sonic-3 models. Includes a new
`GenerationConfig` class with `volume` (0.5-2.0), `speed` (0.6-1.5),

View File

@@ -101,6 +101,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
@stt.event_handler("on_update")
async def on_deepgram_flux_update(stt, transcript):
logger.debug(f"On deeggram flux update: {transcript}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -156,6 +156,12 @@ class DeepgramFluxSTTService(WebsocketSTTService):
self._language = Language.EN
self._websocket_url = None
self._receive_task = None
# Flux event handlers
self._register_event_handler("on_start_of_turn")
self._register_event_handler("on_turn_resumed")
self._register_event_handler("on_end_of_turn")
self._register_event_handler("on_eager_end_of_turn")
self._register_event_handler("on_update")
async def _connect(self):
"""Connect to WebSocket and start background tasks.
@@ -523,6 +529,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
await self.start_metrics()
await self._call_event_handler("on_start_of_turn", transcript)
if transcript:
logger.trace(f"Start of turn transcript: {transcript}")
@@ -537,6 +544,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
event: The event type string for logging purposes.
"""
logger.trace(f"Received event TurnResumed: {event}")
await self._call_event_handler("on_turn_resumed")
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
@@ -571,6 +579,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self.stop_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
await self._call_event_handler("on_end_of_turn", transcript)
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EagerEndOfTurn events from Deepgram Flux.
@@ -615,6 +624,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
result=data,
)
)
await self._call_event_handler("on_eager_end_of_turn", transcript)
async def _handle_update(self, transcript: str):
"""Handle Update events from Deepgram Flux.
@@ -638,3 +648,4 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.stop_ttfb_metrics()
await self._call_event_handler("on_update", transcript)