fix(types): assert_given at store-mode settings read sites

Apply assert_given across service modules to narrow reads from
store-mode settings fields (self._settings.X, default_settings.X),
where _NotGiven is declared in the field type but should never appear
at runtime (enforced by validate_complete()).

Two idioms used:

- Inline wrap for single uses:
    func(assert_given(self._settings.enable_prompt_caching), ...)

- Extract-and-reuse when the same value is used multiple times:
    thinking = assert_given(self._settings.thinking)
    if thinking:
        params["thinking"] = thinking.model_dump(...)

43 service files touched. Cleared ~172 pyright errors; remaining
_NotGiven-related errors are in adjacent categories (flavor mismatch
between openai/anthropic NotGiven and pipecat _NotGiven, settings
field types that should allow None but don't) that need different
fixes.
This commit is contained in:
Paul Kompfner
2026-04-23 17:39:17 -04:00
parent 70f3d32734
commit 6a337f1bc6
43 changed files with 344 additions and 231 deletions

View File

@@ -39,7 +39,7 @@ from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN
from pipecat.services.settings import LLMSettings, _NotGiven, is_given
from pipecat.services.settings import LLMSettings, _NotGiven, assert_given, is_given
from pipecat.utils.tracing.service_decorators import traced_llm
try:
@@ -281,11 +281,13 @@ class AnthropicLLMService(LLMService):
messages = []
system = NOT_GIVEN
tools = []
effective_instruction = system_instruction or self._settings.system_instruction
effective_instruction = system_instruction or assert_given(
self._settings.system_instruction
)
adapter: AnthropicLLMAdapter = self.get_llm_adapter()
invocation_params = adapter.get_llm_invocation_params(
context,
enable_prompt_caching=self._settings.enable_prompt_caching,
enable_prompt_caching=assert_given(self._settings.enable_prompt_caching),
system_instruction=effective_instruction,
)
messages = invocation_params["messages"]
@@ -305,8 +307,9 @@ class AnthropicLLMService(LLMService):
"tools": tools,
"betas": ["interleaved-thinking-2025-05-14"],
}
if self._settings.thinking:
params["thinking"] = self._settings.thinking.model_dump(exclude_unset=True)
thinking = assert_given(self._settings.thinking)
if thinking:
params["thinking"] = thinking.model_dump(exclude_unset=True)
params.update(self._settings.extra)
@@ -319,8 +322,8 @@ class AnthropicLLMService(LLMService):
adapter: AnthropicLLMAdapter = self.get_llm_adapter()
params: AnthropicLLMInvocationParams = adapter.get_llm_invocation_params(
context,
enable_prompt_caching=self._settings.enable_prompt_caching,
system_instruction=self._settings.system_instruction,
enable_prompt_caching=assert_given(self._settings.enable_prompt_caching),
system_instruction=assert_given(self._settings.system_instruction),
)
return params
@@ -359,8 +362,9 @@ class AnthropicLLMService(LLMService):
}
# Add thinking parameter if set
if self._settings.thinking:
params["thinking"] = self._settings.thinking.model_dump(exclude_unset=True)
thinking = assert_given(self._settings.thinking)
if thinking:
params["thinking"] = thinking.model_dump(exclude_unset=True)
# Messages, system, tools
params.update(params_from_context)

View File

@@ -37,7 +37,7 @@ from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, assert_given
from pipecat.utils.tracing.service_decorators import traced_llm
try:
@@ -279,7 +279,9 @@ class AWSBedrockLLMService(LLMService):
"""
messages = []
system = []
effective_instruction = system_instruction or self._settings.system_instruction
effective_instruction = system_instruction or assert_given(
self._settings.system_instruction
)
adapter: AWSBedrockLLMAdapter = self.get_llm_adapter()
params: AWSBedrockLLMInvocationParams = adapter.get_llm_invocation_params(
context, system_instruction=effective_instruction
@@ -371,7 +373,7 @@ class AWSBedrockLLMService(LLMService):
def _get_llm_invocation_params(self, context: LLMContext) -> AWSBedrockLLMInvocationParams:
adapter: AWSBedrockLLMAdapter = self.get_llm_adapter()
params: AWSBedrockLLMInvocationParams = adapter.get_llm_invocation_params(
context, system_instruction=self._settings.system_instruction
context, system_instruction=assert_given(self._settings.system_instruction)
)
return params

View File

@@ -51,7 +51,7 @@ from pipecat.frames.frames import (
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, assert_given
from pipecat.utils.time import time_now_iso8601
try:
@@ -559,7 +559,9 @@ class AWSNovaSonicLLMService(LLMService):
# Start the bidirectional stream
self._stream = await self._client.invoke_model_with_bidirectional_stream(
InvokeModelWithBidirectionalStreamOperationInput(model_id=self._settings.model)
InvokeModelWithBidirectionalStreamOperationInput(
model_id=assert_given(self._settings.model)
)
)
# Send session start event
@@ -598,7 +600,7 @@ class AWSNovaSonicLLMService(LLMService):
# Read context
adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
llm_connection_params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._settings.system_instruction
self._context, system_instruction=assert_given(self._settings.system_instruction)
)
# Send prompt start event, specifying tools.

View File

@@ -30,7 +30,7 @@ from pipecat.frames.frames import (
TranscriptionFrame,
)
from pipecat.services.aws.utils import build_event_message, decode_event, get_presigned_url
from pipecat.services.settings import STTSettings
from pipecat.services.settings import STTSettings, assert_given
from pipecat.services.stt_latency import AWS_TRANSCRIBE_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -260,7 +260,7 @@ class AWSTranscribeSTTService(WebsocketSTTService):
logger.debug("Connecting to AWS Transcribe WebSocket")
language_code = self._settings.language
language_code = assert_given(self._settings.language)
if not language_code:
raise ValueError(f"Unsupported language: {language_code}")
@@ -534,20 +534,21 @@ class AWSTranscribeSTTService(WebsocketSTTService):
is_final = not result.get("IsPartial", True)
if transcript:
language = assert_given(self._settings.language)
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
result=result,
)
)
await self._handle_transcription(
transcript,
is_final,
self._settings.language,
language,
)
await self.stop_processing_metrics()
else:
@@ -556,7 +557,7 @@ class AWSTranscribeSTTService(WebsocketSTTService):
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
result=result,
)
)

View File

@@ -27,7 +27,7 @@ from pipecat.frames.frames import (
TranscriptionFrame,
)
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.settings import STTSettings
from pipecat.services.settings import STTSettings, assert_given
from pipecat.services.stt_latency import AZURE_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
@@ -128,9 +128,9 @@ class AzureSTTService(STTService):
**kwargs,
)
recognition_language = default_settings.language or language_to_azure_language(
Language.EN_US
)
recognition_language = assert_given(
default_settings.language
) or language_to_azure_language(Language.EN_US)
if not region and not private_endpoint:
raise ValueError("Either 'region' or 'private_endpoint' must be provided.")
@@ -280,7 +280,9 @@ class AzureSTTService(STTService):
def _on_handle_recognized(self, event):
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
language = getattr(event.result, "language", None) or self._settings.language
language = getattr(event.result, "language", None) or assert_given(
self._settings.language
)
frame = TranscriptionFrame(
event.result.text,
self._user_id,
@@ -295,7 +297,9 @@ class AzureSTTService(STTService):
def _on_handle_recognizing(self, event):
if event.result.reason == ResultReason.RecognizingSpeech and len(event.result.text) > 0:
language = getattr(event.result, "language", None) or self._settings.language
language = getattr(event.result, "language", None) or assert_given(
self._settings.language
)
frame = InterimTranscriptionFrame(
event.result.text,
self._user_id,

View File

@@ -25,7 +25,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TextAggregationMode, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -428,7 +428,7 @@ class AzureTTSService(TTSService, AzureBaseTTSService):
Returns:
True if the language is CJK, False otherwise.
"""
language = (self._settings.language if self._settings.language else "").lower()
language = (assert_given(self._settings.language) or "").lower()
# Check if language starts with CJK language codes
return language.startswith(("zh", "ja", "ko", "cmn", "yue", "wuu"))

