From 34d6f3fa00d0c09f67a89d1c5046a8c181f317bb Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 6 Nov 2025 10:01:37 -0500 Subject: [PATCH 1/8] fix: correct GoogleLLMService token counting --- CHANGELOG.md | 3 +++ src/pipecat/services/google/llm.py | 16 +++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d7c5d97..fe200345b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,9 @@ reason")`. - `GeminiLiveLLMService` now properly supports context-provided system instruction and tools. +- Fixed `GoogleLLMService` token counting to avoid double-counting tokens when + Gemini sends usage metadata across multiple streaming chunks. + ### Removed - Removed `needs_mcp_alternate_schema()` from `LLMService`. The mechanism that diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index 47877c4df..883932b76 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -899,12 +899,18 @@ class GoogleLLMService(LLMService): async for chunk in response: # Stop TTFB metrics after the first chunk await self.stop_ttfb_metrics() + # Gemini may send usage_metadata in multiple chunks with varying behavior: + # - Sometimes a single chunk, sometimes multiple chunks + # - Token counts may be cumulative (growing) or may change between chunks + # - Early chunks may include estimates/overhead that gets refined + # We use assignment (not accumulation) because the final chunk always contains + # the authoritative, billable token usage for the entire response. if chunk.usage_metadata: - prompt_tokens += chunk.usage_metadata.prompt_token_count or 0 - completion_tokens += chunk.usage_metadata.candidates_token_count or 0 - total_tokens += chunk.usage_metadata.total_token_count or 0 - cache_read_input_tokens += chunk.usage_metadata.cached_content_token_count or 0 - reasoning_tokens += chunk.usage_metadata.thoughts_token_count or 0 + prompt_tokens = chunk.usage_metadata.prompt_token_count or 0 + completion_tokens = chunk.usage_metadata.candidates_token_count or 0 + total_tokens = chunk.usage_metadata.total_token_count or 0 + cache_read_input_tokens = chunk.usage_metadata.cached_content_token_count or 0 + reasoning_tokens = chunk.usage_metadata.thoughts_token_count or 0 if not chunk.candidates: continue From f6b6aa87661d4eca65d34594967f805ee765056f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 6 Nov 2025 11:38:28 -0500 Subject: [PATCH 2/8] fix: STTMuteFilter no longer sends STTMuteFrame --- CHANGELOG.md | 7 ++++++ .../processors/filters/stt_mute_filter.py | 22 ++++++------------- tests/test_stt_mute_filter.py | 15 ------------- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d7c5d97..59e272a16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,13 @@ reason")`. ### Changed +- `STTMuteFilter` no longer sends `STTMuteFrame` to the STT service. The filter + now blocks frames locally without instructing the STT service to stop + processing audio. This prevents inactivity-related errors (such as 409 errors + from Google STT) while maintaining the same muting behavior at the application + level. Important: The STTMuteFilter should be placed _after_ the STT service + itself. + - Bumped the `fastapi` dependency's upperbound to `<0.122.0`. - Updated the default model for `GoogleVertexLLMService` to `gemini-2.5-flash`. diff --git a/src/pipecat/processors/filters/stt_mute_filter.py b/src/pipecat/processors/filters/stt_mute_filter.py index 613d1ef51..a134fee5d 100644 --- a/src/pipecat/processors/filters/stt_mute_filter.py +++ b/src/pipecat/processors/filters/stt_mute_filter.py @@ -118,24 +118,16 @@ class STTMuteFilter(FrameProcessor): self._first_speech_handled = False self._bot_is_speaking = False self._function_call_in_progress = False - self._is_muted = False # Initialize as unmuted, will set state on StartFrame if needed - - @property - def is_muted(self) -> bool: - """Check if STT is currently muted. - - Returns: - True if STT is currently muted and audio frames are being suppressed. - """ - return self._is_muted + self._is_muted = False async def _handle_mute_state(self, should_mute: bool): """Handle STT muting and interruption control state changes.""" - if should_mute != self.is_muted: + if should_mute != self._is_muted: logger.debug(f"STTMuteFilter {'muting' if should_mute else 'unmuting'}") self._is_muted = should_mute - await self.push_frame(STTMuteFrame(mute=should_mute), FrameDirection.UPSTREAM) - await self.push_frame(STTMuteFrame(mute=should_mute), FrameDirection.DOWNSTREAM) + # Note: We don't send STTMuteFrame to the STT service itself. + # The filter blocks frames locally, but the STT service continues + # processing audio to keep streaming connections alive (e.g., Google STT). async def _should_mute(self) -> bool: """Determine if STT should be muted based on current state and strategies.""" @@ -215,7 +207,7 @@ class STTMuteFilter(FrameProcessor): ), ): # Only pass VAD-related frames when not muted - if not self.is_muted: + if not self._is_muted: await self.push_frame(frame, direction) else: logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted") @@ -224,5 +216,5 @@ class STTMuteFilter(FrameProcessor): await self.push_frame(frame, direction) # Finally handle mute state change if needed - if should_mute is not None and should_mute != self.is_muted: + if should_mute is not None and should_mute != self._is_muted: await self._handle_mute_state(should_mute) diff --git a/tests/test_stt_mute_filter.py b/tests/test_stt_mute_filter.py index 3d707dca7..2bd041b65 100644 --- a/tests/test_stt_mute_filter.py +++ b/tests/test_stt_mute_filter.py @@ -13,7 +13,6 @@ from pipecat.frames.frames import ( FunctionCallResultFrame, InputAudioRawFrame, InterimTranscriptionFrame, - STTMuteFrame, TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, @@ -49,9 +48,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): expected_returned_frames = [ BotStartedSpeakingFrame, - STTMuteFrame, # mute=True BotStoppedSpeakingFrame, - STTMuteFrame, # mute=False BotStartedSpeakingFrame, VADUserStartedSpeakingFrame, # Now passes through UserStartedSpeakingFrame, # Now passes through @@ -98,18 +95,14 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): expected_returned_frames = [ BotStartedSpeakingFrame, - STTMuteFrame, # mute=True BotStoppedSpeakingFrame, - STTMuteFrame, # mute=False VADUserStartedSpeakingFrame, UserStartedSpeakingFrame, InputAudioRawFrame, VADUserStoppedSpeakingFrame, UserStoppedSpeakingFrame, BotStartedSpeakingFrame, - STTMuteFrame, # mute=True BotStoppedSpeakingFrame, - STTMuteFrame, # mute=False ] await run_test( @@ -144,9 +137,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): expected_returned_frames = [ BotStartedSpeakingFrame, - STTMuteFrame, # mute=True BotStoppedSpeakingFrame, - STTMuteFrame, # mute=False InterimTranscriptionFrame, # Only passes through after bot stops speaking TranscriptionFrame, # Only passes through after bot stops speaking ] @@ -193,9 +184,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): # VADUserStoppedSpeakingFrame, # UserStoppedSpeakingFrame, # FunctionCallInProgressFrame, - # STTMuteFrame, # mute=True # FunctionCallResultFrame, - # STTMuteFrame, # mute=False # VADUserStartedSpeakingFrame, # UserStartedSpeakingFrame, # VADUserStoppedSpeakingFrame, @@ -245,10 +234,8 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): ] expected_returned_frames = [ - STTMuteFrame, # mute=True after first speech BotStartedSpeakingFrame, BotStoppedSpeakingFrame, - STTMuteFrame, # mute=False after first speech VADUserStartedSpeakingFrame, UserStartedSpeakingFrame, InputAudioRawFrame, @@ -320,9 +307,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase): VADUserStoppedSpeakingFrame, UserStoppedSpeakingFrame, BotStartedSpeakingFrame, - STTMuteFrame, # mute=True BotStoppedSpeakingFrame, - STTMuteFrame, # mute=False VADUserStartedSpeakingFrame, UserStartedSpeakingFrame, InputAudioRawFrame, From accdddce9545f8b79346b791017deafe9f2e7dab Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 6 Nov 2025 09:06:31 -0500 Subject: [PATCH 3/8] Add MetricsLogObserver --- CHANGELOG.md | 5 + .../observers/loggers/metrics_log_observer.py | 218 ++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 src/pipecat/observers/loggers/metrics_log_observer.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d7c5d97..73a20e628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `MetricsLogObserver` for logging performance metrics from `MetricsFrame` + instances. Supports filtering via `include_metrics` parameter to control which + metrics types are logged (TTFB, processing time, LLM token usage, TTS usage, + smart turn metrics). + - Added support for loading external observers. You can now register custom pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment variable. This variable should contain a colon-separated list of Python files diff --git a/src/pipecat/observers/loggers/metrics_log_observer.py b/src/pipecat/observers/loggers/metrics_log_observer.py new file mode 100644 index 000000000..dbef805be --- /dev/null +++ b/src/pipecat/observers/loggers/metrics_log_observer.py @@ -0,0 +1,218 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Metrics logging observer for Pipecat. + +This module provides an observer that logs metrics frames to the console, +allowing developers to monitor performance metrics, token usage, and other +statistics in real-time. +""" + +from typing import Optional, Set, Type + +from loguru import logger + +from pipecat.frames.frames import MetricsFrame +from pipecat.metrics.metrics import ( + LLMTokenUsage, + LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + SmartTurnMetricsData, + TTFBMetricsData, + TTSUsageMetricsData, +) +from pipecat.observers.base_observer import BaseObserver, FramePushed + + +class MetricsLogObserver(BaseObserver): + """Observer to log metrics activity to the console. + + Monitors and logs all MetricsFrame instances, including: + + - TTFBMetricsData (Time To First Byte) + - ProcessingMetricsData (General processing time) + - LLMUsageMetricsData (Token usage statistics) + - TTSUsageMetricsData (Text-to-Speech character counts) + - SmartTurnMetricsData (Turn prediction metrics) + + This allows developers to track performance metrics, token usage, + and other statistics throughout the pipeline. + + Examples: + Log all metrics types:: + + observers = [MetricsLogObserver()] + + Log only LLM and TTS metrics:: + + from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData + observers = [ + MetricsLogObserver( + include_metrics={LLMUsageMetricsData, TTSUsageMetricsData} + ) + ] + """ + + def __init__( + self, + include_metrics: Optional[Set[Type[MetricsData]]] = None, + **kwargs, + ): + """Initialize the metrics log observer. + + Args: + include_metrics: Set of metrics types to include. If specified, only these + metrics types will be logged. If None, all metrics are logged. + **kwargs: Additional arguments passed to parent class. + """ + super().__init__(**kwargs) + self._include_metrics = include_metrics + self._frames_seen = set() + + async def on_push_frame(self, data: FramePushed): + """Handle frame push events and log metrics frames. + + Logs MetricsFrame instances with detailed information about the + metrics data, formatted appropriately for each metrics type. + + Args: + data: Frame push event data containing source, frame, and timestamp. + """ + frame = data.frame + timestamp = data.timestamp + + if not isinstance(frame, MetricsFrame): + return + + # Skip frames we've already seen to avoid duplicate logging + if frame.id in self._frames_seen: + return + + self._frames_seen.add(frame.id) + + time_sec = timestamp / 1_000_000_000 + + # Process each metrics data item in the frame + for metrics_data in frame.data: + # Check if this metrics type should be logged + if not self._should_log_metrics(metrics_data): + continue + + self._log_metrics_data(metrics_data, time_sec) + + def _should_log_metrics(self, metrics_data: MetricsData) -> bool: + """Determine if a metrics data item should be logged based on filters. + + Args: + metrics_data: The metrics data to check. + + Returns: + True if the metrics should be logged, False otherwise. + """ + # If include_metrics is specified, only log those types + if self._include_metrics is not None: + return type(metrics_data) in self._include_metrics + + # Otherwise, log all metrics + return True + + def _log_metrics_data(self, metrics_data: MetricsData, time_sec: float): + """Log a single metrics data item. + + Args: + metrics_data: The metrics data to log. + time_sec: Timestamp in seconds. + """ + processor_info = f"[{metrics_data.processor}]" + model_info = f" ({metrics_data.model})" if metrics_data.model else "" + + if isinstance(metrics_data, TTFBMetricsData): + logger.debug( + f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s" + ) + elif isinstance(metrics_data, ProcessingMetricsData): + logger.debug( + f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s" + ) + elif isinstance(metrics_data, LLMUsageMetricsData): + self._log_llm_usage(metrics_data, processor_info, model_info, time_sec) + elif isinstance(metrics_data, TTSUsageMetricsData): + logger.debug( + f"📊 {processor_info} TTS USAGE{model_info}: {metrics_data.value} characters at {time_sec:.3f}s" + ) + elif isinstance(metrics_data, SmartTurnMetricsData): + self._log_smart_turn(metrics_data, processor_info, model_info, time_sec) + else: + # Generic fallback for unknown metrics types + logger.debug( + f"📊 {processor_info} METRICS{model_info}: {metrics_data} at {time_sec:.3f}s" + ) + + def _log_llm_usage( + self, + metrics_data: LLMUsageMetricsData, + processor_info: str, + model_info: str, + time_sec: float, + ): + """Log LLM token usage metrics. + + Args: + metrics_data: The LLM usage metrics data. + processor_info: Formatted processor name string. + model_info: Formatted model name string. + time_sec: Timestamp in seconds. + """ + usage: LLMTokenUsage = metrics_data.value + + # Build usage details + details = [ + f"prompt: {usage.prompt_tokens}", + f"completion: {usage.completion_tokens}", + f"total: {usage.total_tokens}", + ] + + if usage.cache_read_input_tokens is not None: + details.append(f"cache_read: {usage.cache_read_input_tokens}") + + if usage.cache_creation_input_tokens is not None: + details.append(f"cache_creation: {usage.cache_creation_input_tokens}") + + if usage.reasoning_tokens is not None: + details.append(f"reasoning: {usage.reasoning_tokens}") + + usage_str = ", ".join(details) + + logger.debug( + f"📊 {processor_info} LLM TOKEN USAGE{model_info}: {usage_str} at {time_sec:.2f}s" + ) + + def _log_smart_turn( + self, + metrics_data: SmartTurnMetricsData, + processor_info: str, + model_info: str, + time_sec: float, + ): + """Log smart turn prediction metrics. + + Args: + metrics_data: The smart turn metrics data. + processor_info: Formatted processor name string. + model_info: Formatted model name string. + time_sec: Timestamp in seconds. + """ + complete_str = "COMPLETE" if metrics_data.is_complete else "INCOMPLETE" + + logger.debug( + f"📊 {processor_info} SMART TURN{model_info}: {complete_str} " + f"(probability: {metrics_data.probability:.2%}, " + f"inference: {metrics_data.inference_time_ms:.1f}ms, " + f"server: {metrics_data.server_total_time_ms:.1f}ms, " + f"e2e: {metrics_data.e2e_processing_time_ms:.1f}ms) " + f"at {time_sec:.2f}s" + ) From 58b552171d0dac390313e75527a807d5d20bc8fe Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 5 Nov 2025 14:09:03 -0500 Subject: [PATCH 4/8] Add pronunciation_dictionary_locators to ElevenLabs TTS Services --- CHANGELOG.md | 3 +++ src/pipecat/services/elevenlabs/tts.py | 32 ++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56f94bb64..b220622b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 metrics types are logged (TTFB, processing time, LLM token usage, TTS usage, smart turn metrics). +- Added `pronunciation_dictionary_locators` to `ElevenLabsTTSService` and + `ElevenLabsHttpTTSService`. + - Added support for loading external observers. You can now register custom pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment variable. This variable should contain a colon-separated list of Python files diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index e87e20431..44017e264 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -157,6 +157,18 @@ def build_elevenlabs_voice_settings( return voice_settings or None +class PronunciationDictionaryLocator(BaseModel): + """Locator for a pronunciation dictionary. + + Attributes: + pronunciation_dictionary_id: The ID of the pronunciation dictionary. + version_id: The version ID of the pronunciation dictionary. + """ + + pronunciation_dictionary_id: str + version_id: str + + def calculate_word_times( alignment_info: Mapping[str, Any], cumulative_time: float, @@ -239,6 +251,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): enable_ssml_parsing: Whether to parse SSML tags in text. enable_logging: Whether to enable ElevenLabs logging. apply_text_normalization: Text normalization mode ("auto", "on", "off"). + pronunciation_dictionary_locators: List of pronunciation dictionary locators to use. """ language: Optional[Language] = None @@ -251,6 +264,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): enable_ssml_parsing: Optional[bool] = None enable_logging: Optional[bool] = None apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None + pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None def __init__( self, @@ -321,6 +335,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService): self.set_voice(voice_id) self._output_format = "" # initialized in start() self._voice_settings = self._set_voice_settings() + self._pronunciation_dictionary_locators = params.pronunciation_dictionary_locators # Indicates if we have sent TTSStartedFrame. It will reset to False when # there's an interruption or TTSStoppedFrame. @@ -704,12 +719,17 @@ class ElevenLabsTTSService(AudioContextWordTTSService): if not self.audio_context_available(self._context_id): await self.create_audio_context(self._context_id) - # Initialize context with voice settings + # Initialize context with voice settings and pronunciation dictionaries msg = {"text": " ", "context_id": self._context_id} if self._voice_settings: msg["voice_settings"] = self._voice_settings + if self._pronunciation_dictionary_locators: + msg["pronunciation_dictionary_locators"] = [ + locator.model_dump() + for locator in self._pronunciation_dictionary_locators + ] await self._websocket.send(json.dumps(msg)) - logger.trace(f"Created new context {self._context_id} with voice settings") + logger.trace(f"Created new context {self._context_id}") await self._send_text(text) await self.start_tts_usage_metrics(text) @@ -745,6 +765,7 @@ class ElevenLabsHttpTTSService(WordTTSService): use_speaker_boost: Whether to use speaker boost enhancement. speed: Voice speed control (0.25 to 4.0). apply_text_normalization: Text normalization mode ("auto", "on", "off"). + pronunciation_dictionary_locators: List of pronunciation dictionary locators to use. """ language: Optional[Language] = None @@ -755,6 +776,7 @@ class ElevenLabsHttpTTSService(WordTTSService): use_speaker_boost: Optional[bool] = None speed: Optional[float] = None apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None + pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None def __init__( self, @@ -813,6 +835,7 @@ class ElevenLabsHttpTTSService(WordTTSService): self.set_voice(voice_id) self._output_format = "" # initialized in start() self._voice_settings = self._set_voice_settings() + self._pronunciation_dictionary_locators = params.pronunciation_dictionary_locators # Track cumulative time to properly sequence word timestamps across utterances self._cumulative_time = 0 @@ -977,6 +1000,11 @@ class ElevenLabsHttpTTSService(WordTTSService): if self._voice_settings: payload["voice_settings"] = self._voice_settings + if self._pronunciation_dictionary_locators: + payload["pronunciation_dictionary_locators"] = [ + locator.model_dump() for locator in self._pronunciation_dictionary_locators + ] + if self._settings["apply_text_normalization"] is not None: payload["apply_text_normalization"] = self._settings["apply_text_normalization"] From 1fb6d6bd23241dbf0dda98748a3e6183dd2f877d Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 6 Nov 2025 12:59:37 -0500 Subject: [PATCH 5/8] GoogleSTTService: Add more robust handling of 409 errors --- CHANGELOG.md | 7 +++++++ examples/foundational/07n-interruptible-google.py | 3 ++- src/pipecat/services/google/stt.py | 13 +++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3aa59cf93..649424eec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,13 @@ reason")`. level. Important: The STTMuteFilter should be placed _after_ the STT service itself. +- Improved `GoogleSTTService` error handling to properly catch gRPC `Aborted` + exceptions (corresponding to 409 errors) caused by stream inactivity. These + exceptions are now logged at DEBUG level instead of ERROR level, since they + indicate expected behavior when no audio is sent for 10+ seconds (e.g., during + long silences or when audio input is blocked). The service automatically + reconnects when this occurs. + - Bumped the `fastapi` dependency's upperbound to `<0.122.0`. - Updated the default model for `GoogleVertexLLMService` to `gemini-2.5-flash`. diff --git a/examples/foundational/07n-interruptible-google.py b/examples/foundational/07n-interruptible-google.py index 203099c59..879ed0d1a 100644 --- a/examples/foundational/07n-interruptible-google.py +++ b/examples/foundational/07n-interruptible-google.py @@ -61,8 +61,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") stt = GoogleSTTService( - params=GoogleSTTService.InputParams(languages=Language.EN_US), + params=GoogleSTTService.InputParams(languages=Language.EN_US, model="chirp_3"), credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"), + location="us", ) tts = GoogleTTSService( diff --git a/src/pipecat/services/google/stt.py b/src/pipecat/services/google/stt.py index fa538b02e..59d1dcd63 100644 --- a/src/pipecat/services/google/stt.py +++ b/src/pipecat/services/google/stt.py @@ -41,6 +41,7 @@ from pipecat.utils.time import time_now_iso8601 try: from google.api_core.client_options import ClientOptions + from google.api_core.exceptions import Aborted from google.auth import default from google.auth.exceptions import GoogleAuthError from google.cloud import speech_v2 @@ -886,6 +887,18 @@ class GoogleSTTService(STTService): result=result, ) ) + except Aborted as e: + # Handle stream abort due to inactivity (409 error). + # This occurs when no audio is sent to the stream for 10+ seconds, + # which can happen when InputAudioRawFrames are blocked (e.g., by STTMuteFilter). + # Google's STT service automatically closes the stream in this case. + # We log at DEBUG level (not ERROR) since this is recoverable, then re-raise + # to trigger automatic reconnection in _stream_audio. + logger.debug( + f"{self} Stream aborted due to inactivity (no audio input). " + f"Reconnecting automatically..." + ) + raise except Exception as e: logger.error(f"Error processing Google STT responses: {e}") # Re-raise the exception to let it propagate (e.g. in the case of a From a14d00b8069205a1f761a6109e49aeceabf9e47f Mon Sep 17 00:00:00 2001 From: Marcus <111281783+marcus-daily@users.noreply.github.com> Date: Thu, 6 Nov 2025 19:42:05 +0000 Subject: [PATCH 6/8] Improved LocalSmartTurnAnalyzerV3 performance on systems with a low CPU count (#2982) --- CHANGELOG.md | 3 +++ src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 649424eec..f0b16df29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,9 @@ reason")`. - Added support for passing in an `LLMSwicher` to `MCPClient.register_tools()` (as well as the new `MCPClient.register_tools_schema()`). +- Added `cpu_count` parameter to `LocalSmartTurnAnalyzerV3`. This is set to `1` + by default for more predictable performance on low-CPU systems. + ### Changed - `STTMuteFilter` no longer sends `STTMuteFrame` to the STT service. The filter diff --git a/src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py b/src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py index ec60c293a..08b9f3cd1 100644 --- a/src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py +++ b/src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py @@ -35,12 +35,15 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn): enabling offline operation without network dependencies. """ - def __init__(self, *, smart_turn_model_path: Optional[str] = None, **kwargs): + def __init__( + self, *, smart_turn_model_path: Optional[str] = None, cpu_count: int = 1, **kwargs + ): """Initialize the local ONNX smart-turn-v3 analyzer. Args: smart_turn_model_path: Path to the ONNX model file. If this is not set, the bundled smart-turn-v3.0 model will be used. + cpu_count: The number of CPUs to use for inference. Defaults to 1. **kwargs: Additional arguments passed to BaseSmartTurn. """ super().__init__(**kwargs) @@ -70,6 +73,7 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn): so = ort.SessionOptions() so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL so.inter_op_num_threads = 1 + so.intra_op_num_threads = cpu_count so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL self._feature_extractor = WhisperFeatureExtractor(chunk_length=8) From f6916428b1f07d1b0484973b58a4924cab91cd0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 6 Nov 2025 09:40:33 -0800 Subject: [PATCH 7/8] FrameProcessor: add new broadcast_frame() method --- CHANGELOG.md | 10 ++++- .../processors/filters/stt_mute_filter.py | 1 - src/pipecat/processors/frame_processor.py | 15 ++++++- src/pipecat/services/deepgram/flux/stt.py | 3 +- src/pipecat/services/llm_service.py | 42 ++++--------------- src/pipecat/transports/base_input.py | 13 ++---- 6 files changed, 35 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0b16df29..9baa61a9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added new `FrameProcessor.broadcast_frame()` method. This will push two + instances of a given frame class, one upstream and the other downstream. + + ```python + await self.broadcast_frame(UserSpeakingFrame) + ``` + - Added `MetricsLogObserver` for logging performance metrics from `MetricsFrame` instances. Supports filtering via `include_metrics` parameter to control which metrics types are logged (TTFB, processing time, LLM token usage, TTS usage, @@ -36,8 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `CancelFrame` and `CancelTaskFrame` have an optional `reason` field to indicate why the pipeline is being canceled. This can be also specified when - you cancel a task with `PipelineTask.cancel(reason="cancellation your -reason")`. + you cancel a task with `PipelineTask.cancel(reason="cancellation reason")`. - Added `include_prob_metrics` parameter to Whisper STT services to enable access to probability metrics from transcription results. diff --git a/src/pipecat/processors/filters/stt_mute_filter.py b/src/pipecat/processors/filters/stt_mute_filter.py index a134fee5d..0b822b54a 100644 --- a/src/pipecat/processors/filters/stt_mute_filter.py +++ b/src/pipecat/processors/filters/stt_mute_filter.py @@ -27,7 +27,6 @@ from pipecat.frames.frames import ( InterimTranscriptionFrame, InterruptionFrame, StartFrame, - STTMuteFrame, TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 1ca3333b5..361aa0a44 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -14,7 +14,7 @@ management, and frame flow control mechanisms. import asyncio from dataclasses import dataclass from enum import Enum -from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple +from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple, Type from loguru import logger @@ -689,6 +689,19 @@ class FrameProcessor(BaseObject): self._wait_for_interruption = False + async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs): + """Broadcasts a frame of the specified class upstream and downstream. + + This method creates two instances of the given frame class using the + provided keyword arguments and pushes them upstream and downstream. + + Args: + frame_cls: The class of the frame to be broadcasted. + **kwargs: Keyword arguments to be passed to the frame's constructor. + """ + await self.push_frame(frame_cls(**kwargs)) + await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM) + async def __start(self, frame: StartFrame): """Handle the start frame to initialize processor state. diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 78f615a4d..9b22a5d28 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -526,8 +526,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): """ logger.debug("User started speaking") await self.push_interruption_task_frame_and_wait() - await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) - await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM) + await self.broadcast_frame(UserStartedSpeakingFrame) await self.start_metrics() await self._call_event_handler("on_start_of_turn", transcript) if transcript: diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index e342be7eb..7f743354d 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -433,11 +433,7 @@ class LLMService(AIService): await self._call_event_handler("on_function_calls_started", function_calls) - # Push frame both downstream and upstream - started_frame_downstream = FunctionCallsStartedFrame(function_calls=function_calls) - started_frame_upstream = FunctionCallsStartedFrame(function_calls=function_calls) - await self.push_frame(started_frame_downstream, FrameDirection.DOWNSTREAM) - await self.push_frame(started_frame_upstream, FrameDirection.UPSTREAM) + await self.broadcast_frame(FunctionCallsStartedFrame, function_calls=function_calls) for function_call in function_calls: if function_call.function_name in self._functions.keys(): @@ -552,33 +548,24 @@ class LLMService(AIService): # NOTE(aleix): This needs to be removed after we remove the deprecation. await self._call_start_function(runner_item.context, runner_item.function_name) - # Push a function call in-progress downstream. This frame will let our - # assistant context aggregator know that we are in the middle of a - # function call. Some contexts/aggregators may not need this. But some - # definitely do (Anthropic, for example). Also push it upstream for use - # by other processors, like STTMuteFilter. - progress_frame_downstream = FunctionCallInProgressFrame( + # Broadcast function call in-progress. This frame will let our assistant + # context aggregator know that we are in the middle of a function + # call. Some contexts/aggregators may not need this. But some definitely + # do (Anthropic, for example). + await self.broadcast_frame( + FunctionCallInProgressFrame, function_name=runner_item.function_name, tool_call_id=runner_item.tool_call_id, arguments=runner_item.arguments, cancel_on_interruption=item.cancel_on_interruption, ) - progress_frame_upstream = FunctionCallInProgressFrame( - function_name=runner_item.function_name, - tool_call_id=runner_item.tool_call_id, - arguments=runner_item.arguments, - cancel_on_interruption=item.cancel_on_interruption, - ) - - # Push frame both downstream and upstream - await self.push_frame(progress_frame_downstream, FrameDirection.DOWNSTREAM) - await self.push_frame(progress_frame_upstream, FrameDirection.UPSTREAM) # Define a callback function that pushes a FunctionCallResultFrame upstream & downstream. async def function_call_result_callback( result: Any, *, properties: Optional[FunctionCallResultProperties] = None ): - result_frame_downstream = FunctionCallResultFrame( + await self.broadcast_frame( + FunctionCallResultFrame, function_name=runner_item.function_name, tool_call_id=runner_item.tool_call_id, arguments=runner_item.arguments, @@ -586,17 +573,6 @@ class LLMService(AIService): run_llm=runner_item.run_llm, properties=properties, ) - result_frame_upstream = FunctionCallResultFrame( - function_name=runner_item.function_name, - tool_call_id=runner_item.tool_call_id, - arguments=runner_item.arguments, - result=result, - run_llm=runner_item.run_llm, - properties=properties, - ) - - await self.push_frame(result_frame_downstream, FrameDirection.DOWNSTREAM) - await self.push_frame(result_frame_upstream, FrameDirection.UPSTREAM) if isinstance(item.handler, DirectFunctionWrapper): # Handler is a DirectFunctionWrapper diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 0f247c6dc..e66007e04 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -335,10 +335,7 @@ class BaseInputTransport(FrameProcessor): logger.debug("User started speaking") self._user_speaking = True - upstream_frame = UserStartedSpeakingFrame(emulated=emulated) - downstream_frame = UserStartedSpeakingFrame(emulated=emulated) - await self.push_frame(downstream_frame) - await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) + await self.broadcast_frame(UserStartedSpeakingFrame, emulated=emulated) # Only push InterruptionFrame if: # 1. No interruption config is set, OR @@ -359,10 +356,7 @@ class BaseInputTransport(FrameProcessor): logger.debug("User stopped speaking") self._user_speaking = False - upstream_frame = UserStoppedSpeakingFrame(emulated=emulated) - downstream_frame = UserStoppedSpeakingFrame(emulated=emulated) - await self.push_frame(downstream_frame) - await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) + await self.broadcast_frame(UserStoppedSpeakingFrame, emulated=emulated) # # Handle bot speaking state @@ -479,8 +473,7 @@ class BaseInputTransport(FrameProcessor): await self._run_turn_analyzer(frame, vad_state, previous_vad_state) if vad_state == VADState.SPEAKING: - await self.push_frame(UserSpeakingFrame()) - await self.push_frame(UserSpeakingFrame(), FrameDirection.UPSTREAM) + await self.broadcast_frame(UserSpeakingFrame) # Push audio downstream if passthrough is set. if self._params.audio_in_passthrough: From 4c8c44ecc35c0652ff93f7a9eabe771c39a35c70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 6 Nov 2025 11:16:34 -0800 Subject: [PATCH 8/8] BaseOutputTransport: send silence when EndFrame is received --- CHANGELOG.md | 5 +++++ src/pipecat/transports/base_output.py | 13 +++++++++++++ src/pipecat/transports/base_transport.py | 2 ++ 3 files changed, 20 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9baa61a9c..5c6ff6143 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `TransportParams.audio_out_silence_secs`, which specifies how many + seconds of silence to output when an `EndFrame` reaches the output + transport. This can help ensure that all audio data is fully delivered to + clients. + - Added new `FrameProcessor.broadcast_frame()` method. This will push two instances of a given frame class, one upstream and the other downstream. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 3edf434a9..ace1e2edf 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -729,11 +729,24 @@ class BaseOutputTransport(FrameProcessor): else: return without_mixer(BOT_VAD_STOP_SECS) + async def _send_silence(self, secs: int): + if secs <= 0: + return + + sample_width = 2 + silence = b"\x00" * self.sample_rate * sample_width * secs + silence_frame = OutputAudioRawFrame( + audio=silence, sample_rate=self.sample_rate, num_channels=1 + ) + await self._transport.write_audio_frame(silence_frame) + async def _audio_task_handler(self): """Main audio processing task handler.""" async for frame in self._next_frame(): # No need to push EndFrame, it's pushed from process_frame(). if isinstance(frame, EndFrame): + # Send some final silence so words don't cut out. + await self._send_silence(self._params.audio_out_end_silence_secs) break # Handle frame. diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py index b48cec02d..e1f7ecb50 100644 --- a/src/pipecat/transports/base_transport.py +++ b/src/pipecat/transports/base_transport.py @@ -83,6 +83,7 @@ class TransportParams(BaseModel): audio_out_10ms_chunks: Number of 10ms chunks to buffer for output. audio_out_mixer: Audio mixer instance or destination mapping. audio_out_destinations: List of audio output destination identifiers. + audio_out_end_silence_secs: How much silence to send after an EndFrame (0 for no silence). audio_in_enabled: Enable audio input streaming. audio_in_sample_rate: Input audio sample rate in Hz. audio_in_channels: Number of input audio channels. @@ -131,6 +132,7 @@ class TransportParams(BaseModel): audio_out_10ms_chunks: int = 4 audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None audio_out_destinations: List[str] = Field(default_factory=list) + audio_out_end_silence_secs: int = 2 audio_in_enabled: bool = False audio_in_sample_rate: Optional[int] = None audio_in_channels: int = 1