Add a SmartTurnMetricsData class, emitted by Metrics Frame in response to smart turn responses
This commit is contained in:
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added `SmartTurnMetricsData`, which contains end-of-turn prediction metrics,
|
||||
to the `MetricsFrame`. Using `MetricsFrame`, you can now retrieve prediction
|
||||
confidence scores and processing time metrics from the smart turn analyzers.
|
||||
|
||||
- Added support for Application Default Credentials in Google services,
|
||||
`GoogleSTTService`, `GoogleTTSService`, and `GoogleVertexLLMService`.
|
||||
|
||||
|
||||
@@ -6,13 +6,14 @@
|
||||
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from typing import Dict, Optional
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
|
||||
|
||||
# Default timing parameters
|
||||
STOP_SECS = 3
|
||||
@@ -61,7 +62,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
self._speech_triggered = True
|
||||
if self._speech_start_time is None:
|
||||
self._speech_start_time = time.time()
|
||||
logger.debug(f"Speech started at {self._speech_start_time}")
|
||||
else:
|
||||
if self._speech_triggered:
|
||||
chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
|
||||
@@ -87,28 +87,25 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
|
||||
return state
|
||||
|
||||
def analyze_end_of_turn(self) -> EndOfTurnState:
|
||||
logger.debug("Analyzing End of Turn...")
|
||||
state = self._process_speech_segment(self._audio_buffer)
|
||||
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
state, result = self._process_speech_segment(self._audio_buffer)
|
||||
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
|
||||
self._clear(state)
|
||||
logger.debug(f"End of Turn result: {state}")
|
||||
return state
|
||||
return state, result
|
||||
|
||||
def _clear(self, turn_state: EndOfTurnState):
|
||||
# Reset internal state for next turn
|
||||
logger.debug("Clearing audio buffer...")
|
||||
# If the state is still incomplete, keep the _speech_triggered as True
|
||||
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
|
||||
self._audio_buffer = []
|
||||
self._speech_start_time = None
|
||||
self._silence_ms = 0
|
||||
|
||||
def _process_speech_segment(self, audio_buffer) -> EndOfTurnState:
|
||||
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
state = EndOfTurnState.INCOMPLETE
|
||||
|
||||
if not audio_buffer:
|
||||
return state
|
||||
return state, None
|
||||
|
||||
# Extract recent audio segment for prediction
|
||||
start_time = self._speech_start_time - (self._params.pre_speech_ms / 1000)
|
||||
@@ -124,15 +121,13 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
segment_audio_chunks = [chunk for _, chunk in audio_buffer[start_index : end_index + 1]]
|
||||
segment_audio = np.concatenate(segment_audio_chunks)
|
||||
|
||||
logger.debug(f"Segment audio chunks after start index: {len(segment_audio)}")
|
||||
|
||||
# Limit maximum duration
|
||||
max_samples = int(self._params.max_duration_secs * self.sample_rate)
|
||||
if len(segment_audio) > max_samples:
|
||||
# slices the array to keep the last max_samples samples, discarding the earlier part.
|
||||
segment_audio = segment_audio[-max_samples:]
|
||||
|
||||
logger.debug(f"Segment audio chunks after limiting duration: {len(segment_audio)}")
|
||||
result_data = None
|
||||
|
||||
if len(segment_audio) > 0:
|
||||
start_time = time.perf_counter()
|
||||
@@ -142,20 +137,33 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
)
|
||||
end_time = time.perf_counter()
|
||||
|
||||
logger.debug("--------")
|
||||
logger.debug(f"Prediction: {'Complete' if result['prediction'] == 1 else 'Incomplete'}")
|
||||
logger.debug(f"Probability of complete: {result['probability']:.4f}")
|
||||
logger.debug(f"Prediction took {(end_time - start_time) * 1000:.2f}ms seconds")
|
||||
else:
|
||||
logger.debug(f"params: {self._params}, stop_ms: {self._stop_ms}")
|
||||
logger.debug("Captured empty audio segment, skipping prediction.")
|
||||
# Calculate processing time
|
||||
e2e_processing_time_ms = (end_time - start_time) * 1000
|
||||
|
||||
return state
|
||||
# Prepare the result data
|
||||
result_data = SmartTurnMetricsData(
|
||||
processor="BaseSmartTurn",
|
||||
is_complete=result["prediction"] == 1,
|
||||
probability=result["probability"],
|
||||
inference_time_ms=result.get("inference_time", 0) * 1000,
|
||||
server_total_time_ms=result.get("total_time", 0) * 1000,
|
||||
e2e_processing_time_ms=e2e_processing_time_ms,
|
||||
)
|
||||
|
||||
logger.trace(f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}")
|
||||
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
|
||||
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
|
||||
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
|
||||
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
|
||||
else:
|
||||
logger.trace(f"params: {self._params}, stop_ms: {self._stop_ms}")
|
||||
logger.trace("Captured empty audio segment, skipping prediction.")
|
||||
|
||||
return state, result_data
|
||||
|
||||
@abstractmethod
|
||||
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, any]:
|
||||
"""
|
||||
Abstract method to predict if a turn has ended based on audio.
|
||||
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
|
||||
"""Abstract method to predict if a turn has ended based on audio.
|
||||
|
||||
Args:
|
||||
buffer: Float32 numpy array of audio samples at 16kHz.
|
||||
|
||||
@@ -6,7 +6,9 @@
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
|
||||
|
||||
class EndOfTurnState(Enum):
|
||||
@@ -15,8 +17,10 @@ class EndOfTurnState(Enum):
|
||||
|
||||
|
||||
class BaseTurnAnalyzer(ABC):
|
||||
"""
|
||||
Abstract base class for analyzing user end of turn.
|
||||
"""Abstract base class for analyzing user end of turn.
|
||||
|
||||
This class inherits from BaseObject to leverage its event handling system
|
||||
while still defining an abstract interface through abstract methods.
|
||||
"""
|
||||
|
||||
def __init__(self, *, sample_rate: Optional[int] = None):
|
||||
@@ -25,8 +29,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""
|
||||
Returns the current sample rate.
|
||||
"""Returns the current sample rate.
|
||||
|
||||
Returns:
|
||||
int: The effective sample rate for audio processing.
|
||||
@@ -34,8 +37,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
return self._sample_rate
|
||||
|
||||
def set_sample_rate(self, sample_rate: int):
|
||||
"""
|
||||
Sets the sample rate for audio processing.
|
||||
"""Sets the sample rate for audio processing.
|
||||
|
||||
If the initial sample rate was provided, it will use that; otherwise, it sets to
|
||||
the provided sample rate.
|
||||
@@ -48,8 +50,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
@property
|
||||
@abstractmethod
|
||||
def speech_triggered(self) -> bool:
|
||||
"""
|
||||
Determines if speech has been detected.
|
||||
"""Determines if speech has been detected.
|
||||
|
||||
Returns:
|
||||
bool: True if speech is triggered, otherwise False.
|
||||
@@ -58,8 +59,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
|
||||
"""
|
||||
Appends audio data for analysis.
|
||||
"""Appends audio data for analysis.
|
||||
|
||||
Args:
|
||||
buffer (bytes): The audio data to append.
|
||||
@@ -71,9 +71,8 @@ class BaseTurnAnalyzer(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def analyze_end_of_turn(self) -> EndOfTurnState:
|
||||
"""
|
||||
Analyzes if an end of turn has occurred based on the audio input.
|
||||
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
"""Analyzes if an end of turn has occurred based on the audio input.
|
||||
|
||||
Returns:
|
||||
EndOfTurnState: The result of the end of turn analysis.
|
||||
|
||||
@@ -30,3 +30,13 @@ class LLMUsageMetricsData(MetricsData):
|
||||
|
||||
class TTSUsageMetricsData(MetricsData):
|
||||
value: int
|
||||
|
||||
|
||||
class SmartTurnMetricsData(MetricsData):
|
||||
"""Metrics data for smart turn predictions."""
|
||||
|
||||
is_complete: bool
|
||||
probability: float
|
||||
inference_time_ms: float
|
||||
server_total_time_ms: float
|
||||
e2e_processing_time_ms: float
|
||||
|
||||
@@ -6,11 +6,14 @@
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Optional
|
||||
from typing import Mapping, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.audio.turn.base_turn_analyzer import (
|
||||
BaseTurnAnalyzer,
|
||||
EndOfTurnState,
|
||||
)
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
|
||||
from pipecat.frames.frames import (
|
||||
BotInterruptionFrame,
|
||||
@@ -21,6 +24,7 @@ from pipecat.frames.frames import (
|
||||
FilterUpdateSettingsFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
MetricsFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -29,6 +33,7 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
VADParamsUpdateFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
|
||||
@@ -78,6 +83,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Configure End of turn analyzer.
|
||||
if self._params.turn_analyzer:
|
||||
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
|
||||
|
||||
# Start audio filter.
|
||||
if self._params.audio_in_filter:
|
||||
await self._params.audio_in_filter.start(self._sample_rate)
|
||||
@@ -216,9 +222,12 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
async def _handle_end_of_turn(self):
|
||||
if self.turn_analyzer:
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
state, prediction = await self.get_event_loop().run_in_executor(
|
||||
self._executor, self.turn_analyzer.analyze_end_of_turn
|
||||
)
|
||||
|
||||
await self._handle_prediction_result(prediction)
|
||||
|
||||
await self._handle_end_of_turn_complete(state)
|
||||
|
||||
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):
|
||||
@@ -263,3 +272,11 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self.push_frame(frame)
|
||||
|
||||
self._audio_in_queue.task_done()
|
||||
|
||||
async def _handle_prediction_result(self, result: MetricsData):
|
||||
"""Handle a prediction result event from the turn analyzer.
|
||||
|
||||
Args:
|
||||
result: The prediction result MetricsData.
|
||||
"""
|
||||
await self.push_frame(MetricsFrame(data=[result]))
|
||||
|
||||
Reference in New Issue
Block a user