View File

@@ -31,7 +31,7 @@ from pipecat.frames.frames import (
StartFrame,
TTSAudioRawFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -270,8 +270,8 @@ class CambTTSService(TTSService):
default_settings.apply_update(settings)
# Warn if sample rate doesn't match model's supported rate
_model = default_settings.model
if sample_rate and sample_rate != MODEL_SAMPLE_RATES.get(_model):
_model = assert_given(default_settings.model)
if sample_rate and _model is not None and sample_rate != MODEL_SAMPLE_RATES.get(_model):
logger.warning(
f"Camb.ai's {_model} model only supports {MODEL_SAMPLE_RATES.get(_model)}Hz "
f"sample rate. Current rate of {sample_rate}Hz may cause issues."
@@ -321,7 +321,8 @@ class CambTTSService(TTSService):
# Use model-specific sample rate if not explicitly specified
if not self._init_sample_rate:
self._sample_rate = MODEL_SAMPLE_RATES.get(self._settings.model, 22050)
model = assert_given(self._settings.model)
self._sample_rate = MODEL_SAMPLE_RATES.get(model, 22050) if model is not None else 22050
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
@@ -427,7 +427,7 @@ class CartesiaTTSService(WebsocketTTSService):
Returns:
List of (word, start_time) tuples processed for the language.
"""
current_language = self._settings.language
current_language = assert_given(self._settings.language)
# Check if this is a CJK language (if language is None, treat as non-CJK)
if current_language and self._is_cjk_language(current_language):
@@ -472,10 +472,9 @@ class CartesiaTTSService(WebsocketTTSService):
if self._settings.language:
msg["language"] = self._settings.language
if self._settings.generation_config:
msg["generation_config"] = self._settings.generation_config.model_dump(
exclude_none=True
)
generation_config = assert_given(self._settings.generation_config)
if generation_config:
msg["generation_config"] = generation_config.model_dump(exclude_none=True)
if self._settings.pronunciation_dict_id:
msg["pronunciation_dict_id"] = self._settings.pronunciation_dict_id
@@ -904,10 +903,9 @@ class CartesiaHttpTTSService(TTSService):
if self._settings.language:
payload["language"] = self._settings.language
if self._settings.generation_config:
payload["generation_config"] = self._settings.generation_config.model_dump(
exclude_none=True
)
generation_config = assert_given(self._settings.generation_config)
if generation_config:
payload["generation_config"] = generation_config.model_dump(exclude_none=True)
if self._settings.pronunciation_dict_id:
payload["pronunciation_dict_id"] = self._settings.pronunciation_dict_id

View File

@@ -25,7 +25,7 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.time import time_now_iso8601
@@ -647,7 +647,8 @@ class DeepgramFluxSTTBase(STTService):
average_confidence = self._calculate_average_confidence(data)
detected_language = self._primary_detected_language(data)
if not self._settings.min_confidence or average_confidence > self._settings.min_confidence:
min_confidence = assert_given(self._settings.min_confidence)
if not min_confidence or average_confidence > min_confidence:
# EndOfTurn means Flux has determined the turn is complete,
# so this TranscriptionFrame is always finalized
await self.push_frame(

View File

@@ -39,7 +39,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import (
TextAggregationMode,
TTSService,
@@ -1259,9 +1259,10 @@ class ElevenLabsHttpTTSService(TTSService):
# Use the with-timestamps endpoint
url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps"
model_id = assert_given(self._settings.model)
payload: dict[str, str | dict[str, float | bool]] = {
"text": text,
"model_id": self._settings.model,
"model_id": model_id,
}
# Include previous text as context if available
@@ -1276,11 +1277,12 @@ class ElevenLabsHttpTTSService(TTSService):
locator.model_dump() for locator in self._pronunciation_dictionary_locators
]
if self._settings.apply_text_normalization is not None:
payload["apply_text_normalization"] = self._settings.apply_text_normalization
apply_text_normalization = assert_given(self._settings.apply_text_normalization)
if apply_text_normalization is not None:
payload["apply_text_normalization"] = apply_text_normalization
language = self._settings.language
if self._settings.model in ELEVENLABS_MULTILINGUAL_MODELS and language:
language = assert_given(self._settings.language)
if model_id in ELEVENLABS_MULTILINGUAL_MODELS and language:
payload["language_code"] = language
logger.debug(f"Using language code: {language}")
elif language:
@@ -1297,8 +1299,9 @@ class ElevenLabsHttpTTSService(TTSService):
params = {
"output_format": self._output_format,
}
if self._settings.optimize_streaming_latency is not None:
params["optimize_streaming_latency"] = self._settings.optimize_streaming_latency
optimize_streaming_latency = assert_given(self._settings.optimize_streaming_latency)
if optimize_streaming_latency is not None:
params["optimize_streaming_latency"] = str(optimize_streaming_latency)
if self._enable_logging is not None:
params["enable_logging"] = str(self._enable_logging).lower()

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import InterruptibleTTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -278,7 +278,9 @@ class FishAudioTTSService(InterruptibleTTSService):
logger.debug("Connecting to Fish Audio")
headers = {"Authorization": f"Bearer {self._api_key}"}
headers["model"] = self._settings.model
model = assert_given(self._settings.model)
if model is not None:
headers["model"] = model
self._websocket = await websocket_connect(self._base_url, additional_headers=headers)
# Send initial start message with ormsgpack

View File

@@ -39,7 +39,7 @@ from pipecat.services.gladia.config import (
PreProcessingConfig,
RealtimeProcessingConfig,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import GLADIA_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -377,7 +377,7 @@ class GladiaSTTService(WebsocketSTTService):
}
# Add custom_metadata if provided
settings["custom_metadata"] = dict(s.custom_metadata or {})
settings["custom_metadata"] = dict(assert_given(s.custom_metadata) or {})
settings["custom_metadata"]["pipecat"] = pipecat_version()
# Add endpointing parameters if provided
@@ -389,20 +389,24 @@ class GladiaSTTService(WebsocketSTTService):
)
# Add language configuration
if s.language_config:
settings["language_config"] = s.language_config.model_dump(exclude_none=True)
language_config = assert_given(s.language_config)
if language_config:
settings["language_config"] = language_config.model_dump(exclude_none=True)
# Add pre_processing configuration if provided
if s.pre_processing:
settings["pre_processing"] = s.pre_processing.model_dump(exclude_none=True)
pre_processing = assert_given(s.pre_processing)
if pre_processing:
settings["pre_processing"] = pre_processing.model_dump(exclude_none=True)
# Add realtime_processing configuration if provided
if s.realtime_processing:
settings["realtime_processing"] = s.realtime_processing.model_dump(exclude_none=True)
realtime_processing = assert_given(s.realtime_processing)
if realtime_processing:
settings["realtime_processing"] = realtime_processing.model_dump(exclude_none=True)
# Add messages_config if provided
if s.messages_config:
settings["messages_config"] = s.messages_config.model_dump(exclude_none=True)
messages_config = assert_given(s.messages_config)
if messages_config:
settings["messages_config"] = messages_config.model_dump(exclude_none=True)
return settings

View File

@@ -61,7 +61,7 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult
from pipecat.services.google.utils import update_google_client_http_options
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, assert_given
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import time_now_iso8601
@@ -368,7 +368,7 @@ class GeminiLiveLLMService(LLMService):
@property
def _is_gemini_3(self) -> bool:
"""Check if the current model is a Gemini 3.x model."""
return "gemini-3" in (self._settings.model or "")
return "gemini-3" in (assert_given(self._settings.model) or "")
def __init__(
self,
@@ -482,9 +482,10 @@ class GeminiLiveLLMService(LLMService):
default_settings.apply_update(settings)
# Warn if user requested TEXT modality
if default_settings.modalities == GeminiModalities.TEXT:
default_modalities = assert_given(default_settings.modalities)
if default_modalities == GeminiModalities.TEXT:
logger.warning(
f"Modality {default_settings.modalities.value!r} may not be supported by recent "
f"Modality {default_modalities.value!r} may not be supported by recent "
"Gemini Live models."
)
@@ -524,13 +525,12 @@ class GeminiLiveLLMService(LLMService):
self._sample_rate = 24000
self._language = self._settings.language
self._language = assert_given(self._settings.language)
self._language_code = (
language_to_gemini_language(self._settings.language)
if self._settings.language
else "en-US"
language_to_gemini_language(self._language) if self._language else "en-US"
)
self._vad_disabled = bool(self._settings.vad and self._settings.vad.disabled)
vad_settings = assert_given(self._settings.vad)
self._vad_disabled = bool(vad_settings and vad_settings.disabled)
# Reconnection tracking
self._consecutive_failures = 0
@@ -780,7 +780,7 @@ class GeminiLiveLLMService(LLMService):
# chooses the init-provided value if there is one.
adapter: GeminiLLMAdapter = self.get_llm_adapter()
params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._system_instruction_from_init
self._context, system_instruction=assert_given(self._system_instruction_from_init)
)
system_instruction = params["system_instruction"]
tools = params["tools"]
@@ -812,7 +812,10 @@ class GeminiLiveLLMService(LLMService):
"No messages found in initial context; seeding with system instruction to trigger bot response."
)
self._context.add_message(
{"role": "system", "content": self._system_instruction_from_init}
{
"role": "system",
"content": assert_given(self._system_instruction_from_init),
}
)
else:
logger.warning(
@@ -931,22 +934,25 @@ class GeminiLiveLLMService(LLMService):
logger.info("Connecting to Gemini service")
try:
# Assemble basic configuration
modalities = assert_given(self._settings.modalities)
media_resolution = assert_given(self._settings.media_resolution)
language = assert_given(self._settings.language)
config = LiveConnectConfig(
generation_config=GenerationConfig(
frequency_penalty=self._settings.frequency_penalty,
max_output_tokens=self._settings.max_tokens,
presence_penalty=self._settings.presence_penalty,
temperature=self._settings.temperature,
top_k=self._settings.top_k,
top_p=self._settings.top_p,
response_modalities=[Modality(self._settings.modalities.value)],
frequency_penalty=assert_given(self._settings.frequency_penalty),
max_output_tokens=assert_given(self._settings.max_tokens),
presence_penalty=assert_given(self._settings.presence_penalty),
temperature=assert_given(self._settings.temperature),
top_k=assert_given(self._settings.top_k),
top_p=assert_given(self._settings.top_p),
response_modalities=[Modality(modalities.value)],
speech_config=SpeechConfig(
voice_config=VoiceConfig(
prebuilt_voice_config={"voice_name": self._settings.voice}
prebuilt_voice_config={"voice_name": assert_given(self._settings.voice)}
),
language_code=self._settings.language,
language_code=language,
),
media_resolution=MediaResolution(self._settings.media_resolution.value),
media_resolution=MediaResolution(media_resolution.value),
),
input_audio_transcription=AudioTranscriptionConfig(),
output_audio_transcription=AudioTranscriptionConfig(),
@@ -959,7 +965,7 @@ class GeminiLiveLLMService(LLMService):
config.history_config = history_config
# Add context window compression to configuration, if enabled
cwc = self._settings.context_window_compression or {}
cwc = assert_given(self._settings.context_window_compression) or {}
if cwc.get("enabled", False):
compression_config = ContextWindowCompressionConfig()
@@ -986,9 +992,9 @@ class GeminiLiveLLMService(LLMService):
config.proactivity = self._settings.proactivity
# Add VAD configuration to configuration, if provided
if self._settings.vad:
vad_params = assert_given(self._settings.vad)
if vad_params:
vad_config = AutomaticActivityDetection()
vad_params = self._settings.vad
has_vad_settings = False
# Only add parameters that are explicitly set
@@ -1026,7 +1032,8 @@ class GeminiLiveLLMService(LLMService):
tools = None
if self._context:
params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._system_instruction_from_init
self._context,
system_instruction=assert_given(self._system_instruction_from_init),
)
system_instruction = params["system_instruction"]
tools = params["tools"]
@@ -1048,9 +1055,10 @@ class GeminiLiveLLMService(LLMService):
await self.push_error(error_msg=f"Initialization error: {e}", exception=e)
async def _connection_task_handler(self, config: LiveConnectConfig):
async with self._client.aio.live.connect(
model=self._settings.model, config=config
) as session:
model = assert_given(self._settings.model)
if model is None:
raise ValueError("Gemini Live model must be specified")
async with self._client.aio.live.connect(model=model, config=config) as session:
logger.info("Connected to Gemini service")
# Mark connection start time

View File

@@ -27,7 +27,7 @@ from pydantic import BaseModel, Field
from pipecat.frames.frames import ErrorFrame, Frame, URLImageRawFrame
from pipecat.services.google.utils import update_google_client_http_options
from pipecat.services.image_service import ImageGenService
from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven, assert_given
try:
from google import genai
@@ -157,8 +157,8 @@ class GoogleImageGenService(ImageGenService):
model=self._settings.model,
prompt=prompt,
config=types.GenerateImagesConfig(
number_of_images=self._settings.number_of_images,
negative_prompt=self._settings.negative_prompt,
number_of_images=assert_given(self._settings.number_of_images),
negative_prompt=assert_given(self._settings.negative_prompt),
),
)
await self.stop_ttfb_metrics()

View File

@@ -43,6 +43,7 @@ from pipecat.services.settings import (
NOT_GIVEN,
LLMSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -356,10 +357,9 @@ class GoogleLLMService(LLMService):
}
# Add thinking parameters if configured
if self._settings.thinking:
generation_params["thinking_config"] = self._settings.thinking.model_dump(
exclude_unset=True
)
thinking = assert_given(self._settings.thinking)
if thinking:
generation_params["thinking_config"] = thinking.model_dump(exclude_unset=True)
if self._settings.extra:
generation_params.update(self._settings.extra)
@@ -368,8 +368,9 @@ class GoogleLLMService(LLMService):
def _maybe_unset_thinking_budget(self, generation_params: dict[str, Any]):
try:
model = assert_given(self._settings.model)
# If we have an image model, we don't apply a thinking default.
if "image" in self._settings.model:
if model is None or "image" in model:
return
# If thinking_config is already set, don't override it.
if "thinking_config" in generation_params:
@@ -377,7 +378,6 @@ class GoogleLLMService(LLMService):
# Apply model-aware low-latency thinking defaults.
# Gemini 2.5 Flash: disable thinking via thinking_budget.
# Gemini 3+ Flash: use minimal thinking via thinking_level.
model = self._settings.model
if model.startswith("gemini-2.5-flash"):
generation_params["thinking_config"] = {"thinking_budget": 0}
elif model.startswith("gemini-3") and "flash" in model:
@@ -388,7 +388,7 @@ class GoogleLLMService(LLMService):
async def _stream_content(self, context: LLMContext) -> AsyncIterator[GenerateContentResponse]:
adapter = self.get_llm_adapter()
params: GeminiLLMInvocationParams = adapter.get_llm_invocation_params(
context, system_instruction=self._settings.system_instruction
context, system_instruction=assert_given(self._settings.system_instruction)
)
logger.debug(

View File

@@ -37,7 +37,7 @@ from pipecat.frames.frames import (
StartFrame,
TranscriptionFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import GOOGLE_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -641,8 +641,9 @@ class GoogleSTTService(STTService):
"""
if self._settings.languages:
return [self.language_to_service_language(lang) for lang in self._settings.languages]
if self._settings.language_codes:
return list(self._settings.language_codes)
language_codes = assert_given(self._settings.language_codes)
if language_codes:
return list(language_codes)
return ["en-US"]
async def _reconnect_if_needed(self):

View File

@@ -39,6 +39,7 @@ from pipecat.services.settings import (
NOT_GIVEN,
TTSSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.services.tts_service import TTSService
@@ -815,8 +816,9 @@ class GoogleHttpTTSService(TTSService):
try:
# Check if the voice is a Chirp voice (including Chirp 3) or Journey voice
is_chirp_voice = "chirp" in self._settings.voice.lower()
is_journey_voice = "journey" in self._settings.voice.lower()
voice_name = assert_given(self._settings.voice)
is_chirp_voice = "chirp" in (voice_name or "").lower()
is_journey_voice = "journey" in (voice_name or "").lower()
# Create synthesis input based on voice_id
if is_chirp_voice or is_journey_voice:
@@ -1447,7 +1449,7 @@ class GeminiTTSService(GoogleBaseTTSService):
# Use base class streaming logic with prompt support
async for frame in self._stream_tts(
streaming_config, text, context_id, self._settings.prompt
streaming_config, text, context_id, assert_given(self._settings.prompt)
):
yield frame

View File

@@ -31,7 +31,7 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import GRADIUM_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -397,8 +397,9 @@ class GradiumSTTService(WebsocketSTTService):
json_config = {}
if self._json_config:
json_config = json.loads(self._json_config)
if self._settings.language:
gradium_language = language_to_gradium_language(self._settings.language)
language = assert_given(self._settings.language)
if language:
gradium_language = language_to_gradium_language(language)
if gradium_language:
json_config["language"] = gradium_language
if self._settings.delay_in_frames:
@@ -482,7 +483,7 @@ class GradiumSTTService(WebsocketSTTService):
text=accumulated,
user_id=self._user_id,
timestamp=time_now_iso8601(),
language=self._settings.language,
language=assert_given(self._settings.language),
)
)
await self.stop_processing_metrics()
@@ -514,12 +515,13 @@ class GradiumSTTService(WebsocketSTTService):
text = " ".join(self._accumulated_text)
self._accumulated_text.clear()
logger.debug(f"Final transcription: [{text}]")
language = assert_given(self._settings.language)
await self.push_frame(
TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
)
)
await self._trace_transcription(text, is_final=True, language=self._settings.language)
await self._trace_transcription(text, is_final=True, language=language)

