fix: clear 8 more services from pyright ignore list

A fourth pass over low-error-count files. Drops 8 files (57 → 49) and
full-pyright errors from 525 → 496. Default pyright stays clean.

Optional access on transport/client receivers (4 files). Same fix
shape as #4359 — a receiver typed `X | None` accessed without a
guard. For "should never happen" cases (caller's lifecycle ensures
the field is non-None when the method runs), used `assert` rather
than silent early-return so an invariant violation surfaces loudly:

- `transports/whatsapp/client.py` (5 errors): `_validate_whatsapp_webhook_request`
  was typed `bytes` / `str` but called with `bytes | None` / `str | None`.
  Widened the helper signature and pushed the explicit None-check
  inside (matching its existing empty-string check). Also handled
  `pipecat_connection.get_answer()` returning `None` — would have
  crashed at `.get("sdp")` before.
- `transports/websocket/client.py` (5 errors): four are the deprecated
  `websockets.WebSocketClientProtocol` alias (same `# pyright: ignore[reportAttributeAccessIssue]`
  as the `services/websocket_service.py` fix from earlier in this PR).
  The fifth was `async for message in self._websocket` — traced the
  call chain and confirmed `_client_task` is created only after
  `self._websocket` is assigned and cancelled before it's cleared, so
  the field is never None when `_client_task_handler` runs. Used `assert`.
- `services/openai/stt.py` (4 errors): same pattern. `_receive_messages`
  is started by `_connect()` only when `self._websocket` is set, and
  the reconnect loop in `WebsocketService._receive_task_handler`
  re-establishes it before each retry. `assert` at entry. Plus L478/L483:
  the `try`/`except ModuleNotFoundError` import-guard makes
  `websocket_connect` and `State` `<type> | None`; `__init__` already
  raises `ImportError` if either is None, so an `assert` at the
  `_connect_websocket` use site is honest. Plus an L538 `Language | str`
  cast (same shape as last batch).
- `services/deepgram/flux/base.py` (2 errors): `event = data.get("event")`
  flowed into `_handle_turn_resumed(event: str)` as `Any | None`.
  Tightened with an `isinstance(event, str)` guard before the
  `FluxEventType(event)` lookup. The other error (`average_confidence > min_confidence`
  where `min_confidence: float | None`) was a latent crash on missing
  confidence data — restored the original `not min_confidence` (which
  treats both `None` and `0.0` as "no filter") and added an explicit
  drop-on-missing-confidence-data branch.

`gemini_live` Settings/InputParams (vertex). The deprecated `InputParams`
declares `modalities: GeminiModalities | None` and `media_resolution: GeminiMediaResolution | None`,
but their downstream usage at `services/google/gemini_live/llm.py:952,959`
calls `.value` on each — `None` would crash. Rather than touching the
deprecated input model, translate `None` to the canonical defaults
(`GeminiModalities.AUDIO`, `GeminiMediaResolution.UNSPECIFIED`) at the
assignment site in `vertex/llm.py`. Also fixed an unrelated annotation
bug: `_get_credentials` was annotated `-> str` but actually returns
`service_account.Credentials` (used correctly by the caller — only
the annotation was wrong).

`moondream/vision.py` (3 errors). `frame.format` is `str | None` but
`Image.frombytes(mode, ...)` requires `str`; raise instead of crashing
on missing format. The other two errors are pyright thinking the
moondream2-custom `encode_image` and `query` methods are `Tensor`
(rather than callables) — those are provided by the model code via
`trust_remote_code=True` and aren't visible to pyright on the base
`AutoModelForCausalLM` type. Scoped `# pyright: ignore[reportCallIssue]`
on the two call sites.

`transports/base_output.py` (3 errors). Two are `self._mixer.mix(...)`
calls in `with_mixer`, a closure invoked only when `self._mixer` is
truthy at the call site — captured the mixer to a local variable
inside the closure with an `assert`, then used that. Third is the
PIL `frombytes(mode, ...)` shape — `frame.format is None` early-
return guard at the top of `resize_frame` so the main resize logic
reads cleanly.

`elevenlabs/tts.py` (4 errors). The payload-building dict at L1271
was typed `dict[str, str | dict[str, float | bool]]` — an aspirational
shape that matched only the first two assignments. Subsequent code
assigned `list[dict[...]]` (pronunciation locators) and bools, all
violating the annotation. Same pattern at L926 (the WebSocket-init
`msg`). Both widened to `dict[str, Any]`, which is the honest shape
for a JSON request payload and what similar code uses elsewhere.

