From 58a038ddb2e4d2fc289dca4fad8bb2ad4f8e501f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 23 Apr 2026 16:10:12 -0400 Subject: [PATCH] Add Soniox real-time TTS service Introduce SonioxTTSService, a WebSocket TTS provider that streams text and receives audio over a persistent connection, multiplexing up to 5 concurrent streams per socket via Soniox's `stream_id`. Also updates the README service table and the Soniox voice example to use the new TTS end-to-end. --- README.md | 28 +- examples/voice/voice-soniox.py | 10 +- src/pipecat/services/soniox/tts.py | 561 +++++++++++++++++++++++++++++ 3 files changed, 580 insertions(+), 19 deletions(-) create mode 100644 src/pipecat/services/soniox/tts.py diff --git a/README.md b/README.md index 10ad65d72..a1abf195e 100644 --- a/README.md +++ b/README.md @@ -89,20 +89,20 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout ## 🧩 Available services -| Category | Services | -| ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/api-reference/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/api-reference/server/services/stt/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/api-reference/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/api-reference/server/services/stt/gladia), [Google](https://docs.pipecat.ai/api-reference/server/services/stt/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/stt/mistral), [NVIDIA Riva](https://docs.pipecat.ai/api-reference/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/openai), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/api-reference/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/api-reference/server/services/stt/whisper), [xAI](https://docs.pipecat.ai/api-reference/server/services/stt/xai) | -| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/api-reference/server/services/llm/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/api-reference/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/api-reference/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/api-reference/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/server/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/llm/mistral), [Nebius](https://docs.pipecat.ai/api-reference/server/services/llm/nebius), [Novita](https://docs.pipecat.ai/api-reference/server/services/llm/novita), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/server/services/llm/nvidia), [Ollama](https://docs.pipecat.ai/api-reference/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/llm/openai), [OpenAI Responses](https://docs.pipecat.ai/api-reference/server/services/llm/openai-responses), [OpenRouter](https://docs.pipecat.ai/api-reference/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/api-reference/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/api-reference/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/api-reference/server/services/llm/sambanova), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/llm/sarvam), [Together AI](https://docs.pipecat.ai/api-reference/server/services/llm/together) | -| Text-to-Speech | [Async](https://docs.pipecat.ai/api-reference/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/api-reference/server/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/api-reference/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/api-reference/server/services/tts/fish), [Google](https://docs.pipecat.ai/api-reference/server/services/tts/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/api-reference/server/services/tts/groq), [Hume](https://docs.pipecat.ai/api-reference/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/api-reference/server/services/tts/inworld), [Kokoro](https://docs.pipecat.ai/api-reference/server/services/tts/kokoro), [LMNT](https://docs.pipecat.ai/api-reference/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/api-reference/server/services/tts/minimax), [Mistral](https://docs.pipecat.ai/api-reference/server/services/tts/mistral), [Neuphonic](https://docs.pipecat.ai/api-reference/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/api-reference/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/tts/openai), [Piper](https://docs.pipecat.ai/api-reference/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/api-reference/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/api-reference/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/tts/sarvam), [Smallest](https://docs.pipecat.ai/api-reference/server/services/tts/smallest), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/tts/speechmatics), [xAI](https://docs.pipecat.ai/api-reference/server/services/tts/xai), [XTTS](https://docs.pipecat.ai/api-reference/server/services/tts/xtts) | -| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/api-reference/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/api-reference/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/api-reference/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/api-reference/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/api-reference/server/services/s2s/ultravox), | -| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/api-reference/server/services/transport/fastapi-websocket), [LiveKit (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/livekit), [SmallWebRTCTransport](https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/api-reference/server/services/transport/websocket-server), [WhatsApp](https://docs.pipecat.ai/api-reference/server/services/transport/whatsapp), Local | -| Serializers | [Exotel](https://docs.pipecat.ai/api-reference/server/services/serializers/exotel), [Genesys](https://docs.pipecat.ai/api-reference/server/services/serializers/genesys), [Plivo](https://docs.pipecat.ai/api-reference/server/services/serializers/plivo), [Twilio](https://docs.pipecat.ai/api-reference/server/services/serializers/twilio), [Telnyx](https://docs.pipecat.ai/api-reference/server/services/serializers/telnyx), [Vonage](https://docs.pipecat.ai/api-reference/server/services/serializers/vonage) | -| Video | [HeyGen](https://docs.pipecat.ai/api-reference/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/api-reference/server/services/transport/lemonslice), [Tavus](https://docs.pipecat.ai/api-reference/server/services/video/tavus), [Simli](https://docs.pipecat.ai/api-reference/server/services/video/simli) | -| Memory | [mem0](https://docs.pipecat.ai/api-reference/server/services/memory/mem0) | -| Vision & Image | [fal](https://docs.pipecat.ai/api-reference/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/api-reference/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/api-reference/server/services/vision/moondream) | -| Audio Processing | [Silero VAD](https://docs.pipecat.ai/api-reference/server/utilities/audio/silero-vad-analyzer), [Krisp Viva](https://docs.pipecat.ai/guides/features/krisp-viva), [Koala](https://docs.pipecat.ai/api-reference/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/api-reference/server/utilities/audio/aic-filter), [RNNoise](https://docs.pipecat.ai/api-reference/server/utilities/audio/rnnoise-filter) | -| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/api-reference/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/api-reference/server/services/analytics/sentry) | -| Community | [Browse community integrations →](https://docs.pipecat.ai/api-reference/server/services/community-integrations) | +| Category | Services | +| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/api-reference/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/api-reference/server/services/stt/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/api-reference/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/api-reference/server/services/stt/gladia), [Google](https://docs.pipecat.ai/api-reference/server/services/stt/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/stt/mistral), [NVIDIA Riva](https://docs.pipecat.ai/api-reference/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/openai), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/api-reference/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/api-reference/server/services/stt/whisper), [xAI](https://docs.pipecat.ai/api-reference/server/services/stt/xai) | +| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/api-reference/server/services/llm/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/api-reference/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/api-reference/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/api-reference/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/server/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/llm/mistral), [Nebius](https://docs.pipecat.ai/api-reference/server/services/llm/nebius), [Novita](https://docs.pipecat.ai/api-reference/server/services/llm/novita), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/server/services/llm/nvidia), [Ollama](https://docs.pipecat.ai/api-reference/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/llm/openai), [OpenAI Responses](https://docs.pipecat.ai/api-reference/server/services/llm/openai-responses), [OpenRouter](https://docs.pipecat.ai/api-reference/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/api-reference/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/api-reference/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/api-reference/server/services/llm/sambanova), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/llm/sarvam), [Together AI](https://docs.pipecat.ai/api-reference/server/services/llm/together) | +| Text-to-Speech | [Async](https://docs.pipecat.ai/api-reference/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/api-reference/server/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/api-reference/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/api-reference/server/services/tts/fish), [Google](https://docs.pipecat.ai/api-reference/server/services/tts/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/api-reference/server/services/tts/groq), [Hume](https://docs.pipecat.ai/api-reference/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/api-reference/server/services/tts/inworld), [Kokoro](https://docs.pipecat.ai/api-reference/server/services/tts/kokoro), [LMNT](https://docs.pipecat.ai/api-reference/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/api-reference/server/services/tts/minimax), [Mistral](https://docs.pipecat.ai/api-reference/server/services/tts/mistral), [Neuphonic](https://docs.pipecat.ai/api-reference/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/api-reference/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/tts/openai), [Piper](https://docs.pipecat.ai/api-reference/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/api-reference/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/api-reference/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/tts/sarvam), [Smallest](https://docs.pipecat.ai/api-reference/server/services/tts/smallest), [Soniox](https://docs.pipecat.ai/api-reference/server/services/tts/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/tts/speechmatics), [xAI](https://docs.pipecat.ai/api-reference/server/services/tts/xai), [XTTS](https://docs.pipecat.ai/api-reference/server/services/tts/xtts) | +| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/api-reference/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/api-reference/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/api-reference/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/api-reference/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/api-reference/server/services/s2s/ultravox), | +| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/api-reference/server/services/transport/fastapi-websocket), [LiveKit (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/livekit), [SmallWebRTCTransport](https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/api-reference/server/services/transport/websocket-server), [WhatsApp](https://docs.pipecat.ai/api-reference/server/services/transport/whatsapp), Local | +| Serializers | [Exotel](https://docs.pipecat.ai/api-reference/server/services/serializers/exotel), [Genesys](https://docs.pipecat.ai/api-reference/server/services/serializers/genesys), [Plivo](https://docs.pipecat.ai/api-reference/server/services/serializers/plivo), [Twilio](https://docs.pipecat.ai/api-reference/server/services/serializers/twilio), [Telnyx](https://docs.pipecat.ai/api-reference/server/services/serializers/telnyx), [Vonage](https://docs.pipecat.ai/api-reference/server/services/serializers/vonage) | +| Video | [HeyGen](https://docs.pipecat.ai/api-reference/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/api-reference/server/services/transport/lemonslice), [Tavus](https://docs.pipecat.ai/api-reference/server/services/video/tavus), [Simli](https://docs.pipecat.ai/api-reference/server/services/video/simli) | +| Memory | [mem0](https://docs.pipecat.ai/api-reference/server/services/memory/mem0) | +| Vision & Image | [fal](https://docs.pipecat.ai/api-reference/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/api-reference/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/api-reference/server/services/vision/moondream) | +| Audio Processing | [Silero VAD](https://docs.pipecat.ai/api-reference/server/utilities/audio/silero-vad-analyzer), [Krisp Viva](https://docs.pipecat.ai/guides/features/krisp-viva), [Koala](https://docs.pipecat.ai/api-reference/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/api-reference/server/utilities/audio/aic-filter), [RNNoise](https://docs.pipecat.ai/api-reference/server/utilities/audio/rnnoise-filter) | +| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/api-reference/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/api-reference/server/services/analytics/sentry) | +| Community | [Browse community integrations →](https://docs.pipecat.ai/api-reference/server/services/community-integrations) | 📚 [View full services documentation →](https://docs.pipecat.ai/api-reference/server/services/supported-services) diff --git a/examples/voice/voice-soniox.py b/examples/voice/voice-soniox.py index b89106d14..62da4b8f1 100644 --- a/examples/voice/voice-soniox.py +++ b/examples/voice/voice-soniox.py @@ -22,9 +22,9 @@ from pipecat.processors.aggregators.llm_response_universal import ( ) from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.openai.llm import OpenAILLMService from pipecat.services.soniox.stt import SonioxSTTService +from pipecat.services.soniox.tts import SonioxTTSService from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -61,10 +61,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ) - tts = CartesiaTTSService( - api_key=os.environ["CARTESIA_API_KEY"], - settings=CartesiaTTSService.Settings( - voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + tts = SonioxTTSService( + api_key=os.environ["SONIOX_API_KEY"], + settings=SonioxTTSService.Settings( + voice="Maya", ), ) diff --git a/src/pipecat/services/soniox/tts.py b/src/pipecat/services/soniox/tts.py new file mode 100644 index 000000000..45c88156e --- /dev/null +++ b/src/pipecat/services/soniox/tts.py @@ -0,0 +1,561 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Soniox text-to-speech service implementation. + +This module provides a WebSocket-based TTS service using the Soniox real-time +Text-to-Speech API. It streams text to the server incrementally and receives +audio back as base64-encoded chunks, multiplexed across multiple concurrent +streams by ``stream_id``. + +Soniox API reference: https://soniox.com/docs/tts/api-reference/websocket-api +""" + +import asyncio +import base64 +import json +from collections.abc import AsyncGenerator +from dataclasses import dataclass +from typing import Any + +from loguru import logger + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + StartFrame, + TTSAudioRawFrame, + TTSStoppedFrame, +) +from pipecat.services.settings import TTSSettings +from pipecat.services.tts_service import TextAggregationMode, WebsocketTTSService +from pipecat.transcriptions.language import Language, resolve_language +from pipecat.utils.tracing.service_decorators import traced_tts + +try: + import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.") + raise Exception(f"Missing module: {e}") + + +# Soniox idle timeout is 20-30s; keepalive cadence must stay well inside it. +KEEPALIVE_INTERVAL_SECONDS = 20 + +# Soniox-supported sample rates for raw PCM formats +VALID_SAMPLE_RATES = {8000, 16000, 24000, 44100, 48000} + + +def language_to_soniox_tts_language(language: Language) -> str | None: + """Convert a Pipecat Language to a Soniox TTS language code. + + For the full list of supported languages, see: + https://soniox.com/docs/tts/concepts/languages + """ + LANGUAGE_MAP = { + Language.AF: "af", + Language.AR: "ar", + Language.AZ: "az", + Language.BE: "be", + Language.BG: "bg", + Language.BN: "bn", + Language.BS: "bs", + Language.CA: "ca", + Language.CS: "cs", + Language.CY: "cy", + Language.DA: "da", + Language.DE: "de", + Language.EL: "el", + Language.EN: "en", + Language.ES: "es", + Language.ET: "et", + Language.EU: "eu", + Language.FA: "fa", + Language.FI: "fi", + Language.FR: "fr", + Language.GL: "gl", + Language.GU: "gu", + Language.HE: "he", + Language.HI: "hi", + Language.HR: "hr", + Language.HU: "hu", + Language.ID: "id", + Language.IT: "it", + Language.JA: "ja", + Language.KK: "kk", + Language.KN: "kn", + Language.KO: "ko", + Language.LT: "lt", + Language.LV: "lv", + Language.MK: "mk", + Language.ML: "ml", + Language.MR: "mr", + Language.MS: "ms", + Language.NL: "nl", + Language.NO: "no", + Language.PA: "pa", + Language.PL: "pl", + Language.PT: "pt", + Language.RO: "ro", + Language.RU: "ru", + Language.SK: "sk", + Language.SL: "sl", + Language.SQ: "sq", + Language.SR: "sr", + Language.SV: "sv", + Language.SW: "sw", + Language.TA: "ta", + Language.TE: "te", + Language.TH: "th", + Language.TL: "tl", + Language.TR: "tr", + Language.UK: "uk", + Language.UR: "ur", + Language.VI: "vi", + Language.ZH: "zh", + } + return resolve_language(language, LANGUAGE_MAP, use_base_code=True) + + +@dataclass +class SonioxTTSSettings(TTSSettings): + """Settings for SonioxTTSService. + + ``voice``, ``model``, and ``language`` travel in the per-stream + config message, so changing any of them does not require reconnecting the + WebSocket. The current context is flushed so the next stream opens with the + new values. + """ + + pass + + +class SonioxTTSService(WebsocketTTSService): + """Soniox WebSocket TTS service with streaming text-in, streaming audio-out. + + Streams text incrementally to Soniox's real-time TTS endpoint and routes the + returned base64-encoded audio back as :class:`TTSAudioRawFrame` frames. + Multiple concurrent streams are multiplexed over a single WebSocket + connection via Pipecat's audio-context mechanism (mapped to Soniox's + ``stream_id``). Supports up to 5 concurrent streams per connection. + + For complete API documentation, see: + https://soniox.com/docs/tts/api-reference/websocket-api + """ + + Settings = SonioxTTSSettings + _settings: Settings + + def __init__( + self, + *, + api_key: str, + url: str = "wss://tts-rt.soniox.com/tts-websocket", + sample_rate: int | None = None, + audio_format: str = "pcm_s16le", + settings: Settings | None = None, + text_aggregation_mode: TextAggregationMode | None = None, + **kwargs, + ): + """Initialize the Soniox TTS service. + + Args: + api_key: Soniox API key for authentication. Create API keys at + https://console.soniox.com. + url: WebSocket URL for the Soniox TTS endpoint. + sample_rate: Output sample rate in Hz. Must be one of + ``{8000, 16000, 24000, 44100, 48000}`` when using a raw PCM + audio format. If ``None``, inherits from the pipeline. + audio_format: Output audio format. Defaults to ``"pcm_s16le"``, + which matches Pipecat's downstream audio pipeline. + settings: Runtime-updatable settings. When provided alongside + deprecated parameters, ``settings`` values take precedence. + text_aggregation_mode: How to aggregate incoming text before + synthesis. Defaults to ``TextAggregationMode.SENTENCE``. + **kwargs: Additional arguments passed to the parent service. + """ + # Initialize default_settings + default_settings = self.Settings( + model="tts-rt-v1-preview", + voice="Adrian", + language=Language.EN, + ) + + # Settings delta (canonical API, always wins) + if settings is not None: + default_settings.apply_update(settings) + + super().__init__( + text_aggregation_mode=text_aggregation_mode, + # Soniox doesn't expose alignment data, so TTSTextFrames can be + # pushed immediately by the base class. + push_text_frames=True, + # We push TTSStoppedFrame ourselves when Soniox sends `terminated`. + push_stop_frames=False, + # Let the base class create audio contexts and emit TTSStartedFrame. + push_start_frame=True, + pause_frame_processing=False, + sample_rate=sample_rate, + settings=default_settings, + **kwargs, + ) + + self._api_key = api_key + self._url = url + + # Init-only audio format (not runtime-updatable). + self._audio_format = audio_format + + # Tracks which context_ids have had their per-stream config sent. + # Soniox rejects duplicate config for the same stream_id. + self._configured_contexts: set[str] = set() + + self._receive_task: asyncio.Task | None = None + self._keepalive_task: asyncio.Task | None = None + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Soniox TTS supports metrics generation. + """ + return True + + def language_to_service_language(self, language: Language) -> str | None: + """Convert a Language enum to a Soniox TTS language code. + + Args: + language: The language to convert. + + Returns: + The Soniox-specific language code, or None if not supported. + """ + return language_to_soniox_tts_language(language) + + async def start(self, frame: StartFrame): + """Start the Soniox TTS service. + + Args: + frame: The start frame containing initialization parameters. + """ + await super().start(frame) + if self._audio_format.startswith("pcm_") and self.sample_rate not in VALID_SAMPLE_RATES: + logger.warning( + f"{self}: sample_rate={self.sample_rate} is not in Soniox supported rates " + f"{sorted(VALID_SAMPLE_RATES)}; the server may reject the stream." + ) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the Soniox TTS service. + + Args: + frame: The end frame. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the Soniox TTS service. + + Args: + frame: The cancel frame. + """ + await super().cancel(frame) + await self._disconnect() + + async def flush_audio(self, context_id: str | None = None): + """Flush any pending audio and finalize the current stream. + + Args: + context_id: The specific context to flush. If ``None``, falls back + to the currently active context. + """ + flush_id = context_id or self.get_active_audio_context_id() + if not flush_id or not self._websocket: + return + logger.trace(f"{self}: flushing audio for stream {flush_id}") + msg = {"text": "", "text_end": True, "stream_id": flush_id} + await self._websocket.send(json.dumps(msg)) + + async def _close_stream(self, context_id: str): + """Cancel a Soniox stream and forget local state. + + Mirrors Inworld's ``_close_context``. ``cancel:true`` works on any + currently-open stream (Soniox replies with ``terminated``). Gated on + ``_configured_contexts`` because ``cancel`` on a stream_id Soniox + never saw would error. Do not call after ``text_end:true`` — that + already terminates the stream. + """ + if context_id in self._configured_contexts: + if self._websocket and self._websocket.state is State.OPEN: + try: + msg = {"stream_id": context_id, "cancel": True} + await self._websocket.send(json.dumps(msg)) + except Exception as e: + logger.warning(f"{self}: failed to cancel stream {context_id}: {e}") + self._configured_contexts.discard(context_id) + + async def on_turn_context_created(self, context_id: str): + """Eagerly open the Soniox stream when a new turn context is created. + + Overlaps Soniox-side stream creation with sentence aggregation so the + stream is ready by the time text reaches ``run_tts``. + """ + try: + await self._send_config(context_id) + except Exception as e: + logger.warning(f"{self}: failed to pre-open Soniox stream {context_id}: {e}") + + async def on_turn_context_completed(self): + """Cancel any eagerly-opened Soniox stream that never received text. + + The base class sends ``text_end:true`` (via ``flush_audio``) for + streams that received text — that already terminates the stream. For + an empty turn (e.g., the LLM produced only tool calls), no text + reaches ``run_tts`` and the eager-opened stream would otherwise sit + until Soniox's per-stream idle timer fires. Cancel it here. + """ + ctx_id = self._turn_context_id + was_active = ctx_id is not None and self.audio_context_available(ctx_id) + await super().on_turn_context_completed() + if ctx_id is not None and not was_active: + await self._close_stream(ctx_id) + + async def on_audio_context_interrupted(self, context_id: str): + """Cancel the active Soniox stream when the bot is interrupted.""" + await self.stop_all_metrics() + await self._close_stream(context_id) + await super().on_audio_context_interrupted(context_id) + + async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]: + """Apply a settings delta, flushing the active stream if needed. + + All Soniox config fields live in the per-stream config message, so + changes take effect on the next stream. The current stream is flushed + so subsequent sentences in this turn open a fresh stream with the + updated values. + + Args: + delta: A TTS settings delta. + + Returns: + Dict mapping changed field names to their previous values. + """ + changed = await super()._update_settings(delta) + if not changed: + return changed + + if changed.keys() & {"voice", "model", "language"}: + if self._turn_context_id and self.audio_context_available(self._turn_context_id): + await self.flush_audio(context_id=self._turn_context_id) + # Assign a new turn context ID so subsequent sentences in this turn + # open a new Soniox stream with the updated settings. + if self._turn_context_id: + self._turn_context_id = None + self._turn_context_id = self.create_context_id() + + return changed + + async def _connect(self): + await super()._connect() + + await self._connect_websocket() + + if self._websocket and not self._receive_task: + self._receive_task = self.create_task(self._receive_task_handler(self._report_error)) + + if self._websocket and not self._keepalive_task: + self._keepalive_task = self.create_task(self._keepalive_task_handler()) + + async def _disconnect(self): + await super()._disconnect() + + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + if self._keepalive_task: + await self.cancel_task(self._keepalive_task) + self._keepalive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + try: + if self._websocket and self._websocket.state is State.OPEN: + return + logger.debug("Connecting to Soniox TTS") + # Soniox expects the api_key in the per-stream config message, not + # as a header or query param, so the connect call is bare. + self._websocket = await websocket_connect(self._url) + await self._call_event_handler("on_connected") + except Exception as e: + self._websocket = None + await self.push_error(error_msg=f"Unable to connect to Soniox TTS: {e}", exception=e) + await self._call_event_handler("on_connection_error", f"{e}") + + async def _disconnect_websocket(self): + try: + await self.stop_all_metrics() + if self._websocket: + logger.debug("Disconnecting from Soniox TTS") + await self._websocket.close() + except Exception as e: + await self.push_error(error_msg=f"Error closing Soniox websocket: {e}", exception=e) + finally: + await self.remove_active_audio_context() + self._configured_contexts.clear() + self._websocket = None + await self._call_event_handler("on_disconnected") + + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + def _build_config_msg(self, context_id: str) -> dict[str, Any]: + """Build the per-stream configuration message for a new stream_id.""" + s = self._settings + config: dict[str, Any] = { + "api_key": self._api_key, + "stream_id": context_id, + "model": s.model, + "voice": s.voice, + "audio_format": self._audio_format, + } + if s.language is not None: + config["language"] = s.language + if self._audio_format.startswith("pcm_"): + config["sample_rate"] = self.sample_rate + return config + + async def _send_config(self, context_id: str): + """Send the per-stream config for ``context_id``, idempotently. + + Soniox rejects duplicate config for the same stream_id, so the set of + already-configured contexts gates the send. Mirrors Inworld's + ``_send_context``. + """ + if context_id in self._configured_contexts: + return + config = self._build_config_msg(context_id) + await self._get_websocket().send(json.dumps(config)) + self._configured_contexts.add(context_id) + logger.trace(f"{self}: opened Soniox stream {context_id}") + + async def _keepalive_task_handler(self): + """Send periodic keepalive messages to prevent Soniox's idle timeout. + + Soniox closes idle connections after 20-30s; sending ``{"keep_alive": true}`` + resets the timer without triggering synthesis. + """ + while True: + await asyncio.sleep(KEEPALIVE_INTERVAL_SECONDS) + try: + if self._websocket and self._websocket.state is State.OPEN: + await self._websocket.send(json.dumps({"keep_alive": True})) + logger.trace(f"{self}: sent Soniox keepalive") + except websockets.ConnectionClosed as e: + logger.warning(f"{self} keepalive error: {e}") + break + except Exception as e: + logger.warning(f"{self}: unexpected keepalive error: {e}") + break + + async def _receive_messages(self): + """Handle incoming WebSocket messages from Soniox. + + Routes audio, error, and terminal events to the appropriate audio + contexts. A failed stream does not close the WebSocket; other active + streams continue uninterrupted. + """ + async for message in self._get_websocket(): + try: + msg = json.loads(message) + except json.JSONDecodeError: + logger.warning(f"{self}: received non-JSON Soniox message: {message!r}") + continue + + stream_id = msg.get("stream_id") + + error_code = msg.get("error_code") + if error_code is not None: + error_message = msg.get("error_message", "") + await self.push_error( + error_msg=f"Soniox TTS error {error_code} (stream {stream_id}): {error_message}" + ) + if stream_id and self.audio_context_available(stream_id): + await self.append_to_audio_context( + stream_id, TTSStoppedFrame(context_id=stream_id) + ) + await self.remove_audio_context(stream_id) + self._configured_contexts.discard(stream_id) + continue + + if msg.get("terminated"): + if stream_id and self.audio_context_available(stream_id): + await self.append_to_audio_context( + stream_id, TTSStoppedFrame(context_id=stream_id) + ) + await self.remove_audio_context(stream_id) + self._configured_contexts.discard(stream_id) + continue + + audio_b64 = msg.get("audio") + if audio_b64 and stream_id and self.audio_context_available(stream_id): + await self.stop_ttfb_metrics() + audio = base64.b64decode(audio_b64) + frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=stream_id) + await self.append_to_audio_context(stream_id, frame) + + # audio_end is informational; the real end-of-stream signal is + # `terminated`, handled above. + + @traced_tts + async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]: + """Stream text to Soniox and deliver synthesized audio asynchronously. + + The first ``run_tts`` call for a given ``context_id`` sends the + per-stream config message; subsequent calls within the same stream + send only text chunks. Audio arrives via the receive loop and is + appended to the matching audio context. + + Args: + text: The text to synthesize. + context_id: The audio context (maps to Soniox ``stream_id``). + + Yields: + ``None`` — audio frames are delivered out of band via the receive + task and the audio-context queue. + """ + if self._is_streaming_tokens: + logger.trace(f"{self}: Generating TTS [{text}]") + else: + logger.debug(f"{self}: Generating TTS [{text}]") + + try: + if not self._websocket or self._websocket.state is State.CLOSED: + await self._connect() + + try: + text_msg = {"text": text, "text_end": False, "stream_id": context_id} + await self._get_websocket().send(json.dumps(text_msg)) + await self.start_tts_usage_metrics(text) + except Exception as e: + yield ErrorFrame(error=f"Unknown error occurred: {e}") + yield TTSStoppedFrame(context_id=context_id) + await self._disconnect() + await self._connect() + return + yield None + except Exception as e: + yield ErrorFrame(error=f"Unknown error occurred: {e}")