From 4703df8686b8a3771e4f7a54ef04ebdc30888fc0 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 30 Apr 2026 11:24:50 -0400 Subject: [PATCH] fix: clear 8 more services from pyright ignore list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A fourth pass over low-error-count files. Drops 8 files (57 → 49) and full-pyright errors from 525 → 496. Default pyright stays clean. Optional access on transport/client receivers (4 files). Same fix shape as #4359 — a receiver typed `X | None` accessed without a guard. For "should never happen" cases (caller's lifecycle ensures the field is non-None when the method runs), used `assert` rather than silent early-return so an invariant violation surfaces loudly: - `transports/whatsapp/client.py` (5 errors): `_validate_whatsapp_webhook_request` was typed `bytes` / `str` but called with `bytes | None` / `str | None`. Widened the helper signature and pushed the explicit None-check inside (matching its existing empty-string check). Also handled `pipecat_connection.get_answer()` returning `None` — would have crashed at `.get("sdp")` before. - `transports/websocket/client.py` (5 errors): four are the deprecated `websockets.WebSocketClientProtocol` alias (same `# pyright: ignore[reportAttributeAccessIssue]` as the `services/websocket_service.py` fix from earlier in this PR). The fifth was `async for message in self._websocket` — traced the call chain and confirmed `_client_task` is created only after `self._websocket` is assigned and cancelled before it's cleared, so the field is never None when `_client_task_handler` runs. Used `assert`. - `services/openai/stt.py` (4 errors): same pattern. `_receive_messages` is started by `_connect()` only when `self._websocket` is set, and the reconnect loop in `WebsocketService._receive_task_handler` re-establishes it before each retry. `assert` at entry. Plus L478/L483: the `try`/`except ModuleNotFoundError` import-guard makes `websocket_connect` and `State` ` | None`; `__init__` already raises `ImportError` if either is None, so an `assert` at the `_connect_websocket` use site is honest. Plus an L538 `Language | str` cast (same shape as last batch). - `services/deepgram/flux/base.py` (2 errors): `event = data.get("event")` flowed into `_handle_turn_resumed(event: str)` as `Any | None`. Tightened with an `isinstance(event, str)` guard before the `FluxEventType(event)` lookup. The other error (`average_confidence > min_confidence` where `min_confidence: float | None`) was a latent crash on missing confidence data — restored the original `not min_confidence` (which treats both `None` and `0.0` as "no filter") and added an explicit drop-on-missing-confidence-data branch. `gemini_live` Settings/InputParams (vertex). The deprecated `InputParams` declares `modalities: GeminiModalities | None` and `media_resolution: GeminiMediaResolution | None`, but their downstream usage at `services/google/gemini_live/llm.py:952,959` calls `.value` on each — `None` would crash. Rather than touching the deprecated input model, translate `None` to the canonical defaults (`GeminiModalities.AUDIO`, `GeminiMediaResolution.UNSPECIFIED`) at the assignment site in `vertex/llm.py`. Also fixed an unrelated annotation bug: `_get_credentials` was annotated `-> str` but actually returns `service_account.Credentials` (used correctly by the caller — only the annotation was wrong). `moondream/vision.py` (3 errors). `frame.format` is `str | None` but `Image.frombytes(mode, ...)` requires `str`; raise instead of crashing on missing format. The other two errors are pyright thinking the moondream2-custom `encode_image` and `query` methods are `Tensor` (rather than callables) — those are provided by the model code via `trust_remote_code=True` and aren't visible to pyright on the base `AutoModelForCausalLM` type. Scoped `# pyright: ignore[reportCallIssue]` on the two call sites. `transports/base_output.py` (3 errors). Two are `self._mixer.mix(...)` calls in `with_mixer`, a closure invoked only when `self._mixer` is truthy at the call site — captured the mixer to a local variable inside the closure with an `assert`, then used that. Third is the PIL `frombytes(mode, ...)` shape — `frame.format is None` early- return guard at the top of `resize_frame` so the main resize logic reads cleanly. `elevenlabs/tts.py` (4 errors). The payload-building dict at L1271 was typed `dict[str, str | dict[str, float | bool]]` — an aspirational shape that matched only the first two assignments. Subsequent code assigned `list[dict[...]]` (pronunciation locators) and bools, all violating the annotation. Same pattern at L926 (the WebSocket-init `msg`). Both widened to `dict[str, Any]`, which is the honest shape for a JSON request payload and what similar code uses elsewhere. Files dropped from the ignore list (57 → 49): services/deepgram/flux/base.py, services/elevenlabs/tts.py, services/google/gemini_live/vertex/llm.py, services/moondream/vision.py, services/openai/stt.py, transports/base_output.py, transports/websocket/client.py, transports/whatsapp/client.py. --- pyrightconfig.json | 10 +-------- src/pipecat/services/deepgram/flux/base.py | 10 ++++++++- src/pipecat/services/elevenlabs/tts.py | 4 ++-- .../services/google/gemini_live/vertex/llm.py | 17 +++++++++++---- src/pipecat/services/moondream/vision.py | 9 ++++++-- src/pipecat/services/openai/stt.py | 13 ++++++++++-- src/pipecat/transports/base_output.py | 12 +++++++++-- src/pipecat/transports/websocket/client.py | 21 +++++++++++++++---- src/pipecat/transports/whatsapp/client.py | 20 ++++++++++++++---- 9 files changed, 86 insertions(+), 30 deletions(-) diff --git a/pyrightconfig.json b/pyrightconfig.json index db78dad8c..528229034 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -19,13 +19,10 @@ "src/pipecat/services/aws/nova_sonic/llm.py", "src/pipecat/services/aws/sagemaker/bidi_client.py", "src/pipecat/services/azure/tts.py", - "src/pipecat/services/deepgram/flux/base.py", "src/pipecat/services/deepgram/flux/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/tts.py", - "src/pipecat/services/elevenlabs/tts.py", "src/pipecat/services/google/gemini_live/llm.py", - "src/pipecat/services/google/gemini_live/vertex/llm.py", "src/pipecat/services/google/llm.py", "src/pipecat/services/google/stt.py", "src/pipecat/services/google/tts.py", @@ -35,12 +32,10 @@ "src/pipecat/services/llm_service.py", "src/pipecat/services/mem0/memory.py", "src/pipecat/services/mistral/tts.py", - "src/pipecat/services/moondream/vision.py", "src/pipecat/services/nvidia/stt.py", "src/pipecat/services/nvidia/tts.py", "src/pipecat/services/openai/base_llm.py", "src/pipecat/services/openai/realtime/llm.py", - "src/pipecat/services/openai/stt.py", "src/pipecat/services/rime/tts.py", "src/pipecat/services/sambanova/llm.py", "src/pipecat/services/sarvam/stt.py", @@ -52,16 +47,13 @@ "src/pipecat/services/ultravox/llm.py", "src/pipecat/services/whisper/stt.py", "src/pipecat/services/xai/realtime/llm.py", - "src/pipecat/transports/base_output.py", "src/pipecat/transports/daily/transport.py", "src/pipecat/transports/lemonslice/transport.py", "src/pipecat/transports/livekit/transport.py", "src/pipecat/transports/smallwebrtc/connection.py", "src/pipecat/transports/smallwebrtc/transport.py", "src/pipecat/transports/tavus/transport.py", - "src/pipecat/transports/websocket/client.py", - "src/pipecat/transports/websocket/server.py", - "src/pipecat/transports/whatsapp/client.py" + "src/pipecat/transports/websocket/server.py" ], "reportMissingImports": false } diff --git a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index 173da5dc7..d2a8a24cd 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -536,6 +536,10 @@ class DeepgramFluxSTTBase(STTService): event = data.get("event") transcript = data.get("transcript", "") + if not isinstance(event, str): + logger.debug(f"Unhandled TurnInfo event (not a string): {event}") + return + try: flux_event_type = FluxEventType(event) except ValueError: @@ -648,7 +652,11 @@ class DeepgramFluxSTTBase(STTService): detected_language = self._primary_detected_language(data) min_confidence = assert_given(self._settings.min_confidence) - if not min_confidence or average_confidence > min_confidence: + # No threshold (None or 0.0) → accept. Otherwise require confidence + # data and compare; drop if data is missing. + if not min_confidence or ( + average_confidence is not None and average_confidence > min_confidence + ): # EndOfTurn means Flux has determined the turn is complete, # so this TranscriptionFrame is always finalized await self.push_frame( diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 239e6a64e..fe8e7ab84 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -923,7 +923,7 @@ class ElevenLabsTTSService(WebsocketTTSService): self._partial_word_start_time = 0.0 # Initialize context with voice settings and pronunciation dictionaries - msg = {"text": " ", "context_id": context_id} + msg: dict[str, Any] = {"text": " ", "context_id": context_id} if self._voice_settings: msg["voice_settings"] = self._voice_settings if self._pronunciation_dictionary_locators: @@ -1268,7 +1268,7 @@ class ElevenLabsHttpTTSService(TTSService): url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps" model_id = assert_given(self._settings.model) - payload: dict[str, str | dict[str, float | bool]] = { + payload: dict[str, Any] = { "text": text, "model_id": model_id, } diff --git a/src/pipecat/services/google/gemini_live/vertex/llm.py b/src/pipecat/services/google/gemini_live/vertex/llm.py index 44ded852f..b02c18a60 100644 --- a/src/pipecat/services/google/gemini_live/vertex/llm.py +++ b/src/pipecat/services/google/gemini_live/vertex/llm.py @@ -174,11 +174,17 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService): default_settings.temperature = params.temperature default_settings.top_k = params.top_k default_settings.top_p = params.top_p - default_settings.modalities = params.modalities + # `params.modalities` and `params.media_resolution` are typed + # ` | None` on the deprecated InputParams, but None isn't + # a valid setting value (downstream uses call `.value` on + # them). Fall back to the canonical defaults. + default_settings.modalities = params.modalities or GeminiModalities.AUDIO default_settings.language = ( language_to_gemini_language(params.language) if params.language else "en-US" ) - default_settings.media_resolution = params.media_resolution + default_settings.media_resolution = ( + params.media_resolution or GeminiMediaResolution.UNSPECIFIED + ) default_settings.vad = params.vad default_settings.context_window_compression = ( params.context_window_compression.model_dump() @@ -233,7 +239,9 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService): ) @staticmethod - def _get_credentials(credentials: str | None, credentials_path: str | None) -> str: + def _get_credentials( + credentials: str | None, credentials_path: str | None + ) -> service_account.Credentials: """Retrieve Credentials using Google service account credentials JSON. Supports multiple authentication methods: @@ -246,7 +254,8 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService): credentials_path: Path to the service account JSON file. Returns: - OAuth token for API authentication. + A service-account ``Credentials`` object suitable for the Vertex + AI client (with its access token refreshed). Raises: ValueError: If no valid credentials are provided or found. diff --git a/src/pipecat/services/moondream/vision.py b/src/pipecat/services/moondream/vision.py index 01482df4b..49cb0ba34 100644 --- a/src/pipecat/services/moondream/vision.py +++ b/src/pipecat/services/moondream/vision.py @@ -153,9 +153,14 @@ class MoondreamService(VisionService): logger.debug(f"Analyzing image (bytes length: {len(frame.image)})") def get_image_description(image_bytes: bytes, text: str | None) -> str: + if frame.format is None: + raise ValueError("Cannot decode image bytes without a format") image = Image.frombytes(frame.format, frame.size, image_bytes) - image_embeds = self._model.encode_image(image) - description = self._model.query(image_embeds, text)["answer"] + # `encode_image` and `query` are custom methods provided by the + # moondream2 model code (via `trust_remote_code=True`) that pyright + # can't see on `AutoModelForCausalLM`'s base type. + image_embeds = self._model.encode_image(image) # pyright: ignore[reportCallIssue] + description = self._model.query(image_embeds, text)["answer"] # pyright: ignore[reportCallIssue] return description description = await asyncio.to_thread(get_image_description, frame.image, frame.text) diff --git a/src/pipecat/services/openai/stt.py b/src/pipecat/services/openai/stt.py index b7dddf441..6f635639a 100644 --- a/src/pipecat/services/openai/stt.py +++ b/src/pipecat/services/openai/stt.py @@ -18,7 +18,7 @@ import base64 import json from collections.abc import AsyncGenerator from dataclasses import dataclass, field -from typing import Any, Literal +from typing import Any, Literal, cast from loguru import logger @@ -475,6 +475,9 @@ class OpenAIRealtimeSTTService(WebsocketSTTService): async def _connect_websocket(self): """Establish the WebSocket connection to the transcription endpoint.""" try: + # `__init__` raises if websockets isn't installed, so these symbols + # are non-None by the time any method runs. + assert websocket_connect is not None and State is not None if self._websocket and self._websocket.state is State.OPEN: return @@ -534,7 +537,9 @@ class OpenAIRealtimeSTTService(WebsocketSTTService): """Send ``session.update`` to configure the transcription session.""" transcription: dict = {"model": self._settings.model} - language = assert_given(self._settings.language) + # Technically `_settings.language` could be a raw string, but Language + # is a StrEnum so downstream handles either. + language = cast("Language | None", assert_given(self._settings.language)) language_code = self._language_to_code(language) if language else None if language_code: transcription["language"] = language_code @@ -611,6 +616,10 @@ class OpenAIRealtimeSTTService(WebsocketSTTService): Called by ``WebsocketService._receive_task_handler`` which wraps this method with automatic reconnection on connection errors. """ + # `_connect` only starts the receive task after `_websocket` is set, + # and reconnects re-establish it before the next iteration, so this + # invariant should always hold when this method runs. + assert self._websocket is not None async for message in self._websocket: try: evt = json.loads(message) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index d485129f4..3f1427627 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -771,13 +771,16 @@ class BaseOutputTransport(FrameProcessor): await self._bot_stopped_speaking() async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: + # Caller below only invokes this when `self._mixer` is set. + mixer = self._mixer + assert mixer is not None last_frame_time = 0 silence = b"\x00" * self._audio_chunk_size while True: try: frame = self._audio_queue.get_nowait() if isinstance(frame, OutputAudioRawFrame): - frame.audio = await self._mixer.mix(frame.audio) + frame.audio = await mixer.mix(frame.audio) last_frame_time = time.time() yield frame self._audio_queue.task_done() @@ -788,7 +791,7 @@ class BaseOutputTransport(FrameProcessor): await self._bot_stopped_speaking() # Generate an audio frame with only the mixer's part. frame = OutputAudioRawFrame( - audio=await self._mixer.mix(silence), + audio=await mixer.mix(silence), sample_rate=self._sample_rate, num_channels=self._params.audio_out_channels, ) @@ -927,6 +930,11 @@ class BaseOutputTransport(FrameProcessor): """ def resize_frame(frame: OutputImageRawFrame) -> OutputImageRawFrame: + # Without a format we can't decode the bytes, so leave the + # frame as-is and let the transport pass it through unchanged. + if frame.format is None: + return frame + desired_size = (self._params.video_out_width, self._params.video_out_height) # TODO: we should refactor in the future to support dynamic resolutions diff --git a/src/pipecat/transports/websocket/client.py b/src/pipecat/transports/websocket/client.py index 5665dfd23..9f2a43dbc 100644 --- a/src/pipecat/transports/websocket/client.py +++ b/src/pipecat/transports/websocket/client.py @@ -64,9 +64,18 @@ class WebsocketClientCallbacks(BaseModel): on_message: Called when a message is received from the WebSocket. """ - on_connected: Callable[[websockets.WebSocketClientProtocol], Awaitable[None]] - on_disconnected: Callable[[websockets.WebSocketClientProtocol], Awaitable[None]] - on_message: Callable[[websockets.WebSocketClientProtocol, websockets.Data], Awaitable[None]] + on_connected: Callable[ + [websockets.WebSocketClientProtocol], # pyright: ignore[reportAttributeAccessIssue] + Awaitable[None], + ] + on_disconnected: Callable[ + [websockets.WebSocketClientProtocol], # pyright: ignore[reportAttributeAccessIssue] + Awaitable[None], + ] + on_message: Callable[ + [websockets.WebSocketClientProtocol, websockets.Data], # pyright: ignore[reportAttributeAccessIssue] + Awaitable[None], + ] class WebsocketClientSession: @@ -98,7 +107,7 @@ class WebsocketClientSession: self._leave_counter = 0 self._task_manager: BaseTaskManager | None = None - self._websocket: websockets.WebSocketClientProtocol | None = None + self._websocket: websockets.WebSocketClientProtocol | None = None # pyright: ignore[reportAttributeAccessIssue] @property def task_manager(self) -> BaseTaskManager: @@ -192,6 +201,10 @@ class WebsocketClientSession: async def _client_task_handler(self): """Handle incoming messages from the WebSocket connection.""" + # `connect()` only starts this task after `_websocket` is assigned, and + # `disconnect()` cancels the task before clearing `_websocket`, so this + # invariant should always hold when this method runs. + assert self._websocket is not None try: # Handle incoming messages async for message in self._websocket: diff --git a/src/pipecat/transports/whatsapp/client.py b/src/pipecat/transports/whatsapp/client.py index 8f479520f..171a79247 100644 --- a/src/pipecat/transports/whatsapp/client.py +++ b/src/pipecat/transports/whatsapp/client.py @@ -154,8 +154,17 @@ class WhatsAppClient: return int(challenge) - async def _validate_whatsapp_webhook_request(self, raw_body: bytes, sha256_signature: str): + async def _validate_whatsapp_webhook_request( + self, raw_body: bytes | None, sha256_signature: str | None + ): """Common handler for both /start and /connect endpoints.""" + # Callers gate on `self._whatsapp_secret`, so the assert holds. + assert self._whatsapp_secret is not None + if raw_body is None: + raise Exception("Missing raw request body") + if not sha256_signature: + raise Exception("Missing X-Hub-Signature-256 header") + # Compute HMAC SHA256 using your App Secret expected_signature = hmac.new( key=self._whatsapp_secret.encode("utf-8"), @@ -164,8 +173,6 @@ class WhatsAppClient: ).hexdigest() # Extract signature from header (strip 'sha256=' prefix) - if not sha256_signature: - raise Exception("Missing X-Hub-Signature-256 header") received_signature = sha256_signature.split("sha256=")[-1] # Compare signatures securely @@ -306,7 +313,12 @@ class WhatsAppClient: # Create and initialize WebRTC connection pipecat_connection = SmallWebRTCConnection(self._ice_servers) await pipecat_connection.initialize(sdp=call.session.sdp, type=call.session.sdp_type) - sdp_answer = pipecat_connection.get_answer().get("sdp") + answer = pipecat_connection.get_answer() + if answer is None: + raise RuntimeError("SmallWebRTC connection produced no SDP answer") + sdp_answer = answer.get("sdp") + if sdp_answer is None: + raise RuntimeError("SmallWebRTC SDP answer missing 'sdp' field") sdp_answer = self._filter_sdp_for_whatsapp(sdp_answer) logger.debug(f"SDP answer generated for call {call.id}")