Merge pull request #1910 from matejmarinko-soniox/main

Add Soniox STT service integration
This commit is contained in:
kompfner
2025-07-17 09:29:07 -04:00
committed by GitHub
9 changed files with 593 additions and 1 deletions

View File

@@ -53,7 +53,7 @@ You can connect to Pipecat from any platform using our official SDKs:
| Category | Services |
| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |

View File

@@ -77,6 +77,7 @@ autodoc_mock_imports = [
"openpipe",
"simli",
"soundfile",
"soniox",
"pipecat_ai_krisp",
"pyaudio",
"_tkinter",

View File

@@ -46,6 +46,7 @@ pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]
pipecat-ai[soundfile]
pipecat-ai[soniox]
pipecat-ai[speechmatics]
pipecat-ai[tavus]
pipecat-ai[together]

View File

@@ -109,6 +109,9 @@ MINIMAX_GROUP_ID=...
# Sarvam AI
SARVAM_API_KEY=...
# Soniox
SONIOX_API_KEY=
# Speechmatics
SPEECHMATICS_API_KEY=...

View File

@@ -0,0 +1,109 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -0,0 +1,81 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -86,6 +86,7 @@ local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torcha
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "websockets~=13.1" ]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.3.1" ]
tavus=[]

View File

View File

@@ -0,0 +1,396 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Soniox speech-to-text service implementation."""
import asyncio
import json
import time
from typing import AsyncGenerator, List, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
raise Exception(f"Missing module: {e}")
KEEPALIVE_MESSAGE = '{"type": "keepalive"}'
FINALIZE_MESSAGE = '{"type": "finalize"}'
END_TOKEN = "<end>"
FINALIZED_TOKEN = "<fin>"
class SonioxInputParams(BaseModel):
"""Real-time transcription settings.
See Soniox WebSocket API documentation for more details:
https://soniox.com/docs/speech-to-text/api-reference/websocket-api#configuration-parameters
Parameters:
model: Model to use for transcription.
audio_format: Audio format to use for transcription.
num_channels: Number of channels to use for transcription.
language_hints: List of language hints to use for transcription.
context: Customization for transcription.
enable_non_final_tokens: Whether to enable non-final tokens. If false, only final tokens will be returned.
max_non_final_tokens_duration_ms: Maximum duration of non-final tokens.
client_reference_id: Client reference ID to use for transcription.
"""
model: str = "stt-rt-preview"
audio_format: Optional[str] = "pcm_s16le"
num_channels: Optional[int] = 1
language_hints: Optional[List[Language]] = None
context: Optional[str] = None
enable_non_final_tokens: Optional[bool] = True
max_non_final_tokens_duration_ms: Optional[int] = None
client_reference_id: Optional[str] = None
def is_end_token(token: dict) -> bool:
"""Determine if a token is an end token."""
return token["text"] == END_TOKEN or token["text"] == FINALIZED_TOKEN
def language_to_soniox_language(language: Language) -> str:
"""Pipecat Language enum uses same ISO 2-letter codes as Soniox, except with added regional variants.
For a list of all supported languages, see: https://soniox.com/docs/speech-to-text/core-concepts/supported-languages
"""
lang_str = str(language.value).lower()
if "-" in lang_str:
return lang_str.split("-")[0]
return lang_str
def _prepare_language_hints(
language_hints: Optional[List[Language]],
) -> Optional[List[str]]:
if language_hints is None:
return None
prepared_languages = [language_to_soniox_language(lang) for lang in language_hints]
# Remove duplicates (in case of language_hints with multiple regions).
return list(set(prepared_languages))
class SonioxSTTService(STTService):
"""Speech-to-Text service using Soniox's WebSocket API.
This service connects to Soniox's WebSocket API for real-time transcription
with support for multiple languages, custom context, speaker diarization,
and more.
For complete API documentation, see: https://soniox.com/docs/speech-to-text/api-reference/websocket-api
"""
def __init__(
self,
*,
api_key: str,
url: str = "wss://stt-rt.soniox.com/transcribe-websocket",
sample_rate: Optional[int] = None,
params: Optional[SonioxInputParams] = None,
vad_force_turn_endpoint: bool = False,
**kwargs,
):
"""Initialize the Soniox STT service.
Args:
api_key: Soniox API key.
url: Soniox WebSocket API URL.
sample_rate: Audio sample rate.
params: Additional configuration parameters, such as language hints, context and
speaker diarization.
vad_force_turn_endpoint: Listen to `UserStoppedSpeakingFrame` to send finalize message to Soniox. If disabled, Soniox will detect the end of the speech.
**kwargs: Additional arguments passed to the STTService.
"""
super().__init__(sample_rate=sample_rate, **kwargs)
params = params or SonioxInputParams()
self._api_key = api_key
self._url = url
self.set_model_name(params.model)
self._params = params
self._vad_force_turn_endpoint = vad_force_turn_endpoint
self._websocket = None
self._final_transcription_buffer = []
self._last_tokens_received: Optional[float] = None
self._receive_task = None
self._keepalive_task = None
async def start(self, frame: StartFrame):
"""Start the Soniox STT websocket connection.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
if self._websocket:
return
self._websocket = await websockets.connect(self._url)
if not self._websocket:
logger.error(f"Unable to connect to Soniox API at {self._url}")
# If vad_force_turn_endpoint is not enabled, we need to enable endpoint detection.
# Either one or the other is required.
enable_endpoint_detection = not self._vad_force_turn_endpoint
# Send the initial configuration message.
config = {
"api_key": self._api_key,
"model": self._model_name,
"audio_format": self._params.audio_format,
"num_channels": self._params.num_channels or 1,
"enable_endpoint_detection": enable_endpoint_detection,
"sample_rate": self.sample_rate,
"language_hints": _prepare_language_hints(self._params.language_hints),
"context": self._params.context,
"enable_non_final_tokens": self._params.enable_non_final_tokens,
"max_non_final_tokens_duration_ms": self._params.max_non_final_tokens_duration_ms,
"client_reference_id": self._params.client_reference_id,
}
# Send the configuration message.
await self._websocket.send(json.dumps(config))
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler())
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _cleanup(self):
if self._keepalive_task:
await self.cancel_task(self._keepalive_task)
self._keepalive_task = None
if self._websocket:
await self._websocket.close()
self._websocket = None
if self._receive_task:
# Task cannot cancel itself. If task called _cleanup() we expect it to cancel itself.
if self._receive_task != asyncio.current_task():
await self.wait_for_task(self._receive_task)
self._receive_task = None
async def stop(self, frame: EndFrame):
"""Stop the Soniox STT websocket connection.
Stopping waits for the server to close the connection as we might receive
additional final tokens after sending the stop recording message.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._send_stop_recording()
async def cancel(self, frame: CancelFrame):
"""Cancel the Soniox STT websocket connection.
Compared to stop, this method closes the connection immediately without waiting
for the server to close it. This is useful when we want to stop the connection
immediately without waiting for the server to send any final tokens.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._cleanup()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Soniox STT Service.
Args:
audio: Raw audio bytes to transcribe.
Yields:
Frame: None (transcription results come via WebSocket callbacks).
"""
await self.start_processing_metrics()
if self._websocket and not self._websocket.closed:
await self._websocket.send(audio)
await self.stop_processing_metrics()
yield None
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes a frame of audio data, either buffering or transcribing it.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStoppedSpeakingFrame) and self._vad_force_turn_endpoint:
# Send finalize message to Soniox so we get the final tokens asap.
if self._websocket and not self._websocket.closed:
await self._websocket.send(FINALIZE_MESSAGE)
logger.debug(f"Triggered finalize event on: {frame.name=}, {direction=}")
async def _send_stop_recording(self):
"""Send stop recording message to Soniox."""
if self._websocket and not self._websocket.closed:
# Send stop recording message
await self._websocket.send("")
async def _keepalive_task_handler(self):
"""Connection has to be open all the time."""
try:
while True:
logger.debug("Sending keepalive message")
if self._websocket and not self._websocket.closed:
await self._websocket.send(KEEPALIVE_MESSAGE)
else:
logger.debug("WebSocket connection closed.")
break
await asyncio.sleep(5)
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
logger.error(f"{self} error (_keepalive_task_handler): {e}")
await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}"))
async def _receive_task_handler(self):
if not self._websocket:
return
# Transcription frame will be only sent after we get the "endpoint" event.
self._final_transcription_buffer = []
async def send_endpoint_transcript():
if self._final_transcription_buffer:
text = "".join(map(lambda token: token["text"], self._final_transcription_buffer))
await self.push_frame(
TranscriptionFrame(
text=text,
user_id=self._user_id,
timestamp=time_now_iso8601(),
result=self._final_transcription_buffer,
)
)
await self._handle_transcription(text, is_final=True)
await self.stop_processing_metrics()
self._final_transcription_buffer = []
try:
async for message in self._websocket:
content = json.loads(message)
tokens = content["tokens"]
if tokens:
if len(tokens) == 1 and tokens[0]["text"] == FINALIZED_TOKEN:
# Ignore finalized token, prevent auto-finalize cycling.
pass
else:
# Got at least one token, so we can reset the auto finalize delay.
self._last_tokens_received = time.time()
# We will only send the final tokens after we get the "endpoint" event.
non_final_transcription = []
for token in tokens:
if token["is_final"]:
if is_end_token(token):
# Found an endpoint, tokens until here will be sent as transcript,
# the rest will be sent as interim tokens (even final tokens).
await send_endpoint_transcript()
else:
self._final_transcription_buffer.append(token)
else:
non_final_transcription.append(token)
if self._final_transcription_buffer or non_final_transcription:
final_text = "".join(
map(lambda token: token["text"], self._final_transcription_buffer)
)
non_final_text = "".join(
map(lambda token: token["text"], non_final_transcription)
)
await self.push_frame(
InterimTranscriptionFrame(
# Even final tokens are sent as interim tokens as we want to send
# nicely formatted messages - therefore waiting for the endpoint.
text=final_text + non_final_text,
user_id=self._user_id,
timestamp=time_now_iso8601(),
result=self._final_transcription_buffer + non_final_transcription,
)
)
error_code = content.get("error_code")
error_message = content.get("error_message")
if error_code or error_message:
# In case of error, still send the final transcript (if any remaining in the buffer).
await send_endpoint_transcript()
logger.error(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
await self.push_error(
ErrorFrame(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
)
finished = content.get("finished")
if finished:
# When finished, still send the final transcript (if any remaining in the buffer).
await send_endpoint_transcript()
logger.debug("Transcription finished.")
await self._cleanup()
return
except websockets.exceptions.ConnectionClosed:
# Expected when closing the connection.
pass
except Exception as e:
logger.error(f"{self} error: {e}")
await self.push_error(ErrorFrame(f"{self} error: {e}"))