diff --git a/dot-env.template b/dot-env.template
index e31681235..2da20fc0b 100644
--- a/dot-env.template
+++ b/dot-env.template
@@ -29,6 +29,9 @@ DAILY_SAMPLE_ROOM_URL=https://...
 ELEVENLABS_API_KEY=...
 ELEVENLABS_VOICE_ID=...
 
+# Neuphonic
+NEUPHONIC_API_KEY=...
+
 # Fal
 FAL_KEY=...
 
diff --git a/examples/foundational/07u-interruptible-neuphonic-http.py b/examples/foundational/07u-interruptible-neuphonic-http.py
new file mode 100644
index 000000000..8377c3baf
--- /dev/null
+++ b/examples/foundational/07u-interruptible-neuphonic-http.py
@@ -0,0 +1,102 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.neuphonic import NeuphonicHttpTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, token) = await configure(session)
+
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                transcription_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+            ),
+        )
+
+        tts = NeuphonicHttpTTSService(
+            api_key=os.getenv("NEUPHONIC_API_KEY"),
+            voice_id="fc854436-2dac-4d21-aa69-ae17b54e98eb",  # Emily
+        )
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),  # Transport user input
+                context_aggregator.user(),  # User responses
+                llm,  # LLM
+                tts,  # TTS
+                transport.output(),  # Transport bot output
+                context_aggregator.assistant(),  # Assistant spoken responses
+            ]
+        )
+
+        task = PipelineTask(
+            pipeline,
+            params=PipelineParams(
+                allow_interruptions=True,
+                enable_metrics=True,
+                enable_usage_metrics=True,
+                report_only_initial_ttfb=True,
+            ),
+        )
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            await transport.capture_participant_transcription(participant["id"])
+            # Kick off the conversation.
+            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+            await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+        @transport.event_handler("on_participant_left")
+        async def on_participant_left(transport, participant, reason):
+            await task.cancel()
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/examples/foundational/07u-interruptible-neuphonic.py b/examples/foundational/07u-interruptible-neuphonic.py
new file mode 100644
index 000000000..b5d666511
--- /dev/null
+++ b/examples/foundational/07u-interruptible-neuphonic.py
@@ -0,0 +1,102 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.neuphonic import NeuphonicTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, token) = await configure(session)
+
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                transcription_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+            ),
+        )
+
+        tts = NeuphonicTTSService(
+            api_key=os.getenv("NEUPHONIC_API_KEY"),
+            voice_id="fc854436-2dac-4d21-aa69-ae17b54e98eb",  # Emily
+        )
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),  # Transport user input
+                context_aggregator.user(),  # User responses
+                llm,  # LLM
+                tts,  # TTS
+                transport.output(),  # Transport bot output
+                context_aggregator.assistant(),  # Assistant spoken responses
+            ]
+        )
+
+        task = PipelineTask(
+            pipeline,
+            params=PipelineParams(
+                allow_interruptions=True,
+                enable_metrics=True,
+                enable_usage_metrics=True,
+                report_only_initial_ttfb=True,
+            ),
+        )
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            await transport.capture_participant_transcription(participant["id"])
+            # Kick off the conversation.
+            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+            await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+        @transport.event_handler("on_participant_left")
+        async def on_participant_left(transport, participant, reason):
+            await task.cancel()
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/pyproject.toml b/pyproject.toml
index eab7b60f2..586c5c026 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -45,6 +45,7 @@ aws = [ "boto3~=1.35.99" ]
 azure = [ "azure-cognitiveservices-speech~=1.42.0"]
 canonical = [ "aiofiles~=24.1.0" ]
 cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ]
+neuphonic = [ "pyneuphonic~=1.5.13", "websockets~=13.1" ]
 cerebras = []
 deepseek = []
 daily = [ "daily-python~=0.15.0" ]
