Compare commits
7 Commits
hush/moder
...
update-mod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50b19a9e77 | ||
|
|
f9d1a53e28 | ||
|
|
3f3010af79 | ||
|
|
a02d47ddbd | ||
|
|
a649aff3e7 | ||
|
|
d77c37ff14 | ||
|
|
b4916f9dae |
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added `SmartTurnMetricsData`, which contains end-of-turn prediction metrics,
|
||||
to the `MetricsFrame`. Using `MetricsFrame`, you can now retrieve prediction
|
||||
confidence scores and processing time metrics from the smart turn analyzers.
|
||||
|
||||
- Added support for Application Default Credentials in Google services,
|
||||
`GoogleSTTService`, `GoogleTTSService`, and `GoogleVertexLLMService`.
|
||||
|
||||
|
||||
@@ -10,24 +10,27 @@ import aiohttp
|
||||
import modal
|
||||
from bot import _voice_bot_process
|
||||
from fastapi import HTTPException
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.responses import RedirectResponse
|
||||
from loguru import logger
|
||||
|
||||
MAX_SESSION_TIME = 15 * 60 # 15 minutes
|
||||
|
||||
app = modal.App("pipecat-modal")
|
||||
|
||||
|
||||
image = modal.Image.debian_slim(python_version="3.12").pip_install_from_requirements(
|
||||
"requirements.txt"
|
||||
image = (
|
||||
modal.Image.debian_slim(python_version="3.13")
|
||||
.apt_install("ffmpeg")
|
||||
.pip_install_from_requirements("requirements.txt")
|
||||
.pip_install("pipecat-ai[daily,silero,cartesia,openai]")
|
||||
.add_local_python_source("bot")
|
||||
)
|
||||
|
||||
app = modal.App("pipecat-modal", image=image)
|
||||
|
||||
|
||||
@app.function(
|
||||
image=image,
|
||||
cpu=1.0,
|
||||
secrets=[modal.Secret.from_dotenv()],
|
||||
keep_warm=1,
|
||||
min_containers=1,
|
||||
enable_memory_snapshot=True,
|
||||
max_inputs=1, # Do not reuse instances across requests
|
||||
retries=0,
|
||||
@@ -40,7 +43,7 @@ def launch_bot_process(room_url: str, token: str):
|
||||
image=image,
|
||||
secrets=[modal.Secret.from_dotenv()],
|
||||
)
|
||||
@modal.web_endpoint(method="POST")
|
||||
@modal.fastapi_endpoint(method="GET")
|
||||
async def start():
|
||||
from pipecat.transports.services.helpers.daily_rest import (
|
||||
DailyRESTHelper,
|
||||
@@ -77,4 +80,4 @@ async def start():
|
||||
|
||||
# Return room URL to the user to join
|
||||
# Note: in production, you would want to return a token to the user
|
||||
return JSONResponse(content={"room_url": room.url, token: token})
|
||||
return RedirectResponse(room.url)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
python-dotenv==1.0.1
|
||||
modal==0.71.3
|
||||
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
|
||||
fastapi==0.115.6
|
||||
aiohttp==3.11.11
|
||||
|
||||
@@ -1,211 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame, EndTaskFrame, Frame, TextFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAILLMContextAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.filters.null_filter import NullFilter
|
||||
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.sync.event_notifier import EventNotifier
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class KickParticipantProcessor(FrameProcessor):
|
||||
"""This processor will kick the participant if they say something inappropriate.
|
||||
|
||||
This is a simple example of how to use the LLM to moderate
|
||||
the conversation. In this case we are using the OpenAI LLM to determine if
|
||||
the user is saying something inappropriate.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the processor."""
|
||||
super().__init__()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TextFrame) and frame.text == "YES":
|
||||
await self.push_frame(
|
||||
TTSSpeakFrame(
|
||||
"You are being kicked from the call because of content moderation. Have a nice day!"
|
||||
)
|
||||
)
|
||||
|
||||
# Signal that the task should end after processing this frame
|
||||
await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection,
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
# This is the LLM that will be used to detect if the user has said
|
||||
# something inappropriate.
|
||||
moderator_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
moderator_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": """
|
||||
You are a helpful LLM that will be used to moderate a conversation
|
||||
between a user and an assistant. Your goal is to determine if the user
|
||||
is saying something inappropriate. You will be given the user
|
||||
transcript and you will have to determine if the user is saying
|
||||
something inappropriate. If you think the user is saying something
|
||||
inappropriate please respond with "YES". If you think the user is
|
||||
saying something appropriate please respond with "NO". Examples of inappropriate
|
||||
content are: hate speech, racism, sexism, bullying, harassment,
|
||||
violence, self-harm, and any other content that violates the
|
||||
community guidelines.
|
||||
""",
|
||||
},
|
||||
]
|
||||
|
||||
moderator_context = OpenAILLMContext(moderator_messages)
|
||||
moderator_context_aggregator = moderator_llm.create_context_aggregator(moderator_context)
|
||||
|
||||
# This is the regular LLM.
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# We have instructed the LLM to return 'YES' if it thinks the user
|
||||
# completed a sentence. So, if it's 'YES' we will return true in this
|
||||
# predicate which will wake up the notifier.
|
||||
async def wake_check_filter(frame):
|
||||
return frame.text == "YES"
|
||||
|
||||
# This is a notifier that we use to synchronize the two LLMs.
|
||||
notifier = EventNotifier()
|
||||
|
||||
# This a filter that will wake up the notifier if the given predicate
|
||||
# (wake_check_filter) returns true.
|
||||
completeness_check = WakeNotifierFilter(notifier, types=(TextFrame,), filter=wake_check_filter)
|
||||
|
||||
# This processor keeps the last context and will let it through once the
|
||||
# notifier is woken up. We start with the gate open because we send an
|
||||
# initial context frame to start the conversation.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier=notifier, start_open=True)
|
||||
|
||||
# Notify if the user hasn't said anything.
|
||||
async def user_idle_notifier(frame):
|
||||
await notifier.notify()
|
||||
|
||||
# Sometimes the LLM will fail detecting if a user should be
|
||||
# moderated, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0)
|
||||
|
||||
kick_participant = KickParticipantProcessor()
|
||||
|
||||
# The ParallePipeline input are the user transcripts. We have two
|
||||
# contexts. The first one will be used to determine if the user is
|
||||
# moderated and if so the notifier will be woken up. The second
|
||||
# context is simply the regular context but it's gated waiting for the
|
||||
# notifier to be woken up.
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
ParallelPipeline(
|
||||
[
|
||||
moderator_context_aggregator.user(),
|
||||
moderator_llm,
|
||||
kick_participant,
|
||||
# completeness_check,
|
||||
NullFilter(),
|
||||
],
|
||||
[context_aggregator.user(), gated_context_aggregator, llm],
|
||||
),
|
||||
user_idle,
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
@transport.event_handler("on_client_closed")
|
||||
async def on_client_closed(transport, client):
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from run import main
|
||||
|
||||
main()
|
||||
@@ -6,14 +6,14 @@ build-backend = "setuptools.build_meta"
|
||||
name = "pipecat-ai"
|
||||
dynamic = ["version"]
|
||||
description = "An open source framework for voice (and multimodal) assistants"
|
||||
license = { text = "BSD 2-Clause License" }
|
||||
license = "BSD-2-Clause"
|
||||
license-files = ["LICENSE"]
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
keywords = ["webrtc", "audio", "video", "ai"]
|
||||
classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: BSD License",
|
||||
"Topic :: Communications :: Conferencing",
|
||||
"Topic :: Multimedia :: Sound/Audio",
|
||||
"Topic :: Multimedia :: Video",
|
||||
@@ -92,9 +92,11 @@ websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
|
||||
whisper = [ "faster-whisper~=1.1.1" ]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
# All the following settings are optional:
|
||||
where = ["src"]
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
"pipecat" = ["py.typed"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "--verbose"
|
||||
testpaths = ["tests"]
|
||||
|
||||
@@ -6,13 +6,14 @@
|
||||
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from typing import Dict, Optional
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
|
||||
|
||||
# Default timing parameters
|
||||
STOP_SECS = 3
|
||||
@@ -61,7 +62,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
self._speech_triggered = True
|
||||
if self._speech_start_time is None:
|
||||
self._speech_start_time = time.time()
|
||||
logger.debug(f"Speech started at {self._speech_start_time}")
|
||||
else:
|
||||
if self._speech_triggered:
|
||||
chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
|
||||
@@ -87,28 +87,25 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
|
||||
return state
|
||||
|
||||
def analyze_end_of_turn(self) -> EndOfTurnState:
|
||||
logger.debug("Analyzing End of Turn...")
|
||||
state = self._process_speech_segment(self._audio_buffer)
|
||||
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
state, result = self._process_speech_segment(self._audio_buffer)
|
||||
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
|
||||
self._clear(state)
|
||||
logger.debug(f"End of Turn result: {state}")
|
||||
return state
|
||||
return state, result
|
||||
|
||||
def _clear(self, turn_state: EndOfTurnState):
|
||||
# Reset internal state for next turn
|
||||
logger.debug("Clearing audio buffer...")
|
||||
# If the state is still incomplete, keep the _speech_triggered as True
|
||||
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
|
||||
self._audio_buffer = []
|
||||
self._speech_start_time = None
|
||||
self._silence_ms = 0
|
||||
|
||||
def _process_speech_segment(self, audio_buffer) -> EndOfTurnState:
|
||||
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
state = EndOfTurnState.INCOMPLETE
|
||||
|
||||
if not audio_buffer:
|
||||
return state
|
||||
return state, None
|
||||
|
||||
# Extract recent audio segment for prediction
|
||||
start_time = self._speech_start_time - (self._params.pre_speech_ms / 1000)
|
||||
@@ -124,15 +121,13 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
segment_audio_chunks = [chunk for _, chunk in audio_buffer[start_index : end_index + 1]]
|
||||
segment_audio = np.concatenate(segment_audio_chunks)
|
||||
|
||||
logger.debug(f"Segment audio chunks after start index: {len(segment_audio)}")
|
||||
|
||||
# Limit maximum duration
|
||||
max_samples = int(self._params.max_duration_secs * self.sample_rate)
|
||||
if len(segment_audio) > max_samples:
|
||||
# slices the array to keep the last max_samples samples, discarding the earlier part.
|
||||
segment_audio = segment_audio[-max_samples:]
|
||||
|
||||
logger.debug(f"Segment audio chunks after limiting duration: {len(segment_audio)}")
|
||||
result_data = None
|
||||
|
||||
if len(segment_audio) > 0:
|
||||
start_time = time.perf_counter()
|
||||
@@ -142,20 +137,33 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
)
|
||||
end_time = time.perf_counter()
|
||||
|
||||
logger.debug("--------")
|
||||
logger.debug(f"Prediction: {'Complete' if result['prediction'] == 1 else 'Incomplete'}")
|
||||
logger.debug(f"Probability of complete: {result['probability']:.4f}")
|
||||
logger.debug(f"Prediction took {(end_time - start_time) * 1000:.2f}ms seconds")
|
||||
else:
|
||||
logger.debug(f"params: {self._params}, stop_ms: {self._stop_ms}")
|
||||
logger.debug("Captured empty audio segment, skipping prediction.")
|
||||
# Calculate processing time
|
||||
e2e_processing_time_ms = (end_time - start_time) * 1000
|
||||
|
||||
return state
|
||||
# Prepare the result data
|
||||
result_data = SmartTurnMetricsData(
|
||||
processor="BaseSmartTurn",
|
||||
is_complete=result["prediction"] == 1,
|
||||
probability=result["probability"],
|
||||
inference_time_ms=result.get("inference_time", 0) * 1000,
|
||||
server_total_time_ms=result.get("total_time", 0) * 1000,
|
||||
e2e_processing_time_ms=e2e_processing_time_ms,
|
||||
)
|
||||
|
||||
logger.trace(f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}")
|
||||
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
|
||||
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
|
||||
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
|
||||
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
|
||||
else:
|
||||
logger.trace(f"params: {self._params}, stop_ms: {self._stop_ms}")
|
||||
logger.trace("Captured empty audio segment, skipping prediction.")
|
||||
|
||||
return state, result_data
|
||||
|
||||
@abstractmethod
|
||||
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, any]:
|
||||
"""
|
||||
Abstract method to predict if a turn has ended based on audio.
|
||||
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
|
||||
"""Abstract method to predict if a turn has ended based on audio.
|
||||
|
||||
Args:
|
||||
buffer: Float32 numpy array of audio samples at 16kHz.
|
||||
|
||||
@@ -6,7 +6,9 @@
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
|
||||
|
||||
class EndOfTurnState(Enum):
|
||||
@@ -15,8 +17,10 @@ class EndOfTurnState(Enum):
|
||||
|
||||
|
||||
class BaseTurnAnalyzer(ABC):
|
||||
"""
|
||||
Abstract base class for analyzing user end of turn.
|
||||
"""Abstract base class for analyzing user end of turn.
|
||||
|
||||
This class inherits from BaseObject to leverage its event handling system
|
||||
while still defining an abstract interface through abstract methods.
|
||||
"""
|
||||
|
||||
def __init__(self, *, sample_rate: Optional[int] = None):
|
||||
@@ -25,8 +29,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""
|
||||
Returns the current sample rate.
|
||||
"""Returns the current sample rate.
|
||||
|
||||
Returns:
|
||||
int: The effective sample rate for audio processing.
|
||||
@@ -34,8 +37,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
return self._sample_rate
|
||||
|
||||
def set_sample_rate(self, sample_rate: int):
|
||||
"""
|
||||
Sets the sample rate for audio processing.
|
||||
"""Sets the sample rate for audio processing.
|
||||
|
||||
If the initial sample rate was provided, it will use that; otherwise, it sets to
|
||||
the provided sample rate.
|
||||
@@ -48,8 +50,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
@property
|
||||
@abstractmethod
|
||||
def speech_triggered(self) -> bool:
|
||||
"""
|
||||
Determines if speech has been detected.
|
||||
"""Determines if speech has been detected.
|
||||
|
||||
Returns:
|
||||
bool: True if speech is triggered, otherwise False.
|
||||
@@ -58,8 +59,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
|
||||
"""
|
||||
Appends audio data for analysis.
|
||||
"""Appends audio data for analysis.
|
||||
|
||||
Args:
|
||||
buffer (bytes): The audio data to append.
|
||||
@@ -71,9 +71,8 @@ class BaseTurnAnalyzer(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def analyze_end_of_turn(self) -> EndOfTurnState:
|
||||
"""
|
||||
Analyzes if an end of turn has occurred based on the audio input.
|
||||
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
"""Analyzes if an end of turn has occurred based on the audio input.
|
||||
|
||||
Returns:
|
||||
EndOfTurnState: The result of the end of turn analysis.
|
||||
|
||||
@@ -30,3 +30,13 @@ class LLMUsageMetricsData(MetricsData):
|
||||
|
||||
class TTSUsageMetricsData(MetricsData):
|
||||
value: int
|
||||
|
||||
|
||||
class SmartTurnMetricsData(MetricsData):
|
||||
"""Metrics data for smart turn predictions."""
|
||||
|
||||
is_complete: bool
|
||||
probability: float
|
||||
inference_time_ms: float
|
||||
server_total_time_ms: float
|
||||
e2e_processing_time_ms: float
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -70,7 +70,7 @@ class OpenAITTSService(TTSService):
|
||||
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
|
||||
logger.warning(
|
||||
f"OpenAI TTS only supports {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
|
||||
f"Current rate of {self.sample_rate}Hz may cause issues."
|
||||
f"Current rate of {sample_rate}Hz may cause issues."
|
||||
)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
|
||||
@@ -6,11 +6,14 @@
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Optional
|
||||
from typing import Mapping, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.audio.turn.base_turn_analyzer import (
|
||||
BaseTurnAnalyzer,
|
||||
EndOfTurnState,
|
||||
)
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
|
||||
from pipecat.frames.frames import (
|
||||
BotInterruptionFrame,
|
||||
@@ -21,6 +24,7 @@ from pipecat.frames.frames import (
|
||||
FilterUpdateSettingsFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
MetricsFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -29,6 +33,7 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
VADParamsUpdateFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
|
||||
@@ -78,6 +83,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Configure End of turn analyzer.
|
||||
if self._params.turn_analyzer:
|
||||
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
|
||||
|
||||
# Start audio filter.
|
||||
if self._params.audio_in_filter:
|
||||
await self._params.audio_in_filter.start(self._sample_rate)
|
||||
@@ -216,9 +222,12 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
async def _handle_end_of_turn(self):
|
||||
if self.turn_analyzer:
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
state, prediction = await self.get_event_loop().run_in_executor(
|
||||
self._executor, self.turn_analyzer.analyze_end_of_turn
|
||||
)
|
||||
|
||||
await self._handle_prediction_result(prediction)
|
||||
|
||||
await self._handle_end_of_turn_complete(state)
|
||||
|
||||
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):
|
||||
@@ -263,3 +272,11 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self.push_frame(frame)
|
||||
|
||||
self._audio_in_queue.task_done()
|
||||
|
||||
async def _handle_prediction_result(self, result: MetricsData):
|
||||
"""Handle a prediction result event from the turn analyzer.
|
||||
|
||||
Args:
|
||||
result: The prediction result MetricsData.
|
||||
"""
|
||||
await self.push_frame(MetricsFrame(data=[result]))
|
||||
|
||||
Reference in New Issue
Block a user