View File

@@ -19,7 +19,7 @@ from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -173,13 +173,20 @@ class GroqTTSService(TTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
measuring_ttfb = True
try:
model = assert_given(self._settings.model)
voice = assert_given(self._settings.voice)
speed = assert_given(self._settings.speed)
if model is None:
raise ValueError("Groq TTS model must be specified")
if speed is None:
raise ValueError("Groq TTS speed must be specified")
response = await self._client.audio.speech.create(
model=self._settings.model,
voice=self._settings.voice,
model=model,
voice=voice,
response_format=self._output_format,
# Note: as of 2026-02-25, only a speed of 1.0 is supported, but
# here we pass it for completeness and future-proofing
speed=self._settings.speed,
speed=speed,
input=text,
)

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -292,7 +292,7 @@ class HumeTTSService(TTSService):
# Build the request payload
utterance_kwargs: dict[str, Any] = {
"text": text,
"voice": PostedUtteranceVoiceWithId(id=self._settings.voice),
"voice": PostedUtteranceVoiceWithId(id=assert_given(self._settings.voice)),
}
if self._settings.description is not None:
utterance_kwargs["description"] = self._settings.description

View File

@@ -55,6 +55,7 @@ from pipecat.services.settings import (
NOT_GIVEN,
LLMSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.utils.time import time_now_iso8601
@@ -385,13 +386,14 @@ class InworldRealtimeLLMService(LLMService):
Returns:
Configured sample rate or None if not manually configured.
"""
if not self._settings.session_properties.audio:
session_properties = assert_given(self._settings.session_properties)
if not session_properties.audio:
return None
audio_config = (
self._settings.session_properties.audio.input
session_properties.audio.input
if direction == "input"
else self._settings.session_properties.audio.output
else session_properties.audio.output
)
if audio_config and audio_config.format:
@@ -463,7 +465,7 @@ class InworldRealtimeLLMService(LLMService):
"""
self._input_sample_rate = input_sample_rate
self._output_sample_rate = output_sample_rate
props = self._settings.session_properties
props = assert_given(self._settings.session_properties)
if not props.audio:
props.audio = events.AudioConfiguration()
if not props.audio.input:
@@ -661,12 +663,13 @@ class InworldRealtimeLLMService(LLMService):
async def _send_session_update(self):
"""Update session settings on the server."""
settings = self._settings.session_properties
settings = assert_given(self._settings.session_properties)
adapter: InworldRealtimeLLMAdapter = self.get_llm_adapter()
if self._context:
llm_invocation_params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._settings.system_instruction
self._context,
system_instruction=assert_given(self._settings.system_instruction),
)
if llm_invocation_params["tools"]:
@@ -969,7 +972,8 @@ class InworldRealtimeLLMService(LLMService):
)
llm_invocation_params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._settings.system_instruction
self._context,
system_instruction=assert_given(self._settings.system_instruction),
)
messages = llm_invocation_params["messages"]
@@ -986,7 +990,10 @@ class InworldRealtimeLLMService(LLMService):
await self.start_processing_metrics()
await self.start_ttfb_metrics()
modalities = self._settings.session_properties.output_modalities or ["text", "audio"]
modalities = assert_given(self._settings.session_properties).output_modalities or [
"text",
"audio",
]
await self.send_client_event(
events.ResponseCreateEvent(response=events.ResponseProperties(modalities=modalities))
)

