From 79ac6969736d3e41a83efabab27eecefb1b202bc Mon Sep 17 00:00:00 2001 From: Pedro Moreira Date: Tue, 4 Feb 2025 13:51:33 -0300 Subject: [PATCH 01/37] Add support for Piper TTS --- src/pipecat/services/piper.py | 103 ++++++++++++++++++++++++++++++++++ test-requirements.txt | 1 + tests/test_piper_tts.py | 101 +++++++++++++++++++++++++++++++++ 3 files changed, 205 insertions(+) create mode 100644 src/pipecat/services/piper.py create mode 100644 tests/test_piper_tts.py diff --git a/src/pipecat/services/piper.py b/src/pipecat/services/piper.py new file mode 100644 index 000000000..ecb831eb7 --- /dev/null +++ b/src/pipecat/services/piper.py @@ -0,0 +1,103 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from typing import AsyncGenerator + +import aiohttp +from loguru import logger + +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.ai_services import TTSService + +# This assumes a running TTS service running: https://github.com/rhasspy/piper/blob/master/src/python_run/README_http.md + + +class PiperTTSService(TTSService): + """Piper TTS service implementation. + + Provides integration with Piper's TTS server. + """ + + def __init__( + self, + *, + base_url: str, + aiohttp_session: aiohttp.ClientSession | None = None, + sample_rate: int = 24000, + **kwargs, + ): + """Initialize the PiperTTSService class instance. + + Args: + base_url (str): Base URL of the Piper TTS server (should not end with a slash). + aiohttp_session (aiohttp.ClientSession, optional): Optional aiohttp session to use for requests. Defaults to None. + sample_rate (int, optional): Sample rate in Hz. Defaults to 24000. + **kwargs (dict): Additional keyword arguments. 
+ """ + super().__init__(sample_rate=sample_rate, **kwargs) + if not aiohttp_session: + aiohttp_session = aiohttp.ClientSession() + + if base_url.endswith("/"): + logger.warning("Base URL ends with a slash, this is not allowed.") + base_url = base_url[:-1] + + self._settings = {"base_url": base_url} + self.set_voice("voice_id") + self._aiohttp_session = aiohttp_session + + def can_generate_metrics(self) -> bool: + return True + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + url = self._settings["base_url"] + "/?text=" + text.replace(".", "").replace("*", "") + + await self.start_ttfb_metrics() + + async with self._aiohttp_session.get(url) as r: + if r.status != 200: + text = await r.text() + logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") + yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") + return + + await self.start_tts_usage_metrics(text) + + yield TTSStartedFrame() + + buffer = bytearray() + async for chunk in r.content.iter_chunked(1024): + if len(chunk) > 0: + await self.stop_ttfb_metrics() + # Append new chunk to the buffer. + buffer.extend(chunk) + + # Check if buffer has enough data for processing. + while ( + len(buffer) >= 48000 + ): # Assuming at least 0.5 seconds of audio data at 24000 Hz + # Process the buffer up to a safe size for resampling. + process_data = buffer[:48000] + # Remove processed data from buffer. + buffer = buffer[48000:] + + frame = TTSAudioRawFrame(process_data, self._sample_rate, 1) + yield frame + + # Process any remaining data in the buffer. 
+ if len(buffer) > 0: + frame = TTSAudioRawFrame(buffer, self._sample_rate, 1) + yield frame + + yield TTSStoppedFrame() diff --git a/test-requirements.txt b/test-requirements.txt index 36e64060b..aae1c2dbb 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -21,6 +21,7 @@ pyaudio~=0.2.14 pydantic~=2.8.2 pyloudnorm~=0.1.1 pyht~=0.1.4 +pytest-aiohttp==1.1.0 python-dotenv~=1.0.1 silero-vad~=5.1 soxr~=0.5.0 diff --git a/tests/test_piper_tts.py b/tests/test_piper_tts.py new file mode 100644 index 000000000..296de7fae --- /dev/null +++ b/tests/test_piper_tts.py @@ -0,0 +1,101 @@ +"""Tests for PiperTTSService.""" + +import asyncio + +import pytest +from aiohttp import web + +from pipecat.frames.frames import ( + ErrorFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.piper import PiperTTSService + + +@pytest.mark.asyncio +async def test_run_piper_tts_success(aiohttp_client): + """Test successful TTS generation with chunked audio data. + + Checks frames for TTSStartedFrame -> TTSAudioRawFrame -> TTSStoppedFrame. 
+ """ + + async def handler(request): + # The service expects a /?text= param + # Here we're just returning dummy chunked bytes to simulate an audio response + text_query = request.rel_url.query.get("text", "") + print(f"Mock server received text param: {text_query}") + + # Prepare a StreamResponse with chunked data + resp = web.StreamResponse( + status=200, + reason="OK", + headers={"Content-Type": "audio/raw"}, + ) + await resp.prepare(request) + + # Write out some chunked byte data + # In reality, you’d return WAV data or similar + data_chunk_1 = b"\x00\x01\x02\x03" * 12000 # 48000 bytes + data_chunk_2 = b"\x04\x05\x06\x07" * 6000 # another chunk + await resp.write(data_chunk_1) + await asyncio.sleep(0.01) # simulate async chunk delay + await resp.write(data_chunk_2) + await resp.write_eof() + + return resp + + # Create an aiohttp test server + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + + # Remove trailing slash if present in the test URL + base_url = str(client.make_url("")).rstrip("/") + + # Instantiate PiperTTSService with our mock server + tts_service = PiperTTSService(base_url=base_url) + + # Collect frames from the generator + frames = [] + async for frame in tts_service.run_tts("Hello world."): + frames.append(frame) + + # Ensure we received frames in the expected order/types + assert len(frames) >= 3, "Expecting at least TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame" + assert isinstance(frames[0], TTSStartedFrame), "First frame must be TTSStartedFrame" + assert isinstance(frames[-1], TTSStoppedFrame), "Last frame must be TTSStoppedFrame" + + # Check we have at least one TTSAudioRawFrame + audio_frames = [f for f in frames if isinstance(f, TTSAudioRawFrame)] + assert len(audio_frames) > 0, "Should have received at least one TTSAudioRawFrame" + for a_frame in audio_frames: + assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" + + +@pytest.mark.asyncio +async def 
test_run_piper_tts_error(aiohttp_client): + """Test how the service handles a non-200 response from the server. + + Expects an ErrorFrame to be returned. + """ + + async def handler(_request): + # Return an error status for any request + return web.Response(status=404, text="Not found") + + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + base_url = str(client.make_url("")).rstrip("/") + + tts_service = PiperTTSService(base_url=base_url) + + frames = [] + async for frame in tts_service.run_tts("Error case."): + frames.append(frame) + + assert len(frames) == 1, "Should only receive a single ErrorFrame" + assert isinstance(frames[0], ErrorFrame), "Must receive an ErrorFrame for 404" + assert "status: 404" in frames[0].error, "ErrorFrame should contain details about the 404" From 2e1a18503b9da4c5cee86c188483ce39e6701811 Mon Sep 17 00:00:00 2001 From: balalo Date: Tue, 18 Mar 2025 10:41:43 +0100 Subject: [PATCH 02/37] Set tool choice from context aggregator --- src/pipecat/frames/frames.py | 6 ++++++ .../processors/aggregators/llm_response.py | 18 +++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 74dd2accb..aec433033 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -362,6 +362,12 @@ class LLMSetToolsFrame(DataFrame): tools: List[dict] +@dataclass +class LLMSetToolChoiceFrame(DataFrame): + """A frame containing a tool choice for an LLM to use for function calling.""" + + tool_choice: Literal["none", "auto", "required"] + @dataclass class LLMEnablePromptCachingFrame(DataFrame): diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index d8582e32f..620e34b2c 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -7,7 +7,7 @@ import asyncio import time from abc import 
abstractmethod -from typing import List +from typing import List, Literal from pipecat.frames.frames import ( CancelFrame, @@ -22,6 +22,7 @@ from pipecat.frames.frames import ( LLMMessagesFrame, LLMMessagesUpdateFrame, LLMSetToolsFrame, + LLMSetToolChoiceFrame, LLMTextFrame, StartFrame, StartInterruptionFrame, @@ -132,6 +133,11 @@ class BaseLLMResponseAggregator(FrameProcessor): """Set LLM tools to be used in the current conversation.""" pass + @abstractmethod + def set_tool_choice(self, tool_choice): + """Set the tool choice. This should modify the LLM context.""" + pass + @abstractmethod def reset(self): """Reset the internals of this aggregator. This should not modify the @@ -185,6 +191,9 @@ class LLMResponseAggregator(BaseLLMResponseAggregator): def set_tools(self, tools): pass + def set_tool_choice(self, tool_choice): + pass + def reset(self): self._aggregation = "" @@ -244,6 +253,9 @@ class LLMContextResponseAggregator(BaseLLMResponseAggregator): def set_tools(self, tools: List): self._context.set_tools(tools) + def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"]): + self._context.set_tool_choice(tool_choice) + def reset(self): self._aggregation = "" @@ -328,6 +340,8 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): self.set_messages(frame.messages) elif isinstance(frame, LLMSetToolsFrame): self.set_tools(frame.tools) + elif isinstance(frame, LLMSetToolChoiceFrame): + self.set_tool_choice(frame.tool_choice) else: await self.push_frame(frame, direction) @@ -448,6 +462,8 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): self.set_messages(frame.messages) elif isinstance(frame, LLMSetToolsFrame): self.set_tools(frame.tools) + elif isinstance(frame, LLMSetToolChoiceFrame): + self.set_tool_choice(frame.tool_choice) else: await self.push_frame(frame, direction) From 1c19777d5eda4cf237f83d8dcf3dc82c4f2a4540 Mon Sep 17 00:00:00 2001 From: balalo Date: Tue, 18 Mar 2025 11:09:40 +0100 Subject: [PATCH 03/37] Fix 
format --- src/pipecat/frames/frames.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index aec433033..7b07a331e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -362,6 +362,7 @@ class LLMSetToolsFrame(DataFrame): tools: List[dict] + @dataclass class LLMSetToolChoiceFrame(DataFrame): """A frame containing a tool choice for an LLM to use for function calling.""" From dc5067407d6b2136e70b411e070cf774bdfea1e0 Mon Sep 17 00:00:00 2001 From: balalo Date: Tue, 18 Mar 2025 11:12:51 +0100 Subject: [PATCH 04/37] Fix ruff check --- src/pipecat/processors/aggregators/llm_response.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 620e34b2c..9c18e67ca 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -21,8 +21,8 @@ from pipecat.frames.frames import ( LLMMessagesAppendFrame, LLMMessagesFrame, LLMMessagesUpdateFrame, - LLMSetToolsFrame, LLMSetToolChoiceFrame, + LLMSetToolsFrame, LLMTextFrame, StartFrame, StartInterruptionFrame, From 48b6850df43d8bf6f74f72845f85188a0e2e153a Mon Sep 17 00:00:00 2001 From: balalo Date: Tue, 18 Mar 2025 20:45:31 +0100 Subject: [PATCH 05/37] allow other function names --- src/pipecat/frames/frames.py | 2 +- src/pipecat/processors/aggregators/llm_response.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 7b07a331e..734f0ea6a 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -367,7 +367,7 @@ class LLMSetToolsFrame(DataFrame): class LLMSetToolChoiceFrame(DataFrame): """A frame containing a tool choice for an LLM to use for function calling.""" - tool_choice: Literal["none", "auto", "required"] + tool_choice: Literal["none", "auto", "required"] | 
str @dataclass diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 9c18e67ca..44d41f535 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -253,7 +253,7 @@ class LLMContextResponseAggregator(BaseLLMResponseAggregator): def set_tools(self, tools: List): self._context.set_tools(tools) - def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"]): + def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"] | str): self._context.set_tool_choice(tool_choice) def reset(self): From c99436b80eecd7e44be7804e0be16ddbbceeaf9f Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Tue, 25 Mar 2025 17:26:31 -0400 Subject: [PATCH 06/37] Bump daily-python dependency to 0.16.0 to pick up support in `DailyTransport` for updating remote participants' `canReceive` permission via the `update_remote_participants()` method --- CHANGELOG.md | 4 ++++ pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c06240bc..928ba88fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added support in `DailyTransport` for updating remote participants' + `canReceive` permission via the `update_remote_participants()` method, by + bumping the daily-python dependency to >= 0.16.0. + - ElevenLabs TTS services now support a sample rate of 8000. 
### Fixed diff --git a/pyproject.toml b/pyproject.toml index 0d1e46ed9..d2fa46dcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ] neuphonic = [ "pyneuphonic~=1.5.13", "websockets~=13.1" ] cerebras = [] deepseek = [] -daily = [ "daily-python~=0.15.0" ] +daily = [ "daily-python~=0.16.0" ] deepgram = [ "deepgram-sdk~=3.8.0" ] elevenlabs = [ "websockets~=13.1" ] fal = [ "fal-client~=0.5.9" ] From 01458895c2b35e6d3af49def81159cab1d7aca30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 24 Mar 2025 15:39:35 -0700 Subject: [PATCH 07/37] LLMAssistantContextAggregator: create a task to run on_context_updated --- CHANGELOG.md | 3 +++ .../processors/aggregators/llm_response.py | 20 +++++++++++++++---- src/pipecat/processors/frame_processor.py | 7 +++++-- src/pipecat/services/ai_services.py | 2 +- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c06240bc..d7c8c5dd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that would cause `LLMAssistantContextAggregator` to block + processing more frames while processing a function call result. + - Fixed an issue where the `RTVIObserver` would report two bot started and stopped speaking events for each bot turn. 
diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 75435a214..7e84f6376 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -6,7 +6,7 @@ import asyncio from abc import abstractmethod -from typing import Dict, List +from typing import Dict, List, Set from loguru import logger @@ -380,6 +380,7 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): self._started = 0 self._function_calls_in_progress: Dict[str, FunctionCallInProgressFrame] = {} + self._context_updated_tasks: Set[asyncio.Task] = set() async def handle_aggregation(self, aggregation: str): self._context.add_message({"role": "assistant", "content": aggregation}) @@ -486,10 +487,14 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): if run_llm: await self.push_context_frame(FrameDirection.UPSTREAM) - # Emit the on_context_updated callback once the function call - # result is added to the context + # Call the `on_context_updated` callback once the function call result + # is added to the context. Also, run this in a separate task to make + # sure we don't block the pipeline. if properties and properties.on_context_updated: - await properties.on_context_updated() + task_name = f"{frame.function_name}:{frame.tool_call_id}:on_context_updated" + task = self.create_task(properties.on_context_updated(), task_name) + self._context_updated_tasks.add(task) + task.add_done_callback(self._context_updated_task_finished) async def _handle_function_call_cancel(self, frame: FunctionCallCancelFrame): logger.debug( @@ -535,6 +540,13 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator): else: self._aggregation += frame.text + def _context_updated_task_finished(self, task: asyncio.Task): + self._context_updated_tasks.discard(task) + # The task is finished so this should exit immediately. 
We need to do + # this because otherwise the task manager would report a dangling task + # if we don't remove it. + asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop()) + class LLMUserResponseAggregator(LLMUserContextAggregator): def __init__(self, messages: List[dict] = [], **kwargs): diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 847cdf175..590698e7f 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -147,10 +147,13 @@ class FrameProcessor(BaseObject): await self.stop_ttfb_metrics() await self.stop_processing_metrics() - def create_task(self, coroutine: Coroutine) -> asyncio.Task: + def create_task(self, coroutine: Coroutine, name: Optional[str] = None) -> asyncio.Task: if not self._task_manager: raise Exception(f"{self} TaskManager is still not initialized.") - name = f"{self}::{coroutine.cr_code.co_name}" + if name: + name = f"{self}::{name}" + else: + name = f"{self}::{coroutine.cr_code.co_name}" return self._task_manager.create_task(coroutine, name) async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 9f9804e65..a78c268dd 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -369,7 +369,7 @@ class LLMService(AIService): if tuple_to_remove: self._function_call_tasks.discard(tuple_to_remove) # The task is finished so this should exit immediately. We need to - # do this because otherwise the task manager would have a dangling + # do this because otherwise the task manager would report a dangling # task if we don't remove it. 
asyncio.run_coroutine_threadsafe(self.wait_for_task(task), self.get_event_loop()) From 8aebf00c2d9006ac207165172e15f98a56f95e61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Mar 2025 14:40:46 -0700 Subject: [PATCH 08/37] GoogleAssistantContextAggregator: function call result should be a JSON object --- CHANGELOG.md | 4 ++++ src/pipecat/frames/frames.py | 6 +++--- src/pipecat/services/anthropic.py | 2 +- src/pipecat/services/google/google.py | 24 ++++++++++-------------- src/pipecat/services/openai.py | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7c8c5dd8..f75d4eab6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `GoogleAssistantContextAggregator` issue where function call + placeholders were not being updated when the function call result was + not a string. + - Fixed an issue that would cause `LLMAssistantContextAggregator` to block processing more frames while processing a function call result. 
diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index c2a79461f..6452cbfe4 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -384,7 +384,7 @@ class FunctionCallResultFrame(DataFrame): function_name: str tool_call_id: str - arguments: str + arguments: Any result: Any properties: Optional[FunctionCallResultProperties] = None @@ -633,8 +633,8 @@ class FunctionCallInProgressFrame(SystemFrame): function_name: str tool_call_id: str - arguments: str - cancel_on_interruption: bool + arguments: Any + cancel_on_interruption: bool = False @dataclass diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index 6a95d04e2..3e369075a 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -725,7 +725,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator): ) async def _update_function_call_result( - self, function_name: str, tool_call_id: str, result: str + self, function_name: str, tool_call_id: str, result: Any ): for message in self._context.messages: if message["role"] == "user": diff --git a/src/pipecat/services/google/google.py b/src/pipecat/services/google/google.py index bfddce46d..554d9cb6b 100644 --- a/src/pipecat/services/google/google.py +++ b/src/pipecat/services/google/google.py @@ -601,23 +601,18 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator): async def handle_function_call_result(self, frame: FunctionCallResultFrame): if frame.result: - if not isinstance(frame.result, str): - return - - response = {"response": frame.result} - + await self._update_function_call_result( + frame.function_name, frame.tool_call_id, frame.result + ) + else: + response = {"response": "COMPLETED"} await self._update_function_call_result( frame.function_name, frame.tool_call_id, response ) - else: - await self._update_function_call_result( - frame.function_name, frame.tool_call_id, "COMPLETED" - ) async def 
handle_function_call_cancel(self, frame: FunctionCallCancelFrame): - await self._update_function_call_result( - frame.function_name, frame.tool_call_id, "CANCELLED" - ) + response = {"response": "CANCELLED"} + await self._update_function_call_result(frame.function_name, frame.tool_call_id, response) async def _update_function_call_result( self, function_name: str, tool_call_id: str, result: Any @@ -626,11 +621,12 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator): if message.role == "user": for part in message.parts: if part.function_response and part.function_response.id == tool_call_id: - part.function_response.response = {"response": result} + part.function_response.response = result async def handle_user_image_frame(self, frame: UserImageRawFrame): + response = {"response": "COMPLETED"} await self._update_function_call_result( - frame.request.function_name, frame.request.tool_call_id, "COMPLETED" + frame.request.function_name, frame.request.tool_call_id, response ) self._context.add_image_frame_message( format=frame.format, diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index ff7bc0442..cb1edea72 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -613,7 +613,7 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator): ) async def _update_function_call_result( - self, function_name: str, tool_call_id: str, result: str + self, function_name: str, tool_call_id: str, result: Any ): for message in self._context.messages: if ( From 19b464ba23691c939ffd06f8f7704baef753e2fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Mar 2025 14:41:33 -0700 Subject: [PATCH 09/37] tests: add assistant aggregator function call frame handling --- tests/test_context_aggregators.py | 97 ++++++++++++++++++++++++++----- 1 file changed, 84 insertions(+), 13 deletions(-) diff --git a/tests/test_context_aggregators.py 
b/tests/test_context_aggregators.py index 185725632..baee3496f 100644 --- a/tests/test_context_aggregators.py +++ b/tests/test_context_aggregators.py @@ -4,13 +4,18 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import json import unittest +from typing import Any import google.ai.generativelanguage as glm from pipecat.frames.frames import ( EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame, + FunctionCallInProgressFrame, + FunctionCallResultFrame, + FunctionCallResultProperties, InterimTranscriptionFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, @@ -21,10 +26,7 @@ from pipecat.frames.frames import ( UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) -from pipecat.processors.aggregators.llm_response import ( - LLMAssistantContextAggregator, - LLMUserContextAggregator, -) +from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContext, OpenAILLMContextFrame, @@ -423,6 +425,9 @@ class BaseTestAssistantContextAggreagator: ): assert context.messages[index]["content"] == content + def check_function_call_result(self, context: OpenAILLMContext, index: int, content: str): + assert json.loads(context.messages[index]["content"]) == content + async def test_empty(self): assert self.CONTEXT_CLASS is not None, "CONTEXT_CLASS must be set in a subclass" assert self.AGGREGATOR_CLASS is not None, "AGGREGATOR_CLASS must be set in a subclass" @@ -556,9 +561,76 @@ class BaseTestAssistantContextAggreagator: self.check_message_multi_content(context, 0, 0, "Hello Pipecat.") self.check_message_multi_content(context, 0, 1, "How are you?") + async def test_function_call(self): + assert self.CONTEXT_CLASS is not None, "CONTEXT_CLASS must be set in a subclass" + assert self.AGGREGATOR_CLASS is not None, "AGGREGATOR_CLASS must be set in a subclass" + + context = self.CONTEXT_CLASS() + aggregator = self.AGGREGATOR_CLASS(context) + frames_to_send = [ + 
FunctionCallInProgressFrame( + function_name="get_weather", + tool_call_id="1", + arguments={"location": "Los Angeles"}, + cancel_on_interruption=False, + ), + SleepFrame(), + FunctionCallResultFrame( + function_name="get_weather", + tool_call_id="1", + arguments={"location": "Los Angeles"}, + result={"conditions": "Sunny"}, + ), + ] + expected_down_frames = [] + await run_test( + aggregator, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + self.check_function_call_result(context, -1, {"conditions": "Sunny"}) + + async def test_function_call_on_context_updated(self): + assert self.CONTEXT_CLASS is not None, "CONTEXT_CLASS must be set in a subclass" + assert self.AGGREGATOR_CLASS is not None, "AGGREGATOR_CLASS must be set in a subclass" + + context_updated = False + + async def on_context_updated(): + nonlocal context_updated + context_updated = True + + context = self.CONTEXT_CLASS() + aggregator = self.AGGREGATOR_CLASS(context) + frames_to_send = [ + FunctionCallInProgressFrame( + function_name="get_weather", + tool_call_id="1", + arguments={"location": "Los Angeles"}, + cancel_on_interruption=False, + ), + SleepFrame(), + FunctionCallResultFrame( + function_name="get_weather", + tool_call_id="1", + arguments={"location": "Los Angeles"}, + result={"conditions": "Sunny"}, + properties=FunctionCallResultProperties(on_context_updated=on_context_updated), + ), + SleepFrame(), + ] + expected_down_frames = [] + await run_test( + aggregator, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + ) + self.check_function_call_result(context, -1, {"conditions": "Sunny"}) + assert context_updated + # -# LLMUserContextAggregator, LLMAssistantContextAggregator +# LLMUserContextAggregator # @@ -567,14 +639,6 @@ class TestLLMUserContextAggregator(BaseTestUserContextAggregator, unittest.Isola AGGREGATOR_CLASS = LLMUserContextAggregator -class TestLLMAssistantContextAggregator( - BaseTestAssistantContextAggreagator, 
unittest.IsolatedAsyncioTestCase -): - CONTEXT_CLASS = OpenAILLMContext - AGGREGATOR_CLASS = LLMAssistantContextAggregator - EXPECTED_CONTEXT_FRAMES = [OpenAILLMContextFrame, OpenAILLMContextAssistantTimestampFrame] - - # # OpenAI # @@ -626,6 +690,9 @@ class TestAnthropicAssistantContextAggregator( messages = context.messages[content_index] assert messages["content"][index]["text"] == content + def check_function_call_result(self, context: OpenAILLMContext, index: int, content: Any): + assert context.messages[index]["content"][0]["content"] == json.dumps(content) + # # Google @@ -665,3 +732,7 @@ class TestGoogleAssistantContextAggregator( ): obj = glm.Content.to_dict(context.messages[index]) assert obj["parts"][0]["text"] == content + + def check_function_call_result(self, context: OpenAILLMContext, index: int, content: Any): + obj = glm.Content.to_dict(context.messages[index]) + assert obj["parts"][0]["function_response"]["response"] == content From 077952b6584d6fc97971c76447ddd09c844623ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 25 Mar 2025 19:11:27 -0700 Subject: [PATCH 10/37] GoogleAssistantContextAggregator: allow any value as function call result --- src/pipecat/services/google/google.py | 13 ++++++------- tests/test_context_aggregators.py | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/pipecat/services/google/google.py b/src/pipecat/services/google/google.py index 554d9cb6b..3d207a254 100644 --- a/src/pipecat/services/google/google.py +++ b/src/pipecat/services/google/google.py @@ -605,14 +605,14 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator): frame.function_name, frame.tool_call_id, frame.result ) else: - response = {"response": "COMPLETED"} await self._update_function_call_result( - frame.function_name, frame.tool_call_id, response + frame.function_name, frame.tool_call_id, "COMPLETED" ) async def handle_function_call_cancel(self, frame: 
FunctionCallCancelFrame): - response = {"response": "CANCELLED"} - await self._update_function_call_result(frame.function_name, frame.tool_call_id, response) + await self._update_function_call_result( + frame.function_name, frame.tool_call_id, "CANCELLED" + ) async def _update_function_call_result( self, function_name: str, tool_call_id: str, result: Any @@ -621,12 +621,11 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator): if message.role == "user": for part in message.parts: if part.function_response and part.function_response.id == tool_call_id: - part.function_response.response = result + part.function_response.response = {"value": json.dumps(result)} async def handle_user_image_frame(self, frame: UserImageRawFrame): - response = {"response": "COMPLETED"} await self._update_function_call_result( - frame.request.function_name, frame.request.tool_call_id, response + frame.request.function_name, frame.request.tool_call_id, "COMPLETED" ) self._context.add_image_frame_message( format=frame.format, diff --git a/tests/test_context_aggregators.py b/tests/test_context_aggregators.py index baee3496f..df00f11e4 100644 --- a/tests/test_context_aggregators.py +++ b/tests/test_context_aggregators.py @@ -735,4 +735,4 @@ class TestGoogleAssistantContextAggregator( def check_function_call_result(self, context: OpenAILLMContext, index: int, content: Any): obj = glm.Content.to_dict(context.messages[index]) - assert obj["parts"][0]["function_response"]["response"] == content + assert obj["parts"][0]["function_response"]["response"]["value"] == json.dumps(content) From ce9f75a8515e8741075abb87ac05611543f47c4b Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Wed, 26 Mar 2025 08:17:50 -0300 Subject: [PATCH 11/37] Fixing the tool choice extra type to be a dict instead of string. 
--- src/pipecat/frames/frames.py | 2 +- src/pipecat/processors/aggregators/llm_response.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 39e25fc11..11cd17801 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -367,7 +367,7 @@ class LLMSetToolsFrame(DataFrame): class LLMSetToolChoiceFrame(DataFrame): """A frame containing a tool choice for an LLM to use for function calling.""" - tool_choice: Literal["none", "auto", "required"] | str + tool_choice: Literal["none", "auto", "required"] | dict @dataclass diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index c0ef65f12..af8bf1a2e 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -210,7 +210,7 @@ class LLMContextResponseAggregator(BaseLLMResponseAggregator): def set_tools(self, tools: List): self._context.set_tools(tools) - def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"] | str): + def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"] | dict): self._context.set_tool_choice(tool_choice) def reset(self): From aeac40312e6f67e1c3c0dc420a448bde409f3ed0 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Wed, 26 Mar 2025 09:06:29 -0300 Subject: [PATCH 12/37] Added the feature to change dynamically the tool choice to the changelog. --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e47c2560e..e88155896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new frame, `LLMSetToolChoiceFrame`, which provides a mechanism + for modifying the `tool_choice` in the context. 
+ - Added support in `DailyTransport` for updating remote participants' `canReceive` permission via the `update_remote_participants()` method, by bumping the daily-python dependency to >= 0.16.0. From 72d373e5655710f9a95a869c3478b1775d921ac9 Mon Sep 17 00:00:00 2001 From: Nico <105345946+nicougou@users.noreply.github.com> Date: Tue, 25 Mar 2025 10:25:16 +0100 Subject: [PATCH 13/37] feature/support instructions in OpenAITTSService --- src/pipecat/services/openai.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index cb1edea72..3be1df243 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -391,6 +391,7 @@ class OpenAIImageGenService(ImageGenService): self, *, api_key: str, + base_url: Optional[str] = None, aiohttp_session: aiohttp.ClientSession, image_size: Literal["256x256", "512x512", "1024x1024", "1792x1024", "1024x1792"], model: str = "dall-e-3", @@ -398,7 +399,7 @@ class OpenAIImageGenService(ImageGenService): super().__init__() self.set_model_name(model) self._image_size = image_size - self._client = AsyncOpenAI(api_key=api_key) + self._client = AsyncOpenAI(api_key=api_key, base_url=base_url) self._aiohttp_session = aiohttp_session async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: @@ -501,9 +502,12 @@ class OpenAITTSService(TTSService): self, *, api_key: Optional[str] = None, + base_url: Optional[str] = None, + websocket_base_url: Optional[str] = None, voice: str = "alloy", model: str = "gpt-4o-mini-tts", sample_rate: Optional[int] = None, + instructions: Optional[str] = None, **kwargs, ): if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE: @@ -515,8 +519,8 @@ class OpenAITTSService(TTSService): self.set_model_name(model) self.set_voice(voice) - - self._client = AsyncOpenAI(api_key=api_key) + self._instructions = instructions + self._client = AsyncOpenAI(api_key=api_key, base_url=base_url, 
websocket_base_url=websocket_base_url) def can_generate_metrics(self) -> bool: return True @@ -538,11 +542,17 @@ class OpenAITTSService(TTSService): try: await self.start_ttfb_metrics() + # Setup extra body parameters + extra_body = {} + if self._instructions: + extra_body["instructions"] = self._instructions + async with self._client.audio.speech.with_streaming_response.create( input=text or " ", # Text must contain at least one character model=self.model_name, voice=VALID_VOICES[self._voice_id], response_format="pcm", + extra_body=extra_body, ) as r: if r.status_code != 200: error = await r.text() From d982fc35d8aebadaadb6449201917f7e741661f8 Mon Sep 17 00:00:00 2001 From: Nico <105345946+nicougou@users.noreply.github.com> Date: Tue, 25 Mar 2025 10:35:53 +0100 Subject: [PATCH 14/37] fix: formatter --- src/pipecat/services/openai.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 3be1df243..c631fd8c0 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -520,7 +520,9 @@ class OpenAITTSService(TTSService): self.set_model_name(model) self.set_voice(voice) self._instructions = instructions - self._client = AsyncOpenAI(api_key=api_key, base_url=base_url, websocket_base_url=websocket_base_url) + self._client = AsyncOpenAI( + api_key=api_key, base_url=base_url, websocket_base_url=websocket_base_url + ) def can_generate_metrics(self) -> bool: return True From dc2ee2bf0a96393bf6e01f85d3671df0bcdf5119 Mon Sep 17 00:00:00 2001 From: Nico <105345946+nicougou@users.noreply.github.com> Date: Wed, 26 Mar 2025 15:41:37 +0100 Subject: [PATCH 15/37] review: remove websocket_base_url --- src/pipecat/services/openai.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index c631fd8c0..3f85d917c 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ 
-503,7 +503,6 @@ class OpenAITTSService(TTSService): *, api_key: Optional[str] = None, base_url: Optional[str] = None, - websocket_base_url: Optional[str] = None, voice: str = "alloy", model: str = "gpt-4o-mini-tts", sample_rate: Optional[int] = None, @@ -520,9 +519,7 @@ class OpenAITTSService(TTSService): self.set_model_name(model) self.set_voice(voice) self._instructions = instructions - self._client = AsyncOpenAI( - api_key=api_key, base_url=base_url, websocket_base_url=websocket_base_url - ) + self._client = AsyncOpenAI(api_key=api_key, base_url=base_url) def can_generate_metrics(self) -> bool: return True From e6e339a02ecf6afe7fca531be0670ae8170aa22f Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 26 Mar 2025 11:22:23 -0400 Subject: [PATCH 16/37] Bump daily-python dependency to 0.16.1 to pick up a bugfix --- CHANGELOG.md | 5 ++++- pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e88155896..105e6d891 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added a new frame, `LLMSetToolChoiceFrame`, which provides a mechanism +- Added a new frame, `LLMSetToolChoiceFrame`, which provides a mechanism for modifying the `tool_choice` in the context. - Added support in `DailyTransport` for updating remote participants' @@ -20,6 +20,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue in Daily involving switching virtual devices, by bumping the + daily-python dependency to >= 0.16.1. + - Fixed a `GoogleAssistantContextAggregator` issue where function calls placeholders where not being updated when then function call result was different from a string. 
diff --git a/pyproject.toml b/pyproject.toml index d2fa46dcf..c3fbea3e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ] neuphonic = [ "pyneuphonic~=1.5.13", "websockets~=13.1" ] cerebras = [] deepseek = [] -daily = [ "daily-python~=0.16.0" ] +daily = [ "daily-python~=0.16.1" ] deepgram = [ "deepgram-sdk~=3.8.0" ] elevenlabs = [ "websockets~=13.1" ] fal = [ "fal-client~=0.5.9" ] From 499e69846d0918d1b42bd295d4ab07aa67553e89 Mon Sep 17 00:00:00 2001 From: Nico <105345946+nicougou@users.noreply.github.com> Date: Wed, 26 Mar 2025 17:13:30 +0100 Subject: [PATCH 17/37] review: add changelog entries --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e88155896..fa272561d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - ElevenLabs TTS services now support a sample rate of 8000. 
+- Added support for `instructions` in `OpenAITTSService` + +- Added support for `base_url` in `OpenAIImageGenService` and `OpenAITTSService` + ### Fixed - Fixed a `GoogleAssistantContextAggregator` issue where function calls From 060bb4c26bf616b372eab086bb58bfcbc14be231 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Tue, 25 Mar 2025 18:47:01 -0700 Subject: [PATCH 18/37] wip --- .../foundational/07y-interruptible-groq.py | 101 ++++++++++++++++++ pyproject.toml | 2 +- src/pipecat/services/groq.py | 78 +++++++++++++- 3 files changed, 179 insertions(+), 2 deletions(-) create mode 100644 examples/foundational/07y-interruptible-groq.py diff --git a/examples/foundational/07y-interruptible-groq.py b/examples/foundational/07y-interruptible-groq.py new file mode 100644 index 000000000..48d0eb700 --- /dev/null +++ b/examples/foundational/07y-interruptible-groq.py @@ -0,0 +1,101 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.groq import GroqLLMService, GroqSTTService, GroqTTSService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + # transcription_enabled=True, + vad_enabled=True, + 
vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = GroqSTTService(api_key=os.getenv("GROQ_API_KEY")) + + llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile") + + tts = GroqTTSService(api_key=os.getenv("GROQ_API_KEY"), voice_id="Atlas-PlayAI") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. 
+ messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + await task.cancel() + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index d2fa46dcf..da9c18dd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,7 @@ fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ] gladia = [ "websockets~=13.1" ] google = [ "google-cloud-speech~=2.31.1", "google-cloud-texttospeech~=2.25.1", "google-genai~=1.7.0", "google-generativeai~=0.8.4" ] grok = [] -groq = [] +groq = [ "groq~=0.20.0" ] gstreamer = [ "pygobject~=3.50.0" ] fireworks = [] krisp = [ "pipecat-ai-krisp~=0.3.0" ] diff --git a/src/pipecat/services/groq.py b/src/pipecat/services/groq.py index 66cc9357f..1b3570abb 100644 --- a/src/pipecat/services/groq.py +++ b/src/pipecat/services/groq.py @@ -5,10 +5,14 @@ # -from typing import Optional +from typing import AsyncGenerator, Optional +from groq import AsyncGroq from loguru import logger +from pydantic import BaseModel +from pipecat.frames.frames import Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame +from pipecat.services.ai_services import InterruptibleTTSService from pipecat.services.base_whisper import BaseWhisperSTTService, Transcription from pipecat.services.openai import OpenAILLMService from pipecat.transcriptions.language import Language @@ -98,3 +102,75 @@ class GroqSTTService(BaseWhisperSTTService): kwargs["temperature"] = self._temperature return await self._client.audio.transcriptions.create(**kwargs) + + +class GroqTTSService(InterruptibleTTSService): + class InputParams(BaseModel): + language: Optional[Language] = Language.EN + speed: Optional[float] = 1.0 + seed: Optional[int] = None + + def __init__( + self, + *, 
+ api_key: str, + output_format: str = "wav", + params: InputParams = InputParams(), + model_name: str = "playai-tts", + voice_id: str = "Atlas-PlayAI", + **kwargs, + ): + super().__init__( + pause_frame_processing=True, + **kwargs, + ) + + self._api_key = api_key + self._model_name = model_name + self._output_format = output_format + self._voice_id = voice_id + self._params = params + + self._client = AsyncGroq(api_key=self._api_key) + + def can_generate_metrics(self) -> bool: + return True + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"{self}: Generating TTS [{text}]") + measuring_ttfb = True + await self.start_ttfb_metrics() + yield TTSStartedFrame() + + response = await self._client.audio.speech.create( + model=self._model_name, + voice=self._voice_id, + response_format=self._output_format, + input=text, + ) + + async for data in response.iter_bytes(4096): + if measuring_ttfb: + await self.stop_ttfb_metrics() + measuring_ttfb = False + # remove wav header if present + if data.startswith(b"RIFF"): + continue + yield TTSAudioRawFrame(data, 48000, 1) + + yield TTSStoppedFrame() + + async def _connect(self) -> None: + pass + + async def _disconnect(self) -> None: + pass + + async def _connect_websocket(self) -> None: + pass + + async def _disconnect_websocket(self) -> None: + pass + + async def _receive_messages(self) -> None: + pass From 406f5a395b56859727ae4df72c890ce584e4e3db Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 26 Mar 2025 07:54:57 -0700 Subject: [PATCH 19/37] fix class hierarchy and audio chunking --- .../foundational/07y-interruptible-groq.py | 2 +- src/pipecat/services/groq.py | 27 +++++-------------- 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/examples/foundational/07y-interruptible-groq.py b/examples/foundational/07y-interruptible-groq.py index 48d0eb700..9e5719c21 100644 --- a/examples/foundational/07y-interruptible-groq.py +++
b/examples/foundational/07y-interruptible-groq.py @@ -48,7 +48,7 @@ async def main(): llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile") - tts = GroqTTSService(api_key=os.getenv("GROQ_API_KEY"), voice_id="Atlas-PlayAI") + tts = GroqTTSService(api_key=os.getenv("GROQ_API_KEY")) messages = [ { diff --git a/src/pipecat/services/groq.py b/src/pipecat/services/groq.py index 1b3570abb..9345f9be1 100644 --- a/src/pipecat/services/groq.py +++ b/src/pipecat/services/groq.py @@ -12,7 +12,7 @@ from loguru import logger from pydantic import BaseModel from pipecat.frames.frames import Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame -from pipecat.services.ai_services import InterruptibleTTSService +from pipecat.services.ai_services import TTSService from pipecat.services.base_whisper import BaseWhisperSTTService, Transcription from pipecat.services.openai import OpenAILLMService from pipecat.transcriptions.language import Language @@ -104,7 +104,7 @@ class GroqSTTService(BaseWhisperSTTService): return await self._client.audio.transcriptions.create(**kwargs) -class GroqTTSService(InterruptibleTTSService): +class GroqTTSService(TTSService): class InputParams(BaseModel): language: Optional[Language] = Language.EN speed: Optional[float] = 1.0 @@ -117,7 +117,7 @@ class GroqTTSService(InterruptibleTTSService): output_format: str = "wav", params: InputParams = InputParams(), model_name: str = "playai-tts", - voice_id: str = "Atlas-PlayAI", + voice_id: str = "Celeste-PlayAI", **kwargs, ): super().__init__( @@ -149,28 +149,15 @@ class GroqTTSService(InterruptibleTTSService): input=text, ) - async for data in response.iter_bytes(4096): + async for data in response.iter_bytes(): if measuring_ttfb: await self.stop_ttfb_metrics() measuring_ttfb = False # remove wav header if present if data.startswith(b"RIFF"): - continue + data = data[44:] + if len(data) == 0: + continue yield TTSAudioRawFrame(data, 48000, 1) yield TTSStoppedFrame() - - async 
def _connect(self) -> None: - pass - - async def _disconnect(self) -> None: - pass - - async def _connect_websocket(self) -> None: - pass - - async def _disconnect_websocket(self) -> None: - pass - - async def _receive_messages(self) -> None: - pass From e087f6ec5d7b5c7faaa99ea95709431de1180dd5 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 26 Mar 2025 08:57:42 -0700 Subject: [PATCH 20/37] GroqTTSService added to CHANGELOG.md --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e88155896..877387b4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added a new frame, `LLMSetToolChoiceFrame`, which provides a mechanism +- Added a new frame, `LLMSetToolChoiceFrame`, which provides a mechanism for modifying the `tool_choice` in the context. +- Added `GroqTTSService` which provides text-to-speech functionality using + Groq's API. + - Added support in `DailyTransport` for updating remote participants' `canReceive` permission via the `update_remote_participants()` method, by bumping the daily-python dependency to >= 0.16.0. 
From f5d49fea8137067272da01af892872e9c316b32a Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 26 Mar 2025 09:13:03 -0700 Subject: [PATCH 21/37] try/catch import of groq SDK --- src/pipecat/services/groq.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/groq.py b/src/pipecat/services/groq.py index 9345f9be1..9d4b09455 100644 --- a/src/pipecat/services/groq.py +++ b/src/pipecat/services/groq.py @@ -7,7 +7,6 @@ from typing import AsyncGenerator, Optional -from groq import AsyncGroq from loguru import logger from pydantic import BaseModel @@ -17,6 +16,15 @@ from pipecat.services.base_whisper import BaseWhisperSTTService, Transcription from pipecat.services.openai import OpenAILLMService from pipecat.transcriptions.language import Language +try: + from groq import AsyncGroq +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Groq, you need to `pip install pipecat-ai[groq]`. Also, set a `GROQ_API_KEY` environment variable." + ) + raise Exception(f"Missing module: {e}") + class GroqLLMService(OpenAILLMService): """A service for interacting with Groq's API using the OpenAI-compatible interface. 
From 887c197bce272f49c31034641d5f9b054dcfe9ce Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 26 Mar 2025 12:26:11 -0400 Subject: [PATCH 22/37] Add sample_rate to the constructor --- examples/foundational/07e-interruptible-playht.py | 2 +- src/pipecat/services/groq.py | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/foundational/07e-interruptible-playht.py b/examples/foundational/07e-interruptible-playht.py index 5ccb96c15..c402c09db 100644 --- a/examples/foundational/07e-interruptible-playht.py +++ b/examples/foundational/07e-interruptible-playht.py @@ -48,7 +48,7 @@ async def main(): tts = PlayHTTTSService( user_id=os.getenv("PLAYHT_USER_ID"), api_key=os.getenv("PLAYHT_API_KEY"), - voice_url="s3://voice-cloning-zero-shot/d9ff78ba-d016-47f6-b0ef-dd630f59414e/female-cs/manifest.json", + voice_url="s3://voice-cloning-zero-shot/e46b4027-b38d-4d24-b292-38fbca2be0ef/original/manifest.json", params=PlayHTTTSService.InputParams(language=Language.EN), ) diff --git a/src/pipecat/services/groq.py b/src/pipecat/services/groq.py index 9d4b09455..bf0304df2 100644 --- a/src/pipecat/services/groq.py +++ b/src/pipecat/services/groq.py @@ -118,6 +118,8 @@ class GroqTTSService(TTSService): speed: Optional[float] = 1.0 seed: Optional[int] = None + GROQ_SAMPLE_RATE = 48000 # Groq TTS only supports 48kHz sample rate + def __init__( self, *, @@ -126,10 +128,14 @@ class GroqTTSService(TTSService): params: InputParams = InputParams(), model_name: str = "playai-tts", voice_id: str = "Celeste-PlayAI", + sample_rate: Optional[int] = GROQ_SAMPLE_RATE, **kwargs, ): + if sample_rate != self.GROQ_SAMPLE_RATE: + logger.warning(f"Groq TTS only supports {self.GROQ_SAMPLE_RATE}Hz sample rate. 
") super().__init__( pause_frame_processing=True, + sample_rate=sample_rate, **kwargs, ) @@ -166,6 +172,6 @@ class GroqTTSService(TTSService): data = data[44:] if len(data) == 0: continue - yield TTSAudioRawFrame(data, 48000, 1) + yield TTSAudioRawFrame(data, self.sample_rate, 1) yield TTSStoppedFrame() From 4ef4dcefce6dc22ddff92a45d0eb3a793ce7fe9a Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 26 Mar 2025 13:06:31 -0400 Subject: [PATCH 23/37] Update CHANGELOG for 0.0.61 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c521d0fcf..43c180778 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.0.61] - 2025-03-26 ### Added From b414077a07c7c28aa43fd3d345c15af1e00fe594 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 26 Mar 2025 13:55:42 -0400 Subject: [PATCH 24/37] Fix: Resolve an issue where Google LLM context messages were causing a TypeError --- CHANGELOG.md | 4 ++++ src/pipecat/processors/frameworks/rtvi.py | 20 +++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0504bd7ae..a8278cfdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue in `RTVIObserver` that prevented handling of Google LLM + context messages. The observer now processes both OpenAI-style and + Google-style contexts. + - Fixed an issue in Daily involving switching virtual devices, by bumping the daily-python dependency to >= 0.16.1. 
diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index f782e6ea8..eec07a29f 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -540,10 +540,23 @@ class RTVIObserver(BaseObserver): await self.push_transport_message_urgent(message) async def _handle_context(self, frame: OpenAILLMContextFrame): + """Process LLM context frames to extract user messages for the RTVI client.""" try: messages = frame.context.messages - if len(messages) > 0: - message = messages[-1] + if not messages: + return + + message = messages[-1] + + # Handle Google LLM format (protobuf objects with attributes) + if hasattr(message, "role") and message.role == "user" and hasattr(message, "parts"): + text = "".join(part.text for part in message.parts if hasattr(part, "text")) + if text: + rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text)) + await self.push_transport_message_urgent(rtvi_message) + + # Handle OpenAI format (original implementation) + elif isinstance(message, dict): if message["role"] == "user": content = message["content"] if isinstance(content, list): @@ -552,7 +565,8 @@ class RTVIObserver(BaseObserver): text = content rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text)) await self.push_transport_message_urgent(rtvi_message) - except TypeError as e: + + except Exception as e: logger.warning(f"Caught an error while trying to handle context: {e}") async def _handle_metrics(self, frame: MetricsFrame): From c4f9171fe106d1662b4d48efa23020d8f59a4366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 26 Mar 2025 14:37:36 -0700 Subject: [PATCH 25/37] frames: indicate if UserStartedSpeakingFrame/UserStoppedSpeakingFrame are emulated --- CHANGELOG.md | 12 ++++++++++-- src/pipecat/frames/frames.py | 4 ++-- src/pipecat/transports/base_input.py | 4 ++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git 
a/CHANGELOG.md b/CHANGELOG.md index a93901bd1..69b1ba743 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- It is now possible to tell whether `UserStartedSpeakingFrame` or + `UserStoppedSpeakingFrame` have been generated because of emulation frames. + ## [0.0.61] - 2025-03-26 ### Added @@ -21,9 +28,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - ElevenLabs TTS services now support a sample rate of 8000. -- Added support for `instructions` in `OpenAITTSService` +- Added support for `instructions` in `OpenAITTSService`. -- Added support for `base_url` in `OpenAIImageGenService` and `OpenAITTSService` +- Added support for `base_url` in `OpenAIImageGenService` and + `OpenAITTSService`. 
### Fixed diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 11cd17801..30d8622d9 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -562,14 +562,14 @@ class UserStartedSpeakingFrame(SystemFrame): """ - pass + emulated: bool = False @dataclass class UserStoppedSpeakingFrame(SystemFrame): """Emitted by the VAD to indicate that a user stopped speaking.""" - pass + emulated: bool = False @dataclass diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 971dfe066..26f386576 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -117,10 +117,10 @@ class BaseInputTransport(FrameProcessor): await self._handle_bot_interruption(frame) elif isinstance(frame, EmulateUserStartedSpeakingFrame): logger.debug("Emulating user started speaking") - await self._handle_user_interruption(UserStartedSpeakingFrame()) + await self._handle_user_interruption(UserStartedSpeakingFrame(emulated=True)) elif isinstance(frame, EmulateUserStoppedSpeakingFrame): logger.debug("Emulating user stopped speaking") - await self._handle_user_interruption(UserStoppedSpeakingFrame()) + await self._handle_user_interruption(UserStoppedSpeakingFrame(emulated=True)) # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) From 750bb8858632d3de914356a0eaa3766596054d9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 26 Mar 2025 14:38:48 -0700 Subject: [PATCH 26/37] SegmentedSTTService: ignore emulated frames --- CHANGELOG.md | 6 ++++++ src/pipecat/services/ai_services.py | 7 ++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69b1ba743..1cf797076 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - It is now possible to tell whether 
`UserStartedSpeakingFrame` or `UserStoppedSpeakingFrame` have been generated because of emulation frames. +### Fixed + +- Fixed an issue that would cause `SegmentedSTTService` based services + (e.g. `OpenAISTTService`) to try to transcribe non-spoken audio, causing + invalid transcriptions. + ## [0.0.61] - 2025-03-26 ### Added diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index a78c268dd..97aad0d40 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -1048,9 +1048,14 @@ class SegmentedSTTService(STTService): await self._handle_user_stopped_speaking(frame) async def _handle_user_started_speaking(self, frame: UserStartedSpeakingFrame): + if frame.emulated: + return self._user_speaking = True async def _handle_user_stopped_speaking(self, frame: UserStoppedSpeakingFrame): + if frame.emulated: + return + self._user_speaking = False content = io.BytesIO() @@ -1068,7 +1073,7 @@ class SegmentedSTTService(STTService): self._audio_buffer.clear() async def process_audio_frame(self, frame: AudioRawFrame, direction: FrameDirection): - # If the user is speaking the audio buffer will keep growin. + # If the user is speaking the audio buffer will keep growing. self._audio_buffer += frame.audio # If the user is not speaking we keep just a little bit of audio. 
From 055a3f1c53a2612afd26405993ca3ad2d249731f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 26 Mar 2025 14:39:12 -0700 Subject: [PATCH 27/37] LLMAssistantContextAggregator: stop emulations if the user starts speaking --- src/pipecat/processors/aggregators/llm_response.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index af8bf1a2e..e40ad266b 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -249,7 +249,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): self._waiting_for_aggregation = False async def handle_aggregation(self, aggregation: str): - self._context.add_message({"role": self.role, "content": self._aggregation}) + self._context.add_message({"role": self.role, "content": aggregation}) async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -290,12 +290,14 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): async def push_aggregation(self): if len(self._aggregation) > 0: - await self.handle_aggregation(self._aggregation) + aggregation = self._aggregation # Reset the aggregation. Reset it before pushing it down, otherwise # if the tasks gets cancelled we won't be able to clear things up. 
self.reset() + await self.handle_aggregation(aggregation) + frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) @@ -308,10 +310,16 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): async def _cancel(self, frame: CancelFrame): await self._cancel_aggregation_task() - async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame): + async def _handle_user_started_speaking(self, frame: UserStartedSpeakingFrame): self._user_speaking = True self._waiting_for_aggregation = True + # If we get a non-emulated UserStartedSpeakingFrame but we are in the + # middle of emulating VAD, let's stop emulating VAD (i.e. don't send the + # EmulateUserStoppedSpeakingFrame). + if not frame.emulated and self._emulating_vad: + self._emulating_vad = False + async def _handle_user_stopped_speaking(self, _: UserStoppedSpeakingFrame): self._user_speaking = False # We just stopped speaking. Let's see if there's some aggregation to From c1f6a4e0790d09904c315e437dfce6ecaf3eed89 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:44:05 -0300 Subject: [PATCH 28/37] Adding PIPER_BASE_URL to the env template. --- dot-env.template | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dot-env.template b/dot-env.template index 2da20fc0b..f0b5bdc0f 100644 --- a/dot-env.template +++ b/dot-env.template @@ -90,3 +90,6 @@ ASSEMBLYAI_API_KEY=... # OpenRouter OPENROUTER_API_KEY=... + +# Piper +PIPER_BASE_URL=... \ No newline at end of file From ca4893397aec7f368f9c49884ac23e1c10907192 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:44:26 -0300 Subject: [PATCH 29/37] Creating a foundational example which uses the piper service. 
--- .../foundational/01-say-one-thing-piper.py | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 examples/foundational/01-say-one-thing-piper.py diff --git a/examples/foundational/01-say-one-thing-piper.py b/examples/foundational/01-say-one-thing-piper.py new file mode 100644 index 000000000..256447c23 --- /dev/null +++ b/examples/foundational/01-say-one-thing-piper.py @@ -0,0 +1,57 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.frames.frames import EndFrame, TTSSpeakFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.services.piper import PiperTTSService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, _) = await configure(session) + + transport = DailyTransport( + room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True) + ) + + tts = PiperTTSService( + base_url=os.getenv("PIPER_BASE_URL"), aiohttp_session=session, sample_rate=24000 + ) + + runner = PipelineRunner() + + task = PipelineTask(Pipeline([tts, transport.output()])) + + # Register an event handler so we can play the audio when the + # participant joins. 
+ @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await task.queue_frames( + [TTSSpeakFrame(f"Hello there, how are you today ?"), EndFrame()] + ) + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) From 053bf72da230193d3a6c4dc14ad91422c884599c Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:44:46 -0300 Subject: [PATCH 30/37] Adding pytest-aiohttp to the dev requirements. --- dev-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index e65c2755c..af1c35721 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -6,6 +6,7 @@ pre-commit~=4.0.1 pyright~=1.1.397 pytest~=8.3.4 pytest-asyncio~=0.25.3 +pytest-aiohttp==1.1.0 ruff~=0.11.1 setuptools~=70.0.0 setuptools_scm~=8.1.0 From 45787520b2b2b4430b5468d452c763a9bc37cd28 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:45:28 -0300 Subject: [PATCH 31/37] Refactoring the piper test to use run_test provided by Pipecat --- tests/test_piper_tts.py | 83 ++++++++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/tests/test_piper_tts.py b/tests/test_piper_tts.py index 296de7fae..8db6bf1d1 100644 --- a/tests/test_piper_tts.py +++ b/tests/test_piper_tts.py @@ -2,16 +2,20 @@ import asyncio +import aiohttp import pytest from aiohttp import web from pipecat.frames.frames import ( ErrorFrame, TTSAudioRawFrame, + TTSSpeakFrame, TTSStartedFrame, TTSStoppedFrame, + TTSTextFrame, ) from pipecat.services.piper import PiperTTSService +from pipecat.tests.utils import run_test @pytest.mark.asyncio @@ -37,8 +41,8 @@ async def test_run_piper_tts_success(aiohttp_client): # Write out some chunked byte data # In reality, you’d return WAV data or similar - data_chunk_1 = b"\x00\x01\x02\x03" * 12000 # 48000 bytes - data_chunk_2 = b"\x04\x05\x06\x07" * 6000 # another chunk + data_chunk_1 = 
b"\x00\x01\x02\x03" * 1024 # 4096 bytes, 04 TTSAudioRawFrame + data_chunk_2 = b"\x04\x05\x06\x07" * 1024 # another chunk await resp.write(data_chunk_1) await asyncio.sleep(0.01) # simulate async chunk delay await resp.write(data_chunk_2) @@ -48,30 +52,43 @@ async def test_run_piper_tts_success(aiohttp_client): # Create an aiohttp test server app = web.Application() - app.router.add_get("/", handler) + app.router.add_post("/", handler) client = await aiohttp_client(app) # Remove trailing slash if present in the test URL base_url = str(client.make_url("")).rstrip("/") - # Instantiate PiperTTSService with our mock server - tts_service = PiperTTSService(base_url=base_url) + async with aiohttp.ClientSession() as session: + # Instantiate PiperTTSService with our mock server + tts_service = PiperTTSService(base_url=base_url, aiohttp_session=session, sample_rate=24000) - # Collect frames from the generator - frames = [] - async for frame in tts_service.run_tts("Hello world."): - frames.append(frame) + frames_to_send = [ + TTSSpeakFrame(text="Hello world."), + ] - # Ensure we received frames in the expected order/types - assert len(frames) >= 3, "Expecting at least TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame" - assert isinstance(frames[0], TTSStartedFrame), "First frame must be TTSStartedFrame" - assert isinstance(frames[-1], TTSStoppedFrame), "Last frame must be TTSStoppedFrame" + expected_returned_frames = [ + TTSStartedFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSAudioRawFrame, + TTSStoppedFrame, + TTSTextFrame, + ] - # Check we have at least one TTSAudioRawFrame - audio_frames = [f for f in frames if isinstance(f, TTSAudioRawFrame)] - assert len(audio_frames) > 0, "Should have received at least one TTSAudioRawFrame" - for a_frame in audio_frames: - assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" + frames_received = await 
run_test( + tts_service, + frames_to_send=frames_to_send, + expected_down_frames=expected_returned_frames, + ) + down_frames = frames_received[0] + audio_frames = [f for f in down_frames if isinstance(f, TTSAudioRawFrame)] + for a_frame in audio_frames: + assert a_frame.sample_rate == 24000, "Sample rate should match the default (24000)" @pytest.mark.asyncio @@ -86,16 +103,30 @@ async def test_run_piper_tts_error(aiohttp_client): return web.Response(status=404, text="Not found") app = web.Application() - app.router.add_get("/", handler) + app.router.add_post("/", handler) client = await aiohttp_client(app) base_url = str(client.make_url("")).rstrip("/") - tts_service = PiperTTSService(base_url=base_url) + async with aiohttp.ClientSession() as session: + tts_service = PiperTTSService(base_url=base_url, aiohttp_session=session, sample_rate=24000) - frames = [] - async for frame in tts_service.run_tts("Error case."): - frames.append(frame) + frames_to_send = [ + TTSSpeakFrame(text="Error case."), + ] - assert len(frames) == 1, "Should only receive a single ErrorFrame" - assert isinstance(frames[0], ErrorFrame), "Must receive an ErrorFrame for 404" - assert "status: 404" in frames[0].error, "ErrorFrame should contain details about the 404" + expected_down_frames = [TTSStoppedFrame, TTSTextFrame] + + expected_up_frames = [ErrorFrame] + + frames_received = await run_test( + tts_service, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + expected_up_frames=expected_up_frames, + ) + up_frames = frames_received[1] + + assert isinstance(up_frames[0], ErrorFrame), "Must receive an ErrorFrame for 404" + assert "status: 404" in up_frames[0].error, ( + "ErrorFrame should contain details about the 404" + ) From b348fde32b3972b87538238ff445d797fb4d9ca1 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:46:38 -0300 Subject: [PATCH 32/37] Refactoring PiperTTSService to match the others TTS services provided by Pipecat and fixing 
noise issue due to wav header. --- src/pipecat/services/piper.py | 103 +++++++++++++++++----------------- 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/src/pipecat/services/piper.py b/src/pipecat/services/piper.py index ecb831eb7..ad71a7d6c 100644 --- a/src/pipecat/services/piper.py +++ b/src/pipecat/services/piper.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from typing import AsyncGenerator +from typing import AsyncGenerator, Optional import aiohttp from loguru import logger @@ -19,85 +19,84 @@ from pipecat.frames.frames import ( from pipecat.services.ai_services import TTSService # This assumes a running TTS service running: https://github.com/rhasspy/piper/blob/master/src/python_run/README_http.md - - class PiperTTSService(TTSService): """Piper TTS service implementation. Provides integration with Piper's TTS server. + + Args: + base_url: API base URL + aiohttp_session: aiohttp ClientSession + sample_rate: Output sample rate """ def __init__( self, *, base_url: str, - aiohttp_session: aiohttp.ClientSession | None = None, - sample_rate: int = 24000, + aiohttp_session: aiohttp.ClientSession, + # When using Piper, the sample rate of the generated audio depends on the + # voice model being used. + sample_rate: Optional[int] = None, **kwargs, ): - """Initialize the PiperTTSService class instance. - - Args: - base_url (str): Base URL of the Piper TTS server (should not end with a slash). - aiohttp_session (aiohttp.ClientSession, optional): Optional aiohttp session to use for requests. Defaults to None. - sample_rate (int, optional): Sample rate in Hz. Defaults to 24000. - **kwargs (dict): Additional keyword arguments. 
- """ super().__init__(sample_rate=sample_rate, **kwargs) - if not aiohttp_session: - aiohttp_session = aiohttp.ClientSession() if base_url.endswith("/"): logger.warning("Base URL ends with a slash, this is not allowed.") base_url = base_url[:-1] + self._base_url = base_url + self._session = aiohttp_session self._settings = {"base_url": base_url} - self.set_voice("voice_id") - self._aiohttp_session = aiohttp_session def can_generate_metrics(self) -> bool: return True async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - logger.debug(f"Generating TTS: [{text}]") + """Generate speech from text using Piper API. - url = self._settings["base_url"] + "/?text=" + text.replace(".", "").replace("*", "") + Args: + text: The text to convert to speech - await self.start_ttfb_metrics() + Yields: + Frames containing audio data and status information + """ + logger.debug(f"{self}: Generating TTS [{text}]") + headers = { + "Content-Type": "text/plain", + } + try: + await self.start_ttfb_metrics() - async with self._aiohttp_session.get(url) as r: - if r.status != 200: - text = await r.text() - logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") - yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") - return + async with self._session.post(self._base_url, data=text, headers=headers) as response: + if response.status != 200: + eror = await response.text() + logger.error( + f"{self} error getting audio (status: {response.status}, error: {eror})" + ) + yield ErrorFrame( + f"Error getting audio (status: {response.status}, error: {eror})" + ) + return - await self.start_tts_usage_metrics(text) + await self.start_tts_usage_metrics(text) - yield TTSStartedFrame() - - buffer = bytearray() - async for chunk in r.content.iter_chunked(1024): - if len(chunk) > 0: - await self.stop_ttfb_metrics() - # Append new chunk to the buffer. - buffer.extend(chunk) - - # Check if buffer has enough data for processing. 
- while ( - len(buffer) >= 48000 - ): # Assuming at least 0.5 seconds of audio data at 24000 Hz - # Process the buffer up to a safe size for resampling. - process_data = buffer[:48000] - # Remove processed data from buffer. - buffer = buffer[48000:] - - frame = TTSAudioRawFrame(process_data, self._sample_rate, 1) - yield frame - - # Process any remaining data in the buffer. - if len(buffer) > 0: - frame = TTSAudioRawFrame(buffer, self._sample_rate, 1) - yield frame + # Process the streaming response + CHUNK_SIZE = 1024 + yield TTSStartedFrame() + async for chunk in response.content.iter_chunked(CHUNK_SIZE): + # remove wav header if present + if chunk.startswith(b"RIFF"): + chunk = chunk[44:] + if len(chunk) > 0: + await self.stop_ttfb_metrics() + yield TTSAudioRawFrame(chunk, self.sample_rate, 1) + except Exception as e: + logger.error(f"Error in run_tts: {e}") + yield ErrorFrame(error=str(e)) + finally: + logger.debug(f"{self}: Finished TTS [{text}]") + await self.stop_ttfb_metrics() yield TTSStoppedFrame() From 50515aa8426c2e868c0044edb36a80d180a2d054 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:50:47 -0300 Subject: [PATCH 33/37] Adding PiperTTSService to the changelog. --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a93901bd1..5448399d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Added support for a new TTS service, `PiperTTSService`. + (see https://github.com/rhasspy/piper/) + ## [0.0.61] - 2025-03-26 ### Added From a82b84797179dd4e84b53cf27efc86e941857f24 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 07:58:53 -0300 Subject: [PATCH 34/37] Fixing ruff format. 
--- src/pipecat/services/piper.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipecat/services/piper.py b/src/pipecat/services/piper.py index ad71a7d6c..12a936889 100644 --- a/src/pipecat/services/piper.py +++ b/src/pipecat/services/piper.py @@ -18,6 +18,7 @@ from pipecat.frames.frames import ( ) from pipecat.services.ai_services import TTSService + # This assumes a running TTS service running: https://github.com/rhasspy/piper/blob/master/src/python_run/README_http.md class PiperTTSService(TTSService): """Piper TTS service implementation. From aa85df4fd6fe34472fd3926ea2cfb9499c48886c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 26 Mar 2025 16:21:31 -0400 Subject: [PATCH 35/37] Fix: GoogleTTSService was emitting two TTSStoppedFrames --- CHANGELOG.md | 2 ++ src/pipecat/services/google/google.py | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e81db88a2..0263bae47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 (e.g. `OpenAISTTService`) to try to transcribe non-spoken audio, causing invalid transcriptions. +- Fixed an issue where `GoogleTTSService` was emitting two `TTSStoppedFrames`. + ## [0.0.61] - 2025-03-26 ### Added diff --git a/src/pipecat/services/google/google.py b/src/pipecat/services/google/google.py index 3d207a254..95c5a1edb 100644 --- a/src/pipecat/services/google/google.py +++ b/src/pipecat/services/google/google.py @@ -1343,6 +1343,7 @@ class GoogleVertexLLMService(OpenAILLMService): **kwargs, ): """Initializes the VertexLLMService. + Args: credentials (Optional[str]): JSON string of service account credentials. credentials_path (Optional[str]): Path to the service account JSON file. 
@@ -1366,9 +1367,11 @@ class GoogleVertexLLMService(OpenAILLMService): @staticmethod def _get_api_token(credentials: Optional[str], credentials_path: Optional[str]) -> str: """Retrieves an authentication token using Google service account credentials. + Args: credentials (Optional[str]): JSON string of service account credentials. credentials_path (Optional[str]): Path to the service account JSON file. + Returns: str: OAuth token for API authentication. """ @@ -1557,8 +1560,6 @@ class GoogleTTSService(TTSService): logger.exception(f"{self} error generating TTS: {e}") error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) - finally: - yield TTSStoppedFrame() class GoogleImageGenService(ImageGenService): From a55a7bbb96234f8b87cf84afd3dd4b5e0c464095 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 27 Mar 2025 08:03:16 -0400 Subject: [PATCH 36/37] Add Piper to README --- README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index f28005618..c49478d27 100644 --- a/README.md +++ b/README.md @@ -55,17 +55,17 @@ pip install "pipecat-ai[option,...]" ### Available services -| Category | Services | Install Command Example | -| ------------------- | 
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | -| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` | -| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), 
[DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` | -| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` | -| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[google]"` | -| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI 
Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | `pip install "pipecat-ai[daily]"` | -| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` | -| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) | `pip install "pipecat-ai[moondream]"` | -| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` | -| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` | +| Category | Services | Install Command Example | +| ------------------- | 
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | +| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` | +| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), 
[DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` | +| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` | +| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[google]"` | +| Transport | [Daily 
(WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | `pip install "pipecat-ai[daily]"` | +| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` | +| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) | `pip install "pipecat-ai[moondream]"` | +| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` | +| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` | 📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services) From b6007bb3d66cbd65307b74c4fb1b98e033c8c2dd Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Thu, 27 Mar 2025 17:26:03 -0300 Subject: [PATCH 37/37] Added support to ProtobufFrameSerializer to send the transport messages --- CHANGELOG.md | 2 ++ src/pipecat/frames/frames.proto | 5 +++++ src/pipecat/frames/protobufs/frames_pb2.py | 24 ++++++++++++++++------ src/pipecat/serializers/protobuf.py | 16 +++++++++++++++ 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0263bae47..ef26f0266 100644 --- a/CHANGELOG.md +++ 
b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added support to `ProtobufFrameSerializer` to send the messages from `TransportMessageFrame` and `TransportMessageUrgentFrame`. + - Added support for a new TTS service, `PiperTTSService`. (see https://github.com/rhasspy/piper/) diff --git a/src/pipecat/frames/frames.proto b/src/pipecat/frames/frames.proto index 98dc014db..ebdb16fcc 100644 --- a/src/pipecat/frames/frames.proto +++ b/src/pipecat/frames/frames.proto @@ -35,10 +35,15 @@ message TranscriptionFrame { string timestamp = 5; } +message MessageFrame { + string data = 1; +} + message Frame { oneof frame { TextFrame text = 1; AudioRawFrame audio = 2; TranscriptionFrame transcription = 3; + MessageFrame message = 4; } } diff --git a/src/pipecat/frames/protobufs/frames_pb2.py b/src/pipecat/frames/protobufs/frames_pb2.py index d58bc8baa..7884c6ccc 100644 --- a/src/pipecat/frames/protobufs/frames_pb2.py +++ b/src/pipecat/frames/protobufs/frames_pb2.py @@ -1,12 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
+# NO CHECKED-IN PROTOBUF GENCODE # source: frames.proto -# Protobuf Python Version: 4.25.1 +# Protobuf Python Version: 5.27.2 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 27, + 2, + '', + 'frames.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -14,19 +24,21 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"}\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\x12\x10\n\x03pts\x18\x06 \x01(\x04H\x00\x88\x01\x01\x42\x06\n\x04_pts\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"}\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 
\x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\x12\x10\n\x03pts\x18\x06 \x01(\x04H\x00\x88\x01\x01\x42\x06\n\x04_pts\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x1c\n\x0cMessageFrame\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\"\xbd\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x12(\n\x07message\x18\x04 \x01(\x0b\x32\x15.pipecat.MessageFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'frames_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None _globals['_TEXTFRAME']._serialized_start=25 _globals['_TEXTFRAME']._serialized_end=76 _globals['_AUDIORAWFRAME']._serialized_start=78 _globals['_AUDIORAWFRAME']._serialized_end=203 _globals['_TRANSCRIPTIONFRAME']._serialized_start=205 _globals['_TRANSCRIPTIONFRAME']._serialized_end=301 - _globals['_FRAME']._serialized_start=304 - _globals['_FRAME']._serialized_end=451 + _globals['_MESSAGEFRAME']._serialized_start=303 + _globals['_MESSAGEFRAME']._serialized_end=331 + _globals['_FRAME']._serialized_start=334 + _globals['_FRAME']._serialized_end=523 # @@protoc_insertion_point(module_scope) diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 125f2037f..c3b6d86af 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -5,6 +5,7 @@ # import dataclasses +import json from loguru import logger 
@@ -15,15 +16,24 @@ from pipecat.frames.frames import ( OutputAudioRawFrame, TextFrame, TranscriptionFrame, + TransportMessageFrame, + TransportMessageUrgentFrame, ) from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType +# Data class for converting transport messages into Protobuf format. +@dataclasses.dataclass +class MessageFrame: + data: str + + class ProtobufFrameSerializer(FrameSerializer): SERIALIZABLE_TYPES = { TextFrame: "text", OutputAudioRawFrame: "audio", TranscriptionFrame: "transcription", + MessageFrame: "message", } SERIALIZABLE_FIELDS = {v: k for k, v in SERIALIZABLE_TYPES.items()} @@ -42,6 +52,12 @@ class ProtobufFrameSerializer(FrameSerializer): return FrameSerializerType.BINARY async def serialize(self, frame: Frame) -> str | bytes | None: + # Wrap transport messages in a MessageFrame (JSON-encoded) so they can be serialized + if isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)): + frame = MessageFrame( + data=json.dumps(frame.message), + ) + proto_frame = frame_protos.Frame() if type(frame) not in self.SERIALIZABLE_TYPES: logger.warning(f"Frame type {type(frame)} is not serializable")