From 79ac6969736d3e41a83efabab27eecefb1b202bc Mon Sep 17 00:00:00 2001 From: Pedro Moreira Date: Tue, 4 Feb 2025 13:51:33 -0300 Subject: [PATCH 1/8] Add support for Piper TTS --- src/pipecat/services/piper.py | 103 ++++++++++++++++++++++++++++++++++ test-requirements.txt | 1 + tests/test_piper_tts.py | 101 +++++++++++++++++++++++++++++++++ 3 files changed, 205 insertions(+) create mode 100644 src/pipecat/services/piper.py create mode 100644 tests/test_piper_tts.py diff --git a/src/pipecat/services/piper.py b/src/pipecat/services/piper.py new file mode 100644 index 000000000..ecb831eb7 --- /dev/null +++ b/src/pipecat/services/piper.py @@ -0,0 +1,103 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from typing import AsyncGenerator + +import aiohttp +from loguru import logger + +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.ai_services import TTSService + +# This assumes a running TTS service running: https://github.com/rhasspy/piper/blob/master/src/python_run/README_http.md + + +class PiperTTSService(TTSService): + """Piper TTS service implementation. + + Provides integration with Piper's TTS server. + """ + + def __init__( + self, + *, + base_url: str, + aiohttp_session: aiohttp.ClientSession | None = None, + sample_rate: int = 24000, + **kwargs, + ): + """Initialize the PiperTTSService class instance. + + Args: + base_url (str): Base URL of the Piper TTS server (should not end with a slash). + aiohttp_session (aiohttp.ClientSession, optional): Optional aiohttp session to use for requests. Defaults to None. + sample_rate (int, optional): Sample rate in Hz. Defaults to 24000. + **kwargs (dict): Additional keyword arguments. + """ + super().__init__(sample_rate=sample_rate, **kwargs) + if not aiohttp_session: + aiohttp_session = aiohttp.ClientSession() + + if base_url.endswith("/"): + logger.warning("Base URL ends with a slash, this is not allowed.") + base_url = base_url[:-1] + + self._settings = {"base_url": base_url} + self.set_voice("voice_id") + self._aiohttp_session = aiohttp_session + + def can_generate_metrics(self) -> bool: + return True + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + url = self._settings["base_url"] + "/?text=" + text.replace(".", "").replace("*", "") + + await self.start_ttfb_metrics() + + async with self._aiohttp_session.get(url) as r: + if r.status != 200: + text = await r.text() + logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") + yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") + return + + await self.start_tts_usage_metrics(text) + + yield TTSStartedFrame() + + buffer = bytearray() + async for chunk in r.content.iter_chunked(1024): + if len(chunk) > 0: + await self.stop_ttfb_metrics() + # Append new chunk to the buffer. + buffer.extend(chunk) + + # Check if buffer has enough data for processing. + while ( + len(buffer) >= 48000 + ): # Assuming at least 0.5 seconds of audio data at 24000 Hz + # Process the buffer up to a safe size for resampling. + process_data = buffer[:48000] + # Remove processed data from buffer. + buffer = buffer[48000:] + + frame = TTSAudioRawFrame(process_data, self._sample_rate, 1) + yield frame + + # Process any remaining data in the buffer. + if len(buffer) > 0: + frame = TTSAudioRawFrame(buffer, self._sample_rate, 1) + yield frame + + yield TTSStoppedFrame() diff --git a/test-requirements.txt b/test-requirements.txt index 36e64060b..aae1c2dbb 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -21,6 +21,7 @@ pyaudio~=0.2.14 pydantic~=2.8.2 pyloudnorm~=0.1.1 pyht~=0.1.4 +pytest-aiohttp==1.1.0 python-dotenv~=1.0.1 silero-vad~=5.1 soxr~=0.5.0 diff --git a/tests/test_piper_tts.py b/tests/test_piper_tts.py new file mode 100644 index 000000000..296de7fae --- /dev/null +++ b/tests/test_piper_tts.py @@ -0,0 +1,101 @@ +"""Tests for PiperTTSService.""" + +import asyncio + +import pytest +from aiohttp import web + +from pipecat.frames.frames import ( + ErrorFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.piper import PiperTTSService + + +@pytest.mark.asyncio +async def test_run_piper_tts_success(aiohttp_client): + """Test successful TTS generation with chunked audio data. + + Checks frames for TTSStartedFrame -> TTSAudioRawFrame -> TTSStoppedFrame. + """ + + async def handler(request): + # The service expects a /?text= param + # Here we're just returning dummy chunked bytes to simulate an audio response + text_query = request.rel_url.query.get("text", "") + print(f"Mock server received text param: {text_query}") + + # Prepare a StreamResponse with chunked data + resp = web.StreamResponse( + status=200, + reason="OK", + headers={"Content-Type": "audio/raw"}, + ) + await resp.prepare(request) + + # Write out some chunked byte data + # In reality, you’d return WAV data or similar + data_chunk_1 = b"\x00\x01\x02\x03" * 12000 # 48000 bytes + data_chunk_2 = b"\x04\x05\x06\x07" * 6000 # another chunk + await resp.write(data_chunk_1) + await asyncio.sleep(0.01) # simulate async chunk delay + await resp.write(data_chunk_2) + await resp.write_eof() + + return resp + + # Create an aiohttp test server + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + + # Remove trailing slash if present in the test URL + base_url = str(client.make_url("")).rstrip("/") + + # Instantiate PiperTTSService with our mock server + tts_service = PiperTTSService(base_url=base_url) + + # Collect frames from the generator + frames = [] + async for frame in tts_service.run_tts("Hello world."): + frames.append(frame) + + # Ensure we received frames in the expected order/types + assert len(frames) >= 3, "Expecting at least TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame" + assert isinstance(frames[0], TTSStartedFrame), "First frame must be TTSStartedFrame" + assert isinstance(frames[-1], TTSStoppedFrame), "Last frame must be TTSStoppedFrame" + + # Check we have at least one TTSAudioRawFrame + audio_frames = [f for f in frames if isinstance(f, TTSAudioRawFrame)] + assert len(audio_frames) > 0, "Should have received at least one TTSAudioRawFrame" + for a_frame in audio_frames: + assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" + + +@pytest.mark.asyncio +async def test_run_piper_tts_error(aiohttp_client): + """Test how the service handles a non-200 response from the server. + + Expects an ErrorFrame to be returned. + """ + + async def handler(_request): + # Return an error status for any request + return web.Response(status=404, text="Not found") + + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + base_url = str(client.make_url("")).rstrip("/") + + tts_service = PiperTTSService(base_url=base_url) + + frames = [] + async for frame in tts_service.run_tts("Error case."): + frames.append(frame) + + assert len(frames) == 1, "Should only receive a single ErrorFrame" + assert isinstance(frames[0], ErrorFrame), "Must receive an ErrorFrame for 404" + assert "status: 404" in frames[0].error, "ErrorFrame should contain details about the 404" From c1f6a4e0790d09904c315e437dfce6ecaf3eed89 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:44:05 -0300 Subject: [PATCH 2/8] Adding PIPER_BASE_URL to the env template. --- dot-env.template | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dot-env.template b/dot-env.template index 2da20fc0b..f0b5bdc0f 100644 --- a/dot-env.template +++ b/dot-env.template @@ -90,3 +90,6 @@ ASSEMBLYAI_API_KEY=... # OpenRouter OPENROUTER_API_KEY=... + +# Piper +PIPER_BASE_URL=... \ No newline at end of file From ca4893397aec7f368f9c49884ac23e1c10907192 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:44:26 -0300 Subject: [PATCH 3/8] Creating a foundational example which uses the piper service. --- .../foundational/01-say-one-thing-piper.py | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 examples/foundational/01-say-one-thing-piper.py diff --git a/examples/foundational/01-say-one-thing-piper.py b/examples/foundational/01-say-one-thing-piper.py new file mode 100644 index 000000000..256447c23 --- /dev/null +++ b/examples/foundational/01-say-one-thing-piper.py @@ -0,0 +1,57 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.frames.frames import EndFrame, TTSSpeakFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.services.piper import PiperTTSService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, _) = await configure(session) + + transport = DailyTransport( + room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True) + ) + + tts = PiperTTSService( + base_url=os.getenv("PIPER_BASE_URL"), aiohttp_session=session, sample_rate=24000 + ) + + runner = PipelineRunner() + + task = PipelineTask(Pipeline([tts, transport.output()])) + + # Register an event handler so we can play the audio when the + # participant joins. + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await task.queue_frames( + [TTSSpeakFrame(f"Hello there, how are you today ?"), EndFrame()] + ) + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) From 053bf72da230193d3a6c4dc14ad91422c884599c Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:44:46 -0300 Subject: [PATCH 4/8] Adding pytest-aiohttp to the dev requirements. --- dev-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index e65c2755c..af1c35721 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -6,6 +6,7 @@ pre-commit~=4.0.1 pyright~=1.1.397 pytest~=8.3.4 pytest-asyncio~=0.25.3 +pytest-aiohttp==1.1.0 ruff~=0.11.1 setuptools~=70.0.0 setuptools_scm~=8.1.0 From 45787520b2b2b4430b5468d452c763a9bc37cd28 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:45:28 -0300 Subject: [PATCH 5/8] Refactoring the piper test to use run_test provided by Pipecat --- tests/test_piper_tts.py | 83 ++++++++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/tests/test_piper_tts.py b/tests/test_piper_tts.py index 296de7fae..8db6bf1d1 100644 --- a/tests/test_piper_tts.py +++ b/tests/test_piper_tts.py @@ -2,16 +2,20 @@ import asyncio +import aiohttp import pytest from aiohttp import web from pipecat.frames.frames import ( ErrorFrame, TTSAudioRawFrame, + TTSSpeakFrame, TTSStartedFrame, TTSStoppedFrame, + TTSTextFrame, ) from pipecat.services.piper import PiperTTSService +from pipecat.tests.utils import run_test @pytest.mark.asyncio @@ -37,8 +41,8 @@ async def test_run_piper_tts_success(aiohttp_client): # Write out some chunked byte data # In reality, you’d return WAV data or similar - data_chunk_1 = b"\x00\x01\x02\x03" * 12000 # 48000 bytes - data_chunk_2 = b"\x04\x05\x06\x07" * 6000 # another chunk + data_chunk_1 = b"\x00\x01\x02\x03" * 1024 # 4096 bytes, 04 TTSAudioRawFrame + data_chunk_2 = b"\x04\x05\x06\x07" * 1024 # another chunk await resp.write(data_chunk_1) await asyncio.sleep(0.01) # simulate async chunk delay await resp.write(data_chunk_2) @@ -48,30 +52,43 @@ async def test_run_piper_tts_success(aiohttp_client): # Create an aiohttp test server app = web.Application() - app.router.add_get("/", handler) + app.router.add_post("/", handler) client = await aiohttp_client(app) # Remove trailing slash if present in the test URL base_url = str(client.make_url("")).rstrip("/") - # Instantiate PiperTTSService with our mock server - tts_service = PiperTTSService(base_url=base_url) + async with aiohttp.ClientSession() as session: + # Instantiate PiperTTSService with our mock server + tts_service = PiperTTSService(base_url=base_url, aiohttp_session=session, sample_rate=24000) - # Collect frames from the generator - frames = [] - async for frame in tts_service.run_tts("Hello world."): - frames.append(frame) + frames_to_send = [ + TTSSpeakFrame(text="Hello world."), + ] - # Ensure we received frames in the expected order/types - assert len(frames) >= 3, "Expecting at least TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame" - assert isinstance(frames[0], TTSStartedFrame), "First frame must be TTSStartedFrame" - assert isinstance(frames[-1], TTSStoppedFrame), "Last frame must be TTSStoppedFrame" + expected_returned_frames = [ + TTSStartedFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSStoppedFrame, + TTSTextFrame, + ] - # Check we have at least one TTSAudioRawFrame - audio_frames = [f for f in frames if isinstance(f, TTSAudioRawFrame)] - assert len(audio_frames) > 0, "Should have received at least one TTSAudioRawFrame" - for a_frame in audio_frames: - assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" + frames_received = await run_test( + tts_service, + frames_to_send=frames_to_send, + expected_down_frames=expected_returned_frames, + ) + down_frames = frames_received[0] + audio_frames = [f for f in down_frames if isinstance(f, TTSAudioRawFrame)] + for a_frame in audio_frames: + assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" @pytest.mark.asyncio @@ -86,16 +103,30 @@ async def test_run_piper_tts_error(aiohttp_client): return web.Response(status=404, text="Not found") app = web.Application() - app.router.add_get("/", handler) + app.router.add_post("/", handler) client = await aiohttp_client(app) base_url = str(client.make_url("")).rstrip("/") - tts_service = PiperTTSService(base_url=base_url) + async with aiohttp.ClientSession() as session: + tts_service = PiperTTSService(base_url=base_url, aiohttp_session=session, sample_rate=24000) - frames = [] - async for frame in tts_service.run_tts("Error case."): - frames.append(frame) + frames_to_send = [ + TTSSpeakFrame(text="Error case."), + ] - assert len(frames) == 1, "Should only receive a single ErrorFrame" - assert isinstance(frames[0], ErrorFrame), "Must receive an ErrorFrame for 404" - assert "status: 404" in frames[0].error, "ErrorFrame should contain details about the 404" + expected_down_frames = [TTSStoppedFrame, TTSTextFrame] + + expected_up_frames = [ErrorFrame] + + frames_received = await run_test( + tts_service, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + expected_up_frames=expected_up_frames, + ) + up_frames = frames_received[1] + + assert isinstance(up_frames[0], ErrorFrame), "Must receive an ErrorFrame for 404" + assert "status: 404" in up_frames[0].error, ( + "ErrorFrame should contain details about the 404" + ) From b348fde32b3972b87538238ff445d797fb4d9ca1 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:46:38 -0300 Subject: [PATCH 6/8] Refactoring PiperTTSService to match the others TTS services provided by Pipecat and fixing noise issue due to wav header. --- src/pipecat/services/piper.py | 103 +++++++++++++++++----------------- 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/src/pipecat/services/piper.py b/src/pipecat/services/piper.py index ecb831eb7..ad71a7d6c 100644 --- a/src/pipecat/services/piper.py +++ b/src/pipecat/services/piper.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import AsyncGenerator +from typing import AsyncGenerator, Optional import aiohttp from loguru import logger @@ -19,85 +19,84 @@ from pipecat.frames.frames import ( from pipecat.services.ai_services import TTSService # This assumes a running TTS service running: https://github.com/rhasspy/piper/blob/master/src/python_run/README_http.md - - class PiperTTSService(TTSService): """Piper TTS service implementation. Provides integration with Piper's TTS server. + + Args: + base_url: API base URL + aiohttp_session: aiohttp ClientSession + sample_rate: Output sample rate """ def __init__( self, *, base_url: str, - aiohttp_session: aiohttp.ClientSession | None = None, - sample_rate: int = 24000, + aiohttp_session: aiohttp.ClientSession, + # When using Piper, the sample rate of the generated audio depends on the + # voice model being used. + sample_rate: Optional[int] = None, **kwargs, ): - """Initialize the PiperTTSService class instance. - - Args: - base_url (str): Base URL of the Piper TTS server (should not end with a slash). - aiohttp_session (aiohttp.ClientSession, optional): Optional aiohttp session to use for requests. Defaults to None. - sample_rate (int, optional): Sample rate in Hz. Defaults to 24000. - **kwargs (dict): Additional keyword arguments. - """ super().__init__(sample_rate=sample_rate, **kwargs) - if not aiohttp_session: - aiohttp_session = aiohttp.ClientSession() if base_url.endswith("/"): logger.warning("Base URL ends with a slash, this is not allowed.") base_url = base_url[:-1] + self._base_url = base_url + self._session = aiohttp_session self._settings = {"base_url": base_url} - self.set_voice("voice_id") - self._aiohttp_session = aiohttp_session def can_generate_metrics(self) -> bool: return True async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - logger.debug(f"Generating TTS: [{text}]") + """Generate speech from text using Piper API. - url = self._settings["base_url"] + "/?text=" + text.replace(".", "").replace("*", "") + Args: + text: The text to convert to speech - await self.start_ttfb_metrics() + Yields: + Frames containing audio data and status information + """ + logger.debug(f"{self}: Generating TTS [{text}]") + headers = { + "Content-Type": "text/plain", + } + try: + await self.start_ttfb_metrics() - async with self._aiohttp_session.get(url) as r: - if r.status != 200: - text = await r.text() - logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") - yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") - return + async with self._session.post(self._base_url, data=text, headers=headers) as response: + if response.status != 200: + eror = await response.text() + logger.error( + f"{self} error getting audio (status: {response.status}, error: {eror})" + ) + yield ErrorFrame( + f"Error getting audio (status: {response.status}, error: {eror})" + ) + return - await self.start_tts_usage_metrics(text) + await self.start_tts_usage_metrics(text) - yield TTSStartedFrame() - - buffer = bytearray() - async for chunk in r.content.iter_chunked(1024): - if len(chunk) > 0: - await self.stop_ttfb_metrics() - # Append new chunk to the buffer. - buffer.extend(chunk) - - # Check if buffer has enough data for processing. - while ( - len(buffer) >= 48000 - ): # Assuming at least 0.5 seconds of audio data at 24000 Hz - # Process the buffer up to a safe size for resampling. - process_data = buffer[:48000] - # Remove processed data from buffer. - buffer = buffer[48000:] - - frame = TTSAudioRawFrame(process_data, self._sample_rate, 1) - yield frame - - # Process any remaining data in the buffer. - if len(buffer) > 0: - frame = TTSAudioRawFrame(buffer, self._sample_rate, 1) - yield frame + # Process the streaming response + CHUNK_SIZE = 1024 + yield TTSStartedFrame() + async for chunk in response.content.iter_chunked(CHUNK_SIZE): + # remove wav header if present + if chunk.startswith(b"RIFF"): + chunk = chunk[44:] + if len(chunk) > 0: + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame(chunk, self.sample_rate, 1) + except Exception as e: + logger.error(f"Error in run_tts: {e}") + yield ErrorFrame(error=str(e)) + finally: + logger.debug(f"{self}: Finished TTS [{text}]") + await self.stop_ttfb_metrics() yield TTSStoppedFrame() From 50515aa8426c2e868c0044edb36a80d180a2d054 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:50:47 -0300 Subject: [PATCH 7/8] Adding PiperTTSService to the changelog. --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a93901bd1..5448399d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Added support for a new TTS service, `PiperTTSService`. + (see https://github.com/rhasspy/piper/) + ## [0.0.61] - 2025-03-26 ### Added From a82b84797179dd4e84b53cf27efc86e941857f24 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:58:53 -0300 Subject: [PATCH 8/8] Fixing ruff format. --- src/pipecat/services/piper.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipecat/services/piper.py b/src/pipecat/services/piper.py index ad71a7d6c..12a936889 100644 --- a/src/pipecat/services/piper.py +++ b/src/pipecat/services/piper.py @@ -18,6 +18,7 @@ from pipecat.frames.frames import ( ) from pipecat.services.ai_services import TTSService + # This assumes a running TTS service running: https://github.com/rhasspy/piper/blob/master/src/python_run/README_http.md class PiperTTSService(TTSService): """Piper TTS service implementation.