add adaptive audio enhancement example and support for runtime enhancement level updates in AICFilter.

This commit is contained in:
Gökmen Görgen
2026-04-24 09:07:31 +02:00
parent f75f361629
commit a2fbed86cf
2 changed files with 231 additions and 2 deletions

View File

@@ -0,0 +1,212 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Voice assistant with LLM-controlled audio enhancement.
Demonstrates how an LLM can dynamically adjust ai-coustics audio enhancement
in response to user feedback during a call. The LLM receives a
`set_audio_enhancement_level` tool and uses it whenever the user reports audio
quality issues. The tool pushes a `FilterUpdateSettingsFrame` into the pipeline,
which the transport's input stage forwards to the `AICFilter` instance.
Required env vars:
AICOUSTICS_LICENSE_KEY ai-coustics SDK license key
ANTHROPIC_API_KEY Anthropic API key
DEEPGRAM_API_KEY Deepgram STT key
CARTESIA_API_KEY Cartesia TTS key
Optional env vars:
AICOUSTICS_MODEL_ID Enhancement model ID (default: quail-vf-2.1-l-16khz)
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.frames.frames import FilterUpdateSettingsFrame, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
# Load .env first so every environment lookup below can see its values.
load_dotenv(override=True)

# Enhancement strength the filter starts with; the same value is quoted in the
# system prompt so the LLM knows the starting point.
_DEFAULT_ENHANCEMENT_LEVEL = 0.5
_MODEL_ID = os.environ.get("AICOUSTICS_MODEL_ID", "quail-vf-2.1-l-16khz")

# One filter instance is created at import time and shared by all transport
# configurations; the VAD analyzer is derived from that same instance.
aic_filter = AICFilter(
    enhancement_level=_DEFAULT_ENHANCEMENT_LEVEL,
    model_id=_MODEL_ID,
    license_key=os.environ.get("AICOUSTICS_LICENSE_KEY", ""),
)
aic_vad = aic_filter.create_vad_analyzer(sensitivity=6.0, speech_hold_duration=0.05)
def _shared_audio_kwargs():
    """Return the audio settings that every supported transport uses."""
    return {
        "audio_in_enabled": True,
        "audio_out_enabled": True,
        "audio_in_filter": aic_filter,
    }


# Each value is a zero-argument factory, so a params object is only built for
# the transport type that is actually selected at runtime.
transport_params = {
    "daily": lambda: DailyParams(**_shared_audio_kwargs()),
    "twilio": lambda: FastAPIWebsocketParams(**_shared_audio_kwargs()),
    "webrtc": lambda: TransportParams(**_shared_audio_kwargs()),
}
# Text shown to the LLM so it knows when calling the tool is appropriate.
_ENHANCEMENT_TOOL_DESCRIPTION = (
    "Adjust the ai-coustics audio enhancement strength for the caller's microphone. "
    "Use this when the user reports audio quality issues such as background noise, "
    "echo, or difficulty being heard. Higher values apply stronger enhancement."
)

# Tool arguments: `level` is the value to apply, `reason` is free text that
# only ends up in the logs.
_ENHANCEMENT_TOOL_PROPERTIES = {
    "level": {
        "type": "number",
        "description": "Enhancement strength between 0.0 (off) and 1.0 (maximum).",
    },
    "reason": {
        "type": "string",
        "description": "Brief reason for the change, for logging purposes.",
    },
}