View File

@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
)
from pipecat.services.settings import TTSSettings
from pipecat.services.settings import TTSSettings, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -215,9 +215,11 @@ class KokoroTTSService(TTSService):
try:
await self.start_tts_usage_metrics(text)
stream = self._kokoro.create_stream(
text, voice=self._settings.voice, lang=self._settings.language, speed=1.0
)
voice = assert_given(self._settings.voice)
lang = assert_given(self._settings.language)
if lang is None:
raise ValueError("Kokoro TTS language must be specified")
stream = self._kokoro.create_stream(text, voice=voice, lang=lang, speed=1.0)
async for samples, sample_rate in stream:
await self.stop_ttfb_metrics()

View File

@@ -53,7 +53,7 @@ from pipecat.processors.aggregators.llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.settings import LLMSettings
from pipecat.services.settings import LLMSettings, assert_given
from pipecat.services.websocket_service import WebsocketService
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionLLMServiceMixin
from pipecat.utils.async_tool_cancellation import (
@@ -378,7 +378,9 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService):
self._base_system_instruction = None
if "user_turn_completion_config" in changed and self._filter_incomplete_user_turns:
self.set_user_turn_completion_config(self._settings.user_turn_completion_config)
self.set_user_turn_completion_config(
assert_given(self._settings.user_turn_completion_config)
)
self._compose_system_instruction()
if (

View File

@@ -27,7 +27,7 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import STTSettings
from pipecat.services.settings import STTSettings, assert_given
from pipecat.services.stt_latency import MISTRAL_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.utils.time import time_now_iso8601
@@ -214,8 +214,11 @@ class MistralSTTService(STTService):
sample_rate=self.sample_rate,
)
model = assert_given(self._settings.model)
if model is None:
raise ValueError("Mistral STT model must be specified")
self._connection = await self._client.audio.realtime.connect(
model=self._settings.model,
model=model,
audio_format=audio_format,
target_streaming_delay_ms=self._target_streaming_delay_ms,
)

