From 225b65c3d2848bb58bb911d8f0813f232e03a3eb Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 16 Jan 2025 22:39:36 -0500 Subject: [PATCH 1/4] Add ElevenLabsHttpTTSService --- .../07d-interruptible-elevenlabs-http.py | 103 ++++++++++++++++ src/pipecat/services/elevenlabs.py | 115 ++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 examples/foundational/07d-interruptible-elevenlabs-http.py diff --git a/examples/foundational/07d-interruptible-elevenlabs-http.py b/examples/foundational/07d-interruptible-elevenlabs-http.py new file mode 100644 index 000000000..1f09680d2 --- /dev/null +++ b/examples/foundational/07d-interruptible-elevenlabs-http.py @@ -0,0 +1,103 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import EndFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.elevenlabs import ElevenLabsHttpTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + tts = ElevenLabsHttpTTSService( + api_key=os.getenv("ELEVENLABS_API_KEY", ""), + voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""), + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + report_only_initial_ttfb=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + await task.queue_frame(EndFrame()) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index c1d326dfc..6c4c5469e 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -6,9 +6,11 @@ import asyncio import base64 +import io import json from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple +import aiohttp from loguru import logger from pydantic import BaseModel, model_validator @@ -33,6 +35,8 @@ from pipecat.transcriptions.language import Language # See .env.example for ElevenLabs configuration needed try: import websockets + from elevenlabs import Voice, VoiceSettings + from elevenlabs.client import ElevenLabs except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error( @@ -418,3 +422,114 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService): yield None except Exception as e: logger.error(f"{self} exception: {e}") + + +class ElevenLabsHttpTTSService(WordTTSService): + class InputParams(BaseModel): + language: Optional[Language] = Language.EN + optimize_streaming_latency: Optional[str] = None + stability: Optional[float] = None + similarity_boost: Optional[float] = None + style: Optional[float] = None + use_speaker_boost: Optional[bool] = None + + def __init__( + self, + *, + api_key: str, + voice_id: str, + model: str = "eleven_flash_v2_5", + output_format: ElevenLabsOutputFormat = "pcm_24000", + params: InputParams = InputParams(), + **kwargs, + ): + sample_rate = self._sample_rate_from_output_format(output_format) + super().__init__( + aggregate_sentences=True, + push_text_frames=False, + push_stop_frames=True, + stop_frame_timeout_s=2.0, + sample_rate=sample_rate, + **kwargs, + ) + + self._client = ElevenLabs(api_key=api_key) + self._voice_id = voice_id + self._model = model + self._output_format = output_format + + # Create voice settings if provided + self._voice_settings = None + if params.stability is not None and params.similarity_boost is not None: + self._voice_settings = VoiceSettings( + stability=params.stability, + similarity_boost=params.similarity_boost, + style=params.style or 0.0, + use_speaker_boost=params.use_speaker_boost or False, + ) + + logger.debug(f"Initialized with sample rate: {sample_rate}") + + @staticmethod + def _sample_rate_from_output_format(output_format: str) -> int: + return { + "pcm_16000": 16000, + "pcm_22050": 22050, + "pcm_24000": 24000, + "pcm_44100": 44100, + }[output_format] + + async def start(self, frame: StartFrame): + await super().start(frame) + + async def stop(self, frame: EndFrame): + await super().stop(frame) + + async def cancel(self, frame: CancelFrame): + await super().cancel(frame) + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + def read_audio_stream(**kwargs): + # Run the streaming in a separate thread + audio_chunks = [] + stream = self._client.text_to_speech.convert_as_stream(**kwargs) + for chunk in stream: + if chunk: + audio_chunks.append(chunk) + return b"".join(audio_chunks) + + try: + yield TTSStartedFrame() + + # Prepare parameters + params = { + "text": text, + "voice_id": self._voice_id, + "model_id": self._model, + "output_format": self._output_format, + "voice_settings": self._voice_settings, + "optimize_streaming_latency": 4, # Maximum optimization + disabled text normalizer + } + + # Get audio data in a separate thread + audio_data = await asyncio.to_thread(read_audio_stream, **params) + + if not audio_data: + logger.error(f"{self} No audio data returned") + yield None + return + + # Stream the audio data in chunks + chunk_size = 4096 # Adjust this value as needed + for i in range(0, len(audio_data), chunk_size): + chunk = audio_data[i : i + chunk_size] + if len(chunk) > 0: + yield TTSAudioRawFrame( + chunk, self._sample_rate_from_output_format(self._output_format), 1 + ) + + yield TTSStoppedFrame() + + except Exception as e: + logger.error(f"Error in run_tts: {e}") + yield TTSStoppedFrame() From 740d2743df16149b031186fdf1e89c2bfee297c0 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 16 Jan 2025 22:56:25 -0500 Subject: [PATCH 2/4] Add TTFB metrics --- CHANGELOG.md | 3 +++ pyproject.toml | 2 +- src/pipecat/services/elevenlabs.py | 33 ++++++++++++++++++++++-------- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a47979a85..e2870e9a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `ElevenLabsHttpTTSService` and the + `07d-interruptible-elevenlabs-http.py` foundational example. + - Introduced pipeline frame observers. Observers can view all the frames that go through the pipeline without the need to inject processors in the pipeline. This can be useful, for example, to implement frame loggers or diff --git a/pyproject.toml b/pyproject.toml index ee842a533..b50f9d180 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ cerebras = [ "openai~=1.59.6" ] deepseek = [ "openai~=1.59.6" ] daily = [ "daily-python~=0.14.2" ] deepgram = [ "deepgram-sdk~=3.8.0" ] -elevenlabs = [ "websockets~=13.1" ] +elevenlabs = [ "elevenlabs~=1.50.3","websockets~=13.1" ] fal = [ "fal-client~=0.5.6" ] fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ] gladia = [ "websockets~=13.1" ] diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 6c4c5469e..cca421052 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -6,11 +6,9 @@ import asyncio import base64 -import io import json from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple -import aiohttp from loguru import logger from pydantic import BaseModel, model_validator @@ -18,6 +16,7 @@ from pipecat.frames.frames import ( BotStoppedSpeakingFrame, CancelFrame, EndFrame, + ErrorFrame, Frame, LLMFullResponseEndFrame, StartFrame, @@ -28,14 +27,14 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import WordTTSService +from pipecat.services.ai_services import TTSService, WordTTSService from pipecat.services.websocket_service import WebsocketService from pipecat.transcriptions.language import Language # See .env.example for ElevenLabs configuration needed try: import websockets - from elevenlabs import Voice, VoiceSettings + from elevenlabs import VoiceSettings from elevenlabs.client import ElevenLabs except ModuleNotFoundError as e: logger.error(f"Exception: {e}") @@ -424,7 +423,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService): logger.error(f"{self} exception: {e}") -class ElevenLabsHttpTTSService(WordTTSService): +class ElevenLabsHttpTTSService(TTSService): class InputParams(BaseModel): language: Optional[Language] = Language.EN optimize_streaming_latency: Optional[str] = None @@ -479,6 +478,9 @@ class ElevenLabsHttpTTSService(WordTTSService): "pcm_44100": 44100, }[output_format] + def can_generate_metrics(self) -> bool: + return True + async def start(self, frame: StartFrame): await super().start(frame) @@ -490,7 +492,6 @@ class ElevenLabsHttpTTSService(WordTTSService): async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: def read_audio_stream(**kwargs): - # Run the streaming in a separate thread audio_chunks = [] stream = self._client.text_to_speech.convert_as_stream(**kwargs) for chunk in stream: @@ -498,8 +499,11 @@ class ElevenLabsHttpTTSService(WordTTSService): audio_chunks.append(chunk) return b"".join(audio_chunks) + logger.debug(f"Generating TTS: [{text}]") + try: - yield TTSStartedFrame() + # Start TTFB metrics before any processing + await self.start_ttfb_metrics() # Prepare parameters params = { @@ -508,7 +512,7 @@ class ElevenLabsHttpTTSService(WordTTSService): "model_id": self._model, "output_format": self._output_format, "voice_settings": self._voice_settings, - "optimize_streaming_latency": 4, # Maximum optimization + disabled text normalizer + "optimize_streaming_latency": 4, } # Get audio data in a separate thread @@ -519,11 +523,19 @@ class ElevenLabsHttpTTSService(WordTTSService): yield None return + # Start usage metrics before sending any frames + await self.start_tts_usage_metrics(text) + + yield TTSStartedFrame() + # Stream the audio data in chunks - chunk_size = 4096 # Adjust this value as needed + chunk_size = 4096 for i in range(0, len(audio_data), chunk_size): chunk = audio_data[i : i + chunk_size] if len(chunk) > 0: + # Stop TTFB metrics on first chunk + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame( chunk, self._sample_rate_from_output_format(self._output_format), 1 ) @@ -532,4 +544,7 @@ class ElevenLabsHttpTTSService(WordTTSService): except Exception as e: logger.error(f"Error in run_tts: {e}") + yield ErrorFrame(error=str(e)) + + finally: yield TTSStoppedFrame() From d51893f61ccec3f0828f74478be8da51ac50a69a Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 16 Jan 2025 23:49:53 -0500 Subject: [PATCH 3/4] Refactor for aiohttp, correct use of settings --- .../07d-interruptible-elevenlabs-http.py | 1 + pyproject.toml | 2 +- src/pipecat/services/elevenlabs.py | 188 +++++++++++------- 3 files changed, 114 insertions(+), 77 deletions(-) diff --git a/examples/foundational/07d-interruptible-elevenlabs-http.py b/examples/foundational/07d-interruptible-elevenlabs-http.py index 1f09680d2..4b8aefd98 100644 --- a/examples/foundational/07d-interruptible-elevenlabs-http.py +++ b/examples/foundational/07d-interruptible-elevenlabs-http.py @@ -48,6 +48,7 @@ async def main(): tts = ElevenLabsHttpTTSService( api_key=os.getenv("ELEVENLABS_API_KEY", ""), voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""), + # params=ElevenLabsHttpTTSService.InputParams(language="en"), ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") diff --git a/pyproject.toml b/pyproject.toml index b50f9d180..ee842a533 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ cerebras = [ "openai~=1.59.6" ] deepseek = [ "openai~=1.59.6" ] daily = [ "daily-python~=0.14.2" ] deepgram = [ "deepgram-sdk~=3.8.0" ] -elevenlabs = [ "elevenlabs~=1.50.3","websockets~=13.1" ] +elevenlabs = [ "websockets~=13.1" ] fal = [ "fal-client~=0.5.6" ] fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ] gladia = [ "websockets~=13.1" ] diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index cca421052..0ad542821 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -7,8 +7,9 @@ import asyncio import base64 import json -from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple +from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple, Union +import aiohttp from loguru import logger from pydantic import BaseModel, model_validator @@ -424,9 +425,20 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService): class ElevenLabsHttpTTSService(TTSService): + """ElevenLabs Text-to-Speech service using HTTP streaming. + + Args: + api_key: ElevenLabs API key + voice_id: ID of the voice to use + model: Model ID (default: "eleven_flash_v2_5" for low latency) + base_url: API base URL + output_format: Audio output format (PCM) + params: Additional parameters for voice configuration + """ + class InputParams(BaseModel): language: Optional[Language] = Language.EN - optimize_streaming_latency: Optional[str] = None + optimize_streaming_latency: Optional[int] = None stability: Optional[float] = None similarity_boost: Optional[float] = None style: Optional[float] = None @@ -438,109 +450,133 @@ class ElevenLabsHttpTTSService(TTSService): api_key: str, voice_id: str, model: str = "eleven_flash_v2_5", + base_url: str = "https://api.elevenlabs.io", output_format: ElevenLabsOutputFormat = "pcm_24000", params: InputParams = InputParams(), **kwargs, ): - sample_rate = self._sample_rate_from_output_format(output_format) - super().__init__( - aggregate_sentences=True, - push_text_frames=False, - push_stop_frames=True, - stop_frame_timeout_s=2.0, - sample_rate=sample_rate, - **kwargs, - ) + sample_rate = sample_rate_from_output_format(output_format) + super().__init__(sample_rate=sample_rate, **kwargs) - self._client = ElevenLabs(api_key=api_key) - self._voice_id = voice_id - self._model = model + self._api_key = api_key + self._base_url = base_url self._output_format = output_format + self._params = params + self._session: Optional[aiohttp.ClientSession] = None - # Create voice settings if provided - self._voice_settings = None - if params.stability is not None and params.similarity_boost is not None: - self._voice_settings = VoiceSettings( - stability=params.stability, - similarity_boost=params.similarity_boost, - style=params.style or 0.0, - use_speaker_boost=params.use_speaker_boost or False, - ) - - logger.debug(f"Initialized with sample rate: {sample_rate}") - - @staticmethod - def _sample_rate_from_output_format(output_format: str) -> int: - return { - "pcm_16000": 16000, - "pcm_22050": 22050, - "pcm_24000": 24000, - "pcm_44100": 44100, - }[output_format] + self._settings = { + "sample_rate": sample_rate_from_output_format(output_format), + "language": self.language_to_service_language(params.language) + if params.language + else "en", + "output_format": output_format, + "optimize_streaming_latency": params.optimize_streaming_latency, + "stability": params.stability, + "similarity_boost": params.similarity_boost, + "style": params.style, + "use_speaker_boost": params.use_speaker_boost, + } + self.set_model_name(model) + self.set_voice(voice_id) + self._voice_settings = self._set_voice_settings() def can_generate_metrics(self) -> bool: return True + def _set_voice_settings(self) -> Optional[Dict[str, Union[float, bool]]]: + voice_settings: Dict[str, Union[float, bool]] = {} + if ( + self._settings["stability"] is not None + and self._settings["similarity_boost"] is not None + ): + voice_settings["stability"] = float(self._settings["stability"]) + voice_settings["similarity_boost"] = float(self._settings["similarity_boost"]) + if self._settings["style"] is not None: + voice_settings["style"] = float(self._settings["style"]) + if self._settings["use_speaker_boost"] is not None: + voice_settings["use_speaker_boost"] = bool(self._settings["use_speaker_boost"]) + else: + if self._settings["style"] is not None: + logger.warning( + "'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set." + ) + if self._settings["use_speaker_boost"] is not None: + logger.warning( + "'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set." + ) + + return voice_settings or None + async def start(self, frame: StartFrame): await super().start(frame) + self._session = aiohttp.ClientSession() async def stop(self, frame: EndFrame): await super().stop(frame) + if self._session: + await self._session.close() + self._session = None async def cancel(self, frame: CancelFrame): await super().cancel(frame) + if self._session: + await self._session.close() + self._session = None async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - def read_audio_stream(**kwargs): - audio_chunks = [] - stream = self._client.text_to_speech.convert_as_stream(**kwargs) - for chunk in stream: - if chunk: - audio_chunks.append(chunk) - return b"".join(audio_chunks) - logger.debug(f"Generating TTS: [{text}]") + if not self._session: + self._session = aiohttp.ClientSession() + + url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream" + + payload = { + "text": text, + "model_id": self._model_name, + } + + if self._voice_settings: + payload["voice_settings"] = json.dumps(self._voice_settings) + + if self._settings["language"]: + payload["language_code"] = self._settings["language"] + + headers = { + "xi-api-key": self._api_key, + "Content-Type": "application/json", + } + + # Build query parameters + params = { + "output_format": self._output_format, + } + if self._settings["optimize_streaming_latency"] is not None: + params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"] + + logger.debug(f"ElevenLabs request - payload: {payload}, params: {params}") + try: - # Start TTFB metrics before any processing await self.start_ttfb_metrics() - # Prepare parameters - params = { - "text": text, - "voice_id": self._voice_id, - "model_id": self._model, - "output_format": self._output_format, - "voice_settings": self._voice_settings, - "optimize_streaming_latency": 4, - } + async with self._session.post( + url, json=payload, headers=headers, params=params + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"{self} error: {error_text}") + yield ErrorFrame(error=f"ElevenLabs API error: {error_text}") + return - # Get audio data in a separate thread - audio_data = await asyncio.to_thread(read_audio_stream, **params) + await self.start_tts_usage_metrics(text) + yield TTSStartedFrame() - if not audio_data: - logger.error(f"{self} No audio data returned") - yield None - return + async for chunk in response.content: + if chunk: + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1) - # Start usage metrics before sending any frames - await self.start_tts_usage_metrics(text) - - yield TTSStartedFrame() - - # Stream the audio data in chunks - chunk_size = 4096 - for i in range(0, len(audio_data), chunk_size): - chunk = audio_data[i : i + chunk_size] - if len(chunk) > 0: - # Stop TTFB metrics on first chunk - await self.stop_ttfb_metrics() - - yield TTSAudioRawFrame( - chunk, self._sample_rate_from_output_format(self._output_format), 1 - ) - - yield TTSStoppedFrame() + yield TTSStoppedFrame() except Exception as e: logger.error(f"Error in run_tts: {e}") From b81323d6765c997d33d17c30b8930894ba3f533b Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 17 Jan 2025 20:11:16 -0500 Subject: [PATCH 4/4] Code review fixes + docstrings --- .../07d-interruptible-elevenlabs-http.py | 104 ------------------ src/pipecat/services/elevenlabs.py | 39 +++---- 2 files changed, 17 insertions(+), 126 deletions(-) delete mode 100644 examples/foundational/07d-interruptible-elevenlabs-http.py diff --git a/examples/foundational/07d-interruptible-elevenlabs-http.py b/examples/foundational/07d-interruptible-elevenlabs-http.py deleted file mode 100644 index 4b8aefd98..000000000 --- a/examples/foundational/07d-interruptible-elevenlabs-http.py +++ /dev/null @@ -1,104 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import os -import sys - -import aiohttp -from dotenv import load_dotenv -from loguru import logger -from runner import configure - -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import EndFrame -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.elevenlabs import ElevenLabsHttpTTSService -from pipecat.services.openai import OpenAILLMService -from pipecat.transports.services.daily import DailyParams, DailyTransport - -load_dotenv(override=True) - -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") - - -async def main(): - async with aiohttp.ClientSession() as session: - (room_url, token) = await configure(session) - - transport = DailyTransport( - room_url, - token, - "Respond bot", - DailyParams( - audio_out_enabled=True, - transcription_enabled=True, - vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - ), - ) - - tts = ElevenLabsHttpTTSService( - api_key=os.getenv("ELEVENLABS_API_KEY", ""), - voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""), - # params=ElevenLabsHttpTTSService.InputParams(language="en"), - ) - - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", - }, - ] - - context = OpenAILLMContext(messages) - context_aggregator = llm.create_context_aggregator(context) - - pipeline = Pipeline( - [ - transport.input(), # Transport user input - context_aggregator.user(), # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses - ] - ) - - task = PipelineTask( - pipeline, - PipelineParams( - allow_interruptions=True, - enable_metrics=True, - enable_usage_metrics=True, - report_only_initial_ttfb=True, - ), - ) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - # Kick off the conversation. - messages.append({"role": "system", "content": "Please introduce yourself to the user."}) - await task.queue_frames([context_aggregator.user().get_context_frame()]) - - @transport.event_handler("on_participant_left") - async def on_participant_left(transport, participant, reason): - await task.queue_frame(EndFrame()) - - runner = PipelineRunner() - - await runner.run(task) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 0ad542821..188c5a9c9 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -430,6 +430,7 @@ class ElevenLabsHttpTTSService(TTSService): Args: api_key: ElevenLabs API key voice_id: ID of the voice to use + aiohttp_session: aiohttp ClientSession model: Model ID (default: "eleven_flash_v2_5" for low latency) base_url: API base URL output_format: Audio output format (PCM) @@ -449,20 +450,20 @@ class ElevenLabsHttpTTSService(TTSService): *, api_key: str, voice_id: str, + aiohttp_session: aiohttp.ClientSession, model: str = "eleven_flash_v2_5", base_url: str = "https://api.elevenlabs.io", output_format: ElevenLabsOutputFormat = "pcm_24000", params: InputParams = InputParams(), **kwargs, ): - sample_rate = sample_rate_from_output_format(output_format) - super().__init__(sample_rate=sample_rate, **kwargs) + super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs) self._api_key = api_key self._base_url = base_url self._output_format = output_format self._params = params - self._session: Optional[aiohttp.ClientSession] = None + self._session = aiohttp_session self._settings = { "sample_rate": sample_rate_from_output_format(output_format), @@ -484,6 +485,11 @@ class ElevenLabsHttpTTSService(TTSService): return True def _set_voice_settings(self) -> Optional[Dict[str, Union[float, bool]]]: + """Configure voice settings if stability and similarity_boost are provided. + + Returns: + Dictionary of voice settings or None if required parameters are missing. + """ voice_settings: Dict[str, Union[float, bool]] = {} if ( self._settings["stability"] is not None @@ -507,27 +513,16 @@ class ElevenLabsHttpTTSService(TTSService): return voice_settings or None - async def start(self, frame: StartFrame): - await super().start(frame) - self._session = aiohttp.ClientSession() - - async def stop(self, frame: EndFrame): - await super().stop(frame) - if self._session: - await self._session.close() - self._session = None - - async def cancel(self, frame: CancelFrame): - await super().cancel(frame) - if self._session: - await self._session.close() - self._session = None - async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - logger.debug(f"Generating TTS: [{text}]") + """Generate speech from text using ElevenLabs streaming API. - if not self._session: - self._session = aiohttp.ClientSession() + Args: + text: The text to convert to speech + + Yields: + Frames containing audio data and status information + """ + logger.debug(f"Generating TTS: [{text}]") url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream"