From 1b3afb551196ef8d0a1db459da7c40e809a264ae Mon Sep 17 00:00:00 2001
From: Filipi Fuchter
Date: Fri, 10 Oct 2025 09:44:47 -0300
Subject: [PATCH] Added audio filter KrispVivaFilter using the Krisp VIVA SDK

---
 CHANGELOG.md                                  |   2 +
 env.example                                   |   3 +
 .../07p-interruptible-krisp-viva.py           | 129 +++++++++++
 .../audio/filters/krisp_viva_filter.py        | 215 ++++++++++++++++++
 4 files changed, 349 insertions(+)
 create mode 100644 examples/foundational/07p-interruptible-krisp-viva.py
 create mode 100644 src/pipecat/audio/filters/krisp_viva_filter.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7079f21f5..02b31d93f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Added audio filter `KrispVivaFilter` using the Krisp VIVA SDK.
+
 - Added `--folder` argument to the runner, allowing files saved in that folder
   to be downloaded from `http://HOST:PORT/file/FILE`.
 
diff --git a/env.example b/env.example
index 22768494b..f707f49c9 100644
--- a/env.example
+++ b/env.example
@@ -90,6 +90,9 @@ SIMLI_FACE_ID=...
 # Krisp
 KRISP_MODEL_PATH=...
 
+# Krisp Viva
+KRISP_VIVA_MODEL_PATH=...
+
 # DeepSeek
 DEEPSEEK_API_KEY=...
 
diff --git a/examples/foundational/07p-interruptible-krisp-viva.py b/examples/foundational/07p-interruptible-krisp-viva.py
new file mode 100644
index 000000000..c7ca15b40
--- /dev/null
+++ b/examples/foundational/07p-interruptible-krisp-viva.py
@@ -0,0 +1,129 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.audio.filters.krisp_viva_filter import KrispVivaFilter
+from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
+from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.audio.vad.vad_analyzer import VADParams
+from pipecat.frames.frames import LLMRunFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
+from pipecat.runner.types import RunnerArguments
+from pipecat.runner.utils import create_transport
+from pipecat.services.deepgram.stt import DeepgramSTTService
+from pipecat.services.deepgram.tts import DeepgramTTSService
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.transports.daily.transport import DailyParams
+from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
+
+load_dotenv(override=True)
+
+# We store functions so objects (e.g. SileroVADAnalyzer) don't get
+# instantiated. The function will be called when the desired transport gets
+# selected.
+transport_params = {
+    "daily": lambda: DailyParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
+        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
+        audio_in_filter=KrispVivaFilter(),
+    ),
+    "twilio": lambda: FastAPIWebsocketParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
+        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
+        audio_in_filter=KrispVivaFilter(),
+    ),
+    "webrtc": lambda: TransportParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
+        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
+        audio_in_filter=KrispVivaFilter(),
+    ),
+}
+
+
+async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
+    logger.info(f"Starting bot")
+
+    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+
+    tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+        },
+    ]
+
+    context = LLMContext(messages)
+    context_aggregator = LLMContextAggregatorPair(context)
+
+    pipeline = Pipeline(
+        [
+            transport.input(),  # Transport user input
+            stt,  # STT
+            context_aggregator.user(),  # User responses
+            llm,  # LLM
+            tts,  # TTS
+            transport.output(),  # Transport bot output
+            context_aggregator.assistant(),  # Assistant spoken responses
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        params=PipelineParams(
+            enable_metrics=True,
+            enable_usage_metrics=True,
+        ),
+        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
+    )
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        logger.info(f"Client connected")
+        # Kick off the conversation.
+        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+        await task.queue_frames([LLMRunFrame()])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info(f"Client disconnected")
+        await task.cancel()
+
+    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
+
+    await runner.run(task)
+
+
+async def bot(runner_args: RunnerArguments):
+    """Main bot entry point compatible with Pipecat Cloud."""
+    transport = await create_transport(runner_args, transport_params)
+    await run_bot(transport, runner_args)
+
+
+if __name__ == "__main__":
+    from pipecat.runner.run import main
+
+    main()
diff --git a/src/pipecat/audio/filters/krisp_viva_filter.py b/src/pipecat/audio/filters/krisp_viva_filter.py
new file mode 100644
index 000000000..ddb489168
--- /dev/null
+++ b/src/pipecat/audio/filters/krisp_viva_filter.py
@@ -0,0 +1,215 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+"""Krisp noise reduction audio filter for Pipecat.
+
+This module provides an audio filter implementation using Krisp VIVA SDK.
+"""
+
+import os
+from typing import Optional
+
+import numpy as np
+from loguru import logger
+
+from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
+from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
+
+try:
+    import krisp_audio
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error("In order to use the Krisp filter, you need to install krisp_audio.")
+    raise Exception(f"Missing module: {e}")
+
+
+def _log_callback(log_message, log_level):
+    """Forward Krisp SDK log messages to the loguru logger."""
+    logger.info(f"[{log_level}] {log_message}")
+
+
+class KrispVivaFilter(BaseAudioFilter):
+    """Audio filter using the Krisp VIVA SDK.
+
+    Provides real-time noise reduction for audio streams using Krisp's
+    proprietary noise suppression algorithms. This filter requires a
+    valid Krisp model file to operate.
+
+    Audio is processed in 10 ms frames. Incoming bytes that do not fill a
+    complete frame are buffered until the next call to `filter()`.
+
+    Supported sample rates:
+    - 8000 Hz
+    - 16000 Hz
+    - 24000 Hz
+    - 32000 Hz
+    - 44100 Hz
+    - 48000 Hz
+    """
+
+    # Initialize Krisp Audio SDK globally
+    krisp_audio.globalInit("", _log_callback, krisp_audio.LogLevel.Off)
+    SDK_VERSION = krisp_audio.getVersion()
+    logger.debug(
+        f"Krisp Audio Python SDK Version: {SDK_VERSION.major}."
+        f"{SDK_VERSION.minor}.{SDK_VERSION.patch}"
+    )
+
+    SAMPLE_RATES = {
+        8000: krisp_audio.SamplingRate.Sr8000Hz,
+        16000: krisp_audio.SamplingRate.Sr16000Hz,
+        24000: krisp_audio.SamplingRate.Sr24000Hz,
+        32000: krisp_audio.SamplingRate.Sr32000Hz,
+        44100: krisp_audio.SamplingRate.Sr44100Hz,
+        48000: krisp_audio.SamplingRate.Sr48000Hz,
+    }
+
+    FRAME_SIZE_MS = 10  # Krisp requires audio frames of 10ms duration for processing.
+
+    def __init__(
+        self, model_path: Optional[str] = None, noise_suppression_level: int = 100
+    ) -> None:
+        """Initialize the Krisp noise reduction filter.
+
+        Args:
+            model_path: Path to the Krisp model file (.kef extension).
+                If None, uses KRISP_VIVA_MODEL_PATH environment variable.
+            noise_suppression_level: Noise suppression level passed to the
+                Krisp session for every processed frame.
+
+        Raises:
+            ValueError: If model_path is not provided and KRISP_VIVA_MODEL_PATH is not set.
+            Exception: If model file doesn't have .kef extension.
+            FileNotFoundError: If model file doesn't exist.
+        """
+        super().__init__()
+
+        # Set model path, checking environment if not specified
+        self._model_path = model_path or os.getenv("KRISP_VIVA_MODEL_PATH")
+        if not self._model_path:
+            logger.error("Model path is not provided and KRISP_VIVA_MODEL_PATH is not set.")
+            raise ValueError("Model path for KrispVivaFilter must be provided.")
+
+        if not self._model_path.endswith(".kef"):
+            raise Exception("Model is expected with .kef extension")
+
+        if not os.path.isfile(self._model_path):
+            raise FileNotFoundError(f"Model file not found: {self._model_path}")
+
+        self._filtering = True
+        self._session = None
+        self._samples_per_frame = None
+        self._noise_suppression_level = noise_suppression_level
+
+        # Audio buffer to accumulate samples for complete frames
+        self._audio_buffer = bytearray()
+
+    def _int_to_sample_rate(self, sample_rate):
+        """Convert integer sample rate to krisp_audio SamplingRate enum.
+
+        Args:
+            sample_rate: Sample rate as integer
+
+        Returns:
+            krisp_audio.SamplingRate enum value
+
+        Raises:
+            ValueError: If sample rate is not supported
+        """
+        try:
+            return self.SAMPLE_RATES[sample_rate]
+        except KeyError:
+            supported = ", ".join(str(rate) for rate in self.SAMPLE_RATES)
+            raise ValueError(
+                f"Unsupported sample rate: {sample_rate} Hz (supported: {supported})"
+            ) from None
+
+    async def start(self, sample_rate: int):
+        """Initialize the Krisp processor with the transport's sample rate.
+
+        Args:
+            sample_rate: The sample rate of the input transport in Hz.
+
+        Raises:
+            ValueError: If the sample rate is not supported by Krisp.
+        """
+        model_info = krisp_audio.ModelInfo()
+        model_info.path = self._model_path
+
+        nc_cfg = krisp_audio.NcSessionConfig()
+        nc_cfg.inputSampleRate = self._int_to_sample_rate(sample_rate)
+        nc_cfg.inputFrameDuration = krisp_audio.FrameDuration.Fd10ms
+        nc_cfg.outputSampleRate = nc_cfg.inputSampleRate
+        nc_cfg.modelInfo = model_info
+
+        # Drop audio left over from a previous session: it may have been
+        # captured at a different sample rate.
+        self._audio_buffer.clear()
+        self._samples_per_frame = int((sample_rate * self.FRAME_SIZE_MS) / 1000)
+        self._session = krisp_audio.NcInt16.create(nc_cfg)
+
+    async def stop(self):
+        """Clean up the Krisp processor when stopping."""
+        self._session = None
+        self._audio_buffer.clear()
+
+    async def process_frame(self, frame: FilterControlFrame):
+        """Process control frames to enable/disable filtering.
+
+        Args:
+            frame: The control frame containing filter commands.
+        """
+        if isinstance(frame, FilterEnableFrame):
+            self._filtering = frame.enable
+
+    async def filter(self, audio: bytes) -> bytes:
+        """Apply Krisp noise reduction to audio data.
+
+        Audio is processed in whole 10 ms frames; any trailing partial frame
+        is kept in an internal buffer and prepended to the next call.
+
+        Args:
+            audio: Raw audio data as bytes to be filtered.
+
+        Returns:
+            Noise-reduced audio data as bytes. May be empty if less than one
+            complete frame is available. If filtering is disabled or the
+            filter has not been started, the audio is returned unprocessed.
+        """
+        if not self._filtering or self._session is None:
+            # Pass audio through untouched, flushing any buffered partial
+            # frame first so it is neither lost nor replayed later.
+            if self._audio_buffer:
+                audio = bytes(self._audio_buffer) + audio
+                self._audio_buffer.clear()
+            return audio
+
+        # Add incoming audio to our buffer
+        self._audio_buffer.extend(audio)
+
+        # Calculate how many complete frames we can process
+        bytes_per_frame = self._samples_per_frame * 2  # 2 bytes per int16 sample
+        num_complete_frames = len(self._audio_buffer) // bytes_per_frame
+
+        if num_complete_frames == 0:
+            # Not enough samples for a complete frame yet, return empty
+            return b""
+
+        # Extract the bytes we can process and remove them from the buffer
+        # in place, keeping the remainder for the next call
+        bytes_to_process = num_complete_frames * bytes_per_frame
+        audio_to_process = bytes(self._audio_buffer[:bytes_to_process])
+        del self._audio_buffer[:bytes_to_process]
+
+        # Process the complete frames
+        samples = np.frombuffer(audio_to_process, dtype=np.int16)
+        frames = samples.reshape(-1, self._samples_per_frame)
+        processed_frames = np.empty_like(frames)
+
+        for i, frame in enumerate(frames):
+            processed_frames[i] = self._session.process(frame, self._noise_suppression_level)
+
+        return processed_frames.tobytes()