diff --git a/src/pipecat/services/neuphonic.py b/src/pipecat/services/neuphonic.py
new file mode 100644
index 000000000..a2ea98f42
--- /dev/null
+++ b/src/pipecat/services/neuphonic.py
@@ -0,0 +1,382 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import base64
+import json
+from typing import Any, AsyncGenerator, Mapping, Optional
+from urllib.parse import urlencode
+
+from loguru import logger
+from pydantic import BaseModel
+
+from pipecat.frames.frames import (
+    BotStoppedSpeakingFrame,
+    CancelFrame,
+    EndFrame,
+    ErrorFrame,
+    Frame,
+    LLMFullResponseEndFrame,
+    StartFrame,
+    StartInterruptionFrame,
+    TTSAudioRawFrame,
+    TTSSpeakFrame,
+    TTSStartedFrame,
+    TTSStoppedFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection
+from pipecat.services.ai_services import InterruptibleTTSService, TTSService
+from pipecat.transcriptions.language import Language
+
+# See dot-env.template for the Neuphonic configuration needed
+try:
+    import websockets
+    from pyneuphonic import Neuphonic, TTSConfig
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error(
+        "In order to use Neuphonic, you need to `pip install pipecat-ai[neuphonic]`. Also, set `NEUPHONIC_API_KEY` environment variable."
+    )
+    raise Exception(f"Missing module: {e}")
+
+
+def language_to_neuphonic_lang_code(language: Language) -> Optional[str]:
+    """Map a pipecat ``Language`` to a Neuphonic language code.
+
+    Regional variants fall back to their base language (e.g. es-ES -> es).
+
+    Args:
+        language: The language to convert.
+
+    Returns:
+        The two-letter Neuphonic code, or None if the language is unsupported.
+    """
+    BASE_LANGUAGES = {
+        Language.DE: "de",
+        Language.EN: "en",
+        Language.ES: "es",
+        Language.NL: "nl",
+        Language.AR: "ar",
+    }
+
+    result = BASE_LANGUAGES.get(language)
+
+    # If not found in base languages, try to find the base language from a variant
+    if not result:
+        # Convert enum value to string and get the base language part (e.g. es-ES -> es)
+        lang_str = str(language.value)
+        base_code = lang_str.split("-")[0].lower()
+        # Look up the base code in our supported languages
+        result = base_code if base_code in BASE_LANGUAGES.values() else None
+
+    return result
+
+
+class NeuphonicTTSService(InterruptibleTTSService):
+    """Neuphonic Text-to-Speech service using a websocket connection.
+
+    Args:
+        api_key: Neuphonic API key
+        voice_id: ID of the voice to use
+        url: Base URL for the Neuphonic API (default: "wss://api.neuphonic.com")
+        sample_rate: Sample rate for audio output (default: 22050Hz)
+        encoding: Audio encoding format (default: "pcm_linear")
+        params: Additional parameters for TTS generation including language and speed
+        **kwargs: Additional keyword arguments passed to the parent class
+    """
+
+    class InputParams(BaseModel):
+        language: Optional[Language] = Language.EN
+        speed: Optional[float] = 1.0
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        voice_id: Optional[str] = None,
+        url: str = "wss://api.neuphonic.com",
+        sample_rate: Optional[int] = 22050,
+        encoding: str = "pcm_linear",
+        params: InputParams = InputParams(),
+        **kwargs,
+    ):
+        super().__init__(
+            aggregate_sentences=True,
+            push_text_frames=False,
+            push_stop_frames=True,
+            stop_frame_timeout_s=2.0,
+            sample_rate=sample_rate,
+            **kwargs,
+        )
+
+        self._api_key = api_key
+        self._url = url
+        self._settings = {
+            "lang_code": self.language_to_service_language(params.language),
+            "speed": params.speed,
+            "encoding": encoding,
+            "sampling_rate": sample_rate,
+        }
+        self.set_voice(voice_id)
+
+        # Indicates if we have sent TTSStartedFrame. It will reset to False when
+        # there's an interruption or TTSStoppedFrame.
+        self._started = False
+        self._cumulative_time = 0
+
+        # Background tasks created by _connect(). Initialized here so that
+        # _disconnect() is safe to call even if _connect() never ran.
+        self._receive_task = None
+        self._keepalive_task = None
+
+    def can_generate_metrics(self) -> bool:
+        return True
+
+    def language_to_service_language(self, language: Language) -> Optional[str]:
+        return language_to_neuphonic_lang_code(language)
+
+    async def _update_settings(self, settings: Mapping[str, Any]):
+        if "voice_id" in settings:
+            self.set_voice(settings["voice_id"])
+
+        await super()._update_settings(settings)
+        # Settings are sent as connection query parameters, so reconnect to apply them.
+        await self._disconnect()
+        await self._connect()
+        logger.info(f"Switching TTS to settings: [{self._settings}]")
+
+    async def start(self, frame: StartFrame):
+        await super().start(frame)
+        await self._connect()
+
+    async def stop(self, frame: EndFrame):
+        await super().stop(frame)
+        await self._disconnect()
+
+    async def cancel(self, frame: CancelFrame):
+        await super().cancel(frame)
+        await self._disconnect()
+
+    async def flush_audio(self):
+        if self._websocket:
+            msg = {"text": ""}
+            await self._websocket.send(json.dumps(msg))
+
+    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
+        await super().push_frame(frame, direction)
+        if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
+            self._started = False
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        await super().process_frame(frame, direction)
+
+        # If we received a TTSSpeakFrame and the LLM response included text (it
+        # might be that it's only a function calling response) we pause
+        # processing more frames until we receive a BotStoppedSpeakingFrame.
+        if isinstance(frame, TTSSpeakFrame):
+            await self.pause_processing_frames()
+        elif isinstance(frame, LLMFullResponseEndFrame) and self._started:
+            await self.pause_processing_frames()
+        elif isinstance(frame, BotStoppedSpeakingFrame):
+            await self.resume_processing_frames()
+
+    async def _connect(self):
+        await self._connect_websocket()
+
+        # run_tts() may call _connect() again while the tasks are still alive;
+        # only create them when missing so the old ones are not leaked.
+        if not self._receive_task:
+            self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
+
+        if not self._keepalive_task:
+            self._keepalive_task = self.create_task(self._keepalive_task_handler())
+
+    async def _disconnect(self):
+        if self._receive_task:
+            await self.cancel_task(self._receive_task)
+            self._receive_task = None
+
+        if self._keepalive_task:
+            await self.cancel_task(self._keepalive_task)
+            self._keepalive_task = None
+
+        await self._disconnect_websocket()
+
+    async def _connect_websocket(self):
+        try:
+            logger.debug("Connecting to Neuphonic")
+
+            tts_config = {
+                **self._settings,
+                "voice_id": self._voice_id,
+            }
+
+            # Skip unset values and percent-encode the rest so that special
+            # characters in the API key or settings cannot corrupt the URL.
+            query_params = {"api_key": self._api_key}
+            query_params.update({k: v for k, v in tts_config.items() if v is not None})
+
+            url = f"{self._url}/speak/{self._settings['lang_code']}?{urlencode(query_params)}"
+
+            self._websocket = await websockets.connect(url)
+
+        except Exception as e:
+            logger.error(f"{self} initialization error: {e}")
+            self._websocket = None
+
+    async def _disconnect_websocket(self):
+        try:
+            await self.stop_all_metrics()
+
+            if self._websocket:
+                logger.debug("Disconnecting from Neuphonic")
+                await self._websocket.close()
+                self._websocket = None
+
+            self._started = False
+        except Exception as e:
+            logger.error(f"{self} error closing websocket: {e}")
+
+    async def _receive_messages(self):
+        async for message in self._websocket:
+            if isinstance(message, str):
+                msg = json.loads(message)
+                if msg.get("data", {}).get("audio") is not None:
+                    await self.stop_ttfb_metrics()
+
+                    audio = base64.b64decode(msg["data"]["audio"])
+                    frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
+                    await self.push_frame(frame)
+
+    async def _keepalive_task_handler(self):
+        while True:
+            await asyncio.sleep(10)
+            await self._send_text("")
+
+    async def _send_text(self, text: str):
+        if self._websocket:
+            msg = {"text": text}
+            logger.debug(f"Sending text to websocket: {msg}")
+            await self._websocket.send(json.dumps(msg))
+
+    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
+        logger.debug(f"Generating TTS: [{text}]")
+
+        try:
+            if not self._websocket:
+                await self._connect()
+
+            try:
+                if not self._started:
+                    await self.start_ttfb_metrics()
+                    yield TTSStartedFrame()
+                    self._started = True
+                    self._cumulative_time = 0
+
+                await self._send_text(text)
+                await self.start_tts_usage_metrics(text)
+            except Exception as e:
+                logger.error(f"{self} error sending message: {e}")
+                yield TTSStoppedFrame()
+                await self._disconnect()
+                await self._connect()
+                return
+            yield None
+        except Exception as e:
+            logger.error(f"{self} exception: {e}")
+
+
+class NeuphonicHttpTTSService(TTSService):
+    """Neuphonic Text-to-Speech service using HTTP streaming.
+
+    Args:
+        api_key: Neuphonic API key
+        voice_id: ID of the voice to use
+        url: Base URL for the Neuphonic API (default: "https://api.neuphonic.com")
+        sample_rate: Sample rate for audio output (default: 22050Hz)
+        encoding: Audio encoding format (default: "pcm_linear")
+        params: Additional parameters for TTS generation including language and speed
+        **kwargs: Additional keyword arguments passed to the parent class
+    """
+
+    class InputParams(BaseModel):
+        language: Optional[Language] = Language.EN
+        speed: Optional[float] = 1.0
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        voice_id: Optional[str] = None,
+        url: str = "https://api.neuphonic.com",
+        sample_rate: Optional[int] = 22050,
+        encoding: str = "pcm_linear",
+        params: InputParams = InputParams(),
+        **kwargs,
+    ):
+        super().__init__(sample_rate=sample_rate, **kwargs)
+
+        self._api_key = api_key
+        self._url = url
+        self._settings = {
+            "lang_code": self.language_to_service_language(params.language),
+            "speed": params.speed,
+            "encoding": encoding,
+            "sampling_rate": sample_rate,
+        }
+        self.set_voice(voice_id)
+
+    def can_generate_metrics(self) -> bool:
+        return True
+
+    def language_to_service_language(self, language: Language) -> Optional[str]:
+        # Use the same language mapping as NeuphonicTTSService.
+        return language_to_neuphonic_lang_code(language)
+
+    async def start(self, frame: StartFrame):
+        await super().start(frame)
+
+    async def flush_audio(self):
+        pass
+
+    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
+        """Generate speech from text using Neuphonic streaming API.
+
+        Args:
+            text: The text to convert to speech
+        Yields:
+            Frames containing audio data and status information
+        """
+        logger.debug(f"Generating TTS: [{text}]")
+
+        client = Neuphonic(api_key=self._api_key, base_url=self._url.replace("https://", ""))
+
+        sse = client.tts.AsyncSSEClient()
+
+        try:
+            await self.start_ttfb_metrics()
+            response = sse.send(text, TTSConfig(**self._settings, voice_id=self._voice_id))
+
+            await self.start_tts_usage_metrics(text)
+            yield TTSStartedFrame()
+
+            async for message in response:
+                if message.status_code != 200:
+                    logger.error(f"{self} error: {message.errors}")
+                    yield ErrorFrame(error=f"Neuphonic API error: {message.errors}")
+                    # Do not try to read audio from a failed message.
+                    continue
+
+                await self.stop_ttfb_metrics()
+                yield TTSAudioRawFrame(message.data.audio, self.sample_rate, 1)
+        except Exception as e:
+            logger.error(f"Error in run_tts: {e}")
+            yield ErrorFrame(error=str(e))
+
+        # Deliberately outside a `finally`: yielding while the generator is being
+        # closed raises RuntimeError. Every Exception is handled above, so this
+        # still runs after both success and failure.
+        yield TTSStoppedFrame()