Files dropped from the ignore list (57 → 49):
services/deepgram/flux/base.py, services/elevenlabs/tts.py,
services/google/gemini_live/vertex/llm.py,
services/moondream/vision.py, services/openai/stt.py,
transports/base_output.py, transports/websocket/client.py,
transports/whatsapp/client.py.
This commit is contained in:
Paul Kompfner
2026-04-30 11:24:50 -04:00
parent 26a40e2e62
commit 4703df8686
9 changed files with 86 additions and 30 deletions

View File

@@ -19,13 +19,10 @@
"src/pipecat/services/aws/nova_sonic/llm.py", "src/pipecat/services/aws/nova_sonic/llm.py",
"src/pipecat/services/aws/sagemaker/bidi_client.py", "src/pipecat/services/aws/sagemaker/bidi_client.py",
"src/pipecat/services/azure/tts.py", "src/pipecat/services/azure/tts.py",
"src/pipecat/services/deepgram/flux/base.py",
"src/pipecat/services/deepgram/flux/sagemaker/stt.py", "src/pipecat/services/deepgram/flux/sagemaker/stt.py",
"src/pipecat/services/deepgram/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/stt.py",
"src/pipecat/services/deepgram/sagemaker/tts.py", "src/pipecat/services/deepgram/sagemaker/tts.py",
"src/pipecat/services/elevenlabs/tts.py",
"src/pipecat/services/google/gemini_live/llm.py", "src/pipecat/services/google/gemini_live/llm.py",
"src/pipecat/services/google/gemini_live/vertex/llm.py",
"src/pipecat/services/google/llm.py", "src/pipecat/services/google/llm.py",
"src/pipecat/services/google/stt.py", "src/pipecat/services/google/stt.py",
"src/pipecat/services/google/tts.py", "src/pipecat/services/google/tts.py",
@@ -35,12 +32,10 @@
"src/pipecat/services/llm_service.py", "src/pipecat/services/llm_service.py",
"src/pipecat/services/mem0/memory.py", "src/pipecat/services/mem0/memory.py",
"src/pipecat/services/mistral/tts.py", "src/pipecat/services/mistral/tts.py",
"src/pipecat/services/moondream/vision.py",
"src/pipecat/services/nvidia/stt.py", "src/pipecat/services/nvidia/stt.py",
"src/pipecat/services/nvidia/tts.py", "src/pipecat/services/nvidia/tts.py",
"src/pipecat/services/openai/base_llm.py", "src/pipecat/services/openai/base_llm.py",
"src/pipecat/services/openai/realtime/llm.py", "src/pipecat/services/openai/realtime/llm.py",
"src/pipecat/services/openai/stt.py",
"src/pipecat/services/rime/tts.py", "src/pipecat/services/rime/tts.py",
"src/pipecat/services/sambanova/llm.py", "src/pipecat/services/sambanova/llm.py",
"src/pipecat/services/sarvam/stt.py", "src/pipecat/services/sarvam/stt.py",
@@ -52,16 +47,13 @@
"src/pipecat/services/ultravox/llm.py", "src/pipecat/services/ultravox/llm.py",
"src/pipecat/services/whisper/stt.py", "src/pipecat/services/whisper/stt.py",
"src/pipecat/services/xai/realtime/llm.py", "src/pipecat/services/xai/realtime/llm.py",
"src/pipecat/transports/base_output.py",
"src/pipecat/transports/daily/transport.py", "src/pipecat/transports/daily/transport.py",
"src/pipecat/transports/lemonslice/transport.py", "src/pipecat/transports/lemonslice/transport.py",
"src/pipecat/transports/livekit/transport.py", "src/pipecat/transports/livekit/transport.py",
"src/pipecat/transports/smallwebrtc/connection.py", "src/pipecat/transports/smallwebrtc/connection.py",
"src/pipecat/transports/smallwebrtc/transport.py", "src/pipecat/transports/smallwebrtc/transport.py",
"src/pipecat/transports/tavus/transport.py", "src/pipecat/transports/tavus/transport.py",
"src/pipecat/transports/websocket/client.py", "src/pipecat/transports/websocket/server.py"
"src/pipecat/transports/websocket/server.py",
"src/pipecat/transports/whatsapp/client.py"
], ],
"reportMissingImports": false "reportMissingImports": false
} }

View File