View File

@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
)
from pipecat.services.settings import TTSSettings
from pipecat.services.settings import TTSSettings, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -137,8 +137,8 @@ class MistralTTSService(TTSService):
async with await self._client.audio.speech.complete_async(
input=text,
model=self._settings.model,
voice_id=self._settings.voice,
model=assert_given(self._settings.model),
voice_id=assert_given(self._settings.voice),
response_format="pcm",
stream=True,
) as event_stream:

View File

@@ -25,7 +25,7 @@ from pipecat.frames.frames import (
VisionFullResponseStartFrame,
VisionTextFrame,
)
from pipecat.services.settings import VisionSettings
from pipecat.services.settings import VisionSettings, assert_given
from pipecat.services.vision_service import VisionService
try:
@@ -127,8 +127,11 @@ class MoondreamService(VisionService):
logger.debug("Loading Moondream model...")
model_path = assert_given(self._settings.model)
if model_path is None:
raise ValueError("Moondream model must be specified")
self._model = AutoModelForCausalLM.from_pretrained(
self._settings.model,
model_path,
trust_remote_code=True,
revision=revision,
device_map={"": device},

View File

@@ -30,7 +30,7 @@ from pipecat.frames.frames import (
StartFrame,
TranscriptionFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import NVIDIA_TTFS_P99
from pipecat.services.stt_service import SegmentedSTTService, STTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -307,8 +307,11 @@ class NvidiaSTTService(STTService):
interim_results=s.interim_results,
)
if s.boosted_lm_words:
riva.client.add_word_boosting_to_config(config, s.boosted_lm_words, s.boosted_lm_score)
boosted_lm_words = assert_given(s.boosted_lm_words)
if boosted_lm_words:
riva.client.add_word_boosting_to_config(
config, boosted_lm_words, assert_given(s.boosted_lm_score)
)
riva.client.add_endpoint_parameters_to_config(
config,
@@ -323,9 +326,10 @@ class NvidiaSTTService(STTService):
if self._custom_configuration:
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
if s.speaker_diarization:
speaker_diarization = assert_given(s.speaker_diarization)
if speaker_diarization:
riva.client.add_speaker_diarization_to_config(
config, s.speaker_diarization, s.diarization_max_speakers
config, speaker_diarization, assert_given(s.diarization_max_speakers)
)
return config
@@ -468,6 +472,7 @@ class NvidiaSTTService(STTService):
transcript = result.alternatives[0].transcript
if transcript and len(transcript) > 0:
language = assert_given(self._settings.language)
if result.is_final:
await self.stop_processing_metrics()
logger.debug(f"Transcription: [{transcript}]")
@@ -476,7 +481,7 @@ class NvidiaSTTService(STTService):
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
result=result,
finalized=True,
)
@@ -484,7 +489,7 @@ class NvidiaSTTService(STTService):
await self._handle_transcription(
transcript=transcript,
is_final=result.is_final,
language=self._settings.language,
language=language,
)
else:
await self.push_frame(
@@ -492,7 +497,7 @@ class NvidiaSTTService(STTService):
transcript,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
result=result,
)
)
@@ -689,7 +694,7 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
def _get_language_code(self) -> str:
"""Get the current NVIDIA Nemotron Speech language code string."""
return self._settings.language or "en-US"
return assert_given(self._settings.language) or "en-US"
def _create_recognition_config(self):
"""Create the NVIDIA Nemotron Speech ASR recognition configuration."""
@@ -705,8 +710,11 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
)
# Add word boosting if specified
if s.boosted_lm_words:
riva.client.add_word_boosting_to_config(config, s.boosted_lm_words, s.boosted_lm_score)
boosted_lm_words = assert_given(s.boosted_lm_words)
if boosted_lm_words:
riva.client.add_word_boosting_to_config(
config, boosted_lm_words, assert_given(s.boosted_lm_score)
)
# Add any custom configuration
if self._custom_configuration:
@@ -796,15 +804,16 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
text = alternatives[0].transcript.strip()
if text:
logger.debug(f"Transcription: [{text}]")
language = assert_given(self._settings.language)
yield TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
)
transcription_found = True
await self._handle_transcription(text, True, self._settings.language)
await self._handle_transcription(text, True, language)
if not transcription_found:
logger.debug(

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
URLImageRawFrame,
)
from pipecat.services.image_service import ImageGenService
from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven, assert_given
@dataclass
@@ -117,7 +117,10 @@ class OpenAIImageGenService(ImageGenService):
logger.debug(f"Generating image from prompt: {prompt}")
image = await self._client.images.generate(
prompt=prompt, model=self._settings.model, n=1, size=self._settings.image_size
prompt=prompt,
model=assert_given(self._settings.model),
n=1,
size=assert_given(self._settings.image_size),
)
image_url = image.data[0].url

