Add Soniox STT service

This commit is contained in:
Matej Marinko
2025-05-28 09:35:21 +02:00
parent 0a39769cd0
commit 2968c846ce
10 changed files with 617 additions and 2 deletions

View File

@@ -53,7 +53,7 @@ You can connect to Pipecat from any platform using our official SDKs:
| Category | Services |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |

View File

@@ -75,6 +75,7 @@ autodoc_mock_imports = [
"openpipe",
"simli",
"soundfile",
"soniox",
"pipecat_ai_krisp",
"pyaudio",
"_tkinter",

View File

@@ -45,6 +45,7 @@ pipecat-ai[remote-smart-turn]
pipecat-ai[silero]
pipecat-ai[simli]
pipecat-ai[soundfile]
pipecat-ai[soniox]
pipecat-ai[tavus]
pipecat-ai[together]
# pipecat-ai[ultravox] # Mocked

View File

@@ -107,4 +107,7 @@ MINIMAX_API_KEY=...
MINIMAX_GROUP_ID=...
# Sarvam AI
SARVAM_API_KEY=...
SARVAM_API_KEY=...
# Soniox
SONIOX_API_KEY=

View File

@@ -0,0 +1,115 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.config import SonioxInputParams
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
logger.info(f"Starting bot")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
params=SonioxInputParams(
# Add language hints to improve transcription accuracy. Variants are ignored.
# For example "en-GB" will be treated same as "en".
# List of supported languages: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
language_hints=[Language.EN, Language.ES, Language.JA, Language.ZH],
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,77 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.soniox.config import SonioxInputParams
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
logger.info(f"Starting bot")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(audio_in_enabled=True),
)
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
params=SonioxInputParams(
# Add language hints to improve transcription accuracy. Variants are ignored.
# For example "en-GB" will be treated same as "en".
# List of supported languages: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
language_hints=[Language.EN, Language.ES, Language.JA, Language.ZH],
),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -84,6 +84,7 @@ local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torcha
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "websockets~=13.1" ]
soundfile = [ "soundfile~=0.13.0" ]
tavus=[]
together = []

View File

@@ -0,0 +1,13 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import sys
from pipecat.services import DeprecatedModuleProxy
from .stt import *
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "soniox", "soniox.stt")

View File

@@ -0,0 +1,36 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List, Optional
from pydantic import BaseModel
from pipecat.transcriptions.language import Language
class SonioxInputParams(BaseModel):
"""Real-time transcription settings.
Attributes:
languages: List of language codes to use for transcription
code_switching: Whether to auto-detect language changes during transcription
"""
model: str = "stt-rt-preview"
audio_format: Optional[str] = "pcm_s16le"
num_channels: Optional[int] = 1
sample_rate: Optional[int] = 16000
language_hints: Optional[List[Language]] = None
context: Optional[str] = None
enable_non_final_tokens: Optional[bool] = True
max_non_final_tokens_duration_ms: Optional[int] = None
enable_endpoint_detection: Optional[bool] = True
client_reference_id: Optional[str] = None

View File