@@ -536,6 +536,10 @@ class DeepgramFluxSTTBase(STTService):
event = data.get("event") event = data.get("event")
transcript = data.get("transcript", "") transcript = data.get("transcript", "")
if not isinstance(event, str):
logger.debug(f"Unhandled TurnInfo event (not a string): {event}")
return
try: try:
flux_event_type = FluxEventType(event) flux_event_type = FluxEventType(event)
except ValueError: except ValueError:
@@ -648,7 +652,11 @@ class DeepgramFluxSTTBase(STTService):
detected_language = self._primary_detected_language(data) detected_language = self._primary_detected_language(data)
min_confidence = assert_given(self._settings.min_confidence) min_confidence = assert_given(self._settings.min_confidence)
if not min_confidence or average_confidence > min_confidence: # No threshold (None or 0.0) → accept. Otherwise require confidence
# data and compare; drop if data is missing.
if not min_confidence or (
average_confidence is not None and average_confidence > min_confidence
):
# EndOfTurn means Flux has determined the turn is complete, # EndOfTurn means Flux has determined the turn is complete,
# so this TranscriptionFrame is always finalized # so this TranscriptionFrame is always finalized
await self.push_frame( await self.push_frame(

View File

@@ -923,7 +923,7 @@ class ElevenLabsTTSService(WebsocketTTSService):
self._partial_word_start_time = 0.0 self._partial_word_start_time = 0.0
# Initialize context with voice settings and pronunciation dictionaries # Initialize context with voice settings and pronunciation dictionaries
msg = {"text": " ", "context_id": context_id} msg: dict[str, Any] = {"text": " ", "context_id": context_id}
if self._voice_settings: if self._voice_settings:
msg["voice_settings"] = self._voice_settings msg["voice_settings"] = self._voice_settings
if self._pronunciation_dictionary_locators: if self._pronunciation_dictionary_locators:
@@ -1268,7 +1268,7 @@ class ElevenLabsHttpTTSService(TTSService):
url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps" url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps"
model_id = assert_given(self._settings.model) model_id = assert_given(self._settings.model)
payload: dict[str, str | dict[str, float | bool]] = { payload: dict[str, Any] = {
"text": text, "text": text,
"model_id": model_id, "model_id": model_id,
} }

View File

@@ -174,11 +174,17 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService):
default_settings.temperature = params.temperature default_settings.temperature = params.temperature
default_settings.top_k = params.top_k default_settings.top_k = params.top_k
default_settings.top_p = params.top_p default_settings.top_p = params.top_p
default_settings.modalities = params.modalities # `params.modalities` and `params.media_resolution` are typed
# `<Enum> | None` on the deprecated InputParams, but None isn't
# a valid setting value (downstream uses call `.value` on
# them). Fall back to the canonical defaults.
default_settings.modalities = params.modalities or GeminiModalities.AUDIO
default_settings.language = ( default_settings.language = (
language_to_gemini_language(params.language) if params.language else "en-US" language_to_gemini_language(params.language) if params.language else "en-US"
) )
default_settings.media_resolution = params.media_resolution default_settings.media_resolution = (
params.media_resolution or GeminiMediaResolution.UNSPECIFIED
)
default_settings.vad = params.vad default_settings.vad = params.vad
default_settings.context_window_compression = ( default_settings.context_window_compression = (
params.context_window_compression.model_dump() params.context_window_compression.model_dump()
@@ -233,7 +239,9 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService):
) )
@staticmethod @staticmethod
def _get_credentials(credentials: str | None, credentials_path: str | None) -> str: def _get_credentials(
credentials: str | None, credentials_path: str | None
) -> service_account.Credentials:
"""Retrieve Credentials using Google service account credentials JSON. """Retrieve Credentials using Google service account credentials JSON.
Supports multiple authentication methods: Supports multiple authentication methods:
@@ -246,7 +254,8 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService):
credentials_path: Path to the service account JSON file. credentials_path: Path to the service account JSON file.
Returns: Returns:
OAuth token for API authentication. A service-account ``Credentials`` object suitable for the Vertex
AI client (with its access token refreshed).
Raises: Raises:
ValueError: If no valid credentials are provided or found. ValueError: If no valid credentials are provided or found.

View File

@@ -153,9 +153,14 @@ class MoondreamService(VisionService):
logger.debug(f"Analyzing image (bytes length: {len(frame.image)})") logger.debug(f"Analyzing image (bytes length: {len(frame.image)})")
def get_image_description(image_bytes: bytes, text: str | None) -> str: def get_image_description(image_bytes: bytes, text: str | None) -> str:
if frame.format is None:
raise ValueError("Cannot decode image bytes without a format")
image = Image.frombytes(frame.format, frame.size, image_bytes) image = Image.frombytes(frame.format, frame.size, image_bytes)
image_embeds = self._model.encode_image(image) # `encode_image` and `query` are custom methods provided by the
description = self._model.query(image_embeds, text)["answer"] # moondream2 model code (via `trust_remote_code=True`) that pyright
# can't see on `AutoModelForCausalLM`'s base type.
image_embeds = self._model.encode_image(image) # pyright: ignore[reportCallIssue]
description = self._model.query(image_embeds, text)["answer"] # pyright: ignore[reportCallIssue]
return description return description
description = await asyncio.to_thread(get_image_description, frame.image, frame.text) description = await asyncio.to_thread(get_image_description, frame.image, frame.text)

