From d42d02d809e4a437bfc3ddf7c3fe0653205934bd Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 22 May 2025 11:21:06 +0200 Subject: [PATCH] Add Google streaming TTS as a base TTS service. Rename non-streaming service to GoogleHttpTTSService. --- src/pipecat/services/google/tts.py | 147 ++++++++++++++++++++++++++++- 1 file changed, 146 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/google/tts.py b/src/pipecat/services/google/tts.py index 9725aa1c3..4a596ede9 100644 --- a/src/pipecat/services/google/tts.py +++ b/src/pipecat/services/google/tts.py @@ -202,7 +202,7 @@ def language_to_google_tts_language(language: Language) -> Optional[str]: return language_map.get(language) -class GoogleTTSService(TTSService): +class GoogleHttpTTSService(TTSService): class InputParams(BaseModel): pitch: Optional[str] = None rate: Optional[str] = None @@ -379,3 +379,148 @@ class GoogleTTSService(TTSService): logger.exception(f"{self} error generating TTS: {e}") error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) + + +class GoogleTTSService(TTSService): + class InputParams(BaseModel): + pitch: Optional[str] = None + rate: Optional[str] = None + volume: Optional[str] = None + emphasis: Optional[Literal["strong", "moderate", "reduced", "none"]] = None + language: Optional[Language] = Language.EN + gender: Optional[Literal["male", "female", "neutral"]] = None + google_style: Optional[Literal["apologetic", "calm", "empathetic", "firm", "lively"]] = None + + def __init__( + self, + *, + credentials: Optional[str] = None, + credentials_path: Optional[str] = None, + voice_id: str = "en-US-Neural2-A", + sample_rate: Optional[int] = None, + params: InputParams = InputParams(), + **kwargs, + ): + super().__init__(sample_rate=sample_rate, **kwargs) + + self._settings = { + "pitch": params.pitch, + "rate": params.rate, + "volume": params.volume, + "emphasis": params.emphasis, + "language": self.language_to_service_language(params.language) + if params.language + else "en-US", + "gender": params.gender, + "google_style": params.google_style, + } + self.set_voice(voice_id) + self._client: texttospeech_v1.TextToSpeechAsyncClient = self._create_client( + credentials, credentials_path + ) + + def _create_client( + self, credentials: Optional[str], credentials_path: Optional[str] + ) -> texttospeech_v1.TextToSpeechAsyncClient: + creds: Optional[service_account.Credentials] = None + + # Create a Google Cloud service account for the Cloud Text-to-Speech API + # Using either the provided credentials JSON string or the path to a service account JSON + # file, create a Google Cloud service account and use it to authenticate with the API. + if credentials: + # Use provided credentials JSON string + json_account_info = json.loads(credentials) + creds = service_account.Credentials.from_service_account_info(json_account_info) + elif credentials_path: + # Use service account JSON file if provided + creds = service_account.Credentials.from_service_account_file(credentials_path) + else: + try: + creds, project_id = default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) + except GoogleAuthError: + pass + + if not creds: + raise ValueError("No valid credentials provided.") + + return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds) + + def can_generate_metrics(self) -> bool: + return True + + def language_to_service_language(self, language: Language) -> Optional[str]: + return language_to_google_tts_language(language) + + @traced_tts + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + # Chirp3 and Journey voices only. Does not support SSML. + logger.debug(f"{self}: Generating TTS [{text}]") + + try: + await self.start_ttfb_metrics() + + voice = texttospeech_v1.VoiceSelectionParams( + language_code=self._settings["language"], name=self._voice_id + ) + + streaming_config = texttospeech_v1.StreamingSynthesizeConfig( + voice=voice, + streaming_audio_config=texttospeech_v1.StreamingAudioConfig( + audio_encoding=texttospeech_v1.AudioEncoding.PCM, + sample_rate_hertz=self.sample_rate, + #speaking_rate=self._settings["rate"], + ), + ) + config_request = texttospeech_v1.StreamingSynthesizeRequest( + streaming_config=streaming_config + ) + async def request_generator(): + yield config_request + yield texttospeech_v1.StreamingSynthesizeRequest( + input=texttospeech_v1.StreamingSynthesisInput(text=text) + ) + + streaming_responses = await self._client.streaming_synthesize(request_generator()) + await self.start_tts_usage_metrics(text) + + yield TTSStartedFrame() + + audio_buffer = b"" + CHUNK_SIZE = 1024 + first_chunk_for_ttfb = False + + async for response in streaming_responses: + chunk = response.audio_content + if not chunk: + continue + + if not first_chunk_for_ttfb: + await self.stop_ttfb_metrics() + first_chunk_for_ttfb = True + + audio_buffer += chunk + while len(audio_buffer) >= CHUNK_SIZE: + piece = audio_buffer[:CHUNK_SIZE] + audio_buffer = audio_buffer[CHUNK_SIZE:] + yield TTSAudioRawFrame(piece, self.sample_rate, 1) + await asyncio.sleep(0) + + if audio_buffer: + yield TTSAudioRawFrame(audio_buffer, self.sample_rate, 1) + await asyncio.sleep(0) + + # Add 1 second pause between sentences + silence = b"\x00" * self.sample_rate + yield TTSAudioRawFrame( + audio=silence, sample_rate=self.sample_rate, num_channels=1 + ) + await asyncio.sleep(0) + + yield TTSStoppedFrame() + + except Exception as e: + logger.exception(f"{self} error generating TTS: {e}") + error_message = f"TTS generation error: {str(e)}" + yield ErrorFrame(error=error_message)