@@ -0,0 +1,368 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import json
import time
from typing import AsyncGenerator, List, Optional
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.soniox.config import SonioxInputParams
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
raise Exception(f"Missing module: {e}")
KEEPALIVE_MESSAGE = json.dumps(
{
"type": "keepalive",
}
)
FINALIZE_MESSAGE = json.dumps(
{
"type": "finalize",
}
)
END_TOKEN = "<end>"
FINALIZED_TOKEN = "<fin>"
def is_end_token(token: dict) -> bool:
return token["text"] == END_TOKEN or token["text"] == FINALIZED_TOKEN
def language_to_soniox_language(language: Language) -> str:
"""Pipecat Language enum uses same ISO 2-letter codes as Soniox, except with added regional variants.
For a list of all supported languages, see: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
"""
lang_str = str(language.value).lower()
if "-" in lang_str:
return lang_str.split("-")[0]
return lang_str
def _prepare_language_hints(
language_hints: Optional[List[Language]],
) -> Optional[List[str]]:
if language_hints is None:
return None
prepared_languages = [language_to_soniox_language(lang) for lang in language_hints]
# Remove duplicates (in case of language_hints with multiple regions)
return list(set(prepared_languages))
class SonioxSTTService(STTService):
"""Speech-to-Text service using Soniox's WebSocket API.
This service connects to Soniox's WebSocket API for real-time transcription
with support for multiple languages, custom context, speaker diarization,
and more.
For complete API documentation, see: https://soniox.com/docs/speech-to-text/api-reference/websocket-api
"""
def __init__(
self,
*,
api_key: str,
url: str = "wss://stt-rt.soniox.com/transcribe-websocket",
sample_rate: Optional[int] = None,
params: SonioxInputParams = SonioxInputParams(),
enable_vad: bool = True,
auto_finalize_delay_ms: int | None = 3000,
**kwargs,
):
"""Initialize the Soniox STT service.
Args:
api_key: Soniox API key
url: Soniox WebSocket API URL
model: Transcription model to use.
params: Additional configuration parameters, such as language hints, context and
speaker diarization.
enable_vad: Listen to `UserStoppedSpeakingFrame` to send finalize message to Soniox.
auto_finalize_delay: If no new tokens are received for a while and there is active
transcription (only InterimTranscriptionFrame), finalize the transcription by
sending the finalize message so user can receive the final transcript. If set
to `None`, the auto finalize feature is disabled.
**kwargs: Additional arguments passed to the STTService
"""
sample_rate = sample_rate or (params.sample_rate if params.sample_rate else None)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._url = url
self.set_model_name(params.model)
self._params = params
self._enable_vad = enable_vad
self._auto_finalize_delay_ms = auto_finalize_delay_ms
self._websocket = None
self._final_transcription_buffer = ""
self._last_tokens_received: float | None = None
self._receive_task = None
self._keepalive_task = None
self._finalize_if_no_tokens_task = None
async def start(self, frame: StartFrame):
"""Start the Soniox STT websocket connection."""
await super().start(frame)
if self._websocket:
return
self._websocket = await websockets.connect(self._url)
if not self._websocket:
logger.error(f"Unable to connect to Soniox API at {self._url}")
# Send the initial configuration message
config = {
"api_key": self._api_key,
"model": self._model_name,
"audio_format": self._params.audio_format,
"num_channels": self._params.num_channels or 1,
"enable_endpoint_detection": self._params.enable_endpoint_detection,
"sample_rate": self._sample_rate,
"language_hints": _prepare_language_hints(self._params.language_hints),
"context": self._params.context,
"enable_non_final_tokens": self._params.enable_non_final_tokens,
"max_non_final_tokens_duration_ms": self._params.max_non_final_tokens_duration_ms,
"client_reference_id": self._params.client_reference_id,
}
# Send the configuration message
await self._websocket.send(json.dumps(config))
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler())
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
if (
self._websocket
and not self._finalize_if_no_tokens_task
and self._auto_finalize_delay_ms is not None
):
self._finalize_if_no_tokens_task = self.create_task(
self._finalize_if_no_tokens_task_handler()
)
async def _cleanup(self):
if self._keepalive_task:
await self.cancel_task(self._keepalive_task)
self._keepalive_task = None
if self._websocket:
await self._websocket.close()
self._websocket = None
if self._receive_task:
await self.wait_for_task(self._receive_task)
self._receive_task = None
if self._finalize_if_no_tokens_task:
await self.cancel_task(self._finalize_if_no_tokens_task)
self._finalize_if_no_tokens_task = None
async def stop(self, frame: EndFrame):
"""Stop the Soniox STT websocket connection.
Stopping waits for the server to close the connection as we might receive
additional final tokens after sending the stop recording message.
"""
await super().stop(frame)
await self._send_stop_recording()
async def cancel(self, frame: CancelFrame):
"""Cancel the Soniox STT websocket connection.
Compared to stop, this method closes the connection immediately without waiting
for the server to close it. This is useful when we want to stop the connection
immediately without waiting for the server to send any final tokens.
"""
await super().cancel(frame)
await self._cleanup()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Soniox STT Service."""
await self.start_processing_metrics()
if self._websocket and not self._websocket.closed:
await self._websocket.send(audio)
await self.stop_processing_metrics()
yield None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes a frame of audio data, either buffering or transcribing it."""
await super().process_frame(frame, direction)
if isinstance(frame, UserStoppedSpeakingFrame) and self._enable_vad:
# Send finalize message to Soniox so we get the final tokens asap.
if self._websocket and not self._websocket.closed:
await self._websocket.send(FINALIZE_MESSAGE)
logger.debug(f"Triggered finalize event on: {frame.name=}, {direction=}")
async def _send_stop_recording(self):
if self._websocket and not self._websocket.closed:
# Send stop recording message
await self._websocket.send("")
async def _keepalive_task_handler(self):
"""Connection has to be open all the time."""
try:
while True:
logger.debug("Sending keepalive message")
if self._websocket and not self._websocket.closed:
await self._websocket.send(KEEPALIVE_MESSAGE)
else:
logger.debug("WebSocket connection closed.")
break
await asyncio.sleep(5)
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
logger.error(f"{self} error (_keepalive_task_handler): {e}")
await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}"))
async def _receive_task_handler(self):
if not self._websocket:
return
# Transcription frame will be only sent after we get the "endpoint" event.
self._final_transcription_buffer = ""
async def send_endpoint_transcript():
if self._final_transcription_buffer:
await self.push_frame(
TranscriptionFrame(
self._final_transcription_buffer,
"",
time_now_iso8601(),
)
)
self._final_transcription_buffer = ""
try:
async for message in self._websocket:
content = json.loads(message)
tokens = content["tokens"]
if tokens:
# Got at least one token, so we can reset the auto finalize delay
self._last_tokens_received = time.time()
# We will only send the final tokens after we get the "endpoint" event
non_final_transcription = ""
for token in tokens:
if token["is_final"]:
if is_end_token(token):
# Found an endpoint, tokens until here will be sent as transcript,
# the rest will be sent as interim tokens (even final tokens).
await send_endpoint_transcript()
else:
self._final_transcription_buffer += token["text"]
else:
non_final_transcription += token["text"]
if self._final_transcription_buffer or non_final_transcription:
await self.push_frame(
InterimTranscriptionFrame(
# Even final tokens are sent as interim tokens as we want to send
# nicely formatted messages - therefore waiting for the endpoint.
self._final_transcription_buffer + non_final_transcription,
"",
time_now_iso8601(),
)
)
error_code = content.get("error_code")
error_message = content.get("error_message")
if error_code or error_message:
# In case of error, still send the final transcript (if any remaining in the buffer)
await send_endpoint_transcript()
logger.error(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
await self.push_error(
ErrorFrame(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
)
finished = content.get("finished")
if finished:
# When finished, still send the final transcript (if any remaining in the buffer)
await send_endpoint_transcript()
logger.debug("Transcription finished.")
await self._cleanup()
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection
pass
except Exception as e:
logger.error(f"{self} error: {e}")
await self.push_error(ErrorFrame(f"{self} error: {e}"))
async def _finalize_if_no_tokens_task_handler(self):
"""Call finalize if no new tokens are received for a configured duration."""
if not self._websocket or self._websocket.closed or self._auto_finalize_delay_ms is None:
return
try:
while True:
await asyncio.sleep(0.5)
if not self._websocket or self._websocket.closed:
break
# Check if we have anything to send
if not self._final_transcription_buffer:
continue
# Check if enough time has passed since the last tokens were received
if self._last_tokens_received:
last_token_age_ms = (time.time() - self._last_tokens_received) * 1000
if last_token_age_ms > self._auto_finalize_delay_ms:
# No new tokens received for a while, finalize the transcription
logger.debug("No pending frames, sending finalize message")
await self._websocket.send(FINALIZE_MESSAGE)
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection
pass
except Exception as e:
logger.error(f"{self} error (_finalize_if_no_tokens_task_handler): {e}")
await self.push_error(
ErrorFrame(f"{self} error (_finalize_if_no_tokens_task_handler): {e}")
)