View File

@@ -18,7 +18,7 @@ import base64
import json import json
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Literal from typing import Any, Literal, cast
from loguru import logger from loguru import logger
@@ -475,6 +475,9 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
async def _connect_websocket(self): async def _connect_websocket(self):
"""Establish the WebSocket connection to the transcription endpoint.""" """Establish the WebSocket connection to the transcription endpoint."""
try: try:
# `__init__` raises if websockets isn't installed, so these symbols
# are non-None by the time any method runs.
assert websocket_connect is not None and State is not None
if self._websocket and self._websocket.state is State.OPEN: if self._websocket and self._websocket.state is State.OPEN:
return return
@@ -534,7 +537,9 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
"""Send ``session.update`` to configure the transcription session.""" """Send ``session.update`` to configure the transcription session."""
transcription: dict = {"model": self._settings.model} transcription: dict = {"model": self._settings.model}
language = assert_given(self._settings.language) # Technically `_settings.language` could be a raw string, but Language
# is a StrEnum so downstream handles either.
language = cast("Language | None", assert_given(self._settings.language))
language_code = self._language_to_code(language) if language else None language_code = self._language_to_code(language) if language else None
if language_code: if language_code:
transcription["language"] = language_code transcription["language"] = language_code
@@ -611,6 +616,10 @@ class OpenAIRealtimeSTTService(WebsocketSTTService):
Called by ``WebsocketService._receive_task_handler`` which wraps Called by ``WebsocketService._receive_task_handler`` which wraps
this method with automatic reconnection on connection errors. this method with automatic reconnection on connection errors.
""" """
# `_connect` only starts the receive task after `_websocket` is set,
# and reconnects re-establish it before the next iteration, so this
# invariant should always hold when this method runs.
assert self._websocket is not None
async for message in self._websocket: async for message in self._websocket:
try: try:
evt = json.loads(message) evt = json.loads(message)

View File

@@ -771,13 +771,16 @@ class BaseOutputTransport(FrameProcessor):
await self._bot_stopped_speaking() await self._bot_stopped_speaking()
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
# Caller below only invokes this when `self._mixer` is set.
mixer = self._mixer
assert mixer is not None
last_frame_time = 0 last_frame_time = 0
silence = b"\x00" * self._audio_chunk_size silence = b"\x00" * self._audio_chunk_size
while True: while True:
try: try:
frame = self._audio_queue.get_nowait() frame = self._audio_queue.get_nowait()
if isinstance(frame, OutputAudioRawFrame): if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._mixer.mix(frame.audio) frame.audio = await mixer.mix(frame.audio)
last_frame_time = time.time() last_frame_time = time.time()
yield frame yield frame
self._audio_queue.task_done() self._audio_queue.task_done()
@@ -788,7 +791,7 @@ class BaseOutputTransport(FrameProcessor):
await self._bot_stopped_speaking() await self._bot_stopped_speaking()
# Generate an audio frame with only the mixer's part. # Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame( frame = OutputAudioRawFrame(
audio=await self._mixer.mix(silence), audio=await mixer.mix(silence),
sample_rate=self._sample_rate, sample_rate=self._sample_rate,
num_channels=self._params.audio_out_channels, num_channels=self._params.audio_out_channels,
) )
@@ -927,6 +930,11 @@ class BaseOutputTransport(FrameProcessor):
""" """
def resize_frame(frame: OutputImageRawFrame) -> OutputImageRawFrame: def resize_frame(frame: OutputImageRawFrame) -> OutputImageRawFrame:
# Without a format we can't decode the bytes, so leave the
# frame as-is and let the transport pass it through unchanged.
if frame.format is None:
return frame
desired_size = (self._params.video_out_width, self._params.video_out_height) desired_size = (self._params.video_out_width, self._params.video_out_height)
# TODO: we should refactor in the future to support dynamic resolutions # TODO: we should refactor in the future to support dynamic resolutions

View File