# Schema for the `set_audio_enhancement_level` tool; only `level` is required.
_set_enhancement_schema = FunctionSchema(
    name="set_audio_enhancement_level",
    description=_ENHANCEMENT_TOOL_DESCRIPTION,
    properties=_ENHANCEMENT_TOOL_PROPERTIES,
    required=["level"],
)
# System instruction handed to the LLM. The f-string is evaluated once at import
# time, so the level quoted in the prompt is always the startup default and does
# not track later tool calls. A trailing backslash inside the literal joins the
# wrapped source line with the next one instead of emitting a newline.
_SYSTEM_PROMPT = f"""\
You are a helpful voice assistant.
You have a `set_audio_enhancement_level` tool that controls the ai-coustics audio \
enhancement applied to the caller's microphone input. The current level is \
{_DEFAULT_ENHANCEMENT_LEVEL}.
Use the tool proactively when:
- The user says they can't be heard, the audio is noisy, or asks you to improve the sound quality.
- You detect repeated misunderstandings that may be caused by poor audio.
- The user asks to "boost", "improve", "fix", or "turn up" audio quality.
After adjusting, briefly confirm the change in one sentence.
Your output will be spoken aloud. Avoid bullet points, emojis, or markdown formatting.
"""
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build and run the voice-assistant pipeline for one session.

    Wires transport input -> STT -> user aggregator -> LLM -> TTS -> transport
    output -> assistant aggregator, registers the ``set_audio_enhancement_level``
    tool on the LLM, and runs the resulting task. The task is cancelled when the
    client disconnects.

    Args:
        transport: Transport that supplies the pipeline's input and output stages.
        runner_args: Runner arguments; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read from it.

    Raises:
        KeyError: If ``DEEPGRAM_API_KEY``, ``CARTESIA_API_KEY`` or
            ``ANTHROPIC_API_KEY`` is missing from the environment.
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])

    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],
        settings=CartesiaTTSService.Settings(
            voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        ),
    )

    llm = AnthropicLLMService(
        api_key=os.environ["ANTHROPIC_API_KEY"],
        settings=AnthropicLLMService.Settings(
            system_instruction=_SYSTEM_PROMPT,
        ),
    )

    async def set_audio_enhancement_level(params: FunctionCallParams):
        """Tool handler: validate the requested level and forward it to the filter."""
        reason = params.arguments.get("reason", "")
        raw_level = params.arguments.get("level")

        # The value comes from the LLM, so it may be missing or non-numeric.
        try:
            level = float(raw_level)
        except (TypeError, ValueError):
            level = None

        # AICFilter ignores out-of-range values with only a log warning, so
        # reject them here and tell the LLM instead of reporting a change that
        # never happened. The chained comparison is also False for NaN.
        if level is None or not 0.0 <= level <= 1.0:
            logger.warning(f"Audio enhancement level {raw_level!r} rejected")
            await params.result_callback(
                "Audio enhancement level was not changed: `level` must be a number "
                "between 0.0 and 1.0."
            )
            return

        # `task` is assigned further down in run_bot. The closure looks it up
        # when the handler is called, which only happens while the pipeline is
        # running, so no holder object is needed.
        await task.queue_frames(
            [FilterUpdateSettingsFrame(settings={"enhancement_level": level})]
        )
        logger.info(f"Audio enhancement → {level}" + (f" ({reason})" if reason else ""))
        await params.result_callback(f"Audio enhancement level set to {level}.")

    llm.register_function("set_audio_enhancement_level", set_audio_enhancement_level)

    tools = ToolsSchema(standard_tools=[_set_enhancement_schema])
    context = LLMContext(tools=tools)
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=aic_vad),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Seed the conversation so the assistant speaks first.
        context.add_message(
            {"role": "developer", "content": "Please introduce yourself to the user."}
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)
async def bot(runner_args: RunnerArguments):
    """Entry point compatible with Pipecat Cloud: create the transport, then run the bot.

    Args:
        runner_args: Runner arguments used to create the transport and passed
            on to ``run_bot``.
    """
    await run_bot(await create_transport(runner_args, transport_params), runner_args)
if __name__ == "__main__":
    # Imported here because the runner is only needed when this file is
    # executed directly as a script.
    from pipecat.runner.run import main as run_main

    run_main()

View File

@@ -32,7 +32,7 @@ from loguru import logger
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.vad.aic_vad import AICVADAnalyzer
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame, FilterUpdateSettingsFrame
class AICModelManager:
@@ -446,7 +446,13 @@ class AICFilter(BaseAudioFilter):
self._model_cache_key = None
async def process_frame(self, frame: FilterControlFrame):
"""Process control frames to enable/disable filtering.
"""Process control frames to enable/disable filtering or update settings.
Handles ``FilterEnableFrame`` (bypass toggle) and ``FilterUpdateSettingsFrame``
with the following keys:
- ``enhancement_level`` (float, 0.0–1.0): Adjust enhancement strength at runtime.
- ``bypass`` (bool): Enable or disable the filter at runtime.
Args:
frame: The control frame containing filter commands.
@@ -462,6 +468,17 @@ class AICFilter(BaseAudioFilter):
self._apply_enhancement_level()
except Exception as e: # noqa: BLE001
logger.error(f"AIC set_parameter failed: {e}")
elif isinstance(frame, FilterUpdateSettingsFrame):
if "enhancement_level" in frame.settings:
val = float(frame.settings["enhancement_level"])
if 0.0 <= val <= 1.0:
self._enhancement_level = val
self._apply_enhancement_level()
else:
logger.warning(f"AIC enhancement_level {val} out of range [0.0, 1.0]; ignored.")
if "bypass" in frame.settings:
self._bypass = bool(frame.settings["bypass"])
self._apply_bypass()
async def filter(self, audio: bytes) -> bytes:
"""Apply AIC enhancement to audio data.