From 1e31fc7f9b099b663d884b339d7a04c164298e4b Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Thu, 9 Oct 2025 22:09:25 +0530 Subject: [PATCH 01/11] fix: Format errors --- pyproject.toml | 3 +- src/pipecat/services/sarvam/stt.py | 411 +++++++++++++++++++++++++++++ uv.lock | 22 +- 3 files changed, 434 insertions(+), 2 deletions(-) create mode 100644 src/pipecat/services/sarvam/stt.py diff --git a/pyproject.toml b/pyproject.toml index c04dd2433..1aa72605d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ # Pinning numba to resolve package dependencies "numba==0.61.2", "wait_for2>=0.4.1; python_version<'3.12'", + "sarvamai==0.1.21", ] [project.urls] @@ -93,7 +94,7 @@ rime = [ "pipecat-ai[websockets-base]" ] riva = [ "nvidia-riva-client~=2.21.1" ] runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"] sambanova = [] -sarvam = [ "pipecat-ai[websockets-base]" ] +sarvam = [ "sarvamai==0.1.21", "websockets>=13.1,<15.0" ] sentry = [ "sentry-sdk>=2.28.0,<3" ] local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ] local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1,<2" ] diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py new file mode 100644 index 000000000..77203e13a --- /dev/null +++ b/src/pipecat/services/sarvam/stt.py @@ -0,0 +1,411 @@ +"""Sarvam AI Speech-to-Text service implementation. + +This module provides a streaming Speech-to-Text service using Sarvam AI's WebSocket-based +API. It supports real-time transcription with Voice Activity Detection (VAD) and +can handle multiple audio formats for Indian language speech recognition. 
+""" + +import asyncio +import base64 +from enum import StrEnum +from typing import Literal, Optional + +from loguru import logger +from pydantic import BaseModel + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + StartFrame, + TranscriptionFrame, +) +from pipecat.services.stt_service import STTService +from pipecat.transcriptions.language import Language +from pipecat.utils.time import time_now_iso8601 +from pipecat.utils.tracing.service_decorators import traced_stt + +try: + from sarvamai import AsyncSarvamAI + from sarvamai.core.api_error import ApiError + from sarvamai.core.events import EventType +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Sarvam, you need to `pip install pipecat-ai[sarvam]`.") + raise Exception(f"Missing module: {e}") + + +class TranscriptionMetrics(BaseModel): + """Metrics for transcription performance.""" + + audio_duration: float + processing_latency: float + + +class TranscriptionData(BaseModel): + """Data structure for transcription results.""" + + request_id: str + transcript: str + language_code: Optional[str] + metrics: Optional[TranscriptionMetrics] = None + is_final: Optional[bool] = None + + +class TranscriptionResponse(BaseModel): + """Response structure for transcription data.""" + + type: Literal["data"] + data: TranscriptionData + + +class VADSignal(StrEnum): + """Voice Activity Detection signal types.""" + + START = "START_SPEECH" + END = "END_SPEECH" + + +class EventData(BaseModel): + """Data structure for VAD events.""" + + signal_type: VADSignal + occured_at: float + + +class EventResponse(BaseModel): + """Response structure for VAD events.""" + + type: Literal["events"] + data: EventData + + +def language_to_sarvam_language(language: Language) -> str: + """Convert a Language enum to Sarvam's language code format. + + Args: + language: The Language enum value to convert. + + Returns: + The Sarvam language code string. 
+ """ + # Mapping of pipecat Language enum to Sarvam language codes + SARVAM_LANGUAGES = { + Language.BN_IN: "bn-IN", + Language.GU_IN: "gu-IN", + Language.HI_IN: "hi-IN", + Language.KN_IN: "kn-IN", + Language.ML_IN: "ml-IN", + Language.MR_IN: "mr-IN", + Language.TA_IN: "ta-IN", + Language.TE_IN: "te-IN", + Language.PA_IN: "pa-IN", + Language.OR_IN: "od-IN", + Language.EN_US: "en-US", + Language.EN_IN: "en-IN", + Language.AS_IN: "as-IN", + } + + return SARVAM_LANGUAGES.get(language, "hi-IN") # Default to Hindi + + +class SarvamSTTService(STTService): + """Sarvam speech-to-text service. + + Provides real-time speech recognition using Sarvam's WebSocket API. + """ + + def __init__( + self, + *, + api_key: str, + model: str = "saarika:v2.5", + language_code: Language = Language.HI_IN, + sample_rate: Optional[int] = None, + **kwargs, + ): + """Initialize the Sarvam STT service. + + Args: + api_key: Sarvam API key for authentication. + model: Sarvam model to use for transcription. + language_code: Language enum for transcription (e.g., Language.HI_IN, Language.KN_IN). + sample_rate: Audio sample rate. Defaults to 16000 if not specified. + **kwargs: Additional arguments passed to the parent STTService. + """ + super().__init__(sample_rate=sample_rate, **kwargs) + + self.set_model_name(model) + self._api_key = api_key + self._model = model + self._language_code = language_code + self._language_string = language_to_sarvam_language(language_code) + + # Initialize Sarvam SDK client + self._sarvam_client = AsyncSarvamAI(api_subscription_key=api_key) + self._websocket_context = None + self._socket_client = None + self._listening_task = None + + def language_to_service_language(self, language: Language) -> str: + """Convert pipecat Language enum to Sarvam's language code. + + Args: + language: The Language enum value to convert. + + Returns: + The Sarvam language code string. 
+ """ + return language_to_sarvam_language(language) + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Sarvam service supports metrics generation. + """ + return True + + async def set_model(self, model: str): + """Set the Sarvam model and reconnect. + + Args: + model: The Sarvam model name to use. + """ + await super().set_model(model) + logger.info(f"Switching STT model to: [{model}]") + self._model = model + await self._disconnect() + await self._connect() + + async def start(self, frame: StartFrame): + """Start the Sarvam STT service. + + Args: + frame: The start frame containing initialization parameters. + """ + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the Sarvam STT service. + + Args: + frame: The end frame. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the Sarvam STT service. + + Args: + frame: The cancel frame. + """ + await super().cancel(frame) + await self._disconnect() + + async def run_stt(self, audio: bytes): + """Send audio data to Sarvam for transcription. + + Args: + audio: Raw audio bytes to transcribe. + + Yields: + Frame: None (transcription results come via WebSocket callbacks). 
+ """ + if not self._socket_client: + logger.warning("WebSocket not connected, cannot process audio") + yield None + return + + try: + # Convert audio bytes to base64 for Sarvam API + audio_base64 = base64.b64encode(audio).decode("utf-8") + + # Use appropriate method based on service type + if "saarika" in self._model.lower(): + # STT service + await self._socket_client.transcribe( + audio=audio_base64, encoding="audio/wav", sample_rate=self.sample_rate + ) + else: + # STT-translate service + await self._socket_client.translate( + audio=audio_base64, encoding="audio/wav", sample_rate=self.sample_rate + ) + + except Exception as e: + logger.error(f"Error sending audio to Sarvam: {e}") + await self.push_error(ErrorFrame(f"Failed to send audio: {e}")) + + yield None + + async def _connect(self): + """Connect to Sarvam WebSocket API using the SDK.""" + logger.debug("Connecting to Sarvam") + + try: + # Choose the appropriate service based on model + if "saarika" in self._model.lower(): + # STT service - requires language_code + logger.debug(f"Using STT service with language: {self._language_string}") + self._websocket_context = self._sarvam_client.speech_to_text_streaming.connect( + language_code=self._language_string, + model=self._model, + vad_signals=True, + high_vad_sensitivity=True, + sample_rate=str(self.sample_rate), + input_audio_codec="wav", + ) + else: + # STT-translate service - auto-detects language + logger.debug("Using STT-translate service") + self._websocket_context = ( + self._sarvam_client.speech_to_text_translate_streaming.connect( + model=self._model, + vad_signals=True, + high_vad_sensitivity=True, + sample_rate=str(self.sample_rate), + input_audio_codec="wav", + ) + ) + + # Enter the async context manager + self._socket_client = await self._websocket_context.__aenter__() + + # Set up event handlers + def on_open(data): + logger.debug("WebSocket connection opened") + + def on_message(message): + # Handle message in a separate task to avoid blocking 
+ asyncio.create_task(self._handle_response(message)) + + def on_error(error): + logger.error(f"WebSocket error: {error}") + asyncio.create_task(self.push_error(ErrorFrame(f"WebSocket error: {error}"))) + + def on_close(data): + logger.debug("WebSocket connection closed") + + # Register event handlers + self._socket_client.on(EventType.OPEN, on_open) + self._socket_client.on(EventType.MESSAGE, on_message) + self._socket_client.on(EventType.ERROR, on_error) + self._socket_client.on(EventType.CLOSE, on_close) + + # Start listening for messages + self._listening_task = asyncio.create_task(self._socket_client.start_listening()) + + logger.info("Connected to Sarvam successfully") + + except ApiError as e: + logger.error(f"Sarvam API error: {e}") + await self.push_error(ErrorFrame(f"Sarvam API error: {e}")) + except Exception as e: + logger.error(f"Failed to connect to Sarvam: {e}") + self._socket_client = None + self._websocket_context = None + await self.push_error(ErrorFrame(f"Failed to connect to Sarvam: {e}")) + + async def _disconnect(self): + """Disconnect from Sarvam WebSocket API using SDK.""" + if self._listening_task: + self._listening_task.cancel() + try: + await self._listening_task + except asyncio.CancelledError: + pass + self._listening_task = None + + if self._websocket_context and self._socket_client: + try: + # Exit the async context manager + await self._websocket_context.__aexit__(None, None, None) + except Exception as e: + logger.error(f"Error closing WebSocket connection: {e}") + finally: + logger.debug("Disconnected from Sarvam WebSocket") + self._socket_client = None + self._websocket_context = None + + async def _handle_response(self, message): + """Handle transcription response from Sarvam SDK. + + Args: + message: The parsed response object from Sarvam WebSocket. 
+ """ + logger.debug(f"Received response: {message}") + + try: + if message.type == "events": + # VAD event + signal = message.data.signal_type + timestamp = message.data.occured_at + logger.debug(f"VAD Signal: {signal}, Occurred at: {timestamp}") + + if signal == VADSignal.START: + await self.start_metrics() + logger.debug("User started speaking") + await self._call_event_handler("on_speech_started") + + elif message.type == "data": + await self.stop_ttfb_metrics() + transcript = message.data.transcript + language_code = message.data.language_code + if language_code is None: + language_code = "hi-IN" + language = self._map_language_code_to_enum(language_code) + + # Emit utterance end event + await self._call_event_handler("on_utterance_end") + + if transcript and transcript.strip(): + await self.push_frame( + TranscriptionFrame( + transcript, + self._user_id, + time_now_iso8601(), + language, + result=(message.dict() if hasattr(message, "dict") else str(message)), + ) + ) + + await self.stop_processing_metrics() + + except Exception as e: + logger.error(f"Error handling Sarvam response: {e}") + await self.push_error(ErrorFrame(f"Failed to handle response: {e}")) + + def _map_language_code_to_enum(self, language_code: str) -> Language: + """Map Sarvam language code to pipecat Language enum.""" + logger.debug(f"Audio language detected as: {language_code}") + mapping = { + "bn-IN": Language.BN_IN, + "gu-IN": Language.GU_IN, + "hi-IN": Language.HI_IN, + "kn-IN": Language.KN_IN, + "ml-IN": Language.ML_IN, + "mr-IN": Language.MR_IN, + "ta-IN": Language.TA_IN, + "te-IN": Language.TE_IN, + "pa-IN": Language.PA_IN, + "od-IN": Language.OR_IN, + "en-US": Language.EN_US, + "en-IN": Language.EN_IN, + "as-IN": Language.AS_IN, + } + return mapping.get(language_code, Language.HI_IN) + + async def start_metrics(self): + """Start TTFB and processing metrics collection.""" + await self.start_ttfb_metrics() + await self.start_processing_metrics() + + @traced_stt + async def 
_handle_transcription( + self, transcript: str, is_final: bool, language: Optional[Language] = None + ): + """Handle a transcription result with tracing.""" + pass diff --git a/uv.lock b/uv.lock index d7f8aebee..a49368358 100644 --- a/uv.lock +++ b/uv.lock @@ -4316,6 +4316,7 @@ dependencies = [ { name = "pydantic" }, { name = "pyloudnorm" }, { name = "resampy" }, + { name = "sarvamai" }, { name = "soxr" }, { name = "wait-for2", marker = "python_full_version < '3.12'" }, ] @@ -4461,6 +4462,7 @@ runner = [ { name = "uvicorn" }, ] sarvam = [ + { name = "sarvamai" }, { name = "websockets" }, ] sentry = [ @@ -4601,7 +4603,6 @@ requires-dist = [ { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'openai'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'playht'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'rime'" }, - { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'sarvam'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'soniox'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'websocket'" }, { name = "pipecat-ai-krisp", marker = "extra == 'krisp'", specifier = "~=0.4.0" }, @@ -4615,6 +4616,8 @@ requires-dist = [ { name = "python-dotenv", marker = "extra == 'runner'", specifier = ">=1.0.0,<2.0.0" }, { name = "pyvips", extras = ["binary"], marker = "extra == 'moondream'", specifier = "~=3.0.0" }, { name = "resampy", specifier = "~=0.4.3" }, + { name = "sarvamai", specifier = "==0.1.21" }, + { name = "sarvamai", marker = "extra == 'sarvam'", specifier = "==0.1.21" }, { name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.28.0,<3" }, { name = "simli-ai", marker = "extra == 'simli'", specifier = "~=0.1.10" }, { name = "soundfile", marker = "extra == 'soundfile'", specifier = "~=0.13.0" }, @@ -4632,6 +4635,7 @@ requires-dist = [ { name = "uvicorn", marker = "extra == 'runner'", specifier = 
">=0.32.0,<1.0.0" }, { name = "vllm", marker = "extra == 'ultravox'", specifier = ">=0.9.0" }, { name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1" }, + { name = "websockets", marker = "extra == 'sarvam'", specifier = ">=13.1,<15.0" }, { name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" }, ] provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "deepseek", "daily", "deepgram", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "krisp", "koala", "langchain", "livekit", "lmnt", "local", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nim", "neuphonic", "noisereduce", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "rime", "riva", "runner", "sambanova", "sarvam", "sentry", "local-smart-turn", "local-smart-turn-v3", "remote-smart-turn", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"] @@ -6289,6 +6293,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/c3/c0be1135726618dc1e28d181b8c442403d8dbb9e273fd791de2d4384bcdd/safetensors-0.6.2-cp38-abi3-win_amd64.whl", hash = "sha256:c7b214870df923cbc1593c3faee16bec59ea462758699bd3fee399d00aac072c", size = 320192, upload-time = "2025-08-08T13:13:59.467Z" }, ] +[[package]] +name = "sarvamai" +version = "0.1.21" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "httpx" }, + { name = "pydantic" }, + { name = "pydantic-core" }, + { name = "typing-extensions" }, + { name = "websockets" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e9/08/e5efcb30818ed220b818319255c22fd91e379489ebaa93efd6f444fb4987/sarvamai-0.1.21.tar.gz", hash = "sha256:865065635b2b99d40f5519308832954015627938e06a6333b5f62ae9c36278bb", size = 87386, upload-time 
= "2025-10-07T07:37:47.085Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2e/4e/b9933f72681b7aed91b86913337dd3981fad97027881fbc66c3c5eb03568/sarvamai-0.1.21-py3-none-any.whl", hash = "sha256:daa4e5d16635fe434f5f270cee416849249285369141d77132a17f0bf670f120", size = 175204, upload-time = "2025-10-07T07:37:46.024Z" }, +] + [[package]] name = "scipy" version = "1.15.3" From 5cc1d8a024fa9388f9681001790817e034a44392 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Mon, 13 Oct 2025 10:18:15 +0530 Subject: [PATCH 02/11] refactor: Update dependencies and improve logging --- pyproject.toml | 3 +- src/pipecat/services/sarvam/stt.py | 76 ++---------------------------- uv.lock | 28 +++++++++-- 3 files changed, 29 insertions(+), 78 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1aa72605d..f71358ff1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,6 @@ dependencies = [ # Pinning numba to resolve package dependencies "numba==0.61.2", "wait_for2>=0.4.1; python_version<'3.12'", - "sarvamai==0.1.21", ] [project.urls] @@ -94,7 +93,7 @@ rime = [ "pipecat-ai[websockets-base]" ] riva = [ "nvidia-riva-client~=2.21.1" ] runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"] sambanova = [] -sarvam = [ "sarvamai==0.1.21", "websockets>=13.1,<15.0" ] +sarvam = [ "sarvamai==0.1.21", "pipecat-ai[websockets-base]" ] sentry = [ "sentry-sdk>=2.28.0,<3" ] local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ] local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1,<2" ] diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index 77203e13a..27b2106a3 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -7,11 +7,9 @@ can handle multiple audio formats for Indian language speech recognition. 
import asyncio import base64 -from enum import StrEnum -from typing import Literal, Optional +from typing import Optional from loguru import logger -from pydantic import BaseModel from pipecat.frames.frames import ( CancelFrame, @@ -35,51 +33,6 @@ except ModuleNotFoundError as e: raise Exception(f"Missing module: {e}") -class TranscriptionMetrics(BaseModel): - """Metrics for transcription performance.""" - - audio_duration: float - processing_latency: float - - -class TranscriptionData(BaseModel): - """Data structure for transcription results.""" - - request_id: str - transcript: str - language_code: Optional[str] - metrics: Optional[TranscriptionMetrics] = None - is_final: Optional[bool] = None - - -class TranscriptionResponse(BaseModel): - """Response structure for transcription data.""" - - type: Literal["data"] - data: TranscriptionData - - -class VADSignal(StrEnum): - """Voice Activity Detection signal types.""" - - START = "START_SPEECH" - END = "END_SPEECH" - - -class EventData(BaseModel): - """Data structure for VAD events.""" - - signal_type: VADSignal - occured_at: float - - -class EventResponse(BaseModel): - """Response structure for VAD events.""" - - type: Literal["events"] - data: EventData - - def language_to_sarvam_language(language: Language) -> str: """Convert a Language enum to Sarvam's language code format. 
@@ -249,7 +202,6 @@ class SarvamSTTService(STTService): # Choose the appropriate service based on model if "saarika" in self._model.lower(): # STT service - requires language_code - logger.debug(f"Using STT service with language: {self._language_string}") self._websocket_context = self._sarvam_client.speech_to_text_streaming.connect( language_code=self._language_string, model=self._model, @@ -260,7 +212,6 @@ class SarvamSTTService(STTService): ) else: # STT-translate service - auto-detects language - logger.debug("Using STT-translate service") self._websocket_context = ( self._sarvam_client.speech_to_text_translate_streaming.connect( model=self._model, @@ -274,27 +225,6 @@ class SarvamSTTService(STTService): # Enter the async context manager self._socket_client = await self._websocket_context.__aenter__() - # Set up event handlers - def on_open(data): - logger.debug("WebSocket connection opened") - - def on_message(message): - # Handle message in a separate task to avoid blocking - asyncio.create_task(self._handle_response(message)) - - def on_error(error): - logger.error(f"WebSocket error: {error}") - asyncio.create_task(self.push_error(ErrorFrame(f"WebSocket error: {error}"))) - - def on_close(data): - logger.debug("WebSocket connection closed") - - # Register event handlers - self._socket_client.on(EventType.OPEN, on_open) - self._socket_client.on(EventType.MESSAGE, on_message) - self._socket_client.on(EventType.ERROR, on_error) - self._socket_client.on(EventType.CLOSE, on_close) - # Start listening for messages self._listening_task = asyncio.create_task(self._socket_client.start_listening()) @@ -345,7 +275,7 @@ class SarvamSTTService(STTService): timestamp = message.data.occured_at logger.debug(f"VAD Signal: {signal}, Occurred at: {timestamp}") - if signal == VADSignal.START: + if signal == "START_SPEECH": await self.start_metrics() logger.debug("User started speaking") await self._call_event_handler("on_speech_started") @@ -377,10 +307,10 @@ class 
SarvamSTTService(STTService): except Exception as e: logger.error(f"Error handling Sarvam response: {e}") await self.push_error(ErrorFrame(f"Failed to handle response: {e}")) + await self.stop_all_metrics() def _map_language_code_to_enum(self, language_code: str) -> Language: """Map Sarvam language code to pipecat Language enum.""" - logger.debug(f"Audio language detected as: {language_code}") mapping = { "bn-IN": Language.BN_IN, "gu-IN": Language.GU_IN, diff --git a/uv.lock b/uv.lock index a49368358..8dee6245e 100644 --- a/uv.lock +++ b/uv.lock @@ -569,6 +569,30 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/39/54/db7a801933dd2537f5376fb8a9e28caff488ef5c2d61f3a8fced55fe6336/blake3-1.0.7-cp313-cp313t-musllinux_1_1_x86_64.whl", hash = "sha256:d9046bb1e22a8607e1d0d7c3ff47e56e0a197c988502df4bf4d78563f3e9fe2c", size = 553411, upload-time = "2025-09-29T16:40:45.667Z" }, { url = "https://files.pythonhosted.org/packages/2c/08/949cf68d16d1f731d502968bb1486e1a4bf7ef032c38fbc2ef26a2353494/blake3-1.0.7-cp313-cp313t-win32.whl", hash = "sha256:bd2f638bcc00fc09ce985ea3c642d45940e1eda198ab1f4b90cfdecbebbc9315", size = 227049, upload-time = "2025-09-29T16:40:47.446Z" }, { url = "https://files.pythonhosted.org/packages/f2/ae/6783a5ca6235024e00a1e92ab6ca2cd855f4c61c763cf8d6d643846d110c/blake3-1.0.7-cp313-cp313t-win_amd64.whl", hash = "sha256:cb3aa1db14231c2ef0ec5acd805505ce128c39ffa510deb3384eed96fe4addcb", size = 214101, upload-time = "2025-09-29T16:40:48.656Z" }, + { url = "https://files.pythonhosted.org/packages/32/aa/99b4b6c22972b9a854f77d97846a717448a77d079e4bd38e46a3f8ecea76/blake3-1.0.7-cp314-cp314-macosx_10_12_x86_64.whl", hash = "sha256:f7db997205aa420d59fb5639346e40beafb9c09252e2ec6efedca8f230f7520c", size = 346664, upload-time = "2025-10-11T18:02:54.609Z" }, + { url = "https://files.pythonhosted.org/packages/f9/44/e98bc5450be415a335a191b154e299e335046d11fe9514d93961902b7aed/blake3-1.0.7-cp314-cp314-macosx_11_0_arm64.whl", hash = 
"sha256:19afec6e276f3bc154541248d92b1ecb198af2ee920025f7ce521028f9a69d8b", size = 324576, upload-time = "2025-10-11T18:02:57.062Z" }, + { url = "https://files.pythonhosted.org/packages/74/25/23a39913c8424ac3df705ed71a00efe34cc1cdbd4588ed6eaf458ea9d7ef/blake3-1.0.7-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:006a11bbba65a95e88ddc069cca751c8812fd144d582715eeea512452fdbe80d", size = 370545, upload-time = "2025-10-11T18:02:59.824Z" }, + { url = "https://files.pythonhosted.org/packages/db/83/9f53a86de9a5999b043febfd84765d240014da42055aeac06d1005b20b07/blake3-1.0.7-cp314-cp314-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7febeffdc8412fed105ca517cee641ac521fb9cfb750bf7e27a5cdf3ddf74a08", size = 374370, upload-time = "2025-10-11T18:03:01.412Z" }, + { url = "https://files.pythonhosted.org/packages/c4/4c/3290aa4fb7483975a7b3322a73692aa3cf491a77ce7ac61c216c71c6f834/blake3-1.0.7-cp314-cp314-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6c032ce7c52b71015651c0abe9fe599aa2669e6be578aa17d5f993dc93373401", size = 447808, upload-time = "2025-10-11T18:03:02.893Z" }, + { url = "https://files.pythonhosted.org/packages/66/26/92b6e15552865416aae1aedad8b9b4d8b47ca9b73d25373622b1798c05a9/blake3-1.0.7-cp314-cp314-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5b81455f7d24b58fe26be037cc3854c28ea6eb3671ceab3b1ec0b1239aeb6fef", size = 506118, upload-time = "2025-10-11T18:03:04.51Z" }, + { url = "https://files.pythonhosted.org/packages/1b/ef/f158fc43a03fd366bc428a52a845bd0f884e518deda901c9216bd469867e/blake3-1.0.7-cp314-cp314-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:41b0127b0e7c8610054c421959dbe7140a81ac2c88fa9e099994fbaa529af3c1", size = 393239, upload-time = "2025-10-11T18:03:07.102Z" }, + { url = "https://files.pythonhosted.org/packages/10/49/2a56ce897ec7ed0e25953b3873da271ea60cc107ae02ecc6655252e554c7/blake3-1.0.7-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:4755ca95b4114b629d8f3570bc661916d211d52d47f57ff70e9687377ab39cb9", size = 386267, upload-time = "2025-10-11T18:03:08.904Z" }, + { url = "https://files.pythonhosted.org/packages/d9/c4/ee4c03ea419198b91c889ef173015b5d637a390d3f7d63cb70033a7201d6/blake3-1.0.7-cp314-cp314-musllinux_1_1_aarch64.whl", hash = "sha256:8abe929cfd27b375e02e3dd7a690192fa4efecc52ef510df91ef01651ef08dc7", size = 549641, upload-time = "2025-10-11T18:03:10.64Z" }, + { url = "https://files.pythonhosted.org/packages/b2/cc/a918d6649b56fe705133e06d9958d90978aad30063d42cca4dfe23db16e9/blake3-1.0.7-cp314-cp314-musllinux_1_1_x86_64.whl", hash = "sha256:dd607eb5ad5a9b44ff62243759aa0af4085f6f43c9b01f503561a70da63e3b94", size = 553691, upload-time = "2025-10-11T18:03:12.108Z" }, + { url = "https://files.pythonhosted.org/packages/fd/9f/568546f555fd1555d4867c497e9413f67bf769d076e773b9ca9e07a0b6f6/blake3-1.0.7-cp314-cp314-win32.whl", hash = "sha256:a51684d1f346e7680f7c244c25b0e279e3b297f1938126e4ea8e32425ea269f5", size = 227552, upload-time = "2025-10-11T18:03:13.468Z" }, + { url = "https://files.pythonhosted.org/packages/97/2b/d4ef7365d9f601c8a127b5993f2662d45d2cb6d430bf3dbbb7a6f0b33639/blake3-1.0.7-cp314-cp314-win_amd64.whl", hash = "sha256:a6a481719e28e2c61aafd4273d32663365d97613341b72fcdf2f6afbd426319b", size = 214719, upload-time = "2025-10-11T18:03:14.835Z" }, + { url = "https://files.pythonhosted.org/packages/2f/53/f697cc34e382a225d163ea0c6a35c7eb4cfd1011e85db6610adfac98e522/blake3-1.0.7-cp314-cp314t-macosx_10_12_x86_64.whl", hash = "sha256:daa8933cd7db19143bd6b59f7ac4c7c7446767d7b2c3a748a4559aa483275fa2", size = 347071, upload-time = "2025-10-11T18:03:16.637Z" }, + { url = "https://files.pythonhosted.org/packages/4c/85/836dcb5c5709c2331f02ce065f7ebfaae710a6c1768cdc47ee3197645f98/blake3-1.0.7-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:24074adfffffe0fa7a7dd930cc608d6e965e70306e2c1e14d412e29ec94fa360", size = 324341, upload-time = "2025-10-11T18:03:18.073Z" }, + { url = 
"https://files.pythonhosted.org/packages/6d/48/36b2c25007933619ce60e24b9f360baaa77d08939284045476c8e157fe62/blake3-1.0.7-cp314-cp314t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dce6e6f03de2674f9860cf330d8a4fcdb63a60659435e5e31d72d174fc102d8e", size = 370140, upload-time = "2025-10-11T18:03:19.582Z" }, + { url = "https://files.pythonhosted.org/packages/70/82/8a8977e5d56b9fb719033940c8ce34afc733190d34ab868a647a9af7b584/blake3-1.0.7-cp314-cp314t-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:e783f33d53a2de8d2ab845235dd53393d521b5e4a76c23d03e77e472266359d3", size = 373022, upload-time = "2025-10-11T18:03:21.143Z" }, + { url = "https://files.pythonhosted.org/packages/e2/c4/44017ba40804a528568b35a36c05187786830c4d891c5540d59a121a7cec/blake3-1.0.7-cp314-cp314t-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:782784aef18eb61f4ce8bf2b9506b7d90f0d183176b453345b221837a18041b7", size = 447243, upload-time = "2025-10-11T18:03:22.707Z" }, + { url = "https://files.pythonhosted.org/packages/78/c1/4fa20e68624784082734d31b8c9c80ad226658c024e61b9f9b6751ba0a4a/blake3-1.0.7-cp314-cp314t-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6062122e77f40e3733cac2ef3f25e0fc7f555e352fe6f513f8404ad11dc69974", size = 506149, upload-time = "2025-10-11T18:03:24.424Z" }, + { url = "https://files.pythonhosted.org/packages/8e/63/af65466e27e7b92800a068afaee11b2fa071e34a7f5900f8e13832f18185/blake3-1.0.7-cp314-cp314t-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6c2614bc9d69fd6067571f3bb37b3b07a6b86a56167553ad4784a3c508771f39", size = 393243, upload-time = "2025-10-11T18:03:25.872Z" }, + { url = "https://files.pythonhosted.org/packages/f3/82/54a4807a3243d0e094ada9d65687aeb40059587e374b3beb9c89f6552c9b/blake3-1.0.7-cp314-cp314t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d6df2bd56c43bdeb6699d4af0a0dd0d77537d95cb4a5dde4b39ed6e54cc725d6", size = 386318, upload-time = "2025-10-11T18:03:27.338Z" }, + { url 
= "https://files.pythonhosted.org/packages/42/e8/32b56531b5d9da67e476735ceaec7c3bf89310629abeeafb03c724145c88/blake3-1.0.7-cp314-cp314t-musllinux_1_1_aarch64.whl", hash = "sha256:8b635cf4350caf459ecb335b32be622068423245bda457d5bc159106eb20f912", size = 548945, upload-time = "2025-10-11T18:03:28.779Z" }, + { url = "https://files.pythonhosted.org/packages/ad/50/33b1aca708be629e285a537f1adf34dfcabc4c30b28c436361323d11f593/blake3-1.0.7-cp314-cp314t-musllinux_1_1_x86_64.whl", hash = "sha256:f96a685775f87ddf75ff495dc9698703268c66c170caca977347427ef8d52324", size = 553564, upload-time = "2025-10-11T18:03:30.247Z" }, + { url = "https://files.pythonhosted.org/packages/fe/07/8b17cbf40ccd9afeed6ae9f55018181786b30ff4e079ac8bf4ca4799e47b/blake3-1.0.7-cp314-cp314t-win32.whl", hash = "sha256:0633b7d9bad87dc7fce545042353f2e056604d993f71d1dce666a9f5edc13e05", size = 227345, upload-time = "2025-10-11T18:03:31.933Z" }, + { url = "https://files.pythonhosted.org/packages/d9/8a/ab9de8a73616350759356a483f440212bc2a22fc9aaa77cabbf06c3483db/blake3-1.0.7-cp314-cp314t-win_amd64.whl", hash = "sha256:5e356daa0089968dc1ff1d0d112e7cc1700533441d8f30ae99f835a94dc8b0f3", size = 213964, upload-time = "2025-10-11T18:03:33.919Z" }, ] [[package]] @@ -4316,7 +4340,6 @@ dependencies = [ { name = "pydantic" }, { name = "pyloudnorm" }, { name = "resampy" }, - { name = "sarvamai" }, { name = "soxr" }, { name = "wait-for2", marker = "python_full_version < '3.12'" }, ] @@ -4603,6 +4626,7 @@ requires-dist = [ { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'openai'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'playht'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'rime'" }, + { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'sarvam'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'soniox'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'websocket'" 
}, { name = "pipecat-ai-krisp", marker = "extra == 'krisp'", specifier = "~=0.4.0" }, @@ -4616,7 +4640,6 @@ requires-dist = [ { name = "python-dotenv", marker = "extra == 'runner'", specifier = ">=1.0.0,<2.0.0" }, { name = "pyvips", extras = ["binary"], marker = "extra == 'moondream'", specifier = "~=3.0.0" }, { name = "resampy", specifier = "~=0.4.3" }, - { name = "sarvamai", specifier = "==0.1.21" }, { name = "sarvamai", marker = "extra == 'sarvam'", specifier = "==0.1.21" }, { name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.28.0,<3" }, { name = "simli-ai", marker = "extra == 'simli'", specifier = "~=0.1.10" }, @@ -4635,7 +4658,6 @@ requires-dist = [ { name = "uvicorn", marker = "extra == 'runner'", specifier = ">=0.32.0,<1.0.0" }, { name = "vllm", marker = "extra == 'ultravox'", specifier = ">=0.9.0" }, { name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1" }, - { name = "websockets", marker = "extra == 'sarvam'", specifier = ">=13.1,<15.0" }, { name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" }, ] provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "deepseek", "daily", "deepgram", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "krisp", "koala", "langchain", "livekit", "lmnt", "local", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nim", "neuphonic", "noisereduce", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "rime", "riva", "runner", "sambanova", "sarvam", "sentry", "local-smart-turn", "local-smart-turn-v3", "remote-smart-turn", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"] From 14a365aa16eb0bb512d5326824a0607e4b79d959 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Thu, 30 Oct 2025 
17:54:32 +0530 Subject: [PATCH 03/11] fix: Use message handler to handle responses --- src/pipecat/services/sarvam/stt.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index 27b2106a3..c317d53e9 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -225,6 +225,18 @@ class SarvamSTTService(STTService): # Enter the async context manager self._socket_client = await self._websocket_context.__aenter__() + # Register event handler for incoming messages + def _message_handler(message): + """Wrapper to handle async response handler.""" + try: + loop = asyncio.get_running_loop() + loop.create_task(self._handle_response(message)) + except RuntimeError: + # Fallback if no running loop + asyncio.create_task(self._handle_response(message)) + + self._socket_client.on(EventType.MESSAGE, _message_handler) + # Start listening for messages self._listening_task = asyncio.create_task(self._socket_client.start_listening()) From 35c48a45cf5e260df4467666bb2ba27ce8194f23 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Thu, 30 Oct 2025 18:51:18 +0530 Subject: [PATCH 04/11] fix: Ruff format --- src/pipecat/services/sarvam/stt.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index c317d53e9..def8a9a62 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -10,6 +10,7 @@ import base64 from typing import Optional from loguru import logger +from pydantic import BaseModel from pipecat.frames.frames import ( CancelFrame, @@ -68,13 +69,22 @@ class SarvamSTTService(STTService): Provides real-time speech recognition using Sarvam's WebSocket API. """ + class InputParams(BaseModel): + """Configuration parameters for Sarvam STT service. + + Parameters: + language: Target language for transcription. 
Defaults to HI_IN. + """ + + language: Optional[Language] = Language.HI_IN + def __init__( self, *, api_key: str, model: str = "saarika:v2.5", - language_code: Language = Language.HI_IN, sample_rate: Optional[int] = None, + params: Optional[InputParams] = None, **kwargs, ): """Initialize the Sarvam STT service. @@ -82,17 +92,29 @@ class SarvamSTTService(STTService): Args: api_key: Sarvam API key for authentication. model: Sarvam model to use for transcription. - language_code: Language enum for transcription (e.g., Language.HI_IN, Language.KN_IN). sample_rate: Audio sample rate. Defaults to 16000 if not specified. + params: Configuration parameters for Sarvam STT service. **kwargs: Additional arguments passed to the parent STTService. """ super().__init__(sample_rate=sample_rate, **kwargs) + params = params or SarvamSTTService.InputParams() + + # Validate that saaras models don't accept language parameter + if "saaras" in model.lower(): + if params.language is not None: + raise ValueError( + f"Model '{model}' (saaras) does not accept language parameter. " + "saaras models auto-detect language." 
+ ) + self.set_model_name(model) self._api_key = api_key self._model = model - self._language_code = language_code - self._language_string = language_to_sarvam_language(language_code) + self._language_code = params.language + self._language_string = ( + language_to_sarvam_language(params.language) if params.language else None + ) # Initialize Sarvam SDK client self._sarvam_client = AsyncSarvamAI(api_subscription_key=api_key) From e7b8da7a830803a0eadedb7a5aa9caf46e65658b Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Thu, 30 Oct 2025 19:01:04 +0530 Subject: [PATCH 05/11] feat: Refactor code to include language parameter, model_name and use _handle_transcription method --- src/pipecat/services/sarvam/stt.py | 44 +++++++++++++++++++----------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index def8a9a62..0970b4f15 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -110,7 +110,6 @@ class SarvamSTTService(STTService): self.set_model_name(model) self._api_key = api_key - self._model = model self._language_code = params.language self._language_string = ( language_to_sarvam_language(params.language) if params.language else None @@ -141,15 +140,22 @@ class SarvamSTTService(STTService): """ return True - async def set_model(self, model: str): - """Set the Sarvam model and reconnect. + async def set_language(self, language: Language): + """Set the recognition language and reconnect. Args: - model: The Sarvam model name to use. + language: The language to use for speech recognition. """ - await super().set_model(model) - logger.info(f"Switching STT model to: [{model}]") - self._model = model + # saaras models do not accept a language parameter + if "saaras" in self.model_name.lower(): + raise ValueError( + f"Model '{self.model_name}' (saaras) does not accept language parameter. " + "saaras models auto-detect language." 
+ ) + + logger.info(f"Switching STT language to: [{language}]") + self._language_code = language + self._language_string = language_to_sarvam_language(language) await self._disconnect() await self._connect() @@ -199,13 +205,13 @@ class SarvamSTTService(STTService): audio_base64 = base64.b64encode(audio).decode("utf-8") # Use appropriate method based on service type - if "saarika" in self._model.lower(): + if "saarika" in self.model_name.lower(): # STT service await self._socket_client.transcribe( audio=audio_base64, encoding="audio/wav", sample_rate=self.sample_rate ) else: - # STT-translate service + # STT-translate service - auto-detects input language and returns translated text await self._socket_client.translate( audio=audio_base64, encoding="audio/wav", sample_rate=self.sample_rate ) @@ -222,21 +228,21 @@ class SarvamSTTService(STTService): try: # Choose the appropriate service based on model - if "saarika" in self._model.lower(): + if "saarika" in self.model_name.lower(): # STT service - requires language_code self._websocket_context = self._sarvam_client.speech_to_text_streaming.connect( language_code=self._language_string, - model=self._model, + model=self.model_name, vad_signals=True, high_vad_sensitivity=True, sample_rate=str(self.sample_rate), input_audio_codec="wav", ) else: - # STT-translate service - auto-detects language + # STT-translate service - auto-detects input language and returns translated text self._websocket_context = ( self._sarvam_client.speech_to_text_translate_streaming.connect( - model=self._model, + model=self.model_name, vad_signals=True, high_vad_sensitivity=True, sample_rate=str(self.sample_rate), @@ -318,14 +324,20 @@ class SarvamSTTService(STTService): await self.stop_ttfb_metrics() transcript = message.data.transcript language_code = message.data.language_code - if language_code is None: - language_code = "hi-IN" - language = self._map_language_code_to_enum(language_code) + # Prefer language from message (auto-detected for 
translate models). Fallback to configured. + if language_code: + language = self._map_language_code_to_enum(language_code) + elif self._language_string: + language = self._map_language_code_to_enum(self._language_string) + else: + language = Language.HI_IN # Emit utterance end event await self._call_event_handler("on_utterance_end") if transcript and transcript.strip(): + # Record tracing for this transcription event + await self._handle_transcription(transcript, True, language) await self.push_frame( TranscriptionFrame( transcript, From 8d0e7e5e161098f2109323a420fae3b4381d1421 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Thu, 30 Oct 2025 19:22:14 +0530 Subject: [PATCH 06/11] chore: Add changelog entry, update foundational examples --- CHANGELOG.md | 3 +++ examples/foundational/07z-interruptible-sarvam-http.py | 8 ++++++-- examples/foundational/07z-interruptible-sarvam.py | 9 +++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24484687f..ab969fe52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added support for Sarvam Speech-to-Text service (`SarvamSTTService`) with streaming WebSocket + support for `saarika` (STT) and `saaras` (STT-translate) models. + - Added `generation_config` parameter support to `CartesiaTTSService` and `CartesiaHttpTTSService` for Cartesia Sonic-3 models. 
Includes a new `GenerationConfig` class with `volume` (0.5-2.0), `speed` (0.6-1.5), diff --git a/examples/foundational/07z-interruptible-sarvam-http.py b/examples/foundational/07z-interruptible-sarvam-http.py index 29851d254..20b28c94c 100644 --- a/examples/foundational/07z-interruptible-sarvam-http.py +++ b/examples/foundational/07z-interruptible-sarvam-http.py @@ -22,8 +22,8 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.sarvam.stt import SarvamSTTService from pipecat.services.sarvam.tts import SarvamHttpTTSService from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams @@ -63,7 +63,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): # Create an HTTP session async with aiohttp.ClientSession() as session: - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + stt = SarvamSTTService( + api_key=os.getenv("SARVAM_API_KEY"), + model="saarika:v2.5", + params=SarvamSTTService.InputParams(language=None), + ) tts = SarvamHttpTTSService( api_key=os.getenv("SARVAM_API_KEY"), diff --git a/examples/foundational/07z-interruptible-sarvam.py b/examples/foundational/07z-interruptible-sarvam.py index 44e0b7844..9b60b2a24 100644 --- a/examples/foundational/07z-interruptible-sarvam.py +++ b/examples/foundational/07z-interruptible-sarvam.py @@ -24,8 +24,8 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport -from 
pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.sarvam.stt import SarvamSTTService from pipecat.services.sarvam.tts import SarvamTTSService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams @@ -62,7 +62,12 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + stt = SarvamSTTService( + api_key=os.getenv("SARVAM_API_KEY"), + model="saarika:v2.5", + # Example: set Hindi; omit or change via set_language at runtime + params=SarvamSTTService.InputParams(language=None), + ) tts = SarvamTTSService( api_key=os.getenv("SARVAM_API_KEY"), From 1433df4de2ceda0c799b9dc1ddc0d50fe8c798cd Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Fri, 31 Oct 2025 13:23:08 +0530 Subject: [PATCH 07/11] fix: Fix language param and include suggested way of handling STT response --- .../07z-interruptible-sarvam-http.py | 1 - .../foundational/07z-interruptible-sarvam.py | 3 +- src/pipecat/services/sarvam/stt.py | 70 +++++++++++-------- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/examples/foundational/07z-interruptible-sarvam-http.py b/examples/foundational/07z-interruptible-sarvam-http.py index 20b28c94c..0821167ef 100644 --- a/examples/foundational/07z-interruptible-sarvam-http.py +++ b/examples/foundational/07z-interruptible-sarvam-http.py @@ -66,7 +66,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt = SarvamSTTService( api_key=os.getenv("SARVAM_API_KEY"), model="saarika:v2.5", - params=SarvamSTTService.InputParams(language=None), ) tts = SarvamHttpTTSService( diff --git a/examples/foundational/07z-interruptible-sarvam.py b/examples/foundational/07z-interruptible-sarvam.py index 9b60b2a24..b6dd1432c 100644 --- 
a/examples/foundational/07z-interruptible-sarvam.py +++ b/examples/foundational/07z-interruptible-sarvam.py @@ -27,6 +27,7 @@ from pipecat.runner.utils import create_transport from pipecat.services.openai.llm import OpenAILLMService from pipecat.services.sarvam.stt import SarvamSTTService from pipecat.services.sarvam.tts import SarvamTTSService +from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams @@ -65,8 +66,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): stt = SarvamSTTService( api_key=os.getenv("SARVAM_API_KEY"), model="saarika:v2.5", - # Example: set Hindi; omit or change via set_language at runtime - params=SarvamSTTService.InputParams(language=None), ) tts = SarvamTTSService( diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index 0970b4f15..f2156fcd4 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -5,7 +5,6 @@ API. It supports real-time transcription with Voice Activity Detection (VAD) and can handle multiple audio formats for Indian language speech recognition. """ -import asyncio import base64 from typing import Optional @@ -55,7 +54,6 @@ def language_to_sarvam_language(language: Language) -> str: Language.TE_IN: "te-IN", Language.PA_IN: "pa-IN", Language.OR_IN: "od-IN", - Language.EN_US: "en-US", Language.EN_IN: "en-IN", Language.AS_IN: "as-IN", } @@ -119,7 +117,7 @@ class SarvamSTTService(STTService): self._sarvam_client = AsyncSarvamAI(api_subscription_key=api_key) self._websocket_context = None self._socket_client = None - self._listening_task = None + self._receive_task = None def language_to_service_language(self, language: Language) -> str: """Convert pipecat Language enum to Sarvam's language code. 
@@ -256,17 +254,13 @@ class SarvamSTTService(STTService): # Register event handler for incoming messages def _message_handler(message): """Wrapper to handle async response handler.""" - try: - loop = asyncio.get_running_loop() - loop.create_task(self._handle_response(message)) - except RuntimeError: - # Fallback if no running loop - asyncio.create_task(self._handle_response(message)) + # Use Pipecat's built-in task management + self.create_task(self._handle_message(message)) self._socket_client.on(EventType.MESSAGE, _message_handler) - # Start listening for messages - self._listening_task = asyncio.create_task(self._socket_client.start_listening()) + # Start receive task using Pipecat's task management + self._receive_task = self.create_task(self._receive_task_handler()) logger.info("Connected to Sarvam successfully") @@ -281,13 +275,9 @@ class SarvamSTTService(STTService): async def _disconnect(self): """Disconnect from Sarvam WebSocket API using SDK.""" - if self._listening_task: - self._listening_task.cancel() - try: - await self._listening_task - except asyncio.CancelledError: - pass - self._listening_task = None + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None if self._websocket_context and self._socket_client: try: @@ -300,8 +290,27 @@ class SarvamSTTService(STTService): self._socket_client = None self._websocket_context = None - async def _handle_response(self, message): - """Handle transcription response from Sarvam SDK. + async def _receive_task_handler(self): + """Handle incoming messages from Sarvam WebSocket. + + This task wraps the SDK's start_listening() method which processes + messages via the registered event handler callback. 
+ """ + if not self._socket_client: + return + + try: + # Start listening for messages from the Sarvam SDK + # Messages will be handled via the _message_handler callback + await self._socket_client.start_listening() + except Exception as e: + logger.error(f"Error in Sarvam receive task: {e}") + await self.push_error(ErrorFrame(f"Sarvam receive task error: {e}")) + + async def _handle_message(self, message): + """Handle incoming WebSocket message from Sarvam SDK. + + Processes transcription data and VAD events from the Sarvam service. Args: message: The parsed response object from Sarvam WebSocket. @@ -351,10 +360,20 @@ class SarvamSTTService(STTService): await self.stop_processing_metrics() except Exception as e: - logger.error(f"Error handling Sarvam response: {e}") - await self.push_error(ErrorFrame(f"Failed to handle response: {e}")) + logger.error(f"Error handling Sarvam message: {e}") + await self.push_error(ErrorFrame(f"Failed to handle message: {e}")) await self.stop_all_metrics() + @traced_stt + async def _handle_transcription( + self, transcript: str, is_final: bool, language: Optional[Language] = None + ): + """Handle a transcription result with tracing. + + This method is decorated with @traced_stt for observability. 
+ """ + pass + def _map_language_code_to_enum(self, language_code: str) -> Language: """Map Sarvam language code to pipecat Language enum.""" mapping = { @@ -378,10 +397,3 @@ class SarvamSTTService(STTService): """Start TTFB and processing metrics collection.""" await self.start_ttfb_metrics() await self.start_processing_metrics() - - @traced_stt - async def _handle_transcription( - self, transcript: str, is_final: bool, language: Optional[Language] = None - ): - """Handle a transcription result with tracing.""" - pass From 6f172bba8fd78628b8f5d225f13c47cf05a252a5 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Fri, 31 Oct 2025 15:17:06 +0530 Subject: [PATCH 08/11] feat: Make input parameters accessible to users --- src/pipecat/services/sarvam/stt.py | 134 ++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 29 deletions(-) diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index f2156fcd4..15285a19d 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -58,7 +58,9 @@ def language_to_sarvam_language(language: Language) -> str: Language.AS_IN: "as-IN", } - return SARVAM_LANGUAGES.get(language, "hi-IN") # Default to Hindi + return SARVAM_LANGUAGES.get( + language, "unknown" + ) # Default to unknown (Sarvam models auto-detect the language) class SarvamSTTService(STTService): @@ -71,10 +73,21 @@ class SarvamSTTService(STTService): """Configuration parameters for Sarvam STT service. Parameters: - language: Target language for transcription. Defaults to HI_IN. + language: Target language for transcription. Defaults to None (required for saarika models). + prompt: Optional prompt to guide translation style/context for STT-Translate models. + Only applicable to saaras (STT-Translate) models. Defaults to None. + sample_rate: Audio sample rate in Hz. Overrides the parent sample_rate if provided. + vad_signals: Enable VAD signals in response. Defaults to True. 
+ high_vad_sensitivity: Enable high VAD (Voice Activity Detection) sensitivity. Defaults to False. + input_audio_codec: Audio codec/format of the input file. Defaults to "wav". """ - language: Optional[Language] = Language.HI_IN + language: Optional[Language] = None + prompt: Optional[str] = None + sample_rate: Optional[int] = None + vad_signals: bool = True + high_vad_sensitivity: bool = False + input_audio_codec: str = "wav" def __init__( self, @@ -102,16 +115,35 @@ class SarvamSTTService(STTService): if "saaras" in model.lower(): if params.language is not None: raise ValueError( - f"Model '{model}' (saaras) does not accept language parameter. " - "saaras models auto-detect language." + f"Model '{model}' does not accept language parameter. " + "STT-Translate models auto-detect language." + ) + + # Validate that saarika models don't accept prompt parameter + if "saarika" in model.lower(): + if params.prompt is not None: + raise ValueError( + f"Model '{model}' does not accept prompt parameter. 
" + "Prompts are only supported for STT-Translate models" ) self.set_model_name(model) self._api_key = api_key self._language_code = params.language - self._language_string = ( - language_to_sarvam_language(params.language) if params.language else None - ) + # For saarika models, default to "unknown" if language is not provided + if params.language: + self._language_string = language_to_sarvam_language(params.language) + elif "saarika" in model.lower(): + self._language_string = "unknown" + else: + self._language_string = None + self._prompt = params.prompt + + # Store connection parameters + self._sample_rate = params.sample_rate + self._vad_signals = params.vad_signals + self._high_vad_sensitivity = params.high_vad_sensitivity + self._input_audio_codec = params.input_audio_codec # Initialize Sarvam SDK client self._sarvam_client = AsyncSarvamAI(api_subscription_key=api_key) @@ -157,6 +189,29 @@ class SarvamSTTService(STTService): await self._disconnect() await self._connect() + async def set_prompt(self, prompt: Optional[str]): + """Set the translation prompt and reconnect. + + Args: + prompt: Prompt text to guide translation style/context. + Pass None to clear/disable prompt. + Only applicable to STT-Translate models, not STT models. + """ + # saarika models do not accept prompt parameter + if "saarika" in self.model_name.lower(): + if prompt is not None: + raise ValueError( + f"Model '{self.model_name}' does not accept prompt parameter. " + "Prompts are only supported for STT-Translate models." + ) + # If prompt is None and it's saarika, just silently return (no-op) + return + + logger.info("Updating STT-Translate prompt.") + self._prompt = prompt + await self._disconnect() + await self._connect() + async def start(self, frame: StartFrame): """Start the Sarvam STT service. 
@@ -202,17 +257,29 @@ class SarvamSTTService(STTService): # Convert audio bytes to base64 for Sarvam API audio_base64 = base64.b64encode(audio).decode("utf-8") + # Convert input_audio_codec to encoding format (prepend "audio/" if needed) + encoding = ( + self._input_audio_codec + if self._input_audio_codec.startswith("audio/") + else f"audio/{self._input_audio_codec}" + ) + + # Build method arguments + method_kwargs = { + "audio": audio_base64, + "encoding": encoding, + } + # Only include sample_rate if provided in params + if self._sample_rate is not None: + method_kwargs["sample_rate"] = self._sample_rate + # Use appropriate method based on service type if "saarika" in self.model_name.lower(): # STT service - await self._socket_client.transcribe( - audio=audio_base64, encoding="audio/wav", sample_rate=self.sample_rate - ) + await self._socket_client.transcribe(**method_kwargs) else: - # STT-translate service - auto-detects input language and returns translated text - await self._socket_client.translate( - audio=audio_base64, encoding="audio/wav", sample_rate=self.sample_rate - ) + # STT-Translate service - auto-detects input language and returns translated text + await self._socket_client.translate(**method_kwargs) except Exception as e: logger.error(f"Error sending audio to Sarvam: {e}") @@ -225,32 +292,41 @@ class SarvamSTTService(STTService): logger.debug("Connecting to Sarvam") try: + # Convert boolean parameters to string for SDK + vad_signals_str = "true" if self._vad_signals else "false" + high_vad_sensitivity_str = "true" if self._high_vad_sensitivity else "false" + + # Build common connection parameters + connect_kwargs = { + "model": self.model_name, + "vad_signals": vad_signals_str, + "high_vad_sensitivity": high_vad_sensitivity_str, + "input_audio_codec": self._input_audio_codec, + } + # Only include sample_rate if provided in params + if self._sample_rate is not None: + connect_kwargs["sample_rate"] = str(self._sample_rate) + # Choose the appropriate 
service based on model if "saarika" in self.model_name.lower(): # STT service - requires language_code + connect_kwargs["language_code"] = self._language_string self._websocket_context = self._sarvam_client.speech_to_text_streaming.connect( - language_code=self._language_string, - model=self.model_name, - vad_signals=True, - high_vad_sensitivity=True, - sample_rate=str(self.sample_rate), - input_audio_codec="wav", + **connect_kwargs ) else: - # STT-translate service - auto-detects input language and returns translated text + # STT-Translate service - auto-detects input language and returns translated text self._websocket_context = ( - self._sarvam_client.speech_to_text_translate_streaming.connect( - model=self.model_name, - vad_signals=True, - high_vad_sensitivity=True, - sample_rate=str(self.sample_rate), - input_audio_codec="wav", - ) + self._sarvam_client.speech_to_text_translate_streaming.connect(**connect_kwargs) ) # Enter the async context manager self._socket_client = await self._websocket_context.__aenter__() + # Set prompt if provided (only for STT-Translate models, after connection) + if self._prompt is not None and "saaras" in self.model_name.lower(): + await self._socket_client.set_prompt(self._prompt) + # Register event handler for incoming messages def _message_handler(message): """Wrapper to handle async response handler.""" From 2d03e5110995f980e347749a4f3d21eb1d21a3fc Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Fri, 31 Oct 2025 17:31:59 +0530 Subject: [PATCH 09/11] fix: Remove unused imports, use sample_rate from base class --- examples/foundational/07z-interruptible-sarvam.py | 1 - src/pipecat/services/sarvam/stt.py | 15 ++++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/examples/foundational/07z-interruptible-sarvam.py b/examples/foundational/07z-interruptible-sarvam.py index b6dd1432c..3123df31d 100644 --- a/examples/foundational/07z-interruptible-sarvam.py +++ 
b/examples/foundational/07z-interruptible-sarvam.py @@ -27,7 +27,6 @@ from pipecat.runner.utils import create_transport from pipecat.services.openai.llm import OpenAILLMService from pipecat.services.sarvam.stt import SarvamSTTService from pipecat.services.sarvam.tts import SarvamTTSService -from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index 15285a19d..ae1f38c2e 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -107,8 +107,6 @@ class SarvamSTTService(STTService): params: Configuration parameters for Sarvam STT service. **kwargs: Additional arguments passed to the parent STTService. """ - super().__init__(sample_rate=sample_rate, **kwargs) - params = params or SarvamSTTService.InputParams() # Validate that saaras models don't accept language parameter @@ -127,6 +125,10 @@ class SarvamSTTService(STTService): "Prompts are only supported for STT-Translate models" ) + # Use sample_rate from params if provided, otherwise use the parameter + sample_rate = params.sample_rate if params.sample_rate is not None else sample_rate + super().__init__(sample_rate=sample_rate, **kwargs) + self.set_model_name(model) self._api_key = api_key self._language_code = params.language @@ -140,7 +142,6 @@ class SarvamSTTService(STTService): self._prompt = params.prompt # Store connection parameters - self._sample_rate = params.sample_rate self._vad_signals = params.vad_signals self._high_vad_sensitivity = params.high_vad_sensitivity self._input_audio_codec = params.input_audio_codec @@ -268,10 +269,8 @@ class SarvamSTTService(STTService): method_kwargs = { "audio": audio_base64, "encoding": encoding, + "sample_rate": self.sample_rate, } - # Only include 
sample_rate if provided in params - if self._sample_rate is not None: - method_kwargs["sample_rate"] = self._sample_rate # Use appropriate method based on service type if "saarika" in self.model_name.lower(): @@ -302,10 +301,8 @@ class SarvamSTTService(STTService): "vad_signals": vad_signals_str, "high_vad_sensitivity": high_vad_sensitivity_str, "input_audio_codec": self._input_audio_codec, + "sample_rate": str(self.sample_rate), } - # Only include sample_rate if provided in params - if self._sample_rate is not None: - connect_kwargs["sample_rate"] = str(self._sample_rate) # Choose the appropriate service based on model if "saarika" in self.model_name.lower(): From 6c5d84ca4c64de180f1813e9af680414ee3426b9 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Fri, 31 Oct 2025 23:03:25 +0530 Subject: [PATCH 10/11] fix: Fixes for sample_rate being passed by PipelineParams --- src/pipecat/services/sarvam/stt.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index ae1f38c2e..ad057934e 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -76,7 +76,6 @@ class SarvamSTTService(STTService): language: Target language for transcription. Defaults to None (required for saarika models). prompt: Optional prompt to guide translation style/context for STT-Translate models. Only applicable to saaras (STT-Translate) models. Defaults to None. - sample_rate: Audio sample rate in Hz. Overrides the parent sample_rate if provided. vad_signals: Enable VAD signals in response. Defaults to True. high_vad_sensitivity: Enable high VAD (Voice Activity Detection) sensitivity. Defaults to False. input_audio_codec: Audio codec/format of the input file. Defaults to "wav". 
@@ -84,7 +83,6 @@ class SarvamSTTService(STTService): language: Optional[Language] = None prompt: Optional[str] = None - sample_rate: Optional[int] = None vad_signals: bool = True high_vad_sensitivity: bool = False input_audio_codec: str = "wav" @@ -125,8 +123,6 @@ class SarvamSTTService(STTService): "Prompts are only supported for STT-Translate models" ) - # Use sample_rate from params if provided, otherwise use the parameter - sample_rate = params.sample_rate if params.sample_rate is not None else sample_rate super().__init__(sample_rate=sample_rate, **kwargs) self.set_model_name(model) From d905b21f72924a41325b0cf236bd960c09367917 Mon Sep 17 00:00:00 2001 From: shreyas-sarvam Date: Fri, 31 Oct 2025 23:07:48 +0530 Subject: [PATCH 11/11] fix: Pass input_audio_codec as an __init__ parameter --- src/pipecat/services/sarvam/stt.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/sarvam/stt.py b/src/pipecat/services/sarvam/stt.py index ad057934e..27816163d 100644 --- a/src/pipecat/services/sarvam/stt.py +++ b/src/pipecat/services/sarvam/stt.py @@ -78,14 +78,12 @@ class SarvamSTTService(STTService): Only applicable to saaras (STT-Translate) models. Defaults to None. vad_signals: Enable VAD signals in response. Defaults to True. high_vad_sensitivity: Enable high VAD (Voice Activity Detection) sensitivity. Defaults to False. - input_audio_codec: Audio codec/format of the input file. Defaults to "wav". """ language: Optional[Language] = None prompt: Optional[str] = None vad_signals: bool = True high_vad_sensitivity: bool = False - input_audio_codec: str = "wav" def __init__( self, @@ -93,6 +91,7 @@ class SarvamSTTService(STTService): api_key: str, model: str = "saarika:v2.5", sample_rate: Optional[int] = None, + input_audio_codec: str = "wav", params: Optional[InputParams] = None, **kwargs, ): @@ -102,6 +101,7 @@ class SarvamSTTService(STTService): api_key: Sarvam API key for authentication. 
model: Sarvam model to use for transcription. sample_rate: Audio sample rate. Defaults to 16000 if not specified. + input_audio_codec: Audio codec/format of the input file. Defaults to "wav". params: Configuration parameters for Sarvam STT service. **kwargs: Additional arguments passed to the parent STTService. """ @@ -140,7 +140,7 @@ class SarvamSTTService(STTService): # Store connection parameters self._vad_signals = params.vad_signals self._high_vad_sensitivity = params.high_vad_sensitivity - self._input_audio_codec = params.input_audio_codec + self._input_audio_codec = input_audio_codec # Initialize Sarvam SDK client self._sarvam_client = AsyncSarvamAI(api_subscription_key=api_key)