@@ -64,9 +64,18 @@ class WebsocketClientCallbacks(BaseModel):
on_message: Called when a message is received from the WebSocket. on_message: Called when a message is received from the WebSocket.
""" """
on_connected: Callable[[websockets.WebSocketClientProtocol], Awaitable[None]] on_connected: Callable[
on_disconnected: Callable[[websockets.WebSocketClientProtocol], Awaitable[None]] [websockets.WebSocketClientProtocol], # pyright: ignore[reportAttributeAccessIssue]
on_message: Callable[[websockets.WebSocketClientProtocol, websockets.Data], Awaitable[None]] Awaitable[None],
]
on_disconnected: Callable[
[websockets.WebSocketClientProtocol], # pyright: ignore[reportAttributeAccessIssue]
Awaitable[None],
]
on_message: Callable[
[websockets.WebSocketClientProtocol, websockets.Data], # pyright: ignore[reportAttributeAccessIssue]
Awaitable[None],
]
class WebsocketClientSession: class WebsocketClientSession:
@@ -98,7 +107,7 @@ class WebsocketClientSession:
self._leave_counter = 0 self._leave_counter = 0
self._task_manager: BaseTaskManager | None = None self._task_manager: BaseTaskManager | None = None
self._websocket: websockets.WebSocketClientProtocol | None = None self._websocket: websockets.WebSocketClientProtocol | None = None # pyright: ignore[reportAttributeAccessIssue]
@property @property
def task_manager(self) -> BaseTaskManager: def task_manager(self) -> BaseTaskManager:
@@ -192,6 +201,10 @@ class WebsocketClientSession:
async def _client_task_handler(self): async def _client_task_handler(self):
"""Handle incoming messages from the WebSocket connection.""" """Handle incoming messages from the WebSocket connection."""
# `connect()` only starts this task after `_websocket` is assigned, and
# `disconnect()` cancels the task before clearing `_websocket`, so this
# invariant should always hold when this method runs.
assert self._websocket is not None
try: try:
# Handle incoming messages # Handle incoming messages
async for message in self._websocket: async for message in self._websocket:

View File

@@ -154,8 +154,17 @@ class WhatsAppClient:
return int(challenge) return int(challenge)
async def _validate_whatsapp_webhook_request(self, raw_body: bytes, sha256_signature: str): async def _validate_whatsapp_webhook_request(
self, raw_body: bytes | None, sha256_signature: str | None
):
"""Common handler for both /start and /connect endpoints.""" """Common handler for both /start and /connect endpoints."""
# Callers gate on `self._whatsapp_secret`, so the assert holds.
assert self._whatsapp_secret is not None
if raw_body is None:
raise Exception("Missing raw request body")
if not sha256_signature:
raise Exception("Missing X-Hub-Signature-256 header")
# Compute HMAC SHA256 using your App Secret # Compute HMAC SHA256 using your App Secret
expected_signature = hmac.new( expected_signature = hmac.new(
key=self._whatsapp_secret.encode("utf-8"), key=self._whatsapp_secret.encode("utf-8"),
@@ -164,8 +173,6 @@ class WhatsAppClient:
).hexdigest() ).hexdigest()
# Extract signature from header (strip 'sha256=' prefix) # Extract signature from header (strip 'sha256=' prefix)
if not sha256_signature:
raise Exception("Missing X-Hub-Signature-256 header")
received_signature = sha256_signature.split("sha256=")[-1] received_signature = sha256_signature.split("sha256=")[-1]
# Compare signatures securely # Compare signatures securely
@@ -306,7 +313,12 @@ class WhatsAppClient:
# Create and initialize WebRTC connection # Create and initialize WebRTC connection
pipecat_connection = SmallWebRTCConnection(self._ice_servers) pipecat_connection = SmallWebRTCConnection(self._ice_servers)
await pipecat_connection.initialize(sdp=call.session.sdp, type=call.session.sdp_type) await pipecat_connection.initialize(sdp=call.session.sdp, type=call.session.sdp_type)
sdp_answer = pipecat_connection.get_answer().get("sdp") answer = pipecat_connection.get_answer()
if answer is None:
raise RuntimeError("SmallWebRTC connection produced no SDP answer")
sdp_answer = answer.get("sdp")
if sdp_answer is None:
raise RuntimeError("SmallWebRTC SDP answer missing 'sdp' field")
sdp_answer = self._filter_sdp_for_whatsapp(sdp_answer) sdp_answer = self._filter_sdp_for_whatsapp(sdp_answer)
logger.debug(f"SDP answer generated for call {call.id}") logger.debug(f"SDP answer generated for call {call.id}")