From df4c3e56c495c3221c202e9b75b59bbae5e5ad12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 10:17:21 -0700 Subject: [PATCH 1/5] services: add missing * keyword separator --- src/pipecat/processors/async_frame_processor.py | 3 ++- src/pipecat/processors/frame_processor.py | 1 + src/pipecat/services/ai_services.py | 3 ++- src/pipecat/services/anthropic.py | 1 + src/pipecat/services/deepgram.py | 5 ++--- src/pipecat/services/fireworks.py | 1 + src/pipecat/services/google.py | 2 +- src/pipecat/services/moondream.py | 1 + src/pipecat/services/ollama.py | 2 +- src/pipecat/services/openai.py | 6 +++--- src/pipecat/services/openpipe.py | 7 ++++--- src/pipecat/services/whisper.py | 7 ++++--- 12 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/pipecat/processors/async_frame_processor.py b/src/pipecat/processors/async_frame_processor.py index db805f924..24c016867 100644 --- a/src/pipecat/processors/async_frame_processor.py +++ b/src/pipecat/processors/async_frame_processor.py @@ -14,10 +14,11 @@ class AsyncFrameProcessor(FrameProcessor): def __init__( self, + *, name: str | None = None, loop: asyncio.AbstractEventLoop | None = None, **kwargs): - super().__init__(name, loop, **kwargs) + super().__init__(name=name, loop=loop, **kwargs) self._create_push_task() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index e9c8e30c1..18c03f169 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -66,6 +66,7 @@ class FrameProcessor: def __init__( self, + *, name: str | None = None, loop: asyncio.AbstractEventLoop | None = None, **kwargs): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index e3667a85c..06e4b3ebe 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -118,7 +118,7 @@ class LLMService(AIService): class TTSService(AIService): - 
def __init__(self, aggregate_sentences: bool = True, **kwargs): + def __init__(self, *, aggregate_sentences: bool = True, **kwargs): super().__init__(**kwargs) self._aggregate_sentences: bool = aggregate_sentences self._current_sentence: str = "" @@ -180,6 +180,7 @@ class STTService(AIService): """STTService is a base class for speech-to-text services.""" def __init__(self, + *, min_volume: float = 0.6, max_silence_secs: float = 0.3, max_buffer_secs: float = 1.5, diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index ca32f0451..3868617eb 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -41,6 +41,7 @@ class AnthropicLLMService(LLMService): def __init__( self, + *, api_key: str, model: str = "claude-3-opus-20240229", max_tokens: int = 1024): diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index 4ccd8c50f..59d323c33 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -5,7 +5,6 @@ # import aiohttp -import asyncio import time from typing import AsyncGenerator @@ -18,11 +17,10 @@ from pipecat.frames.frames import ( Frame, InterimTranscriptionFrame, StartFrame, - StartInterruptionFrame, SystemFrame, TranscriptionFrame) from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AIService, AsyncAIService, TTSService +from pipecat.services.ai_services import AsyncAIService, TTSService from loguru import logger @@ -96,6 +94,7 @@ class DeepgramTTSService(TTSService): class DeepgramSTTService(AsyncAIService): def __init__(self, + *, api_key: str, url: str = "", live_options: LiveOptions = LiveOptions( diff --git a/src/pipecat/services/fireworks.py b/src/pipecat/services/fireworks.py index 2c1020be5..7fa4d64e8 100644 --- a/src/pipecat/services/fireworks.py +++ b/src/pipecat/services/fireworks.py @@ -19,6 +19,7 @@ except ModuleNotFoundError as e: class FireworksLLMService(BaseOpenAILLMService): 
def __init__(self, + *, model: str = "accounts/fireworks/models/firefunction-v1", base_url: str = "https://api.fireworks.ai/inference/v1"): super().__init__(model, base_url) diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py index c558ca283..da83e9274 100644 --- a/src/pipecat/services/google.py +++ b/src/pipecat/services/google.py @@ -42,7 +42,7 @@ class GoogleLLMService(LLMService): franca for all LLM services, so that it is easy to switch between different LLMs. """ - def __init__(self, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs): + def __init__(self, *, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs): super().__init__(**kwargs) gai.configure(api_key=api_key) self._client = gai.GenerativeModel(model) diff --git a/src/pipecat/services/moondream.py b/src/pipecat/services/moondream.py index 2c105c92d..cff8d3172 100644 --- a/src/pipecat/services/moondream.py +++ b/src/pipecat/services/moondream.py @@ -46,6 +46,7 @@ def detect_device(): class MoondreamService(VisionService): def __init__( self, + *, model="vikhyatk/moondream2", revision="2024-04-02", use_cpu=False diff --git a/src/pipecat/services/ollama.py b/src/pipecat/services/ollama.py index 5de16dc07..8fa3fc2de 100644 --- a/src/pipecat/services/ollama.py +++ b/src/pipecat/services/ollama.py @@ -9,5 +9,5 @@ from pipecat.services.openai import BaseOpenAILLMService class OLLamaLLMService(BaseOpenAILLMService): - def __init__(self, model: str = "llama2", base_url: str = "http://localhost:11434/v1"): + def __init__(self, *, model: str = "llama2", base_url: str = "http://localhost:11434/v1"): super().__init__(model=model, base_url=base_url, api_key="ollama") diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 1e7cd070b..345c76043 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -67,7 +67,7 @@ class BaseOpenAILLMService(LLMService): calls from the LLM. 
""" - def __init__(self, model: str, api_key=None, base_url=None, **kwargs): + def __init__(self, *, model: str, api_key=None, base_url=None, **kwargs): super().__init__(**kwargs) self._model: str = model self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs) @@ -236,8 +236,8 @@ class BaseOpenAILLMService(LLMService): class OpenAILLMService(BaseOpenAILLMService): - def __init__(self, model="gpt-4o", **kwargs): - super().__init__(model, **kwargs) + def __init__(self, *, model: str = "gpt-4o", **kwargs): + super().__init__(model=model, **kwargs) class OpenAIImageGenService(ImageGenService): diff --git a/src/pipecat/services/openpipe.py b/src/pipecat/services/openpipe.py index de9d9fcfa..ada7824fb 100644 --- a/src/pipecat/services/openpipe.py +++ b/src/pipecat/services/openpipe.py @@ -25,6 +25,7 @@ class OpenPipeLLMService(BaseOpenAILLMService): def __init__( self, + *, model: str = "gpt-4o", api_key: str | None = None, base_url: str | None = None, @@ -33,9 +34,9 @@ class OpenPipeLLMService(BaseOpenAILLMService): tags: Dict[str, str] | None = None, **kwargs): super().__init__( - model, - api_key, - base_url, + model=model, + api_key=api_key, + base_url=base_url, openpipe_api_key=openpipe_api_key, openpipe_base_url=openpipe_base_url, **kwargs) diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py index 9cd3150dc..5ef06f135 100644 --- a/src/pipecat/services/whisper.py +++ b/src/pipecat/services/whisper.py @@ -42,7 +42,8 @@ class WhisperSTTService(STTService): """Class to transcribe audio with a locally-downloaded Whisper model""" def __init__(self, - model: Model = Model.DISTIL_MEDIUM_EN, + *, + model: str | Model = Model.DISTIL_MEDIUM_EN, device: str = "auto", compute_type: str = "default", no_speech_prob: float = 0.4, @@ -51,7 +52,7 @@ class WhisperSTTService(STTService): super().__init__(**kwargs) self._device: str = device self._compute_type = compute_type - self._model_name: Model = model + self._model_name: str | 
Model = model self._no_speech_prob = no_speech_prob self._model: WhisperModel | None = None self._load() @@ -64,7 +65,7 @@ class WhisperSTTService(STTService): this model is being run, it will take time to download.""" logger.debug("Loading Whisper model...") self._model = WhisperModel( - self._model_name.value, + self._model_name.value if isinstance(self._model_name, Enum) else self._model_name, device=self._device, compute_type=self._compute_type) logger.debug("Loaded Whisper model") From 06f817c7e3217e9db5b5500090d0c1ebd1fa0a92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 10:18:20 -0700 Subject: [PATCH 2/5] transport(websocket): don't send if serializer returns None --- src/pipecat/transports/network/websocket_server.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index d17174eba..fe775ce45 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -124,6 +124,9 @@ class WebsocketServerOutputTransport(BaseOutputTransport): self._websocket = websocket async def write_raw_audio_frames(self, frames: bytes): + if not self._websocket: + return + self._audio_buffer += frames while len(self._audio_buffer) >= self._params.audio_frame_size: frame = AudioRawFrame( @@ -148,8 +151,8 @@ class WebsocketServerOutputTransport(BaseOutputTransport): frame = wav_frame proto = self._params.serializer.serialize(frame) - - await self._websocket.send(proto) + if proto: + await self._websocket.send(proto) self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:] From ddd0ca6a8f9890d1334f7b5f5a45df8810b79d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 10:21:50 -0700 Subject: [PATCH 3/5] update CHANGELOG --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git 
a/CHANGELOG.md b/CHANGELOG.md index f5966ad68..c3d6b32b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,8 +25,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 processing metrics indicate the time a processor needs to generate all its output. Note that not all processors generate these kind of metrics. +### Changed + +- `WhisperSTTService` model can now also be a string. + +- Added missing `*` keyword-only argument separators in services. + ### Fixed +- `WebsocketServerTransport` doesn't try to send frames anymore if the serializer + returns `None`. + - Fixed an issue where exceptions that occurred inside frame processors were being swallowed and not displayed. From 7f9fd9ffcec33eabff0b354403b1d0db426cad55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 10:40:17 -0700 Subject: [PATCH 4/5] examples: added 07i-interruptible-xtts --- CHANGELOG.md | 3 + .../foundational/07i-interruptible-xtts.py | 96 +++++++++++++++++++ src/pipecat/services/xtts.py | 31 +++--- 3 files changed, 115 insertions(+), 15 deletions(-) create mode 100644 examples/foundational/07i-interruptible-xtts.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c3d6b32b5..60f30806f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `XTTSService`. This is a local Text-To-Speech service. + See https://github.com/coqui-ai/TTS + - It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer` or `SileroVAD`.
diff --git a/examples/foundational/07i-interruptible-xtts.py b/examples/foundational/07i-interruptible-xtts.py new file mode 100644 index 000000000..73414f294 --- /dev/null +++ b/examples/foundational/07i-interruptible-xtts.py @@ -0,0 +1,96 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.services.xtts import XTTSService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ) + ) + + tts = XTTSService( + aiohttp_session=session, + voice_id="Claribel Dervla", + language="en", + base_url="http://localhost:8000" + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Transport user input + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out # Assistant spoken responses + ]) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index faf93f7cb..67aa86ccc 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -24,13 +24,14 @@ except ModuleNotFoundError as e: logger.error("In order to use XTTS, you need to `pip install pipecat-ai[xtts]`.") raise Exception(f"Missing module: {e}") -##### -## The server below can connect to XTTS through a local running docker -## -## Docker command: $ docker run --gpus=all -e COQUI_TOS_AGREED=1 --rm -p 8000:80 ghcr.io/coqui-ai/xtts-streaming-server:latest-cuda121 -## -## You can find more information on the official repo: https://github.com/coqui-ai/xtts-streaming-server -#### + +# The service below connects to an XTTS streaming server running in a local Docker container. +# +# Docker command: $ docker run --gpus=all -e COQUI_TOS_AGREED=1 --rm -p 8000:80 ghcr.io/coqui-ai/xtts-streaming-server:latest-cuda121 +# +# You can find more information on the official repo: +# https://github.com/coqui-ai/xtts-streaming-server + class
XTTSService(TTSService): @@ -40,7 +41,7 @@ class XTTSService(TTSService): aiohttp_session: aiohttp.ClientSession, voice_id: str, language: str, - base_url:str, + base_url: str, **kwargs): super().__init__(**kwargs) @@ -58,9 +59,9 @@ class XTTSService(TTSService): embeddings = self._studio_speakers[self._voice_id] url = self._base_url + "/tts_stream" - - payload={ - "text": text.replace('.','').replace('*',''), + + payload = { + "text": text.replace('.', '').replace('*', ''), "language": self._language, "speaker_embedding": embeddings["speaker_embedding"], "gpt_cond_latent": embeddings["gpt_cond_latent"], @@ -76,7 +77,7 @@ class XTTSService(TTSService): logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") return - + buffer = bytearray() async for chunk in r.content.iter_chunked(1024): @@ -84,14 +85,14 @@ class XTTSService(TTSService): await self.stop_ttfb_metrics() # Append new chunk to the buffer buffer.extend(chunk) - + # Check if buffer has enough data for processing while len(buffer) >= 48000: # Assuming at least 0.5 seconds of audio data at 24000 Hz # Process the buffer up to a safe size for resampling process_data = buffer[:48000] # Remove processed data from buffer buffer = buffer[48000:] - + # Convert the byte data to numpy array for resampling audio_np = np.frombuffer(process_data, dtype=np.int16) # Resample the audio from 24000 Hz to 16000 Hz @@ -108,4 +109,4 @@ class XTTSService(TTSService): resampled_audio = resampy.resample(audio_np, 24000, 16000) resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes() frame = AudioRawFrame(resampled_audio_bytes, 16000, 1) - yield frame \ No newline at end of file + yield frame From 5018a552c1a906a02196ad9e12cce1783b461d67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 10:44:32 -0700 Subject: [PATCH 5/5] services(xtts): no need the WAV header --- 
src/pipecat/services/xtts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/xtts.py b/src/pipecat/services/xtts.py index 67aa86ccc..590a9dd3d 100644 --- a/src/pipecat/services/xtts.py +++ b/src/pipecat/services/xtts.py @@ -65,7 +65,7 @@ class XTTSService(TTSService): "language": self._language, "speaker_embedding": embeddings["speaker_embedding"], "gpt_cond_latent": embeddings["gpt_cond_latent"], - "add_wav_header": True, + "add_wav_header": False, "stream_chunk_size": 20, }