From 2968c846cef97ae720d55ec669bb440411dcfee6 Mon Sep 17 00:00:00 2001 From: Matej Marinko Date: Wed, 28 May 2025 09:35:21 +0200 Subject: [PATCH] Add Soniox STT service --- README.md | 2 +- docs/api/conf.py | 1 + docs/api/requirements.txt | 1 + dot-env.template | 5 +- .../foundational/07za-interruptible-soniox.py | 115 ++++++ .../foundational/13f-soniox-transcription.py | 77 ++++ pyproject.toml | 1 + src/pipecat/services/soniox/__init__.py | 13 + src/pipecat/services/soniox/config.py | 36 ++ src/pipecat/services/soniox/stt.py | 368 ++++++++++++++++++ 10 files changed, 617 insertions(+), 2 deletions(-) create mode 100644 examples/foundational/07za-interruptible-soniox.py create mode 100644 examples/foundational/13f-soniox-transcription.py create mode 100644 src/pipecat/services/soniox/__init__.py create mode 100644 src/pipecat/services/soniox/config.py create mode 100644 src/pipecat/services/soniox/stt.py diff --git a/README.md b/README.md index 3966e8d65..6833cf7c5 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ You can connect to Pipecat from any platform using our official SDKs: | Category | Services | | ------------------- | 
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | +| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), 
[AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | | LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | | Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), 
[Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | | Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | diff --git a/docs/api/conf.py b/docs/api/conf.py index a33caa10c..8de9108f8 100644 --- a/docs/api/conf.py +++ b/docs/api/conf.py @@ -75,6 +75,7 @@ autodoc_mock_imports = [ "openpipe", "simli", "soundfile", + "soniox", "pipecat_ai_krisp", "pyaudio", "_tkinter", diff --git a/docs/api/requirements.txt b/docs/api/requirements.txt index a77ff1084..73cda37cc 100644 --- a/docs/api/requirements.txt +++ b/docs/api/requirements.txt @@ -45,6 +45,7 @@ pipecat-ai[remote-smart-turn] pipecat-ai[silero] pipecat-ai[simli] pipecat-ai[soundfile] +pipecat-ai[soniox] pipecat-ai[tavus] pipecat-ai[together] # pipecat-ai[ultravox] # Mocked diff --git a/dot-env.template b/dot-env.template index 20d73b3ad..9d2128355 100644 --- a/dot-env.template +++ b/dot-env.template @@ -107,4 +107,7 @@ MINIMAX_API_KEY=... 
MINIMAX_GROUP_ID=...
 
 # Sarvam AI
-SARVAM_API_KEY=...
\ No newline at end of file
+SARVAM_API_KEY=...
+
+# Soniox
+SONIOX_API_KEY=...
\ No newline at end of file
diff --git a/examples/foundational/07za-interruptible-soniox.py b/examples/foundational/07za-interruptible-soniox.py
new file mode 100644
index 000000000..f3b3487d7
--- /dev/null
+++ b/examples/foundational/07za-interruptible-soniox.py
@@ -0,0 +1,116 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.cartesia.tts import CartesiaTTSService
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.services.soniox.config import SonioxInputParams
+from pipecat.services.soniox.stt import SonioxSTTService
+from pipecat.transcriptions.language import Language
+from pipecat.transports.base_transport import TransportParams
+from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
+from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
+
+load_dotenv(override=True)
+
+
+async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
+    """Run a voice bot over the given WebRTC connection using Soniox for speech-to-text."""
+    logger.info("Starting bot")
+
+    transport = SmallWebRTCTransport(
+        webrtc_connection=webrtc_connection,
+        params=TransportParams(
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            vad_analyzer=SileroVADAnalyzer(),
+        ),
+    )
+
+    stt = SonioxSTTService(
+        api_key=os.getenv("SONIOX_API_KEY"),
+        params=SonioxInputParams(
+            # Add language hints to improve transcription accuracy. Variants are ignored.
+            # For example "en-GB" will be treated same as "en".
+            # List of supported languages: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
+            language_hints=[Language.EN, Language.ES, Language.JA, Language.ZH],
+        ),
+    )
+
+    tts = CartesiaTTSService(
+        api_key=os.getenv("CARTESIA_API_KEY"),
+        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
+    )
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+        },
+    ]
+
+    context = OpenAILLMContext(messages)
+    context_aggregator = llm.create_context_aggregator(context)
+
+    pipeline = Pipeline(
+        [
+            transport.input(),  # Transport user input
+            stt,
+            context_aggregator.user(),  # User responses
+            llm,  # LLM
+            tts,  # TTS
+            transport.output(),  # Transport bot output
+            context_aggregator.assistant(),  # Assistant spoken responses
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        params=PipelineParams(
+            allow_interruptions=True,
+            enable_metrics=True,
+            enable_usage_metrics=True,
+            report_only_initial_ttfb=True,
+        ),
+    )
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        logger.info("Client connected")
+        # Kick off the conversation.
+        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+        await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info("Client disconnected")
+
+    @transport.event_handler("on_client_closed")
+    async def on_client_closed(transport, client):
+        logger.info("Client closed connection")
+        await task.cancel()
+
+    runner = PipelineRunner(handle_sigint=False)
+
+    await runner.run(task)
+
+
+if __name__ == "__main__":
+    from run import main
+
+    main()
diff --git a/examples/foundational/13f-soniox-transcription.py b/examples/foundational/13f-soniox-transcription.py
new file mode 100644
index 000000000..6e9d356f3
--- /dev/null
+++ b/examples/foundational/13f-soniox-transcription.py
@@ -0,0 +1,79 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.frames.frames import Frame, TranscriptionFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.services.soniox.config import SonioxInputParams
+from pipecat.services.soniox.stt import SonioxSTTService
+from pipecat.transcriptions.language import Language
+from pipecat.transports.base_transport import TransportParams
+from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
+from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
+
+load_dotenv(override=True)
+
+
+class TranscriptionLogger(FrameProcessor):
+    """Print the text of every TranscriptionFrame that reaches this processor."""
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        await super().process_frame(frame, direction)
+
+        if isinstance(frame, TranscriptionFrame):
+            print(f"Transcription: {frame.text}")
+
+
+async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
+    """Transcribe audio from the WebRTC connection with Soniox and print the transcripts."""
+    logger.info("Starting bot")
+
+    transport = SmallWebRTCTransport(
+        webrtc_connection=webrtc_connection,
+        params=TransportParams(audio_in_enabled=True),
+    )
+
+    stt = SonioxSTTService(
+        api_key=os.getenv("SONIOX_API_KEY"),
+        params=SonioxInputParams(
+            # Add language hints to improve transcription accuracy. Variants are ignored.
+            # For example "en-GB" will be treated same as "en".
+            # List of supported languages: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
+            language_hints=[Language.EN, Language.ES, Language.JA, Language.ZH],
+        ),
+    )
+
+    tl = TranscriptionLogger()
+
+    pipeline = Pipeline([transport.input(), stt, tl])
+
+    task = PipelineTask(pipeline)
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info("Client disconnected")
+
+    @transport.event_handler("on_client_closed")
+    async def on_client_closed(transport, client):
+        logger.info("Client closed connection")
+        await task.cancel()
+
+    runner = PipelineRunner(handle_sigint=False)
+
+    await runner.run(task)
+
+
+if __name__ == "__main__":
+    from run import main
+
+    main()
diff --git a/pyproject.toml b/pyproject.toml
index a2e50906d..a89cee5f8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -84,6 +84,7 @@ local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torcha
 remote-smart-turn = []
 silero = [ "onnxruntime~=1.20.1" ]
 simli = [ "simli-ai~=0.1.10"]
+soniox = [ "websockets~=13.1" ]
 soundfile = [ "soundfile~=0.13.0" ]
 tavus=[]
 together = []
diff --git a/src/pipecat/services/soniox/__init__.py b/src/pipecat/services/soniox/__init__.py
new file mode 100644
index 000000000..c74b1c218
--- /dev/null
+++ b/src/pipecat/services/soniox/__init__.py
@@ -0,0 +1,13 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import
sys
+
+from pipecat.services import DeprecatedModuleProxy
+
+from .stt import *
+
+sys.modules[__name__] = DeprecatedModuleProxy(globals(), "soniox", "soniox.stt")
diff --git a/src/pipecat/services/soniox/config.py b/src/pipecat/services/soniox/config.py
new file mode 100644
index 000000000..862b573f9
--- /dev/null
+++ b/src/pipecat/services/soniox/config.py
@@ -0,0 +1,44 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+from pipecat.transcriptions.language import Language
+
+
+class SonioxInputParams(BaseModel):
+    """Real-time transcription settings sent to Soniox when the session starts.
+
+    Attributes:
+        model: Name of the Soniox model to transcribe with.
+        audio_format: Encoding of the audio sent to Soniox.
+        num_channels: Number of audio channels.
+        sample_rate: Audio sample rate in Hz.
+        language_hints: Languages expected in the audio.
+        context: Custom context text passed along with the configuration.
+        enable_non_final_tokens: Whether non-final (interim) tokens are requested.
+        max_non_final_tokens_duration_ms: Optional limit forwarded as-is to Soniox.
+        enable_endpoint_detection: Whether Soniox endpoint detection is requested.
+        client_reference_id: Optional client-chosen identifier forwarded to Soniox.
+    """
+
+    model: str = "stt-rt-preview"
+
+    audio_format: Optional[str] = "pcm_s16le"
+    num_channels: Optional[int] = 1
+    sample_rate: Optional[int] = 16000
+
+    language_hints: Optional[List[Language]] = None
+    context: Optional[str] = None
+
+    enable_non_final_tokens: Optional[bool] = True
+    max_non_final_tokens_duration_ms: Optional[int] = None
+
+    enable_endpoint_detection: Optional[bool] = True
+
+    client_reference_id: Optional[str] = None
diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py
new file mode 100644
index 000000000..7d45c5a73
--- /dev/null
+++ b/src/pipecat/services/soniox/stt.py
@@ -0,0 +1,382 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import json
+import time
+from typing import AsyncGenerator, List, Optional
+
+from loguru import logger
+
+from pipecat.frames.frames import (
+    CancelFrame,
+    EndFrame,
+    ErrorFrame,
+    Frame,
+    InterimTranscriptionFrame,
+    StartFrame,
+    TranscriptionFrame,
+    UserStoppedSpeakingFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection
+from pipecat.services.soniox.config import SonioxInputParams
+from pipecat.services.stt_service import STTService
+from pipecat.transcriptions.language import Language
+from pipecat.utils.time import time_now_iso8601
+
+try:
+    import websockets
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
+    raise Exception(f"Missing module: {e}")
+
+
+KEEPALIVE_MESSAGE = json.dumps(
+    {
+        "type": "keepalive",
+    }
+)
+
+FINALIZE_MESSAGE = json.dumps(
+    {
+        "type": "finalize",
+    }
+)
+
+# Soniox marker tokens: "<end>" = endpoint detected, "<fin>" = finalize request completed.
+END_TOKEN = "<end>"
+
+FINALIZED_TOKEN = "<fin>"
+
+
+def is_end_token(token: dict) -> bool:
+    """Return True if the token is one of Soniox's endpoint/finalize marker tokens."""
+    return token["text"] in (END_TOKEN, FINALIZED_TOKEN)
+
+
+def language_to_soniox_language(language: Language) -> str:
+    """Pipecat Language enum uses same ISO 2-letter codes as Soniox, except with added regional variants.
+
+    For a list of all supported languages, see: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
+    """
+    lang_str = str(language.value).lower()
+    if "-" in lang_str:
+        return lang_str.split("-")[0]
+    return lang_str
+
+
+def _prepare_language_hints(
+    language_hints: Optional[List[Language]],
+) -> Optional[List[str]]:
+    """Convert language hints to Soniox codes, dropping duplicates but keeping order."""
+    if language_hints is None:
+        return None
+
+    prepared_languages = [language_to_soniox_language(lang) for lang in language_hints]
+    # Remove duplicates (in case of language_hints with multiple regions); dict.fromkeys
+    # keeps first-seen order, so the hints sent to Soniox are deterministic.
+    return list(dict.fromkeys(prepared_languages))
+
+
+class SonioxSTTService(STTService):
+    """Speech-to-Text service using Soniox's WebSocket API.
+
+    This service connects to Soniox's WebSocket API for real-time transcription
+    with support for multiple languages, custom context, endpoint detection,
+    and more.
+
+    For complete API documentation, see: https://soniox.com/docs/speech-to-text/api-reference/websocket-api
+    """
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        url: str = "wss://stt-rt.soniox.com/transcribe-websocket",
+        sample_rate: Optional[int] = None,
+        params: Optional[SonioxInputParams] = None,
+        enable_vad: bool = True,
+        auto_finalize_delay_ms: int | None = 3000,
+        **kwargs,
+    ):
+        """Initialize the Soniox STT service.
+
+        Args:
+            api_key: Soniox API key
+            url: Soniox WebSocket API URL
+            sample_rate: Audio sample rate. Falls back to `params.sample_rate` when not set.
+            params: Additional configuration parameters, such as language hints and context.
+                Defaults to `SonioxInputParams()`.
+            enable_vad: Listen to `UserStoppedSpeakingFrame` to send finalize message to Soniox.
+            auto_finalize_delay_ms: If no new tokens are received for a while and there is active
+                transcription (only InterimTranscriptionFrame), finalize the transcription by
+                sending the finalize message so user can receive the final transcript. If set
+                to `None`, the auto finalize feature is disabled.
+            **kwargs: Additional arguments passed to the STTService
+        """
+        # Build the default per instance instead of sharing one model object across services.
+        params = params or SonioxInputParams()
+        sample_rate = sample_rate or params.sample_rate or None
+        super().__init__(sample_rate=sample_rate, **kwargs)
+
+        self._api_key = api_key
+        self._url = url
+        self.set_model_name(params.model)
+        self._params = params
+        self._enable_vad = enable_vad
+        self._auto_finalize_delay_ms = auto_finalize_delay_ms
+        self._websocket = None
+
+        self._final_transcription_buffer = ""
+        self._last_tokens_received: float | None = None
+
+        self._receive_task = None
+        self._keepalive_task = None
+        self._finalize_if_no_tokens_task = None
+
+    async def start(self, frame: StartFrame):
+        """Start the Soniox STT websocket connection."""
+        await super().start(frame)
+        if self._websocket:
+            return
+
+        self._websocket = await websockets.connect(self._url)
+
+        if not self._websocket:
+            logger.error(f"Unable to connect to Soniox API at {self._url}")
+            return
+
+        # Send the initial configuration message
+        config = {
+            "api_key": self._api_key,
+            "model": self._model_name,
+            "audio_format": self._params.audio_format,
+            "num_channels": self._params.num_channels or 1,
+            "enable_endpoint_detection": self._params.enable_endpoint_detection,
+            "sample_rate": self._sample_rate,
+            "language_hints": _prepare_language_hints(self._params.language_hints),
+            "context": self._params.context,
+            "enable_non_final_tokens": self._params.enable_non_final_tokens,
+            "max_non_final_tokens_duration_ms": self._params.max_non_final_tokens_duration_ms,
+            "client_reference_id": self._params.client_reference_id,
+        }
+
+        # Send the configuration message
+        await self._websocket.send(json.dumps(config))
+
+        if self._websocket and not self._receive_task:
+            self._receive_task = self.create_task(self._receive_task_handler())
+        if self._websocket and not self._keepalive_task:
+            self._keepalive_task = self.create_task(self._keepalive_task_handler())
+        if (
+            self._websocket
+            and not self._finalize_if_no_tokens_task
+            and self._auto_finalize_delay_ms is not None
+        ):
+            self._finalize_if_no_tokens_task = self.create_task(
+                self._finalize_if_no_tokens_task_handler()
+            )
+
+    async def _cleanup(self):
+        """Cancel the helper tasks and close the websocket."""
+        if self._keepalive_task:
+            await self.cancel_task(self._keepalive_task)
+            self._keepalive_task = None
+
+        if self._websocket:
+            await self._websocket.close()
+            self._websocket = None
+
+        if self._receive_task:
+            # The receive task itself calls _cleanup() once Soniox reports "finished".
+            # A task cannot wait on itself, so only wait when called from another task.
+            if self._receive_task is not asyncio.current_task():
+                await self.wait_for_task(self._receive_task)
+            self._receive_task = None
+
+        if self._finalize_if_no_tokens_task:
+            await self.cancel_task(self._finalize_if_no_tokens_task)
+            self._finalize_if_no_tokens_task = None
+
+    async def stop(self, frame: EndFrame):
+        """Stop the Soniox STT websocket connection.
+
+        Stopping waits for the server to close the connection as we might receive
+        additional final tokens after sending the stop recording message.
+        """
+        await super().stop(frame)
+        await self._send_stop_recording()
+
+    async def cancel(self, frame: CancelFrame):
+        """Cancel the Soniox STT websocket connection.
+
+        Compared to stop, this method closes the connection immediately without waiting
+        for the server to close it. This is useful when we want to stop the connection
+        immediately without waiting for the server to send any final tokens.
+        """
+        await super().cancel(frame)
+        await self._cleanup()
+
+    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
+        """Send audio data to Soniox STT Service."""
+        await self.start_processing_metrics()
+        if self._websocket and not self._websocket.closed:
+            await self._websocket.send(audio)
+        await self.stop_processing_metrics()
+
+        yield None
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        """Process a frame and ask Soniox to finalize when the user stops speaking."""
+        await super().process_frame(frame, direction)
+
+        if isinstance(frame, UserStoppedSpeakingFrame) and self._enable_vad:
+            # Send finalize message to Soniox so we get the final tokens asap.
+            if self._websocket and not self._websocket.closed:
+                await self._websocket.send(FINALIZE_MESSAGE)
+                logger.debug(f"Triggered finalize event on: {frame.name=}, {direction=}")
+
+    async def _send_stop_recording(self):
+        """Send the empty "stop recording" message if the websocket is still open."""
+        if self._websocket and not self._websocket.closed:
+            # Send stop recording message
+            await self._websocket.send("")
+
+    async def _keepalive_task_handler(self):
+        """Connection has to be open all the time."""
+        try:
+            while True:
+                logger.debug("Sending keepalive message")
+                if self._websocket and not self._websocket.closed:
+                    await self._websocket.send(KEEPALIVE_MESSAGE)
+                else:
+                    logger.debug("WebSocket connection closed.")
+                    break
+                await asyncio.sleep(5)
+
+        except websockets.exceptions.ConnectionClosed:
+            # Expected when closing the connection
+            logger.debug("WebSocket connection closed, keepalive task stopped.")
+        except Exception as e:
+            logger.error(f"{self} error (_keepalive_task_handler): {e}")
+            await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}"))
+
+    async def _receive_task_handler(self):
+        """Read Soniox messages and push interim and final transcription frames."""
+        if not self._websocket:
+            return
+
+        # Transcription frame will be only sent after we get the "endpoint" event.
+        self._final_transcription_buffer = ""
+
+        async def send_endpoint_transcript():
+            if self._final_transcription_buffer:
+                await self.push_frame(
+                    TranscriptionFrame(
+                        self._final_transcription_buffer,
+                        "",
+                        time_now_iso8601(),
+                    )
+                )
+                self._final_transcription_buffer = ""
+
+        try:
+            async for message in self._websocket:
+                content = json.loads(message)
+
+                # Tolerate messages without a token list so error/finished handling below still runs.
+                tokens = content.get("tokens") or []
+
+                if tokens:
+                    # Got at least one token, so we can reset the auto finalize delay
+                    self._last_tokens_received = time.time()
+
+                # We will only send the final tokens after we get the "endpoint" event
+                non_final_transcription = ""
+
+                for token in tokens:
+                    if token["is_final"]:
+                        if is_end_token(token):
+                            # Found an endpoint, tokens until here will be sent as transcript,
+                            # the rest will be sent as interim tokens (even final tokens).
+                            await send_endpoint_transcript()
+                        else:
+                            self._final_transcription_buffer += token["text"]
+                    else:
+                        non_final_transcription += token["text"]
+
+                if self._final_transcription_buffer or non_final_transcription:
+                    await self.push_frame(
+                        InterimTranscriptionFrame(
+                            # Even final tokens are sent as interim tokens as we want to send
+                            # nicely formatted messages - therefore waiting for the endpoint.
+                            self._final_transcription_buffer + non_final_transcription,
+                            "",
+                            time_now_iso8601(),
+                        )
+                    )
+
+                error_code = content.get("error_code")
+                error_message = content.get("error_message")
+                if error_code or error_message:
+                    # In case of error, still send the final transcript (if any remaining in the buffer)
+                    await send_endpoint_transcript()
+                    logger.error(
+                        f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
+                    )
+                    await self.push_error(
+                        ErrorFrame(
+                            f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
+                        )
+                    )
+
+                finished = content.get("finished")
+                if finished:
+                    # When finished, still send the final transcript (if any remaining in the buffer)
+                    await send_endpoint_transcript()
+                    logger.debug("Transcription finished.")
+                    await self._cleanup()
+
+        except websockets.exceptions.ConnectionClosed:
+            # Expected when closing the connection
+            pass
+        except Exception as e:
+            logger.error(f"{self} error: {e}")
+            await self.push_error(ErrorFrame(f"{self} error: {e}"))
+
+    async def _finalize_if_no_tokens_task_handler(self):
+        """Call finalize if no new tokens are received for a configured duration."""
+        if not self._websocket or self._websocket.closed or self._auto_finalize_delay_ms is None:
+            return
+
+        try:
+            while True:
+                await asyncio.sleep(0.5)
+
+                if not self._websocket or self._websocket.closed:
+                    break
+
+                # Check if we have anything to send
+                if not self._final_transcription_buffer:
+                    continue
+
+                # Check if enough time has passed since the last tokens were received
+                if self._last_tokens_received:
+                    last_token_age_ms = (time.time() - self._last_tokens_received) * 1000
+
+                    if last_token_age_ms > self._auto_finalize_delay_ms:
+                        # No new tokens received for a while, finalize the transcription
+                        logger.debug("No pending frames, sending finalize message")
+                        await self._websocket.send(FINALIZE_MESSAGE)
+        except websockets.exceptions.ConnectionClosed:
+            # Expected when closing the connection
+            pass
+        except Exception as e:
+            logger.error(f"{self} error (_finalize_if_no_tokens_task_handler): {e}")
+            await self.push_error(
+                ErrorFrame(f"{self} error (_finalize_if_no_tokens_task_handler): {e}")
+            )