diff --git a/dot-env.template b/dot-env.template index e31681235..331eba79c 100644 --- a/dot-env.template +++ b/dot-env.template @@ -29,6 +29,9 @@ DAILY_SAMPLE_ROOM_URL=https://... ELEVENLABS_API_KEY=... ELEVENLABS_VOICE_ID=... +# Neuphonic +NEUPHONIC_API_TOKEN=... + # Fal FAL_KEY=... diff --git a/pyproject.toml b/pyproject.toml index dd58b1bfe..b5a870941 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ aws = [ "boto3~=1.35.99" ] azure = [ "azure-cognitiveservices-speech~=1.42.0"] canonical = [ "aiofiles~=24.1.0" ] cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ] +neuphonic = [ "pyneuphonic~=1.5.10", "websockets>=12.0,<14.0" ] cerebras = [] deepseek = [] daily = [ "daily-python~=0.15.0" ] diff --git a/src/pipecat/services/neuphonic.py b/src/pipecat/services/neuphonic.py new file mode 100644 index 000000000..b14c325f8 --- /dev/null +++ b/src/pipecat/services/neuphonic.py @@ -0,0 +1,364 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import base64 +import json +from typing import Any, AsyncGenerator, Mapping, Optional + +from loguru import logger +from pydantic import BaseModel + +from pipecat.frames.frames import ( + BotStoppedSpeakingFrame, + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + LLMFullResponseEndFrame, + StartFrame, + StartInterruptionFrame, + TTSAudioRawFrame, + TTSSpeakFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import TTSService, WordTTSService +from pipecat.services.websocket_service import WebsocketService +from pipecat.transcriptions.language import Language + +# See .env.example for Neuphonic configuration needed +try: + import websockets + from pyneuphonic import Neuphonic, TTSConfig +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Neuphonic, you need to `pip install pipecat-ai[neuphonic]`. Also, set `NEUPHONIC_API_TOKEN` environment variable." + ) + raise Exception(f"Missing module: {e}") + +# Models that support language codes +NEUPHONIC_MULTILINGUAL_MODELS = { + "neu_fast", + "neu_hq", +} + + +def language_to_neuphonic_lang_code(language: Language) -> Optional[str]: + BASE_LANGUAGES = { + Language.DE: "de", + Language.EN: "en", + Language.ES: "es", + Language.NL: "nl", + Language.AR: "ar", + } + + result = BASE_LANGUAGES.get(language) + + # If not found in base languages, try to find the base language from a variant + if not result: + # Convert enum value to string and get the base language part (e.g. es-ES -> es) + lang_str = str(language.value) + base_code = lang_str.split("-")[0].lower() + # Look up the base code in our supported languages + result = base_code if base_code in BASE_LANGUAGES.values() else None + + return result + + +class NeuphonicTTSService(WordTTSService, WebsocketService): + class InputParams(BaseModel): + language: Optional[Language] = Language.EN + speed: Optional[float] = 1.0 + + def __init__( + self, + *, + api_key: str, + voice_id: Optional[str] = None, + model: str = "neu_hq", + url: str = "wss://api.neuphonic.com", + sample_rate: Optional[int] = 22050, + encoding: str = "pcm_linear", + params: InputParams = InputParams(), + **kwargs, + ): + WordTTSService.__init__( + self, + aggregate_sentences=True, + push_text_frames=False, + push_stop_frames=True, + stop_frame_timeout_s=2.0, + sample_rate=sample_rate, + **kwargs, + ) + WebsocketService.__init__(self) + + self._api_key = api_key + self._url = url + self._settings = { + "lang_code": self.language_to_service_language(params.language), + "speed": params.speed, + "encoding": encoding, + "model": model, + "sampling_rate": sample_rate, + } + self.set_model_name(model) + self.set_voice(voice_id) + + # Indicates if we have sent TTSStartedFrame. It will reset to False when + # there's an interruption or TTSStoppedFrame. + self._started = False + self._cumulative_time = 0 + + def can_generate_metrics(self) -> bool: + return True + + def language_to_service_language(self, language: Language) -> Optional[str]: + return language_to_neuphonic_lang_code(language) + + async def set_model(self, model: str): + await super().set_model(model) + logger.info(f"Switching TTS model to: [{model}]") + await self._disconnect() + await self._connect() + + async def _update_settings(self, settings: Mapping[str, Any]): + if "voice_id" in settings: + self.set_voice(settings["voice_id"]) + + await super()._update_settings(settings) + await self._disconnect() + await self._connect() + logger.info(f"Switching TTS to settings: [{self._settings}]") + + async def start(self, frame: StartFrame): + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + await super().cancel(frame) + await self._disconnect() + + async def flush_audio(self): + if self._websocket: + msg = {"text": ""} + await self._websocket.send(json.dumps(msg)) + + async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + await super().push_frame(frame, direction) + if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)): + self._started = False + if isinstance(frame, TTSStoppedFrame): + await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)]) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + # If we received a TTSSpeakFrame and the LLM response included text (it + # might be that it's only a function calling response) we pause + # processing more frames until we receive a BotStoppedSpeakingFrame. + if isinstance(frame, TTSSpeakFrame): + await self.pause_processing_frames() + elif isinstance(frame, LLMFullResponseEndFrame) and self._started: + await self.pause_processing_frames() + elif isinstance(frame, BotStoppedSpeakingFrame): + await self.resume_processing_frames() + + async def _connect(self): + await self._connect_websocket() + + self._receive_task = self.create_task(self._receive_task_handler(self.push_error)) + self._keepalive_task = self.create_task(self._keepalive_task_handler()) + + async def _disconnect(self): + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + if self._keepalive_task: + await self.cancel_task(self._keepalive_task) + self._keepalive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + try: + logger.debug("Connecting to Neuphonic") + + tts_config = { + **self._settings, + "voice_id": self._voice_id, + "model": self.model_name, + } + + query_params = [f"api_key={self._api_key}"] + for key, value in tts_config.items(): + if value is not None: + query_params.append(f"{key}={value}") + + url = f"{self._url}/speak/{self._settings['lang_code']}?{'&'.join(query_params)}" + + self._websocket = await websockets.connect(url) + + except Exception as e: + logger.error(f"{self} initialization error: {e}") + self._websocket = None + + async def _disconnect_websocket(self): + try: + await self.stop_all_metrics() + + if self._websocket: + logger.debug("Disconnecting from Neuphonic") + await self._websocket.close() + self._websocket = None + + self._started = False + except Exception as e: + logger.error(f"{self} error closing websocket: {e}") + + async def _receive_messages(self): + async for message in self._websocket: + if isinstance(message, str): + msg = json.loads(message) + if msg.get("data", {}).get("audio") is not None: + await self.stop_ttfb_metrics() + self.start_word_timestamps() + + audio = base64.b64decode(msg["data"]["audio"]) + frame = TTSAudioRawFrame(audio, self.sample_rate, 1) + await self.push_frame(frame) + + async def _keepalive_task_handler(self): + while True: + await asyncio.sleep(10) + await self._send_text("") + + async def _send_text(self, text: str): + if self._websocket: + msg = {"text": text} + logger.debug(f"Sending text to websocket: {msg}") + await self._websocket.send(json.dumps(msg)) + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + try: + if not self._websocket: + await self._connect() + + try: + if not self._started: + await self.start_ttfb_metrics() + yield TTSStartedFrame() + self._started = True + self._cumulative_time = 0 + + await self._send_text(text) + await self.start_tts_usage_metrics(text) + except Exception as e: + logger.error(f"{self} error sending message: {e}") + yield TTSStoppedFrame() + await self._disconnect() + await self._connect() + return + yield None + except Exception as e: + logger.error(f"{self} exception: {e}") + + +class NeuphonicHttpTTSService(TTSService): + """Neuphonic Text-to-Speech service using HTTP streaming. + + Args: + api_key: Neuphonic API key + voice_id: ID of the voice to use + model: Neuphonic model to use (default: "neu_hq") + url: Base URL for the Neuphonic API (default: "https://api.neuphonic.com") + sample_rate: Sample rate for audio output (default: 22050Hz) + encoding: Audio encoding format (default: "pcm_linear") + params: Additional parameters for TTS generation including language and speed + **kwargs: Additional keyword arguments passed to the parent class + """ + + class InputParams(BaseModel): + language: Optional[Language] = Language.EN + speed: Optional[float] = 1.0 + + def __init__( + self, + *, + api_key: str, + voice_id: Optional[str] = None, + model: str = "neu_hq", + url: str = "https://api.neuphonic.com", + sample_rate: Optional[int] = 22050, + encoding: str = "pcm_linear", + params: InputParams = InputParams(), + **kwargs, + ): + super().__init__(sample_rate=sample_rate, **kwargs) + + self._api_key = api_key + self._url = url + self._settings = { + "lang_code": self.language_to_service_language(params.language), + "speed": params.speed, + "encoding": encoding, + "model": model, + "sampling_rate": sample_rate, + } + self.set_model_name(model) + self.set_voice(voice_id) + + def can_generate_metrics(self) -> bool: + return True + + async def start(self, frame: StartFrame): + await super().start(frame) + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using Neuphonic streaming API. + + Args: + text: The text to convert to speech + Yields: + Frames containing audio data and status information + """ + logger.debug(f"Generating TTS: [{text}]") + + client = Neuphonic(api_key=self._api_key, base_url=self._url.replace("https://", "")) + + sse = client.tts.AsyncSSEClient() + + try: + await self.start_ttfb_metrics() + response = sse.send( + text, TTSConfig(**self._settings, model=self.model_name, voice_id=self._voice_id) + ) + + await self.start_tts_usage_metrics(text) + yield TTSStartedFrame() + + async for message in response: + if response.status_code != 200: + logger.error(f"{self} error: {message.errors}") + yield ErrorFrame(error=f"Neuphonic API error: {message.errors}") + + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame(message.data.audio, self.sample_rate, 1) + except Exception as e: + logger.error(f"Error in run_tts: {e}") + yield ErrorFrame(error=str(e)) + finally: + yield TTSStoppedFrame()