diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bee0ca5c..c5f5d0c8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `ElevenLabsHttpTTSService` and the + `07d-interruptible-elevenlabs-http.py` foundational example. + - Introduced pipeline frame observers. Observers can view all the frames that go through the pipeline without the need to inject processors in the pipeline. This can be useful, for example, to implement frame loggers or diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index c1d326dfc..188c5a9c9 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -7,8 +7,9 @@ import asyncio import base64 import json -from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple +from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple, Union +import aiohttp from loguru import logger from pydantic import BaseModel, model_validator @@ -16,6 +17,7 @@ from pipecat.frames.frames import ( BotStoppedSpeakingFrame, CancelFrame, EndFrame, + ErrorFrame, Frame, LLMFullResponseEndFrame, StartFrame, @@ -26,13 +28,15 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import WordTTSService +from pipecat.services.ai_services import TTSService, WordTTSService from pipecat.services.websocket_service import WebsocketService from pipecat.transcriptions.language import Language # See .env.example for ElevenLabs configuration needed try: import websockets + from elevenlabs import VoiceSettings + from elevenlabs.client import ElevenLabs except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error( @@ -418,3 +422,160 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + + +class ElevenLabsHttpTTSService(TTSService): + """ElevenLabs Text-to-Speech service using HTTP streaming. + + Args: + api_key: ElevenLabs API key + voice_id: ID of the voice to use + aiohttp_session: aiohttp ClientSession + model: Model ID (default: "eleven_flash_v2_5" for low latency) + base_url: API base URL + output_format: Audio output format (PCM) + params: Additional parameters for voice configuration + """ + + class InputParams(BaseModel): + language: Optional[Language] = Language.EN + optimize_streaming_latency: Optional[int] = None + stability: Optional[float] = None + similarity_boost: Optional[float] = None + style: Optional[float] = None + use_speaker_boost: Optional[bool] = None + + def __init__( + self, + *, + api_key: str, + voice_id: str, + aiohttp_session: aiohttp.ClientSession, + model: str = "eleven_flash_v2_5", + base_url: str = "https://api.elevenlabs.io", + output_format: ElevenLabsOutputFormat = "pcm_24000", + params: InputParams = InputParams(), + **kwargs, + ): + super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs) + + self._api_key = api_key + self._base_url = base_url + self._output_format = output_format + self._params = params + self._session = aiohttp_session + + self._settings = { + "sample_rate": sample_rate_from_output_format(output_format), + "language": self.language_to_service_language(params.language) + if params.language + else "en", + "output_format": output_format, + "optimize_streaming_latency": params.optimize_streaming_latency, + "stability": params.stability, + "similarity_boost": params.similarity_boost, + "style": params.style, + "use_speaker_boost": params.use_speaker_boost, + } + self.set_model_name(model) + self.set_voice(voice_id) + self._voice_settings = self._set_voice_settings() + + def can_generate_metrics(self) -> bool: + return True + + def _set_voice_settings(self) -> Optional[Dict[str, Union[float, bool]]]: + """Configure voice settings if stability and similarity_boost are provided. + + Returns: + Dictionary of voice settings or None if required parameters are missing. + """ + voice_settings: Dict[str, Union[float, bool]] = {} + if ( + self._settings["stability"] is not None + and self._settings["similarity_boost"] is not None + ): + voice_settings["stability"] = float(self._settings["stability"]) + voice_settings["similarity_boost"] = float(self._settings["similarity_boost"]) + if self._settings["style"] is not None: + voice_settings["style"] = float(self._settings["style"]) + if self._settings["use_speaker_boost"] is not None: + voice_settings["use_speaker_boost"] = bool(self._settings["use_speaker_boost"]) + else: + if self._settings["style"] is not None: + logger.warning( + "'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set." + ) + if self._settings["use_speaker_boost"] is not None: + logger.warning( + "'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set." + ) + + return voice_settings or None + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using ElevenLabs streaming API. + + Args: + text: The text to convert to speech + + Yields: + Frames containing audio data and status information + """ + logger.debug(f"Generating TTS: [{text}]") + + url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream" + + payload = { + "text": text, + "model_id": self._model_name, + } + + if self._voice_settings: + payload["voice_settings"] = json.dumps(self._voice_settings) + + if self._settings["language"]: + payload["language_code"] = self._settings["language"] + + headers = { + "xi-api-key": self._api_key, + "Content-Type": "application/json", + } + + # Build query parameters + params = { + "output_format": self._output_format, + } + if self._settings["optimize_streaming_latency"] is not None: + params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"] + + logger.debug(f"ElevenLabs request - payload: {payload}, params: {params}") + + try: + await self.start_ttfb_metrics() + + async with self._session.post( + url, json=payload, headers=headers, params=params + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"{self} error: {error_text}") + yield ErrorFrame(error=f"ElevenLabs API error: {error_text}") + return + + await self.start_tts_usage_metrics(text) + yield TTSStartedFrame() + + async for chunk in response.content: + if chunk: + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1) + + yield TTSStoppedFrame() + + except Exception as e: + logger.error(f"Error in run_tts: {e}") + yield ErrorFrame(error=str(e)) + + finally: + yield TTSStoppedFrame()