View File

@@ -55,6 +55,7 @@ from pipecat.services.settings import (
NOT_GIVEN,
LLMSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.transcriptions.language import Language
@@ -359,12 +360,18 @@ class OpenAIRealtimeLLMService(LLMService):
def _is_modality_enabled(self, modality: str) -> bool:
"""Check if a specific modality is enabled, "text" or "audio"."""
modalities = self._settings.session_properties.output_modalities or ["audio", "text"]
modalities = assert_given(self._settings.session_properties).output_modalities or [
"audio",
"text",
]
return modality in modalities
def _get_enabled_modalities(self) -> list[str]:
"""Get the list of enabled modalities."""
modalities = self._settings.session_properties.output_modalities or ["audio", "text"]
modalities = assert_given(self._settings.session_properties).output_modalities or [
"audio",
"text",
]
# API only supports single modality responses: either ["text"] or ["audio"]
if "audio" in modalities:
return ["audio"]
@@ -436,10 +443,11 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_interruption(self):
# None and False are different. Check for False. None means we're using OpenAI's
# built-in turn detection defaults.
session_properties = assert_given(self._settings.session_properties)
turn_detection_disabled = (
self._settings.session_properties.audio
and self._settings.session_properties.audio.input
and self._settings.session_properties.audio.input.turn_detection is False
session_properties.audio
and session_properties.audio.input
and session_properties.audio.input.turn_detection is False
)
if turn_detection_disabled:
await self.send_client_event(events.InputAudioBufferClearEvent())
@@ -458,10 +466,11 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_user_stopped_speaking(self, frame):
# None and False are different. Check for False. None means we're using OpenAI's
# built-in turn detection defaults.
session_properties = assert_given(self._settings.session_properties)
turn_detection_disabled = (
self._settings.session_properties.audio
and self._settings.session_properties.audio.input
and self._settings.session_properties.audio.input.turn_detection is False
session_properties.audio
and session_properties.audio.input
and session_properties.audio.input.turn_detection is False
)
if turn_detection_disabled:
await self.send_client_event(events.InputAudioBufferCommitEvent())
@@ -647,12 +656,13 @@ class OpenAIRealtimeLLMService(LLMService):
return changed
async def _send_session_update(self):
settings = self._settings.session_properties
settings = assert_given(self._settings.session_properties)
adapter: OpenAIRealtimeLLMAdapter = self.get_llm_adapter()
if self._context:
llm_invocation_params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._settings.system_instruction
self._context,
system_instruction=assert_given(self._settings.system_instruction),
)
# tools given in the context override the tools in the session properties

View File

@@ -49,7 +49,7 @@ from pipecat.services.llm_service import (
WebsocketReconnectedError,
)
from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN
from pipecat.services.settings import LLMSettings, _NotGiven
from pipecat.services.settings import LLMSettings, _NotGiven, assert_given
from pipecat.utils.tracing.service_decorators import traced_llm
try:
@@ -285,7 +285,9 @@ class _BaseOpenAIResponsesLLMService(LLMService):
The LLM's response as a string, or None if no response is generated.
"""
adapter: OpenAIResponsesLLMAdapter = self.get_llm_adapter()
effective_instruction = system_instruction or self._settings.system_instruction
effective_instruction = system_instruction or assert_given(
self._settings.system_instruction
)
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=effective_instruction
)
@@ -742,7 +744,7 @@ class OpenAIResponsesLLMService(_BaseOpenAIResponsesLLMService, WebsocketLLMServ
)
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=self._settings.system_instruction
context, system_instruction=assert_given(self._settings.system_instruction)
)
full_input = invocation_params["input"]
@@ -982,7 +984,7 @@ class OpenAIResponsesHttpLLMService(_BaseOpenAIResponsesLLMService):
)
invocation_params = adapter.get_llm_invocation_params(
context, system_instruction=self._settings.system_instruction
context, system_instruction=assert_given(self._settings.system_instruction)
)
params = self._build_response_params(invocation_params)

View File

@@ -36,7 +36,7 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import OPENAI_REALTIME_TTFS_P99, OPENAI_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.services.whisper.base_stt import (
@@ -534,9 +534,8 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
"""Send ``session.update`` to configure the transcription session."""
transcription: dict = {"model": self._settings.model}
language_code = (
self._language_to_code(self._settings.language) if self._settings.language else None
)
language = assert_given(self._settings.language)
language_code = self._language_to_code(language) if language else None
if language_code:
transcription["language"] = language_code

View File

@@ -24,7 +24,7 @@ from pipecat.frames.frames import (
StartFrame,
TTSAudioRawFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -240,7 +240,7 @@ class OpenAITTSService(TTSService):
create_params = {
"input": text,
"model": self._settings.model,
"voice": VALID_VOICES[self._settings.voice],
"voice": VALID_VOICES[assert_given(self._settings.voice)],
"response_format": "pcm",
}

View File

@@ -17,6 +17,7 @@ from loguru import logger
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.settings import assert_given
@dataclass
@@ -105,7 +106,8 @@ class OpenRouterLLMService(OpenAILLMService):
Transformed parameters ready for the API call.
"""
params = super().build_chat_completion_params(params_from_context)
if "gemini" in self._settings.model.lower():
model = assert_given(self._settings.model)
if model is not None and "gemini" in model.lower():
messages = params.get("messages", [])
if not messages:
return params

View File

