diff --git a/CHANGELOG.md b/CHANGELOG.md index 474f06989..37189eb47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added Google TTS service and corresponding foundational example `07n-interruptible-google.py` + - Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example. - Added InputParams to Azure TTS service. diff --git a/examples/foundational/07n-interruptible-google.py b/examples/foundational/07n-interruptible-google.py new file mode 100644 index 000000000..713b3dce3 --- /dev/null +++ b/examples/foundational/07n-interruptible-google.py @@ -0,0 +1,100 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, + LLMUserResponseAggregator, +) +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.services.google import GoogleTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + audio_out_sample_rate=24000, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = GoogleTTSService( + credentials=os.getenv("GOOGLE_CREDENTIALS"), + voice_id="en-US-Neural2-J", + params=GoogleTTSService.InputParams(language="en-US", rate="1.05"), + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out, # Assistant spoken responses + ] + ) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 73e9363db..e31755d50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ elevenlabs = [ "websockets~=12.0" ] examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ] fal = [ "fal-client~=0.4.1" ] gladia = [ "websockets~=12.0" ] -google = [ "google-generativeai~=0.7.2" ] +google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ] gstreamer = [ "pygobject~=3.48.2" ] fireworks = [ "openai~=1.37.2" ] langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ] diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index 4de6b77fa..38af3e41f 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -5,30 +5,37 @@ # import asyncio +import json +from typing import AsyncGenerator, List, Literal, Optional -from typing import List +from loguru import logger +from pydantic import BaseModel from pipecat.frames.frames import ( + ErrorFrame, Frame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMMessagesFrame, LLMModelUpdateFrame, TextFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, VisionImageRawFrame, - LLMMessagesFrame, - LLMFullResponseStartFrame, - LLMFullResponseEndFrame, ) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import LLMService from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContext, OpenAILLMContextFrame, ) - -from loguru import logger +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import LLMService, TTSService try: - import google.generativeai as gai import google.ai.generativelanguage as glm + import google.generativeai as gai + from google.cloud import texttospeech_v1 + from google.oauth2 import service_account except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error( @@ -137,3 +144,188 @@ class GoogleLLMService(LLMService): if context: await self._process_context(context) + + +class GoogleTTSService(TTSService): + class InputParams(BaseModel): + pitch: Optional[str] = None + rate: Optional[str] = None + volume: Optional[str] = None + emphasis: Optional[Literal["strong", "moderate", "reduced", "none"]] = None + language: Optional[str] = None + gender: Optional[Literal["male", "female", "neutral"]] = None + google_style: Optional[Literal["apologetic", "calm", "empathetic", "firm", "lively"]] = None + + def __init__( + self, + *, + credentials: Optional[str] = None, + credentials_path: Optional[str] = None, + voice_id: str = "en-US-Neural2-A", + sample_rate: int = 24000, + params: InputParams = InputParams(), + **kwargs, + ): + super().__init__(sample_rate=sample_rate, **kwargs) + + self._voice_id: str = voice_id + self._params = params + self._client: texttospeech_v1.TextToSpeechAsyncClient = self._create_client( + credentials, credentials_path + ) + + def _create_client( + self, credentials: Optional[str], credentials_path: Optional[str] + ) -> texttospeech_v1.TextToSpeechAsyncClient: + creds: Optional[service_account.Credentials] = None + + # Create a Google Cloud service account for the Cloud Text-to-Speech API + # Using either the provided credentials JSON string or the path to a service account JSON + # file, create a Google Cloud service account and use it to authenticate with the API. + if credentials: + # Use provided credentials JSON string + json_account_info = json.loads(credentials) + creds = service_account.Credentials.from_service_account_info(json_account_info) + elif credentials_path: + # Use service account JSON file if provided + creds = service_account.Credentials.from_service_account_file(credentials_path) + else: + raise ValueError("Either 'credentials' or 'credentials_path' must be provided.") + + return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds) + + def can_generate_metrics(self) -> bool: + return True + + def _construct_ssml(self, text: str) -> str: + ssml = "" + + # Voice tag + voice_attrs = [f"name='{self._voice_id}'"] + if self._params.language: + voice_attrs.append(f"language='{self._params.language}'") + if self._params.gender: + voice_attrs.append(f"gender='{self._params.gender}'") + ssml += f"" + + # Prosody tag + prosody_attrs = [] + if self._params.pitch: + prosody_attrs.append(f"pitch='{self._params.pitch}'") + if self._params.rate: + prosody_attrs.append(f"rate='{self._params.rate}'") + if self._params.volume: + prosody_attrs.append(f"volume='{self._params.volume}'") + + if prosody_attrs: + ssml += f"" + + # Emphasis tag + if self._params.emphasis: + ssml += f"" + + # Google style tag + if self._params.google_style: + ssml += f"" + + ssml += text + + # Close tags + if self._params.google_style: + ssml += "" + if self._params.emphasis: + ssml += "" + if prosody_attrs: + ssml += "" + ssml += "" + + return ssml + + async def set_voice(self, voice: str) -> None: + logger.debug(f"Switching TTS voice to: [{voice}]") + self._voice_id = voice + + async def set_language(self, language: str) -> None: + logger.debug(f"Switching TTS language to: [{language}]") + self._params.language = language + + async def set_pitch(self, pitch: str) -> None: + logger.debug(f"Switching TTS pitch to: [{pitch}]") + self._params.pitch = pitch + + async def set_rate(self, rate: str) -> None: + logger.debug(f"Switching TTS rate to: [{rate}]") + self._params.rate = rate + + async def set_volume(self, volume: str) -> None: + logger.debug(f"Switching TTS volume to: [{volume}]") + self._params.volume = volume + + async def set_emphasis( + self, emphasis: Literal["strong", "moderate", "reduced", "none"] + ) -> None: + logger.debug(f"Switching TTS emphasis to: [{emphasis}]") + self._params.emphasis = emphasis + + async def set_gender(self, gender: Literal["male", "female", "neutral"]) -> None: + logger.debug(f"Switch TTS gender to [{gender}]") + self._params.gender = gender + + async def google_style( + self, google_style: Literal["apologetic", "calm", "empathetic", "firm", "lively"] + ) -> None: + logger.debug(f"Switching TTS google style to: [{google_style}]") + self._params.google_style = google_style + + async def set_params(self, params: InputParams) -> None: + logger.debug(f"Switching TTS params to: [{params}]") + self._params = params + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + try: + await self.start_ttfb_metrics() + + ssml = self._construct_ssml(text) + synthesis_input = texttospeech_v1.SynthesisInput(ssml=ssml) + voice = texttospeech_v1.VoiceSelectionParams( + language_code=self._params.language, name=self._voice_id + ) + audio_config = texttospeech_v1.AudioConfig( + audio_encoding=texttospeech_v1.AudioEncoding.LINEAR16, + sample_rate_hertz=self.sample_rate, + ) + + request = texttospeech_v1.SynthesizeSpeechRequest( + input=synthesis_input, voice=voice, audio_config=audio_config + ) + + response = await self._client.synthesize_speech(request=request) + + await self.start_tts_usage_metrics(text) + + await self.push_frame(TTSStartedFrame()) + + # Skip the first 44 bytes to remove the WAV header + audio_content = response.audio_content[44:] + + # Read and yield audio data in chunks + chunk_size = 8192 + for i in range(0, len(audio_content), chunk_size): + chunk = audio_content[i : i + chunk_size] + if not chunk: + break + await self.stop_ttfb_metrics() + frame = TTSAudioRawFrame(chunk, self.sample_rate, 1) + yield frame + await asyncio.sleep(0) # Allow other tasks to run + + await self.push_frame(TTSStoppedFrame()) + + except Exception as e: + logger.exception(f"{self} error generating TTS: {e}") + error_message = f"TTS generation error: {str(e)}" + yield ErrorFrame(error=error_message) + finally: + await self.push_frame(TTSStoppedFrame()) diff --git a/test-requirements.txt b/test-requirements.txt index 94c81331d..8c7db7377 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -7,6 +7,7 @@ deepgram-sdk~=3.5.0 fal-client~=0.4.1 fastapi~=0.112.1 faster-whisper~=1.0.3 +google-cloud-texttospeech~=2.17.2 google-generativeai~=0.7.2 langchain~=0.2.14 livekit~=0.13.1