Merge branch 'pipecat-ai:main' into patch-1
This commit is contained in:
43
CHANGELOG.md
43
CHANGELOG.md
@@ -9,6 +9,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added `TransportParams.audio_out_silence_secs`, which specifies how many
|
||||
seconds of silence to output when an `EndFrame` reaches the output
|
||||
transport. This can help ensure that all audio data is fully delivered to
|
||||
clients.
|
||||
|
||||
- Added new `FrameProcessor.broadcast_frame()` method. This will push two
|
||||
instances of a given frame class, one upstream and the other downstream.
|
||||
|
||||
```python
|
||||
await self.broadcast_frame(UserSpeakingFrame)
|
||||
```
|
||||
|
||||
- Added `MetricsLogObserver` for logging performance metrics from `MetricsFrame`
|
||||
instances. Supports filtering via `include_metrics` parameter to control which
|
||||
metrics types are logged (TTFB, processing time, LLM token usage, TTS usage,
|
||||
smart turn metrics).
|
||||
|
||||
- Added `pronunciation_dictionary_locators` to `ElevenLabsTTSService` and
|
||||
`ElevenLabsHttpTTSService`.
|
||||
|
||||
- Added support for loading external observers. You can now register custom
|
||||
pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment
|
||||
variable. This variable should contain a colon-separated list of Python files
|
||||
@@ -28,8 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- `CancelFrame` and `CancelTaskFrame` have an optional `reason` field to
|
||||
indicate why the pipeline is being canceled. This can be also specified when
|
||||
you cancel a task with `PipelineTask.cancel(reason="cancellation your
|
||||
reason")`.
|
||||
you cancel a task with `PipelineTask.cancel(reason="cancellation reason")`.
|
||||
|
||||
- Added `include_prob_metrics` parameter to Whisper STT services to enable access
|
||||
to probability metrics from transcription results.
|
||||
@@ -56,8 +75,25 @@ reason")`.
|
||||
- Added support for passing in an `LLMSwicher` to `MCPClient.register_tools()`
|
||||
(as well as the new `MCPClient.register_tools_schema()`).
|
||||
|
||||
- Added `cpu_count` parameter to `LocalSmartTurnAnalyzerV3`. This is set to `1`
|
||||
by default for more predictable performance on low-CPU systems.
|
||||
|
||||
### Changed
|
||||
|
||||
- `STTMuteFilter` no longer sends `STTMuteFrame` to the STT service. The filter
|
||||
now blocks frames locally without instructing the STT service to stop
|
||||
processing audio. This prevents inactivity-related errors (such as 409 errors
|
||||
from Google STT) while maintaining the same muting behavior at the application
|
||||
level. Important: The STTMuteFilter should be placed _after_ the STT service
|
||||
itself.
|
||||
|
||||
- Improved `GoogleSTTService` error handling to properly catch gRPC `Aborted`
|
||||
exceptions (corresponding to 409 errors) caused by stream inactivity. These
|
||||
exceptions are now logged at DEBUG level instead of ERROR level, since they
|
||||
indicate expected behavior when no audio is sent for 10+ seconds (e.g., during
|
||||
long silences or when audio input is blocked). The service automatically
|
||||
reconnects when this occurs.
|
||||
|
||||
- Bumped the `fastapi` dependency's upperbound to `<0.122.0`.
|
||||
|
||||
- Updated the default model for `GoogleVertexLLMService` to `gemini-2.5-flash`.
|
||||
@@ -87,6 +123,9 @@ reason")`.
|
||||
- `GeminiLiveLLMService` now properly supports context-provided system
|
||||
instruction and tools.
|
||||
|
||||
- Fixed `GoogleLLMService` token counting to avoid double-counting tokens when
|
||||
Gemini sends usage metadata across multiple streaming chunks.
|
||||
|
||||
### Removed
|
||||
|
||||
- Removed `needs_mcp_alternate_schema()` from `LLMService`. The mechanism that
|
||||
|
||||
@@ -61,8 +61,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = GoogleSTTService(
|
||||
params=GoogleSTTService.InputParams(languages=Language.EN_US),
|
||||
params=GoogleSTTService.InputParams(languages=Language.EN_US, model="chirp_3"),
|
||||
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
|
||||
location="us",
|
||||
)
|
||||
|
||||
tts = GoogleTTSService(
|
||||
|
||||
@@ -35,12 +35,15 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
|
||||
enabling offline operation without network dependencies.
|
||||
"""
|
||||
|
||||
def __init__(self, *, smart_turn_model_path: Optional[str] = None, **kwargs):
|
||||
def __init__(
|
||||
self, *, smart_turn_model_path: Optional[str] = None, cpu_count: int = 1, **kwargs
|
||||
):
|
||||
"""Initialize the local ONNX smart-turn-v3 analyzer.
|
||||
|
||||
Args:
|
||||
smart_turn_model_path: Path to the ONNX model file. If this is not
|
||||
set, the bundled smart-turn-v3.0 model will be used.
|
||||
cpu_count: The number of CPUs to use for inference. Defaults to 1.
|
||||
**kwargs: Additional arguments passed to BaseSmartTurn.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
@@ -70,6 +73,7 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
|
||||
so = ort.SessionOptions()
|
||||
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
|
||||
so.inter_op_num_threads = 1
|
||||
so.intra_op_num_threads = cpu_count
|
||||
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
|
||||
|
||||
self._feature_extractor = WhisperFeatureExtractor(chunk_length=8)
|
||||
|
||||
218
src/pipecat/observers/loggers/metrics_log_observer.py
Normal file
218
src/pipecat/observers/loggers/metrics_log_observer.py
Normal file
@@ -0,0 +1,218 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Metrics logging observer for Pipecat.
|
||||
|
||||
This module provides an observer that logs metrics frames to the console,
|
||||
allowing developers to monitor performance metrics, token usage, and other
|
||||
statistics in real-time.
|
||||
"""
|
||||
|
||||
from typing import Optional, Set, Type
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import MetricsFrame
|
||||
from pipecat.metrics.metrics import (
|
||||
LLMTokenUsage,
|
||||
LLMUsageMetricsData,
|
||||
MetricsData,
|
||||
ProcessingMetricsData,
|
||||
SmartTurnMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
|
||||
|
||||
class MetricsLogObserver(BaseObserver):
|
||||
"""Observer to log metrics activity to the console.
|
||||
|
||||
Monitors and logs all MetricsFrame instances, including:
|
||||
|
||||
- TTFBMetricsData (Time To First Byte)
|
||||
- ProcessingMetricsData (General processing time)
|
||||
- LLMUsageMetricsData (Token usage statistics)
|
||||
- TTSUsageMetricsData (Text-to-Speech character counts)
|
||||
- SmartTurnMetricsData (Turn prediction metrics)
|
||||
|
||||
This allows developers to track performance metrics, token usage,
|
||||
and other statistics throughout the pipeline.
|
||||
|
||||
Examples:
|
||||
Log all metrics types::
|
||||
|
||||
observers = [MetricsLogObserver()]
|
||||
|
||||
Log only LLM and TTS metrics::
|
||||
|
||||
from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData
|
||||
observers = [
|
||||
MetricsLogObserver(
|
||||
include_metrics={LLMUsageMetricsData, TTSUsageMetricsData}
|
||||
)
|
||||
]
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
include_metrics: Optional[Set[Type[MetricsData]]] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the metrics log observer.
|
||||
|
||||
Args:
|
||||
include_metrics: Set of metrics types to include. If specified, only these
|
||||
metrics types will be logged. If None, all metrics are logged.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._include_metrics = include_metrics
|
||||
self._frames_seen = set()
|
||||
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Handle frame push events and log metrics frames.
|
||||
|
||||
Logs MetricsFrame instances with detailed information about the
|
||||
metrics data, formatted appropriately for each metrics type.
|
||||
|
||||
Args:
|
||||
data: Frame push event data containing source, frame, and timestamp.
|
||||
"""
|
||||
frame = data.frame
|
||||
timestamp = data.timestamp
|
||||
|
||||
if not isinstance(frame, MetricsFrame):
|
||||
return
|
||||
|
||||
# Skip frames we've already seen to avoid duplicate logging
|
||||
if frame.id in self._frames_seen:
|
||||
return
|
||||
|
||||
self._frames_seen.add(frame.id)
|
||||
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
# Process each metrics data item in the frame
|
||||
for metrics_data in frame.data:
|
||||
# Check if this metrics type should be logged
|
||||
if not self._should_log_metrics(metrics_data):
|
||||
continue
|
||||
|
||||
self._log_metrics_data(metrics_data, time_sec)
|
||||
|
||||
def _should_log_metrics(self, metrics_data: MetricsData) -> bool:
|
||||
"""Determine if a metrics data item should be logged based on filters.
|
||||
|
||||
Args:
|
||||
metrics_data: The metrics data to check.
|
||||
|
||||
Returns:
|
||||
True if the metrics should be logged, False otherwise.
|
||||
"""
|
||||
# If include_metrics is specified, only log those types
|
||||
if self._include_metrics is not None:
|
||||
return type(metrics_data) in self._include_metrics
|
||||
|
||||
# Otherwise, log all metrics
|
||||
return True
|
||||
|
||||
def _log_metrics_data(self, metrics_data: MetricsData, time_sec: float):
|
||||
"""Log a single metrics data item.
|
||||
|
||||
Args:
|
||||
metrics_data: The metrics data to log.
|
||||
time_sec: Timestamp in seconds.
|
||||
"""
|
||||
processor_info = f"[{metrics_data.processor}]"
|
||||
model_info = f" ({metrics_data.model})" if metrics_data.model else ""
|
||||
|
||||
if isinstance(metrics_data, TTFBMetricsData):
|
||||
logger.debug(
|
||||
f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
|
||||
)
|
||||
elif isinstance(metrics_data, ProcessingMetricsData):
|
||||
logger.debug(
|
||||
f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
|
||||
)
|
||||
elif isinstance(metrics_data, LLMUsageMetricsData):
|
||||
self._log_llm_usage(metrics_data, processor_info, model_info, time_sec)
|
||||
elif isinstance(metrics_data, TTSUsageMetricsData):
|
||||
logger.debug(
|
||||
f"📊 {processor_info} TTS USAGE{model_info}: {metrics_data.value} characters at {time_sec:.3f}s"
|
||||
)
|
||||
elif isinstance(metrics_data, SmartTurnMetricsData):
|
||||
self._log_smart_turn(metrics_data, processor_info, model_info, time_sec)
|
||||
else:
|
||||
# Generic fallback for unknown metrics types
|
||||
logger.debug(
|
||||
f"📊 {processor_info} METRICS{model_info}: {metrics_data} at {time_sec:.3f}s"
|
||||
)
|
||||
|
||||
def _log_llm_usage(
|
||||
self,
|
||||
metrics_data: LLMUsageMetricsData,
|
||||
processor_info: str,
|
||||
model_info: str,
|
||||
time_sec: float,
|
||||
):
|
||||
"""Log LLM token usage metrics.
|
||||
|
||||
Args:
|
||||
metrics_data: The LLM usage metrics data.
|
||||
processor_info: Formatted processor name string.
|
||||
model_info: Formatted model name string.
|
||||
time_sec: Timestamp in seconds.
|
||||
"""
|
||||
usage: LLMTokenUsage = metrics_data.value
|
||||
|
||||
# Build usage details
|
||||
details = [
|
||||
f"prompt: {usage.prompt_tokens}",
|
||||
f"completion: {usage.completion_tokens}",
|
||||
f"total: {usage.total_tokens}",
|
||||
]
|
||||
|
||||
if usage.cache_read_input_tokens is not None:
|
||||
details.append(f"cache_read: {usage.cache_read_input_tokens}")
|
||||
|
||||
if usage.cache_creation_input_tokens is not None:
|
||||
details.append(f"cache_creation: {usage.cache_creation_input_tokens}")
|
||||
|
||||
if usage.reasoning_tokens is not None:
|
||||
details.append(f"reasoning: {usage.reasoning_tokens}")
|
||||
|
||||
usage_str = ", ".join(details)
|
||||
|
||||
logger.debug(
|
||||
f"📊 {processor_info} LLM TOKEN USAGE{model_info}: {usage_str} at {time_sec:.2f}s"
|
||||
)
|
||||
|
||||
def _log_smart_turn(
|
||||
self,
|
||||
metrics_data: SmartTurnMetricsData,
|
||||
processor_info: str,
|
||||
model_info: str,
|
||||
time_sec: float,
|
||||
):
|
||||
"""Log smart turn prediction metrics.
|
||||
|
||||
Args:
|
||||
metrics_data: The smart turn metrics data.
|
||||
processor_info: Formatted processor name string.
|
||||
model_info: Formatted model name string.
|
||||
time_sec: Timestamp in seconds.
|
||||
"""
|
||||
complete_str = "COMPLETE" if metrics_data.is_complete else "INCOMPLETE"
|
||||
|
||||
logger.debug(
|
||||
f"📊 {processor_info} SMART TURN{model_info}: {complete_str} "
|
||||
f"(probability: {metrics_data.probability:.2%}, "
|
||||
f"inference: {metrics_data.inference_time_ms:.1f}ms, "
|
||||
f"server: {metrics_data.server_total_time_ms:.1f}ms, "
|
||||
f"e2e: {metrics_data.e2e_processing_time_ms:.1f}ms) "
|
||||
f"at {time_sec:.2f}s"
|
||||
)
|
||||
@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
InterruptionFrame,
|
||||
StartFrame,
|
||||
STTMuteFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
@@ -118,24 +117,16 @@ class STTMuteFilter(FrameProcessor):
|
||||
self._first_speech_handled = False
|
||||
self._bot_is_speaking = False
|
||||
self._function_call_in_progress = False
|
||||
self._is_muted = False # Initialize as unmuted, will set state on StartFrame if needed
|
||||
|
||||
@property
|
||||
def is_muted(self) -> bool:
|
||||
"""Check if STT is currently muted.
|
||||
|
||||
Returns:
|
||||
True if STT is currently muted and audio frames are being suppressed.
|
||||
"""
|
||||
return self._is_muted
|
||||
self._is_muted = False
|
||||
|
||||
async def _handle_mute_state(self, should_mute: bool):
|
||||
"""Handle STT muting and interruption control state changes."""
|
||||
if should_mute != self.is_muted:
|
||||
if should_mute != self._is_muted:
|
||||
logger.debug(f"STTMuteFilter {'muting' if should_mute else 'unmuting'}")
|
||||
self._is_muted = should_mute
|
||||
await self.push_frame(STTMuteFrame(mute=should_mute), FrameDirection.UPSTREAM)
|
||||
await self.push_frame(STTMuteFrame(mute=should_mute), FrameDirection.DOWNSTREAM)
|
||||
# Note: We don't send STTMuteFrame to the STT service itself.
|
||||
# The filter blocks frames locally, but the STT service continues
|
||||
# processing audio to keep streaming connections alive (e.g., Google STT).
|
||||
|
||||
async def _should_mute(self) -> bool:
|
||||
"""Determine if STT should be muted based on current state and strategies."""
|
||||
@@ -215,7 +206,7 @@ class STTMuteFilter(FrameProcessor):
|
||||
),
|
||||
):
|
||||
# Only pass VAD-related frames when not muted
|
||||
if not self.is_muted:
|
||||
if not self._is_muted:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted")
|
||||
@@ -224,5 +215,5 @@ class STTMuteFilter(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
# Finally handle mute state change if needed
|
||||
if should_mute is not None and should_mute != self.is_muted:
|
||||
if should_mute is not None and should_mute != self._is_muted:
|
||||
await self._handle_mute_state(should_mute)
|
||||
|
||||
@@ -14,7 +14,7 @@ management, and frame flow control mechanisms.
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple
|
||||
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple, Type
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -689,6 +689,19 @@ class FrameProcessor(BaseObject):
|
||||
|
||||
self._wait_for_interruption = False
|
||||
|
||||
async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
|
||||
"""Broadcasts a frame of the specified class upstream and downstream.
|
||||
|
||||
This method creates two instances of the given frame class using the
|
||||
provided keyword arguments and pushes them upstream and downstream.
|
||||
|
||||
Args:
|
||||
frame_cls: The class of the frame to be broadcasted.
|
||||
**kwargs: Keyword arguments to be passed to the frame's constructor.
|
||||
"""
|
||||
await self.push_frame(frame_cls(**kwargs))
|
||||
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
|
||||
|
||||
async def __start(self, frame: StartFrame):
|
||||
"""Handle the start frame to initialize processor state.
|
||||
|
||||
|
||||
@@ -526,8 +526,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
"""
|
||||
logger.debug("User started speaking")
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
await self.start_metrics()
|
||||
await self._call_event_handler("on_start_of_turn", transcript)
|
||||
if transcript:
|
||||
|
||||
@@ -157,6 +157,18 @@ def build_elevenlabs_voice_settings(
|
||||
return voice_settings or None
|
||||
|
||||
|
||||
class PronunciationDictionaryLocator(BaseModel):
|
||||
"""Locator for a pronunciation dictionary.
|
||||
|
||||
Attributes:
|
||||
pronunciation_dictionary_id: The ID of the pronunciation dictionary.
|
||||
version_id: The version ID of the pronunciation dictionary.
|
||||
"""
|
||||
|
||||
pronunciation_dictionary_id: str
|
||||
version_id: str
|
||||
|
||||
|
||||
def calculate_word_times(
|
||||
alignment_info: Mapping[str, Any],
|
||||
cumulative_time: float,
|
||||
@@ -239,6 +251,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
enable_ssml_parsing: Whether to parse SSML tags in text.
|
||||
enable_logging: Whether to enable ElevenLabs logging.
|
||||
apply_text_normalization: Text normalization mode ("auto", "on", "off").
|
||||
pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
|
||||
"""
|
||||
|
||||
language: Optional[Language] = None
|
||||
@@ -251,6 +264,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
enable_ssml_parsing: Optional[bool] = None
|
||||
enable_logging: Optional[bool] = None
|
||||
apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None
|
||||
pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -321,6 +335,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
self.set_voice(voice_id)
|
||||
self._output_format = "" # initialized in start()
|
||||
self._voice_settings = self._set_voice_settings()
|
||||
self._pronunciation_dictionary_locators = params.pronunciation_dictionary_locators
|
||||
|
||||
# Indicates if we have sent TTSStartedFrame. It will reset to False when
|
||||
# there's an interruption or TTSStoppedFrame.
|
||||
@@ -704,12 +719,17 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
if not self.audio_context_available(self._context_id):
|
||||
await self.create_audio_context(self._context_id)
|
||||
|
||||
# Initialize context with voice settings
|
||||
# Initialize context with voice settings and pronunciation dictionaries
|
||||
msg = {"text": " ", "context_id": self._context_id}
|
||||
if self._voice_settings:
|
||||
msg["voice_settings"] = self._voice_settings
|
||||
if self._pronunciation_dictionary_locators:
|
||||
msg["pronunciation_dictionary_locators"] = [
|
||||
locator.model_dump()
|
||||
for locator in self._pronunciation_dictionary_locators
|
||||
]
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
logger.trace(f"Created new context {self._context_id} with voice settings")
|
||||
logger.trace(f"Created new context {self._context_id}")
|
||||
|
||||
await self._send_text(text)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
@@ -745,6 +765,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
use_speaker_boost: Whether to use speaker boost enhancement.
|
||||
speed: Voice speed control (0.25 to 4.0).
|
||||
apply_text_normalization: Text normalization mode ("auto", "on", "off").
|
||||
pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
|
||||
"""
|
||||
|
||||
language: Optional[Language] = None
|
||||
@@ -755,6 +776,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
use_speaker_boost: Optional[bool] = None
|
||||
speed: Optional[float] = None
|
||||
apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None
|
||||
pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -813,6 +835,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
self.set_voice(voice_id)
|
||||
self._output_format = "" # initialized in start()
|
||||
self._voice_settings = self._set_voice_settings()
|
||||
self._pronunciation_dictionary_locators = params.pronunciation_dictionary_locators
|
||||
|
||||
# Track cumulative time to properly sequence word timestamps across utterances
|
||||
self._cumulative_time = 0
|
||||
@@ -977,6 +1000,11 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
if self._voice_settings:
|
||||
payload["voice_settings"] = self._voice_settings
|
||||
|
||||
if self._pronunciation_dictionary_locators:
|
||||
payload["pronunciation_dictionary_locators"] = [
|
||||
locator.model_dump() for locator in self._pronunciation_dictionary_locators
|
||||
]
|
||||
|
||||
if self._settings["apply_text_normalization"] is not None:
|
||||
payload["apply_text_normalization"] = self._settings["apply_text_normalization"]
|
||||
|
||||
|
||||
@@ -899,12 +899,18 @@ class GoogleLLMService(LLMService):
|
||||
async for chunk in response:
|
||||
# Stop TTFB metrics after the first chunk
|
||||
await self.stop_ttfb_metrics()
|
||||
# Gemini may send usage_metadata in multiple chunks with varying behavior:
|
||||
# - Sometimes a single chunk, sometimes multiple chunks
|
||||
# - Token counts may be cumulative (growing) or may change between chunks
|
||||
# - Early chunks may include estimates/overhead that gets refined
|
||||
# We use assignment (not accumulation) because the final chunk always contains
|
||||
# the authoritative, billable token usage for the entire response.
|
||||
if chunk.usage_metadata:
|
||||
prompt_tokens += chunk.usage_metadata.prompt_token_count or 0
|
||||
completion_tokens += chunk.usage_metadata.candidates_token_count or 0
|
||||
total_tokens += chunk.usage_metadata.total_token_count or 0
|
||||
cache_read_input_tokens += chunk.usage_metadata.cached_content_token_count or 0
|
||||
reasoning_tokens += chunk.usage_metadata.thoughts_token_count or 0
|
||||
prompt_tokens = chunk.usage_metadata.prompt_token_count or 0
|
||||
completion_tokens = chunk.usage_metadata.candidates_token_count or 0
|
||||
total_tokens = chunk.usage_metadata.total_token_count or 0
|
||||
cache_read_input_tokens = chunk.usage_metadata.cached_content_token_count or 0
|
||||
reasoning_tokens = chunk.usage_metadata.thoughts_token_count or 0
|
||||
|
||||
if not chunk.candidates:
|
||||
continue
|
||||
|
||||
@@ -41,6 +41,7 @@ from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
try:
|
||||
from google.api_core.client_options import ClientOptions
|
||||
from google.api_core.exceptions import Aborted
|
||||
from google.auth import default
|
||||
from google.auth.exceptions import GoogleAuthError
|
||||
from google.cloud import speech_v2
|
||||
@@ -886,6 +887,18 @@ class GoogleSTTService(STTService):
|
||||
result=result,
|
||||
)
|
||||
)
|
||||
except Aborted as e:
|
||||
# Handle stream abort due to inactivity (409 error).
|
||||
# This occurs when no audio is sent to the stream for 10+ seconds,
|
||||
# which can happen when InputAudioRawFrames are blocked (e.g., by STTMuteFilter).
|
||||
# Google's STT service automatically closes the stream in this case.
|
||||
# We log at DEBUG level (not ERROR) since this is recoverable, then re-raise
|
||||
# to trigger automatic reconnection in _stream_audio.
|
||||
logger.debug(
|
||||
f"{self} Stream aborted due to inactivity (no audio input). "
|
||||
f"Reconnecting automatically..."
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Google STT responses: {e}")
|
||||
# Re-raise the exception to let it propagate (e.g. in the case of a
|
||||
|
||||
@@ -433,11 +433,7 @@ class LLMService(AIService):
|
||||
|
||||
await self._call_event_handler("on_function_calls_started", function_calls)
|
||||
|
||||
# Push frame both downstream and upstream
|
||||
started_frame_downstream = FunctionCallsStartedFrame(function_calls=function_calls)
|
||||
started_frame_upstream = FunctionCallsStartedFrame(function_calls=function_calls)
|
||||
await self.push_frame(started_frame_downstream, FrameDirection.DOWNSTREAM)
|
||||
await self.push_frame(started_frame_upstream, FrameDirection.UPSTREAM)
|
||||
await self.broadcast_frame(FunctionCallsStartedFrame, function_calls=function_calls)
|
||||
|
||||
for function_call in function_calls:
|
||||
if function_call.function_name in self._functions.keys():
|
||||
@@ -552,33 +548,24 @@ class LLMService(AIService):
|
||||
# NOTE(aleix): This needs to be removed after we remove the deprecation.
|
||||
await self._call_start_function(runner_item.context, runner_item.function_name)
|
||||
|
||||
# Push a function call in-progress downstream. This frame will let our
|
||||
# assistant context aggregator know that we are in the middle of a
|
||||
# function call. Some contexts/aggregators may not need this. But some
|
||||
# definitely do (Anthropic, for example). Also push it upstream for use
|
||||
# by other processors, like STTMuteFilter.
|
||||
progress_frame_downstream = FunctionCallInProgressFrame(
|
||||
# Broadcast function call in-progress. This frame will let our assistant
|
||||
# context aggregator know that we are in the middle of a function
|
||||
# call. Some contexts/aggregators may not need this. But some definitely
|
||||
# do (Anthropic, for example).
|
||||
await self.broadcast_frame(
|
||||
FunctionCallInProgressFrame,
|
||||
function_name=runner_item.function_name,
|
||||
tool_call_id=runner_item.tool_call_id,
|
||||
arguments=runner_item.arguments,
|
||||
cancel_on_interruption=item.cancel_on_interruption,
|
||||
)
|
||||
progress_frame_upstream = FunctionCallInProgressFrame(
|
||||
function_name=runner_item.function_name,
|
||||
tool_call_id=runner_item.tool_call_id,
|
||||
arguments=runner_item.arguments,
|
||||
cancel_on_interruption=item.cancel_on_interruption,
|
||||
)
|
||||
|
||||
# Push frame both downstream and upstream
|
||||
await self.push_frame(progress_frame_downstream, FrameDirection.DOWNSTREAM)
|
||||
await self.push_frame(progress_frame_upstream, FrameDirection.UPSTREAM)
|
||||
|
||||
# Define a callback function that pushes a FunctionCallResultFrame upstream & downstream.
|
||||
async def function_call_result_callback(
|
||||
result: Any, *, properties: Optional[FunctionCallResultProperties] = None
|
||||
):
|
||||
result_frame_downstream = FunctionCallResultFrame(
|
||||
await self.broadcast_frame(
|
||||
FunctionCallResultFrame,
|
||||
function_name=runner_item.function_name,
|
||||
tool_call_id=runner_item.tool_call_id,
|
||||
arguments=runner_item.arguments,
|
||||
@@ -586,17 +573,6 @@ class LLMService(AIService):
|
||||
run_llm=runner_item.run_llm,
|
||||
properties=properties,
|
||||
)
|
||||
result_frame_upstream = FunctionCallResultFrame(
|
||||
function_name=runner_item.function_name,
|
||||
tool_call_id=runner_item.tool_call_id,
|
||||
arguments=runner_item.arguments,
|
||||
result=result,
|
||||
run_llm=runner_item.run_llm,
|
||||
properties=properties,
|
||||
)
|
||||
|
||||
await self.push_frame(result_frame_downstream, FrameDirection.DOWNSTREAM)
|
||||
await self.push_frame(result_frame_upstream, FrameDirection.UPSTREAM)
|
||||
|
||||
if isinstance(item.handler, DirectFunctionWrapper):
|
||||
# Handler is a DirectFunctionWrapper
|
||||
|
||||
@@ -335,10 +335,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
logger.debug("User started speaking")
|
||||
self._user_speaking = True
|
||||
|
||||
upstream_frame = UserStartedSpeakingFrame(emulated=emulated)
|
||||
downstream_frame = UserStartedSpeakingFrame(emulated=emulated)
|
||||
await self.push_frame(downstream_frame)
|
||||
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame, emulated=emulated)
|
||||
|
||||
# Only push InterruptionFrame if:
|
||||
# 1. No interruption config is set, OR
|
||||
@@ -359,10 +356,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
logger.debug("User stopped speaking")
|
||||
self._user_speaking = False
|
||||
|
||||
upstream_frame = UserStoppedSpeakingFrame(emulated=emulated)
|
||||
downstream_frame = UserStoppedSpeakingFrame(emulated=emulated)
|
||||
await self.push_frame(downstream_frame)
|
||||
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame, emulated=emulated)
|
||||
|
||||
#
|
||||
# Handle bot speaking state
|
||||
@@ -479,8 +473,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self._run_turn_analyzer(frame, vad_state, previous_vad_state)
|
||||
|
||||
if vad_state == VADState.SPEAKING:
|
||||
await self.push_frame(UserSpeakingFrame())
|
||||
await self.push_frame(UserSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
await self.broadcast_frame(UserSpeakingFrame)
|
||||
|
||||
# Push audio downstream if passthrough is set.
|
||||
if self._params.audio_in_passthrough:
|
||||
|
||||
@@ -729,11 +729,24 @@ class BaseOutputTransport(FrameProcessor):
|
||||
else:
|
||||
return without_mixer(BOT_VAD_STOP_SECS)
|
||||
|
||||
async def _send_silence(self, secs: int):
|
||||
if secs <= 0:
|
||||
return
|
||||
|
||||
sample_width = 2
|
||||
silence = b"\x00" * self.sample_rate * sample_width * secs
|
||||
silence_frame = OutputAudioRawFrame(
|
||||
audio=silence, sample_rate=self.sample_rate, num_channels=1
|
||||
)
|
||||
await self._transport.write_audio_frame(silence_frame)
|
||||
|
||||
async def _audio_task_handler(self):
|
||||
"""Main audio processing task handler."""
|
||||
async for frame in self._next_frame():
|
||||
# No need to push EndFrame, it's pushed from process_frame().
|
||||
if isinstance(frame, EndFrame):
|
||||
# Send some final silence so words don't cut out.
|
||||
await self._send_silence(self._params.audio_out_end_silence_secs)
|
||||
break
|
||||
|
||||
# Handle frame.
|
||||
|
||||
@@ -83,6 +83,7 @@ class TransportParams(BaseModel):
|
||||
audio_out_10ms_chunks: Number of 10ms chunks to buffer for output.
|
||||
audio_out_mixer: Audio mixer instance or destination mapping.
|
||||
audio_out_destinations: List of audio output destination identifiers.
|
||||
audio_out_end_silence_secs: How much silence to send after an EndFrame (0 for no silence).
|
||||
audio_in_enabled: Enable audio input streaming.
|
||||
audio_in_sample_rate: Input audio sample rate in Hz.
|
||||
audio_in_channels: Number of input audio channels.
|
||||
@@ -131,6 +132,7 @@ class TransportParams(BaseModel):
|
||||
audio_out_10ms_chunks: int = 4
|
||||
audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None
|
||||
audio_out_destinations: List[str] = Field(default_factory=list)
|
||||
audio_out_end_silence_secs: int = 2
|
||||
audio_in_enabled: bool = False
|
||||
audio_in_sample_rate: Optional[int] = None
|
||||
audio_in_channels: int = 1
|
||||
|
||||
@@ -13,7 +13,6 @@ from pipecat.frames.frames import (
|
||||
FunctionCallResultFrame,
|
||||
InputAudioRawFrame,
|
||||
InterimTranscriptionFrame,
|
||||
STTMuteFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
@@ -49,9 +48,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
expected_returned_frames = [
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
BotStartedSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame, # Now passes through
|
||||
UserStartedSpeakingFrame, # Now passes through
|
||||
@@ -98,18 +95,14 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
expected_returned_frames = [
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
VADUserStartedSpeakingFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
]
|
||||
|
||||
await run_test(
|
||||
@@ -144,9 +137,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
expected_returned_frames = [
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
InterimTranscriptionFrame, # Only passes through after bot stops speaking
|
||||
TranscriptionFrame, # Only passes through after bot stops speaking
|
||||
]
|
||||
@@ -193,9 +184,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
# VADUserStoppedSpeakingFrame,
|
||||
# UserStoppedSpeakingFrame,
|
||||
# FunctionCallInProgressFrame,
|
||||
# STTMuteFrame, # mute=True
|
||||
# FunctionCallResultFrame,
|
||||
# STTMuteFrame, # mute=False
|
||||
# VADUserStartedSpeakingFrame,
|
||||
# UserStartedSpeakingFrame,
|
||||
# VADUserStoppedSpeakingFrame,
|
||||
@@ -245,10 +234,8 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
]
|
||||
|
||||
expected_returned_frames = [
|
||||
STTMuteFrame, # mute=True after first speech
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False after first speech
|
||||
VADUserStartedSpeakingFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
@@ -320,9 +307,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
VADUserStoppedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
VADUserStartedSpeakingFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
|
||||
Reference in New Issue
Block a user