@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
Frame,
TTSStoppedFrame,
)
from pipecat.services.settings import TTSSettings
from pipecat.services.settings import TTSSettings, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -100,7 +100,7 @@ class PiperTTSService(TTSService):
download_dir = download_dir or Path.cwd()
_voice = self._settings.voice
_voice = assert_given(self._settings.voice)
model_file = f"{_voice}.onnx"
model_path_resolved = Path(download_dir) / model_file

View File

@@ -37,6 +37,7 @@ from pipecat.services.settings import (
NOT_GIVEN,
STTSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.services.stt_latency import SARVAM_TTFS_P99
@@ -319,8 +320,8 @@ class SarvamSTTService(STTService):
default_settings.apply_update(settings)
# Resolve model config and validate (after all overrides)
resolved_model = default_settings.model
if resolved_model not in MODEL_CONFIGS:
resolved_model = assert_given(default_settings.model)
if resolved_model is None or resolved_model not in MODEL_CONFIGS:
allowed = ", ".join(sorted(MODEL_CONFIGS.keys()))
raise ValueError(f"Unsupported model '{resolved_model}'. Allowed values: {allowed}.")
@@ -407,8 +408,9 @@ class SarvamSTTService(STTService):
def _get_language_string(self) -> str | None:
"""Resolve the current language setting to a Sarvam language code string."""
if self._settings.language:
return language_to_sarvam_language(self._settings.language)
language = assert_given(self._settings.language)
if language:
return language_to_sarvam_language(language)
return self._config.default_language
def can_generate_metrics(self) -> bool:

View File

@@ -59,7 +59,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.services.sarvam._sdk import sdk_headers
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, is_given
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given, is_given
from pipecat.services.tts_service import InterruptibleTTSService, TextAggregationMode, TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -489,8 +489,8 @@ class SarvamHttpTTSService(TTSService):
default_settings.apply_update(settings)
# Get model configuration (validates model exists)
resolved_model = default_settings.model
if resolved_model not in TTS_MODEL_CONFIGS:
resolved_model = assert_given(default_settings.model)
if resolved_model is None or resolved_model not in TTS_MODEL_CONFIGS:
allowed = ", ".join(sorted(TTS_MODEL_CONFIGS.keys()))
raise ValueError(f"Unsupported model '{resolved_model}'. Allowed values: {allowed}.")
@@ -505,7 +505,7 @@ class SarvamHttpTTSService(TTSService):
default_settings.voice = self._config.default_speaker
# Validate and clamp pace to model's valid range
pace = default_settings.pace
pace = assert_given(default_settings.pace)
pace_min, pace_max = self._config.pace_range
if pace is not None and (pace < pace_min or pace > pace_max):
logger.warning(f"Pace {pace} is outside model range ({pace_min}-{pace_max}). Clamping.")
@@ -907,8 +907,8 @@ class SarvamTTSService(InterruptibleTTSService):
default_settings.apply_update(settings)
# Get model configuration (validates model exists)
resolved_model = default_settings.model
if resolved_model not in TTS_MODEL_CONFIGS:
resolved_model = assert_given(default_settings.model)
if resolved_model is None or resolved_model not in TTS_MODEL_CONFIGS:
allowed = ", ".join(sorted(TTS_MODEL_CONFIGS.keys()))
raise ValueError(f"Unsupported model '{resolved_model}'. Allowed values: {allowed}.")
@@ -923,7 +923,7 @@ class SarvamTTSService(InterruptibleTTSService):
default_settings.voice = self._config.default_speaker
# Validate and clamp pace to model's valid range
pace = default_settings.pace
pace = assert_given(default_settings.pace)
pace_min, pace_max = self._config.pace_range
if pace is not None and (pace < pace_min or pace > pace_max):
logger.warning(f"Pace {pace} is outside model range ({pace_min}-{pace_max}). Clamping.")

View File

@@ -25,7 +25,7 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import SONIOX_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -504,7 +504,7 @@ class SonioxSTTService(WebsocketSTTService):
"num_channels": self._num_channels,
"enable_endpoint_detection": enable_endpoint_detection,
"sample_rate": self.sample_rate,
"language_hints": _prepare_language_hints(s.language_hints),
"language_hints": _prepare_language_hints(assert_given(s.language_hints)),
"language_hints_strict": s.language_hints_strict,
"context": context,
"enable_speaker_diarization": s.enable_speaker_diarization,

View File

@@ -34,7 +34,7 @@ from pipecat.frames.frames import (
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import SPEECHMATICS_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language, resolve_language
@@ -719,22 +719,26 @@ class SpeechmaticsSTTService(STTService):
s = settings
# Preset from turn detection mode
config = VoiceAgentConfigPreset.load(s.turn_detection_mode.value)
turn_detection_mode = assert_given(s.turn_detection_mode)
config = VoiceAgentConfigPreset.load(turn_detection_mode.value)
# Audio encoding (init-only, stored as instance attribute)
config.audio_encoding = self._audio_encoding
# Language + domain
language = s.language
language = assert_given(s.language)
config.language = self._language_to_speechmatics_language(language)
config.domain = s.domain if s.domain is not None else None
config.output_locale = self._locale_to_speechmatics_locale(config.language, language)
# Speaker config
focus_speakers = assert_given(s.focus_speakers)
ignore_speakers = assert_given(s.ignore_speakers)
focus_mode = assert_given(s.focus_mode)
config.speaker_config = SpeakerFocusConfig(
focus_speakers=s.focus_speakers if s.focus_speakers is not None else [],
ignore_speakers=s.ignore_speakers if s.ignore_speakers is not None else [],
focus_mode=s.focus_mode if s.focus_mode is not None else SpeakerFocusMode.RETAIN,
focus_speakers=focus_speakers if focus_speakers is not None else [],
ignore_speakers=ignore_speakers if ignore_speakers is not None else [],
focus_mode=focus_mode if focus_mode is not None else SpeakerFocusMode.RETAIN,
)
config.known_speakers = s.known_speakers if s.known_speakers is not None else []
@@ -766,7 +770,8 @@ class SpeechmaticsSTTService(STTService):
setattr(config, key, value)
# Enable sentences
split = s.split_sentences if s.split_sentences is not None else False
split_sentences = assert_given(s.split_sentences)
split = split_sentences if split_sentences is not None else False
config.speech_segment_config = SpeechSegmentConfig(emit_sentences=split or False)
return config
@@ -962,11 +967,9 @@ class SpeechmaticsSTTService(STTService):
# Create frame from segment
def attr_from_segment(segment: dict[str, Any]) -> dict[str, Any]:
# Formats the output text based on the speaker and defined formats from the config.
text = (
self._settings.speaker_active_format
if segment.get("is_active", True)
else self._settings.speaker_passive_format
).format(
active_format = assert_given(self._settings.speaker_active_format)
passive_format = assert_given(self._settings.speaker_passive_format)
text = (active_format if segment.get("is_active", True) else passive_format).format(
**{
"speaker_id": segment.get("speaker_id", "UU"),
"text": segment.get("text", ""),

View File

@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.utils.network import exponential_backoff_time
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -183,7 +183,9 @@ class SpeechmaticsTTSService(TTSService):
}
# Complete HTTP URL
url = _get_endpoint_url(self._base_url, self._settings.voice, self.sample_rate)
url = _get_endpoint_url(
self._base_url, assert_given(self._settings.voice), self.sample_rate
)
try:
# Track attempt
@@ -208,7 +210,8 @@ class SpeechmaticsTTSService(TTSService):
attempt += 1
# Check if we've exceeded the maximum number of attempts
if attempt >= self._settings.max_retries:
max_retries = assert_given(self._settings.max_retries)
if max_retries is not None and attempt >= max_retries:
raise ValueError()
# Report error frame

View File

@@ -48,7 +48,7 @@ from pipecat.frames.frames import (
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, assert_given
from pipecat.utils.time import time_now_iso8601
try:
@@ -377,7 +377,7 @@ class UltravoxRealtimeLLMService(LLMService):
async def _update_settings(self, delta: Settings):
changed = await super()._update_settings(delta)
if "output_medium" in changed:
await self._update_output_medium(self._settings.output_medium)
await self._update_output_medium(assert_given(self._settings.output_medium))
self._warn_unhandled_updated_settings(changed.keys() - {"output_medium"})
return changed

View File

@@ -21,7 +21,7 @@ from loguru import logger
from typing_extensions import override
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_service import SegmentedSTTService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.time import time_now_iso8601
@@ -315,8 +315,11 @@ class WhisperSTTService(SegmentedSTTService):
from faster_whisper import WhisperModel
logger.debug("Loading Whisper model...")
model_name = assert_given(self._settings.model)
if model_name is None:
raise ValueError("Whisper model must be specified")
self._model = WhisperModel(
self._settings.model, device=self._device, compute_type=self._compute_type
model_name, device=self._device, compute_type=self._compute_type
)
logger.debug("Loaded Whisper model")
except ModuleNotFoundError as e:
@@ -354,24 +357,29 @@ class WhisperSTTService(SegmentedSTTService):
# Divide by 32768 because we have signed 16-bit data.
audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0
language = assert_given(self._settings.language)
segments, _ = await asyncio.to_thread(
self._model.transcribe, audio_float, language=self._settings.language
self._model.transcribe, audio_float, language=language
)
text: str = ""
no_speech_prob_threshold = assert_given(self._settings.no_speech_prob)
for segment in segments:
if segment.no_speech_prob < self._settings.no_speech_prob:
if (
no_speech_prob_threshold is not None
and segment.no_speech_prob < no_speech_prob_threshold
):
text += f"{segment.text} "
await self.stop_processing_metrics()
if text:
await self._handle_transcription(text, True, self._settings.language)
await self._handle_transcription(text, True, language)
logger.debug(f"Transcription: [{text}]")
yield TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
)
@@ -494,20 +502,29 @@ class WhisperSTTServiceMLX(WhisperSTTService):
# Divide by 32768 because we have signed 16-bit data.
audio_float = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / 32768.0
model_path = assert_given(self._settings.model)
if model_path is None:
raise ValueError("Whisper model must be specified")
temperature = assert_given(self._settings.temperature)
language = assert_given(self._settings.language)
chunk = await asyncio.to_thread(
mlx_whisper.transcribe,
audio_float,
path_or_hf_repo=self._settings.model,
temperature=self._settings.temperature,
language=self._settings.language,
path_or_hf_repo=model_path,
temperature=temperature,
language=language,
)
text: str = ""
no_speech_prob_threshold = assert_given(self._settings.no_speech_prob)
for segment in chunk.get("segments", []):
# Drop likely hallucinations
if segment.get("compression_ratio", None) == 0.5555555555555556:
continue
if segment.get("no_speech_prob", 0.0) < self._settings.no_speech_prob:
if (
no_speech_prob_threshold is not None
and segment.get("no_speech_prob", 0.0) < no_speech_prob_threshold
):
text += f"{segment.get('text', '')} "
if len(text.strip()) == 0:
@@ -516,13 +533,13 @@ class WhisperSTTServiceMLX(WhisperSTTService):
await self.stop_processing_metrics()
if text:
await self._handle_transcription(text, True, self._settings.language)
await self._handle_transcription(text, True, language)
logger.debug(f"Transcription: [{text}]")
yield TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._settings.language,
language,
)
except Exception as e:

View File

@@ -53,6 +53,7 @@ from pipecat.services.settings import (
NOT_GIVEN,
LLMSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.utils.time import time_now_iso8601
@@ -319,13 +320,14 @@ class GrokRealtimeLLMService(LLMService):
Configured sample rate or None if not manually configured.
For PCMU/PCMA formats, returns 8000 Hz (G.711 standard).
"""
if not self._settings.session_properties.audio:
session_properties = assert_given(self._settings.session_properties)
if not session_properties.audio:
return None
audio_config = (
self._settings.session_properties.audio.input
session_properties.audio.input
if direction == "input"
else self._settings.session_properties.audio.output
else session_properties.audio.output
)
if audio_config and audio_config.format:
@@ -355,8 +357,9 @@ class GrokRealtimeLLMService(LLMService):
def _is_turn_detection_enabled(self) -> bool:
"""Check if server-side VAD is enabled."""
if self._settings.session_properties.turn_detection:
return self._settings.session_properties.turn_detection.type == "server_vad"
session_properties = assert_given(self._settings.session_properties)
if session_properties.turn_detection:
return session_properties.turn_detection.type == "server_vad"
return False
async def _handle_interruption(self):
@@ -423,7 +426,7 @@ class GrokRealtimeLLMService(LLMService):
input_sample_rate: Sample rate for audio input (Hz).
output_sample_rate: Sample rate for audio output (Hz).
"""
props = self._settings.session_properties
props = assert_given(self._settings.session_properties)
if not props.audio:
props.audio = events.AudioConfiguration()
if not props.audio.input:
@@ -592,12 +595,13 @@ class GrokRealtimeLLMService(LLMService):
async def _send_session_update(self):
"""Update session settings on the server."""
settings = self._settings.session_properties
settings = assert_given(self._settings.session_properties)
adapter: GrokRealtimeLLMAdapter = self.get_llm_adapter()
if self._context:
llm_invocation_params = adapter.get_llm_invocation_params(
self._context, system_instruction=self._settings.system_instruction
self._context,
system_instruction=assert_given(self._settings.system_instruction),
)
if llm_invocation_params["tools"]:

View File

@@ -24,7 +24,7 @@ from pipecat.frames.frames import (
StartFrame,
TTSAudioRawFrame,
)
from pipecat.services.settings import TTSSettings
from pipecat.services.settings import TTSSettings, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -211,7 +211,7 @@ class XTTSService(TTSService):
logger.error(f"{self} no studio speakers available")
return
embeddings = self._studio_speakers[self._settings.voice]
embeddings = self._studio_speakers[assert_given(self._settings.voice)]
url = self._base_url + "/tts_stream"